# parser.py
#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Dict
from typing import List
from typing import Optional

from traitlets.config import LoggingConfigurable

from elyra.pipeline.pipeline import Operation
from elyra.pipeline.pipeline import Pipeline
from elyra.pipeline.pipeline_definition import Node
from elyra.pipeline.pipeline_definition import PipelineDefinition
  24. class PipelineParser(LoggingConfigurable):
  25. def __init__(self, root_dir="", **kwargs):
  26. super().__init__(**kwargs)
  27. self.root_dir = root_dir
  28. def parse(self, pipeline_json: Dict) -> Pipeline:
  29. """
  30. The pipeline definitions allow for defining multiple pipelines in one json file.
  31. When super_nodes are used, their node actually references another pipeline in the
  32. set of pipeline definitions - which is "flattened" into the overall pipeline object's
  33. list of operations.
  34. """
  35. try:
  36. pipeline_definition = PipelineDefinition(pipeline_definition=pipeline_json)
  37. primary_pipeline = pipeline_definition.primary_pipeline
  38. except Exception as e:
  39. raise ValueError(f"Invalid Pipeline: {e}")
  40. # runtime info is only present on primary pipeline...
  41. runtime = primary_pipeline.runtime
  42. if not runtime:
  43. raise ValueError("Invalid pipeline: Missing runtime.")
  44. runtime_config = primary_pipeline.runtime_config
  45. if not runtime_config:
  46. raise ValueError("Invalid pipeline: Missing runtime configuration.")
  47. source = primary_pipeline.source
  48. description = primary_pipeline.get_property("description")
  49. pipeline_object = Pipeline(
  50. id=primary_pipeline.id,
  51. name=primary_pipeline.name,
  52. runtime=runtime,
  53. runtime_config=runtime_config,
  54. source=source,
  55. description=description,
  56. pipeline_parameters=primary_pipeline.pipeline_parameters,
  57. )
  58. self._nodes_to_operations(pipeline_definition, pipeline_object, primary_pipeline.nodes)
  59. return pipeline_object
  60. def _nodes_to_operations(
  61. self,
  62. pipeline_definition: PipelineDefinition,
  63. pipeline_object: Pipeline,
  64. nodes: List[Node],
  65. super_node: Optional[Node] = None,
  66. ) -> None:
  67. """
  68. Converts each execution_node of the pipeline to its corresponding operation.
  69. If a super_node is encountered recursion is used to process its embedded nodes.
  70. If the super_node has binding nodes, those "nodes" are ignored since we handle
  71. their "functionality" by parsing the port_id_ref field to determine the node_id
  72. of the embedded node.
  73. If any node types other than execution_node, super_node or binding are encountered,
  74. a ValueError is raised indicating the unknown node type.
  75. Since the pipeline_object's operations list is updated, this method does not return a value.
  76. """
  77. for node in nodes:
  78. # Super_nodes trigger recursion
  79. if node.type == "super_node":
  80. self._super_node_to_operations(pipeline_definition, node, pipeline_object, node)
  81. continue # skip to next node
  82. elif node.type == "binding": # We can ignore binding nodes since we're able to determine links w/o
  83. continue
  84. elif node.type == "model_node":
  85. raise NotImplementedError(f"Node type '{node.type}' is currently not supported!")
  86. elif node.type != "execution_node":
  87. raise ValueError(f"Node type '{node.type}' is invalid!")
  88. # parse each node as a pipeline operation
  89. operation = self._create_pipeline_operation(node, super_node)
  90. # assoicate user comment as docs to operations
  91. comment = pipeline_definition.get_node_comments(node.id)
  92. if comment:
  93. operation.doc = comment
  94. self.log.debug(f"Adding operation for '{operation.name}' to pipeline: {pipeline_object.name}")
  95. pipeline_object.operations[operation.id] = operation
  96. def _super_node_to_operations(
  97. self, pipeline_definition: PipelineDefinition, node: Node, pipeline_object: Pipeline, super_node: Node
  98. ) -> None:
  99. """Converts nodes within a super_node to operations."""
  100. # get pipeline corresponding to super_node
  101. pipeline_id = node.subflow_pipeline_id
  102. pipeline = pipeline_definition.get_pipeline_definition(pipeline_id)
  103. # recurse to process nodes of super-node
  104. return self._nodes_to_operations(pipeline_definition, pipeline_object, pipeline.nodes, super_node)
  105. def _create_pipeline_operation(self, node: Node, super_node: Node = None) -> Operation:
  106. """
  107. Creates a pipeline operation instance from the given node.
  108. The node and super_node are used to build the list of parent_operation_ids (links) to
  109. the node (operation dependencies).
  110. """
  111. parent_operations = PipelineParser._get_parent_operation_links(node.to_dict()) # parse links as dependencies
  112. if super_node: # gather parent-links tied to embedded nodes inputs
  113. parent_operations.extend(PipelineParser._get_parent_operation_links(super_node.to_dict(), node.id))
  114. return Operation.create_instance(
  115. id=node.id,
  116. type=node.type,
  117. classifier=node.op,
  118. name=node.label,
  119. parent_operation_ids=parent_operations,
  120. component_params=node.get("component_parameters", {}),
  121. )
  122. @staticmethod
  123. def _get_port_node_id(link: Dict) -> [None, str]:
  124. """
  125. Gets the id of the node corresponding to the linked out port.
  126. If the link is on a super_node the appropriate node_id is actually
  127. embedded in the port_id_ref value.
  128. """
  129. node_id = None
  130. if "port_id_ref" in link:
  131. if link["port_id_ref"] == "outPort": # Regular execution node
  132. if "node_id_ref" in link:
  133. node_id = link["node_id_ref"]
  134. elif link["port_id_ref"].endswith("_outPort"): # Super node
  135. # node_id_ref is the super-node, but the prefix of port_id_ref, is the actual node-id
  136. node_id = link["port_id_ref"].split("_")[0]
  137. return node_id
  138. @staticmethod
  139. def _get_input_node_ids(node_input: Dict) -> List[str]:
  140. """
  141. Gets a list of node_ids corresponding to the linked out ports on the input node.
  142. """
  143. input_node_ids = []
  144. if "links" in node_input:
  145. for link in node_input["links"]:
  146. node_id = PipelineParser._get_port_node_id(link)
  147. if node_id:
  148. input_node_ids.append(node_id)
  149. return input_node_ids
  150. @staticmethod
  151. def _get_parent_operation_links(node: Dict, embedded_node_id: Optional[str] = None) -> List[str]:
  152. """
  153. Gets a list nodes_ids corresponding to parent nodes (outputs directed to this node).
  154. For super_nodes, the node to use has an id of the embedded_node_id suffixed with '_inPort'.
  155. """
  156. links = []
  157. if "inputs" in node:
  158. for node_input in node["inputs"]:
  159. if embedded_node_id: # node is a super_node, handle matches to {embedded_node_id}_inPort
  160. input_id = node_input.get("id")
  161. if input_id == embedded_node_id + "_inPort":
  162. links.extend(PipelineParser._get_input_node_ids(node_input))
  163. else:
  164. links.extend(PipelineParser._get_input_node_ids(node_input))
  165. return links