12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271 |
- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- """Tests for elyra-pipeline application"""
- import json
- from pathlib import Path
- import shutil
- from click.testing import CliRunner
- from conftest import KFP_COMPONENT_CACHE_INSTANCE
- import pytest
- from elyra.cli.pipeline_app import pipeline
- from elyra.metadata.manager import MetadataManager
- from elyra.metadata.metadata import Metadata
- from elyra.metadata.schemaspaces import Runtimes
- from elyra.pipeline.component_catalog import ComponentCache
# drives the generic parameter-handling tests below; each entry is a
# subcommand of `elyra-pipeline`
SUB_COMMANDS = "run submit describe validate export".split()
@pytest.fixture(autouse=True)
def destroy_component_cache():
    """
    Clear any ComponentCache instance that may have been created while a
    CLI command ran, so that it cannot leak side effects into later tests.
    """
    yield
    ComponentCache.clear_instance()
@pytest.fixture
def kubeflow_pipelines_runtime_instance():
    """Creates a Kubeflow Pipelines RTC and removes it after test."""
    name = "valid_kfp_test_config"
    config_path = Path(__file__).parent / "resources" / "runtime_configs" / f"{name}.json"
    config = json.loads(config_path.read_text())
    manager = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID)
    try:
        # best effort: remove an instance a previous (aborted) run left behind
        manager.remove(name)
    except Exception:
        pass
    instance = manager.create(name, Metadata(**config))
    yield instance.name
    manager.remove(instance.name)
@pytest.fixture
def airflow_runtime_instance():
    """Creates an airflow RTC and removes it after test."""
    name = "valid_airflow_test_config"
    config_path = Path(__file__).parent / "resources" / "runtime_configs" / f"{name}.json"
    config = json.loads(config_path.read_text())
    manager = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID)
    try:
        # best effort: remove an instance a previous (aborted) run left behind
        manager.remove(name)
    except Exception:
        pass
    instance = manager.create(name, Metadata(**config))
    yield instance.name
    manager.remove(instance.name)
def test_no_opts():
    """Verify that all commands are displayed in help"""
    result = CliRunner().invoke(pipeline)
    for expected in (
        "run Run a pipeline in your local environment",
        "submit Submit a pipeline to be executed on the server",
        "describe Display pipeline summary",
        "export Export a pipeline to a runtime-specific format",
        "validate Validate pipeline",
    ):
        assert expected in result.output
    assert result.exit_code == 0
def test_bad_subcommand():
    """An unknown subcommand must fail with a click error."""
    result = CliRunner().invoke(pipeline, ["invalid_command"])
    assert result.exit_code != 0
    assert "Error: No such command 'invalid_command'" in result.output
@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
def test_subcommand_no_opts(subcommand):
    """Every subcommand requires the PIPELINE_PATH argument."""
    result = CliRunner().invoke(pipeline, [subcommand])
    assert result.exit_code != 0
    assert "Error: Missing argument 'PIPELINE_PATH'" in result.output
@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
def test_subcommand_invalid_pipeline_path(subcommand):
    """Verify that every command only accepts a valid pipeline_path file name"""
    cli = CliRunner()

    # test: file not found
    missing = "no-such.pipeline"
    result = cli.invoke(pipeline, [subcommand, missing])
    assert result.exit_code != 0
    assert f"Invalid value for 'PIPELINE_PATH': '{missing}' is not a file." in result.output

    # test: file with wrong extension
    with cli.isolated_filesystem():
        bad_extension = "wrong.extension"
        Path(bad_extension).write_text("I am not a pipeline file.")
        result = cli.invoke(pipeline, [subcommand, bad_extension])
        assert result.exit_code != 0
        assert f"Invalid value for 'PIPELINE_PATH': '{bad_extension}' is not a .pipeline file." in result.output
@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
def test_subcommand_with_no_pipelines_field(subcommand, kubeflow_pipelines_runtime_instance):
    """Verify that every command properly detects pipeline issues"""
    cli = CliRunner()
    with cli.isolated_filesystem():
        pipeline_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_without_pipelines_field.pipeline"
        assert pipeline_path.is_file()

        # every CLI command invocation requires these parameters
        args = [subcommand, str(pipeline_path)]
        if subcommand in ("submit", "export"):
            # these commands also require a runtime configuration
            args += ["--runtime-config", kubeflow_pipelines_runtime_instance]

        result = cli.invoke(pipeline, args)
        assert result.exit_code != 0
        assert "Pipeline is missing 'pipelines' field." in result.output
@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
def test_subcommand_with_zero_length_pipelines_field(subcommand, kubeflow_pipelines_runtime_instance):
    """Verify that every command properly detects pipeline issues"""
    cli = CliRunner()
    with cli.isolated_filesystem():
        pipeline_path = (
            Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_zero_length_pipelines_field.pipeline"
        )
        assert pipeline_path.is_file()

        # every CLI command invocation requires these parameters
        args = [subcommand, str(pipeline_path)]
        if subcommand in ("submit", "export"):
            # these commands also require a runtime configuration
            args += ["--runtime-config", kubeflow_pipelines_runtime_instance]

        result = cli.invoke(pipeline, args)
        assert result.exit_code != 0
        assert "Pipeline has zero length 'pipelines' field." in result.output
