#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
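"""Tests for LocalPipelineProcessor: operation execution order, environment
variable handling, and success/failure behavior of locally executed pipelines."""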
import os

import nbformat
import pytest

from elyra.pipeline.local.processor_local import LocalPipelineProcessor
from elyra.pipeline.parser import PipelineParser
from elyra.pipeline.pipeline import GenericOperation
from elyra.tests.pipeline.util import _read_pipeline_resource
from elyra.tests.pipeline.util import construct_pipeline
from elyra.tests.pipeline.util import NotebookNode
from elyra.tests.pipeline.util import PythonNode


@pytest.fixture
def pipeline_dir(tmp_path):
    """Create and return a scratch "pipeline" directory under pytest's tmp_path."""
    pipeline_path = os.path.join(tmp_path, "pipeline")
    os.makedirs(pipeline_path)
    return pipeline_path
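

# The operations parsed from the pipeline JSON are not guaranteed to be in
# execution order; _sort_operations() must order them so that every operation
# comes after all of its dependencies.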
def test_pipeline_execution_order_in_complex_pipeline():
    expected_operation_names = ["a", "b", "c", "d", "e", "f", "x", "y", "g", "h"]
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_dependency_complex.json")
    pipeline = PipelineParser().parse(pipeline_json)

    current_ordered_operation_names = _get_operation_names(pipeline.operations.values())
    assert current_ordered_operation_names != expected_operation_names

    operations = LocalPipelineProcessor._sort_operations(operations_by_id=pipeline.operations)
    ordered_operation_names = _get_operation_names(operations)
    assert ordered_operation_names == expected_operation_names


def test_pipeline_execution_order_in_simple_pipeline():
    expected_operation_names = ["f", "a", "c", "g"]
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_dependency_simple.json")
    pipeline = PipelineParser().parse(pipeline_json)

    current_ordered_operation_names = _get_operation_names(pipeline.operations.values())
    assert current_ordered_operation_names != expected_operation_names

    operations = LocalPipelineProcessor._sort_operations(operations_by_id=pipeline.operations)
    ordered_operation_names = _get_operation_names(operations)
    assert ordered_operation_names == expected_operation_names
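

# Every operation in the complex sample pipeline declares an OP_NAME environment
# variable whose value matches the node's name, so the env list must round-trip
# cleanly into a dictionary.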
def test_pipeline_get_envs():
    # Ensure pipeline operation env lists are properly converted to dictionaries.
    pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_dependency_complex.json")
    pipeline = PipelineParser().parse(pipeline_json)
    for op in pipeline.operations.values():
        assert isinstance(op, GenericOperation)
        op_envs = op.env_vars.to_dict()
        assert op_envs["OP_NAME"] == op.name
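

# A successful end-to-end run should leave every node's declared output files
# in the pipeline directory.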
def test_pipeline_execution(pipeline_dir):
    # Construct a 4-node pipeline consisting of 3 notebooks and 1 Python script.
    # This pipeline is "diamond shaped": node1 feeds nodes 2 and 3, each of
    # which then feeds node4.
    node1 = NotebookNode("node1", num_outputs=2)
    node2 = PythonNode("node2", num_outputs=2, input_nodes=[node1])
    node3 = NotebookNode("node3", num_outputs=2, input_nodes=[node1])
    node4 = NotebookNode("node4", num_outputs=2, input_nodes=[node2, node3])
    nodes = [node1, node2, node3, node4]

    pipeline = construct_pipeline("p1", nodes=nodes, location=pipeline_dir)

    LocalPipelineProcessor(root_dir=pipeline_dir).process(pipeline)

    # Confirm outputs
    for node in nodes:
        for output in node.outputs:
            assert os.path.exists(os.path.join(pipeline_dir, output))
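

# Removing the kernelspec from a notebook should surface a RuntimeError that
# identifies the offending node and the missing kernel name.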
def test_pipeline_execution_missing_kernelspec(pipeline_dir):
    # Construct a 4-node pipeline consisting of 3 notebooks and 1 Python script.
    # This pipeline is "diamond shaped": node1 feeds nodes 2 and 3, each of
    # which then feeds node4.
    node1 = NotebookNode("node1", num_outputs=2)
    node2 = PythonNode("node2", num_outputs=2, input_nodes=[node1])
    node3 = NotebookNode("node3", num_outputs=2, input_nodes=[node1])
    node4 = NotebookNode("node4", num_outputs=2, input_nodes=[node2, node3])
    nodes = [node1, node2, node3, node4]

    pipeline = construct_pipeline("p1", nodes=nodes, location=pipeline_dir)

    # Strip the kernelspec metadata from node1's notebook to simulate a
    # notebook whose kernel cannot be determined.
    node1nb_file = os.path.join(pipeline_dir, pipeline.operations[node1.id].filename)
    nb = nbformat.read(node1nb_file, as_version=4)
    nb["metadata"].pop("kernelspec")
    nbformat.write(nb, node1nb_file)

    with pytest.raises(RuntimeError) as e:
        LocalPipelineProcessor(root_dir=pipeline_dir).process(pipeline)
    assert (
        "Error processing operation node1 (node1.ipynb): No kernel "
        "name found in notebook and no override provided." in str(e.value)
    )
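

# When a notebook node fails, processing should stop: nodes that already ran
# keep their outputs, while the failed node and everything after it produce none.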
def test_pipeline_execution_bad_notebook(pipeline_dir):
    # Construct a 4-node pipeline where node 3 (notebook) produces a failure
    node1 = NotebookNode("node1", num_outputs=2)
    node2 = PythonNode("node2", num_outputs=2, input_nodes=[node1])
    node3 = NotebookNode("node3", num_outputs=2, input_nodes=[node1], fail=True)
    node4 = NotebookNode("node4", num_outputs=2, input_nodes=[node2, node3])
    nodes = [node1, node2, node3, node4]
    processed_nodes = [node1, node2]
    unprocessed_nodes = [node3, node4]

    pipeline = construct_pipeline("p1", nodes=nodes, location=pipeline_dir)

    with pytest.raises(RuntimeError) as e:
        LocalPipelineProcessor(root_dir=pipeline_dir).process(pipeline)
    assert "Error processing operation node3" in str(e.value)

    # Confirm outputs (and non-outputs)
    for node in processed_nodes:
        for output in node.outputs:
            assert os.path.exists(os.path.join(pipeline_dir, output))
    for node in unprocessed_nodes:
        for output in node.outputs:
            assert not os.path.exists(os.path.join(pipeline_dir, output))
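

# Same scenario with the Python node failing. Note that node3 does not depend
# on node2, yet it is also expected to be unprocessed, implying execution halts
# entirely at the first failure rather than skipping only downstream nodes.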
def test_pipeline_execution_bad_python(pipeline_dir):
    # Construct a 4-node pipeline where node 2 (Python) produces a failure
    node1 = NotebookNode("node1", num_outputs=2)
    node2 = PythonNode("node2", num_outputs=2, input_nodes=[node1], fail=True)
    node3 = NotebookNode("node3", num_outputs=2, input_nodes=[node1])
    node4 = NotebookNode("node4", num_outputs=2, input_nodes=[node2, node3])
    nodes = [node1, node2, node3, node4]
    processed_nodes = [node1]
    unprocessed_nodes = [node2, node3, node4]

    pipeline = construct_pipeline("p1", nodes=nodes, location=pipeline_dir)

    with pytest.raises(RuntimeError) as e:
        LocalPipelineProcessor(root_dir=pipeline_dir).process(pipeline)
    assert "Error processing operation node2" in str(e.value)

    # Confirm outputs (and non-outputs)
    for node in processed_nodes:
        for output in node.outputs:
            assert os.path.exists(os.path.join(pipeline_dir, output))
    for node in unprocessed_nodes:
        for output in node.outputs:
            assert not os.path.exists(os.path.join(pipeline_dir, output))
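

# Helper: collect operation names in the order the operations are iterated.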
def _get_operation_names(operations):
    return [operation.name for operation in operations]