# test_pipeline_processor_local.py
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import os
  17. import nbformat
  18. import pytest
  19. from elyra.pipeline.local.processor_local import LocalPipelineProcessor
  20. from elyra.pipeline.parser import PipelineParser
  21. from elyra.pipeline.pipeline import GenericOperation
  22. from elyra.tests.pipeline.util import _read_pipeline_resource
  23. from elyra.tests.pipeline.util import construct_pipeline
  24. from elyra.tests.pipeline.util import NotebookNode
  25. from elyra.tests.pipeline.util import PythonNode
  26. @pytest.fixture
  27. def pipeline_dir(tmp_path):
  28. pipeline_path = os.path.join(tmp_path, "pipeline")
  29. os.makedirs(pipeline_path)
  30. return pipeline_path
  31. def test_pipeline_execution_order_in_complex_pipeline():
  32. expected_operation_names = ["a", "b", "c", "d", "e", "f", "x", "y", "g", "h"]
  33. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_dependency_complex.json")
  34. pipeline = PipelineParser().parse(pipeline_json)
  35. current_ordered_operation_names = _get_operation_names(pipeline.operations.values())
  36. assert current_ordered_operation_names != expected_operation_names
  37. operations = LocalPipelineProcessor._sort_operations(operations_by_id=pipeline.operations)
  38. ordered_operation_names = _get_operation_names(operations)
  39. assert ordered_operation_names == expected_operation_names
  40. def test_pipeline_execution_order_in_simple_pipeline():
  41. expected_operation_names = ["f", "a", "c", "g"]
  42. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_dependency_simple.json")
  43. pipeline = PipelineParser().parse(pipeline_json)
  44. current_ordered_operation_names = _get_operation_names(pipeline.operations.values())
  45. assert current_ordered_operation_names != expected_operation_names
  46. operations = LocalPipelineProcessor._sort_operations(operations_by_id=pipeline.operations)
  47. ordered_operation_names = _get_operation_names(operations)
  48. assert ordered_operation_names == expected_operation_names
  49. def test_pipeline_get_envs():
  50. # Ensure pipeline operation env lists are properly converted to dictionaries.
  51. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_dependency_complex.json")
  52. pipeline = PipelineParser().parse(pipeline_json)
  53. for op in pipeline.operations.values():
  54. assert isinstance(op, GenericOperation)
  55. op_envs = op.env_vars.to_dict()
  56. assert op_envs["OP_NAME"] == op.name
  57. def test_pipeline_execution(pipeline_dir):
  58. # Construct 4-node pipeline consisting of 3 notebooks and 1 python script.
  59. # This pipeline is "diamond shaped" with node1 feeding nodes 2 and 3, each then
  60. # feeding node4.
  61. node1 = NotebookNode("node1", num_outputs=2)
  62. node2 = PythonNode("node2", num_outputs=2, input_nodes=[node1])
  63. node3 = NotebookNode("node3", num_outputs=2, input_nodes=[node1])
  64. node4 = NotebookNode("node4", num_outputs=2, input_nodes=[node2, node3])
  65. nodes = [node1, node2, node3, node4]
  66. pipeline = construct_pipeline("p1", nodes=nodes, location=pipeline_dir)
  67. LocalPipelineProcessor(root_dir=pipeline_dir).process(pipeline)
  68. # Confirm outputs
  69. for node in nodes:
  70. for output in node.outputs:
  71. assert os.path.exists(os.path.join(pipeline_dir, output))
  72. def test_pipeline_execution_missing_kernelspec(pipeline_dir):
  73. # Construct 4-node pipeline consisting of 3 notebooks and 1 python script.
  74. # This pipeline is "diamond shaped" with node1 feeding nodes 2 and 3, each then
  75. # feeding node4.
  76. node1 = NotebookNode("node1", num_outputs=2)
  77. node2 = PythonNode("node2", num_outputs=2, input_nodes=[node1])
  78. node3 = NotebookNode("node3", num_outputs=2, input_nodes=[node1])
  79. node4 = NotebookNode("node4", num_outputs=2, input_nodes=[node2, node3])
  80. nodes = [node1, node2, node3, node4]
  81. pipeline = construct_pipeline("p1", nodes=nodes, location=pipeline_dir)
  82. node1nb_file = os.path.join(pipeline_dir, pipeline.operations[node1.id].filename)
  83. nb = nbformat.read(node1nb_file, 4)
  84. nb["metadata"].pop("kernelspec")
  85. nbformat.write(nb, node1nb_file)
  86. with pytest.raises(RuntimeError) as e:
  87. LocalPipelineProcessor(root_dir=pipeline_dir).process(pipeline)
  88. assert (
  89. "Error processing operation node1 (node1.ipynb): No kernel "
  90. "name found in notebook and no override provided." in str(e.value)
  91. )
  92. def test_pipeline_execution_bad_notebook(pipeline_dir):
  93. # Construct 4-node pipeline where node 3 (nodebook) produces a failure
  94. node1 = NotebookNode("node1", num_outputs=2)
  95. node2 = PythonNode("node2", num_outputs=2, input_nodes=[node1])
  96. node3 = NotebookNode("node3", num_outputs=2, input_nodes=[node1], fail=True)
  97. node4 = NotebookNode("node4", num_outputs=2, input_nodes=[node2, node3])
  98. nodes = [node1, node2, node3, node4]
  99. processed_nodes = [node1, node2]
  100. unprocessed_nodes = [node3, node4]
  101. pipeline = construct_pipeline("p1", nodes=nodes, location=pipeline_dir)
  102. with pytest.raises(RuntimeError) as e:
  103. LocalPipelineProcessor(root_dir=pipeline_dir).process(pipeline)
  104. assert "Error processing operation node3" in str(e.value)
  105. # Confirm outputs (and non-outputs)
  106. for node in processed_nodes:
  107. for output in node.outputs:
  108. assert os.path.exists(os.path.join(pipeline_dir, output))
  109. for node in unprocessed_nodes:
  110. for output in node.outputs:
  111. assert not os.path.exists(os.path.join(pipeline_dir, output))
  112. def test_pipeline_execution_bad_python(pipeline_dir):
  113. # Construct 4-node pipeline where node 2 (python) produces a failure
  114. node1 = NotebookNode("node1", num_outputs=2)
  115. node2 = PythonNode("node2", num_outputs=2, input_nodes=[node1], fail=True)
  116. node3 = NotebookNode("node3", num_outputs=2, input_nodes=[node1])
  117. node4 = NotebookNode("node4", num_outputs=2, input_nodes=[node2, node3])
  118. nodes = [node1, node2, node3, node4]
  119. processed_nodes = [node1]
  120. unprocessed_nodes = [node2, node3, node4]
  121. pipeline = construct_pipeline("p1", nodes=nodes, location=pipeline_dir)
  122. with pytest.raises(RuntimeError) as e:
  123. LocalPipelineProcessor(root_dir=pipeline_dir).process(pipeline)
  124. assert "Error processing operation node2" in str(e.value)
  125. # Confirm outputs (and non-outputs)
  126. for node in processed_nodes:
  127. for output in node.outputs:
  128. assert os.path.exists(os.path.join(pipeline_dir, output))
  129. for node in unprocessed_nodes:
  130. for output in node.outputs:
  131. assert not os.path.exists(os.path.join(pipeline_dir, output))
  132. def _get_operation_names(operations):
  133. operation_names = []
  134. for operation in operations:
  135. operation_names.append(operation.name)
  136. return operation_names