#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tests for elyra-pipeline application"""
import json
from pathlib import Path
import shutil

from click.testing import CliRunner
from conftest import KFP_COMPONENT_CACHE_INSTANCE
import pytest

from elyra.cli.pipeline_app import pipeline
from elyra.metadata.manager import MetadataManager
from elyra.metadata.metadata import Metadata
from elyra.metadata.schemaspaces import Runtimes

# used to drive generic parameter handling tests
SUB_COMMANDS = ["run", "submit", "describe", "validate", "export"]


@pytest.fixture
def kubeflow_pipelines_runtime_instance():
    """Creates a Kubeflow Pipelines RTC and removes it after test."""
    instance_name = "valid_kfp_test_config"
    instance_config_file = Path(__file__).parent / "resources" / "runtime_configs" / f"{instance_name}.json"
    with open(instance_config_file, "r") as fd:
        instance_config = json.load(fd)

    md_mgr = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID)
    # clean possible orphaned instance...
    try:
        md_mgr.remove(instance_name)
    except Exception:
        pass

    runtime_instance = md_mgr.create(instance_name, Metadata(**instance_config))
    yield runtime_instance.name

    md_mgr.remove(runtime_instance.name)


@pytest.fixture
def airflow_runtime_instance():
    """Creates an airflow RTC and removes it after test."""
    instance_name = "valid_airflow_test_config"
    instance_config_file = Path(__file__).parent / "resources" / "runtime_configs" / f"{instance_name}.json"
    with open(instance_config_file, "r") as fd:
        instance_config = json.load(fd)

    md_mgr = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID)
    # clean possible orphaned instance...
    try:
        md_mgr.remove(instance_name)
    except Exception:
        pass

    runtime_instance = md_mgr.create(instance_name, Metadata(**instance_config))
    yield runtime_instance.name

    md_mgr.remove(runtime_instance.name)


def test_no_opts():
    """Verify that all commands are displayed in help"""
    runner = CliRunner()
    result = runner.invoke(pipeline)
    assert "run Run a pipeline in your local environment" in result.output
    assert "submit Submit a pipeline to be executed on the server" in result.output
    assert "describe Display pipeline summary" in result.output
    assert "export Export a pipeline to a runtime-specific format" in result.output
    assert "validate Validate pipeline" in result.output
    assert result.exit_code == 0


def test_bad_subcommand():
    runner = CliRunner()
    result = runner.invoke(pipeline, ["invalid_command"])
    assert "Error: No such command 'invalid_command'" in result.output
    assert result.exit_code != 0


@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
def test_subcommand_no_opts(subcommand):
    runner = CliRunner()
    result = runner.invoke(pipeline, [subcommand])
    assert result.exit_code != 0
    assert "Error: Missing argument 'PIPELINE_PATH'" in result.output


@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
def test_subcommand_invalid_pipeline_path(subcommand):
    """Verify that every command only accepts a valid pipeline_path file name"""
    runner = CliRunner()

    # test: file not found
    file_name = "no-such.pipeline"
    result = runner.invoke(pipeline, [subcommand, file_name])
    assert result.exit_code != 0
    assert f"Invalid value for 'PIPELINE_PATH': '{file_name}' is not a file." in result.output

    # test: file with wrong extension
    with runner.isolated_filesystem():
        file_name = "wrong.extension"
        with open(file_name, "w") as f:
            f.write("I am not a pipeline file.")

        result = runner.invoke(pipeline, [subcommand, file_name])
        assert result.exit_code != 0
        assert f"Invalid value for 'PIPELINE_PATH': '{file_name}' is not a .pipeline file." in result.output


@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
def test_subcommand_with_no_pipelines_field(subcommand, kubeflow_pipelines_runtime_instance):
    """Verify that every command properly detects pipeline issues"""
    runner = CliRunner()

    with runner.isolated_filesystem():
        pipeline_file = "pipeline_without_pipelines_field.pipeline"
        pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / pipeline_file
        assert pipeline_file_path.is_file()

        # every CLI command invocation requires these parameters
        invoke_parameters = [subcommand, str(pipeline_file_path)]
        if subcommand in ["submit", "export"]:
            # these commands also require a runtime configuration
            invoke_parameters.extend(["--runtime-config", kubeflow_pipelines_runtime_instance])

        result = runner.invoke(pipeline, invoke_parameters)
        assert result.exit_code != 0
        assert "Pipeline is missing 'pipelines' field." in result.output


