123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624 |
- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- from dataclasses import asdict as dataclass_asdict
- from dataclasses import dataclass
- from dataclasses import is_dataclass
- import json
- import os
- import sys
- from typing import Any
- from typing import Dict
- from typing import List
- from typing import Optional
- from elyra.pipeline.pipeline_constants import DISALLOW_CACHED_OUTPUT
- from elyra.pipeline.pipeline_constants import ENV_VARIABLES
- from elyra.pipeline.pipeline_constants import KUBERNETES_POD_ANNOTATIONS
- from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
- from elyra.pipeline.pipeline_constants import KUBERNETES_TOLERATIONS
- from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
# TODO: Make the pipeline version more widely available;
# today it is only available in the pipeline editor.
PIPELINE_CURRENT_VERSION: int = 7
PIPELINE_CURRENT_SCHEMA: float = 3.0
class Operation(object):
    """
    Represents a single operation in a pipeline representing a third-party component
    """

    # Classifier values that identify generic (built-in) operations
    generic_node_types = ["execute-notebook-node", "execute-python-node", "execute-r-node"]

    @classmethod
    def create_instance(
        cls,
        id: str,
        type: str,
        name: str,
        classifier: str,
        parent_operation_ids: Optional[List[str]] = None,
        component_params: Optional[Dict[str, Any]] = None,
    ) -> "Operation":
        """
        Class method that creates the appropriate instance of Operation based on inputs.

        A GenericOperation is returned when the classifier is one of `generic_node_types`,
        otherwise a plain Operation is returned.
        """
        operation_class = GenericOperation if Operation.is_generic_operation(classifier) else Operation
        return operation_class(
            id, type, name, classifier, parent_operation_ids=parent_operation_ids, component_params=component_params
        )

    def __init__(
        self,
        id: str,
        type: str,
        name: str,
        classifier: str,
        parent_operation_ids: Optional[List[str]] = None,
        component_params: Optional[Dict[str, Any]] = None,
    ):
        """
        :param id: Generated UUID, 128 bit number used as a unique identifier
                   e.g. 123e4567-e89b-12d3-a456-426614174000
        :param type: The type of node e.g. execution_node
        :param classifier: indicates the operation's class
        :param name: The name of the operation
        :param parent_operation_ids: List of parent operation 'ids' required to execute prior to this operation
        :param component_params: dictionary of parameter key:value pairs that are used in the creation of
                                 a non-standard operation instance. May be None or empty. A non-empty
                                 dictionary is stored by reference and modified in place.
        :raises ValueError: if id, type, classifier or name is missing/empty
        """
        # Validate that the operation has all required properties
        if not id:
            raise ValueError("Invalid pipeline operation: Missing field 'operation id'.")
        if not type:
            raise ValueError("Invalid pipeline operation: Missing field 'operation type'.")
        if not classifier:
            raise ValueError("Invalid pipeline operation: Missing field 'operation classifier'.")
        if not name:
            raise ValueError("Invalid pipeline operation: Missing field 'operation name'.")

        self._id = id
        self._type = type
        self._classifier = classifier
        self._name = name
        self._parent_operation_ids = parent_operation_ids or []
        # Normalize once. Every lookup below goes through this dictionary so that
        # an omitted (None) component_params does not raise an AttributeError.
        self._component_params = component_params or {}
        self._doc = None

        # The following properties are Elyra system properties (ie, not defined in the
        # component spec) and must be removed from the component_params dict
        self._mounted_volumes = self._pop_system_property(MOUNTED_VOLUMES, VolumeMount)
        self._kubernetes_tolerations = self._pop_system_property(KUBERNETES_TOLERATIONS, KubernetesToleration)
        self._kubernetes_pod_annotations = self._pop_system_property(KUBERNETES_POD_ANNOTATIONS, KubernetesAnnotation)

        # Caching directive; the value is read but left in the component_params dict
        self._disallow_cached_output = self._component_params.get(DISALLOW_CACHED_OUTPUT)

        # Scrub the inputs and outputs lists
        self._component_params["inputs"] = Operation._scrub_list(self._component_params.get("inputs", []))
        self._component_params["outputs"] = Operation._scrub_list(self._component_params.get("outputs", []))

    def _pop_system_property(self, key: str, entry_type: type) -> list:
        """
        Remove and return the list stored under `key` in the component parameters.

        The entry is only removed if it is a list that is either empty or whose first
        element is an instance of `entry_type`. In every other case the component
        parameters are left untouched and an empty list is returned.
        """
        candidate = self._component_params.get(key)
        if isinstance(candidate, list) and (not candidate or isinstance(candidate[0], entry_type)):
            return self._component_params.pop(key, [])
        return []

    @property
    def id(self) -> str:
        return self._id

    @property
    def type(self) -> str:
        return self._type

    @property
    def classifier(self) -> str:
        return self._classifier

    @property
    def name(self) -> str:
        return self._name

    @name.setter
    def name(self, value: str):
        self._name = value

    @property
    def doc(self) -> str:
        return self._doc

    @doc.setter
    def doc(self, value: str):
        self._doc = value

    @property
    def parent_operation_ids(self) -> List[str]:
        return self._parent_operation_ids

    @property
    def component_params(self) -> Optional[Dict[str, Any]]:
        return self._component_params

    @property
    def component_params_as_dict(self) -> Dict[str, Any]:
        return self._component_params or {}

    @property
    def mounted_volumes(self) -> List["VolumeMount"]:
        return self._mounted_volumes

    @property
    def kubernetes_tolerations(self) -> List["KubernetesToleration"]:
        return self._kubernetes_tolerations

    @property
    def kubernetes_pod_annotations(self) -> List["KubernetesAnnotation"]:
        return self._kubernetes_pod_annotations

    @property
    def disallow_cached_output(self) -> Optional[bool]:
        """
        The caching directive taken from the DISALLOW_CACHED_OUTPUT component parameter.

        Returns None if the parameter was not provided (caching behavior is delegated to the runtime).
        NOTE(review): the previous docstring stated that True means "cached output may be used" and
        False means "cached output must not be used", which looks inverted relative to the property
        name. Presumably True means cached output must not be used - confirm against the runtime
        processors that consume this value.
        """
        return self._disallow_cached_output

    @property
    def inputs(self) -> Optional[List[str]]:
        return self._component_params.get("inputs")

    @inputs.setter
    def inputs(self, value: List[str]):
        self._component_params["inputs"] = value

    @property
    def outputs(self) -> Optional[List[str]]:
        return self._component_params.get("outputs")

    @outputs.setter
    def outputs(self, value: List[str]):
        self._component_params["outputs"] = value

    @property
    def is_generic(self) -> bool:
        return isinstance(self, GenericOperation)

    def __eq__(self, other: "Operation") -> bool:
        # Operations are only comparable if this instance is of the other's class (or a subclass of it)
        if not isinstance(self, other.__class__):
            return False
        return (
            self.id == other.id
            and self.type == other.type
            and self.classifier == other.classifier
            and self.name == other.name
            and self.parent_operation_ids == other.parent_operation_ids
            and self.component_params == other.component_params
        )

    def __str__(self) -> str:
        params = "".join(f"\t{key}: {value}, \n" for key, value in self.component_params_as_dict.items())
        return (
            f"componentID : {self.id} \n "
            f"name : {self.name} \n "
            f"parent_operation_ids : {self.parent_operation_ids} \n "
            f"component_parameters: {{\n{params}}} \n"
        )

    @staticmethod
    def _scrub_list(dirty: Optional[List[Optional[str]]]) -> List[str]:
        """
        Clean an existing list by filtering out None and empty string values
        :param dirty: a List of values
        :return: a clean list without None or empty string values
        """
        if not dirty:
            return []
        return [clean for clean in dirty if clean]

    @staticmethod
    def is_generic_operation(operation_classifier) -> bool:
        """Return True if the given classifier identifies a generic (built-in) operation."""
        return operation_classifier in Operation.generic_node_types