@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
def test_subcommand_with_no_nodes(subcommand, kubeflow_pipelines_runtime_instance):
    """Verify that every command properly detects pipeline issues"""
    # don't run this test for the `describe` command
    # (see test_describe_with_no_nodes)
    if subcommand == "describe":
        return

    cli = CliRunner()
    with cli.isolated_filesystem():
        pipeline_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_zero_nodes.pipeline"
        assert pipeline_path.is_file()

        # every CLI command invocation requires these parameters
        args = [subcommand, str(pipeline_path)]
        if subcommand in ("submit", "export"):
            # these commands also require a runtime configuration
            args += ["--runtime-config", kubeflow_pipelines_runtime_instance]

        result = cli.invoke(pipeline, args)
        assert result.exit_code != 0
- # ------------------------------------------------------------------
- # tests for 'describe' command
- # ------------------------------------------------------------------
def test_describe_with_no_nodes():
    """
    Verify that describe yields the expected results if a pipeline without any
    nodes is provided as input.
    """
    runner = CliRunner()
    with runner.isolated_filesystem():
        pipeline_file = "pipeline_with_zero_nodes.pipeline"
        pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / pipeline_file
        assert pipeline_file_path.is_file()

        # verify human-readable output
        result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
        assert result.exit_code == 0, result.output
        assert "Pipeline name: pipeline_with_zero_nodes" in result.output
        assert "Description: None specified" in result.output
        assert "Pipeline type: None specified" in result.output
        assert "Pipeline runtime: Generic" in result.output
        assert "Pipeline format version: 7" in result.output
        assert "Number of generic nodes: 0" in result.output
        # fixed: this line previously duplicated the generic-node assertion;
        # the custom node count was never checked (the JSON section below
        # shows both counts are expected to be zero)
        assert "Number of custom nodes: 0" in result.output
        assert "Script dependencies: None specified" in result.output
        assert "Notebook dependencies: None specified" in result.output
        assert "Local file dependencies: None specified" in result.output
        assert "Component dependencies: None specified" in result.output
        assert "Volume dependencies: None specified" in result.output
        assert "Container image dependencies: None specified" in result.output
        assert "Kubernetes secret dependencies: None specified" in result.output

        # verify machine-readable output
        result = runner.invoke(pipeline, ["describe", str(pipeline_file_path), "--json"])
        assert result.exit_code == 0, result.output
        result_json = json.loads(result.output)
        assert result_json["name"] == "pipeline_with_zero_nodes"
        assert result_json["description"] is None
        assert result_json["pipeline_type"] is None
        assert result_json["pipeline_format_version"] == 7
        assert result_json["pipeline_runtime"] == "Generic"
        assert result_json["generic_node_count"] == 0
        assert result_json["custom_node_count"] == 0
        # every dependency category must be present and empty
        # (renamed loop variable: `property` shadowed the builtin)
        for dependency_kind in [
            "scripts",
            "notebooks",
            "files",
            "custom_components",
            "container_images",
            "volumes",
            "kubernetes_secrets",
        ]:
            assert isinstance(result_json["dependencies"][dependency_kind], list)
            assert len(result_json["dependencies"][dependency_kind]) == 0
def test_describe_with_kfp_components():
    """Describe a pipeline that consists solely of KFP custom components."""
    runner = CliRunner()
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
    result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])

    for fragment in (
        "Description: 3-node custom component pipeline",
        "Pipeline type: KUBEFLOW_PIPELINES",
        "Number of custom nodes: 3",
        "Number of generic nodes: 0",
        "Local file dependencies: None specified",
        '- {"catalog_type": "elyra-kfp-examples-catalog", "component_ref": {"component-id": "download_data.yaml"}}',
        '- {"catalog_type": "elyra-kfp-examples-catalog", "component_ref": '
        '{"component-id": "filter_text_using_shell_and_grep.yaml"}}',
        '- {"catalog_type": "elyra-kfp-examples-catalog", "component_ref": {"component-id": "calculate_hash.yaml"}}',
    ):
        assert fragment in result.output
    assert result.exit_code == 0
def test_describe_with_missing_kfp_component():
    """Describe must still succeed when a custom component cannot be resolved."""
    runner = CliRunner()
    with runner.isolated_filesystem():
        valid_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
        pipeline_file_path = Path.cwd() / "foo.pipeline"

        pipeline_json = json.loads(valid_file_path.read_text())
        # Update known component name to trigger a missing component
        first_node = pipeline_json["pipelines"][0]["nodes"][0]
        first_node["op"] = first_node["op"] + "Missing"
        pipeline_file_path.write_text(json.dumps(pipeline_json))

        result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
        assert "Description: 3-node custom component pipeline" in result.output
        assert "Pipeline type: KUBEFLOW_PIPELINES" in result.output
        assert "Number of custom nodes: 3" in result.output
        assert "Number of generic nodes: 0" in result.output
        assert result.exit_code == 0
