test_pipeline_definition.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from unittest import mock
  17. from conftest import AIRFLOW_TEST_OPERATOR_CATALOG
  18. import pytest
  19. from elyra.pipeline import pipeline_constants
  20. from elyra.pipeline.pipeline import KeyValueList
  21. from elyra.pipeline.pipeline import VolumeMount
  22. from elyra.pipeline.pipeline_constants import ENV_VARIABLES
  23. from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
  24. from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
  25. from elyra.pipeline.pipeline_constants import RUNTIME_IMAGE
  26. from elyra.pipeline.pipeline_definition import Node
  27. from elyra.pipeline.pipeline_definition import PipelineDefinition
  28. from elyra.tests.pipeline.util import _read_pipeline_resource
  @pytest.fixture
  def mock_pipeline_property_propagation(monkeypatch):
      """Stub out ``PipelineDefinition.propagate_pipeline_default_properties``.

      Tests that request this fixture build pipeline definitions with the
      propagation step replaced by a stub that simply returns ``True``.
      """

      def _skip_propagation(_definition):
          # Stand-in for the real method: do nothing and report success
          return True

      monkeypatch.setattr(PipelineDefinition, "propagate_pipeline_default_properties", _skip_propagation)
  def test_valid_pipeline():
      """The unmodified ``pipeline_valid.json`` sample must be reported as valid."""
      definition = PipelineDefinition(
          pipeline_definition=_read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
      )

      assert definition.is_valid()
  def test_validation_flags_missing_schema_version(mock_pipeline_property_propagation):
      """A pipeline without the top-level ``version`` field must fail validation."""
      _check_missing_pipeline_field(
          field="version",
          error_msg="Pipeline schema version field is missing.",
      )
  def test_validation_flags_schema_version_has_wrong_type(mock_pipeline_property_propagation):
      """A float in the top-level ``version`` field must fail validation."""
      _check_pipeline_field_type(
          field="version",
          wrong_type_value=3.0,
          error_msg="Pipeline schema version field should be a string.",
      )
  def test_validation_flags_missing_pipelines_field(mock_pipeline_property_propagation):
      """A pipeline without the top-level ``pipelines`` field must fail validation."""
      _check_missing_pipeline_field(
          field="pipelines",
          error_msg="Pipeline is missing 'pipelines' field.",
      )
  def test_validation_flags_pipelines_has_wrong_type(mock_pipeline_property_propagation):
      """A string in the ``pipelines`` field must fail validation."""
      _check_pipeline_field_type(
          field="pipelines",
          wrong_type_value="",
          error_msg="Field 'pipelines' should be a list.",
      )
  def test_validation_flags_pipelines_is_empty(mock_pipeline_property_propagation):
      """An empty ``pipelines`` list must fail validation."""
      _check_pipeline_field_type(
          field="pipelines",
          wrong_type_value=[],
          error_msg="Pipeline has zero length 'pipelines' field.",
      )
  def test_validation_flags_missing_primary_pipeline_field(mock_pipeline_property_propagation):
      """A pipeline without the top-level ``primary_pipeline`` field must fail validation."""
      _check_missing_pipeline_field(
          field="primary_pipeline",
          error_msg="Could not determine the primary pipeline.",
      )
  def test_validation_flags_missing_primary_pipeline_nodes_field(mock_pipeline_property_propagation):
      """A primary pipeline without a ``nodes`` field must fail validation."""
      _check_missing_primary_pipeline_field(
          field="nodes",
          error_msg="At least one node must exist in the primary pipeline.",
      )
  def test_validation_flags_missing_app_data_field(mock_pipeline_property_propagation):
      """A primary pipeline without an ``app_data`` field must fail validation."""
      _check_missing_primary_pipeline_field(
          field="app_data",
          error_msg="Primary pipeline is missing the 'app_data' field.",
      )
  def test_validation_flags_missing_version_field():
      """Dropping ``version`` from the primary pipeline's ``app_data`` must be flagged."""
      expected_issue = "Primary pipeline is missing the 'version' field."

      payload = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
      primary_app_data = payload["pipelines"][0]["app_data"]
      del primary_app_data["version"]

      definition = PipelineDefinition(pipeline_definition=payload)

      assert definition.is_valid() is False
      assert expected_issue in definition.validate()
  def test_updates_to_primary_pipeline_updates_pipeline_definition():
      """A value set on the primary pipeline must show up in the definition's dict form."""
      new_version = 3
      definition = PipelineDefinition(
          pipeline_definition=_read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
      )

      definition.primary_pipeline.set("version", new_version)

      # The change is visible through the pipeline object ...
      assert definition.primary_pipeline.version == new_version
      # ... and through the serialized pipeline definition
      serialized_app_data = definition.to_dict()["pipelines"][0]["app_data"]
      assert serialized_app_data["version"] == new_version
  def test_updates_to_nodes_updates_pipeline_definition():
      """A component parameter set on each node must show up in the definition's dict form."""
      definition = PipelineDefinition(
          pipeline_definition=_read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
      )

      for pipeline_node in definition.primary_pipeline.nodes:
          pipeline_node.set_component_parameter("filename", "foo")

      serialized_nodes = definition.to_dict()["pipelines"][0]["nodes"]
      for node_dict in serialized_nodes:
          component_parameters = node_dict["app_data"]["component_parameters"]
          assert component_parameters["filename"] == "foo"
  def test_envs_to_dict():
      """``KeyValueList.to_dict()`` must yield the expected mapping for untidy ``KEY=value`` entries.

      The expected result has surrounding whitespace removed from keys and
      values, omits the entry whose value is empty, and keeps any ``=`` that
      appears after the first one as part of the value.
      """
      raw_entries = [
          "TEST= one",
          "TEST_TWO=two ",
          "TEST_THREE =",
          " TEST_FOUR=1",
          "TEST_FIVE = fi=ve ",
      ]
      expected = {
          "TEST": "one",
          "TEST_TWO": "two",
          "TEST_FOUR": "1",
          "TEST_FIVE": "fi=ve",
      }

      assert KeyValueList(raw_entries).to_dict() == expected
  def test_env_dict_to_list():
      """``KeyValueList.from_dict()`` must produce ``KEY=value`` entries in dict order."""
      source = {
          "TEST": "one",
          "TEST_TWO": "two",
          "TEST_FOUR": "1",
      }
      expected = [
          "TEST=one",
          "TEST_TWO=two",
          "TEST_FOUR=1",
      ]

      assert KeyValueList.from_dict(source) == expected
  def test_convert_kv_properties(monkeypatch):
      """Check that key-value properties are converted to ``KeyValueList`` instances.

      ``PipelineDefinition.get_kv_properties`` is mocked to report the env-vars
      property plus an extra ``kv_test_property``. Both must be ``KeyValueList``
      instances on the generic node and in the pipeline defaults, while the
      runtime image and the plain ``outputs`` list must not be converted.
      """
      kv_test_property_name = "kv_test_property"
      pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid_with_pipeline_default.json")

      # Mock get_kv_properties() to ensure the "kv_test_property" variable is included in the list
      mock_kv_property_list = [pipeline_constants.ENV_VARIABLES, kv_test_property_name]
      monkeypatch.setattr(PipelineDefinition, "get_kv_properties", mock.Mock(return_value=mock_kv_property_list))

      # Mock set_elyra_properties_to_skip() so that a ComponentCache instance is not created unnecessarily
      monkeypatch.setattr(Node, "set_elyra_properties_to_skip", mock.Mock(return_value=None))

      pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)

      # Pick the generic node explicitly. A for/break loop would leave the last
      # iterated node bound when nothing matches, silently testing the wrong node.
      node = next(
          (candidate for candidate in pipeline_definition.pipeline_nodes if candidate.op == "execute-notebook-node"),
          None,
      )
      assert node is not None, "No 'execute-notebook-node' node found in the test pipeline"

      pipeline_defaults = pipeline_definition.primary_pipeline.get_property(pipeline_constants.PIPELINE_DEFAULTS)

      for kv_property in mock_kv_property_list:
          assert isinstance(node.get_component_parameter(kv_property), KeyValueList)
          assert isinstance(pipeline_defaults[kv_property], KeyValueList)

      # Ensure a non-list property is not converted to a KeyValueList
      assert not isinstance(
          pipeline_definition.primary_pipeline.get_property(pipeline_constants.RUNTIME_IMAGE), KeyValueList
      )

      # Ensure plain list property is not converted to a KeyValueList
      assert not isinstance(node.get_component_parameter("outputs"), KeyValueList)
  def test_propagate_pipeline_default_properties(monkeypatch):
      """Check which pipeline default properties reach which nodes.

      The generic node must carry the expected key-value lists for both the
      env-vars property and the extra mocked ``kv_test_property``. The custom
      component nodes must report ``None`` for the runtime image or env-vars
      parameter checked on each of them.
      """
      expected_kv_list = ["var1=var1", "var2=var2", "var3=var_three"]
      extra_kv_property = "kv_test_property"
      pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid_with_pipeline_default.json")

      # Mock get_kv_properties() to ensure the "kv_test_property" variable is included in the list
      kv_properties = [pipeline_constants.ENV_VARIABLES, extra_kv_property]
      monkeypatch.setattr(PipelineDefinition, "get_kv_properties", mock.Mock(return_value=kv_properties))

      # Mock set_elyra_properties_to_skip() so that a ComponentCache instance is not created unnecessarily
      monkeypatch.setattr(Node, "set_elyra_properties_to_skip", mock.Mock(return_value=None))

      pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)

      # Marker order matters: "TestOperator1" is a substring of
      # "DeriveFromTestOperator1", so the more specific markers are tried first.
      id_markers = ("Notebook", "DeriveFromTestOperator1", "DeriveFromTestOperator2", "TestOperator1")
      nodes_by_marker = dict.fromkeys(id_markers)
      for candidate in pipeline_definition.pipeline_nodes:
          marker = next((m for m in id_markers if m in candidate.id), None)
          if marker is not None:
              nodes_by_marker[marker] = candidate

      generic_node = nodes_by_marker["Notebook"]
      custom_node_derive1 = nodes_by_marker["DeriveFromTestOperator1"]
      custom_node_derive2 = nodes_by_marker["DeriveFromTestOperator2"]
      custom_node_test = nodes_by_marker["TestOperator1"]

      # Ensure that default properties have been propagated
      assert generic_node.get_component_parameter(pipeline_constants.ENV_VARIABLES) == expected_kv_list
      assert generic_node.get_component_parameter(extra_kv_property) == expected_kv_list

      # Ensure that runtime image and env vars are not propagated to custom components
      assert custom_node_test.get_component_parameter(RUNTIME_IMAGE) is None
      assert custom_node_derive1.get_component_parameter(RUNTIME_IMAGE) is None
      assert custom_node_derive2.get_component_parameter(ENV_VARIABLES) is None
  @pytest.mark.parametrize("catalog_instance", [AIRFLOW_TEST_OPERATOR_CATALOG], indirect=True)
  def test_property_id_collision_with_system_property(monkeypatch, catalog_instance):
      """Check handling of a component property that shares its id with an Elyra system property.

      Nodes whose component does not define its own ``mounted_volumes`` property
      must keep the Elyra property; the node whose component does define it must
      skip the Elyra property and keep the value as written in the pipeline file.
      """
      pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid_with_pipeline_default.json")
      pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)

      # Marker order matters: "TestOperator1" is a substring of
      # "DeriveFromTestOperator1", so the more specific markers are tried first.
      id_markers = ("DeriveFromTestOperator1", "DeriveFromTestOperator2", "TestOperator1")
      nodes_by_marker = dict.fromkeys(id_markers)
      for candidate in pipeline_definition.pipeline_nodes:
          marker = next((m for m in id_markers if m in candidate.id), None)
          if marker is not None:
              nodes_by_marker[marker] = candidate

      custom_node_derive1 = nodes_by_marker["DeriveFromTestOperator1"]
      custom_node_derive2 = nodes_by_marker["DeriveFromTestOperator2"]
      custom_node_test = nodes_by_marker["TestOperator1"]

      # DeriveFromTestOperator does not define its own 'mounted_volumes'
      # property and should not skip the Elyra 'mounted_volumes' property
      assert MOUNTED_VOLUMES not in custom_node_derive1.elyra_properties_to_skip
      assert MOUNTED_VOLUMES not in custom_node_derive2.elyra_properties_to_skip

      # Property value should be a combination of the lists given on the
      # pipeline node and in the pipeline default properties
      expected_derive1_volumes = [
          VolumeMount(path="/mnt/vol2", pvc_name="pvc-claim-2"),
          VolumeMount(path="/mnt/vol1", pvc_name="pvc-claim-1"),
      ]
      expected_derive2_volumes = [VolumeMount(path="/mnt/vol2", pvc_name="pvc-claim-2")]
      assert custom_node_derive1.get_component_parameter(MOUNTED_VOLUMES) == expected_derive1_volumes
      assert custom_node_derive2.get_component_parameter(MOUNTED_VOLUMES) == expected_derive2_volumes

      # TestOperator defines its own 'mounted_volumes' property
      # and should skip the Elyra system property of the same name
      assert MOUNTED_VOLUMES in custom_node_test.elyra_properties_to_skip

      # Property value should be as-assigned in pipeline file
      assert custom_node_test.get_component_parameter(MOUNTED_VOLUMES) == "a component-parsed property"
  def test_remove_env_vars_with_matching_secrets(monkeypatch):
      """Env vars whose keys all match Kubernetes secret keys must be removed from the node.

      The generic node gets a ``kubernetes_secrets`` list with keys ``var1``,
      ``var2`` and ``var3``; after ``remove_env_vars_with_matching_secrets()``
      the node's env-vars parameter must be empty.
      """
      pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid_with_pipeline_default.json")

      # Mock set_elyra_properties_to_skip() so that a ComponentCache instance is not created unnecessarily
      monkeypatch.setattr(Node, "set_elyra_properties_to_skip", mock.Mock(return_value=None))

      pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)

      # Pick the generic node explicitly. A for/break loop would leave the last
      # iterated node bound when nothing matches, silently testing the wrong node.
      node = next(
          (candidate for candidate in pipeline_definition.pipeline_nodes if candidate.op == "execute-notebook-node"),
          None,
      )
      assert node is not None, "No 'execute-notebook-node' node found in the test pipeline"

      # Set kubernetes_secret property to have all the same keys as those in the env_vars property
      kubernetes_secrets = KeyValueList(["var1=name1:key1", "var2=name2:key2", "var3=name3:key3"])
      node.set_component_parameter(KUBERNETES_SECRETS, kubernetes_secrets)

      node.remove_env_vars_with_matching_secrets()

      assert node.get_component_parameter(ENV_VARIABLES) == []
  def _check_pipeline_correct_pipeline_name():
      """Assert that the primary pipeline of ``pipeline_valid.json`` is named ``{{name}}``.

      NOTE(review): nothing visible in this module calls this function, and the
      ``_check_`` prefix presumably keeps pytest from collecting it -- confirm
      whether it is meant to run as a test.
      """
      definition = PipelineDefinition(
          pipeline_definition=_read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
      )

      assert definition.primary_pipeline.name == "{{name}}"
  def _check_pipeline_correct_pipeline_alternative_name():
      """Assert that the primary pipeline of the alternative-name sample is named ``{{alternative_name}}``.

      NOTE(review): nothing visible in this module calls this function, and the
      ``_check_`` prefix presumably keeps pytest from collecting it -- confirm
      whether it is meant to run as a test.
      """
      definition = PipelineDefinition(
          pipeline_definition=_read_pipeline_resource(
              "resources/sample_pipelines/pipeline_valid_alternative_name.json"
          )
      )

      assert definition.primary_pipeline.name == "{{alternative_name}}"
  189. #####################
  190. # Utility functions #
  191. #####################
  def _check_missing_pipeline_field(field: str, error_msg: str):
      """Remove a top-level field from the valid sample pipeline and expect a validation error.

      :param field: name of the top-level key to remove from the pipeline JSON
      :param error_msg: message that must be present in the validation results
      """
      payload = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
      del payload[field]

      definition = PipelineDefinition(pipeline_definition=payload)

      assert definition.is_valid() is False
      assert error_msg in definition.validate()
  def _check_pipeline_field_type(field: str, wrong_type_value: object, error_msg: str):
      """Replace a top-level field of the valid sample pipeline with a bad value and expect an error.

      :param field: name of the top-level key whose value is replaced
      :param wrong_type_value: value to store under ``field``; any object is
          accepted (the former ``any`` annotation named the builtin function,
          not a type)
      :param error_msg: message that must be present in the validation results
      """
      pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")

      # Pop before assigning so that a field missing from the sample pipeline
      # raises KeyError instead of being silently added.
      pipeline_json.pop(field)
      pipeline_json[field] = wrong_type_value

      pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)

      assert pipeline_definition.is_valid() is False
      assert error_msg in pipeline_definition.validate()
  def _check_missing_primary_pipeline_field(field: str, error_msg: str):
      """Remove a field from the first pipeline entry of the valid sample and expect a validation error.

      :param field: name of the key to remove from ``pipelines[0]``
      :param error_msg: message that must be present in the validation results
      """
      payload = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
      first_pipeline = payload["pipelines"][0]
      del first_pipeline[field]

      definition = PipelineDefinition(pipeline_definition=payload)

      assert definition.is_valid() is False
      assert error_msg in definition.validate()