class GenericOperation(Operation):
    """
    Represents a single operation in a pipeline representing a generic (built-in) component
    """

    def __init__(
        self,
        id: str,
        type: str,
        name: str,
        classifier: str,
        parent_operation_ids: Optional[List[str]] = None,
        component_params: Optional[Dict[str, Any]] = None,
    ):
        """
        :param id: Generated UUID, 128 bit number used as a unique identifier
                   e.g. 123e4567-e89b-12d3-a456-426614174000
        :param type: The type of node e.g. execution_node
        :param classifier: indicates the operation's class
        :param name: The name of the operation
        :param parent_operation_ids: List of parent operation 'ids' required to execute prior to this operation
        :param component_params: dictionary of parameter key:value pairs that are used in the creation of
                                 a non-standard operation instance
        :raises ValueError: if a required field is missing or a resource request is out of range

        Component_params for "generic components" (i.e., those with one of the following classifier values:
        ["execute-notebook-node", "execute-python-node", "execute-r-node"]) can expect to have the following
        entries.
                filename: The relative path to the source file in the users local environment
                         to be executed e.g. path/to/file.ext
                runtime_image: The DockerHub image to be used for the operation
                               e.g. user/docker_image_name:tag
                dependencies: List of local files/directories needed for the operation to run
                             and packaged into each operation's dependency archive
                include_subdirectories: Include or Exclude subdirectories when packaging our 'dependencies'
                env_vars: List of Environmental variables to set in the container image
                         e.g. FOO="BAR"
                inputs: List of files to be consumed by this operation, produced by parent operation(s)
                outputs: List of files produced by this operation to be included in a child operation(s)
                cpu: number of cpus requested to run the operation
                memory: amount of memory requested to run the operation (in Gi)
                gpu: number of gpus requested to run the operation
        Entries for other (non-built-in) component types are a function of the respective component.
        """
        # Normalize before delegating so that an omitted (None) dictionary surfaces as the
        # intended "Missing field" ValueError below instead of an AttributeError.
        super().__init__(
            id,
            type,
            name,
            classifier,
            parent_operation_ids=parent_operation_ids,
            component_params=component_params or {},
        )
        params = self._component_params

        if not params.get("filename"):
            raise ValueError("Invalid pipeline operation: Missing field 'operation filename'.")
        if not params.get("runtime_image"):
            raise ValueError("Invalid pipeline operation: Missing field 'operation runtime image'.")

        # Resource requests are optional; falsy values (None, 0, "") are not range-checked
        for resource, label, minimum in (("cpu", "CPU", 1), ("gpu", "GPU", 0), ("memory", "Memory", 1)):
            requested = params.get(resource)
            if requested and not self._validate_range(requested, min_value=minimum):
                raise ValueError(f"Invalid pipeline operation: {label} must be a positive value or None")

        # Re-build object to include default values. 'filename' and 'runtime_image' were
        # verified above and are already present in the dictionary.
        params["dependencies"] = Operation._scrub_list(params.get("dependencies", []))
        params["include_subdirectories"] = params.get("include_subdirectories", False)
        params["env_vars"] = KeyValueList(Operation._scrub_list(params.get("env_vars", [])))
        params["cpu"] = params.get("cpu")
        params["gpu"] = params.get("gpu")
        params["memory"] = params.get("memory")

    @property
    def name(self) -> str:
        # If the name equals the file's base name, everything from the first '.' on is dropped
        if self._name == os.path.basename(self.filename):
            self._name = os.path.basename(self._name).split(".")[0]
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def filename(self) -> str:
        return self._component_params.get("filename")

    @property
    def runtime_image(self) -> str:
        return self._component_params.get("runtime_image")

    @property
    def dependencies(self) -> Optional[List[str]]:
        return self._component_params.get("dependencies")

    @property
    def include_subdirectories(self) -> Optional[bool]:
        return self._component_params.get("include_subdirectories")

    @property
    def env_vars(self) -> Optional["KeyValueList"]:
        # NOTE(review): __init__ stores this value under the literal key "env_vars";
        # presumably ENV_VARIABLES has the same value - confirm in pipeline_constants.
        return self._component_params.get(ENV_VARIABLES)

    @property
    def cpu(self) -> Optional[str]:
        return self._component_params.get("cpu")

    @property
    def memory(self) -> Optional[str]:
        return self._component_params.get("memory")

    @property
    def gpu(self) -> Optional[str]:
        return self._component_params.get("gpu")

    @property
    def kubernetes_secrets(self) -> List["KubernetesSecret"]:
        # Returns None if the KUBERNETES_SECRETS parameter was not provided
        return self._component_params.get(KUBERNETES_SECRETS)

    def __eq__(self, other: "GenericOperation") -> bool:
        if not isinstance(self, other.__class__):
            return False
        return super().__eq__(other)

    def _validate_range(self, value: str, min_value: int = 0, max_value: int = sys.maxsize) -> bool:
        """
        Return True if `value`, converted to int, is in the half-open range [min_value, max_value).

        :raises ValueError: if `value` cannot be converted to an integer
        """
        return min_value <= int(value) < max_value