def test_describe_notebooks_scripts_report():
    """
    Test human-readable output for notebooks/scripts property when none, one or many instances are present
    """
    runner = CliRunner()
    pipelines_dir = Path(__file__).parent / "resources" / "pipelines"

    def describe(file_name):
        # run `describe` for the named resource pipeline; must succeed
        res = runner.invoke(pipeline, ["describe", str(pipelines_dir / file_name)])
        assert res.exit_code == 0
        return res.output

    #
    # Pipeline references no notebooks/no scripts:
    # - Pipeline does not contain nodes -> test_describe_with_no_nodes
    # - Pipeline contains only script nodes
    # - Pipeline contains only notebook nodes
    # - Pipeline contains only custom components
    output = describe("pipeline_with_scripts.pipeline")
    assert "Notebook dependencies: None specified" in output

    output = describe("pipeline_with_notebooks.pipeline")
    assert "Script dependencies: None specified" in output

    output = describe("kfp_3_node_custom.pipeline")
    assert "Notebook dependencies: None specified" in output
    assert "Script dependencies: None specified" in output

    #
    # Pipeline references multiple notebooks:
    #
    output = describe("pipeline_with_notebooks.pipeline")
    assert "Notebook dependencies:\n" in output
    assert "notebooks/notebook_1.ipyn" in output
    assert "notebooks/notebook_2.ipyn" in output
    # Ensure no entries for scripts
    assert "Script dependencies: None specified" in output
    assert "Number of generic nodes: 2" in output
    assert "Number of custom nodes: 0" in output

    #
    # Pipeline references multiple scripts:
    #
    output = describe("pipeline_with_scripts.pipeline")
    assert "Script dependencies:\n" in output
    assert "scripts/script_1.py" in output
    assert "scripts/script_2.py" in output
    assert "scripts/script_3.py" in output
    # Ensure no entries for notebooks
    assert "Notebook dependencies: None specified" in output
    assert "Number of generic nodes: 3" in output
    assert "Number of custom nodes: 0" in output

    #
    # Pipeline references multiple notebooks and scripts:
    #
    output = describe("pipeline_with_notebooks_and_scripts.pipeline")
    assert "Notebook dependencies:\n" in output
    assert "notebooks/notebook_1.ipyn" in output
    assert "notebooks/notebook_2.ipyn" in output
    assert "Script dependencies:\n" in output
    assert "scripts/script_1.py" in output
    assert "scripts/script_2.py" in output
    assert "scripts/script_3.py" in output
    assert "Number of generic nodes: 5" in output
    assert "Number of custom nodes: 0" in output
def test_describe_notebooks_scripts_json():
    """
    Test machine-readable output for notebooks/scripts property when none, one or many instances are present
    """
    runner = CliRunner()
    pipelines_dir = Path(__file__).parent / "resources" / "pipelines"

    def describe_json(file_name):
        # run `describe --json` for the named resource pipeline and parse it
        res = runner.invoke(pipeline, ["describe", "--json", str(pipelines_dir / file_name)])
        assert res.exit_code == 0
        return json.loads(res.output)

    #
    # Pipeline references no notebooks/no scripts:
    # - Pipeline does not contain nodes -> test_describe_with_no_nodes
    # - Pipeline contains only script nodes
    # - Pipeline contains only notebook nodes
    # - Pipeline contains only custom components
    summary = describe_json("pipeline_with_scripts.pipeline")
    assert summary["generic_node_count"] == 3
    assert summary["custom_node_count"] == 0
    dependencies = summary.get("dependencies")
    assert isinstance(dependencies.get("notebooks"), list)
    assert len(dependencies.get("notebooks")) == 0

    summary = describe_json("pipeline_with_notebooks.pipeline")
    assert summary["generic_node_count"] == 2
    assert summary["custom_node_count"] == 0
    dependencies = summary.get("dependencies")
    assert isinstance(dependencies.get("scripts"), list)
    assert len(dependencies.get("scripts")) == 0

    summary = describe_json("kfp_3_node_custom.pipeline")
    assert summary["generic_node_count"] == 0
    assert summary["custom_node_count"] == 3
    dependencies = summary.get("dependencies")
    assert isinstance(dependencies.get("notebooks"), list)
    assert len(dependencies.get("notebooks")) == 0
    assert isinstance(dependencies.get("scripts"), list)
    assert len(dependencies.get("scripts")) == 0

    #
    # Pipeline references multiple notebooks:
    #
    summary = describe_json("pipeline_with_notebooks.pipeline")
    assert summary["generic_node_count"] == 2
    assert summary["custom_node_count"] == 0
    dependencies = summary.get("dependencies")
    assert isinstance(dependencies.get("notebooks"), list)
    assert any(x.endswith("notebooks/notebook_1.ipynb") for x in dependencies["notebooks"]), dependencies["notebooks"]
    assert any(x.endswith("notebooks/notebook_2.ipynb") for x in dependencies["notebooks"]), dependencies["notebooks"]
    # Ensure no entries for scripts
    assert isinstance(dependencies.get("scripts"), list)
    assert len(dependencies.get("scripts")) == 0

    #
    # Pipeline references multiple scripts:
    #
    summary = describe_json("pipeline_with_scripts.pipeline")
    assert summary["generic_node_count"] == 3
    assert summary["custom_node_count"] == 0
    dependencies = summary.get("dependencies")
    assert isinstance(dependencies.get("scripts"), list)
    assert len(dependencies.get("scripts")) == 3
    assert any(x.endswith("scripts/script_1.py") for x in dependencies["scripts"]), dependencies["scripts"]
    assert any(x.endswith("scripts/script_2.py") for x in dependencies["scripts"]), dependencies["scripts"]
    assert any(x.endswith("scripts/script_3.py") for x in dependencies["scripts"]), dependencies["scripts"]
    # Ensure no entries for notebooks
    assert isinstance(dependencies.get("notebooks"), list)
    assert len(dependencies.get("notebooks")) == 0

    #
    # Pipeline references multiple notebooks and scripts:
    #
    summary = describe_json("pipeline_with_notebooks_and_scripts.pipeline")
    assert summary["generic_node_count"] == 5
    assert summary["custom_node_count"] == 0
    assert isinstance(summary.get("dependencies"), dict)
    dependencies = summary.get("dependencies")
    assert isinstance(dependencies.get("scripts"), list)
    assert len(dependencies.get("scripts")) == 3
    assert any(x.endswith("scripts/script_1.py") for x in dependencies["scripts"]), dependencies["scripts"]
    assert any(x.endswith("scripts/script_2.py") for x in dependencies["scripts"]), dependencies["scripts"]
    assert any(x.endswith("scripts/script_3.py") for x in dependencies["scripts"]), dependencies["scripts"]
    assert isinstance(dependencies.get("notebooks"), list)
    assert len(dependencies.get("notebooks")) == 2
    assert any(x.endswith("notebooks/notebook_1.ipynb") for x in dependencies["notebooks"]), dependencies["notebooks"]
    assert any(x.endswith("notebooks/notebook_2.ipynb") for x in dependencies["notebooks"]), dependencies["notebooks"]
