- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- from datetime import datetime
- import os
- import re
- import tempfile
- import time
- from typing import Dict
- from urllib.parse import urlsplit
- from kfp import Client as ArgoClient
- from kfp import compiler as kfp_argo_compiler
- from kfp import components as components
- from kfp.dsl import PipelineConf
- from kfp.aws import use_aws_secret # noqa H306
- from kubernetes import client as k8s_client
- from kubernetes.client import V1PersistentVolumeClaimVolumeSource
- from kubernetes.client import V1Toleration
- from kubernetes.client import V1Volume
- from kubernetes.client import V1VolumeMount
- try:
- from kfp_tekton import compiler as kfp_tekton_compiler
- from kfp_tekton import TektonClient
- except ImportError:
- # We may not have kfp-tekton available and that's okay!
- kfp_tekton_compiler = None
- TektonClient = None
- from elyra._version import __version__
- from elyra.kfp.operator import ExecuteFileOp
- from elyra.metadata.schemaspaces import RuntimeImages
- from elyra.metadata.schemaspaces import Runtimes
- from elyra.pipeline.component_catalog import ComponentCache
- from elyra.pipeline.kfp.kfp_authentication import AuthenticationError
- from elyra.pipeline.kfp.kfp_authentication import KFPAuthenticator
- from elyra.pipeline.pipeline import GenericOperation
- from elyra.pipeline.pipeline import Operation
- from elyra.pipeline.pipeline import Pipeline
- from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
- from elyra.pipeline.processor import PipelineProcessor
- from elyra.pipeline.processor import RuntimePipelineProcessor
- from elyra.pipeline.processor import RuntimePipelineProcessorResponse
- from elyra.pipeline.runtime_type import RuntimeProcessorType
- from elyra.util.cos import join_paths
- from elyra.util.path import get_absolute_path
- class KfpPipelineProcessor(RuntimePipelineProcessor):
- _type = RuntimeProcessorType.KUBEFLOW_PIPELINES
- _name = "kfp"
- # Provide users with the ability to identify a writable directory in the
- # running container where the notebook | script is executed. The location
- # must exist and be known before the container is started.
- # Defaults to `/tmp`
- WCD = os.getenv("ELYRA_WRITABLE_CONTAINER_DIR", "/tmp").strip().rstrip("/")
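- # For illustration only (the path below is hypothetical): an administrator
- # could point Elyra at a different writable location before starting the
- # server, e.g. `export ELYRA_WRITABLE_CONTAINER_DIR=/opt/app-root/scratch`.
- # The chosen directory must already exist in every runtime image used.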
- def process(self, pipeline):
- """
- Runs a pipeline on Kubeflow Pipelines
- Each time a pipeline is processed, a new version
- is uploaded and run under the same experiment name.
- """
- timestamp = datetime.now().strftime("%m%d%H%M%S")
- ################
- # Runtime Configs
- ################
- runtime_configuration = self._get_metadata_configuration(
- schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
- )
- # unpack Kubeflow Pipelines configs
- api_endpoint = runtime_configuration.metadata["api_endpoint"].rstrip("/")
- public_api_endpoint = runtime_configuration.metadata.get("public_api_endpoint", api_endpoint)
- api_username = runtime_configuration.metadata.get("api_username")
- api_password = runtime_configuration.metadata.get("api_password")
- user_namespace = runtime_configuration.metadata.get("user_namespace")
- engine = runtime_configuration.metadata.get("engine")
- if engine == "Tekton" and not TektonClient:
- raise ValueError(
- "Python package `kfp-tekton` is not installed. "
- "Please install using `elyra[kfp-tekton]` to use Tekton engine."
- )
- # unpack Cloud Object Storage configs
- cos_endpoint = runtime_configuration.metadata["cos_endpoint"]
- cos_public_endpoint = runtime_configuration.metadata.get("public_cos_endpoint", cos_endpoint)
- cos_bucket = runtime_configuration.metadata["cos_bucket"]
- # Determine which provider to use to authenticate with Kubeflow
- auth_type = runtime_configuration.metadata.get("auth_type")
- try:
- auth_info = KFPAuthenticator().authenticate(
- api_endpoint,
- auth_type_str=auth_type,
- runtime_config_name=pipeline.runtime_config,
- auth_parm_1=api_username,
- auth_parm_2=api_password,
- )
- self.log.debug(f"Authenticator returned {auth_info}")
- except AuthenticationError as ae:
- if ae.get_request_history() is not None:
- self.log.info("An authentication error was raised. Diagnostic information follows.")
- self.log.info(ae.request_history_to_string())
- raise RuntimeError(f"Kubeflow authentication failed: {ae}")
- #############
- # Create Kubeflow Client
- #############
- try:
- if engine == "Tekton":
- client = TektonClient(
- host=api_endpoint,
- cookies=auth_info.get("cookies", None),
- credentials=auth_info.get("credentials", None),
- existing_token=auth_info.get("existing_token", None),
- namespace=user_namespace,
- )
- else:
- client = ArgoClient(
- host=api_endpoint,
- cookies=auth_info.get("cookies", None),
- credentials=auth_info.get("credentials", None),
- existing_token=auth_info.get("existing_token", None),
- namespace=user_namespace,
- )
- except Exception as ex:
- # a common cause of these errors is forgetting to include `/pipeline` or including it with an 's'
- api_endpoint_obj = urlsplit(api_endpoint)
- if api_endpoint_obj.path != "/pipeline":
- api_endpoint_tip = api_endpoint_obj._replace(path="/pipeline").geturl()
- tip_string = (
- f" - [TIP: did you mean to set '{api_endpoint_tip}' as the endpoint, "
- f"take care not to include 's' at end]"
- )
- else:
- tip_string = ""
- raise RuntimeError(
- f"Failed to initialize `kfp.Client()` against: '{api_endpoint}' - "
- f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
- f"{tip_string}"
- ) from ex
- #############
- # Verify Namespace
- #############
- try:
- client.list_experiments(namespace=user_namespace, page_size=1)
- except Exception as ex:
- if user_namespace:
- tip_string = f"[TIP: ensure namespace '{user_namespace}' is correct]"
- else:
- tip_string = "[TIP: you probably need to set a namespace]"
- raise RuntimeError(
- f"Failed to `kfp.Client().list_experiments()` against: '{api_endpoint}' - "
- f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}' - "
- f"{tip_string}"
- ) from ex
- #############
- # Pipeline Metadata: none - inherited
- #############
- # generate a pipeline name
- pipeline_name = pipeline.name
- # generate a pipeline description
- pipeline_description = pipeline.description
- if pipeline_description is None:
- pipeline_description = f"Created with Elyra {__version__} pipeline editor using `{pipeline.source}`."
- #############
- # Submit & Run the Pipeline
- #############
- self.log_pipeline_info(pipeline_name, "submitting pipeline")
- with tempfile.TemporaryDirectory() as temp_dir:
- self.log.debug(f"Created temporary directory at: {temp_dir}")
- pipeline_path = os.path.join(temp_dir, f"{pipeline_name}.tar.gz")
- #############
- # Get Pipeline ID
- #############
- try:
- # get the kubeflow pipeline id (returns None if not found, otherwise the ID of the pipeline)
- pipeline_id = client.get_pipeline_id(pipeline_name)
- # calculate what "pipeline version" name to use
- if pipeline_id is None:
- # the first "pipeline version" name must be the pipeline name
- pipeline_version_name = pipeline_name
- else:
- # generate a unique name for a new "pipeline version" by appending the current timestamp
- pipeline_version_name = f"{pipeline_name}-{timestamp}"
- except Exception as ex:
- raise RuntimeError(
- f"Failed to get ID of Kubeflow pipeline: '{pipeline_name}' - "
- f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
- ) from ex
- #############
- # Compile the Pipeline
- #############
- try:
- t0 = time.time()
- # generate a name for the experiment (lowercase because experiment names are case sensitive)
- experiment_name = pipeline_name.lower()
- # Create an instance id that will be used to store
- # the pipeline's dependencies, if applicable
- pipeline_instance_id = f"{pipeline_name}-{timestamp}"
- pipeline_function = lambda: self._cc_pipeline( # nopep8 E731
- pipeline,
- pipeline_name=pipeline_name,
- pipeline_version=pipeline_version_name,
- experiment_name=experiment_name,
- pipeline_instance_id=pipeline_instance_id,
- )
- # collect pipeline configuration information
- pipeline_conf = self._generate_pipeline_conf(pipeline)
- # compile the pipeline
- if engine == "Tekton":
- kfp_tekton_compiler.TektonCompiler().compile(
- pipeline_function, pipeline_path, pipeline_conf=pipeline_conf
- )
- else:
- kfp_argo_compiler.Compiler().compile(pipeline_function, pipeline_path, pipeline_conf=pipeline_conf)
- except RuntimeError:
- raise
- except Exception as ex:
- raise RuntimeError(
- f"Failed to compile pipeline '{pipeline_name}' with engine '{engine}' to: '{pipeline_path}'"
- ) from ex
- self.log_pipeline_info(pipeline_name, "pipeline compiled", duration=time.time() - t0)
- #############
- # Upload Pipeline Version
- #############
- try:
- t0 = time.time()
- # CASE 1: pipeline needs to be created
- if pipeline_id is None:
- # create new pipeline (and initial "pipeline version")
- kfp_pipeline = client.upload_pipeline(
- pipeline_package_path=pipeline_path,
- pipeline_name=pipeline_name,
- description=pipeline_description,
- )
- # extract the ID of the pipeline we created
- pipeline_id = kfp_pipeline.id
- # the initial "pipeline version" has the same id as the pipeline itself
- version_id = pipeline_id
- # CASE 2: pipeline already exists
- else:
- # upload the "pipeline version"
- kfp_pipeline = client.upload_pipeline_version(
- pipeline_package_path=pipeline_path,
- pipeline_version_name=pipeline_version_name,
- pipeline_id=pipeline_id,
- )
- # extract the id of the "pipeline version" that was created
- version_id = kfp_pipeline.id
- except Exception as ex:
- # a common cause of these errors is forgetting to include `/pipeline` or including it with an 's'
- api_endpoint_obj = urlsplit(api_endpoint)
- if api_endpoint_obj.path != "/pipeline":
- api_endpoint_tip = api_endpoint_obj._replace(path="/pipeline").geturl()
- tip_string = (
- f" - [TIP: did you mean to set '{api_endpoint_tip}' as the endpoint, "
- f"take care not to include 's' at end]"
- )
- else:
- tip_string = ""
- raise RuntimeError(
- f"Failed to upload Kubeflow pipeline '{pipeline_name}' - "
- f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
- f"{tip_string}"
- ) from ex
- self.log_pipeline_info(pipeline_name, "pipeline uploaded", duration=time.time() - t0)
- #############
- # Create Experiment
- #############
- try:
- t0 = time.time()
- # create a new experiment (if it already exists, this is a no-op)
- experiment = client.create_experiment(name=experiment_name, namespace=user_namespace)
- except Exception as ex:
- raise RuntimeError(
- f"Failed to create Kubeflow experiment: '{experiment_name}' - "
- f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
- ) from ex
- self.log_pipeline_info(pipeline_name, "created experiment", duration=time.time() - t0)
- #############
- # Create Pipeline Run
- #############
- try:
- t0 = time.time()
- # generate name for the pipeline run
- job_name = pipeline_instance_id
- # create a run of the pipeline (using the specified pipeline version)
- run = client.run_pipeline(
- experiment_id=experiment.id, job_name=job_name, pipeline_id=pipeline_id, version_id=version_id
- )
- except Exception as ex:
- raise RuntimeError(
- f"Failed to create Kubeflow pipeline run: '{job_name}' - "
- f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
- ) from ex
- if run is None:
- # client.run_pipeline seemed to have encountered an issue
- # but didn't raise an exception
- raise RuntimeError(
- f"Failed to create Kubeflow pipeline run: '{job_name}' - "
- f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
- )
- self.log_pipeline_info(
- pipeline_name,
- f"pipeline submitted: {public_api_endpoint}/#/runs/details/{run.id}",
- duration=time.time() - t0,
- )
- if pipeline.contains_generic_operations():
- object_storage_url = cos_public_endpoint
- os_path = join_paths(pipeline.pipeline_parameters.get(COS_OBJECT_PREFIX), pipeline_instance_id)
- object_storage_path = f"/{cos_bucket}/{os_path}"
- else:
- object_storage_url = None
- object_storage_path = None
- return KfpPipelineProcessorResponse(
- run_id=run.id,
- run_url=f"{public_api_endpoint}/#/runs/details/{run.id}",
- object_storage_url=object_storage_url,
- object_storage_path=object_storage_path,
- )
- def export(self, pipeline: Pipeline, pipeline_export_format: str, pipeline_export_path: str, overwrite: bool):
- # Verify that the KfpPipelineProcessor supports the given export format
- self._verify_export_format(pipeline_export_format)
- t0_all = time.time()
- timestamp = datetime.now().strftime("%m%d%H%M%S")
- pipeline_name = pipeline.name
- # Create an instance id that will be used to store
- # the pipeline's dependencies, if applicable
- pipeline_instance_id = f"{pipeline_name}-{timestamp}"
- # Since pipeline_export_path may be relative to the notebook directory, ensure
- # we're using its absolute form.
- absolute_pipeline_export_path = get_absolute_path(self.root_dir, pipeline_export_path)
- runtime_configuration = self._get_metadata_configuration(
- schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
- )
- engine = runtime_configuration.metadata.get("engine")
- if engine == "Tekton" and not TektonClient:
- raise ValueError("Python package `kfp-tekton` is not installed. Please install using `elyra[kfp-tekton]` to use Tekton engine.")
- if os.path.exists(absolute_pipeline_export_path) and not overwrite:
- raise ValueError(f"File '{absolute_pipeline_export_path}' already exists.")
- self.log_pipeline_info(pipeline_name, f"Exporting pipeline as a .{pipeline_export_format} file")
- # Export pipeline as static configuration file (YAML formatted)
- try:
- # Exported pipeline is not associated with an experiment
- # or a version. The association is established when the
- # pipeline is imported into KFP by the user.
- pipeline_function = lambda: self._cc_pipeline(
- pipeline, pipeline_name, pipeline_instance_id=pipeline_instance_id
- ) # nopep8
- if engine == "Tekton":
- self.log.info("Compiling pipeline for Tekton engine")
- kfp_tekton_compiler.TektonCompiler().compile(pipeline_function, absolute_pipeline_export_path)
- else:
- self.log.info("Compiling pipeline for Argo engine")
- kfp_argo_compiler.Compiler().compile(pipeline_function, absolute_pipeline_export_path)
- except RuntimeError:
- raise
- except Exception as ex:
- if ex.__cause__:
- raise RuntimeError(str(ex)) from ex
- raise RuntimeError(
- f"Error pre-processing pipeline '{pipeline_name}' for export to '{absolute_pipeline_export_path}'",
- str(ex),
- ) from ex
- self.log_pipeline_info(
- pipeline_name, f"pipeline exported to '{pipeline_export_path}'", duration=(time.time() - t0_all)
- )
- return pipeline_export_path # Return the input value, not its absolute form
- def _collect_envs(self, operation: Operation, **kwargs) -> Dict:
- """
- Amends envs collected from superclass with those pertaining to this subclass
- :return: dictionary containing environment name/value pairs
- """
- envs = super()._collect_envs(operation, **kwargs)
- # Only Unix-style path spec is supported.
- envs["ELYRA_WRITABLE_CONTAINER_DIR"] = self.WCD
- return envs
- def _cc_pipeline(
- self,
- pipeline: Pipeline,
- pipeline_name: str,
- pipeline_version: str = "",
- experiment_name: str = "",
- pipeline_instance_id: str = None,
- export=False,
- ):
- runtime_configuration = self._get_metadata_configuration(
- schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
- )
- cos_endpoint = runtime_configuration.metadata["cos_endpoint"]
- cos_username = runtime_configuration.metadata.get("cos_username")
- cos_password = runtime_configuration.metadata.get("cos_password")
- cos_secret = runtime_configuration.metadata.get("cos_secret")
- cos_bucket = runtime_configuration.metadata.get("cos_bucket")
- engine = runtime_configuration.metadata["engine"]
- pipeline_instance_id = pipeline_instance_id or pipeline_name
- artifact_object_prefix = join_paths(pipeline.pipeline_parameters.get(COS_OBJECT_PREFIX), pipeline_instance_id)
- self.log_pipeline_info(
- pipeline_name,
- f"processing pipeline dependencies for upload to '{cos_endpoint}' "
- f"bucket '{cos_bucket}' folder '{artifact_object_prefix}'",
- )
- t0_all = time.time()
- emptydir_volume_size = ""
- container_runtime = bool(os.getenv("CRIO_RUNTIME", "False").lower() == "true")
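- # When CRIO_RUNTIME is "true", generic operations are compiled with an
- # emptyDir-backed workspace volume (see emptydir_volume_size below) rather
- # than relying on a writable container filesystem, which CRI-O based
- # clusters (e.g. OpenShift) typically restrict.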
- # Create dictionary that maps component Id to its ContainerOp instance
- target_ops = {}
- # Sort operations based on dependency graph (topological order)
- sorted_operations = PipelineProcessor._sort_operations(pipeline.operations)
- # Determine whether access to cloud storage is required
- for operation in sorted_operations:
- if isinstance(operation, GenericOperation):
- self._verify_cos_connectivity(runtime_configuration)
- break
- # All previous operation outputs should be propagated throughout the pipeline.
- # In order to process this recursively, the current operation's inputs should be combined
- # from its parent's inputs (which, themselves are derived from the outputs of their parent)
- # and its parent's outputs.
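- # For example (a hypothetical three-node pipeline A -> B -> C): if A produces
- # "a.csv" and B produces "b.csv", then after propagation C's inputs include
- # both files, even though C declares only B as its parent.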
- PipelineProcessor._propagate_operation_inputs_outputs(pipeline, sorted_operations)
- for operation in sorted_operations:
- if container_runtime:
- # Volume size to create when using CRI-O; NOTE: the IBM Cloud minimum is 20Gi
- emptydir_volume_size = "20Gi"
- sanitized_operation_name = self._sanitize_operation_name(operation.name)
- # Create pipeline operation
- # If operation is one of the "generic" set of NBs or scripts, construct custom ExecuteFileOp
- if isinstance(operation, GenericOperation):
- # Collect env variables
- pipeline_envs = self._collect_envs(
- operation, cos_secret=cos_secret, cos_username=cos_username, cos_password=cos_password
- )
- operation_artifact_archive = self._get_dependency_archive_name(operation)
- self.log.debug(
- f"Creating pipeline component archive '{operation_artifact_archive}' for operation '{operation}'"
- )
- target_ops[operation.id] = ExecuteFileOp(
- name=sanitized_operation_name,
- pipeline_name=pipeline_name,
- experiment_name=experiment_name,
- notebook=operation.filename,
- cos_endpoint=cos_endpoint,
- cos_bucket=cos_bucket,
- cos_directory=artifact_object_prefix,
- cos_dependencies_archive=operation_artifact_archive,
- pipeline_version=pipeline_version,
- pipeline_source=pipeline.source,
- pipeline_inputs=operation.inputs,
- pipeline_outputs=operation.outputs,
- pipeline_envs=pipeline_envs,
- emptydir_volume_size=emptydir_volume_size,
- cpu_request=operation.cpu,
- mem_request=operation.memory,
- gpu_limit=operation.gpu,
- workflow_engine=engine,
- image=operation.runtime_image,
- file_outputs={
- "mlpipeline-metrics": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-metrics.json", # noqa
- "mlpipeline-ui-metadata": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-ui-metadata.json", # noqa
- },
- volume_mounts=operation.mounted_volumes,
- kubernetes_secrets=operation.kubernetes_secrets,
- kubernetes_tolerations=operation.kubernetes_tolerations,
- kubernetes_pod_annotations=operation.kubernetes_pod_annotations,
- )
- if operation.doc:
- target_ops[operation.id].add_pod_annotation("elyra/node-user-doc", operation.doc)
- # TODO Can we move all of this to apply to non-standard components as well? Test when servers are up
- if cos_secret and not export:
- target_ops[operation.id].apply(use_aws_secret(cos_secret))
- image_namespace = self._get_metadata_configuration(RuntimeImages.RUNTIME_IMAGES_SCHEMASPACE_ID)
- for image_instance in image_namespace:
- if image_instance.metadata["image_name"] == operation.runtime_image and image_instance.metadata.get(
- "pull_policy"
- ):
- target_ops[operation.id].container.set_image_pull_policy(image_instance.metadata["pull_policy"])
- self.log_pipeline_info(
- pipeline_name,
- f"processing operation dependencies for id '{operation.id}'",
- operation_name=operation.name,
- )
- self._upload_dependencies_to_object_store(
- runtime_configuration, pipeline_name, operation, prefix=artifact_object_prefix
- )
- # If the operation is a "non-standard" component, load its spec and create the operation with a factory function
- else:
- # Retrieve component from cache
- component = ComponentCache.instance().get_component(self._type, operation.classifier)
- # Convert the user-entered value of certain properties according to their type
- for component_property in component.properties:
- # Get corresponding property's value from parsed pipeline
- property_value = operation.component_params.get(component_property.ref)
- self.log.debug(
- f"Processing component parameter '{component_property.name}' "
- f"of type '{component_property.data_type}'"
- )
- if component_property.data_type == "inputpath":
- output_node_id = property_value["value"]
- output_node_parameter_key = property_value["option"].replace("elyra_output_", "")
- operation.component_params[component_property.ref] = target_ops[output_node_id].outputs[
- output_node_parameter_key
- ]
- elif component_property.data_type == "inputvalue":
- active_property = property_value["activeControl"]
- active_property_value = property_value.get(active_property, None)
- # If the value is not found, fall back to the default value assigned by the parser
- if active_property_value is None:
- active_property_value = component_property.value
- if isinstance(active_property_value, dict) and set(active_property_value.keys()) == {
- "value",
- "option",
- }:
- output_node_id = active_property_value["value"]
- output_node_parameter_key = active_property_value["option"].replace("elyra_output_", "")
- operation.component_params[component_property.ref] = target_ops[output_node_id].outputs[
- output_node_parameter_key
- ]
- elif component_property.default_data_type == "dictionary":
- processed_value = self._process_dictionary_value(active_property_value)
- operation.component_params[component_property.ref] = processed_value
- elif component_property.default_data_type == "list":
- processed_value = self._process_list_value(active_property_value)
- operation.component_params[component_property.ref] = processed_value
- else:
- operation.component_params[component_property.ref] = active_property_value
- # Build component task factory
- try:
- factory_function = components.load_component_from_text(component.definition)
- except Exception as e:
- # TODO Fix error messaging and break exceptions down into categories
- self.log.error(f"Error loading component spec for {operation.name}: {str(e)}")
- raise RuntimeError(f"Error loading component spec for {operation.name}.") from e
- # Add factory function, which returns a ContainerOp task instance, to pipeline operation dict
- try:
- comp_spec_inputs = [
- inputs.name.lower().replace(" ", "_") for inputs in factory_function.component_spec.inputs or []
- ]
- # Remove inputs and outputs from params dict
- # TODO: need to have way to retrieve only required params
- parameter_removal_list = ["inputs", "outputs"]
- for component_param in operation.component_params_as_dict.keys():
- if component_param not in comp_spec_inputs:
- parameter_removal_list.append(component_param)
- for parameter in parameter_removal_list:
- operation.component_params_as_dict.pop(parameter, None)
- # Create ContainerOp instance and assign appropriate user-provided name
- sanitized_component_params = {
- self._sanitize_param_name(name): value
- for name, value in operation.component_params_as_dict.items()
- }
- container_op = factory_function(**sanitized_component_params)
- container_op.set_display_name(operation.name)
- # Attach node comment
- if operation.doc:
- container_op.add_pod_annotation("elyra/node-user-doc", operation.doc)
- # Add user-specified volume mounts: the referenced PVCs must exist
- # or this operation will fail
- if operation.mounted_volumes:
- unique_pvcs = []
- for volume_mount in operation.mounted_volumes:
- if volume_mount.pvc_name not in unique_pvcs:
- container_op.add_volume(
- V1Volume(
- name=volume_mount.pvc_name,
- persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
- claim_name=volume_mount.pvc_name
- ),
- )
- )
- unique_pvcs.append(volume_mount.pvc_name)
- container_op.add_volume_mount(
- V1VolumeMount(mount_path=volume_mount.path, name=volume_mount.pvc_name)
- )
- # Add user-specified Kubernetes tolerations
- if operation.kubernetes_tolerations:
- unique_tolerations = []
- for toleration in operation.kubernetes_tolerations:
- if str(toleration) not in unique_tolerations:
- container_op.add_toleration(
- V1Toleration(
- effect=toleration.effect,
- key=toleration.key,
- operator=toleration.operator,
- value=toleration.value,
- )
- )
- unique_tolerations.append(str(toleration))
- # Add user-specified pod annotations
- if operation.kubernetes_pod_annotations:
- unique_annotations = []
- for annotation in operation.kubernetes_pod_annotations:
- if annotation.key not in unique_annotations:
- container_op.add_pod_annotation(annotation.key, annotation.value)
- unique_annotations.append(annotation.key)
- # Force re-execution of the operation by setting staleness to zero days
- # https://www.kubeflow.org/docs/components/pipelines/overview/caching/#managing-caching-staleness
- if operation.disallow_cached_output:
- container_op.set_caching_options(enable_caching=False)
- container_op.execution_options.caching_strategy.max_cache_staleness = "P0D"
- target_ops[operation.id] = container_op
- except Exception as e:
- # TODO Fix error messaging and break exceptions down into categories
- self.log.error(f"Error constructing component {operation.name}: {str(e)}")
- raise RuntimeError(f"Error constructing component {operation.name}.") from e
- # Process dependencies after all the operations have been created
- for operation in pipeline.operations.values():
- op = target_ops[operation.id]
- for parent_operation_id in operation.parent_operation_ids:
- parent_op = target_ops[parent_operation_id] # Parent Operation
- op.after(parent_op)
- self.log_pipeline_info(pipeline_name, "pipeline dependencies processed", duration=(time.time() - t0_all))
- return target_ops
- def _generate_pipeline_conf(self, pipeline: Pipeline) -> PipelineConf:
- """
- Returns a KFP pipeline configuration for this pipeline, which can be empty.
- :param pipeline: the pipeline object being processed
- :type pipeline: Pipeline
- :return: the pipeline configuration, see https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.PipelineConf
- :rtype: kfp.dsl.PipelineConf
- """
- self.log.debug("Generating pipeline configuration ...")
- pipeline_conf = PipelineConf()
- #
- # Gather input for container image pull secrets in support of private container image registries
- # https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.PipelineConf.set_image_pull_secrets
- #
- image_namespace = self._get_metadata_configuration(schemaspace=RuntimeImages.RUNTIME_IMAGES_SCHEMASPACE_ID)
- # iterate through pipeline operations and create list of Kubernetes secret names
- # that are associated with generic components
- container_image_pull_secret_names = []
- for operation in pipeline.operations.values():
- if isinstance(operation, GenericOperation):
- for image_instance in image_namespace:
- if image_instance.metadata["image_name"] == operation.runtime_image:
- if image_instance.metadata.get("pull_secret"):
- container_image_pull_secret_names.append(image_instance.metadata.get("pull_secret"))
- break
- if len(container_image_pull_secret_names) > 0:
- # de-duplicate the pull secret name list, create Kubernetes resource
- # references and add them to the pipeline configuration
- container_image_pull_secrets = []
- for secret_name in list(set(container_image_pull_secret_names)):
- container_image_pull_secrets.append(k8s_client.V1ObjectReference(name=secret_name))
- pipeline_conf.set_image_pull_secrets(container_image_pull_secrets)
- self.log.debug(
- f"Added {len(container_image_pull_secrets)} image pull secret(s) to the pipeline configuration."
- )
- return pipeline_conf
- @staticmethod
- def _sanitize_operation_name(name: str) -> str:
- """
- In KFP, only letters, numbers, spaces, "_", and "-" are allowed in a name.
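- A minimal illustration of the sanitization (the operation names are hypothetical):
- "Load & Clean Data (v2)" -> "Load - Clean Data -v2"
- "---my op---" -> "my op"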
- :param name: name of the operation
- """
- return re.sub("-+", "-", re.sub("[^-_0-9A-Za-z ]+", "-", name)).lstrip("-").rstrip("-")
- @staticmethod
- def _sanitize_param_name(name: str) -> str:
- """
- Sanitize a component parameter name.
- Behavior is mirrored from how Kubeflow 1.X sanitizes identifier names:
- - https://github.com/kubeflow/pipelines/blob/1.8.1/sdk/python/kfp/components/_naming.py#L32-L42
- - https://github.com/kubeflow/pipelines/blob/1.8.1/sdk/python/kfp/components/_naming.py#L49-L50
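- A minimal illustration (the parameter names are hypothetical):
- "Model URI" -> "model_uri"
- "GPU-Count" -> "gpu_count"
- "1st_value" -> "n1st_value"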
- """
- normalized_name = name.lower()
- # remove non-word characters
- normalized_name = re.sub(r"[\W_]", " ", normalized_name)
- # no double spaces, leading or trailing spaces
- normalized_name = re.sub(" +", " ", normalized_name).strip()
- # no leading digits
- if re.match(r"\d", normalized_name):
- normalized_name = "n" + normalized_name
- return normalized_name.replace(" ", "_")
- class KfpPipelineProcessorResponse(RuntimePipelineProcessorResponse):
- _type = RuntimeProcessorType.KUBEFLOW_PIPELINES
- _name = "kfp"
- def __init__(self, run_id, run_url, object_storage_url, object_storage_path):
- super().__init__(run_url, object_storage_url, object_storage_path)
- self.run_id = run_id
- def to_json(self):
- response = super().to_json()
- response["run_id"] = self.run_id
- return response