class Pipeline(object):
    """
    Represents a single pipeline constructed in the pipeline editor
    """

    def __init__(
        self,
        id: str,
        name: str,
        runtime: str,
        runtime_config: str,
        source: Optional[str] = None,
        description: Optional[str] = None,
        pipeline_parameters: Optional[Dict[str, Any]] = None,
    ):
        """
        :param id: Generated UUID, 128 bit number used as a unique identifier
                   e.g. 123e4567-e89b-12d3-a456-426614174000
        :param name: Pipeline name, e.g. test-pipeline-123456
        :param runtime: Type of runtime we want to use to execute our pipeline, e.g. kfp OR airflow
        :param runtime_config: Runtime configuration that should be used to submit the pipeline to execution
        :param source: The pipeline source, e.g. a pipeline file or a notebook.
        :param description: Pipeline description
        :param pipeline_parameters: Key/value pairs representing the parameters of this pipeline
        :raises ValueError: if name, runtime or runtime_config is missing/empty
        """
        if not name:
            raise ValueError("Invalid pipeline: Missing pipeline name.")
        if not runtime:
            raise ValueError("Invalid pipeline: Missing runtime.")
        if not runtime_config:
            raise ValueError("Invalid pipeline: Missing runtime configuration.")

        self._id = id
        self._name = name
        self._description = description
        self._source = source
        self._runtime = runtime
        self._runtime_config = runtime_config
        self._pipeline_parameters = pipeline_parameters or {}
        # Starts out empty; entries are added by callers through the `operations` property
        self._operations = {}

    @property
    def id(self) -> str:
        return self._id

    @property
    def name(self) -> str:
        return self._name

    @property
    def source(self) -> Optional[str]:
        return self._source

    @property
    def runtime(self) -> str:
        """
        The runtime processor name that will execute the pipeline
        """
        return self._runtime

    @property
    def runtime_config(self) -> str:
        """
        The runtime configuration that should be used to submit the pipeline for execution
        """
        return self._runtime_config

    @property
    def pipeline_parameters(self) -> Dict[str, Any]:
        """
        The dictionary of global parameters associated with each node of the pipeline
        """
        return self._pipeline_parameters

    @property
    def operations(self) -> Dict[str, "Operation"]:
        return self._operations

    @property
    def description(self) -> Optional[str]:
        """
        Pipeline description
        """
        return self._description

    def contains_generic_operations(self) -> bool:
        """
        Returns a truthy value indicating whether the pipeline contains
        one or more generic operations.
        """
        return any(isinstance(op, GenericOperation) for op in self._operations.values())

    def __eq__(self, other: "Pipeline") -> bool:
        # Return an explicit False for non-matching types instead of
        # falling through and implicitly returning None.
        if not isinstance(self, other.__class__):
            return False
        return (
            self.id == other.id
            and self.name == other.name
            and self.source == other.source
            and self.description == other.description
            and self.runtime == other.runtime
            and self.runtime_config == other.runtime_config
            and self.operations == other.operations
        )
