# pipeline.py
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from dataclasses import asdict as dataclass_asdict
  17. from dataclasses import dataclass
  18. from dataclasses import is_dataclass
  19. import json
  20. import os
  21. import sys
  22. from typing import Any
  23. from typing import Dict
  24. from typing import List
  25. from typing import Optional
  26. from elyra.pipeline.pipeline_constants import DISALLOW_CACHED_OUTPUT
  27. from elyra.pipeline.pipeline_constants import ENV_VARIABLES
  28. from elyra.pipeline.pipeline_constants import KUBERNETES_POD_ANNOTATIONS
  29. from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
  30. from elyra.pipeline.pipeline_constants import KUBERNETES_TOLERATIONS
  31. from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
# TODO: Make pipeline version available more widely
# as today is only available on the pipeline editor
# Version of the in-memory/pipeline-file representation produced by this code
PIPELINE_CURRENT_VERSION = 7
# Schema version of the pipeline definition (presumably tracks the pipeline
# editor's JSON schema -- confirm against the editor's schema files)
PIPELINE_CURRENT_SCHEMA = 3.0
  36. class Operation(object):
  37. """
  38. Represents a single operation in a pipeline representing a third-party component
  39. """
  40. generic_node_types = ["execute-notebook-node", "execute-python-node", "execute-r-node"]
  41. @classmethod
  42. def create_instance(
  43. cls,
  44. id: str,
  45. type: str,
  46. name: str,
  47. classifier: str,
  48. parent_operation_ids: Optional[List[str]] = None,
  49. component_params: Optional[Dict[str, Any]] = None,
  50. ) -> "Operation":
  51. """Class method that creates the appropriate instance of Operation based on inputs."""
  52. if Operation.is_generic_operation(classifier):
  53. return GenericOperation(
  54. id, type, name, classifier, parent_operation_ids=parent_operation_ids, component_params=component_params
  55. )
  56. return Operation(
  57. id, type, name, classifier, parent_operation_ids=parent_operation_ids, component_params=component_params
  58. )
  59. def __init__(
  60. self,
  61. id: str,
  62. type: str,
  63. name: str,
  64. classifier: str,
  65. parent_operation_ids: Optional[List[str]] = None,
  66. component_params: Optional[Dict[str, Any]] = None,
  67. ):
  68. """
  69. :param id: Generated UUID, 128 bit number used as a unique identifier
  70. e.g. 123e4567-e89b-12d3-a456-426614174000
  71. :param type: The type of node e.g. execution_node
  72. :param classifier: indicates the operation's class
  73. :param name: The name of the operation
  74. :param parent_operation_ids: List of parent operation 'ids' required to execute prior to this operation
  75. :param component_params: dictionary of parameter key:value pairs that are used in the creation of a
  76. a non-standard operation instance
  77. """
  78. # Validate that the operation has all required properties
  79. if not id:
  80. raise ValueError("Invalid pipeline operation: Missing field 'operation id'.")
  81. if not type:
  82. raise ValueError("Invalid pipeline operation: Missing field 'operation type'.")
  83. if not classifier:
  84. raise ValueError("Invalid pipeline operation: Missing field 'operation classifier'.")
  85. if not name:
  86. raise ValueError("Invalid pipeline operation: Missing field 'operation name'.")
  87. self._id = id
  88. self._type = type
  89. self._classifier = classifier
  90. self._name = name
  91. self._parent_operation_ids = parent_operation_ids or []
  92. self._component_params = component_params or {}
  93. self._doc = None
  94. self._mounted_volumes = []
  95. param_volumes = component_params.get(MOUNTED_VOLUMES)
  96. if (
  97. param_volumes is not None
  98. and isinstance(param_volumes, list)
  99. and (len(param_volumes) == 0 or isinstance(param_volumes[0], VolumeMount))
  100. ):
  101. # The mounted_volumes property is an Elyra system property (ie, not defined in the component
  102. # spec) and must be removed from the component_params dict
  103. self._mounted_volumes = self._component_params.pop(MOUNTED_VOLUMES, [])
  104. self._kubernetes_tolerations = []
  105. param_tolerations = component_params.get(KUBERNETES_TOLERATIONS)
  106. if (
  107. param_tolerations is not None
  108. and isinstance(param_tolerations, list)
  109. and (len(param_tolerations) == 0 or isinstance(param_tolerations[0], KubernetesToleration))
  110. ):
  111. # The kubernetes_tolerations property is the Elyra system property (ie, not defined in the component
  112. # spec) and must be removed from the component_params dict
  113. self._kubernetes_tolerations = self._component_params.pop(KUBERNETES_TOLERATIONS, [])
  114. self._kubernetes_pod_annotations = []
  115. param_annotations = component_params.get(KUBERNETES_POD_ANNOTATIONS)
  116. if (
  117. param_annotations is not None
  118. and isinstance(param_annotations, list)
  119. and (len(param_annotations) == 0 or isinstance(param_annotations[0], KubernetesAnnotation))
  120. ):
  121. # The kubernetes_pod_annotations property is an Elyra system property (ie, not defined in the component
  122. # spec) and must be removed from the component_params dict
  123. self._kubernetes_pod_annotations = self._component_params.pop(KUBERNETES_POD_ANNOTATIONS, [])
  124. # If disabled, this operation is requested to be re-executed in the
  125. # target runtime environment, even if it was executed before.
  126. param_disallow_cached_output = component_params.get(DISALLOW_CACHED_OUTPUT)
  127. self._disallow_cached_output = param_disallow_cached_output
  128. # Scrub the inputs and outputs lists
  129. self._component_params["inputs"] = Operation._scrub_list(component_params.get("inputs", []))
  130. self._component_params["outputs"] = Operation._scrub_list(component_params.get("outputs", []))
  131. @property
  132. def id(self) -> str:
  133. return self._id
  134. @property
  135. def type(self) -> str:
  136. return self._type
  137. @property
  138. def classifier(self) -> str:
  139. return self._classifier
  140. @property
  141. def name(self) -> str:
  142. return self._name
  143. @name.setter
  144. def name(self, value: str):
  145. self._name = value
  146. @property
  147. def doc(self) -> str:
  148. return self._doc
  149. @doc.setter
  150. def doc(self, value: str):
  151. self._doc = value
  152. @property
  153. def parent_operation_ids(self) -> List[str]:
  154. return self._parent_operation_ids
  155. @property
  156. def component_params(self) -> Optional[Dict[str, Any]]:
  157. return self._component_params
  158. @property
  159. def component_params_as_dict(self) -> Dict[str, Any]:
  160. return self._component_params or {}
  161. @property
  162. def mounted_volumes(self) -> List["VolumeMount"]:
  163. return self._mounted_volumes
  164. @property
  165. def kubernetes_tolerations(self) -> List["KubernetesToleration"]:
  166. return self._kubernetes_tolerations
  167. @property
  168. def kubernetes_pod_annotations(self) -> List["KubernetesAnnotation"]:
  169. return self._kubernetes_pod_annotations
  170. @property
  171. def disallow_cached_output(self) -> Optional[bool]:
  172. """
  173. Returns None if caching behavior is delegated to the runtime
  174. Returns True if cached output may be used (instead of executing the op to produce it)
  175. Returns False if cached output must not be used (instead of executing the op to produce it)
  176. """
  177. return self._disallow_cached_output
  178. @property
  179. def inputs(self) -> Optional[List[str]]:
  180. return self._component_params.get("inputs")
  181. @inputs.setter
  182. def inputs(self, value: List[str]):
  183. self._component_params["inputs"] = value
  184. @property
  185. def outputs(self) -> Optional[List[str]]:
  186. return self._component_params.get("outputs")
  187. @property
  188. def is_generic(self) -> bool:
  189. return isinstance(self, GenericOperation)
  190. @outputs.setter
  191. def outputs(self, value: List[str]):
  192. self._component_params["outputs"] = value
  193. def __eq__(self, other: "Operation") -> bool:
  194. if isinstance(self, other.__class__):
  195. return (
  196. self.id == other.id
  197. and self.type == other.type
  198. and self.classifier == other.classifier
  199. and self.name == other.name
  200. and self.parent_operation_ids == other.parent_operation_ids
  201. and self.component_params == other.component_params
  202. )
  203. return False
  204. def __str__(self) -> str:
  205. params = ""
  206. for key, value in self.component_params_as_dict.items():
  207. params += f"\t{key}: {value}, \n"
  208. return (
  209. f"componentID : {self.id} \n "
  210. f"name : {self.name} \n "
  211. f"parent_operation_ids : {self.parent_operation_ids} \n "
  212. f"component_parameters: {{\n{params}}} \n"
  213. )
  214. @staticmethod
  215. def _scrub_list(dirty: Optional[List[Optional[str]]]) -> List[str]:
  216. """
  217. Clean an existing list by filtering out None and empty string values
  218. :param dirty: a List of values
  219. :return: a clean list without None or empty string values
  220. """
  221. if not dirty:
  222. return []
  223. return [clean for clean in dirty if clean]
  224. @staticmethod
  225. def is_generic_operation(operation_classifier) -> bool:
  226. return operation_classifier in Operation.generic_node_types
  227. class GenericOperation(Operation):
  228. """
  229. Represents a single operation in a pipeline representing a generic (built-in) component
  230. """
  231. def __init__(
  232. self,
  233. id: str,
  234. type: str,
  235. name: str,
  236. classifier: str,
  237. parent_operation_ids: Optional[List[str]] = None,
  238. component_params: Optional[Dict[str, Any]] = None,
  239. ):
  240. """
  241. :param id: Generated UUID, 128 bit number used as a unique identifier
  242. e.g. 123e4567-e89b-12d3-a456-426614174000
  243. :param type: The type of node e.g. execution_node
  244. :param classifier: indicates the operation's class
  245. :param name: The name of the operation
  246. :param parent_operation_ids: List of parent operation 'ids' required to execute prior to this operation
  247. :param component_params: dictionary of parameter key:value pairs that are used in the creation of a
  248. a non-standard operation instance
  249. Component_params for "generic components" (i.e., those with one of the following classifier values:
  250. ["execute-notebook-node", "execute-python-node", "execute-r-node"]) can expect to have the following
  251. entries.
  252. filename: The relative path to the source file in the users local environment
  253. to be executed e.g. path/to/file.ext
  254. runtime_image: The DockerHub image to be used for the operation
  255. e.g. user/docker_image_name:tag
  256. dependencies: List of local files/directories needed for the operation to run
  257. and packaged into each operation's dependency archive
  258. include_subdirectories: Include or Exclude subdirectories when packaging our 'dependencies'
  259. env_vars: List of Environmental variables to set in the container image
  260. e.g. FOO="BAR"
  261. inputs: List of files to be consumed by this operation, produced by parent operation(s)
  262. outputs: List of files produced by this operation to be included in a child operation(s)
  263. cpu: number of cpus requested to run the operation
  264. memory: amount of memory requested to run the operation (in Gi)
  265. gpu: number of gpus requested to run the operation
  266. Entries for other (non-built-in) component types are a function of the respective component.
  267. """
  268. super().__init__(
  269. id, type, name, classifier, parent_operation_ids=parent_operation_ids, component_params=component_params
  270. )
  271. if not component_params.get("filename"):
  272. raise ValueError("Invalid pipeline operation: Missing field 'operation filename'.")
  273. if not component_params.get("runtime_image"):
  274. raise ValueError("Invalid pipeline operation: Missing field 'operation runtime image'.")
  275. if component_params.get("cpu") and not self._validate_range(component_params.get("cpu"), min_value=1):
  276. raise ValueError("Invalid pipeline operation: CPU must be a positive value or None")
  277. if component_params.get("gpu") and not self._validate_range(component_params.get("gpu"), min_value=0):
  278. raise ValueError("Invalid pipeline operation: GPU must be a positive value or None")
  279. if component_params.get("memory") and not self._validate_range(component_params.get("memory"), min_value=1):
  280. raise ValueError("Invalid pipeline operation: Memory must be a positive value or None")
  281. # Re-build object to include default values
  282. self._component_params["filename"] = component_params.get("filename")
  283. self._component_params["runtime_image"] = component_params.get("runtime_image")
  284. self._component_params["dependencies"] = Operation._scrub_list(component_params.get("dependencies", []))
  285. self._component_params["include_subdirectories"] = component_params.get("include_subdirectories", False)
  286. self._component_params["env_vars"] = KeyValueList(Operation._scrub_list(component_params.get("env_vars", [])))
  287. self._component_params["cpu"] = component_params.get("cpu")
  288. self._component_params["gpu"] = component_params.get("gpu")
  289. self._component_params["memory"] = component_params.get("memory")
  290. @property
  291. def name(self) -> str:
  292. if self._name == os.path.basename(self.filename):
  293. self._name = os.path.basename(self._name).split(".")[0]
  294. return self._name
  295. @name.setter
  296. def name(self, value):
  297. self._name = value
  298. @property
  299. def filename(self) -> str:
  300. return self._component_params.get("filename")
  301. @property
  302. def runtime_image(self) -> str:
  303. return self._component_params.get("runtime_image")
  304. @property
  305. def dependencies(self) -> Optional[List[str]]:
  306. return self._component_params.get("dependencies")
  307. @property
  308. def include_subdirectories(self) -> Optional[bool]:
  309. return self._component_params.get("include_subdirectories")
  310. @property
  311. def env_vars(self) -> Optional["KeyValueList"]:
  312. return self._component_params.get(ENV_VARIABLES)
  313. @property
  314. def cpu(self) -> Optional[str]:
  315. return self._component_params.get("cpu")
  316. @property
  317. def memory(self) -> Optional[str]:
  318. return self._component_params.get("memory")
  319. @property
  320. def gpu(self) -> Optional[str]:
  321. return self._component_params.get("gpu")
  322. @property
  323. def kubernetes_secrets(self) -> List["KubernetesSecret"]:
  324. return self._component_params.get(KUBERNETES_SECRETS)
  325. def __eq__(self, other: "GenericOperation") -> bool:
  326. if isinstance(self, other.__class__):
  327. return super().__eq__(other)
  328. return False
  329. def _validate_range(self, value: str, min_value: int = 0, max_value: int = sys.maxsize) -> bool:
  330. return int(value) in range(min_value, max_value)
  331. class Pipeline(object):
  332. """
  333. Represents a single pipeline constructed in the pipeline editor
  334. """
  335. def __init__(
  336. self,
  337. id: str,
  338. name: str,
  339. runtime: str,
  340. runtime_config: str,
  341. source: Optional[str] = None,
  342. description: Optional[str] = None,
  343. pipeline_parameters: Optional[Dict[str, Any]] = None,
  344. ):
  345. """
  346. :param id: Generated UUID, 128 bit number used as a unique identifier
  347. e.g. 123e4567-e89b-12d3-a456-426614174000
  348. :param name: Pipeline name, e.g. test-pipeline-123456
  349. :param runtime: Type of runtime we want to use to execute our pipeline, e.g. kfp OR airflow
  350. :param runtime_config: Runtime configuration that should be used to submit the pipeline to execution
  351. :param source: The pipeline source, e.g. a pipeline file or a notebook.
  352. :param description: Pipeline description
  353. :param pipeline_parameters: Key/value pairs representing the parameters of this pipeline
  354. """
  355. if not name:
  356. raise ValueError("Invalid pipeline: Missing pipeline name.")
  357. if not runtime:
  358. raise ValueError("Invalid pipeline: Missing runtime.")
  359. if not runtime_config:
  360. raise ValueError("Invalid pipeline: Missing runtime configuration.")
  361. self._id = id
  362. self._name = name
  363. self._description = description
  364. self._source = source
  365. self._runtime = runtime
  366. self._runtime_config = runtime_config
  367. self._pipeline_parameters = pipeline_parameters or {}
  368. self._operations = {}
  369. @property
  370. def id(self) -> str:
  371. return self._id
  372. @property
  373. def name(self) -> str:
  374. return self._name
  375. @property
  376. def source(self) -> str:
  377. return self._source
  378. @property
  379. def runtime(self) -> str:
  380. """
  381. The runtime processor name that will execute the pipeline
  382. """
  383. return self._runtime
  384. @property
  385. def runtime_config(self) -> str:
  386. """
  387. The runtime configuration that should be used to submit the pipeline for execution
  388. """
  389. return self._runtime_config
  390. @property
  391. def pipeline_parameters(self) -> Dict[str, Any]:
  392. """
  393. The dictionary of global parameters associated with each node of the pipeline
  394. """
  395. return self._pipeline_parameters
  396. @property
  397. def operations(self) -> Dict[str, Operation]:
  398. return self._operations
  399. @property
  400. def description(self) -> Optional[str]:
  401. """
  402. Pipeline description
  403. """
  404. return self._description
  405. def contains_generic_operations(self) -> bool:
  406. """
  407. Returns a truthy value indicating whether the pipeline contains
  408. one or more generic operations.
  409. """
  410. for op_id, op in self._operations.items():
  411. if isinstance(op, GenericOperation):
  412. return True
  413. return False
  414. def __eq__(self, other: "Pipeline") -> bool:
  415. if isinstance(self, other.__class__):
  416. return (
  417. self.id == other.id
  418. and self.name == other.name
  419. and self.source == other.source
  420. and self.description == other.description
  421. and self.runtime == other.runtime
  422. and self.runtime_config == other.runtime_config
  423. and self.operations == other.operations
  424. )
  425. class KeyValueList(list):
  426. """
  427. A list class that exposes functionality specific to lists whose entries are
  428. key-value pairs separated by a pre-defined character.
  429. """
  430. _key_value_separator: str = "="
  431. def to_dict(self) -> Dict[str, str]:
  432. """
  433. Properties consisting of key-value pairs are stored in a list of separated
  434. strings, while most processing steps require a dictionary - so we must convert.
  435. If no key/value pairs are specified, an empty dictionary is returned, otherwise
  436. pairs are converted to dictionary entries, stripped of whitespace, and returned.
  437. """
  438. kv_dict = {}
  439. for kv in self:
  440. if not kv:
  441. continue
  442. if self._key_value_separator not in kv:
  443. raise ValueError(
  444. f"Property {kv} does not contain the expected "
  445. f"separator character: '{self._key_value_separator}'."
  446. )
  447. key, value = kv.split(self._key_value_separator, 1)
  448. key = key.strip()
  449. if not key:
  450. # Invalid entry; skip inclusion and continue
  451. continue
  452. if isinstance(value, str):
  453. value = value.strip()
  454. if not value:
  455. # Invalid entry; skip inclusion and continue
  456. continue
  457. kv_dict[key] = value
  458. return kv_dict
  459. @classmethod
  460. def to_str(cls, key: str, value: str) -> str:
  461. return f"{key}{cls._key_value_separator}{value}"
  462. @classmethod
  463. def from_dict(cls, kv_dict: Dict) -> "KeyValueList":
  464. """
  465. Convert a set of key-value pairs stored in a dictionary to
  466. a KeyValueList of strings with the defined separator.
  467. """
  468. str_list = [KeyValueList.to_str(key, value) for key, value in kv_dict.items()]
  469. return KeyValueList(str_list)
  470. @classmethod
  471. def merge(cls, primary: "KeyValueList", secondary: "KeyValueList") -> "KeyValueList":
  472. """
  473. Merge two key-value pair lists, preferring the values given in the
  474. primary parameter in the case of a matching key between the two lists.
  475. """
  476. primary_dict = primary.to_dict()
  477. secondary_dict = secondary.to_dict()
  478. return KeyValueList.from_dict({**secondary_dict, **primary_dict})
  479. @classmethod
  480. def difference(cls, minuend: "KeyValueList", subtrahend: "KeyValueList") -> "KeyValueList":
  481. """
  482. Given KeyValueLists, convert to dictionaries and remove any keys found in the
  483. second (subtrahend) from the first (minuend), if present.
  484. :param minuend: list to be subtracted from
  485. :param subtrahend: list whose keys will be removed from the minuend
  486. :returns: the difference of the two lists
  487. """
  488. subtract_dict = minuend.to_dict()
  489. for key in subtrahend.to_dict().keys():
  490. if key in subtract_dict:
  491. subtract_dict.pop(key)
  492. return KeyValueList.from_dict(subtract_dict)
@dataclass
class VolumeMount:
    """A volume to be mounted into an operation's pod (Elyra system property)."""

    # Path inside the container where the volume is mounted
    path: str
    # Name of the PersistentVolumeClaim backing the mount
    pvc_name: str
@dataclass
class KubernetesSecret:
    """A Kubernetes secret entry exposed to an operation as an environment variable."""

    # Environment variable name to populate from the secret
    env_var: str
    # Name of the Kubernetes secret resource
    name: str
    # Key within the secret whose value is used
    key: str
@dataclass
class KubernetesToleration:
    """A toleration applied to an operation's pod; fields mirror a Kubernetes toleration spec."""

    key: str
    operator: str
    value: str
    effect: str
@dataclass
class KubernetesAnnotation:
    """An annotation (key/value metadata pair) applied to an operation's pod."""

    key: str
    value: str
  512. class DataClassJSONEncoder(json.JSONEncoder):
  513. """
  514. A JSON Encoder class to prevent errors during serialization of dataclasses.
  515. """
  516. def default(self, o):
  517. """
  518. Render dataclass content as dict
  519. """
  520. if is_dataclass(o):
  521. return dataclass_asdict(o)
  522. return super().default(o)