#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""In-memory model of a pipeline: operations, pipelines, and supporting value types."""
from dataclasses import asdict as dataclass_asdict
from dataclasses import dataclass
from dataclasses import is_dataclass
import json
import os
import sys
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from elyra.pipeline.pipeline_constants import DISALLOW_CACHED_OUTPUT
from elyra.pipeline.pipeline_constants import ENV_VARIABLES
from elyra.pipeline.pipeline_constants import KUBERNETES_POD_ANNOTATIONS
from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
from elyra.pipeline.pipeline_constants import KUBERNETES_TOLERATIONS
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES

# TODO: Make pipeline version available more widely
# as today is only available on the pipeline editor
PIPELINE_CURRENT_VERSION = 7
PIPELINE_CURRENT_SCHEMA = 3.0


class Operation(object):
    """
    Represents a single operation in a pipeline representing a third-party component
    """

    # Classifier values that identify a node as a generic (built-in) operation
    generic_node_types = ["execute-notebook-node", "execute-python-node", "execute-r-node"]

    @classmethod
    def create_instance(
        cls,
        id: str,
        type: str,
        name: str,
        classifier: str,
        parent_operation_ids: Optional[List[str]] = None,
        component_params: Optional[Dict[str, Any]] = None,
    ) -> "Operation":
        """Class method that creates the appropriate instance of Operation based on inputs.

        Generic classifiers (see ``generic_node_types``) yield a GenericOperation;
        anything else yields a plain Operation.
        """
        if Operation.is_generic_operation(classifier):
            return GenericOperation(
                id, type, name, classifier, parent_operation_ids=parent_operation_ids, component_params=component_params
            )
        return Operation(
            id, type, name, classifier, parent_operation_ids=parent_operation_ids, component_params=component_params
        )

    def __init__(
        self,
        id: str,
        type: str,
        name: str,
        classifier: str,
        parent_operation_ids: Optional[List[str]] = None,
        component_params: Optional[Dict[str, Any]] = None,
    ):
        """
        :param id: Generated UUID, 128 bit number used as a unique identifier
                   e.g. 123e4567-e89b-12d3-a456-426614174000
        :param type: The type of node e.g. execution_node
        :param classifier: indicates the operation's class
        :param name: The name of the operation
        :param parent_operation_ids: List of parent operation 'ids' required to execute prior to this operation
        :param component_params: dictionary of parameter key:value pairs that are used in the creation of a
                                 a non-standard operation instance

        :raises ValueError: if any of id, type, classifier or name is missing/empty
        """
        # Validate that the operation has all required properties
        if not id:
            raise ValueError("Invalid pipeline operation: Missing field 'operation id'.")
        if not type:
            raise ValueError("Invalid pipeline operation: Missing field 'operation type'.")
        if not classifier:
            raise ValueError("Invalid pipeline operation: Missing field 'operation classifier'.")
        if not name:
            raise ValueError("Invalid pipeline operation: Missing field 'operation name'.")

        self._id = id
        self._type = type
        self._classifier = classifier
        self._name = name
        self._parent_operation_ids = parent_operation_ids or []
        self._component_params = component_params or {}
        self._doc = None

        # Elyra system properties (not defined in the component spec) are popped from
        # component_params and surfaced via dedicated properties instead.
        # NOTE: all lookups below use self._component_params (normalized to a dict) rather
        # than the raw component_params argument, which may legitimately be None.
        self._mounted_volumes = self._extract_system_property(MOUNTED_VOLUMES, VolumeMount)
        self._kubernetes_tolerations = self._extract_system_property(KUBERNETES_TOLERATIONS, KubernetesToleration)
        self._kubernetes_pod_annotations = self._extract_system_property(
            KUBERNETES_POD_ANNOTATIONS, KubernetesAnnotation
        )

        # If disabled, this operation is requested to be re-executed in the
        # target runtime environment, even if it was executed before.
        self._disallow_cached_output = self._component_params.get(DISALLOW_CACHED_OUTPUT)

        # Scrub the inputs and outputs lists
        self._component_params["inputs"] = Operation._scrub_list(self._component_params.get("inputs", []))
        self._component_params["outputs"] = Operation._scrub_list(self._component_params.get("outputs", []))

    def _extract_system_property(self, param_name: str, expected_type: type) -> List[Any]:
        """Pop and return an Elyra system property from component_params.

        The value is only accepted (and removed from component_params) when it is a list
        whose entries are instances of expected_type (an empty list is accepted as well);
        otherwise the parameter is left untouched and an empty list is returned.
        """
        value = self._component_params.get(param_name)
        if isinstance(value, list) and (len(value) == 0 or isinstance(value[0], expected_type)):
            # System properties are not defined in the component spec and must be
            # removed from the component_params dict
            return self._component_params.pop(param_name, [])
        return []

    @property
    def id(self) -> str:
        return self._id

    @property
    def type(self) -> str:
        return self._type

    @property
    def classifier(self) -> str:
        return self._classifier

    @property
    def name(self) -> str:
        return self._name

    @name.setter
    def name(self, value: str):
        self._name = value

    @property
    def doc(self) -> str:
        return self._doc

    @doc.setter
    def doc(self, value: str):
        self._doc = value

    @property
    def parent_operation_ids(self) -> List[str]:
        return self._parent_operation_ids

    @property
    def component_params(self) -> Optional[Dict[str, Any]]:
        return self._component_params

    @property
    def component_params_as_dict(self) -> Dict[str, Any]:
        return self._component_params or {}

    @property
    def mounted_volumes(self) -> List["VolumeMount"]:
        return self._mounted_volumes

    @property
    def kubernetes_tolerations(self) -> List["KubernetesToleration"]:
        return self._kubernetes_tolerations

    @property
    def kubernetes_pod_annotations(self) -> List["KubernetesAnnotation"]:
        return self._kubernetes_pod_annotations

    @property
    def disallow_cached_output(self) -> Optional[bool]:
        """
        Returns None if caching behavior is delegated to the runtime
        Returns True if cached output may be used (instead of executing the op to produce it)
        Returns False if cached output must not be used (instead of executing the op to produce it)
        """
        return self._disallow_cached_output

    @property
    def inputs(self) -> Optional[List[str]]:
        return self._component_params.get("inputs")

    @inputs.setter
    def inputs(self, value: List[str]):
        self._component_params["inputs"] = value

    @property
    def outputs(self) -> Optional[List[str]]:
        return self._component_params.get("outputs")

    @property
    def is_generic(self) -> bool:
        return isinstance(self, GenericOperation)

    @outputs.setter
    def outputs(self, value: List[str]):
        self._component_params["outputs"] = value

    def __eq__(self, other: "Operation") -> bool:
        if isinstance(self, other.__class__):
            return (
                self.id == other.id
                and self.type == other.type
                and self.classifier == other.classifier
                and self.name == other.name
                and self.parent_operation_ids == other.parent_operation_ids
                and self.component_params == other.component_params
            )
        return False

    def __str__(self) -> str:
        params = ""
        for key, value in self.component_params_as_dict.items():
            params += f"\t{key}: {value}, \n"

        return (
            f"componentID : {self.id} \n "
            f"name : {self.name} \n "
            f"parent_operation_ids : {self.parent_operation_ids} \n "
            f"component_parameters: {{\n{params}}} \n"
        )

    @staticmethod
    def _scrub_list(dirty: Optional[List[Optional[str]]]) -> List[str]:
        """
        Clean an existing list by filtering out None and empty string values
        :param dirty: a List of values
        :return: a clean list without None or empty string values
        """
        if not dirty:
            return []
        return [clean for clean in dirty if clean]

    @staticmethod
    def is_generic_operation(operation_classifier) -> bool:
        """Return True if operation_classifier identifies a generic (built-in) operation."""
        return operation_classifier in Operation.generic_node_types