class KeyValueList(list):
    """
    A list class that exposes functionality specific to lists whose entries are
    key-value pairs separated by a pre-defined character.
    """

    _key_value_separator: str = "="

    def to_dict(self) -> Dict[str, str]:
        """
        Properties consisting of key-value pairs are stored in a list of separated
        strings, while most processing steps require a dictionary - so we must convert.
        If no key/value pairs are specified, an empty dictionary is returned, otherwise
        pairs are converted to dictionary entries, stripped of whitespace, and returned.

        Empty entries and entries whose key or value is empty after stripping are skipped.

        :raises ValueError: if a non-empty entry does not contain the separator
        """
        separator = self._key_value_separator
        kv_dict = {}
        for entry in self:
            if not entry:
                continue
            if separator not in entry:
                raise ValueError(
                    f"Property {entry} does not contain the expected " f"separator character: '{separator}'."
                )
            # Only the first separator splits the entry; later ones stay in the value
            key, _, value = entry.partition(separator)
            key = key.strip()
            value = value.strip()
            if key and value:
                kv_dict[key] = value
            # otherwise: invalid entry; skip inclusion and continue
        return kv_dict

    @classmethod
    def to_str(cls, key: str, value: str) -> str:
        """Join a key and a value using this class' separator."""
        return f"{key}{cls._key_value_separator}{value}"

    @classmethod
    def from_dict(cls, kv_dict: Dict) -> "KeyValueList":
        """
        Convert a set of key-value pairs stored in a dictionary to
        a KeyValueList of strings with the defined separator.

        The result is an instance of the class the method is called on, so that
        subclasses overriding the separator are honored.
        """
        return cls(cls.to_str(key, value) for key, value in kv_dict.items())

    @classmethod
    def merge(cls, primary: "KeyValueList", secondary: "KeyValueList") -> "KeyValueList":
        """
        Merge two key-value pair lists, preferring the values given in the
        primary parameter in the case of a matching key between the two lists.
        """
        primary_dict = primary.to_dict()
        secondary_dict = secondary.to_dict()
        # Later entries win, hence primary is unpacked last
        return cls.from_dict({**secondary_dict, **primary_dict})

    @classmethod
    def difference(cls, minuend: "KeyValueList", subtrahend: "KeyValueList") -> "KeyValueList":
        """
        Given KeyValueLists, convert to dictionaries and remove any keys found in the
        second (subtrahend) from the first (minuend), if present.
        :param minuend: list to be subtracted from
        :param subtrahend: list whose keys will be removed from the minuend
        :returns: the difference of the two lists
        """
        minuend_dict = minuend.to_dict()
        excluded = subtrahend.to_dict()
        remaining = {key: value for key, value in minuend_dict.items() if key not in excluded}
        return cls.from_dict(remaining)
