pipeline_definition.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import json
  17. import os
  18. from typing import Any
  19. from typing import Dict
  20. from typing import List
  21. from typing import Optional
  22. from typing import Set
  23. from jinja2 import Environment
  24. from jinja2 import PackageLoader
  25. from jinja2 import Undefined
  26. from elyra.pipeline.component_catalog import ComponentCache
  27. from elyra.pipeline.pipeline import KeyValueList
  28. from elyra.pipeline.pipeline import KubernetesAnnotation
  29. from elyra.pipeline.pipeline import KubernetesSecret
  30. from elyra.pipeline.pipeline import KubernetesToleration
  31. from elyra.pipeline.pipeline import Operation
  32. from elyra.pipeline.pipeline import VolumeMount
  33. from elyra.pipeline.pipeline_constants import ELYRA_COMPONENT_PROPERTIES
  34. from elyra.pipeline.pipeline_constants import ENV_VARIABLES
  35. from elyra.pipeline.pipeline_constants import KUBERNETES_POD_ANNOTATIONS
  36. from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
  37. from elyra.pipeline.pipeline_constants import KUBERNETES_TOLERATIONS
  38. from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
  39. from elyra.pipeline.pipeline_constants import PIPELINE_DEFAULTS
  40. from elyra.pipeline.pipeline_constants import PIPELINE_META_PROPERTIES
  41. from elyra.pipeline.runtime_type import RuntimeProcessorType
  42. class AppDataBase(object): # ABC
  43. """
  44. An abstraction for app_data based nodes
  45. """
  46. _node: Dict = None
  47. def __init__(self, node: Dict):
  48. """
  49. Constructor with the node json structure
  50. :param node: the node json
  51. """
  52. self._node = node
  53. @property
  54. def id(self) -> str:
  55. """
  56. The node id
  57. :return: the node unique identifier
  58. """
  59. return self._node.get("id")
  60. def get(self, key: str, default_value=None) -> Any:
  61. """
  62. Retrieve node values for a given key.
  63. These key/value pairs are stored in the app_data stanza
  64. :param key: The key to be retrieved
  65. :param default_value: a default value in case the key is not found
  66. :return: the value or the default_value if the key is not found
  67. """
  68. return self._node["app_data"].get(key, default_value)
  69. def set(self, key: str, value: Any):
  70. """
  71. Update node values for a given key.
  72. These key/value pairs are stored in the app_data stanza
  73. :param key: The key to be set
  74. :param value: The value to be set
  75. """
  76. if not key:
  77. raise ValueError("Key is required")
  78. if not value:
  79. raise ValueError("Value is required")
  80. self._node["app_data"][key] = value
  81. def to_dict(self) -> Dict:
  82. return self._node
  83. class Pipeline(AppDataBase):
  84. _nodes: list = None
  85. def __init__(self, node: Dict):
  86. """
  87. The constructor with pipeline json structure
  88. :param node: the node pipeline
  89. """
  90. super().__init__(node)
  91. @property
  92. def version(self) -> int:
  93. """
  94. The pipeline version
  95. :return: The version
  96. """
  97. return int(self._node["app_data"].get("version"))
  98. @property
  99. def runtime(self) -> str:
  100. """The runtime processor name associated with the pipeline.
  101. NOTE: This value should really be derived from runtime_config.
  102. :return: The runtime keyword
  103. """
  104. return self._node["app_data"].get("runtime")
  105. @property
  106. def runtime_config(self) -> str:
  107. """The runtime configuration associated with the pipeline.
  108. :return: The runtime configuration key. This should be a valid key from the Runtimes metadata
  109. """
  110. return self._node["app_data"].get("runtime_config")
  111. @property
  112. def type(self):
  113. """The runtime type.
  114. NOTE: This value should really be derived from runtime_config.
  115. :return: The runtime_type keyword associated with the pipeline.
  116. """
  117. return self._node["app_data"].get("runtime_type")
  118. @property
  119. def name(self) -> str:
  120. """
  121. The pipeline name
  122. :rtype: The pipeline name or `untitled`
  123. """
  124. return self._node["app_data"].get("name", self._node["app_data"].get("properties", {}).get("name", "untitled"))
  125. @property
  126. def source(self) -> str:
  127. """
  128. The pipeline source
  129. :rtype: The pipeline source
  130. """
  131. return self._node["app_data"].get("source")
  132. @property
  133. def nodes(self) -> list:
  134. """
  135. The list of nodes for the pipeline
  136. :rtype: object
  137. """
  138. if "nodes" not in self._node:
  139. raise ValueError("Pipeline is missing 'nodes' field.")
  140. if self._nodes is None:
  141. nodes: list = list()
  142. for node in self._node["nodes"]:
  143. nodes.append(Node(node))
  144. self._nodes = nodes
  145. return self._nodes
  146. @property
  147. def comments(self) -> list:
  148. """
  149. The list of user comments in the pipeline
  150. :rtype: list of comments
  151. """
  152. return self._node["app_data"]["ui_data"].get("comments", [])
  153. @property
  154. def pipeline_parameters(self) -> Dict[str, Any]:
  155. """
  156. Retrieve pipeline parameters, which are defined as all
  157. key/value pairs in the 'properties' stanza that are not
  158. either pipeline meta-properties (e.g. name, description,
  159. and runtime) or the pipeline defaults dictionary
  160. """
  161. all_properties = self._node["app_data"].get("properties", {})
  162. excluded_properties = PIPELINE_META_PROPERTIES + [PIPELINE_DEFAULTS]
  163. pipeline_parameters = {}
  164. for property_name, value in all_properties.items():
  165. if property_name not in excluded_properties:
  166. pipeline_parameters[property_name] = value
  167. return pipeline_parameters
  168. def get_property(self, key: str, default_value=None) -> Any:
  169. """
  170. Retrieve pipeline values for a given key.
  171. :param key: the key to be retrieved
  172. :param default_value: a default value in case the key is not found
  173. :return: the value or the default_value if the key is not found
  174. """
  175. return_value = default_value
  176. if "properties" in self._node["app_data"]:
  177. return_value = self._node["app_data"]["properties"].get(key, default_value)
  178. return return_value
  179. def set_property(self, key: str, value: Any):
  180. """
  181. Update pipeline values for a given key.
  182. :param key: the key to be set
  183. :param value: the value to be set
  184. """
  185. if not key:
  186. raise ValueError("Key is required")
  187. if not value:
  188. raise ValueError("Value is required")
  189. self._node["app_data"]["properties"][key] = value
  190. def convert_kv_properties(self, kv_properties: Set[str]):
  191. """
  192. Convert pipeline defaults-level list properties that have been identified
  193. as sets of key-value pairs from a plain list type to the KeyValueList type.
  194. """
  195. pipeline_defaults = self.get_property(PIPELINE_DEFAULTS, {})
  196. for property_name, value in pipeline_defaults.items():
  197. if property_name not in kv_properties:
  198. continue
  199. # Replace plain list with KeyValueList
  200. pipeline_defaults[property_name] = KeyValueList(value)
  201. if pipeline_defaults:
  202. self.set_property(PIPELINE_DEFAULTS, pipeline_defaults)
  203. class Node(AppDataBase):
  204. def __init__(self, node: Dict):
  205. super().__init__(node)
  206. self._elyra_properties_to_skip = set()
  207. @property
  208. def type(self) -> str:
  209. """
  210. The node type
  211. :return: type (e.g. execution_node, super_node)
  212. """
  213. return self._node.get("type")
  214. @property
  215. def op(self) -> str:
  216. """
  217. The node op, which identify the operation to be executed
  218. :return: op (e.g. execute-notebook-node)
  219. """
  220. return self._node.get("op")
  221. @property
  222. def label(self) -> str:
  223. """
  224. The node label
  225. :return: node label
  226. """
  227. return self._node["app_data"]["ui_data"].get("label", self._node["app_data"].get("label", None))
  228. @property
  229. def subflow_pipeline_id(self) -> Pipeline:
  230. """
  231. The Super Node pipeline reference. Only available when type is a super node.
  232. :return:
  233. """
  234. if self._node["type"] != "super_node":
  235. raise ValueError("Node must be a super_node in order to retrieve a subflow pipeline id")
  236. if "subflow_ref" in self._node:
  237. return self._node["subflow_ref"].get("pipeline_id_ref")
  238. else:
  239. return None
  240. @property
  241. def component_links(self) -> List:
  242. """
  243. Retrieve component links to other components.
  244. :return: the list of links associated with this node or an empty list if none are found
  245. """
  246. if self.type in ["execution_node", "super_node"]:
  247. return self._node["inputs"][0].get("links", [])
  248. else:
  249. # binding nodes do not contain links
  250. return []
  251. @property
  252. def component_source(self) -> Optional[str]:
  253. """
  254. Retrieve the component source path.
  255. :return: None, if the node is a generic component, the component path otherwise.
  256. """
  257. if self.type == "execution_node":
  258. return self._node["app_data"].get("component_source", None)
  259. return None
  260. @property
  261. def elyra_properties_to_skip(self) -> Set[str]:
  262. """
  263. Elyra-defined node properties whose processing should be skipped
  264. on the basis that their id collides with a property defined in
  265. the component definition for this Node.
  266. """
  267. return self._elyra_properties_to_skip
  268. def set_elyra_properties_to_skip(self, runtime_type_name: Optional[str]) -> None:
  269. """
  270. Determine which Elyra-defined node-level properties to skip
  271. on the basis that their id collides with a property defined in
  272. the component definition for this Node. Then, set the node
  273. property accordingly.
  274. """
  275. if Operation.is_generic_operation(self.op):
  276. # Generic operations will never have any collisions as all their properties are Elyra-owned
  277. return
  278. if not runtime_type_name:
  279. return
  280. runtime_type = RuntimeProcessorType.get_instance_by_name(runtime_type_name)
  281. component = ComponentCache.instance().get_component(runtime_type, self.op)
  282. if not component:
  283. return
  284. # Properties that have the same ref (id) as Elyra-owned node properties
  285. # should be skipped during property propagation and conversion
  286. properties_to_skip = [prop.ref for prop in component.properties if prop.ref in ELYRA_COMPONENT_PROPERTIES]
  287. self._elyra_properties_to_skip = set(properties_to_skip)
  288. def get_component_parameter(self, key: str, default_value=None) -> Any:
  289. """
  290. Retrieve component parameter values.
  291. These key/value pairs are stored in app_data.component_parameters
  292. :param key: the parameter key to be retrieved
  293. :param default_value: a default value in case the key is not found
  294. :return: the value or the default value if the key is not found
  295. """
  296. value = self._node["app_data"].get("component_parameters", {}).get(key, default_value)
  297. return None if value == "None" else value
  298. def set_component_parameter(self, key: str, value: Any):
  299. """
  300. Update component parameter values for a given key.
  301. These key/value pairs are stored in app_data.component_parameters
  302. :param key: The parameter key to be retrieved
  303. :param value: the value to be set
  304. """
  305. if not key:
  306. raise ValueError("Key is required")
  307. if value is None:
  308. raise ValueError("Value is required")
  309. if key not in self.elyra_properties_to_skip:
  310. # This parameter has been parsed from a custom component definition and
  311. # its value should not be manually set
  312. self._node["app_data"]["component_parameters"][key] = value
  313. def get_all_component_parameters(self) -> Dict[str, Any]:
  314. """
  315. Retrieve all component parameter key-value pairs.
  316. """
  317. return self._node["app_data"]["component_parameters"]
  318. def convert_kv_properties(self, kv_properties: Set[str]):
  319. """
  320. Convert node-level list properties that have been identified as sets of
  321. key-value pairs from a plain list type to the KeyValueList type. If any
  322. k-v property has already been converted to a KeyValueList, all k-v
  323. properties are assumed to have already been converted.
  324. """
  325. for kv_property in kv_properties:
  326. value = self.get_component_parameter(kv_property)
  327. if not value or not isinstance(value, list): # not list or KeyValueList
  328. continue
  329. if isinstance(value, KeyValueList) or not isinstance(value[0], str):
  330. # A KeyValueList instance implies all relevant properties have already been converted
  331. # Similarly, if KeyValueList items aren't strings, this implies they have already been
  332. # converted to the appropriate data class objects
  333. return
  334. # Convert plain list to KeyValueList
  335. self.set_component_parameter(kv_property, KeyValueList(value))
  336. def remove_env_vars_with_matching_secrets(self):
  337. """
  338. In the case of a matching key between env vars and kubernetes secrets,
  339. prefer the Kubernetes Secret and remove the matching env var.
  340. """
  341. env_vars = self.get_component_parameter(ENV_VARIABLES)
  342. secrets = self.get_component_parameter(KUBERNETES_SECRETS)
  343. if isinstance(env_vars, KeyValueList) and isinstance(secrets, KeyValueList):
  344. new_list = KeyValueList.difference(minuend=env_vars, subtrahend=secrets)
  345. self.set_component_parameter(ENV_VARIABLES, new_list)
  346. def convert_data_class_properties(self):
  347. """
  348. Convert select node-level list properties to their corresponding dataclass
  349. object type. No validation is performed.
  350. """
  351. volume_mounts = self.get_component_parameter(MOUNTED_VOLUMES)
  352. if volume_mounts and isinstance(volume_mounts, KeyValueList):
  353. volume_objects = []
  354. for mount_path, pvc_name in volume_mounts.to_dict().items():
  355. formatted_mount_path = f"/{mount_path.strip('/')}"
  356. # Create a VolumeMount class instance and add to list
  357. volume_objects.append(VolumeMount(formatted_mount_path, pvc_name))
  358. self.set_component_parameter(MOUNTED_VOLUMES, volume_objects)
  359. secrets = self.get_component_parameter(KUBERNETES_SECRETS)
  360. if secrets and isinstance(secrets, KeyValueList):
  361. secret_objects = []
  362. for env_var_name, secret in secrets.to_dict().items():
  363. secret_name, *optional_key = secret.split(":", 1)
  364. secret_key = ""
  365. if optional_key:
  366. secret_key = optional_key[0].strip()
  367. # Create a KubernetesSecret class instance and add to list
  368. secret_objects.append(KubernetesSecret(env_var_name, secret_name.strip(), secret_key))
  369. self.set_component_parameter(KUBERNETES_SECRETS, secret_objects)
  370. kubernetes_tolerations = self.get_component_parameter(KUBERNETES_TOLERATIONS)
  371. if kubernetes_tolerations and isinstance(kubernetes_tolerations, KeyValueList):
  372. tolerations_objects = []
  373. for toleration, toleration_definition in kubernetes_tolerations.to_dict().items():
  374. # A definition comprises of "<key>:<operator>:<value>:<effect>"
  375. parts = toleration_definition.split(":")
  376. key, operator, value, effect = (parts + [""] * 4)[:4]
  377. # Create a KubernetesToleration class instance and add to list
  378. # Note that the instance might be invalid.
  379. tolerations_objects.append(KubernetesToleration(key, operator, value, effect))
  380. self.set_component_parameter(KUBERNETES_TOLERATIONS, tolerations_objects)
  381. kubernetes_pod_annotations = self.get_component_parameter(KUBERNETES_POD_ANNOTATIONS)
  382. if kubernetes_pod_annotations and isinstance(kubernetes_pod_annotations, KeyValueList):
  383. annotations_objects = []
  384. for annotation_key, annotation_value in kubernetes_pod_annotations.to_dict().items():
  385. # Validation should have verified that the provided values are valid
  386. # Create a KubernetesAnnotation class instance and add to list
  387. annotations_objects.append(KubernetesAnnotation(annotation_key, annotation_value))
  388. self.set_component_parameter(KUBERNETES_POD_ANNOTATIONS, annotations_objects)
  389. class PipelineDefinition(object):
  390. """
  391. Represents a helper class to manipulate pipeline json structure
  392. """
  393. _pipelines: list = None
  394. _primary_pipeline: Pipeline = None
  395. _validated: bool = False
  396. _validation_issues: list = None
  397. def __init__(
  398. self,
  399. pipeline_path: Optional[str] = None,
  400. pipeline_definition: Optional[Dict] = None,
  401. validate: bool = False,
  402. ):
  403. """
  404. The constructor enables either passing a pipeline path or the content of the pipeline definition.
  405. :param pipeline_path: this is the path to a pipeline
  406. :param pipeline_definition: this is the piepline json
  407. :param validate: flag to turn validation during pipeline initialization
  408. """
  409. if not pipeline_path and not pipeline_definition:
  410. # at least one parameter should be provided
  411. raise ValueError("At least one parameter must be provided ('pipeline_path' or 'pipeline_definition')")
  412. if pipeline_path and pipeline_definition:
  413. # only one parameter should be provided
  414. raise ValueError("Only one parameter should be provided ('pipeline_path' or 'pipeline_definition')")
  415. if pipeline_path:
  416. # supporting loading pipeline from file
  417. if not os.path.exists(pipeline_path):
  418. raise ValueError(f"Pipeline file not found: '{pipeline_path}'\n")
  419. with open(pipeline_path) as f:
  420. try:
  421. self._pipeline_definition = json.load(f)
  422. except ValueError as ve:
  423. raise ValueError(f"Pipeline file is invalid: \n {ve}")
  424. else:
  425. # supporting passing the pipeline definition directly
  426. self._pipeline_definition = pipeline_definition
  427. if validate:
  428. self.validate()
  429. self.propagate_pipeline_default_properties()
  430. @property
  431. def id(self) -> str:
  432. """
  433. The pipeline definition id
  434. :return: the unid
  435. """
  436. return self._pipeline_definition.get("id")
  437. @property
  438. def schema_version(self) -> str:
  439. """
  440. The schema used by the Pipeline definition
  441. :return: the version
  442. """
  443. return self._pipeline_definition.get("version")
  444. @property
  445. def pipelines(self) -> list:
  446. """
  447. The list of pipelines defined in the pipeline definition
  448. :return: the list of pipelines
  449. """
  450. if not self._pipelines:
  451. if "pipelines" not in self._pipeline_definition:
  452. raise ValueError("Pipeline is missing 'pipelines' field.")
  453. elif len(self._pipeline_definition["pipelines"]) == 0:
  454. raise ValueError("Pipeline has zero length 'pipelines' field.")
  455. pipelines: list = list()
  456. for pipeline in self._pipeline_definition["pipelines"]:
  457. pipelines.append(Pipeline(pipeline))
  458. self._pipelines = pipelines
  459. return self._pipelines
  460. @property
  461. def primary_pipeline(self) -> Pipeline:
  462. """
  463. The primary pipeline associated with this pipeline definition
  464. :return: the primary pipeline
  465. """
  466. if not self._primary_pipeline:
  467. if "pipelines" not in self._pipeline_definition:
  468. raise ValueError("Pipeline is missing 'pipelines' field.")
  469. elif len(self._pipeline_definition["pipelines"]) == 0:
  470. raise ValueError("Pipeline has zero length 'pipelines' field.")
  471. # Find primary pipeline
  472. self._primary_pipeline = self.get_pipeline_definition(self._pipeline_definition.get("primary_pipeline"))
  473. assert self._primary_pipeline is not None, "No primary pipeline was found"
  474. return self._primary_pipeline
  475. @property
  476. def pipeline_nodes(self) -> List[Node]:
  477. """
  478. All nodes of all pipelines associated with a pipeline definition
  479. """
  480. return [node for pipeline in self.pipelines for node in pipeline.nodes]
  481. def validate(self) -> list:
  482. """
  483. Validates the pipeline definition structure and semantics
  484. :return: the list of issues found
  485. """
  486. # If it has been validated before
  487. if self._validated:
  488. # return current list of issues
  489. return self._validation_issues
  490. # Has not been validated before
  491. validation_issues = []
  492. # Validate pipeline schema version
  493. if "version" not in self._pipeline_definition:
  494. validation_issues.append("Pipeline schema version field is missing.")
  495. elif not isinstance(self._pipeline_definition["version"], str):
  496. validation_issues.append("Pipeline schema version field should be a string.")
  497. # Validate pipelines
  498. if "pipelines" not in self._pipeline_definition:
  499. validation_issues.append("Pipeline is missing 'pipelines' field.")
  500. elif not isinstance(self._pipeline_definition["pipelines"], list):
  501. validation_issues.append("Field 'pipelines' should be a list.")
  502. elif len(self._pipeline_definition["pipelines"]) == 0:
  503. validation_issues.append("Pipeline has zero length 'pipelines' field.")
  504. # Validate primary pipeline
  505. if "primary_pipeline" not in self._pipeline_definition:
  506. validation_issues.append("Could not determine the primary pipeline.")
  507. elif not isinstance(self._pipeline_definition["primary_pipeline"], str):
  508. validation_issues.append("Field 'primary_pipeline' should be a string.")
  509. primary_pipeline = self.get_pipeline_definition(self._pipeline_definition.get("primary_pipeline"))
  510. if not primary_pipeline:
  511. validation_issues.append("No primary pipeline was found")
  512. else:
  513. primary_pipeline = primary_pipeline.to_dict()
  514. # Validate primary pipeline structure
  515. if "app_data" not in primary_pipeline:
  516. validation_issues.append("Primary pipeline is missing the 'app_data' field.")
  517. else:
  518. if "version" not in primary_pipeline["app_data"]:
  519. validation_issues.append("Primary pipeline is missing the 'version' field.")
  520. if "properties" not in primary_pipeline["app_data"]:
  521. validation_issues.append("Node is missing 'properties' field.")
  522. elif len(primary_pipeline["app_data"]["properties"]) == 0:
  523. validation_issues.append("Pipeline has zero length 'properties' field.")
  524. if "nodes" not in primary_pipeline or len(primary_pipeline["nodes"]) == 0:
  525. validation_issues.append("At least one node must exist in the primary pipeline.")
  526. else:
  527. for node in primary_pipeline["nodes"]:
  528. if "component_parameters" not in node["app_data"]:
  529. validation_issues.append("Node is missing 'component_parameters' field")
  530. return validation_issues
  531. def propagate_pipeline_default_properties(self):
  532. """
  533. For any default pipeline properties set (e.g. runtime image, volume), propagate
  534. the values to any nodes that do not set their own value for that property.
  535. """
  536. # Convert any key-value list pipeline default properties to the KeyValueList type
  537. kv_properties = PipelineDefinition.get_kv_properties()
  538. self.primary_pipeline.convert_kv_properties(kv_properties)
  539. pipeline_default_properties = self.primary_pipeline.get_property(PIPELINE_DEFAULTS, {})
  540. for node in self.pipeline_nodes:
  541. # Determine which Elyra-owned properties collide with parsed properties (and therefore must be skipped)
  542. node.set_elyra_properties_to_skip(self.primary_pipeline.type)
  543. # Convert any key-value list node properties to the KeyValueList type if not done already
  544. node.convert_kv_properties(kv_properties)
  545. for property_name, pipeline_default_value in pipeline_default_properties.items():
  546. if not pipeline_default_value:
  547. continue
  548. if not Operation.is_generic_operation(node.op) and property_name not in ELYRA_COMPONENT_PROPERTIES:
  549. # Do not propagate default properties that do not apply to custom components, e.g. runtime image
  550. continue
  551. node_value = node.get_component_parameter(property_name)
  552. if not node_value:
  553. node.set_component_parameter(property_name, pipeline_default_value)
  554. continue
  555. if isinstance(pipeline_default_value, KeyValueList) and isinstance(node_value, KeyValueList):
  556. merged_list = KeyValueList.merge(node_value, pipeline_default_value)
  557. node.set_component_parameter(property_name, merged_list)
  558. if self.primary_pipeline.runtime_config != "local":
  559. node.remove_env_vars_with_matching_secrets()
  560. node.convert_data_class_properties()
  561. def is_valid(self) -> bool:
  562. """
  563. Represents whether or not the pipeline structure is valid
  564. :return: True for a valid pipeline definition
  565. """
  566. return len(self.validate()) == 0
  567. def to_dict(self) -> Dict:
  568. """
  569. The raw contents of the pipeline definition json
  570. :rtype: object
  571. """
  572. return self._pipeline_definition
  573. def get_pipeline_definition(self, pipeline_id) -> Any:
  574. """
  575. Retrieve a given pipeline from the pipeline definition
  576. :param pipeline_id: the pipeline unique identifier
  577. :return: the pipeline or None
  578. """
  579. if "pipelines" in self._pipeline_definition:
  580. for pipeline in self._pipeline_definition["pipelines"]:
  581. if pipeline["id"] == pipeline_id:
  582. return Pipeline(pipeline)
  583. return None
  584. def get_node(self, node_id: str):
  585. """
  586. Given a node id returns the associated node object in the pipeline
  587. :param node_id: the node id
  588. :return: the node object or None
  589. """
  590. for pipeline in self._pipelines:
  591. for node in pipeline.nodes:
  592. if node.id == node_id:
  593. return node
  594. return None
  595. def get_node_comments(self, node_id: str) -> Optional[str]:
  596. """
  597. Given a node id returns the assoicated comments in the pipeline
  598. :param node_id: the node id
  599. :return: the comments or None
  600. """
  601. comments = []
  602. for pipeline in self.pipelines:
  603. comment_list = pipeline.comments
  604. for comment in comment_list:
  605. associated_node_id_list = comment.get("associated_id_refs", [])
  606. for ref in associated_node_id_list:
  607. if ref["node_ref"] == node_id:
  608. comments.append(comment.get("content", ""))
  609. # remove empty (or whitespace-only) comment strings
  610. comments = [c for c in comments if c.strip()]
  611. comment_str = "\n\n".join(comments)
  612. if not comment_str:
  613. return None
  614. return comment_str
  615. def get_supernodes(self) -> List[Node]:
  616. """
  617. Returns a list of all supernodes in the pipeline
  618. :return:
  619. """
  620. supernode_list = []
  621. for pipeline in self._pipelines:
  622. for node in pipeline.nodes:
  623. if node.type == "super_node":
  624. supernode_list.append(node)
  625. return supernode_list
  626. @staticmethod
  627. def get_canvas_properties_from_template(package_name: str, template_name: str) -> Dict[str, Any]:
  628. """
  629. Retrieves the dict representation of the canvas-formatted properties
  630. associated with the given template and package names. Rendering does
  631. not require parameters as expressions are not evaluated due to the
  632. SilentUndefined class.
  633. """
  634. loader = PackageLoader("elyra", package_name)
  635. template_env = Environment(loader=loader, undefined=SilentUndefined)
  636. template = template_env.get_template(template_name)
  637. output = template.render()
  638. return json.loads(output)
  639. @staticmethod
  640. def get_kv_properties() -> Set[str]:
  641. """
  642. Get pipeline properties in its canvas form and loop through to
  643. find those that should consist of key/value pairs, as given in
  644. the 'keyValueEntries' key.
  645. """
  646. canvas_pipeline_properties = PipelineDefinition.get_canvas_properties_from_template(
  647. package_name="templates/pipeline", template_name="pipeline_properties_template.jinja2"
  648. )
  649. kv_properties = set()
  650. parameter_info = canvas_pipeline_properties.get("uihints", {}).get("parameter_info", [])
  651. for parameter in parameter_info:
  652. if parameter.get("data", {}).get("keyValueEntries", False):
  653. parameter_ref = parameter.get("parameter_ref", "")
  654. if parameter_ref.startswith("elyra_"):
  655. parameter_ref = parameter_ref.replace("elyra_", "")
  656. kv_properties.add(parameter_ref)
  657. return kv_properties
  658. class SilentUndefined(Undefined):
  659. """
  660. A subclass of the jinja2.Undefined class used to represent undefined
  661. values in the template. Undefined errors as a result of the evaluation
  662. of expressions will fail silently and render as null.
  663. """
  664. def _fail_with_undefined_error(self, *args, **kwargs):
  665. return None