test_pipeline_parser.py
#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pytest

from elyra.pipeline.parser import PipelineParser
from elyra.pipeline.pipeline import GenericOperation
from elyra.tests.pipeline.util import _read_pipeline_resource


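# The sample pipeline resources use "{{...}}" placeholder values; this fixture mirrors
# the single node defined in pipeline_valid.json so parsed operations can be compared
# against it directly.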
@pytest.fixture
def valid_operation():
    component_parameters = {
        "filename": "{{filename}}",
        "runtime_image": "{{runtime_image}}",
        "env_vars": ["var1=var1", "var2=var2"],
        "dependencies": ["a.txt", "b.txt", "c.txt"],
        "outputs": ["d.txt", "e.txt", "f.txt"],
    }
    return GenericOperation(
        id="{{uuid}}",
        type="execution_node",
        classifier="execute-notebook-node",
        name="{{label}}",
        component_params=component_parameters,
    )


def test_valid_pipeline(valid_operation):
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")

    pipeline = PipelineParser().parse(pipeline_json)

    assert pipeline.name == "{{name}}"
    assert pipeline.runtime == "{{runtime}}"
    assert pipeline.runtime_config == "{{runtime-config}}"
    assert len(pipeline.operations) == 1
    assert pipeline.operations["{{uuid}}"] == valid_operation


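# The resource below contains list properties with "dirty" entries; the parser is
# expected to normalize them so the parsed operation still equals the clean fixture.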
def test_pipeline_with_dirty_list_values(valid_operation):
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_with_invalid_list_values.json")

    pipeline = PipelineParser().parse(pipeline_json)

    assert pipeline.name == "{{name}}"
    assert pipeline.runtime == "{{runtime}}"
    assert pipeline.runtime_config == "{{runtime-config}}"
    assert len(pipeline.operations) == 1
    assert pipeline.operations["{{uuid}}"] == valid_operation


def test_multinode_pipeline():
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_3_node_sample.json")

    pipeline = PipelineParser().parse(pipeline_json)

    assert len(pipeline.operations) == 3


def test_supernode_pipeline():
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_with_supernode.json")

    pipeline = PipelineParser().parse(pipeline_json)

    assert len(pipeline.operations) == 4

    # Confirm the structure of the pipeline:
    # Two execution nodes feed their outputs to a super-node containing one execution node.
    # The super-node's execution node then sends its output to an external execution node.
    # 4 nodes total. The super-node's execution node should have two parent operations
    # pointing at the first two nodes, and the final node should have one parent pointing
    # at the execution node WITHIN the super-node.
    external_input_node_ids = ["db9f3f5b-b2e3-4824-aadd-c1c6bf652534", "f6584209-6f22-434f-9820-41327b6c749d"]
    supernode_excution_node_id = "079c0e12-eb5f-4fcc-983b-09e011869fee"
    external_node_id = "7628306d-2cc2-405c-94a1-fe42c95567a1"

    for node_id in pipeline.operations:
        # Validate operations list
        if node_id in external_input_node_ids:
            # These are input nodes; ensure parent_operation_ids are empty
            assert len(pipeline.operations[node_id].parent_operation_ids) == 0
            continue
        if node_id == supernode_excution_node_id:
            # Node within the super-node; should have two parent ops matching external_input_node_ids
            assert len(pipeline.operations[node_id].parent_operation_ids) == 2
            assert set(pipeline.operations[node_id].parent_operation_ids) == set(external_input_node_ids)
            continue
        if node_id == external_node_id:
            # Final external node; should have the super-node's embedded node as its parent op
            assert len(pipeline.operations[node_id].parent_operation_ids) == 1
            assert pipeline.operations[node_id].parent_operation_ids[0] == supernode_excution_node_id
            continue
        assert False, "Invalid node_id encountered in pipeline operations!"


def test_multiple_pipeline_definition():
    pipeline_json = _read_pipeline_resource(
        "resources/sample_pipelines/pipeline_multiple_pipeline_definitions.json"
    )

    with pytest.raises(ValueError):
        PipelineParser().parse(pipeline_json)


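# Operation names derived from artifact filenames should not retain a file extension,
# hence the "." check below.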
def test_pipeline_operations_and_handle_artifact_file_details():
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_3_node_sample.json")

    pipeline = PipelineParser().parse(pipeline_json)

    assert len(pipeline.operations) == 3
    for op in pipeline.operations.values():
        assert "." not in op.name


def test_pipeline_with_dependencies():
    pipeline_json = _read_pipeline_resource(
        "resources/sample_pipelines/pipeline_3_node_sample_with_dependencies.json"
    )

    pipeline = PipelineParser().parse(pipeline_json)

    assert len(pipeline.operations["acc4527d-7cc8-4c16-b520-5aa0f50a2e34"].parent_operation_ids) == 2


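# Pipeline comments linked to a node should surface as that operation's `doc` attribute.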
def test_pipeline_with_comments():
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_3_node_sample_with_comments.json")

    pipeline = PipelineParser().parse(pipeline_json)

    assert (
        pipeline.operations["d52ddfb4-dd0e-47ac-abc7-fa30bb95d45c"].doc
        == "Generate community stats and then aggregate them on an overview dashboard"
    )


def test_pipeline_global_attributes():
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")

    pipeline = PipelineParser().parse(pipeline_json)

    assert pipeline.name == "{{name}}"
    assert pipeline.runtime == "{{runtime}}"
    assert pipeline.runtime_config == "{{runtime-config}}"


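# The remaining tests remove or corrupt individual fields in pipeline_valid.json to
# exercise the parser's defaulting and error handling.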
def test_missing_pipeline_name_should_default_to_untitled():
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
    pipeline_json["pipelines"][0]["app_data"]["properties"].pop("name")

    pipeline = PipelineParser().parse(pipeline_json)

    assert pipeline.name == "untitled"


def test_missing_pipeline_runtime():
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
    pipeline_json["pipelines"][0]["app_data"].pop("runtime")

    with pytest.raises(ValueError) as e:
        PipelineParser().parse(pipeline_json)
    assert "Invalid pipeline: Missing runtime." in str(e.value)


def test_missing_pipeline_runtime_configuration():
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
    pipeline_json["pipelines"][0]["app_data"].pop("runtime_config")

    with pytest.raises(ValueError) as e:
        PipelineParser().parse(pipeline_json)
    assert "Invalid pipeline: Missing runtime configuration" in str(e.value)


def test_missing_operation_id():
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
    pipeline_json["pipelines"][0]["nodes"][0].pop("id")

    with pytest.raises(ValueError) as e:
        PipelineParser().parse(pipeline_json)
    assert "Missing field 'operation id'" in str(e.value)


def test_missing_operation_type():
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
    pipeline_json["pipelines"][0]["nodes"][0].pop("type")

    with pytest.raises(ValueError) as e:
        PipelineParser().parse(pipeline_json)
    assert "Node type 'None' is invalid!" in str(e.value)


def test_invalid_node_type():
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
    pipeline_json["pipelines"][0]["nodes"][0]["type"] = "foo"

    with pytest.raises(ValueError) as e:
        PipelineParser().parse(pipeline_json)
    assert "Node type 'foo' is invalid!" in str(e.value)


def test_missing_operation_filename():
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
    pipeline_json["pipelines"][0]["nodes"][0]["app_data"]["component_parameters"].pop("filename")

    with pytest.raises(ValueError) as e:
        PipelineParser().parse(pipeline_json)
    assert "Missing field 'operation filename" in str(e.value)


def test_missing_operation_image():
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
    pipeline_json["pipelines"][0]["nodes"][0]["app_data"]["component_parameters"].pop("runtime_image")

    with pytest.raises(ValueError) as e:
        PipelineParser().parse(pipeline_json)
    assert "Missing field 'operation runtime image'" in str(e.value)