util.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import json
  17. import os
  18. import shutil
  19. from typing import Any
  20. from typing import List
  21. from typing import Optional
  22. import uuid
  23. from elyra.pipeline.pipeline import GenericOperation
  24. from elyra.pipeline.pipeline import Pipeline
  25. def _read_pipeline_resource(pipeline_filename):
  26. root = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
  27. pipeline_path = os.path.join(root, pipeline_filename)
  28. with open(pipeline_path, "r") as f:
  29. pipeline_json = json.load(f)
  30. return pipeline_json
  31. class NodeBase(object):
  32. """Represents a node of a constructed pipeline based on files in resources/node_util."""
  33. id: str
  34. name: str
  35. outputs: List[str]
  36. dependencies: List[str]
  37. env_vars: List[str]
  38. image_name: str
  39. fail: bool
  40. # from subclasses
  41. classifier: str
  42. filename: str
  43. pipeline_name: str # Set during pipeline construction
  44. def __init__(
  45. self,
  46. name: str,
  47. num_outputs: Optional[int] = 0,
  48. input_nodes: Optional[List[Any]] = None,
  49. image_name: Optional[str] = None,
  50. fail: Optional[bool] = False,
  51. ):
  52. self.id = str(uuid.uuid4())
  53. self.name = name
  54. self.fail = fail
  55. self.image_name = image_name
  56. self.outputs = []
  57. for i in range(1, num_outputs + 1):
  58. self.outputs.append(f"{self.name}_{i}.out")
  59. self.inputs = []
  60. self.parent_operations = []
  61. if input_nodes:
  62. for node in input_nodes:
  63. self.inputs.extend(node.outputs)
  64. self.parent_operations.append(node.id)
  65. self.dependencies = ["node_util/*"]
  66. def get_operation(self) -> GenericOperation:
  67. self.env_vars = []
  68. if self.fail: # NODE_FILENAME is required, so skip if triggering failure
  69. if "NODE_FILENAME" in os.environ: # remove entry if present
  70. os.environ.pop("NODE_FILENAME")
  71. else:
  72. self.env_vars.append(f"NODE_FILENAME={self.filename}")
  73. if self.inputs:
  74. self.env_vars.append(f"INPUT_FILENAMES={';'.join(self.inputs)}")
  75. if self.outputs:
  76. self.env_vars.append(f"OUTPUT_FILENAMES={';'.join(self.outputs)}")
  77. # Convey the pipeline name
  78. assert self.pipeline_name is not None, "Pipeline name has not been set during construction!"
  79. self.env_vars.append(f"PIPELINE_NAME={self.pipeline_name}")
  80. # Add system-owned here with bogus or no value...
  81. self.env_vars.append("ELYRA_RUNTIME_ENV=bogus_runtime")
  82. component_parameters = {
  83. "filename": self.filename,
  84. "runtime_image": self.image_name or "NA",
  85. "dependencies": self.dependencies,
  86. "env_vars": self.env_vars,
  87. "inputs": self.inputs,
  88. "outputs": self.outputs,
  89. }
  90. return GenericOperation(
  91. self.id,
  92. "execution_node",
  93. self.name,
  94. self.classifier,
  95. parent_operation_ids=self.parent_operations,
  96. component_params=component_parameters,
  97. )
  98. class NotebookNode(NodeBase):
  99. def __init__(
  100. self,
  101. name: str,
  102. num_outputs: Optional[int] = 0,
  103. input_nodes: Optional[List[Any]] = None,
  104. image_name: Optional[str] = None,
  105. fail: Optional[bool] = False,
  106. ):
  107. super().__init__(name, num_outputs=num_outputs, input_nodes=input_nodes, image_name=image_name, fail=fail)
  108. self.classifier = "execute-notebook-node"
  109. self.filename = f"{self.name}.ipynb"
  110. class PythonNode(NodeBase):
  111. def __init__(
  112. self,
  113. name: str,
  114. num_outputs: Optional[int] = 0,
  115. input_nodes: Optional[List[Any]] = None,
  116. image_name: Optional[str] = None,
  117. fail: Optional[bool] = False,
  118. ):
  119. super().__init__(name, num_outputs=num_outputs, input_nodes=input_nodes, image_name=image_name, fail=fail)
  120. self.classifier = "execute-python-node"
  121. self.filename = f"{self.name}.py"
  122. def construct_pipeline(
  123. name: str,
  124. nodes: List[NodeBase],
  125. location: str,
  126. runtime_type: Optional[str] = "local",
  127. runtime_config: Optional[str] = "local",
  128. ) -> Pipeline:
  129. """Returns an instance of a local Pipeline consisting of each node and populates the
  130. specified location with the necessary files to run the pipeline from that location.
  131. """
  132. pipeline = Pipeline(str(uuid.uuid4()), name, runtime_type, runtime_config)
  133. for node in nodes:
  134. node.pipeline_name = name
  135. pipeline.operations[node.id] = node.get_operation()
  136. # copy the node file into the "working directory"
  137. if isinstance(node, NotebookNode):
  138. src_file = os.path.join(os.path.dirname(__file__), "resources/node_util/node.ipynb")
  139. elif isinstance(node, PythonNode):
  140. src_file = os.path.join(os.path.dirname(__file__), "resources/node_util/node.py")
  141. else:
  142. assert False, f"Invalid node type detected: {node.__class__.__name__}"
  143. shutil.copy(src_file, os.path.join(location, node.filename))
  144. # copy the node_util directory into the "working directory"
  145. shutil.copytree(os.path.join(os.path.dirname(__file__), "resources/node_util"), os.path.join(location, "node_util"))
  146. return pipeline