123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670 |
- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import os
- from conftest import AIRFLOW_COMPONENT_CACHE_INSTANCE
- from conftest import KFP_COMPONENT_CACHE_INSTANCE
- import pytest
- from elyra.pipeline.pipeline import KubernetesSecret
- from elyra.pipeline.pipeline import PIPELINE_CURRENT_VERSION
- from elyra.pipeline.pipeline import VolumeMount
- from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
- from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
- from elyra.pipeline.pipeline_definition import PipelineDefinition
- from elyra.pipeline.validation import PipelineValidationManager
- from elyra.pipeline.validation import ValidationResponse
- from elyra.tests.pipeline.util import _read_pipeline_resource
- @pytest.fixture
- def load_pipeline():
- def _function(pipeline_filepath):
- response = ValidationResponse()
- pipeline = _read_pipeline_resource(f"resources/validation_pipelines/{pipeline_filepath}")
- return pipeline, response
- yield _function
- @pytest.fixture
- def validation_manager(setup_factory_data, component_cache):
- root = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__), "resources/validation_pipelines"))
- yield PipelineValidationManager.instance(root_dir=root)
- PipelineValidationManager.clear_instance()
- async def test_invalid_lower_pipeline_version(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("generic_basic_pipeline_only_notebook.pipeline")
- pipeline_version = PIPELINE_CURRENT_VERSION - 1
- pipeline["pipelines"][0]["app_data"]["version"] = pipeline_version
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- validation_manager._validate_pipeline_structure(pipeline_definition=pipeline_definition, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidPipeline"
- assert (
- issues[0]["message"] == f"Pipeline version {pipeline_version} is out of date "
- "and needs to be migrated using the Elyra pipeline editor."
- )
- def test_invalid_upper_pipeline_version(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("generic_basic_pipeline_only_notebook.pipeline")
- pipeline_version = PIPELINE_CURRENT_VERSION + 1
- pipeline["pipelines"][0]["app_data"]["version"] = pipeline_version
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- validation_manager._validate_pipeline_structure(pipeline_definition=pipeline_definition, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidPipeline"
- assert (
- issues[0]["message"] == "Pipeline was last edited in a newer version of Elyra. "
- "Update Elyra to use this pipeline."
- )
- def test_invalid_pipeline_version_that_needs_migration(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("generic_basic_pipeline_only_notebook.pipeline")
- pipeline["pipelines"][0]["app_data"]["version"] = 3
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- validation_manager._validate_pipeline_structure(pipeline_definition=pipeline_definition, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidPipeline"
- assert "needs to be migrated" in issues[0]["message"]
- def test_basic_pipeline_structure(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("generic_basic_pipeline_only_notebook.pipeline")
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- validation_manager._validate_pipeline_structure(pipeline_definition=pipeline_definition, response=response)
- assert not response.has_fatal
- assert not response.to_json().get("issues")
- def test_basic_pipeline_structure_with_scripts(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("generic_basic_pipeline_with_scripts.pipeline")
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- validation_manager._validate_pipeline_structure(pipeline_definition=pipeline_definition, response=response)
- assert not response.has_fatal
- assert not response.to_json().get("issues")
- @pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
- async def test_invalid_runtime_node_kubeflow(validation_manager, load_pipeline, catalog_instance):
- pipeline, response = load_pipeline("kf_invalid_node_op.pipeline")
- node_id = "eace43f8-c4b1-4a25-b331-d57d4fc29426"
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- await validation_manager._validate_compatibility(
- pipeline_definition=pipeline_definition,
- response=response,
- pipeline_type="KUBEFLOW_PIPELINES",
- pipeline_runtime="kfp",
- )
- issues = response.to_json().get("issues")
- print(issues)
- assert len(issues) == 1
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidNodeType"
- assert issues[0]["data"]["nodeID"] == node_id
- @pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
- async def test_invalid_runtime_node_kubeflow_with_supernode(validation_manager, load_pipeline, catalog_instance):
- pipeline, response = load_pipeline("kf_invalid_node_op_with_supernode.pipeline")
- node_id = "98aa7270-639b-42a4-9a07-b31cd0fa3205"
- pipeline_id = "00304a2b-dec4-4a73-ab4a-6830f97d7855"
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- await validation_manager._validate_compatibility(
- pipeline_definition=pipeline_definition,
- response=response,
- pipeline_type="KUBEFLOW_PIPELINES",
- pipeline_runtime="kfp",
- )
- issues = response.to_json().get("issues")
- print(issues)
- assert len(issues) == 1
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidNodeType"
- assert issues[0]["data"]["pipelineId"] == pipeline_id
- assert issues[0]["data"]["nodeID"] == node_id
- async def test_invalid_pipeline_runtime_with_kubeflow_execution(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("generic_basic_pipeline_with_scripts.pipeline")
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- await validation_manager._validate_compatibility(
- pipeline_definition=pipeline_definition,
- response=response,
- pipeline_type="APACHE_AIRFLOW",
- pipeline_runtime="kfp",
- )
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidRuntime"
- async def test_invalid_pipeline_runtime_with_local_execution(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("generic_basic_pipeline_with_scripts.pipeline")
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- await validation_manager._validate_compatibility(
- pipeline_definition=pipeline_definition,
- response=response,
- pipeline_type="APACHE_AIRFLOW",
- pipeline_runtime="local",
- )
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidRuntime"
- assert issues[0]["data"]["pipelineType"] == "APACHE_AIRFLOW"
- async def test_invalid_node_op_with_airflow(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("aa_invalid_node_op.pipeline")
- node_id = "749d4641-cee8-4a50-a0ed-30c07439908f"
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- await validation_manager._validate_compatibility(
- pipeline_definition=pipeline_definition,
- response=response,
- pipeline_type="APACHE_AIRFLOW",
- pipeline_runtime="airflow",
- )
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidNodeType"
- assert issues[0]["data"]["nodeID"] == node_id
- async def test_invalid_node_property_structure(validation_manager, monkeypatch, load_pipeline):
- pipeline, response = load_pipeline("generic_invalid_node_property_structure.pipeline")
- node_id = "88ab83dc-d5f0-443a-8837-788ed16851b7"
- node_property = "runtime_image"
- pvm = validation_manager
- monkeypatch.setattr(pvm, "_validate_filepath", lambda node_id, node_label, property_name, filename, response: True)
- monkeypatch.setattr(pvm, "_validate_label", lambda node_id, node_label, response: True)
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- await pvm._validate_node_properties(
- pipeline_definition=pipeline_definition, response=response, pipeline_type="GENERIC", pipeline_runtime="kfp"
- )
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidNodeProperty"
- assert issues[0]["data"]["propertyName"] == node_property
- assert issues[0]["data"]["nodeID"] == node_id
- @pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
- async def test_missing_node_property_for_kubeflow_pipeline(
- validation_manager, monkeypatch, load_pipeline, catalog_instance
- ):
- pipeline, response = load_pipeline("kf_invalid_node_property_in_component.pipeline")
- node_id = "fe08b42d-bd8c-4e97-8010-0503a3185427"
- node_property = "notebook"
- pvm = validation_manager
- monkeypatch.setattr(pvm, "_validate_filepath", lambda node_id, file_dir, property_name, filename, response: True)
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- await pvm._validate_node_properties(
- pipeline_definition=pipeline_definition,
- response=response,
- pipeline_type="KUBEFLOW_PIPELINES",
- pipeline_runtime="kfp",
- )
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidNodeProperty"
- assert issues[0]["data"]["propertyName"] == node_property
- assert issues[0]["data"]["nodeID"] == node_id
- def test_invalid_node_property_image_name(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("generic_invalid_node_property_image_name.pipeline")
- node_ids = ["88ab83dc-d5f0-443a-8837-788ed16851b7", "7ae74ba6-d49f-48ea-9e4f-e44d13594b2f"]
- node_property = "runtime_image"
- for i, node_id in enumerate(node_ids):
- node = pipeline["pipelines"][0]["nodes"][i]
- node_label = node["app_data"].get("label")
- image_name = node["app_data"]["component_parameters"].get("runtime_image")
- validation_manager._validate_container_image_name(node["id"], node_label, image_name, response)
- issues = response.to_json().get("issues")
- assert len(issues) == 2
- # Test missing runtime image in node 0
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidNodeProperty"
- assert issues[0]["data"]["propertyName"] == node_property
- assert issues[0]["data"]["nodeID"] == node_ids[0]
- assert issues[0]["message"] == "Required property value is missing."
- # Test invalid format for runtime image in node 1
- assert issues[1]["severity"] == 1
- assert issues[1]["type"] == "invalidNodeProperty"
- assert issues[1]["data"]["propertyName"] == node_property
- assert issues[1]["data"]["nodeID"] == node_ids[1]
- assert (
- issues[1]["message"] == "Node contains an invalid runtime image. Runtime image "
- "must conform to the format [registry/]owner/image:tag"
- )
- def test_invalid_node_property_image_name_list(validation_manager):
- response = ValidationResponse()
- node_label = "test_label"
- node_id = "test-id"
- failing_image_names = [
- "12345566:one-two-three",
- "someregistry.io/some_org/some_tag/something/",
- "docker.io//missing_org_name:test",
- ]
- for image_name in failing_image_names:
- validation_manager._validate_container_image_name(node_id, node_label, image_name, response)
- issues = response.to_json().get("issues")
- assert len(issues) == len(failing_image_names)
- def test_invalid_node_property_dependency_filepath_workspace(validation_manager):
- response = ValidationResponse()
- node = {"id": "test-id", "app_data": {"label": "test"}}
- property_name = "test-property"
- validation_manager._validate_filepath(
- node_id=node["id"],
- file_dir=os.getcwd(),
- property_name=property_name,
- node_label=node["app_data"]["label"],
- filename="../invalid_filepath/to/file.ipynb",
- response=response,
- )
- issues = response.to_json().get("issues")
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidFilePath"
- assert issues[0]["data"]["propertyName"] == property_name
- assert issues[0]["data"]["nodeID"] == node["id"]
- def test_invalid_node_property_dependency_filepath_non_existent(validation_manager):
- response = ValidationResponse()
- node = {"id": "test-id", "app_data": {"label": "test"}}
- property_name = "test-property"
- validation_manager._validate_filepath(
- node_id=node["id"],
- file_dir=os.getcwd(),
- property_name=property_name,
- node_label=node["app_data"]["label"],
- filename="invalid_filepath/to/file.ipynb",
- response=response,
- )
- issues = response.to_json().get("issues")
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidFilePath"
- assert issues[0]["data"]["propertyName"] == property_name
- assert issues[0]["data"]["nodeID"] == node["id"]
- def test_valid_node_property_dependency_filepath(validation_manager):
- response = ValidationResponse()
- valid_filename = os.path.join(
- os.path.dirname(__file__), "resources/validation_pipelines/generic_single_cycle.pipeline"
- )
- node = {"id": "test-id", "app_data": {"label": "test"}}
- property_name = "test-property"
- validation_manager._validate_filepath(
- node_id=node["id"],
- file_dir=os.getcwd(),
- property_name=property_name,
- node_label=node["app_data"]["label"],
- filename=valid_filename,
- response=response,
- )
- assert not response.has_fatal
- assert not response.to_json().get("issues")
- async def test_valid_node_property_pipeline_filepath(monkeypatch, validation_manager, load_pipeline):
- pipeline, response = load_pipeline("generic_basic_filepath_check.pipeline")
- monkeypatch.setattr(validation_manager, "_validate_label", lambda node_id, node_label, response: True)
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- await validation_manager._validate_node_properties(
- pipeline_definition=pipeline_definition, response=response, pipeline_type="GENERIC", pipeline_runtime="kfp"
- )
- assert not response.has_fatal
- assert not response.to_json().get("issues")
- def test_invalid_node_property_resource_value(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("generic_invalid_node_property_hardware_resources.pipeline")
- node_id = "88ab83dc-d5f0-443a-8837-788ed16851b7"
- node = pipeline["pipelines"][0]["nodes"][0]
- validation_manager._validate_resource_value(
- node["id"],
- node["app_data"]["label"],
- resource_name="memory",
- resource_value=node["app_data"]["component_parameters"]["memory"],
- response=response,
- )
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidNodeProperty"
- assert issues[0]["data"]["propertyName"] == "memory"
- assert issues[0]["data"]["nodeID"] == node_id
- def test_invalid_node_property_env_var(validation_manager):
- response = ValidationResponse()
- node = {"id": "test-id", "app_data": {"label": "test"}}
- invalid_env_var = 'TEST_ENV_ONE"test_one"'
- validation_manager._validate_environmental_variables(
- node_id=node["id"], node_label=node["app_data"]["label"], env_var=invalid_env_var, response=response
- )
- issues = response.to_json().get("issues")
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidEnvPair"
- assert issues[0]["data"]["propertyName"] == "env_vars"
- assert issues[0]["data"]["nodeID"] == "test-id"
- def test_invalid_node_property_volumes(validation_manager):
- response = ValidationResponse()
- node = {"id": "test-id", "app_data": {"label": "test"}}
- volumes = [
- VolumeMount("/mount/test", "rwx-test-claim"), # valid
- VolumeMount("/mount/test_two", "second-claim"), # valid
- VolumeMount("/mount/test_four", "second#claim"), # invalid pvc name
- ]
- validation_manager._validate_mounted_volumes(
- node_id=node["id"], node_label=node["app_data"]["label"], volumes=volumes, response=response
- )
- issues = response.to_json().get("issues")
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidVolumeMount"
- assert issues[0]["data"]["propertyName"] == MOUNTED_VOLUMES
- assert issues[0]["data"]["nodeID"] == "test-id"
- assert "not a valid Kubernetes resource name" in issues[0]["message"]
- def test_invalid_node_property_secrets(validation_manager):
- response = ValidationResponse()
- node = {"id": "test-id", "app_data": {"label": "test"}}
- secrets = [
- KubernetesSecret("ENV_VAR1", "test-secret", "test-key1"), # valid
- KubernetesSecret("ENV_VAR2", "test-secret", "test-key2"), # valid
- KubernetesSecret("ENV_VAR3", "test-secret", ""), # invalid: improper format of secret name/key
- KubernetesSecret("ENV_VAR5", "test%secret", "test-key"), # invalid: not a valid Kubernetes resource name
- KubernetesSecret("ENV_VAR6", "test-secret", "test$key2"), # invalid: not a valid Kubernetes secret key
- ]
- validation_manager._validate_kubernetes_secrets(
- node_id=node["id"], node_label=node["app_data"]["label"], secrets=secrets, response=response
- )
- issues = response.to_json().get("issues")
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidKubernetesSecret"
- assert issues[0]["data"]["propertyName"] == KUBERNETES_SECRETS
- assert issues[0]["data"]["nodeID"] == "test-id"
- assert "improperly formatted representation of secret name and key" in issues[0]["message"]
- assert "not a valid Kubernetes resource name" in issues[1]["message"]
- assert "not a valid Kubernetes secret key" in issues[2]["message"]
- def test_valid_node_property_label(validation_manager):
- response = ValidationResponse()
- node = {"id": "test-id"}
- valid_label_name = "dead-bread-dead-bread-dead-bread-dead-bread-dead-bread-dead-bre"
- validation_manager._validate_label(node_id=node["id"], node_label=valid_label_name, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 0
- def test_valid_node_property_label_min_length(validation_manager):
- response = ValidationResponse()
- node = {"id": "test-id", "app_data": {"label": "test"}}
- valid_label_name = "d"
- validation_manager._validate_label(node_id=node["id"], node_label=valid_label_name, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 0
- def test_invalid_node_property_label_filename_exceeds_max_length(validation_manager):
- response = ValidationResponse()
- node = {"id": "test-id", "app_data": {"label": "test"}}
- valid_label_name = "deadbread-deadbread-deadbread-deadbread-deadbread-deadbread-de.py"
- validation_manager._validate_label(node_id=node["id"], node_label=valid_label_name, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 2
- def test_invalid_node_property_label_max_length(validation_manager):
- response = ValidationResponse()
- node = {"id": "test-id", "app_data": {"label": "test"}}
- invalid_label_name = "dead-bread-dead-bread-dead-bread-dead-bread-dead-bread-dead-bred"
- validation_manager._validate_label(node_id=node["id"], node_label=invalid_label_name, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert issues[0]["severity"] == 2
- assert issues[0]["type"] == "invalidNodeLabel"
- assert issues[0]["data"]["propertyName"] == "label"
- assert issues[0]["data"]["nodeID"] == "test-id"
- def test_valid_node_property_label_filename_has_relative_path(validation_manager):
- response = ValidationResponse()
- node = {"id": "test-id", "app_data": {"label": "test"}}
- valid_label_name = "deadbread.py"
- validation_manager._validate_label(node_id=node["id"], node_label=valid_label_name, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 0
- def test_invalid_node_property_label_bad_characters(validation_manager):
- response = ValidationResponse()
- node = {"id": "test-id"}
- invalid_label_name = "bad_label_*&^&$"
- validation_manager._validate_label(node_id=node["id"], node_label=invalid_label_name, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert issues[0]["severity"] == 2
- assert issues[0]["type"] == "invalidNodeLabel"
- assert issues[0]["data"]["propertyName"] == "label"
- assert issues[0]["data"]["nodeID"] == "test-id"
- def test_pipeline_graph_single_cycle(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("generic_single_cycle.pipeline")
- # cycle_ID = ['c309f6dd-b022-4b1c-b2b0-b6449bb26e8f', '8cb986cb-4fc9-4b1d-864d-0ec64b7ac13c']
- validation_manager._validate_pipeline_graph(pipeline=pipeline, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "circularReference"
- # assert issues[0]['data']['linkIDList'].sort() == cycle_ID.sort()
- def test_pipeline_graph_double_cycle(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("generic_double_cycle.pipeline")
- # cycle_ID = ['597b2971-b95d-4df7-a36d-9d93b0345298', 'b63378e4-9085-4a33-9330-6f86054681f4']
- # cycle_two_ID = ['c309f6dd-b022-4b1c-b2b0-b6449bb26e8f', '8cb986cb-4fc9-4b1d-864d-0ec64b7ac13c']
- validation_manager._validate_pipeline_graph(pipeline=pipeline, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "circularReference"
- # assert issues[0]['data']['linkIDList'].sort() == cycle_ID.sort()
- # assert issues[1]['severity'] == 1
- # assert issues[1]['type'] == 'circularReference'
- # assert issues[1]['data']['linkIDList'].sort() == cycle_two_ID.sort()
- def test_pipeline_graph_singleton(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("generic_singleton.pipeline")
- node_id = "0195fefd-3ceb-4a90-a12c-3958ef0ff42e"
- validation_manager._validate_pipeline_graph(pipeline=pipeline, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert not response.has_fatal
- assert issues[0]["severity"] == 2
- assert issues[0]["type"] == "singletonReference"
- assert issues[0]["data"]["nodeID"] == node_id
- def test_pipeline_valid_kfp_with_supernode(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("kf_supernode_valid.pipeline")
- validation_manager._validate_pipeline_graph(pipeline=pipeline, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 0
- assert not response.has_fatal
- def test_pipeline_invalid_single_cycle_kfp_with_supernode(validation_manager, load_pipeline):
- pipeline, response = load_pipeline("kf_supernode_invalid_single_cycle.pipeline")
- validation_manager._validate_pipeline_graph(pipeline=pipeline, response=response)
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert response.has_fatal
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "circularReference"
- @pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
- async def test_pipeline_kfp_inputpath_parameter(validation_manager, load_pipeline, catalog_instance, component_cache):
- pipeline, response = load_pipeline("kf_inputpath_parameter.pipeline")
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- await validation_manager._validate_node_properties(
- pipeline_definition=pipeline_definition,
- response=response,
- pipeline_type="KUBEFLOW_PIPELINES",
- pipeline_runtime="kfp",
- )
- issues = response.to_json().get("issues")
- assert len(issues) == 0
- @pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
- async def test_pipeline_invalid_kfp_inputpath_parameter(
- validation_manager, load_pipeline, catalog_instance, component_cache
- ):
- invalid_key_node_id = "089a12df-fe2f-4fcb-ae37-a1f8a6259ca1"
- missing_param_node_id = "e8820c55-dc79-46d1-b32e-924fa5d70d2a"
- pipeline, response = load_pipeline("kf_invalid_inputpath_parameter.pipeline")
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- await validation_manager._validate_node_properties(
- pipeline_definition=pipeline_definition,
- response=response,
- pipeline_type="KUBEFLOW_PIPELINES",
- pipeline_runtime="kfp",
- )
- issues = response.to_json().get("issues")
- assert len(issues) == 2
- assert response.has_fatal
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidNodeProperty"
- assert issues[0]["data"]["nodeID"] == invalid_key_node_id
- assert issues[1]["severity"] == 1
- assert issues[1]["type"] == "invalidNodeProperty"
- assert issues[1]["data"]["nodeID"] == missing_param_node_id
- @pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
- async def test_pipeline_invalid_kfp_inputpath_missing_connection(
- validation_manager, load_pipeline, catalog_instance, component_cache
- ):
- invalid_node_id = "5b78ea0a-e5fc-4022-94d4-7b9dc170d794"
- pipeline, response = load_pipeline("kf_invalid_inputpath_missing_connection.pipeline")
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- await validation_manager._validate_node_properties(
- pipeline_definition=pipeline_definition,
- response=response,
- pipeline_type="KUBEFLOW_PIPELINES",
- pipeline_runtime="kfp",
- )
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert response.has_fatal
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidNodeProperty"
- assert issues[0]["data"]["nodeID"] == invalid_node_id
- @pytest.mark.parametrize("catalog_instance", [AIRFLOW_COMPONENT_CACHE_INSTANCE], indirect=True)
- async def test_pipeline_aa_parent_node_missing_xcom_push(
- validation_manager, load_pipeline, catalog_instance, component_cache
- ):
- invalid_node_id = "b863d458-21b5-4a46-8420-5a814b7bd525"
- invalid_operator = "BashOperator"
- pipeline, response = load_pipeline("aa_parent_node_missing_xcom.pipeline")
- pipeline_definition = PipelineDefinition(pipeline_definition=pipeline)
- await validation_manager._validate_node_properties(
- pipeline_definition=pipeline_definition,
- response=response,
- pipeline_type="APACHE_AIRFLOW",
- pipeline_runtime="airflow",
- )
- issues = response.to_json().get("issues")
- assert len(issues) == 1
- assert response.has_fatal
- assert issues[0]["severity"] == 1
- assert issues[0]["type"] == "invalidNodeProperty"
- assert issues[0]["data"]["nodeID"] == invalid_node_id
- assert issues[0]["data"]["parentNodeID"] == invalid_operator
|