|
@@ -15,7 +15,8 @@
|
|
|
#
|
|
|
"""Tests for elyra-pipeline application"""
|
|
|
import json
|
|
|
-import os
|
|
|
+from pathlib import Path
|
|
|
+import shutil
|
|
|
|
|
|
from click.testing import CliRunner
|
|
|
from conftest import KFP_COMPONENT_CACHE_INSTANCE
|
|
@@ -26,58 +27,58 @@ from elyra.metadata.manager import MetadataManager
|
|
|
from elyra.metadata.metadata import Metadata
|
|
|
from elyra.metadata.schemaspaces import Runtimes
|
|
|
|
|
|
-SUB_COMMANDS = ['run', 'submit', 'describe', 'validate']
|
|
|
-
|
|
|
-PIPELINE_SOURCE_WITH_ZERO_LENGTH_PIPELINES_FIELD = \
|
|
|
- '{"doc_type":"pipeline","version":"3.0","id":"0","primary_pipeline":"1","pipelines":[],"schemas":[]}'
|
|
|
-
|
|
|
-PIPELINE_SOURCE_WITHOUT_PIPELINES_FIELD = \
|
|
|
- '{"doc_type":"pipeline","version":"3.0","id":"0","primary_pipeline":"1","schemas":[]}'
|
|
|
-
|
|
|
-PIPELINE_SOURCE_WITH_ZERO_NODES = \
|
|
|
- '{"doc_type":"pipeline","version":"3.0","id":"0","primary_pipeline":"1","pipelines":[{"id":"1","nodes":[],"app_data":{"runtime":"","version": 5, "runtime_type": "KUBEFLOW_PIPELINES", "properties": {"name": "generic"}}, "schemas":[]}]}' # noqa
|
|
|
-
|
|
|
-KFP_RUNTIME_INSTANCE = {
|
|
|
- "display_name": "PipelineApp KFP runtime instance",
|
|
|
- "metadata": {
|
|
|
- "api_endpoint": "http://acme.com:32470/pipeline",
|
|
|
- "cos_endpoint": "http://acme.com:30205",
|
|
|
- "cos_username": "minio",
|
|
|
- "cos_password": "miniosecret",
|
|
|
- "cos_bucket": "my-bucket",
|
|
|
- "tags": [],
|
|
|
- "engine": "Argo",
|
|
|
- "user_namespace": "kubeflow-user-example-com",
|
|
|
- "api_username": "user@example.com",
|
|
|
- "api_password": "12341234",
|
|
|
- "runtime_type": "KUBEFLOW_PIPELINES",
|
|
|
- "auth_type": "DEX_LEGACY"
|
|
|
- },
|
|
|
- "schema_name": "kfp"
|
|
|
-}
|
|
|
+# used to drive generic parameter handling tests
|
|
|
+SUB_COMMANDS = ['run', 'submit', 'describe', 'validate', 'export']
|
|
|
|
|
|
|
|
|
@pytest.fixture
|
|
|
-def kfp_runtime_instance():
|
|
|
- """Creates an instance of a kfp scehma and removes after test. """
|
|
|
- instance_name = "pipeline_app_test"
|
|
|
+def kubeflow_pipelines_runtime_instance():
|
|
|
+ """Creates a Kubeflow Pipelines RTC and removes it after test. """
|
|
|
+ instance_name = "valid_kfp_test_config"
|
|
|
+ instance_config_file = Path(__file__).parent / 'resources' / 'runtime_configs' / f'{instance_name}.json'
|
|
|
+ with open(instance_config_file, 'r') as fd:
|
|
|
+ instance_config = json.load(fd)
|
|
|
+
|
|
|
md_mgr = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID)
|
|
|
# clean possible orphaned instance...
|
|
|
try:
|
|
|
md_mgr.remove(instance_name)
|
|
|
except Exception:
|
|
|
pass
|
|
|
- runtime_instance = md_mgr.create(instance_name, Metadata(**KFP_RUNTIME_INSTANCE))
|
|
|
+ runtime_instance = md_mgr.create(instance_name, Metadata(**instance_config))
|
|
|
+ yield runtime_instance.name
|
|
|
+ md_mgr.remove(runtime_instance.name)
|
|
|
+
|
|
|
+
|
|
|
+@pytest.fixture
|
|
|
+def airflow_runtime_instance():
|
|
|
+ """Creates an airflow RTC and removes it after test. """
|
|
|
+ instance_name = "valid_airflow_test_config"
|
|
|
+ instance_config_file = Path(__file__).parent / 'resources' / 'runtime_configs' / f'{instance_name}.json'
|
|
|
+ with open(instance_config_file, 'r') as fd:
|
|
|
+ instance_config = json.load(fd)
|
|
|
+
|
|
|
+ md_mgr = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID)
|
|
|
+ # clean possible orphaned instance...
|
|
|
+ try:
|
|
|
+ md_mgr.remove(instance_name)
|
|
|
+ except Exception:
|
|
|
+ pass
|
|
|
+ runtime_instance = md_mgr.create(instance_name, Metadata(**instance_config))
|
|
|
yield runtime_instance.name
|
|
|
md_mgr.remove(runtime_instance.name)
|
|
|
|
|
|
|
|
|
def test_no_opts():
|
|
|
+ """Verify that all commands are displayed in help"""
|
|
|
runner = CliRunner()
|
|
|
result = runner.invoke(pipeline)
|
|
|
assert 'run Run a pipeline in your local environment' in result.output
|
|
|
assert 'submit Submit a pipeline to be executed on the server' in result.output
|
|
|
assert 'describe Display pipeline summary' in result.output
|
|
|
+ assert 'export Export a pipeline to a runtime-specific format' in result.output
|
|
|
+ assert 'validate Validate pipeline' in result.output
|
|
|
+
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
@@ -88,214 +89,110 @@ def test_bad_subcommand():
|
|
|
assert result.exit_code != 0
|
|
|
|
|
|
|
|
|
-def test_subcommand_no_opts():
|
|
|
+@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
|
|
|
+def test_subcommand_no_opts(subcommand):
|
|
|
runner = CliRunner()
|
|
|
- for command in SUB_COMMANDS:
|
|
|
- result = runner.invoke(pipeline, [command])
|
|
|
- assert "Error: Missing argument 'PIPELINE_PATH'" in result.output
|
|
|
- assert result.exit_code != 0
|
|
|
-
|
|
|
-
|
|
|
-def test_run_with_invalid_pipeline():
|
|
|
- runner = CliRunner()
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['run', 'foo.pipeline'])
|
|
|
- assert "Pipeline file not found:" in result.output
|
|
|
- assert "foo.pipeline" in result.output
|
|
|
- assert result.exit_code != 0
|
|
|
-
|
|
|
-
|
|
|
-def test_submit_with_invalid_pipeline(kfp_runtime_instance):
|
|
|
- runner = CliRunner()
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['submit', 'foo.pipeline',
|
|
|
- '--runtime-config', kfp_runtime_instance])
|
|
|
- assert "Pipeline file not found:" in result.output
|
|
|
- assert "foo.pipeline" in result.output
|
|
|
- assert result.exit_code != 0
|
|
|
-
|
|
|
-
|
|
|
-def test_describe_with_invalid_pipeline():
|
|
|
- runner = CliRunner()
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['describe', 'foo.pipeline'])
|
|
|
- assert "Pipeline file not found:" in result.output
|
|
|
- assert "foo.pipeline" in result.output
|
|
|
+ result = runner.invoke(pipeline, [subcommand])
|
|
|
assert result.exit_code != 0
|
|
|
+ assert "Error: Missing argument 'PIPELINE_PATH'" in result.output
|
|
|
|
|
|
|
|
|
-def test_validate_with_invalid_pipeline():
|
|
|
+@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
|
|
|
+def test_subcommand_invalid_pipeline_path(subcommand):
|
|
|
+ """Verify that every command only accepts a valid pipeline_path file name"""
|
|
|
runner = CliRunner()
|
|
|
|
|
|
- result = runner.invoke(pipeline, ['validate', 'foo.pipeline'])
|
|
|
- assert "Pipeline file not found:" in result.output
|
|
|
- assert "foo.pipeline" in result.output
|
|
|
+ # test: file not found
|
|
|
+ file_name = 'no-such.pipeline'
|
|
|
+ result = runner.invoke(pipeline, [subcommand, file_name])
|
|
|
assert result.exit_code != 0
|
|
|
+ assert f"Invalid value for 'PIPELINE_PATH': '{file_name}' is not a file." in result.output
|
|
|
|
|
|
-
|
|
|
-def test_run_with_unsupported_file_type():
|
|
|
- runner = CliRunner()
|
|
|
+ # test: file with wrong extension
|
|
|
with runner.isolated_filesystem():
|
|
|
- with open('foo.ipynb', 'w') as f:
|
|
|
- f.write('{ "nbformat": 4, "cells": [] }')
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['run', 'foo.ipynb'])
|
|
|
- assert "Pipeline file should be a [.pipeline] file" in result.output
|
|
|
+ file_name = 'wrong.extension'
|
|
|
+ with open(file_name, 'w') as f:
|
|
|
+ f.write('I am not a pipeline file.')
|
|
|
+ result = runner.invoke(pipeline, [subcommand, file_name])
|
|
|
assert result.exit_code != 0
|
|
|
+ assert f"Invalid value for 'PIPELINE_PATH': '{file_name}' is not a .pipeline file." in result.output
|
|
|
|
|
|
|
|
|
-def test_submit_with_unsupported_file_type(kfp_runtime_instance):
|
|
|
+@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
|
|
|
+def test_subcommand_with_no_pipelines_field(subcommand, kubeflow_pipelines_runtime_instance):
|
|
|
+ """Verify that every command properly detects pipeline issues"""
|
|
|
runner = CliRunner()
|
|
|
with runner.isolated_filesystem():
|
|
|
- with open('foo.ipynb', 'w') as f:
|
|
|
- f.write('{ "nbformat": 4, "cells": [] }')
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['submit', 'foo.ipynb',
|
|
|
- '--runtime-config', kfp_runtime_instance])
|
|
|
- assert "Pipeline file should be a [.pipeline] file" in result.output
|
|
|
- assert result.exit_code != 0
|
|
|
-
|
|
|
+ pipeline_file = 'pipeline_without_pipelines_field.pipeline'
|
|
|
+ pipeline_file_path = Path(__file__).parent / 'resources' / 'pipelines' / pipeline_file
|
|
|
+ assert pipeline_file_path.is_file()
|
|
|
|
|
|
-def test_describe_with_unsupported_file_type():
|
|
|
- runner = CliRunner()
|
|
|
- with runner.isolated_filesystem():
|
|
|
- with open('foo.ipynb', 'w') as f:
|
|
|
- f.write('{ "nbformat": 4, "cells": [] }')
|
|
|
+ # every CLI command invocation requires these parameters
|
|
|
+ invoke_parameters = [subcommand, str(pipeline_file_path)]
|
|
|
+ if subcommand in ['submit', 'export']:
|
|
|
+ # these commands also require a runtime configuration
|
|
|
+ invoke_parameters.extend(['--runtime-config', kubeflow_pipelines_runtime_instance])
|
|
|
|
|
|
- result = runner.invoke(pipeline, ['describe', 'foo.ipynb'])
|
|
|
- assert "Pipeline file should be a [.pipeline] file" in result.output
|
|
|
+ result = runner.invoke(pipeline, invoke_parameters)
|
|
|
assert result.exit_code != 0
|
|
|
-
|
|
|
-
|
|
|
-def test_validate_with_unsupported_file_type():
|
|
|
- runner = CliRunner()
|
|
|
- with runner.isolated_filesystem():
|
|
|
- with open('foo.ipynb', 'w') as f:
|
|
|
- f.write('{ "nbformat": 4, "cells": [] }')
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['validate', 'foo.ipynb'])
|
|
|
- assert "Pipeline file should be a [.pipeline] file" in result.output
|
|
|
- assert result.exit_code != 0
|
|
|
-
|
|
|
-
|
|
|
-def test_run_with_no_pipelines_field():
|
|
|
- runner = CliRunner()
|
|
|
- with runner.isolated_filesystem():
|
|
|
- with open('foo.pipeline', 'w') as pipeline_file:
|
|
|
- pipeline_file.write(PIPELINE_SOURCE_WITHOUT_PIPELINES_FIELD)
|
|
|
- pipeline_file_path = os.path.join(os.getcwd(), pipeline_file.name)
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['run', pipeline_file_path])
|
|
|
- assert "Pipeline is missing 'pipelines' field." in result.output
|
|
|
- assert result.exit_code != 0
|
|
|
-
|
|
|
-
|
|
|
-def test_submit_with_no_pipelines_field(kfp_runtime_instance):
|
|
|
- runner = CliRunner()
|
|
|
- with runner.isolated_filesystem():
|
|
|
- with open('foo.pipeline', 'w') as pipeline_file:
|
|
|
- pipeline_file.write(PIPELINE_SOURCE_WITHOUT_PIPELINES_FIELD)
|
|
|
- pipeline_file_path = os.path.join(os.getcwd(), pipeline_file.name)
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['submit', pipeline_file_path,
|
|
|
- '--runtime-config', kfp_runtime_instance])
|
|
|
assert "Pipeline is missing 'pipelines' field." in result.output
|
|
|
- assert result.exit_code != 0
|
|
|
|
|
|
|
|
|
-def test_describe_with_no_pipelines_field():
|
|
|
+@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
|
|
|
+def test_subcommand_with_zero_length_pipelines_field(subcommand, kubeflow_pipelines_runtime_instance):
|
|
|
+ """Verify that every command properly detects pipeline issues"""
|
|
|
runner = CliRunner()
|
|
|
with runner.isolated_filesystem():
|
|
|
- with open('foo.pipeline', 'w') as pipeline_file:
|
|
|
- pipeline_file.write(PIPELINE_SOURCE_WITHOUT_PIPELINES_FIELD)
|
|
|
- pipeline_file_path = os.path.join(os.getcwd(), pipeline_file.name)
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['describe', pipeline_file_path])
|
|
|
- assert "Pipeline is missing 'pipelines' field." in result.output
|
|
|
- assert result.exit_code != 0
|
|
|
-
|
|
|
+ pipeline_file = 'pipeline_with_zero_length_pipelines_field.pipeline'
|
|
|
+ pipeline_file_path = Path(__file__).parent / 'resources' / 'pipelines' / pipeline_file
|
|
|
+ assert pipeline_file_path.is_file()
|
|
|
|
|
|
-def test_validate_with_no_pipelines_field():
|
|
|
- runner = CliRunner()
|
|
|
- with runner.isolated_filesystem():
|
|
|
- with open('foo.pipeline', 'w') as pipeline_file:
|
|
|
- pipeline_file.write(PIPELINE_SOURCE_WITHOUT_PIPELINES_FIELD)
|
|
|
- pipeline_file_path = os.path.join(os.getcwd(), pipeline_file.name)
|
|
|
+ # every CLI command invocation requires these parameters
|
|
|
+ invoke_parameters = [subcommand, str(pipeline_file_path)]
|
|
|
+ if subcommand in ['submit', 'export']:
|
|
|
+ # these commands also require a runtime configuration
|
|
|
+ invoke_parameters.extend(['--runtime-config', kubeflow_pipelines_runtime_instance])
|
|
|
|
|
|
- result = runner.invoke(pipeline, ['validate', pipeline_file_path])
|
|
|
- assert "Pipeline is missing 'pipelines' field." in result.output
|
|
|
+ result = runner.invoke(pipeline, invoke_parameters)
|
|
|
assert result.exit_code != 0
|
|
|
-
|
|
|
-
|
|
|
-def test_run_with_zero_length_pipelines_field():
|
|
|
- runner = CliRunner()
|
|
|
- with runner.isolated_filesystem():
|
|
|
- with open('foo.pipeline', 'w') as pipeline_file:
|
|
|
- pipeline_file.write(PIPELINE_SOURCE_WITH_ZERO_LENGTH_PIPELINES_FIELD)
|
|
|
- pipeline_file_path = os.path.join(os.getcwd(), pipeline_file.name)
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['run', pipeline_file_path])
|
|
|
assert "Pipeline has zero length 'pipelines' field." in result.output
|
|
|
- assert result.exit_code != 0
|
|
|
|
|
|
|
|
|
-def test_submit_with_zero_length_pipelines_field(kfp_runtime_instance):
|
|
|
- runner = CliRunner()
|
|
|
- with runner.isolated_filesystem():
|
|
|
- with open('foo.pipeline', 'w') as pipeline_file:
|
|
|
- pipeline_file.write(PIPELINE_SOURCE_WITH_ZERO_LENGTH_PIPELINES_FIELD)
|
|
|
- pipeline_file_path = os.path.join(os.getcwd(), pipeline_file.name)
|
|
|
+@pytest.mark.parametrize("subcommand", SUB_COMMANDS)
|
|
|
+def test_subcommand_with_no_nodes(subcommand, kubeflow_pipelines_runtime_instance):
|
|
|
+ """Verify that every command properly detects pipeline issues"""
|
|
|
|
|
|
- result = runner.invoke(pipeline, ['submit', pipeline_file_path,
|
|
|
- '--runtime-config', kfp_runtime_instance])
|
|
|
- assert "Pipeline has zero length 'pipelines' field." in result.output
|
|
|
- assert result.exit_code != 0
|
|
|
+ # don't run this test for the `describe` command
|
|
|
+ # (see test_describe_with_no_nodes)
|
|
|
+ if subcommand == 'describe':
|
|
|
+ return
|
|
|
|
|
|
-
|
|
|
-def test_describe_with_zero_length_pipelines_field():
|
|
|
runner = CliRunner()
|
|
|
with runner.isolated_filesystem():
|
|
|
- with open('foo.pipeline', 'w') as pipeline_file:
|
|
|
- pipeline_file.write(PIPELINE_SOURCE_WITH_ZERO_LENGTH_PIPELINES_FIELD)
|
|
|
- pipeline_file_path = os.path.join(os.getcwd(), pipeline_file.name)
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['describe', pipeline_file_path])
|
|
|
- assert "Pipeline has zero length 'pipelines' field." in result.output
|
|
|
- assert result.exit_code != 0
|
|
|
+ pipeline_file = 'pipeline_with_zero_nodes.pipeline'
|
|
|
+ pipeline_file_path = Path(__file__).parent / 'resources' / 'pipelines' / pipeline_file
|
|
|
+ assert pipeline_file_path.is_file()
|
|
|
|
|
|
+ # every CLI command invocation requires these parameters
|
|
|
+ invoke_parameters = [subcommand, str(pipeline_file_path)]
|
|
|
+ if subcommand in ['submit', 'export']:
|
|
|
+ # these commands also require a runtime configuration
|
|
|
+ invoke_parameters.extend(['--runtime-config', kubeflow_pipelines_runtime_instance])
|
|
|
|
|
|
-def test_run_pipeline_with_no_nodes():
|
|
|
- runner = CliRunner()
|
|
|
- with runner.isolated_filesystem():
|
|
|
- with open('foo.pipeline', 'w') as pipeline_file:
|
|
|
- pipeline_file.write(PIPELINE_SOURCE_WITH_ZERO_NODES)
|
|
|
- pipeline_file_path = os.path.join(os.getcwd(), pipeline_file.name)
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['run', pipeline_file_path])
|
|
|
- assert "At least one node must exist in the primary pipeline." in result.output
|
|
|
+ result = runner.invoke(pipeline, invoke_parameters)
|
|
|
assert result.exit_code != 0
|
|
|
-
|
|
|
-
|
|
|
-def test_submit_pipeline_with_no_nodes(kfp_runtime_instance):
|
|
|
- runner = CliRunner()
|
|
|
- with runner.isolated_filesystem():
|
|
|
- with open('foo.pipeline', 'w') as pipeline_file:
|
|
|
- pipeline_file.write(PIPELINE_SOURCE_WITH_ZERO_NODES)
|
|
|
- pipeline_file_path = os.path.join(os.getcwd(), pipeline_file.name)
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['submit', pipeline_file_path, '--runtime-config', kfp_runtime_instance])
|
|
|
assert "At least one node must exist in the primary pipeline." in result.output
|
|
|
- assert result.exit_code != 0
|
|
|
|
|
|
|
|
|
-def test_describe_with_empty_pipeline():
|
|
|
+def test_describe_with_no_nodes():
|
|
|
runner = CliRunner()
|
|
|
with runner.isolated_filesystem():
|
|
|
- with open('foo.pipeline', 'w') as pipeline_file:
|
|
|
- pipeline_file.write(PIPELINE_SOURCE_WITH_ZERO_NODES)
|
|
|
- pipeline_file_path = os.path.join(os.getcwd(), pipeline_file.name)
|
|
|
+ pipeline_file = 'pipeline_with_zero_nodes.pipeline'
|
|
|
+ pipeline_file_path = Path(__file__).parent / 'resources' / 'pipelines' / pipeline_file
|
|
|
+ assert pipeline_file_path.is_file()
|
|
|
|
|
|
- result = runner.invoke(pipeline, ['describe', pipeline_file_path])
|
|
|
+ result = runner.invoke(pipeline, ['describe', str(pipeline_file_path)])
|
|
|
+ assert result.exit_code == 0, result.output
|
|
|
assert "Description: None" in result.output
|
|
|
assert "Type: KUBEFLOW_PIPELINES" in result.output
|
|
|
assert "Nodes: 0" in result.output
|
|
@@ -305,9 +202,9 @@ def test_describe_with_empty_pipeline():
|
|
|
|
|
|
def test_describe_with_kfp_components():
|
|
|
runner = CliRunner()
|
|
|
- pipeline_file_path = os.path.join(os.path.dirname(__file__), 'resources', 'kfp_3_node_custom.pipeline')
|
|
|
+ pipeline_file_path = Path(__file__).parent / 'resources' / 'pipelines' / 'kfp_3_node_custom.pipeline'
|
|
|
|
|
|
- result = runner.invoke(pipeline, ['describe', pipeline_file_path])
|
|
|
+ result = runner.invoke(pipeline, ['describe', str(pipeline_file_path)])
|
|
|
assert "Description: 3-node custom component pipeline" in result.output
|
|
|
assert "Type: KUBEFLOW_PIPELINES" in result.output
|
|
|
assert "Nodes: 3" in result.output
|
|
@@ -322,11 +219,13 @@ def test_describe_with_kfp_components():
|
|
|
|
|
|
|
|
|
@pytest.mark.parametrize('component_cache_instance', [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
|
|
|
-def test_validate_with_kfp_components(kfp_runtime_instance, component_cache_instance):
|
|
|
+def test_validate_with_kfp_components(kubeflow_pipelines_runtime_instance, component_cache_instance):
|
|
|
runner = CliRunner()
|
|
|
- pipeline_file_path = os.path.join(os.path.dirname(__file__), 'resources', 'kfp_3_node_custom.pipeline')
|
|
|
-
|
|
|
- result = runner.invoke(pipeline, ['validate', pipeline_file_path, '--runtime-config', kfp_runtime_instance])
|
|
|
+ pipeline_file_path = Path(__file__).parent / 'resources' / 'pipelines' / 'kfp_3_node_custom.pipeline'
|
|
|
+ result = runner.invoke(pipeline, ['validate',
|
|
|
+ str(pipeline_file_path),
|
|
|
+ '--runtime-config',
|
|
|
+ kubeflow_pipelines_runtime_instance])
|
|
|
assert "Validating pipeline..." in result.output
|
|
|
assert result.exit_code == 0
|
|
|
|
|
@@ -334,8 +233,8 @@ def test_validate_with_kfp_components(kfp_runtime_instance, component_cache_inst
|
|
|
def test_describe_with_missing_kfp_component():
|
|
|
runner = CliRunner()
|
|
|
with runner.isolated_filesystem():
|
|
|
- valid_file_path = os.path.join(os.path.dirname(__file__), 'resources', 'kfp_3_node_custom.pipeline')
|
|
|
- pipeline_file_path = os.path.join(os.getcwd(), 'foo.pipeline')
|
|
|
+ valid_file_path = Path(__file__).parent / 'resources' / 'pipelines' / 'kfp_3_node_custom.pipeline'
|
|
|
+ pipeline_file_path = Path.cwd() / 'foo.pipeline'
|
|
|
with open(pipeline_file_path, 'w') as pipeline_file:
|
|
|
with open(valid_file_path) as valid_file:
|
|
|
valid_data = json.load(valid_file)
|
|
@@ -343,18 +242,18 @@ def test_describe_with_missing_kfp_component():
|
|
|
valid_data['pipelines'][0]['nodes'][0]['op'] = valid_data['pipelines'][0]['nodes'][0]['op'] + 'Missing'
|
|
|
pipeline_file.write(json.dumps(valid_data))
|
|
|
|
|
|
- result = runner.invoke(pipeline, ['describe', pipeline_file_path])
|
|
|
+ result = runner.invoke(pipeline, ['describe', str(pipeline_file_path)])
|
|
|
assert "Description: 3-node custom component pipeline" in result.output
|
|
|
assert "Type: KUBEFLOW_PIPELINES" in result.output
|
|
|
assert "Nodes: 3" in result.output
|
|
|
assert result.exit_code == 0
|
|
|
|
|
|
|
|
|
-def test_validate_with_missing_kfp_component(kfp_runtime_instance):
|
|
|
+def test_validate_with_missing_kfp_component(kubeflow_pipelines_runtime_instance):
|
|
|
runner = CliRunner()
|
|
|
with runner.isolated_filesystem():
|
|
|
- valid_file_path = os.path.join(os.path.dirname(__file__), 'resources', 'kfp_3_node_custom.pipeline')
|
|
|
- pipeline_file_path = os.path.join(os.getcwd(), 'foo.pipeline')
|
|
|
+ valid_file_path = Path(__file__).parent / 'resources' / 'pipelines' / 'kfp_3_node_custom.pipeline'
|
|
|
+ pipeline_file_path = Path.cwd() / 'foo.pipeline'
|
|
|
with open(pipeline_file_path, 'w') as pipeline_file:
|
|
|
with open(valid_file_path) as valid_file:
|
|
|
valid_data = json.load(valid_file)
|
|
@@ -362,7 +261,294 @@ def test_validate_with_missing_kfp_component(kfp_runtime_instance):
|
|
|
valid_data['pipelines'][0]['nodes'][0]['op'] = valid_data['pipelines'][0]['nodes'][0]['op'] + 'Missing'
|
|
|
pipeline_file.write(json.dumps(valid_data))
|
|
|
|
|
|
- result = runner.invoke(pipeline, ['validate', pipeline_file_path, '--runtime-config', kfp_runtime_instance])
|
|
|
+ result = runner.invoke(pipeline, ['validate',
|
|
|
+ str(pipeline_file_path),
|
|
|
+ '--runtime-config',
|
|
|
+ kubeflow_pipelines_runtime_instance])
|
|
|
assert "Validating pipeline..." in result.output
|
|
|
assert "[Error][Calculate data hash] - This component was not found in the catalog." in result.output
|
|
|
assert result.exit_code != 0
|
|
|
+
|
|
|
+# ------------------------------------------------------------------
|
|
|
+# tests for 'export' command
|
|
|
+# ------------------------------------------------------------------
|
|
|
+
|
|
|
+
|
|
|
+def do_mock_export(output_path: str, dir_only=False):
|
|
|
+ # simulate export result
|
|
|
+ p = Path(output_path)
|
|
|
+ # create parent directories, if required
|
|
|
+ if not p.parent.is_dir():
|
|
|
+ p.parent.mkdir(parents=True, exist_ok=True)
|
|
|
+ if dir_only:
|
|
|
+ return
|
|
|
+ # create a mock export file
|
|
|
+ with open(output_path, 'w') as output:
|
|
|
+ output.write('dummy export output')
|
|
|
+
|
|
|
+
|
|
|
+def prepare_export_work_dir(work_dir: str,
|
|
|
+ source_dir: str):
|
|
|
+ """Copies the files in source_dir to work_dir"""
|
|
|
+ for file in Path(source_dir).glob('*'):
|
|
|
+ shutil.copy(str(file), work_dir)
|
|
|
+ # print for debug purposes; this is only displayed if an assert fails
|
|
|
+ print(f"Work directory content: {list(Path(work_dir).glob('*'))}")
|
|
|
+
|
|
|
+
|
|
|
+def test_export_invalid_runtime_config():
|
|
|
+ """Test user error scenarios: the specified runtime configuration is 'invalid'"""
|
|
|
+ runner = CliRunner()
|
|
|
+
|
|
|
+ # test pipeline; it's not used in this test
|
|
|
+ pipeline_file = 'kubeflow_pipelines.pipeline'
|
|
|
+ p = Path(__file__).parent / 'resources' / 'pipelines' / f'{pipeline_file}'
|
|
|
+ assert p.is_file()
|
|
|
+
|
|
|
+ # no runtime configuration was specified
|
|
|
+ result = runner.invoke(pipeline,
|
|
|
+ ['export',
|
|
|
+ str(p)])
|
|
|
+ assert result.exit_code != 0, result.output
|
|
|
+ assert "Error: Missing option '--runtime-config'." in result.output, result.output
|
|
|
+
|
|
|
+ # runtime configuration does not exist
|
|
|
+ config_name = 'no-such-config'
|
|
|
+ result = runner.invoke(pipeline,
|
|
|
+ ['export',
|
|
|
+ str(p),
|
|
|
+ '--runtime-config',
|
|
|
+ config_name])
|
|
|
+ assert result.exit_code != 0, result.output
|
|
|
+ assert f'Error: Invalid runtime configuration: {config_name}' in result.output
|
|
|
+ assert f"No such instance named '{config_name}' was found in the runtimes schemaspace." in result.output
|
|
|
+
|
|
|
+
|
|
|
+def test_export_incompatible_runtime_config(kubeflow_pipelines_runtime_instance,
|
|
|
+ airflow_runtime_instance):
|
|
|
+ """
|
|
|
+ Test user error scenarios: the specified runtime configuration is not compatible
|
|
|
+ with the pipeline type, e.g. KFP pipeline with Airflow runtime config
|
|
|
+ """
|
|
|
+ runner = CliRunner()
|
|
|
+
|
|
|
+ # try exporting a KFP pipeline using an Airflow runtime configuration
|
|
|
+ pipeline_file = "kubeflow_pipelines.pipeline"
|
|
|
+ p = Path(__file__).parent / 'resources' / 'pipelines' / f'{pipeline_file}'
|
|
|
+ assert p.is_file()
|
|
|
+
|
|
|
+ # try export using Airflow runtime configuration
|
|
|
+ result = runner.invoke(pipeline,
|
|
|
+ ['export',
|
|
|
+ str(p),
|
|
|
+ '--runtime-config',
|
|
|
+ airflow_runtime_instance])
|
|
|
+
|
|
|
+ assert result.exit_code != 0, result.output
|
|
|
+ assert "The runtime configuration type 'APACHE_AIRFLOW' does not "\
|
|
|
+ "match the pipeline's runtime type 'KUBEFLOW_PIPELINES'." in result.output
|
|
|
+
|
|
|
+ # try exporting an Airflow pipeline using a Kubeflow Pipelines runtime configuration
|
|
|
+ pipeline_file = "airflow.pipeline"
|
|
|
+ p = Path(__file__).parent / 'resources' / 'pipelines' / f'{pipeline_file}'
|
|
|
+ assert p.is_file()
|
|
|
+
|
|
|
+ # try export using KFP runtime configuration
|
|
|
+ result = runner.invoke(pipeline,
|
|
|
+ ['export',
|
|
|
+ str(p),
|
|
|
+ '--runtime-config',
|
|
|
+ kubeflow_pipelines_runtime_instance])
|
|
|
+
|
|
|
+ assert result.exit_code != 0, result.output
|
|
|
+ assert "The runtime configuration type 'KUBEFLOW_PIPELINES' does not "\
|
|
|
+ "match the pipeline's runtime type 'APACHE_AIRFLOW'." in result.output
|
|
|
+
|
|
|
+
|
|
|
+@pytest.mark.parametrize('component_cache_instance', [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
|
|
|
+def test_export_kubeflow_output_option(kubeflow_pipelines_runtime_instance,
|
|
|
+ component_cache_instance):
|
|
|
+ """Verify that the '--output' option works as expected for Kubeflow Pipelines"""
|
|
|
+ runner = CliRunner()
|
|
|
+ with runner.isolated_filesystem():
|
|
|
+ cwd = Path.cwd().resolve()
|
|
|
+ # copy pipeline file and depencencies
|
|
|
+ prepare_export_work_dir(str(cwd),
|
|
|
+ Path(__file__).parent / 'resources' / 'pipelines')
|
|
|
+ pipeline_file = 'kfp_3_node_custom.pipeline'
|
|
|
+ pipeline_file_path = cwd / pipeline_file
|
|
|
+ # make sure the pipeline file exists
|
|
|
+ assert pipeline_file_path.is_file() is True
|
|
|
+ print(f'Pipeline file: {pipeline_file_path}')
|
|
|
+
|
|
|
+ # Test: '--output' not specified; exported file is created
|
|
|
+ # in current directory and named like the pipeline file with
|
|
|
+ # a '.yaml' suffix
|
|
|
+ expected_output_file = pipeline_file_path.with_suffix('.yaml')
|
|
|
+
|
|
|
+ # this should succeed
|
|
|
+ result = runner.invoke(pipeline, ['export',
|
|
|
+ str(pipeline_file_path),
|
|
|
+ '--runtime-config',
|
|
|
+ kubeflow_pipelines_runtime_instance])
|
|
|
+
|
|
|
+ assert result.exit_code == 0, result.output
|
|
|
+ assert f"was exported to '{str(expected_output_file)}" in result.output, result.output
|
|
|
+
|
|
|
+ # Test: '--output' specified and ends with '.yaml'
|
|
|
+ expected_output_file = cwd / 'test-dir' / 'output.yaml'
|
|
|
+
|
|
|
+ # this should succeed
|
|
|
+ result = runner.invoke(pipeline, ['export',
|
|
|
+ str(pipeline_file_path),
|
|
|
+ '--runtime-config',
|
|
|
+ kubeflow_pipelines_runtime_instance,
|
|
|
+ '--output',
|
|
|
+ str(expected_output_file)])
|
|
|
+
|
|
|
+ assert result.exit_code == 0, result.output
|
|
|
+ assert f"was exported to '{str(expected_output_file)}" in result.output, result.output
|
|
|
+
|
|
|
+ # Test: '--output' specified and ends with '.yml'
|
|
|
+ expected_output_file = cwd / 'test-dir-2' / 'output.yml'
|
|
|
+
|
|
|
+ # this should succeed
|
|
|
+ result = runner.invoke(pipeline, ['export',
|
|
|
+ str(pipeline_file_path),
|
|
|
+ '--runtime-config',
|
|
|
+ kubeflow_pipelines_runtime_instance,
|
|
|
+ '--output',
|
|
|
+ str(expected_output_file)])
|
|
|
+
|
|
|
+ assert result.exit_code == 0, result.output
|
|
|
+ assert f"was exported to '{str(expected_output_file)}" in result.output, result.output
|
|
|
+
|
|
|
+
|
|
|
+def test_export_airflow_output_option(airflow_runtime_instance):
|
|
|
+ """Verify that the '--output' option works as expected for Airflow"""
|
|
|
+ runner = CliRunner()
|
|
|
+ with runner.isolated_filesystem():
|
|
|
+ cwd = Path.cwd().resolve()
|
|
|
+ # copy pipeline file and depencencies
|
|
|
+ prepare_export_work_dir(str(cwd),
|
|
|
+ Path(__file__).parent / 'resources' / 'pipelines')
|
|
|
+ pipeline_file = 'airflow.pipeline'
|
|
|
+ pipeline_file_path = cwd / pipeline_file
|
|
|
+ # make sure the pipeline file exists
|
|
|
+ assert pipeline_file_path.is_file() is True
|
|
|
+ print(f'Pipeline file: {pipeline_file_path}')
|
|
|
+
|
|
|
+ #
|
|
|
+ # Test: '--output' not specified; exported file is created
|
|
|
+ # in current directory and named like the pipeline file with
|
|
|
+ # a '.py' suffix
|
|
|
+ #
|
|
|
+ expected_output_file = pipeline_file_path.with_suffix('.py')
|
|
|
+ print(f'expected_output_file -> {expected_output_file}')
|
|
|
+ do_mock_export(str(expected_output_file))
|
|
|
+
|
|
|
+ # this should fail: default output file already exists
|
|
|
+ result = runner.invoke(pipeline, ['export',
|
|
|
+ str(pipeline_file_path),
|
|
|
+ '--runtime-config',
|
|
|
+ airflow_runtime_instance])
|
|
|
+
|
|
|
+ assert result.exit_code != 0, result.output
|
|
|
+ assert f"Error: Output file '{expected_output_file}' exists and option '--overwrite' "\
|
|
|
+ "was not specified." in result.output, result.output
|
|
|
+
|
|
|
+ #
|
|
|
+ # Test: '--output' specified and ends with '.py' (the value is treated
|
|
|
+ # as a file name)
|
|
|
+ #
|
|
|
+ expected_output_file = cwd / 'test-dir-2' / 'output.py'
|
|
|
+ do_mock_export(str(expected_output_file))
|
|
|
+
|
|
|
+ # this should fail: specified output file already exists
|
|
|
+ result = runner.invoke(pipeline, ['export',
|
|
|
+ str(pipeline_file_path),
|
|
|
+ '--runtime-config',
|
|
|
+ airflow_runtime_instance,
|
|
|
+ '--output',
|
|
|
+ str(expected_output_file)])
|
|
|
+ assert result.exit_code != 0, result.output
|
|
|
+ assert f"Error: Output file '{expected_output_file}' exists and option '--overwrite' "\
|
|
|
+ "was not specified." in result.output, result.output
|
|
|
+
|
|
|
+ #
|
|
|
+ # Test: '--output' specified and does not end with '.py' (the value
|
|
|
+ # is treated as a directory)
|
|
|
+ #
|
|
|
+ output_dir = cwd / 'test-dir-3'
|
|
|
+ expected_output_file = output_dir / Path(pipeline_file).with_suffix('.py')
|
|
|
+ do_mock_export(str(expected_output_file))
|
|
|
+
|
|
|
+ # this should fail: specified output file already exists
|
|
|
+ result = runner.invoke(pipeline, ['export',
|
|
|
+ str(pipeline_file_path),
|
|
|
+ '--runtime-config',
|
|
|
+ airflow_runtime_instance,
|
|
|
+ '--output',
|
|
|
+ str(output_dir)])
|
|
|
+ assert result.exit_code != 0, result.output
|
|
|
+ assert f"Error: Output file '{expected_output_file}' exists and option '--overwrite' "\
|
|
|
+ "was not specified." in result.output, result.output
|
|
|
+
|
|
|
+
|
|
|
+@pytest.mark.parametrize('component_cache_instance', [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
|
|
|
+def test_export_kubeflow_overwrite_option(kubeflow_pipelines_runtime_instance,
|
|
|
+ component_cache_instance):
|
|
|
+ """Verify that the '--overwrite' option works as expected for Kubeflow Pipelines"""
|
|
|
+ runner = CliRunner()
|
|
|
+ with runner.isolated_filesystem():
|
|
|
+ cwd = Path.cwd().resolve()
|
|
|
+ # copy pipeline file and depencencies
|
|
|
+ prepare_export_work_dir(str(cwd),
|
|
|
+ Path(__file__).parent / 'resources' / 'pipelines')
|
|
|
+ pipeline_file = 'kfp_3_node_custom.pipeline'
|
|
|
+ pipeline_file_path = cwd / pipeline_file
|
|
|
+ # make sure the pipeline file exists
|
|
|
+ assert pipeline_file_path.is_file() is True
|
|
|
+ print(f'Pipeline file: {pipeline_file_path}')
|
|
|
+
|
|
|
+ # Test: '--overwrite' not specified; exported file is created
|
|
|
+ # in current directory and named like the pipeline file with
|
|
|
+ # a '.yaml' suffix
|
|
|
+ expected_output_file = pipeline_file_path.with_suffix('.yaml')
|
|
|
+
|
|
|
+ # this should succeed
|
|
|
+ result = runner.invoke(pipeline, ['export',
|
|
|
+ str(pipeline_file_path),
|
|
|
+ '--runtime-config',
|
|
|
+ kubeflow_pipelines_runtime_instance])
|
|
|
+
|
|
|
+ assert result.exit_code == 0, result.output
|
|
|
+ assert f"was exported to '{str(expected_output_file)}" in result.output, result.output
|
|
|
+
|
|
|
+ # Test: '--overwrite' not specified; the output already exists
|
|
|
+ # this should fail
|
|
|
+ result = runner.invoke(pipeline, ['export',
|
|
|
+ str(pipeline_file_path),
|
|
|
+ '--runtime-config',
|
|
|
+ kubeflow_pipelines_runtime_instance])
|
|
|
+
|
|
|
+ assert result.exit_code != 0, result.output
|
|
|
+ assert f"Output file '{expected_output_file}' exists and option '--overwrite' was not" in result.output
|
|
|
+
|
|
|
+ # Test: '--overwrite' specified; exported file is created
|
|
|
+ # in current directory and named like the pipeline file with
|
|
|
+ # a '.yaml' suffix
|
|
|
+ # this should succeed
|
|
|
+ result = runner.invoke(pipeline, ['export',
|
|
|
+ str(pipeline_file_path),
|
|
|
+ '--runtime-config',
|
|
|
+ kubeflow_pipelines_runtime_instance,
|
|
|
+ '--overwrite'])
|
|
|
+
|
|
|
+ assert result.exit_code == 0, result.output
|
|
|
+ assert f"was exported to '{str(expected_output_file)}" in result.output, result.output
|
|
|
+
|
|
|
+
|
|
|
+# ------------------------------------------------------------------
|
|
|
+# end tests for 'export' command
|
|
|
+# ------------------------------------------------------------------
|