pipeline.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from dataclasses import asdict as dataclass_asdict
  17. from dataclasses import dataclass
  18. from dataclasses import is_dataclass
  19. import json
  20. import os
  21. import sys
  22. from typing import Any
  23. from typing import Dict
  24. from typing import List
  25. from typing import Optional
  26. from elyra.pipeline.pipeline_constants import ENV_VARIABLES
# TODO: Make pipeline version available more widely
# as today is only available on the pipeline editor
# Version number of the pipeline definition format this module works with
# (presumably bumped on each format migration — confirm against the editor).
PIPELINE_CURRENT_VERSION = 7
# Version of the pipeline JSON schema corresponding to the format above.
PIPELINE_CURRENT_SCHEMA = 3.0
  31. class Operation(object):
  32. """
  33. Represents a single operation in a pipeline representing a third-party component
  34. """
  35. generic_node_types = ["execute-notebook-node", "execute-python-node", "execute-r-node"]
  36. @classmethod
  37. def create_instance(
  38. cls,
  39. id: str,
  40. type: str,
  41. name: str,
  42. classifier: str,
  43. parent_operation_ids: Optional[List[str]] = None,
  44. component_params: Optional[Dict[str, Any]] = None,
  45. ) -> "Operation":
  46. """Class method that creates the appropriate instance of Operation based on inputs."""
  47. if Operation.is_generic_operation(classifier):
  48. return GenericOperation(
  49. id, type, name, classifier, parent_operation_ids=parent_operation_ids, component_params=component_params
  50. )
  51. return Operation(
  52. id, type, name, classifier, parent_operation_ids=parent_operation_ids, component_params=component_params
  53. )
  54. def __init__(
  55. self,
  56. id: str,
  57. type: str,
  58. name: str,
  59. classifier: str,
  60. parent_operation_ids: Optional[List[str]] = None,
  61. component_params: Optional[Dict[str, Any]] = None,
  62. ):
  63. """
  64. :param id: Generated UUID, 128 bit number used as a unique identifier
  65. e.g. 123e4567-e89b-12d3-a456-426614174000
  66. :param type: The type of node e.g. execution_node
  67. :param classifier: indicates the operation's class
  68. :param name: The name of the operation
  69. :param parent_operation_ids: List of parent operation 'ids' required to execute prior to this operation
  70. :param component_params: dictionary of parameter key:value pairs that are used in the creation of a
  71. a non-standard operation instance
  72. """
  73. # Validate that the operation has all required properties
  74. if not id:
  75. raise ValueError("Invalid pipeline operation: Missing field 'operation id'.")
  76. if not type:
  77. raise ValueError("Invalid pipeline operation: Missing field 'operation type'.")
  78. if not classifier:
  79. raise ValueError("Invalid pipeline operation: Missing field 'operation classifier'.")
  80. if not name:
  81. raise ValueError("Invalid pipeline operation: Missing field 'operation name'.")
  82. self._id = id
  83. self._type = type
  84. self._classifier = classifier
  85. self._name = name
  86. self._parent_operation_ids = parent_operation_ids or []
  87. self._component_params = component_params or {}
  88. self._doc = None
  89. # Scrub the inputs and outputs lists
  90. self._component_params["inputs"] = Operation._scrub_list(component_params.get("inputs", []))
  91. self._component_params["outputs"] = Operation._scrub_list(component_params.get("outputs", []))
  92. @property
  93. def id(self) -> str:
  94. return self._id
  95. @property
  96. def type(self) -> str:
  97. return self._type
  98. @property
  99. def classifier(self) -> str:
  100. return self._classifier
  101. @property
  102. def name(self) -> str:
  103. return self._name
  104. @name.setter
  105. def name(self, value: str):
  106. self._name = value
  107. @property
  108. def doc(self) -> str:
  109. return self._doc
  110. @doc.setter
  111. def doc(self, value: str):
  112. self._doc = value
  113. @property
  114. def parent_operation_ids(self) -> List[str]:
  115. return self._parent_operation_ids
  116. @property
  117. def component_params(self) -> Optional[Dict[str, Any]]:
  118. return self._component_params
  119. @property
  120. def component_params_as_dict(self) -> Dict[str, Any]:
  121. return self._component_params or {}
  122. @property
  123. def inputs(self) -> Optional[List[str]]:
  124. return self._component_params.get("inputs")
  125. @inputs.setter
  126. def inputs(self, value: List[str]):
  127. self._component_params["inputs"] = value
  128. @property
  129. def outputs(self) -> Optional[List[str]]:
  130. return self._component_params.get("outputs")
  131. @outputs.setter
  132. def outputs(self, value: List[str]):
  133. self._component_params["outputs"] = value
  134. def __eq__(self, other: "Operation") -> bool:
  135. if isinstance(self, other.__class__):
  136. return (
  137. self.id == other.id
  138. and self.type == other.type
  139. and self.classifier == other.classifier
  140. and self.name == other.name
  141. and self.parent_operation_ids == other.parent_operation_ids
  142. and self.component_params == other.component_params
  143. )
  144. return False
  145. def __str__(self) -> str:
  146. params = ""
  147. for key, value in self.component_params_as_dict.items():
  148. params += f"\t{key}: {value}, \n"
  149. return (
  150. f"componentID : {self.id} \n "
  151. f"name : {self.name} \n "
  152. f"parent_operation_ids : {self.parent_operation_ids} \n "
  153. f"component_parameters: {{\n{params}}} \n"
  154. )
  155. @staticmethod
  156. def _scrub_list(dirty: Optional[List[Optional[str]]]) -> List[str]:
  157. """
  158. Clean an existing list by filtering out None and empty string values
  159. :param dirty: a List of values
  160. :return: a clean list without None or empty string values
  161. """
  162. if not dirty:
  163. return []
  164. return [clean for clean in dirty if clean]
  165. @staticmethod
  166. def is_generic_operation(operation_classifier) -> bool:
  167. return operation_classifier in Operation.generic_node_types
  168. class GenericOperation(Operation):
  169. """
  170. Represents a single operation in a pipeline representing a generic (built-in) component
  171. """
  172. def __init__(
  173. self,
  174. id: str,
  175. type: str,
  176. name: str,
  177. classifier: str,
  178. parent_operation_ids: Optional[List[str]] = None,
  179. component_params: Optional[Dict[str, Any]] = None,
  180. ):
  181. """
  182. :param id: Generated UUID, 128 bit number used as a unique identifier
  183. e.g. 123e4567-e89b-12d3-a456-426614174000
  184. :param type: The type of node e.g. execution_node
  185. :param classifier: indicates the operation's class
  186. :param name: The name of the operation
  187. :param parent_operation_ids: List of parent operation 'ids' required to execute prior to this operation
  188. :param component_params: dictionary of parameter key:value pairs that are used in the creation of a
  189. a non-standard operation instance
  190. Component_params for "generic components" (i.e., those with one of the following classifier values:
  191. ["execute-notebook-node", "execute-python-node", "execute-r-node"]) can expect to have the following
  192. entries.
  193. filename: The relative path to the source file in the users local environment
  194. to be executed e.g. path/to/file.ext
  195. runtime_image: The DockerHub image to be used for the operation
  196. e.g. user/docker_image_name:tag
  197. dependencies: List of local files/directories needed for the operation to run
  198. and packaged into each operation's dependency archive
  199. include_subdirectories: Include or Exclude subdirectories when packaging our 'dependencies'
  200. env_vars: List of Environmental variables to set in the container image
  201. e.g. FOO="BAR"
  202. inputs: List of files to be consumed by this operation, produced by parent operation(s)
  203. outputs: List of files produced by this operation to be included in a child operation(s)
  204. cpu: number of cpus requested to run the operation
  205. memory: amount of memory requested to run the operation (in Gi)
  206. gpu: number of gpus requested to run the operation
  207. Entries for other (non-built-in) component types are a function of the respective component.
  208. """
  209. super().__init__(
  210. id, type, name, classifier, parent_operation_ids=parent_operation_ids, component_params=component_params
  211. )
  212. if not component_params.get("filename"):
  213. raise ValueError("Invalid pipeline operation: Missing field 'operation filename'.")
  214. if not component_params.get("runtime_image"):
  215. raise ValueError("Invalid pipeline operation: Missing field 'operation runtime image'.")
  216. if component_params.get("cpu") and not self._validate_range(component_params.get("cpu"), min_value=1):
  217. raise ValueError("Invalid pipeline operation: CPU must be a positive value or None")
  218. if component_params.get("gpu") and not self._validate_range(component_params.get("gpu"), min_value=0):
  219. raise ValueError("Invalid pipeline operation: GPU must be a positive value or None")
  220. if component_params.get("memory") and not self._validate_range(component_params.get("memory"), min_value=1):
  221. raise ValueError("Invalid pipeline operation: Memory must be a positive value or None")
  222. # Re-build object to include default values
  223. self._component_params["filename"] = component_params.get("filename")
  224. self._component_params["runtime_image"] = component_params.get("runtime_image")
  225. self._component_params["dependencies"] = Operation._scrub_list(component_params.get("dependencies", []))
  226. self._component_params["include_subdirectories"] = component_params.get("include_subdirectories", False)
  227. self._component_params["env_vars"] = KeyValueList(Operation._scrub_list(component_params.get("env_vars", [])))
  228. self._component_params["cpu"] = component_params.get("cpu")
  229. self._component_params["gpu"] = component_params.get("gpu")
  230. self._component_params["memory"] = component_params.get("memory")
  231. @property
  232. def name(self) -> str:
  233. if self._name == os.path.basename(self.filename):
  234. self._name = os.path.basename(self._name).split(".")[0]
  235. return self._name
  236. @name.setter
  237. def name(self, value):
  238. self._name = value
  239. @property
  240. def filename(self) -> str:
  241. return self._component_params.get("filename")
  242. @property
  243. def runtime_image(self) -> str:
  244. return self._component_params.get("runtime_image")
  245. @property
  246. def dependencies(self) -> Optional[List[str]]:
  247. return self._component_params.get("dependencies")
  248. @property
  249. def include_subdirectories(self) -> Optional[bool]:
  250. return self._component_params.get("include_subdirectories")
  251. @property
  252. def env_vars(self) -> Optional["KeyValueList"]:
  253. return self._component_params.get(ENV_VARIABLES)
  254. @property
  255. def cpu(self) -> Optional[str]:
  256. return self._component_params.get("cpu")
  257. @property
  258. def memory(self) -> Optional[str]:
  259. return self._component_params.get("memory")
  260. @property
  261. def gpu(self) -> Optional[str]:
  262. return self._component_params.get("gpu")
  263. def __eq__(self, other: "GenericOperation") -> bool:
  264. if isinstance(self, other.__class__):
  265. return super().__eq__(other)
  266. return False
  267. def _validate_range(self, value: str, min_value: int = 0, max_value: int = sys.maxsize) -> bool:
  268. return int(value) in range(min_value, max_value)
  269. class Pipeline(object):
  270. """
  271. Represents a single pipeline constructed in the pipeline editor
  272. """
  273. def __init__(
  274. self,
  275. id: str,
  276. name: str,
  277. runtime: str,
  278. runtime_config: str,
  279. source: Optional[str] = None,
  280. description: Optional[str] = None,
  281. pipeline_parameters: Optional[Dict[str, Any]] = None,
  282. ):
  283. """
  284. :param id: Generated UUID, 128 bit number used as a unique identifier
  285. e.g. 123e4567-e89b-12d3-a456-426614174000
  286. :param name: Pipeline name, e.g. test-pipeline-123456
  287. :param runtime: Type of runtime we want to use to execute our pipeline, e.g. kfp OR airflow
  288. :param runtime_config: Runtime configuration that should be used to submit the pipeline to execution
  289. :param source: The pipeline source, e.g. a pipeline file or a notebook.
  290. :param description: Pipeline description
  291. :param pipeline_parameters: Key/value pairs representing the parameters of this pipeline
  292. """
  293. if not name:
  294. raise ValueError("Invalid pipeline: Missing pipeline name.")
  295. if not runtime:
  296. raise ValueError("Invalid pipeline: Missing runtime.")
  297. if not runtime_config:
  298. raise ValueError("Invalid pipeline: Missing runtime configuration.")
  299. self._id = id
  300. self._name = name
  301. self._description = description
  302. self._source = source
  303. self._runtime = runtime
  304. self._runtime_config = runtime_config
  305. self._pipeline_parameters = pipeline_parameters or {}
  306. self._operations = {}
  307. @property
  308. def id(self) -> str:
  309. return self._id
  310. @property
  311. def name(self) -> str:
  312. return self._name
  313. @property
  314. def source(self) -> str:
  315. return self._source
  316. @property
  317. def runtime(self) -> str:
  318. """
  319. The runtime processor name that will execute the pipeline
  320. """
  321. return self._runtime
  322. @property
  323. def runtime_config(self) -> str:
  324. """
  325. The runtime configuration that should be used to submit the pipeline for execution
  326. """
  327. return self._runtime_config
  328. @property
  329. def pipeline_parameters(self) -> Dict[str, Any]:
  330. """
  331. The dictionary of global parameters associated with each node of the pipeline
  332. """
  333. return self._pipeline_parameters
  334. @property
  335. def operations(self) -> Dict[str, Operation]:
  336. return self._operations
  337. @property
  338. def description(self) -> Optional[str]:
  339. """
  340. Pipeline description
  341. """
  342. return self._description
  343. def contains_generic_operations(self) -> bool:
  344. """
  345. Returns a truthy value indicating whether the pipeline contains
  346. one or more generic operations.
  347. """
  348. for op_id, op in self._operations.items():
  349. if isinstance(op, GenericOperation):
  350. return True
  351. return False
  352. def __eq__(self, other: "Pipeline") -> bool:
  353. if isinstance(self, other.__class__):
  354. return (
  355. self.id == other.id
  356. and self.name == other.name
  357. and self.source == other.source
  358. and self.description == other.description
  359. and self.runtime == other.runtime
  360. and self.runtime_config == other.runtime_config
  361. and self.operations == other.operations
  362. )
  363. class KeyValueList(list):
  364. """
  365. A list class that exposes functionality specific to lists whose entries are
  366. key-value pairs separated by a pre-defined character.
  367. """
  368. _key_value_separator: str = "="
  369. def to_dict(self) -> Dict[str, str]:
  370. """
  371. Properties consisting of key-value pairs are stored in a list of separated
  372. strings, while most processing steps require a dictionary - so we must convert.
  373. If no key/value pairs are specified, an empty dictionary is returned, otherwise
  374. pairs are converted to dictionary entries, stripped of whitespace, and returned.
  375. """
  376. kv_dict = {}
  377. for kv in self:
  378. if not kv:
  379. continue
  380. if self._key_value_separator not in kv:
  381. raise ValueError(
  382. f"Property {kv} does not contain the expected "
  383. f"separator character: '{self._key_value_separator}'."
  384. )
  385. key, value = kv.split(self._key_value_separator, 1)
  386. key = key.strip()
  387. if not key:
  388. # Invalid entry; skip inclusion and continue
  389. continue
  390. if isinstance(value, str):
  391. value = value.strip()
  392. if not value:
  393. # Invalid entry; skip inclusion and continue
  394. continue
  395. kv_dict[key] = value
  396. return kv_dict
  397. @classmethod
  398. def to_str(cls, key: str, value: str) -> str:
  399. return f"{key}{cls._key_value_separator}{value}"
  400. @classmethod
  401. def from_dict(cls, kv_dict: Dict) -> "KeyValueList":
  402. """
  403. Convert a set of key-value pairs stored in a dictionary to
  404. a KeyValueList of strings with the defined separator.
  405. """
  406. str_list = [KeyValueList.to_str(key, value) for key, value in kv_dict.items()]
  407. return KeyValueList(str_list)
  408. @classmethod
  409. def merge(cls, primary: "KeyValueList", secondary: "KeyValueList") -> "KeyValueList":
  410. """
  411. Merge two key-value pair lists, preferring the values given in the
  412. primary parameter in the case of a matching key between the two lists.
  413. """
  414. primary_dict = primary.to_dict()
  415. secondary_dict = secondary.to_dict()
  416. return KeyValueList.from_dict({**secondary_dict, **primary_dict})
  417. @classmethod
  418. def difference(cls, minuend: "KeyValueList", subtrahend: "KeyValueList") -> "KeyValueList":
  419. """
  420. Given KeyValueLists, convert to dictionaries and remove any keys found in the
  421. second (subtrahend) from the first (minuend), if present.
  422. :param minuend: list to be subtracted from
  423. :param subtrahend: list whose keys will be removed from the minuend
  424. :returns: the difference of the two lists
  425. """
  426. subtract_dict = minuend.to_dict()
  427. for key in subtrahend.to_dict().keys():
  428. if key in subtract_dict:
  429. subtract_dict.pop(key)
  430. return KeyValueList.from_dict(subtract_dict)
@dataclass
class VolumeMount:
    # Container path at which the volume is mounted
    path: str
    # Name of the persistent volume claim (PVC) backing the mount —
    # inferred from the field name; confirm against consumers
    pvc_name: str
@dataclass
class KubernetesSecret:
    # Environment variable to populate from the secret
    env_var: str
    # Name of the Kubernetes secret resource
    name: str
    # Key within the secret whose value is used
    key: str
  440. class DataClassJSONEncoder(json.JSONEncoder):
  441. """
  442. A JSON Encoder class to prevent errors during serialization of dataclasses.
  443. """
  444. def default(self, o):
  445. """
  446. Render dataclass content as dict
  447. """
  448. if is_dataclass(o):
  449. return dataclass_asdict(o)
  450. return super().default(o)