def test_describe_container_images_report():
    """
    Test report output for container_images property when none, one or many instances are present
    """
    runner = CliRunner()
    pipelines_dir = Path(__file__).parent / "resources" / "pipelines"

    def describe(file_name):
        # run `describe` for the named resource pipeline; must succeed
        res = runner.invoke(pipeline, ["describe", str(pipelines_dir / file_name)])
        assert res.exit_code == 0
        return res.output

    #
    # Pipeline references no container images
    # - Pipeline does not contain nodes -> test_describe_with_no_nodes
    # - Pipeline contains only custom components
    output = describe("kfp_3_node_custom.pipeline")
    assert "Container image dependencies: None specified" in output

    #
    # Pipeline references multiple container images through notebook nodes:
    #
    output = describe("pipeline_with_notebooks.pipeline")
    assert "Container image dependencies:\n" in output
    assert "- tensorflow/tensorflow:2.8.0" in output, output

    #
    # Pipeline references multiple container images through script nodes
    #
    output = describe("pipeline_with_scripts.pipeline")
    assert "Container image dependencies:\n" in output, output
    assert "- tensorflow/tensorflow:2.8.0-gpu" in output, output
    assert "- tensorflow/tensorflow:2.8.0" in output, output

    #
    # Pipeline references multiple notebooks and scripts:
    #
    output = describe("pipeline_with_notebooks_and_scripts.pipeline")
    assert "Container image dependencies:\n" in output
    assert "- tensorflow/tensorflow:2.8.0-gpu" in output, output
    assert "- tensorflow/tensorflow:2.8.0" in output, output
    assert "- amancevice/pandas:1.4.1" in output, output
def test_describe_container_images_json():
    """
    Test JSON output for runtime_images property when none, one or many instances are present
    """
    runner = CliRunner()
    pipelines_dir = Path(__file__).parent / "resources" / "pipelines"

    def describe_json(file_name):
        # run `describe --json` for the named resource pipeline and parse it
        res = runner.invoke(pipeline, ["describe", "--json", str(pipelines_dir / file_name)])
        assert res.exit_code == 0
        return json.loads(res.output)

    #
    # Pipeline references no container images
    # - Pipeline does not contain nodes -> test_describe_with_no_nodes
    # - Pipeline contains only custom components
    summary = describe_json("kfp_3_node_custom.pipeline")
    assert summary["generic_node_count"] == 0
    assert summary["custom_node_count"] == 3
    assert isinstance(summary.get("dependencies"), dict)
    images = summary["dependencies"]["container_images"]
    assert isinstance(images, list)
    assert len(images) == 0

    #
    # Pipeline references multiple container images through notebook nodes:
    #
    summary = describe_json("pipeline_with_notebooks.pipeline")
    assert summary["generic_node_count"] == 2
    assert summary["custom_node_count"] == 0
    assert isinstance(summary.get("dependencies"), dict)
    images = summary["dependencies"]["container_images"]
    assert isinstance(images, list)
    assert len(images) == 1
    assert "tensorflow/tensorflow:2.8.0" in images, images

    #
    # Pipeline references multiple container images through script nodes
    #
    summary = describe_json("pipeline_with_scripts.pipeline")
    assert summary["generic_node_count"] == 3
    assert summary["custom_node_count"] == 0
    assert isinstance(summary.get("dependencies"), dict)
    images = summary["dependencies"]["container_images"]
    assert isinstance(images, list)
    assert len(images) == 2
    assert "tensorflow/tensorflow:2.8.0" in images, images
    assert "tensorflow/tensorflow:2.8.0-gpu" in images, images

    #
    # Pipeline references multiple notebooks and scripts:
    #
    summary = describe_json("pipeline_with_notebooks_and_scripts.pipeline")
    assert summary["generic_node_count"] == 5
    assert summary["custom_node_count"] == 0
    assert isinstance(summary.get("dependencies"), dict)
    images = summary["dependencies"]["container_images"]
    assert isinstance(images, list)
    assert len(images) == 3
    assert "tensorflow/tensorflow:2.8.0" in images, images
    assert "tensorflow/tensorflow:2.8.0-gpu" in images, images
    assert "amancevice/pandas:1.4.1" in images, images
