pipeline_definition.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import json
  17. import os
  18. from typing import Any
  19. from typing import Dict
  20. from typing import List
  21. from typing import Optional
  22. from jinja2 import Environment, Undefined
  23. from jinja2 import PackageLoader
  24. from elyra.pipeline.pipeline import KeyValueList
  25. from elyra.pipeline.pipeline import KubernetesSecret
  26. from elyra.pipeline.pipeline import Operation
  27. from elyra.pipeline.pipeline import VolumeMount
  28. from elyra.pipeline.pipeline_constants import ENV_VARIABLES
  29. from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
  30. from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
  31. from elyra.pipeline.pipeline_constants import PIPELINE_DEFAULTS
  32. from elyra.pipeline.pipeline_constants import PIPELINE_META_PROPERTIES
  33. class AppDataBase(object): # ABC
  34. """
  35. An abstraction for app_data based nodes
  36. """
  37. _node: Dict = None
  38. def __init__(self, node: Dict):
  39. """
  40. Constructor with the node json structure
  41. :param node: the node json
  42. """
  43. self._node = node
  44. @property
  45. def id(self) -> str:
  46. """
  47. The node id
  48. :return: the node unique identifier
  49. """
  50. return self._node.get("id")
  51. def get(self, key: str, default_value=None) -> Any:
  52. """
  53. Retrieve node values for a given key.
  54. These key/value pairs are stored in the app_data stanza
  55. :param key: The key to be retrieved
  56. :param default_value: a default value in case the key is not found
  57. :return: the value or the default_value if the key is not found
  58. """
  59. return self._node["app_data"].get(key, default_value)
  60. def set(self, key: str, value: Any):
  61. """
  62. Update node values for a given key.
  63. These key/value pairs are stored in the app_data stanza
  64. :param key: The key to be set
  65. :param value: The value to be set
  66. """
  67. if not key:
  68. raise ValueError("Key is required")
  69. if not value:
  70. raise ValueError("Value is required")
  71. self._node["app_data"][key] = value
  72. def to_dict(self) -> Dict:
  73. return self._node
  74. class Pipeline(AppDataBase):
  75. _nodes: list = None
  76. def __init__(self, node: Dict):
  77. """
  78. The constructor with pipeline json structure
  79. :param node: the node pipeline
  80. """
  81. super().__init__(node)
  82. @property
  83. def version(self) -> int:
  84. """
  85. The pipeline version
  86. :return: The version
  87. """
  88. return int(self._node["app_data"].get("version"))
  89. @property
  90. def runtime(self) -> str:
  91. """The runtime processor name associated with the pipeline.
  92. NOTE: This value should really be derived from runtime_config.
  93. :return: The runtime keyword
  94. """
  95. return self._node["app_data"].get("runtime")
  96. @property
  97. def runtime_config(self) -> str:
  98. """The runtime configuration associated with the pipeline.
  99. :return: The runtime configuration key. This should be a valid key from the Runtimes metadata
  100. """
  101. return self._node["app_data"].get("runtime_config")
  102. @property
  103. def type(self):
  104. """The runtime type.
  105. NOTE: This value should really be derived from runtime_config.
  106. :return: The runtime_type keyword associated with the pipeline.
  107. """
  108. return self._node["app_data"].get("runtime_type")
  109. @property
  110. def name(self) -> str:
  111. """
  112. The pipeline name
  113. :rtype: The pipeline name or `untitled`
  114. """
  115. return self._node["app_data"].get("name", self._node["app_data"].get("properties", {}).get("name", "untitled"))
  116. @property
  117. def source(self) -> str:
  118. """
  119. The pipeline source
  120. :rtype: The pipeline source
  121. """
  122. return self._node["app_data"].get("source")
  123. @property
  124. def nodes(self) -> list:
  125. """
  126. The list of nodes for the pipeline
  127. :rtype: object
  128. """
  129. if "nodes" not in self._node:
  130. raise ValueError("Pipeline is missing 'nodes' field.")
  131. if self._nodes is None:
  132. nodes: list = list()
  133. for node in self._node["nodes"]:
  134. nodes.append(Node(node))
  135. self._nodes = nodes
  136. return self._nodes
  137. @property
  138. def comments(self) -> list:
  139. """
  140. The list of user comments in the pipeline
  141. :rtype: list of comments
  142. """
  143. return self._node["app_data"]["ui_data"].get("comments", [])
  144. @property
  145. def pipeline_parameters(self) -> Dict[str, Any]:
  146. """
  147. Retrieve pipeline parameters, which are defined as all
  148. key/value pairs in the 'properties' stanza that are not
  149. either pipeline meta-properties (e.g. name, description,
  150. and runtime) or the pipeline defaults dictionary
  151. """
  152. all_properties = self._node["app_data"].get("properties", {})
  153. excluded_properties = PIPELINE_META_PROPERTIES + [PIPELINE_DEFAULTS]
  154. pipeline_parameters = {}
  155. for property_name, value in all_properties.items():
  156. if property_name not in excluded_properties:
  157. pipeline_parameters[property_name] = value
  158. return pipeline_parameters
  159. def get_property(self, key: str, default_value=None) -> Any:
  160. """
  161. Retrieve pipeline values for a given key.
  162. :param key: the key to be retrieved
  163. :param default_value: a default value in case the key is not found
  164. :return: the value or the default_value if the key is not found
  165. """
  166. return_value = default_value
  167. if "properties" in self._node["app_data"]:
  168. return_value = self._node["app_data"]["properties"].get(key, default_value)
  169. return return_value
  170. def set_property(self, key: str, value: Any):
  171. """
  172. Update pipeline values for a given key.
  173. :param key: the key to be set
  174. :param value: the value to be set
  175. """
  176. if not key:
  177. raise ValueError("Key is required")
  178. if not value:
  179. raise ValueError("Value is required")
  180. self._node["app_data"]["properties"][key] = value
  181. def convert_kv_properties(self, kv_properties: List[str]):
  182. """
  183. Convert pipeline defaults-level list properties that have been identified
  184. as sets of key-value pairs from a plain list type to the KeyValueList type.
  185. """
  186. pipeline_defaults = self.get_property(PIPELINE_DEFAULTS, {})
  187. for property_name, value in pipeline_defaults.items():
  188. if property_name not in kv_properties:
  189. continue
  190. # Replace plain list with KeyValueList
  191. pipeline_defaults[property_name] = KeyValueList(value)
  192. if pipeline_defaults:
  193. self.set_property(PIPELINE_DEFAULTS, pipeline_defaults)
  194. class Node(AppDataBase):
  195. def __init__(self, node: Dict):
  196. super().__init__(node)
  197. @property
  198. def type(self) -> str:
  199. """
  200. The node type
  201. :return: type (e.g. execution_node, super_node)
  202. """
  203. return self._node.get("type")
  204. @property
  205. def op(self) -> str:
  206. """
  207. The node op, which identify the operation to be executed
  208. :return: op (e.g. execute-notebook-node)
  209. """
  210. return self._node.get("op")
  211. @property
  212. def label(self) -> str:
  213. """
  214. The node label
  215. :return: node label
  216. """
  217. return self._node["app_data"]["ui_data"].get("label", self._node["app_data"].get("label", None))
  218. @property
  219. def subflow_pipeline_id(self) -> Pipeline:
  220. """
  221. The Super Node pipeline reference. Only available when type is a super node.
  222. :return:
  223. """
  224. if self._node["type"] != "super_node":
  225. raise ValueError("Node must be a super_node in order to retrieve a subflow pipeline id")
  226. if "subflow_ref" in self._node:
  227. return self._node["subflow_ref"].get("pipeline_id_ref")
  228. else:
  229. return None
  230. @property
  231. def component_links(self) -> List:
  232. """
  233. Retrieve component links to other components.
  234. :return: the list of links associated with this node or an empty list if none are found
  235. """
  236. if self.type in ["execution_node", "super_node"]:
  237. return self._node["inputs"][0].get("links", [])
  238. else:
  239. # binding nodes do not contain links
  240. return []
  241. @property
  242. def component_source(self) -> Optional[str]:
  243. """
  244. Retrieve the component source path.
  245. :return: None, if the node is a generic component, the component path otherwise.
  246. """
  247. if self.type == "execution_node":
  248. return self._node["app_data"].get("component_source", None)
  249. return None
  250. def get_component_parameter(self, key: str, default_value=None) -> Any:
  251. """
  252. Retrieve component parameter values.
  253. These key/value pairs are stored in app_data.component_parameters
  254. :param key: the parameter key to be retrieved
  255. :param default_value: a default value in case the key is not found
  256. :return: the value or the default value if the key is not found
  257. """
  258. value = self._node["app_data"]["component_parameters"].get(key, default_value)
  259. return None if value == "None" else value
  260. def set_component_parameter(self, key: str, value: Any):
  261. """
  262. Update component parameter values for a given key.
  263. These key/value pairs are stored in app_data.component_parameters
  264. :param key: The parameter key to be retrieved
  265. :param value: the value to be set
  266. """
  267. if not key:
  268. raise ValueError("Key is required")
  269. if value is None:
  270. raise ValueError("Value is required")
  271. self._node["app_data"]["component_parameters"][key] = value
  272. def get_all_component_parameters(self) -> Dict[str, Any]:
  273. """
  274. Retrieve all component parameter key-value pairs.
  275. """
  276. return self._node["app_data"]["component_parameters"]
  277. def convert_kv_properties(self, kv_properties: List[str]):
  278. """
  279. Convert node-level list properties that have been identified as sets of
  280. key-value pairs from a plain list type to the KeyValueList type. If any
  281. k-v property has already been converted to a KeyValueList, all k-v
  282. properties are assumed to have already been converted.
  283. """
  284. for kv_property in kv_properties:
  285. value = self.get_component_parameter(kv_property)
  286. if not value:
  287. continue
  288. if isinstance(value, KeyValueList) or not isinstance(value[0], str):
  289. # A KeyValueList instance implies all relevant properties have already been converted
  290. # Similarly, if KeyValueList items aren't strings, this implies they have already been
  291. # converted to the appropriate data class objects
  292. return
  293. # Convert plain list to KeyValueList
  294. self.set_component_parameter(kv_property, KeyValueList(value))
  295. def remove_env_vars_with_matching_secrets(self):
  296. """
  297. In the case of a matching key between env vars and kubernetes secrets,
  298. prefer the Kubernetes Secret and remove the matching env var.
  299. """
  300. env_vars = self.get_component_parameter(ENV_VARIABLES)
  301. secrets = self.get_component_parameter(KUBERNETES_SECRETS)
  302. if isinstance(env_vars, KeyValueList) and isinstance(secrets, KeyValueList):
  303. new_list = KeyValueList.difference(minuend=env_vars, subtrahend=secrets)
  304. self.set_component_parameter(ENV_VARIABLES, new_list)
  305. def convert_data_class_properties(self):
  306. """
  307. Convert select node-level list properties to their corresponding dataclass
  308. object type. No validation is performed.
  309. """
  310. volume_mounts = self.get_component_parameter(MOUNTED_VOLUMES)
  311. if volume_mounts and isinstance(volume_mounts, KeyValueList):
  312. volume_objects = []
  313. for mount_path, pvc_name in volume_mounts.to_dict().items():
  314. formatted_mount_path = f"/{mount_path.strip('/')}"
  315. # Create a VolumeMount class instance and add to list
  316. volume_objects.append(VolumeMount(formatted_mount_path, pvc_name))
  317. self.set_component_parameter(MOUNTED_VOLUMES, volume_objects)
  318. secrets = self.get_component_parameter(KUBERNETES_SECRETS)
  319. if secrets and isinstance(secrets, KeyValueList):
  320. secret_objects = []
  321. for env_var_name, secret in secrets.to_dict().items():
  322. secret_name, *optional_key = secret.split(":", 1)
  323. secret_key = ""
  324. if optional_key:
  325. secret_key = optional_key[0].strip()
  326. # Create a KubernetesSecret class instance and add to list
  327. secret_objects.append(KubernetesSecret(env_var_name, secret_name.strip(), secret_key))
  328. self.set_component_parameter(KUBERNETES_SECRETS, secret_objects)
  329. class PipelineDefinition(object):
  330. """
  331. Represents a helper class to manipulate pipeline json structure
  332. """
  333. _pipelines: list = None
  334. _primary_pipeline: Pipeline = None
  335. _validated: bool = False
  336. _validation_issues: list = None
  337. def __init__(
  338. self,
  339. pipeline_path: Optional[str] = None,
  340. pipeline_definition: Optional[Dict] = None,
  341. validate: bool = False,
  342. ):
  343. """
  344. The constructor enables either passing a pipeline path or the content of the pipeline definition.
  345. :param pipeline_path: this is the path to a pipeline
  346. :param pipeline_definition: this is the piepline json
  347. :param validate: flag to turn validation during pipeline initialization
  348. """
  349. if not pipeline_path and not pipeline_definition:
  350. # at least one parameter should be provided
  351. raise ValueError("At least one parameter must be provided ('pipeline_path' or 'pipeline_definition')")
  352. if pipeline_path and pipeline_definition:
  353. # only one parameter should be provided
  354. raise ValueError("Only one parameter should be provided ('pipeline_path' or 'pipeline_definition')")
  355. if pipeline_path:
  356. # supporting loading pipeline from file
  357. if not os.path.exists(pipeline_path):
  358. raise ValueError(f"Pipeline file not found: '{pipeline_path}'\n")
  359. with open(pipeline_path) as f:
  360. try:
  361. self._pipeline_definition = json.load(f)
  362. except ValueError as ve:
  363. raise ValueError(f"Pipeline file is invalid: \n {ve}")
  364. else:
  365. # supporting passing the pipeline definition directly
  366. self._pipeline_definition = pipeline_definition
  367. if validate:
  368. self.validate()
  369. self.propagate_pipeline_default_properties()
  370. @property
  371. def id(self) -> str:
  372. """
  373. The pipeline definition id
  374. :return: the unid
  375. """
  376. return self._pipeline_definition.get("id")
  377. @property
  378. def schema_version(self) -> str:
  379. """
  380. The schema used by the Pipeline definition
  381. :return: the version
  382. """
  383. return self._pipeline_definition.get("version")
  384. @property
  385. def pipelines(self) -> list:
  386. """
  387. The list of pipelines defined in the pipeline definition
  388. :return: the list of pipelines
  389. """
  390. if not self._pipelines:
  391. if "pipelines" not in self._pipeline_definition:
  392. raise ValueError("Pipeline is missing 'pipelines' field.")
  393. elif len(self._pipeline_definition["pipelines"]) == 0:
  394. raise ValueError("Pipeline has zero length 'pipelines' field.")
  395. pipelines: list = list()
  396. for pipeline in self._pipeline_definition["pipelines"]:
  397. pipelines.append(Pipeline(pipeline))
  398. self._pipelines = pipelines
  399. return self._pipelines
  400. @property
  401. def primary_pipeline(self) -> Pipeline:
  402. """
  403. The primary pipeline associated with this pipeline definition
  404. :return: the primary pipeline
  405. """
  406. if not self._primary_pipeline:
  407. if "pipelines" not in self._pipeline_definition:
  408. raise ValueError("Pipeline is missing 'pipelines' field.")
  409. elif len(self._pipeline_definition["pipelines"]) == 0:
  410. raise ValueError("Pipeline has zero length 'pipelines' field.")
  411. # Find primary pipeline
  412. self._primary_pipeline = self.get_pipeline_definition(self._pipeline_definition.get("primary_pipeline"))
  413. assert self._primary_pipeline is not None, "No primary pipeline was found"
  414. return self._primary_pipeline
  415. @property
  416. def pipeline_nodes(self) -> List[Node]:
  417. """
  418. All nodes of all pipelines associated with a pipeline definition
  419. """
  420. return [node for pipeline in self.pipelines for node in pipeline.nodes]
  421. def validate(self) -> list:
  422. """
  423. Validates the pipeline definition structure and semantics
  424. :return: the list of issues found
  425. """
  426. # If it has been validated before
  427. if self._validated:
  428. # return current list of issues
  429. return self._validation_issues
  430. # Has not been validated before
  431. validation_issues = []
  432. # Validate pipeline schema version
  433. if "version" not in self._pipeline_definition:
  434. validation_issues.append("Pipeline schema version field is missing.")
  435. elif not isinstance(self._pipeline_definition["version"], str):
  436. validation_issues.append("Pipeline schema version field should be a string.")
  437. # Validate pipelines
  438. if "pipelines" not in self._pipeline_definition:
  439. validation_issues.append("Pipeline is missing 'pipelines' field.")
  440. elif not isinstance(self._pipeline_definition["pipelines"], list):
  441. validation_issues.append("Field 'pipelines' should be a list.")
  442. elif len(self._pipeline_definition["pipelines"]) == 0:
  443. validation_issues.append("Pipeline has zero length 'pipelines' field.")
  444. # Validate primary pipeline
  445. if "primary_pipeline" not in self._pipeline_definition:
  446. validation_issues.append("Could not determine the primary pipeline.")
  447. elif not isinstance(self._pipeline_definition["primary_pipeline"], str):
  448. validation_issues.append("Field 'primary_pipeline' should be a string.")
  449. primary_pipeline = self.get_pipeline_definition(self._pipeline_definition.get("primary_pipeline"))
  450. if not primary_pipeline:
  451. validation_issues.append("No primary pipeline was found")
  452. else:
  453. primary_pipeline = primary_pipeline.to_dict()
  454. # Validate primary pipeline structure
  455. if "app_data" not in primary_pipeline:
  456. validation_issues.append("Primary pipeline is missing the 'app_data' field.")
  457. else:
  458. if "version" not in primary_pipeline["app_data"]:
  459. validation_issues.append("Primary pipeline is missing the 'version' field.")
  460. if "properties" not in primary_pipeline["app_data"]:
  461. validation_issues.append("Node is missing 'properties' field.")
  462. elif len(primary_pipeline["app_data"]["properties"]) == 0:
  463. validation_issues.append("Pipeline has zero length 'properties' field.")
  464. if "nodes" not in primary_pipeline or len(primary_pipeline["nodes"]) == 0:
  465. validation_issues.append("At least one node must exist in the primary pipeline.")
  466. else:
  467. for node in primary_pipeline["nodes"]:
  468. if "component_parameters" not in node["app_data"]:
  469. validation_issues.append("Node is missing 'component_parameters' field")
  470. return validation_issues
  471. def propagate_pipeline_default_properties(self):
  472. """
  473. For any default pipeline properties set (e.g. runtime image, volume), propagate
  474. the values to any nodes that do not set their own value for that property.
  475. """
  476. # Convert any key-value list pipeline default properties to the KeyValueList type
  477. kv_properties = PipelineDefinition.get_kv_properties()
  478. self.primary_pipeline.convert_kv_properties(kv_properties)
  479. pipeline_default_properties = self.primary_pipeline.get_property(PIPELINE_DEFAULTS, {})
  480. for node in self.pipeline_nodes:
  481. if not Operation.is_generic_operation(node.op):
  482. continue
  483. # Convert any key-value list node properties to the KeyValueList type if not done already
  484. node.convert_kv_properties(kv_properties)
  485. for property_name, pipeline_default_value in pipeline_default_properties.items():
  486. if not pipeline_default_value:
  487. continue
  488. node_value = node.get_component_parameter(property_name)
  489. if not node_value:
  490. node.set_component_parameter(property_name, pipeline_default_value)
  491. continue
  492. if isinstance(pipeline_default_value, KeyValueList) and isinstance(node_value, KeyValueList):
  493. merged_list = KeyValueList.merge(node_value, pipeline_default_value)
  494. node.set_component_parameter(property_name, merged_list)
  495. if self.primary_pipeline.runtime_config != "local":
  496. node.remove_env_vars_with_matching_secrets()
  497. node.convert_data_class_properties()
  498. def is_valid(self) -> bool:
  499. """
  500. Represents whether or not the pipeline structure is valid
  501. :return: True for a valid pipeline definition
  502. """
  503. return len(self.validate()) == 0
  504. def to_dict(self) -> Dict:
  505. """
  506. The raw contents of the pipeline definition json
  507. :rtype: object
  508. """
  509. return self._pipeline_definition
  510. def get_pipeline_definition(self, pipeline_id) -> Any:
  511. """
  512. Retrieve a given pipeline from the pipeline definition
  513. :param pipeline_id: the pipeline unique identifier
  514. :return: the pipeline or None
  515. """
  516. if "pipelines" in self._pipeline_definition:
  517. for pipeline in self._pipeline_definition["pipelines"]:
  518. if pipeline["id"] == pipeline_id:
  519. return Pipeline(pipeline)
  520. return None
  521. def get_node(self, node_id: str):
  522. """
  523. Given a node id returns the associated node object in the pipeline
  524. :param node_id: the node id
  525. :return: the node object or None
  526. """
  527. for pipeline in self._pipelines:
  528. for node in pipeline.nodes:
  529. if node.id == node_id:
  530. return node
  531. return None
  532. def get_node_comments(self, node_id: str) -> Optional[str]:
  533. """
  534. Given a node id returns the assoicated comments in the pipeline
  535. :param node_id: the node id
  536. :return: the comments or None
  537. """
  538. comments = []
  539. for pipeline in self.pipelines:
  540. comment_list = pipeline.comments
  541. for comment in comment_list:
  542. associated_node_id_list = comment.get("associated_id_refs", [])
  543. for ref in associated_node_id_list:
  544. if ref["node_ref"] == node_id:
  545. comments.append(comment.get("content", ""))
  546. # remove empty (or whitespace-only) comment strings
  547. comments = [c for c in comments if c.strip()]
  548. comment_str = "\n\n".join(comments)
  549. if not comment_str:
  550. return None
  551. return comment_str
  552. def get_supernodes(self) -> List[Node]:
  553. """
  554. Returns a list of all supernodes in the pipeline
  555. :return:
  556. """
  557. supernode_list = []
  558. for pipeline in self._pipelines:
  559. for node in pipeline.nodes:
  560. if node.type == "super_node":
  561. supernode_list.append(node)
  562. return supernode_list
  563. @staticmethod
  564. def get_canvas_properties_from_template(package_name: str, template_name: str) -> Dict[str, Any]:
  565. """
  566. Retrieves the dict representation of the canvas-formatted properties
  567. associated with the given template and package names. Rendering does
  568. not require parameters as expressions are not evaluated due to the
  569. SilentUndefined class.
  570. """
  571. loader = PackageLoader("elyra", package_name)
  572. template_env = Environment(loader=loader, undefined=SilentUndefined)
  573. template = template_env.get_template(template_name)
  574. output = template.render()
  575. return json.loads(output)
  576. @staticmethod
  577. def get_kv_properties() -> List[str]:
  578. """
  579. Get pipeline properties in its canvas form and loop through to
  580. find those that should consist of key/value pairs, as given in
  581. the 'keyValueEntries' key.
  582. """
  583. canvas_pipeline_properties = PipelineDefinition.get_canvas_properties_from_template(
  584. package_name="templates/pipeline", template_name="pipeline_properties_template.jinja2"
  585. )
  586. kv_properties = []
  587. parameter_info = canvas_pipeline_properties.get("uihints", {}).get("parameter_info", [])
  588. for parameter in parameter_info:
  589. if parameter.get("data", {}).get("keyValueEntries", False):
  590. parameter_ref = parameter.get("parameter_ref", "")
  591. if parameter_ref.startswith("elyra_"):
  592. parameter_ref = parameter_ref.replace("elyra_", "")
  593. kv_properties.append(parameter_ref)
  594. return kv_properties
  595. class SilentUndefined(Undefined):
  596. """
  597. A subclass of the jinja2.Undefined class used to represent undefined
  598. values in the template. Undefined errors as a result of the evaluation
  599. of expressions will fail silently and render as null.
  600. """
  601. def _fail_with_undefined_error(self, *args, **kwargs):
  602. return None