123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import pytest
- from elyra.pipeline.parser import PipelineParser
- from elyra.pipeline.pipeline import GenericOperation
- from elyra.tests.pipeline.util import _read_pipeline_resource
- @pytest.fixture
- def valid_operation():
- component_parameters = {
- "filename": "{{filename}}",
- "runtime_image": "{{runtime_image}}",
- "env_vars": ["var1=var1", "var2=var2"],
- "dependencies": ["a.txt", "b.txt", "c.txt"],
- "outputs": ["d.txt", "e.txt", "f.txt"],
- }
- return GenericOperation(
- id="{{uuid}}",
- type="execution_node",
- classifier="execute-notebook-node",
- name="{{label}}",
- component_params=component_parameters,
- )
- def test_valid_pipeline(valid_operation):
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
- pipeline = PipelineParser().parse(pipeline_json)
- assert pipeline.name == "{{name}}"
- assert pipeline.runtime == "{{runtime}}"
- assert pipeline.runtime_config == "{{runtime-config}}"
- assert len(pipeline.operations) == 1
- assert pipeline.operations["{{uuid}}"] == valid_operation
- def test_pipeline_with_dirty_list_values(valid_operation):
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_with_invalid_list_values.json")
- pipeline = PipelineParser().parse(pipeline_json)
- assert pipeline.name == "{{name}}"
- assert pipeline.runtime == "{{runtime}}"
- assert pipeline.runtime_config == "{{runtime-config}}"
- assert len(pipeline.operations) == 1
- assert pipeline.operations["{{uuid}}"] == valid_operation
- def test_multinode_pipeline():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_3_node_sample.json")
- pipeline = PipelineParser().parse(pipeline_json)
- assert len(pipeline.operations) == 3
- def test_supernode_pipeline():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_with_supernode.json")
- pipeline = PipelineParser().parse(pipeline_json)
- assert len(pipeline.operations) == 4
- # Confirm structure of pipeline:
- # Two execution nodes feed their outputs to super-node with one execution_node.
- # Super-node's execution node, then sends its output to external execution node.
- # 4 nodes total. Super-node execution node should have two parent-operations
- # pointing at first two nodes, and final node should have one parent pointing
- # at execution node WITHIN supernode.
- external_input_node_ids = ["db9f3f5b-b2e3-4824-aadd-c1c6bf652534", "f6584209-6f22-434f-9820-41327b6c749d"]
- supernode_excution_node_id = "079c0e12-eb5f-4fcc-983b-09e011869fee"
- external_node_id = "7628306d-2cc2-405c-94a1-fe42c95567a1"
- for node_id in pipeline.operations:
- # Validate operations list
- if node_id in external_input_node_ids:
- # These are input nodes, ensure parent_operation_ids are empty
- assert len(pipeline.operations[node_id].parent_operation_ids) == 0
- continue
- if node_id == supernode_excution_node_id:
- # Node within supernode, should have two parent_ops matching external_input_node_ids
- assert len(pipeline.operations[node_id].parent_operation_ids) == 2
- assert set(pipeline.operations[node_id].parent_operation_ids) == set(external_input_node_ids)
- continue
- if node_id == external_node_id:
- # Final external node, should have super_node embedded node as parent op.
- assert len(pipeline.operations[node_id].parent_operation_ids) == 1
- assert pipeline.operations[node_id].parent_operation_ids[0] == supernode_excution_node_id
- continue
- assert False, "Invalid node_id encountered in pipeline operations!"
- def test_multiple_pipeline_definition():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/" "pipeline_multiple_pipeline_definitions.json")
- with pytest.raises(ValueError):
- PipelineParser().parse(pipeline_json)
- def test_pipeline_operations_and_handle_artifact_file_details():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_3_node_sample.json")
- pipeline = PipelineParser().parse(pipeline_json)
- assert len(pipeline.operations) == 3
- for op in pipeline.operations.values():
- assert "." not in op.name
- def test_pipeline_with_dependencies():
- pipeline_json = _read_pipeline_resource(
- "resources/sample_pipelines/" "pipeline_3_node_sample_with_dependencies.json"
- )
- pipeline = PipelineParser().parse(pipeline_json)
- assert len(pipeline.operations["acc4527d-7cc8-4c16-b520-5aa0f50a2e34"].parent_operation_ids) == 2
- def test_pipeline_with_comments():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/" "pipeline_3_node_sample_with_comments.json")
- pipeline = PipelineParser().parse(pipeline_json)
- assert (
- pipeline.operations["d52ddfb4-dd0e-47ac-abc7-fa30bb95d45c"].doc
- == "Generate community stats and then aggregate them on an overview dashboard"
- )
- def test_pipeline_global_attributes():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
- pipeline = PipelineParser().parse(pipeline_json)
- assert pipeline.name == "{{name}}"
- assert pipeline.runtime == "{{runtime}}"
- assert pipeline.runtime_config == "{{runtime-config}}"
- def test_missing_pipeline_name_should_default_to_untitled():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
- pipeline_json["pipelines"][0]["app_data"]["properties"].pop("name")
- pipeline = PipelineParser().parse(pipeline_json)
- assert pipeline.name == "untitled"
- def test_missing_pipeline_runtime():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
- pipeline_json["pipelines"][0]["app_data"].pop("runtime")
- with pytest.raises(ValueError) as e:
- PipelineParser().parse(pipeline_json)
- assert "Invalid pipeline: Missing runtime." in str(e.value)
- def test_missing_pipeline_runtime_configuration():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
- pipeline_json["pipelines"][0]["app_data"].pop("runtime_config")
- with pytest.raises(ValueError) as e:
- PipelineParser().parse(pipeline_json)
- assert "Invalid pipeline: Missing runtime configuration" in str(e.value)
- def test_missing_operation_id():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
- pipeline_json["pipelines"][0]["nodes"][0].pop("id")
- with pytest.raises(ValueError) as e:
- PipelineParser().parse(pipeline_json)
- assert "Missing field 'operation id'" in str(e.value)
- def test_missing_operation_type():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
- pipeline_json["pipelines"][0]["nodes"][0].pop("type")
- with pytest.raises(ValueError) as e:
- PipelineParser().parse(pipeline_json)
- assert "Node type 'None' is invalid!" in str(e.value)
- def test_invalid_node_type():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
- pipeline_json["pipelines"][0]["nodes"][0]["type"] = "foo"
- with pytest.raises(ValueError) as e:
- PipelineParser().parse(pipeline_json)
- assert "Node type 'foo' is invalid!" in str(e.value)
- def test_missing_operation_filename():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
- pipeline_json["pipelines"][0]["nodes"][0]["app_data"]["component_parameters"].pop("filename")
- with pytest.raises(ValueError) as e:
- PipelineParser().parse(pipeline_json)
- assert "Missing field 'operation filename" in str(e.value)
- def test_missing_operation_image():
- pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
- pipeline_json["pipelines"][0]["nodes"][0]["app_data"]["component_parameters"].pop("runtime_image")
- with pytest.raises(ValueError) as e:
- PipelineParser().parse(pipeline_json)
- assert "Missing field 'operation runtime image'" in str(e.value)
|