def test_describe_volumes_report():
    """
    Test report format output for volumes property when none, one or many volume mounts are present
    """
    runner = CliRunner()
    pipelines_dir = Path(__file__).parent / "resources" / "pipelines"

    def describe(file_name):
        # run `describe` for the named resource pipeline; must succeed
        res = runner.invoke(pipeline, ["describe", str(pipelines_dir / file_name)])
        assert res.exit_code == 0
        return res.output

    #
    # Pipeline references no volumes
    # - Pipeline does not contain nodes -> test_describe_with_no_nodes
    # - Pipeline contains only custom components
    # - No generic nodes mount a volume
    output = describe("kfp_3_node_custom.pipeline")
    assert "Volume dependencies: None specified" in output

    #
    # Pipeline references multiple volumes through notebook nodes:
    #
    output = describe("pipeline_with_notebooks.pipeline")
    assert "Volume dependencies:\n" in output
    assert "- pvc-claim-1" in output, output

    #
    # Pipeline references multiple volumes through script nodes
    #
    output = describe("pipeline_with_scripts.pipeline")
    assert "Volume dependencies:\n" in output, output
    assert "- pvc-claim-2" in output, output
    assert "- pvc-claim-3" in output, output

    #
    # Pipeline references multiple volumes through notebook and script nodes:
    #
    output = describe("pipeline_with_notebooks_and_scripts.pipeline")
    assert "Volume dependencies:\n" in output, output
    assert "- pvc-claim-1" in output, output
    assert "- pvc-claim-2" in output, output
    assert "- pvc-claim-3" in output, output
def test_describe_volumes_json():
    """
    Test JSON output for volumes property when none, one or many volume mounts are present
    """
    runner = CliRunner()
    pipelines_dir = Path(__file__).parent / "resources" / "pipelines"
    # Each scenario:
    # (pipeline file name, generic node count, custom node count, expected volume names)
    scenarios = [
        # No volumes: the pipeline contains only custom components and no
        # generic node mounts a volume (the no-nodes case is covered by
        # test_describe_with_no_nodes)
        ("kfp_3_node_custom.pipeline", 0, 3, []),
        # One volume, referenced through notebook nodes
        ("pipeline_with_notebooks.pipeline", 2, 0, ["pvc-claim-1"]),
        # Two volumes, referenced through script nodes
        ("pipeline_with_scripts.pipeline", 3, 0, ["pvc-claim-2", "pvc-claim-3"]),
        # Three volumes, referenced through notebook and script nodes
        ("pipeline_with_notebooks_and_scripts.pipeline", 5, 0, ["pvc-claim-1", "pvc-claim-2", "pvc-claim-3"]),
    ]
    for pipeline_file, generic_count, custom_count, expected_volumes in scenarios:
        result = runner.invoke(pipeline, ["describe", "--json", str(pipelines_dir / pipeline_file)])
        assert result.exit_code == 0
        result_json = json.loads(result.output)
        assert result_json["generic_node_count"] == generic_count
        assert result_json["custom_node_count"] == custom_count
        assert isinstance(result_json.get("dependencies"), dict)
        dependencies = result_json["dependencies"]
        assert isinstance(dependencies["volumes"], list)
        assert len(dependencies["volumes"]) == len(expected_volumes)
        for volume in expected_volumes:
            assert volume in dependencies["volumes"], dependencies["volumes"]
def test_describe_kubernetes_secrets_report():
    """
    Test report format output for the 'kubernetes_secrets' dependency property
    """
    runner = CliRunner()
    pipelines_dir = Path(__file__).parent / "resources" / "pipelines"
    # Each scenario: (pipeline file name, substrings that must appear in the report)
    scenarios = [
        # No secrets: the pipeline contains only custom components (the
        # no-nodes case is covered by test_describe_with_no_nodes)
        ("kfp_3_node_custom.pipeline", ["Kubernetes secret dependencies: None specified"]),
        # One secret, referenced through notebook nodes
        ("pipeline_with_notebooks.pipeline", ["Kubernetes secret dependencies:\n", "- secret-1"]),
        # One secret, referenced through script nodes
        ("pipeline_with_scripts.pipeline", ["Kubernetes secret dependencies:\n", "- secret-2"]),
        # Multiple secrets, referenced through notebook and script nodes
        (
            "pipeline_with_notebooks_and_scripts.pipeline",
            ["Kubernetes secret dependencies:\n", "- secret-1", "- secret-2", "- secret-3"],
        ),
    ]
    for pipeline_file, expected_snippets in scenarios:
        result = runner.invoke(pipeline, ["describe", str(pipelines_dir / pipeline_file)])
        assert result.exit_code == 0
        for snippet in expected_snippets:
            assert snippet in result.output, result.output