class GenericOperation(Operation):
    """
    Represents a single operation in a pipeline representing a generic (built-in) component
    """

    def __init__(
        self,
        id: str,
        type: str,
        name: str,
        classifier: str,
        parent_operation_ids: Optional[List[str]] = None,
        component_params: Optional[Dict[str, Any]] = None,
    ):
        """
        :param id: Generated UUID, 128 bit number used as a unique identifier
                   e.g. 123e4567-e89b-12d3-a456-426614174000
        :param type: The type of node e.g. execution_node
        :param classifier: indicates the operation's class
        :param name: The name of the operation
        :param parent_operation_ids: List of parent operation 'ids' required to execute prior to this operation
        :param component_params: dictionary of parameter key:value pairs that are used in the creation of a
                                 a non-standard operation instance

        Component_params for "generic components" (i.e., those with one of the following classifier values:
        ["execute-notebook-node", "execute-python-node", "execute-r-node"]) can expect to have the following
        entries.
            filename: The relative path to the source file in the users local environment
                      to be executed e.g. path/to/file.ext
            runtime_image: The DockerHub image to be used for the operation
                           e.g. user/docker_image_name:tag
            dependencies: List of local files/directories needed for the operation to run
                          and packaged into each operation's dependency archive
            include_subdirectories: Include or Exclude subdirectories when packaging our 'dependencies'
            env_vars: List of Environmental variables to set in the container image
                      e.g. FOO="BAR"
            inputs: List of files to be consumed by this operation, produced by parent operation(s)
            outputs: List of files produced by this operation to be included in a child operation(s)
            cpu: number of cpus requested to run the operation
            memory: amount of memory requested to run the operation (in Gi)
            gpu: number of gpus requested to run the operation

        Entries for other (non-built-in) component types are a function of the respective component.

        :raises ValueError: if filename or runtime_image is missing, or cpu/gpu/memory is out of range
        """
        super().__init__(
            id, type, name, classifier, parent_operation_ids=parent_operation_ids, component_params=component_params
        )

        # Read through self._component_params (never None after the base initializer),
        # so a missing component_params dict fails with the validation error below
        # rather than an AttributeError.
        if not self._component_params.get("filename"):
            raise ValueError("Invalid pipeline operation: Missing field 'operation filename'.")
        if not self._component_params.get("runtime_image"):
            raise ValueError("Invalid pipeline operation: Missing field 'operation runtime image'.")
        if self._component_params.get("cpu") and not self._validate_range(
            self._component_params.get("cpu"), min_value=1
        ):
            raise ValueError("Invalid pipeline operation: CPU must be a positive value or None")
        if self._component_params.get("gpu") and not self._validate_range(
            self._component_params.get("gpu"), min_value=0
        ):
            raise ValueError("Invalid pipeline operation: GPU must be a positive value or None")
        if self._component_params.get("memory") and not self._validate_range(
            self._component_params.get("memory"), min_value=1
        ):
            raise ValueError("Invalid pipeline operation: Memory must be a positive value or None")

        # Re-build object to include default values
        self._component_params["filename"] = self._component_params.get("filename")
        self._component_params["runtime_image"] = self._component_params.get("runtime_image")
        self._component_params["dependencies"] = Operation._scrub_list(self._component_params.get("dependencies", []))
        self._component_params["include_subdirectories"] = self._component_params.get("include_subdirectories", False)
        self._component_params["env_vars"] = KeyValueList(
            Operation._scrub_list(self._component_params.get("env_vars", []))
        )
        self._component_params["cpu"] = self._component_params.get("cpu")
        self._component_params["gpu"] = self._component_params.get("gpu")
        self._component_params["memory"] = self._component_params.get("memory")

    @property
    def name(self) -> str:
        # When the name still equals the source file's basename, strip the extension
        # so the operation is displayed by its bare file name.
        if self._name == os.path.basename(self.filename):
            self._name = os.path.basename(self._name).split(".")[0]
        return self._name

    @name.setter
    def name(self, value):
        self._name = value

    @property
    def filename(self) -> str:
        return self._component_params.get("filename")

    @property
    def runtime_image(self) -> str:
        return self._component_params.get("runtime_image")

    @property
    def dependencies(self) -> Optional[List[str]]:
        return self._component_params.get("dependencies")

    @property
    def include_subdirectories(self) -> Optional[bool]:
        return self._component_params.get("include_subdirectories")

    @property
    def env_vars(self) -> Optional["KeyValueList"]:
        # NOTE(review): __init__ stores this value under the "env_vars" key; this lookup
        # presumes ENV_VARIABLES == "env_vars" — confirm against pipeline_constants.
        return self._component_params.get(ENV_VARIABLES)

    @property
    def cpu(self) -> Optional[str]:
        return self._component_params.get("cpu")

    @property
    def memory(self) -> Optional[str]:
        return self._component_params.get("memory")

    @property
    def gpu(self) -> Optional[str]:
        return self._component_params.get("gpu")

    @property
    def kubernetes_secrets(self) -> List["KubernetesSecret"]:
        return self._component_params.get(KUBERNETES_SECRETS)

    def __eq__(self, other: "GenericOperation") -> bool:
        if isinstance(self, other.__class__):
            return super().__eq__(other)
        return False

    def _validate_range(self, value: str, min_value: int = 0, max_value: int = sys.maxsize) -> bool:
        """Return True when int(value) falls within [min_value, max_value)."""
        return int(value) in range(min_value, max_value)


