123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import os
- import string
- from typing import Dict
- from typing import List
- from typing import Optional
- from kfp.dsl import ContainerOp
- from kfp.dsl import RUN_ID_PLACEHOLDER
- from kubernetes.client.models import V1EmptyDirVolumeSource
- from kubernetes.client.models import V1EnvVar
- from kubernetes.client.models import V1EnvVarSource
- from kubernetes.client.models import V1ObjectFieldSelector
- from kubernetes.client.models import V1PersistentVolumeClaimVolumeSource
- from kubernetes.client.models import V1SecretKeySelector
- from kubernetes.client.models import V1Volume
- from kubernetes.client.models import V1VolumeMount
- from elyra._version import __version__
- from elyra.pipeline.pipeline import KubernetesSecret
- from elyra.pipeline.pipeline import VolumeMount
- """
- The ExecuteFileOp uses a python script to bootstrap the user supplied image with the required dependencies.
- In order for the script run properly, the image used, must at a minimum, have the 'curl' utility available
- and have python3
- """
- # Inputs and Outputs separator character. If updated,
- # same-named variable in bootstrapper.py must be updated!
- INOUT_SEPARATOR = ";"
- ELYRA_GITHUB_ORG = os.getenv("ELYRA_GITHUB_ORG", "elyra-ai")
- ELYRA_GITHUB_BRANCH = os.getenv("ELYRA_GITHUB_BRANCH", "master" if "dev" in __version__ else "v" + __version__)
- ELYRA_PIP_CONFIG_URL = os.getenv(
- "ELYRA_PIP_CONFIG_URL",
- f"https://raw.githubusercontent.com/{ELYRA_GITHUB_ORG}/elyra/{ELYRA_GITHUB_BRANCH}/etc/kfp/pip.conf",
- )
- ELYRA_BOOTSTRAP_SCRIPT_URL = os.getenv(
- "ELYRA_BOOTSTRAP_SCRIPT_URL",
- f"https://raw.githubusercontent.com/{ELYRA_GITHUB_ORG}/elyra/{ELYRA_GITHUB_BRANCH}/elyra/kfp/bootstrapper.py",
- )
- ELYRA_REQUIREMENTS_URL = os.getenv(
- "ELYRA_REQUIREMENTS_URL",
- f"https://raw.githubusercontent.com/{ELYRA_GITHUB_ORG}/"
- f"elyra/{ELYRA_GITHUB_BRANCH}/etc/generic/requirements-elyra.txt",
- )
- ELYRA_REQUIREMENTS_URL_PY37 = os.getenv(
- "ELYRA_REQUIREMENTS_URL_PY37",
- f"https://raw.githubusercontent.com/{ELYRA_GITHUB_ORG}/"
- f"elyra/{ELYRA_GITHUB_BRANCH}/etc/generic/requirements-elyra-py37.txt",
- )
- class ExecuteFileOp(ContainerOp):
- def __init__(
- self,
- pipeline_name: str,
- experiment_name: str,
- notebook: str,
- cos_endpoint: str,
- cos_bucket: str,
- cos_directory: str,
- cos_dependencies_archive: str,
- pipeline_version: Optional[str] = "",
- pipeline_source: Optional[str] = None,
- pipeline_outputs: Optional[List[str]] = None,
- pipeline_inputs: Optional[List[str]] = None,
- pipeline_envs: Optional[Dict[str, str]] = None,
- requirements_url: Optional[str] = None,
- bootstrap_script_url: Optional[str] = None,
- emptydir_volume_size: Optional[str] = None,
- cpu_request: Optional[str] = None,
- mem_request: Optional[str] = None,
- gpu_limit: Optional[str] = None,
- workflow_engine: Optional[str] = "argo",
- volume_mounts: Optional[List[VolumeMount]] = None,
- kubernetes_secrets: Optional[List[KubernetesSecret]] = None,
- **kwargs,
- ):
- """Create a new instance of ContainerOp.
- Args:
- pipeline_name: pipeline that this op belongs to
- experiment_name: the experiment where pipeline_name is executed
- notebook: name of the notebook that will be executed per this operation
- cos_endpoint: object storage endpoint e.g weaikish1.fyre.ibm.com:30442
- cos_bucket: bucket to retrieve archive from
- cos_directory: name of the directory in the object storage bucket to pull
- cos_dependencies_archive: archive file name to get from object storage bucket e.g archive1.tar.gz
- pipeline_version: optional version identifier
- pipeline_source: pipeline source
- pipeline_outputs: comma delimited list of files produced by the notebook
- pipeline_inputs: comma delimited list of files to be consumed/are required by the notebook
- pipeline_envs: dictionary of environmental variables to set in the container prior to execution
- requirements_url: URL to a python requirements.txt file to be installed prior to running the notebook
- bootstrap_script_url: URL to a custom python bootstrap script to run
- emptydir_volume_size: Size(GB) of the volume to create for the workspace when using CRIO container runtime
- cpu_request: number of CPUs requested for the operation
- mem_request: memory requested for the operation (in Gi)
- gpu_limit: maximum number of GPUs allowed for the operation
- workflow_engine: Kubeflow workflow engine, defaults to 'argo'
- volume_mounts: data volumes to be mounted
- kubernetes_secrets: secrets to be made available as environment variables
- kwargs: additional key value pairs to pass e.g. name, image, sidecars & is_exit_handler.
- See Kubeflow pipelines ContainerOp definition for more parameters or how to use
- https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.ContainerOp
- """
- self.pipeline_name = pipeline_name
- self.pipeline_version = pipeline_version
- self.pipeline_source = pipeline_source
- self.experiment_name = experiment_name
- self.notebook = notebook
- self.notebook_name = os.path.basename(notebook)
- self.cos_endpoint = cos_endpoint
- self.cos_bucket = cos_bucket
- self.cos_directory = cos_directory
- self.cos_dependencies_archive = cos_dependencies_archive
- self.container_work_dir_root_path = "./"
- self.container_work_dir_name = "jupyter-work-dir/"
- self.container_work_dir = self.container_work_dir_root_path + self.container_work_dir_name
- self.bootstrap_script_url = bootstrap_script_url
- self.requirements_url = requirements_url
- self.pipeline_outputs = pipeline_outputs
- self.pipeline_inputs = pipeline_inputs
- self.pipeline_envs = pipeline_envs
- self.cpu_request = cpu_request
- self.mem_request = mem_request
- self.gpu_limit = gpu_limit
- self.volume_mounts = volume_mounts # optional data volumes to be mounted to the pod
- self.kubernetes_secrets = kubernetes_secrets # optional secrets to be made available as env vars
- argument_list = []
- """ CRI-o support for kfp pipelines
- We need to attach an emptydir volume for each notebook that runs since CRI-o runtime does not allow
- us to write to the base image layer file system, only to volumes.
- """
- self.emptydir_volume_name = "workspace"
- self.emptydir_volume_size = emptydir_volume_size
- self.python_user_lib_path = ""
- self.python_user_lib_path_target = ""
- self.python_pip_config_url = ""
- if self.emptydir_volume_size:
- self.container_work_dir_root_path = "/opt/app-root/src/"
- self.container_python_dir_name = "python3/"
- self.container_work_dir = self.container_work_dir_root_path + self.container_work_dir_name
- self.python_user_lib_path = self.container_work_dir + self.container_python_dir_name
- self.python_user_lib_path_target = "--target=" + self.python_user_lib_path
- self.python_pip_config_url = ELYRA_PIP_CONFIG_URL
- if not self.bootstrap_script_url:
- self.bootstrap_script_url = ELYRA_BOOTSTRAP_SCRIPT_URL
- if not self.requirements_url:
- self.requirements_url = ELYRA_REQUIREMENTS_URL
- if "name" not in kwargs:
- raise TypeError("You need to provide a name for the operation.")
- elif not kwargs.get("name"):
- raise ValueError("You need to provide a name for the operation.")
- if "image" not in kwargs:
- raise ValueError("You need to provide an image.")
- if not notebook:
- raise ValueError("You need to provide a notebook.")
- if "arguments" not in kwargs:
- """If no arguments are passed, we use our own.
- If ['arguments'] are set, we assume container's ENTRYPOINT is set and dependencies are installed
- NOTE: Images being pulled must have python3 available on PATH and cURL utility
- """
- common_curl_options = '--fail -H "Cache-Control: no-cache"'
- argument_list.append(
- f"mkdir -p {self.container_work_dir} && cd {self.container_work_dir} && "
- f"echo 'Downloading {self.bootstrap_script_url}' && "
- f"curl {common_curl_options} -L {self.bootstrap_script_url} --output bootstrapper.py && "
- f"echo 'Downloading {self.requirements_url}' && "
- f"curl {common_curl_options} -L {self.requirements_url} --output requirements-elyra.txt && "
- f"echo 'Downloading {ELYRA_REQUIREMENTS_URL_PY37}' && "
- f"curl {common_curl_options} -L {ELYRA_REQUIREMENTS_URL_PY37} --output requirements-elyra-py37.txt && "
- )
- if self.emptydir_volume_size:
- argument_list.append(
- f"mkdir {self.container_python_dir_name} && cd {self.container_python_dir_name} && "
- f"echo 'Downloading {self.python_pip_config_url}' && "
- f"curl {common_curl_options} -L {self.python_pip_config_url} --output pip.conf && cd .. &&"
- )
- argument_list.append(
- f"python3 -m pip install {self.python_user_lib_path_target} packaging && "
- "python3 -m pip freeze > requirements-current.txt && "
- "python3 bootstrapper.py "
- f'--pipeline-name "{self.pipeline_name}" '
- f"--cos-endpoint {self.cos_endpoint} "
- f"--cos-bucket {self.cos_bucket} "
- f'--cos-directory "{self.cos_directory}" '
- f'--cos-dependencies-archive "{self.cos_dependencies_archive}" '
- f'--file "{self.notebook}" '
- )
- if self.pipeline_inputs:
- inputs_str = self._artifact_list_to_str(self.pipeline_inputs)
- argument_list.append(f'--inputs "{inputs_str}" ')
- if self.pipeline_outputs:
- outputs_str = self._artifact_list_to_str(self.pipeline_outputs)
- argument_list.append(f'--outputs "{outputs_str}" ')
- if self.emptydir_volume_size:
- argument_list.append(f'--user-volume-path "{self.python_user_lib_path}" ')
- kwargs["command"] = ["sh", "-c"]
- kwargs["arguments"] = "".join(argument_list)
- super().__init__(**kwargs)
- # add user-specified volume mounts: the referenced PVCs must exist
- # or this generic operation will fail
- if self.volume_mounts:
- unique_pvcs = []
- for volume_mount in self.volume_mounts:
- if volume_mount.pvc_name not in unique_pvcs:
- self.add_volume(
- V1Volume(
- name=volume_mount.pvc_name,
- persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
- claim_name=volume_mount.pvc_name
- ),
- )
- )
- unique_pvcs.append(volume_mount.pvc_name)
- self.container.add_volume_mount(V1VolumeMount(mount_path=volume_mount.path, name=volume_mount.pvc_name))
- # We must deal with the envs after the superclass initialization since these amend the
- # container attribute that isn't available until now.
- if self.pipeline_envs:
- for key, value in self.pipeline_envs.items(): # Convert dict entries to format kfp needs
- self.container.add_env_variable(V1EnvVar(name=key, value=value))
- if self.kubernetes_secrets:
- for secret in self.kubernetes_secrets: # Convert tuple entries to format kfp needs
- self.container.add_env_variable(
- V1EnvVar(
- name=secret.env_var,
- value_from=V1EnvVarSource(secret_key_ref=V1SecretKeySelector(name=secret.name, key=secret.key)),
- )
- )
- # If crio volume size is found then assume kubeflow pipelines environment is using CRI-o as
- # its container runtime
- if self.emptydir_volume_size:
- self.add_volume(
- V1Volume(
- empty_dir=V1EmptyDirVolumeSource(medium="", size_limit=self.emptydir_volume_size),
- name=self.emptydir_volume_name,
- )
- )
- self.container.add_volume_mount(
- V1VolumeMount(mount_path=self.container_work_dir_root_path, name=self.emptydir_volume_name)
- )
- # Append to PYTHONPATH location of elyra dependencies in installed in Volume
- self.container.add_env_variable(V1EnvVar(name="PYTHONPATH", value=self.python_user_lib_path))
- if self.cpu_request:
- self.container.set_cpu_request(cpu=str(cpu_request))
- if self.mem_request:
- self.container.set_memory_request(memory=str(mem_request) + "G")
- if self.gpu_limit:
- gpu_vendor = self.pipeline_envs.get("GPU_VENDOR", "nvidia")
- self.container.set_gpu_limit(gpu=str(gpu_limit), vendor=gpu_vendor)
- # Generate unique ELYRA_RUN_NAME value and expose it as an environment
- # variable in the container
- if not workflow_engine:
- raise ValueError("workflow_engine is missing and needs to be specified.")
- if workflow_engine.lower() == "argo":
- # attach RUN_ID_PLACEHOLDER as run name
- # '{{workflow.annotations.pipelines.kubeflow.org/run_name}}' variable
- # cannot be resolved by Argo in KF 1.4
- run_name_placeholder = RUN_ID_PLACEHOLDER
- self.container.add_env_variable(V1EnvVar(name="ELYRA_RUN_NAME", value=run_name_placeholder))
- elif workflow_engine.lower() == "tekton":
- try:
- from kfp_tekton import TektonClient # noqa: F401
- except ImportError:
- raise ValueError(
- "kfp-tekton not installed. Please install using elyra[kfp-tekton] to use Tekton engine."
- )
- # For Tekton derive the value from the specified pod annotation
- annotation = "pipelines.kubeflow.org/run_name"
- field_path = f"metadata.annotations['{annotation}']"
- self.container.add_env_variable(
- V1EnvVar(
- name="ELYRA_RUN_NAME",
- value_from=V1EnvVarSource(field_ref=V1ObjectFieldSelector(field_path=field_path)),
- )
- )
- else:
- raise ValueError(f"{workflow_engine} is not a supported workflow engine.")
- # Attach metadata to the pod
- # Node type (a static type for this op)
- self.add_pod_label("elyra/node-type", ExecuteFileOp._normalize_label_value("notebook-script"))
- # Pipeline name
- self.add_pod_label("elyra/pipeline-name", ExecuteFileOp._normalize_label_value(self.pipeline_name))
- # Pipeline version
- self.add_pod_label("elyra/pipeline-version", ExecuteFileOp._normalize_label_value(self.pipeline_version))
- # Experiment name
- self.add_pod_label("elyra/experiment-name", ExecuteFileOp._normalize_label_value(self.experiment_name))
- # Pipeline node name
- self.add_pod_label("elyra/node-name", ExecuteFileOp._normalize_label_value(kwargs.get("name")))
- # Pipeline node file
- self.add_pod_annotation("elyra/node-file-name", self.notebook)
- # Identify the pipeline source, which can be a
- # pipeline file (mypipeline.pipeline), a Python
- # script or notebook that was submitted
- if self.pipeline_source is not None:
- self.add_pod_annotation("elyra/pipeline-source", self.pipeline_source)
- def _artifact_list_to_str(self, pipeline_array):
- trimmed_artifact_list = []
- for artifact_name in pipeline_array:
- if INOUT_SEPARATOR in artifact_name: # if INOUT_SEPARATOR is in name, throw since this is our separator
- raise ValueError(f"Illegal character ({INOUT_SEPARATOR}) found in filename '{artifact_name}'.")
- trimmed_artifact_list.append(artifact_name.strip())
- return INOUT_SEPARATOR.join(trimmed_artifact_list)
- @staticmethod
- def _normalize_label_value(value):
- """Produce a Kubernetes-compliant label from value
- Valid label values must be 63 characters or less and
- must be empty or begin and end with an alphanumeric
- character ([a-z0-9A-Z]) with dashes (-), underscores
- (_), dots (.), and alphanumerics between.
- """
- if value is None or len(value) == 0:
- return "" # nothing to do
- max_length = 63
- # This char is added at the front and/or back
- # of value, if the first and/or last character
- # is invalid. For example a value of "-abc"
- # is converted to "a-abc". The specified character
- # must meet the label value constraints.
- valid_char = "a"
- # This char is used to replace invalid characters
- # that are in the "middle" of value. For example
- # a value of "abc%def" is converted to "abc_def".
- # The specified character must meet the label value
- # constraints.
- valid_middle_char = "_"
- # must begin with [0-9a-zA-Z]
- valid_chars = string.ascii_letters + string.digits
- if value[0] not in valid_chars:
- value = valid_char + value
- value = value[:max_length] # enforce max length
- # must end with [0-9a-zA-Z]
- if value[-1] not in valid_chars:
- if len(value) <= max_length - 1:
- # append valid character if max length
- # would not be exceeded
- value = value + valid_char
- else:
- # replace with valid character
- value = value[:-1] + valid_char
- # middle chars must be [0-9a-zA-Z\-_.]
- valid_chars = valid_chars + "-_."
- newstr = ""
- for c in range(len(value)):
- if value[c] not in valid_chars:
- newstr = newstr + valid_middle_char
- else:
- newstr = newstr + value[c]
- value = newstr
- return value
|