def test_describe_kubernetes_secrets_json():
    """
    Test JSON output for the 'kubernetes_secrets' dependency property
    """
    runner = CliRunner()
    pipelines_dir = Path(__file__).parent / "resources" / "pipelines"
    # Each scenario:
    # (pipeline file name, generic node count, custom node count, expected secret names)
    scenarios = [
        # No secrets: the pipeline contains only custom components (the
        # no-nodes case is covered by test_describe_with_no_nodes)
        ("kfp_3_node_custom.pipeline", 0, 3, []),
        # One secret, referenced through notebook nodes
        ("pipeline_with_notebooks.pipeline", 2, 0, ["secret-1"]),
        # One secret, referenced through script nodes
        ("pipeline_with_scripts.pipeline", 3, 0, ["secret-2"]),
        # Multiple secrets, referenced through notebook and script nodes
        ("pipeline_with_notebooks_and_scripts.pipeline", 5, 0, ["secret-1", "secret-2", "secret-3"]),
    ]
    for pipeline_file, generic_count, custom_count, expected_secrets in scenarios:
        result = runner.invoke(pipeline, ["describe", "--json", str(pipelines_dir / pipeline_file)])
        assert result.exit_code == 0
        result_json = json.loads(result.output)
        assert result_json["generic_node_count"] == generic_count
        assert result_json["custom_node_count"] == custom_count
        assert isinstance(result_json.get("dependencies"), dict)
        dependencies = result_json["dependencies"]
        assert isinstance(dependencies["kubernetes_secrets"], list)
        assert len(dependencies["kubernetes_secrets"]) == len(expected_secrets)
        for secret in expected_secrets:
            assert secret in dependencies["kubernetes_secrets"], dependencies["kubernetes_secrets"]
def test_describe_custom_component_dependencies_json():
    """
    Test JSON output for the 'custom_components' dependency property
    """
    runner = CliRunner()
    # Pipeline contains only custom components
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
    result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
    assert result.exit_code == 0
    result_json = json.loads(result.output)
    assert result_json["generic_node_count"] == 0
    assert result_json["custom_node_count"] == 3
    assert isinstance(result_json.get("dependencies"), dict)
    dependencies = result_json["dependencies"]
    assert isinstance(dependencies["custom_components"], list)
    assert len(dependencies["custom_components"]) == 3
    # all three components are provided by the KFP examples catalog
    for component in dependencies["custom_components"]:
        assert component["catalog_type"] == "elyra-kfp-examples-catalog"
    # each expected component id appears exactly once; output order is not
    # guaranteed, so compare as sorted multisets instead of the verbose
    # "assert membership then remove" pattern
    expected_component_ids = ["download_data.yaml", "filter_text_using_shell_and_grep.yaml", "calculate_hash.yaml"]
    actual_component_ids = [c["component_ref"]["component-id"] for c in dependencies["custom_components"]]
    assert sorted(actual_component_ids) == sorted(expected_component_ids), actual_component_ids