class Pipeline(object):
    """
    Represents a single pipeline constructed in the pipeline editor
    """

    def __init__(
        self,
        id: str,
        name: str,
        runtime: str,
        runtime_config: str,
        source: Optional[str] = None,
        description: Optional[str] = None,
        pipeline_parameters: Optional[Dict[str, Any]] = None,
    ):
        """
        :param id: Generated UUID, 128 bit number used as a unique identifier
                   e.g. 123e4567-e89b-12d3-a456-426614174000
        :param name: Pipeline name, e.g. test-pipeline-123456
        :param runtime: Type of runtime we want to use to execute our pipeline, e.g. kfp OR airflow
        :param runtime_config: Runtime configuration that should be used to submit the pipeline to execution
        :param source: The pipeline source, e.g. a pipeline file or a notebook.
        :param description: Pipeline description
        :param pipeline_parameters: Key/value pairs representing the parameters of this pipeline

        :raises ValueError: if name, runtime or runtime_config is missing/empty
        """
        if not name:
            raise ValueError("Invalid pipeline: Missing pipeline name.")
        if not runtime:
            raise ValueError("Invalid pipeline: Missing runtime.")
        if not runtime_config:
            raise ValueError("Invalid pipeline: Missing runtime configuration.")

        self._id = id
        self._name = name
        self._description = description
        self._source = source
        self._runtime = runtime
        self._runtime_config = runtime_config
        self._pipeline_parameters = pipeline_parameters or {}
        # Mapping of operation id -> Operation; populated by callers after construction
        self._operations = {}

    @property
    def id(self) -> str:
        return self._id

    @property
    def name(self) -> str:
        return self._name

    @property
    def source(self) -> str:
        return self._source

    @property
    def runtime(self) -> str:
        """
        The runtime processor name that will execute the pipeline
        """
        return self._runtime

    @property
    def runtime_config(self) -> str:
        """
        The runtime configuration that should be used to submit the pipeline for execution
        """
        return self._runtime_config

    @property
    def pipeline_parameters(self) -> Dict[str, Any]:
        """
        The dictionary of global parameters associated with each node of the pipeline
        """
        return self._pipeline_parameters

    @property
    def operations(self) -> Dict[str, Operation]:
        return self._operations

    @property
    def description(self) -> Optional[str]:
        """
        Pipeline description
        """
        return self._description

    def contains_generic_operations(self) -> bool:
        """
        Returns a truthy value indicating whether the pipeline contains one
        or more generic operations.
        """
        for op_id, op in self._operations.items():
            if isinstance(op, GenericOperation):
                return True
        return False

    def __eq__(self, other: "Pipeline") -> bool:
        if isinstance(self, other.__class__):
            return (
                self.id == other.id
                and self.name == other.name
                and self.source == other.source
                and self.description == other.description
                and self.runtime == other.runtime
                and self.runtime_config == other.runtime_config
                and self.operations == other.operations
            )
        # Mirror Operation.__eq__: explicitly return False (not None) for foreign types
        return False


