test_pipeline_definition.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from unittest import mock
  17. import pytest
  18. from elyra.pipeline import pipeline_constants
  19. from elyra.pipeline.pipeline import KeyValueList
  20. from elyra.pipeline.pipeline_constants import ENV_VARIABLES
  21. from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
  22. from elyra.pipeline.pipeline_definition import PipelineDefinition
  23. from elyra.tests.pipeline.util import _read_pipeline_resource
  24. @pytest.fixture
  25. def mock_pipeline_property_propagation(monkeypatch):
  26. # Mock propagate_pipeline_default_properties to skip propagation
  27. monkeypatch.setattr(PipelineDefinition, "propagate_pipeline_default_properties", lambda x: True)
  28. def test_valid_pipeline():
  29. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
  30. pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)
  31. assert pipeline_definition.is_valid()
  32. def test_validation_flags_missing_schema_version(mock_pipeline_property_propagation):
  33. _check_missing_pipeline_field("version", "Pipeline schema version field is missing.")
  34. def test_validation_flags_schema_version_has_wrong_type(mock_pipeline_property_propagation):
  35. _check_pipeline_field_type("version", 3.0, "Pipeline schema version field should be a string.")
  36. def test_validation_flags_missing_pipelines_field(mock_pipeline_property_propagation):
  37. _check_missing_pipeline_field("pipelines", "Pipeline is missing 'pipelines' field.")
  38. def test_validation_flags_pipelines_has_wrong_type(mock_pipeline_property_propagation):
  39. _check_pipeline_field_type("pipelines", "", "Field 'pipelines' should be a list.")
  40. def test_validation_flags_pipelines_is_empty(mock_pipeline_property_propagation):
  41. _check_pipeline_field_type("pipelines", list(), "Pipeline has zero length 'pipelines' field.")
  42. def test_validation_flags_missing_primary_pipeline_field(mock_pipeline_property_propagation):
  43. _check_missing_pipeline_field("primary_pipeline", "Could not determine the primary pipeline.")
  44. def test_validation_flags_missing_primary_pipeline_nodes_field(mock_pipeline_property_propagation):
  45. _check_missing_primary_pipeline_field("nodes", "At least one node must exist in the primary pipeline.")
  46. def test_validation_flags_missing_app_data_field(mock_pipeline_property_propagation):
  47. _check_missing_primary_pipeline_field("app_data", "Primary pipeline is missing the 'app_data' field.")
  48. def test_validation_flags_missing_version_field():
  49. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
  50. pipeline_json["pipelines"][0]["app_data"].pop("version")
  51. pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)
  52. assert pipeline_definition.is_valid() is False
  53. assert "Primary pipeline is missing the 'version' field." in pipeline_definition.validate()
  54. def test_updates_to_primary_pipeline_updates_pipeline_definition():
  55. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
  56. pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)
  57. pipeline_definition.primary_pipeline.set("version", 3)
  58. assert pipeline_definition.primary_pipeline.version == 3
  59. assert pipeline_definition.to_dict()["pipelines"][0]["app_data"]["version"] == 3
  60. def test_updates_to_nodes_updates_pipeline_definition():
  61. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
  62. pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)
  63. for node in pipeline_definition.primary_pipeline.nodes:
  64. node.set_component_parameter("filename", "foo")
  65. for node in pipeline_definition.to_dict()["pipelines"][0]["nodes"]:
  66. assert node["app_data"]["component_parameters"]["filename"] == "foo"
  67. def test_envs_to_dict():
  68. test_list = ["TEST= one", "TEST_TWO=two ", "TEST_THREE =", " TEST_FOUR=1", "TEST_FIVE = fi=ve "]
  69. test_dict_correct = {"TEST": "one", "TEST_TWO": "two", "TEST_FOUR": "1", "TEST_FIVE": "fi=ve"}
  70. assert KeyValueList(test_list).to_dict() == test_dict_correct
  71. def test_env_dict_to_list():
  72. test_dict = {"TEST": "one", "TEST_TWO": "two", "TEST_FOUR": "1"}
  73. test_list_correct = ["TEST=one", "TEST_TWO=two", "TEST_FOUR=1"]
  74. assert KeyValueList.from_dict(test_dict) == test_list_correct
  75. def test_convert_kv_properties(monkeypatch):
  76. kv_test_property_name = "kv_test_property"
  77. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid_with_pipeline_default.json")
  78. # Mock get_kv_properties() to ensure the "kv_test_property" variable is included in the list
  79. mock_kv_property_list = [pipeline_constants.ENV_VARIABLES, kv_test_property_name]
  80. monkeypatch.setattr(PipelineDefinition, "get_kv_properties", mock.Mock(return_value=mock_kv_property_list))
  81. pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)
  82. node = pipeline_definition.primary_pipeline.nodes.pop()
  83. pipeline_defaults = pipeline_definition.primary_pipeline.get_property(pipeline_constants.PIPELINE_DEFAULTS)
  84. for kv_property in mock_kv_property_list:
  85. assert isinstance(node.get_component_parameter(kv_property), KeyValueList)
  86. assert isinstance(pipeline_defaults[kv_property], KeyValueList)
  87. # Ensure a non-list property is not converted to a KeyValueList
  88. assert not isinstance(
  89. pipeline_definition.primary_pipeline.get_property(pipeline_constants.RUNTIME_IMAGE), KeyValueList
  90. )
  91. # Ensure plain list property is not converted to a KeyValueList
  92. assert not isinstance(node.get_component_parameter("outputs"), KeyValueList)
  93. def test_propagate_pipeline_default_properties(monkeypatch):
  94. kv_list_correct = ["var1=var1", "var2=var2", "var3=var_three"]
  95. kv_test_property_name = "kv_test_property"
  96. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid_with_pipeline_default.json")
  97. # Mock get_kv_properties() to ensure the "kv_test_property" variable is included in the list
  98. mock_kv_property_list = [pipeline_constants.ENV_VARIABLES, kv_test_property_name]
  99. monkeypatch.setattr(PipelineDefinition, "get_kv_properties", mock.Mock(return_value=mock_kv_property_list))
  100. pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)
  101. node = pipeline_definition.primary_pipeline.nodes.pop()
  102. assert node.get_component_parameter(pipeline_constants.ENV_VARIABLES) == kv_list_correct
  103. assert node.get_component_parameter(kv_test_property_name) == kv_list_correct
  104. def test_remove_env_vars_with_matching_secrets():
  105. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid_with_pipeline_default.json")
  106. pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)
  107. node = pipeline_definition.primary_pipeline.nodes.pop()
  108. # Set kubernetes_secret property to have all the same keys as those in the env_vars property
  109. kubernetes_secrets = KeyValueList(["var1=name1:key1", "var2=name2:key2", "var3=name3:key3"])
  110. node.set_component_parameter(KUBERNETES_SECRETS, kubernetes_secrets)
  111. node.remove_env_vars_with_matching_secrets()
  112. assert node.get_component_parameter(ENV_VARIABLES) == []
  113. def _check_pipeline_correct_pipeline_name():
  114. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
  115. pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)
  116. primary_pipeline = pipeline_definition.primary_pipeline
  117. assert primary_pipeline.name == "{{name}}"
  118. def _check_pipeline_correct_pipeline_alternative_name():
  119. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid_alternative_name.json")
  120. pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)
  121. primary_pipeline = pipeline_definition.primary_pipeline
  122. assert primary_pipeline.name == "{{alternative_name}}"
  123. #####################
  124. # Utility functions #
  125. #####################
  126. def _check_missing_pipeline_field(field: str, error_msg: str):
  127. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
  128. pipeline_json.pop(field)
  129. pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)
  130. assert pipeline_definition.is_valid() is False
  131. assert error_msg in pipeline_definition.validate()
  132. def _check_pipeline_field_type(field: str, wrong_type_value: any, error_msg: str):
  133. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
  134. pipeline_json.pop(field)
  135. pipeline_json[field] = wrong_type_value
  136. pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)
  137. assert pipeline_definition.is_valid() is False
  138. assert error_msg in pipeline_definition.validate()
  139. def _check_missing_primary_pipeline_field(field: str, error_msg: str):
  140. pipeline_json = _read_pipeline_resource("resources/sample_pipelines/pipeline_valid.json")
  141. pipeline_json["pipelines"][0].pop(field)
  142. pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)
  143. assert pipeline_definition.is_valid() is False
  144. assert error_msg in pipeline_definition.validate()