- # ------------------------------------------------------------------
- # end tests for 'describe' command
- # ------------------------------------------------------------------
- # tests for 'validate' command
- # ------------------------------------------------------------------
@pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
def test_validate_with_kfp_components(jp_environ, kubeflow_pipelines_runtime_instance, catalog_instance):
    """Validating a pipeline composed of cataloged KFP components succeeds."""
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
    result = CliRunner().invoke(
        pipeline, ["validate", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
    )
    assert "Validating pipeline..." in result.output
    assert result.exit_code == 0, result.output
def test_validate_with_missing_kfp_component(jp_environ, kubeflow_pipelines_runtime_instance):
    """Validation fails when the pipeline references a component that is not in the catalog."""
    runner = CliRunner()
    with runner.isolated_filesystem():
        valid_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
        pipeline_file_path = Path.cwd() / "foo.pipeline"
        pipeline_definition = json.loads(valid_file_path.read_text())
        # Update known component name to trigger a missing component
        first_node = pipeline_definition["pipelines"][0]["nodes"][0]
        first_node["op"] = first_node["op"] + "Missing"
        pipeline_file_path.write_text(json.dumps(pipeline_definition))
        result = runner.invoke(
            pipeline, ["validate", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
        )
        assert "Validating pipeline..." in result.output
        assert "[Error][Calculate data hash] - This component was not found in the catalog." in result.output
        assert result.exit_code != 0
def test_validate_with_no_runtime_config(jp_environ):
    """Validation fails for a runtime-specific pipeline when no runtime config is given."""
    runner = CliRunner()
    with runner.isolated_filesystem():
        pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
        result = runner.invoke(pipeline, ["validate", str(pipeline_file_path)])
        assert "Validating pipeline..." in result.output
        expected_error = (
            "[Error] - This pipeline contains at least one runtime-specific component, "
            "but pipeline runtime is 'local'"
        )
        assert expected_error in result.output
        assert result.exit_code != 0
- # ------------------------------------------------------------------
- # tests for 'submit' command
- # ------------------------------------------------------------------
def test_submit_invalid_monitor_interval_option(kubeflow_pipelines_runtime_instance):
    """Verify that the '--monitor-timeout' option works as expected"""
    runner = CliRunner()
    with runner.isolated_filesystem():
        # dummy pipeline - it's not used
        pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
        assert pipeline_file_path.is_file()
        # '--monitor-timeout' must be a positive integer; each entry pairs an
        # invalid value with the reason fragment the CLI must report
        invalid_values = [
            ("abc", "a valid integer"),  # not an integer at all
            (0, "a positive integer"),  # integer, but not positive
        ]
        for invalid_option_value, reason in invalid_values:
            result = runner.invoke(
                pipeline,
                [
                    "submit",
                    str(pipeline_file_path),
                    "--runtime-config",
                    kubeflow_pipelines_runtime_instance,
                    "--monitor-timeout",
                    invalid_option_value,
                ],
            )
            assert result.exit_code != 0
            expected_message = f"Invalid value for '--monitor-timeout': '{invalid_option_value}' is not {reason}"
            assert expected_message in result.output
- # ------------------------------------------------------------------
- # end tests for 'submit' command
- # ------------------------------------------------------------------
- # tests for 'export' command
- # ------------------------------------------------------------------
def do_mock_export(output_path: str, dir_only=False):
    """
    Simulate a pipeline export by creating a dummy file at output_path.

    :param output_path: location of the mock export file; missing parent
        directories are created as needed
    :param dir_only: if True, only the parent directories are created and
        no file is written
    """
    p = Path(output_path)
    # create parent directories, if required; 'exist_ok' makes a separate,
    # race-prone 'is_dir()' pre-check unnecessary
    p.parent.mkdir(parents=True, exist_ok=True)
    if dir_only:
        return
    # create a mock export file
    p.write_text("dummy export output")
def prepare_export_work_dir(work_dir: str, source_dir: str):
    """Copies the files in source_dir to work_dir"""
    # copy every file whose name ends in 'pipeline' (e.g. '*.pipeline')
    for candidate in Path(source_dir).glob("*pipeline"):
        shutil.copy(str(candidate), work_dir)
    # print for debug purposes; this is only displayed if an assert fails
    print(f"Work directory content: {list(Path(work_dir).glob('*'))}")
def test_export_invalid_runtime_config():
    """Test user error scenarios: the specified runtime configuration is 'invalid'"""
    runner = CliRunner()
    # test pipeline; it's not used in this test
    pipeline_path = Path(__file__).parent / "resources" / "pipelines" / "kubeflow_pipelines.pipeline"
    assert pipeline_path.is_file()

    # scenario 1: no runtime configuration was specified
    result = runner.invoke(pipeline, ["export", str(pipeline_path)])
    assert result.exit_code != 0, result.output
    assert "Error: Missing option '--runtime-config'." in result.output, result.output

    # scenario 2: the named runtime configuration does not exist
    missing_config = "no-such-config"
    result = runner.invoke(pipeline, ["export", str(pipeline_path), "--runtime-config", missing_config])
    assert result.exit_code != 0, result.output
    assert f"Error: Invalid runtime configuration: {missing_config}" in result.output
    assert f"No such instance named '{missing_config}' was found in the runtimes schemaspace." in result.output
def test_export_incompatible_runtime_config(kubeflow_pipelines_runtime_instance, airflow_runtime_instance):
    """
    Test user error scenarios: the specified runtime configuration is not compatible
    with the pipeline type, e.g. KFP pipeline with Airflow runtime config
    """
    runner = CliRunner()
    pipelines_dir = Path(__file__).parent / "resources" / "pipelines"
    # Each scenario: (pipeline file, mismatched runtime config instance,
    # config runtime type, pipeline runtime type)
    scenarios = [
        # KFP pipeline exported with an Airflow runtime configuration
        ("kubeflow_pipelines.pipeline", airflow_runtime_instance, "APACHE_AIRFLOW", "KUBEFLOW_PIPELINES"),
        # Airflow pipeline exported with a KFP runtime configuration
        ("airflow.pipeline", kubeflow_pipelines_runtime_instance, "KUBEFLOW_PIPELINES", "APACHE_AIRFLOW"),
    ]
    for pipeline_file, runtime_config, config_type, pipeline_type in scenarios:
        pipeline_path = pipelines_dir / pipeline_file
        assert pipeline_path.is_file()
        # export with a mismatched runtime configuration must fail
        result = runner.invoke(pipeline, ["export", str(pipeline_path), "--runtime-config", runtime_config])
        assert result.exit_code != 0, result.output
        assert (
            f"The runtime configuration type '{config_type}' does not "
            f"match the pipeline's runtime type '{pipeline_type}'." in result.output
        )
@pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
def test_export_kubeflow_output_option(jp_environ, kubeflow_pipelines_runtime_instance, catalog_instance):
    """Verify that the '--output' option works as expected for Kubeflow Pipelines"""
    runner = CliRunner()
    with runner.isolated_filesystem():
        cwd = Path.cwd().resolve()
        # copy pipeline file and dependencies
        prepare_export_work_dir(str(cwd), Path(__file__).parent / "resources" / "pipelines")
        pipeline_file_path = cwd / "kfp_3_node_custom.pipeline"
        # make sure the pipeline file exists
        assert pipeline_file_path.is_file() is True
        print(f"Pipeline file: {pipeline_file_path}")

        def run_export(output_arg=None):
            # invoke 'export', optionally appending '--output <output_arg>'
            args = ["export", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
            if output_arg is not None:
                args.extend(["--output", output_arg])
            return runner.invoke(pipeline, args)

        # Test: '--output' not specified; exported file is created
        # in current directory and named like the pipeline file with
        # a '.yaml' suffix; this should succeed
        expected_output_file = pipeline_file_path.with_suffix(".yaml")
        result = run_export()
        assert result.exit_code == 0, result.output
        assert f"was exported to '{str(expected_output_file)}" in result.output, result.output

        # Test: '--output' specified and ends with '.yaml'; this should succeed
        expected_output_file = cwd / "test-dir" / "output.yaml"
        result = run_export(str(expected_output_file))
        assert result.exit_code == 0, result.output
        assert f"was exported to '{str(expected_output_file)}" in result.output, result.output

        # Test: '--output' specified and ends with '.yml'; this should succeed
        expected_output_file = cwd / "test-dir-2" / "output.yml"
        result = run_export(str(expected_output_file))
        assert result.exit_code == 0, result.output
        assert f"was exported to '{str(expected_output_file)}" in result.output, result.output
def test_export_airflow_output_option(airflow_runtime_instance):
    """Verify that the '--output' option works as expected for Airflow"""
    runner = CliRunner()
    with runner.isolated_filesystem():
        cwd = Path.cwd().resolve()
        # copy pipeline file and dependencies
        prepare_export_work_dir(str(cwd), Path(__file__).parent / "resources" / "pipelines")
        pipeline_file = "airflow.pipeline"
        pipeline_file_path = cwd / pipeline_file
        # make sure the pipeline file exists
        assert pipeline_file_path.is_file() is True
        print(f"Pipeline file: {pipeline_file_path}")

        def run_export(output_arg=None):
            # invoke 'export', optionally appending '--output <output_arg>'
            args = ["export", str(pipeline_file_path), "--runtime-config", airflow_runtime_instance]
            if output_arg is not None:
                args.extend(["--output", output_arg])
            return runner.invoke(pipeline, args)

        def assert_overwrite_error(result, clobbered_file):
            # export must fail because the target file already exists and
            # '--overwrite' was not specified
            assert result.exit_code != 0, result.output
            assert (
                f"Error: Output file '{clobbered_file}' exists and option '--overwrite' "
                "was not specified." in result.output
            ), result.output

        #
        # Test: '--output' not specified; the exported file would be created
        # in the current directory, named like the pipeline file with a '.py'
        # suffix
        #
        expected_output_file = pipeline_file_path.with_suffix(".py")
        print(f"expected_output_file -> {expected_output_file}")
        do_mock_export(str(expected_output_file))
        # this should fail: default output file already exists
        assert_overwrite_error(run_export(), expected_output_file)

        #
        # Test: '--output' specified and ends with '.py' (the value is treated
        # as a file name)
        #
        expected_output_file = cwd / "test-dir-2" / "output.py"
        do_mock_export(str(expected_output_file))
        # this should fail: specified output file already exists
        assert_overwrite_error(run_export(str(expected_output_file)), expected_output_file)

        #
        # Test: '--output' specified and does not end with '.py' (the value
        # is treated as a directory)
        #
        output_dir = cwd / "test-dir-3"
        expected_output_file = output_dir / Path(pipeline_file).with_suffix(".py")
        do_mock_export(str(expected_output_file))
        # this should fail: specified output file already exists
        assert_overwrite_error(run_export(str(output_dir)), expected_output_file)
@pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
def test_export_kubeflow_overwrite_option(jp_environ, kubeflow_pipelines_runtime_instance, catalog_instance):
    """Verify that the '--overwrite' option works as expected for Kubeflow Pipelines"""
    runner = CliRunner()
    with runner.isolated_filesystem():
        cwd = Path.cwd().resolve()
        # copy pipeline file and dependencies
        prepare_export_work_dir(str(cwd), Path(__file__).parent / "resources" / "pipelines")
        pipeline_file_path = cwd / "kfp_3_node_custom.pipeline"
        # make sure the pipeline file exists
        assert pipeline_file_path.is_file() is True
        print(f"Pipeline file: {pipeline_file_path}")
        base_args = ["export", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
        # the exported file is created in the current directory and named
        # like the pipeline file with a '.yaml' suffix
        expected_output_file = pipeline_file_path.with_suffix(".yaml")

        # Test: '--overwrite' not specified and the output file does not
        # exist yet; this should succeed
        result = runner.invoke(pipeline, base_args)
        assert result.exit_code == 0, result.output
        assert f"was exported to '{str(expected_output_file)}" in result.output, result.output

        # Test: '--overwrite' not specified but the output file now exists;
        # this should fail
        result = runner.invoke(pipeline, base_args)
        assert result.exit_code != 0, result.output
        assert f"Output file '{expected_output_file}' exists and option '--overwrite' was not" in result.output

        # Test: '--overwrite' specified and the output file exists;
        # this should succeed
        result = runner.invoke(pipeline, base_args + ["--overwrite"])
        assert result.exit_code == 0, result.output
        assert f"was exported to '{str(expected_output_file)}" in result.output, result.output
- # ------------------------------------------------------------------
- # end tests for 'export' command
- # ------------------------------------------------------------------
|