@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
def test_subcommand_with_zero_length_pipelines_field(subcommand, kubeflow_pipelines_runtime_instance):
    """Verify that every command properly detects pipeline issues"""
    runner = CliRunner()

    with runner.isolated_filesystem():
        pipeline_file = "pipeline_with_zero_length_pipelines_field.pipeline"
        pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / pipeline_file
        assert pipeline_file_path.is_file()

        # every CLI command invocation requires these parameters
        invoke_parameters = [subcommand, str(pipeline_file_path)]
        if subcommand in ["submit", "export"]:
            # these commands also require a runtime configuration
            invoke_parameters.extend(["--runtime-config", kubeflow_pipelines_runtime_instance])

        result = runner.invoke(pipeline, invoke_parameters)
        assert result.exit_code != 0
        assert "Pipeline has zero length 'pipelines' field." in result.output


@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
def test_subcommand_with_no_nodes(subcommand, kubeflow_pipelines_runtime_instance):
    """Verify that every command properly detects pipeline issues"""

    # don't run this test for the `describe` command
    # (see test_describe_with_no_nodes)
    if subcommand == "describe":
        return

    runner = CliRunner()

    with runner.isolated_filesystem():
        pipeline_file = "pipeline_with_zero_nodes.pipeline"
        pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / pipeline_file
        assert pipeline_file_path.is_file()

        # every CLI command invocation requires these parameters
        invoke_parameters = [subcommand, str(pipeline_file_path)]
        if subcommand in ["submit", "export"]:
            # these commands also require a runtime configuration
            invoke_parameters.extend(["--runtime-config", kubeflow_pipelines_runtime_instance])

        result = runner.invoke(pipeline, invoke_parameters)
        assert result.exit_code != 0


def test_describe_with_no_nodes():
    runner = CliRunner()
    with runner.isolated_filesystem():
        pipeline_file = "pipeline_with_zero_nodes.pipeline"
        pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / pipeline_file
        assert pipeline_file_path.is_file()

        result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
        assert result.exit_code == 0, result.output
        assert "Description: None" in result.output
        assert "Type: KUBEFLOW_PIPELINES" in result.output
        assert "Nodes: 0" in result.output
        assert "File Dependencies:\n None Listed" in result.output
        assert "Component Dependencies:\n None Listed" in result.output


def test_describe_with_kfp_components():
    runner = CliRunner()
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"

    result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
    assert "Description: 3-node custom component pipeline" in result.output
    assert "Type: KUBEFLOW_PIPELINES" in result.output
    assert "Nodes: 3" in result.output
    assert "File Dependencies:\n None Listed" in result.output
    assert (
        "- https://raw.githubusercontent.com/kubeflow/pipelines/1.6.0/components/"
        "basics/Calculate_hash/component.yaml" in result.output
    )
    assert (
        "- /opt/anaconda3/envs/elyra-dev/share/jupyter/components/"
        "kfp/filter_text_using_shell_and_grep.yaml" in result.output
    )
    assert (
        "- https://raw.githubusercontent.com/kubeflow/pipelines/1.6.0/components/"
        "web/Download/component.yaml" in result.output
    )
    assert result.exit_code == 0


@pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
def test_validate_with_kfp_components(jp_environ, kubeflow_pipelines_runtime_instance, catalog_instance):
    runner = CliRunner()
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
    result = runner.invoke(
        pipeline, ["validate", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
    )
    assert "Validating pipeline..." in result.output
    assert result.exit_code == 0


def test_describe_with_missing_kfp_component():
    runner = CliRunner()
    with runner.isolated_filesystem():
        valid_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
        pipeline_file_path = Path.cwd() / "foo.pipeline"
        with open(pipeline_file_path, "w") as pipeline_file:
            with open(valid_file_path) as valid_file:
                valid_data = json.load(valid_file)
                # Update known component name to trigger a missing component
                valid_data["pipelines"][0]["nodes"][0]["op"] = valid_data["pipelines"][0]["nodes"][0]["op"] + "Missing"
                pipeline_file.write(json.dumps(valid_data))

        result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
        assert "Description: 3-node custom component pipeline" in result.output
        assert "Type: KUBEFLOW_PIPELINES" in result.output
        assert "Nodes: 3" in result.output
        assert result.exit_code == 0


def test_describe_notebook_script_report():
    """
    Test report output for the notebook/script property when none, one, or many instances are present.
    """
    runner = CliRunner()

    # Test report output when only notebooks are present
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_notebook.pipeline"
    result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
    assert "Notebooks:\n" in result.output
    assert "dummy_notebook_1.ipynb" in result.output
    assert "dummy_notebook_2.ipynb" in result.output
    # Ensure there are no entries for scripts
    assert "Scripts:\n None Listed" in result.output
    assert result.exit_code == 0

    # Test report output when only scripts are present
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_script.pipeline"
    result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
    assert "Scripts:\n" in result.output
    assert "dummy_script_1.py" in result.output
    assert "dummy_script_2.py" in result.output
    # Ensure there are no entries for notebooks
    assert "Notebooks:\n None Listed" in result.output
    assert result.exit_code == 0

    # Test report output when both a notebook and a script are present
    pipeline_file_path = (
        Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_notebook_and_script.pipeline"
    )
    result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
    assert "Notebooks:\n" in result.output
    assert "dummy_notebook_1.ipynb" in result.output
    assert "Scripts:\n" in result.output
    assert "dummy_script_1.py" in result.output
    assert result.exit_code == 0


def test_describe_notebook_script_json():
    """
    Test JSON output for the notebook/script property when none, one, or many instances are present.
    """
    runner = CliRunner()

    # Test JSON output when only notebooks are present
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_notebook.pipeline"
    result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
    result_json = json.loads(result.output)
    # Ensure that the script entry is absent
    assert not result_json.get("scripts")
    # Test that the notebook output is a list and check the list count
    assert isinstance(result_json.get("notebooks"), list) and len(result_json.get("notebooks")) == 2
    # Test the list content; the constructs below account for a random sort order in the list
    assert Path(result_json.get("notebooks")[0]).name != Path(result_json.get("notebooks")[1]).name
    assert Path(result_json.get("notebooks")[0]).name in ["dummy_notebook_1.ipynb", "dummy_notebook_2.ipynb"]
    assert Path(result_json.get("notebooks")[1]).name in ["dummy_notebook_1.ipynb", "dummy_notebook_2.ipynb"]
    assert result.exit_code == 0

    # Test JSON output when only scripts are present
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_script.pipeline"
    result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
    result_json = json.loads(result.output)
    # Ensure that the notebook entry is absent
    assert not result_json.get("notebooks")
    # Test that the script output is a list and check the list count
    assert isinstance(result_json.get("scripts"), list) and len(result_json.get("scripts")) == 2
    # Test the list content; the constructs below account for a random sort order in the list
    assert Path(result_json.get("scripts")[0]).name != Path(result_json.get("scripts")[1]).name
    assert Path(result_json.get("scripts")[0]).name in ["dummy_script_1.py", "dummy_script_2.py"]
    assert Path(result_json.get("scripts")[1]).name in ["dummy_script_1.py", "dummy_script_2.py"]
    assert result.exit_code == 0

    # Test JSON output when both a notebook and a script are present
    pipeline_file_path = (
        Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_notebook_and_script.pipeline"
    )
    result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
    result_json = json.loads(result.output)
    # Test that each output is a single-entry list
    assert isinstance(result_json.get("notebooks"), list) and len(result_json.get("notebooks")) == 1
    assert isinstance(result_json.get("scripts"), list) and len(result_json.get("scripts")) == 1
    # Test the list content
    assert Path(result_json.get("notebooks")[0]).name == "dummy_notebook_1.ipynb"
    assert Path(result_json.get("scripts")[0]).name == "dummy_script_1.py"
    assert result.exit_code == 0


def test_describe_runtime_image_report():
    """
    Test report output for the runtime_image property when none, one, or many instances are present.
    """
    runner = CliRunner()

    # Test report output when there are no runtime_images
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_zero_runtime_image.pipeline"
    result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
    assert "Runtime Image:\n None Listed" in result.output
    assert result.exit_code == 0

    # Test report output when there is a single runtime_image
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_one_runtime_image.pipeline"
    result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
    assert "Runtime Image:\n - tensorflow/tensorflow:2.0.0-py3" in result.output
    assert result.exit_code == 0

    # Test report output when there are two or more runtime_images
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_many_runtime_image.pipeline"
    result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
    assert "Runtime Image:\n" in result.output
    assert "- tensorflow/tensorflow:2.0.0-py3" in result.output
    assert "- elyra/examples:1.0.0-py3" in result.output
    assert result.exit_code == 0


def test_describe_runtime_image_json():
    """
    Test JSON output for the runtime_image property when none, one, or many instances are present.
    """
    runner = CliRunner()

    # Test JSON output when there are no runtime_images
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
    result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
    result_json = json.loads(result.output)
    assert not result_json.get("runtime_image")
    assert result.exit_code == 0

    # Test JSON output when there is a single runtime_image
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_one_runtime_image.pipeline"
    result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
    result_json = json.loads(result.output)
    # Test that the output is a list
    assert isinstance(result_json.get("runtime_image"), list) and len(result_json.get("runtime_image")) == 1
    # Test the list content
    assert result_json.get("runtime_image")[0] == "tensorflow/tensorflow:2.0.0-py3"
    assert result.exit_code == 0

    # Test JSON output when there are two or more runtime_images
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_many_runtime_image.pipeline"
    result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
    result_json = json.loads(result.output)
    # Test that the output is a list with two distinct entries
    assert isinstance(result_json.get("runtime_image"), list) and len(result_json.get("runtime_image")) == 2
    # Test the list content
    assert result_json.get("runtime_image")[0] != result_json.get("runtime_image")[1]
    assert result_json.get("runtime_image")[0] in ["tensorflow/tensorflow:2.0.0-py3", "elyra/examples:1.0.0-py3"]
    assert result_json.get("runtime_image")[1] in ["tensorflow/tensorflow:2.0.0-py3", "elyra/examples:1.0.0-py3"]
    assert result.exit_code == 0


def test_describe_mount_report():
    """
    Test report output for the mounted volumes property when none, one, or many mounts are present.
    """
    runner = CliRunner()

    # Test report output when there are no mounted volumes
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_zero_mount.pipeline"
    result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
    assert "Mounted Volumes:\n None Listed" in result.output
    assert result.exit_code == 0

    # Test report output when there is a single mounted volume
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_one_mount.pipeline"
    result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
    assert "Mounted Volumes:\n - rwx-test-claim" in result.output
    assert result.exit_code == 0

    # Test report output when there are two or more mounted volumes
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_many_mount.pipeline"
    result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
    assert "Mounted Volumes:\n" in result.output
    assert " - rwx-test-claim" in result.output
    assert " - rwx-test-claim-1" in result.output
    assert result.exit_code == 0


def test_describe_mount_json():
    """
    Test JSON output for the mounted volumes property when none, one, or many mounts are present.
    """
    runner = CliRunner()

    # Test JSON output when there are no mounted volumes
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_zero_mount.pipeline"
    result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
    result_json = json.loads(result.output)
    assert not result_json.get("mounted_volumes")
    assert result.exit_code == 0

    # Test JSON output when there is a single mounted volume
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_one_mount.pipeline"
    result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
    result_json = json.loads(result.output)
    assert isinstance(result_json.get("mounted_volumes"), list) and len(result_json.get("mounted_volumes")) == 1
    assert result_json.get("mounted_volumes")[0] == "rwx-test-claim"
    assert result.exit_code == 0

    # Test JSON output when there are two or more mounted volumes
    pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_many_mount.pipeline"
    result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
    result_json = json.loads(result.output)
    assert isinstance(result_json.get("mounted_volumes"), list) and len(result_json.get("mounted_volumes")) == 2
    assert set(result_json.get("mounted_volumes")) == {"rwx-test-claim", "rwx-test-claim-1"}
    assert result.exit_code == 0


def test_validate_with_missing_kfp_component(jp_environ, kubeflow_pipelines_runtime_instance):
    runner = CliRunner()
    with runner.isolated_filesystem():
        valid_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
        pipeline_file_path = Path.cwd() / "foo.pipeline"
        with open(pipeline_file_path, "w") as pipeline_file:
            with open(valid_file_path) as valid_file:
                valid_data = json.load(valid_file)
                # Update known component name to trigger a missing component
                valid_data["pipelines"][0]["nodes"][0]["op"] = valid_data["pipelines"][0]["nodes"][0]["op"] + "Missing"
                pipeline_file.write(json.dumps(valid_data))

        result = runner.invoke(
            pipeline, ["validate", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
        )
        assert "Validating pipeline..." in result.output
        assert "[Error][Calculate data hash] - This component was not found in the catalog." in result.output
        assert result.exit_code != 0


def test_validate_with_no_runtime_config():
    runner = CliRunner()
    with runner.isolated_filesystem():
        pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
        result = runner.invoke(pipeline, ["validate", str(pipeline_file_path)])

        assert "Validating pipeline..." in result.output
        assert (
            "[Error] - This pipeline contains at least one runtime-specific component, "
            "but pipeline runtime is 'local'" in result.output
        )
        assert result.exit_code != 0


# ------------------------------------------------------------------
# tests for 'submit' command
# ------------------------------------------------------------------


def test_submit_invalid_monitor_interval_option(kubeflow_pipelines_runtime_instance):
    """Verify that the '--monitor-timeout' option works as expected"""
    runner = CliRunner()
    with runner.isolated_filesystem():
        # dummy pipeline - it's not used
        pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
        assert pipeline_file_path.is_file()

        # this should fail: '--monitor-timeout' must be an integer
        invalid_option_value = "abc"
        result = runner.invoke(
            pipeline,
            [
                "submit",
                str(pipeline_file_path),
                "--runtime-config",
                kubeflow_pipelines_runtime_instance,
                "--monitor-timeout",
                invalid_option_value,
            ],
        )
        assert result.exit_code != 0
        assert (
            f"Invalid value for '--monitor-timeout': '{invalid_option_value}' is not "
            "a valid integer" in result.output
        )

        # this should fail: '--monitor-timeout' must be a positive integer
        invalid_option_value = 0
        result = runner.invoke(
            pipeline,
            [
                "submit",
                str(pipeline_file_path),
                "--runtime-config",
                kubeflow_pipelines_runtime_instance,
                "--monitor-timeout",
                invalid_option_value,
            ],
        )
        assert result.exit_code != 0
        assert (
            f"Invalid value for '--monitor-timeout': '{invalid_option_value}' is not "
            "a positive integer" in result.output
        )


# ------------------------------------------------------------------
# end tests for 'submit' command
# ------------------------------------------------------------------
# tests for 'export' command
# ------------------------------------------------------------------


def do_mock_export(output_path: str, dir_only=False):
    # simulate export result
    p = Path(output_path)
    # create parent directories, if required
    if not p.parent.is_dir():
        p.parent.mkdir(parents=True, exist_ok=True)
    if dir_only:
        return
    # create a mock export file
    with open(output_path, "w") as output:
        output.write("dummy export output")


def prepare_export_work_dir(work_dir: str, source_dir: str):
    """Copies the files in source_dir to work_dir"""
    for file in Path(source_dir).glob("*"):
        shutil.copy(str(file), work_dir)
    # print for debug purposes; this is only displayed if an assert fails
    print(f"Work directory content: {list(Path(work_dir).glob('*'))}")


def test_export_invalid_runtime_config():
    """Test user error scenarios: the specified runtime configuration is 'invalid'"""
    runner = CliRunner()

    # test pipeline; it's not used in this test
    pipeline_file = "kubeflow_pipelines.pipeline"
    p = Path(__file__).parent / "resources" / "pipelines" / f"{pipeline_file}"
    assert p.is_file()

    # no runtime configuration was specified
    result = runner.invoke(pipeline, ["export", str(p)])
    assert result.exit_code != 0, result.output
    assert "Error: Missing option '--runtime-config'." in result.output, result.output

    # runtime configuration does not exist
    config_name = "no-such-config"
    result = runner.invoke(pipeline, ["export", str(p), "--runtime-config", config_name])
    assert result.exit_code != 0, result.output
    assert f"Error: Invalid runtime configuration: {config_name}" in result.output
    assert f"No such instance named '{config_name}' was found in the runtimes schemaspace." in result.output


def test_export_incompatible_runtime_config(kubeflow_pipelines_runtime_instance, airflow_runtime_instance):
    """
    Test user error scenarios: the specified runtime configuration is not compatible
    with the pipeline type, e.g. KFP pipeline with Airflow runtime config
    """
    runner = CliRunner()

    # try exporting a KFP pipeline using an Airflow runtime configuration
    pipeline_file = "kubeflow_pipelines.pipeline"
    p = Path(__file__).parent / "resources" / "pipelines" / f"{pipeline_file}"
    assert p.is_file()

    # try export using Airflow runtime configuration
    result = runner.invoke(pipeline, ["export", str(p), "--runtime-config", airflow_runtime_instance])

    assert result.exit_code != 0, result.output
    assert (
        "The runtime configuration type 'APACHE_AIRFLOW' does not "
        "match the pipeline's runtime type 'KUBEFLOW_PIPELINES'." in result.output
    )

    # try exporting an Airflow pipeline using a Kubeflow Pipelines runtime configuration
    pipeline_file = "airflow.pipeline"
    p = Path(__file__).parent / "resources" / "pipelines" / f"{pipeline_file}"
    assert p.is_file()

    # try export using KFP runtime configuration
    result = runner.invoke(pipeline, ["export", str(p), "--runtime-config", kubeflow_pipelines_runtime_instance])

    assert result.exit_code != 0, result.output
    assert (
        "The runtime configuration type 'KUBEFLOW_PIPELINES' does not "
        "match the pipeline's runtime type 'APACHE_AIRFLOW'." in result.output
    )


@pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
def test_export_kubeflow_output_option(jp_environ, kubeflow_pipelines_runtime_instance, catalog_instance):
    """Verify that the '--output' option works as expected for Kubeflow Pipelines"""
    runner = CliRunner()
    with runner.isolated_filesystem():
        cwd = Path.cwd().resolve()
        # copy pipeline file and dependencies
        prepare_export_work_dir(str(cwd), Path(__file__).parent / "resources" / "pipelines")
        pipeline_file = "kfp_3_node_custom.pipeline"
        pipeline_file_path = cwd / pipeline_file
        # make sure the pipeline file exists
        assert pipeline_file_path.is_file() is True
        print(f"Pipeline file: {pipeline_file_path}")

        # Test: '--output' not specified; exported file is created
        # in current directory and named like the pipeline file with
        # a '.yaml' suffix
        expected_output_file = pipeline_file_path.with_suffix(".yaml")

        # this should succeed
        result = runner.invoke(
            pipeline, ["export", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
        )

        assert result.exit_code == 0, result.output
        assert f"was exported to '{str(expected_output_file)}" in result.output, result.output

        # Test: '--output' specified and ends with '.yaml'
        expected_output_file = cwd / "test-dir" / "output.yaml"

        # this should succeed
        result = runner.invoke(
            pipeline,
            [
                "export",
                str(pipeline_file_path),
                "--runtime-config",
                kubeflow_pipelines_runtime_instance,
                "--output",
                str(expected_output_file),
            ],
        )

        assert result.exit_code == 0, result.output
        assert f"was exported to '{str(expected_output_file)}" in result.output, result.output

        # Test: '--output' specified and ends with '.yml'
        expected_output_file = cwd / "test-dir-2" / "output.yml"

        # this should succeed
        result = runner.invoke(
            pipeline,
            [
                "export",
                str(pipeline_file_path),
                "--runtime-config",
                kubeflow_pipelines_runtime_instance,
                "--output",
                str(expected_output_file),
            ],
        )

        assert result.exit_code == 0, result.output
        assert f"was exported to '{str(expected_output_file)}" in result.output, result.output


def test_export_airflow_output_option(airflow_runtime_instance):
    """Verify that the '--output' option works as expected for Airflow"""
    runner = CliRunner()
    with runner.isolated_filesystem():
        cwd = Path.cwd().resolve()
        # copy pipeline file and dependencies
        prepare_export_work_dir(str(cwd), Path(__file__).parent / "resources" / "pipelines")
        pipeline_file = "airflow.pipeline"
        pipeline_file_path = cwd / pipeline_file
        # make sure the pipeline file exists
        assert pipeline_file_path.is_file() is True
        print(f"Pipeline file: {pipeline_file_path}")

        #
        # Test: '--output' not specified; exported file is created
        # in current directory and named like the pipeline file with
        # a '.py' suffix
        #
        expected_output_file = pipeline_file_path.with_suffix(".py")
        print(f"expected_output_file -> {expected_output_file}")
        do_mock_export(str(expected_output_file))

        # this should fail: default output file already exists
        result = runner.invoke(
            pipeline, ["export", str(pipeline_file_path), "--runtime-config", airflow_runtime_instance]
        )

        assert result.exit_code != 0, result.output
        assert (
            f"Error: Output file '{expected_output_file}' exists and option '--overwrite' "
            "was not specified." in result.output
        ), result.output

        #
        # Test: '--output' specified and ends with '.py' (the value is treated
        # as a file name)
        #
        expected_output_file = cwd / "test-dir-2" / "output.py"
        do_mock_export(str(expected_output_file))

        # this should fail: specified output file already exists
        result = runner.invoke(
            pipeline,
            [
                "export",
                str(pipeline_file_path),
                "--runtime-config",
                airflow_runtime_instance,
                "--output",
                str(expected_output_file),
            ],
        )

        assert result.exit_code != 0, result.output
        assert (
            f"Error: Output file '{expected_output_file}' exists and option '--overwrite' "
            "was not specified." in result.output
        ), result.output

        #
        # Test: '--output' specified and does not end with '.py' (the value
        # is treated as a directory)
        #
        output_dir = cwd / "test-dir-3"
        expected_output_file = output_dir / Path(pipeline_file).with_suffix(".py")
        do_mock_export(str(expected_output_file))

        # this should fail: specified output file already exists
        result = runner.invoke(
            pipeline,
            [
                "export",
                str(pipeline_file_path),
                "--runtime-config",
                airflow_runtime_instance,
                "--output",
                str(output_dir),
            ],
        )

        assert result.exit_code != 0, result.output
        assert (
            f"Error: Output file '{expected_output_file}' exists and option '--overwrite' "
            "was not specified." in result.output
        ), result.output


@pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
def test_export_kubeflow_overwrite_option(jp_environ, kubeflow_pipelines_runtime_instance, catalog_instance):
    """Verify that the '--overwrite' option works as expected for Kubeflow Pipelines"""
    runner = CliRunner()
    with runner.isolated_filesystem():
        cwd = Path.cwd().resolve()
        # copy pipeline file and dependencies
        prepare_export_work_dir(str(cwd), Path(__file__).parent / "resources" / "pipelines")
        pipeline_file = "kfp_3_node_custom.pipeline"
        pipeline_file_path = cwd / pipeline_file
        # make sure the pipeline file exists
        assert pipeline_file_path.is_file() is True
        print(f"Pipeline file: {pipeline_file_path}")

        # Test: '--overwrite' not specified; exported file is created
        # in current directory and named like the pipeline file with
        # a '.yaml' suffix
        expected_output_file = pipeline_file_path.with_suffix(".yaml")

        # this should succeed
        result = runner.invoke(
            pipeline, ["export", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
        )

        assert result.exit_code == 0, result.output
        assert f"was exported to '{str(expected_output_file)}" in result.output, result.output

        # Test: '--overwrite' not specified; the output file already exists
        # this should fail
        result = runner.invoke(
            pipeline, ["export", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
        )

        assert result.exit_code != 0, result.output
        assert f"Output file '{expected_output_file}' exists and option '--overwrite' was not" in result.output

        # Test: '--overwrite' specified; exported file is created
        # in current directory and named like the pipeline file with
        # a '.yaml' suffix
        # this should succeed
        result = runner.invoke(
            pipeline,
            ["export", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance, "--overwrite"],
        )

        assert result.exit_code == 0, result.output
        assert f"was exported to '{str(expected_output_file)}" in result.output, result.output


# ------------------------------------------------------------------
# end tests for 'export' command
# ------------------------------------------------------------------