@dataclass
class VolumeMount:
    """
    Describes a volume mount by its `path` and `pvc_name`.
    NOTE(review): 'pvc' presumably stands for Kubernetes persistent volume claim - confirm.
    """

    path: str
    pvc_name: str
@dataclass
class KubernetesSecret:
    """
    Describes a secret reference by `env_var`, `name` and `key`.
    NOTE(review): presumably `env_var` is the environment variable that receives the value
    stored under `key` of the secret called `name` - confirm against the consumers.
    """

    env_var: str
    name: str
    key: str
@dataclass
class KubernetesToleration:
    """
    Describes a toleration by `key`, `operator`, `value` and `effect`.
    """

    key: str
    operator: str
    value: str
    effect: str
@dataclass
class KubernetesAnnotation:
    """
    Describes an annotation as a `key` / `value` pair.
    """

    key: str
    value: str
class DataClassJSONEncoder(json.JSONEncoder):
    """
    A JSON Encoder class to prevent errors during serialization of dataclasses.
    """

    def default(self, o):
        """
        Convert dataclass objects to a dict; defer everything else to the base encoder.
        """
        if not is_dataclass(o):
            # Not a dataclass: let json.JSONEncoder handle (or reject) the object
            return super().default(o)
        return dataclass_asdict(o)
|