class KeyValueList(list):
    """
    A list class that exposes functionality specific to lists whose entries are
    key-value pairs separated by a pre-defined character.
    """

    _key_value_separator: str = "="

    def to_dict(self) -> Dict[str, str]:
        """
        Properties consisting of key-value pairs are stored in a list of separated
        strings, while most processing steps require a dictionary - so we must convert.

        If no key/value pairs are specified, an empty dictionary is returned, otherwise
        pairs are converted to dictionary entries, stripped of whitespace, and returned.
        """
        kv_dict = {}
        for kv in self:
            if not kv:
                continue

            if self._key_value_separator not in kv:
                raise ValueError(
                    f"Property {kv} does not contain the expected "
                    f"separator character: '{self._key_value_separator}'."
                )

            key, value = kv.split(self._key_value_separator, 1)

            key = key.strip()
            if not key:
                # Invalid entry; skip inclusion and continue
                continue

            if isinstance(value, str):
                value = value.strip()

            if not value:
                # Invalid entry; skip inclusion and continue
                continue

            kv_dict[key] = value
        return kv_dict

    @classmethod
    def to_str(cls, key: str, value: str) -> str:
        """Join key and value with the class separator character."""
        return f"{key}{cls._key_value_separator}{value}"

    @classmethod
    def from_dict(cls, kv_dict: Dict) -> "KeyValueList":
        """
        Convert a set of key-value pairs stored in a dictionary to
        a KeyValueList of strings with the defined separator.
        """
        str_list = [KeyValueList.to_str(key, value) for key, value in kv_dict.items()]
        return KeyValueList(str_list)

    @classmethod
    def merge(cls, primary: "KeyValueList", secondary: "KeyValueList") -> "KeyValueList":
        """
        Merge two key-value pair lists, preferring the values given in the
        primary parameter in the case of a matching key between the two lists.
        """
        primary_dict = primary.to_dict()
        secondary_dict = secondary.to_dict()

        return KeyValueList.from_dict({**secondary_dict, **primary_dict})

    @classmethod
    def difference(cls, minuend: "KeyValueList", subtrahend: "KeyValueList") -> "KeyValueList":
        """
        Given KeyValueLists, convert to dictionaries and remove any keys found in
        the second (subtrahend) from the first (minuend), if present.

        :param minuend: list to be subtracted from
        :param subtrahend: list whose keys will be removed from the minuend

        :returns: the difference of the two lists
        """
        subtract_dict = minuend.to_dict()
        for key in subtrahend.to_dict().keys():
            if key in subtract_dict:
                subtract_dict.pop(key)

        return KeyValueList.from_dict(subtract_dict)


@dataclass
class VolumeMount:
    # Container path at which the PVC is mounted
    path: str
    # Name of the Kubernetes PersistentVolumeClaim to mount
    pvc_name: str


@dataclass
class KubernetesSecret:
    # Environment variable to populate from the secret
    env_var: str
    # Name of the Kubernetes secret
    name: str
    # Key within the secret whose value is used
    key: str


@dataclass
class KubernetesToleration:
    key: str
    operator: str
    value: str
    effect: str


@dataclass
class KubernetesAnnotation:
    key: str
    value: str


class DataClassJSONEncoder(json.JSONEncoder):
    """
    A JSON Encoder class to prevent errors during serialization of dataclasses.
    """

    def default(self, o):
        """
        Render dataclass content as dict
        """
        if is_dataclass(o):
            return dataclass_asdict(o)
        return super().default(o)