123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import json
- import os
- import shutil
- from typing import Any
- from typing import List
- from typing import Optional
- import uuid
- from elyra.pipeline.pipeline import GenericOperation
- from elyra.pipeline.pipeline import Pipeline
- def _read_pipeline_resource(pipeline_filename):
- root = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
- pipeline_path = os.path.join(root, pipeline_filename)
- with open(pipeline_path, "r") as f:
- pipeline_json = json.load(f)
- return pipeline_json
- class NodeBase(object):
- """Represents a node of a constructed pipeline based on files in resources/node_util."""
- id: str
- name: str
- outputs: List[str]
- dependencies: List[str]
- env_vars: List[str]
- image_name: str
- fail: bool
- # from subclasses
- classifier: str
- filename: str
- pipeline_name: str # Set during pipeline construction
- def __init__(
- self,
- name: str,
- num_outputs: Optional[int] = 0,
- input_nodes: Optional[List[Any]] = None,
- image_name: Optional[str] = None,
- fail: Optional[bool] = False,
- ):
- self.id = str(uuid.uuid4())
- self.name = name
- self.fail = fail
- self.image_name = image_name
- self.outputs = []
- for i in range(1, num_outputs + 1):
- self.outputs.append(f"{self.name}_{i}.out")
- self.inputs = []
- self.parent_operations = []
- if input_nodes:
- for node in input_nodes:
- self.inputs.extend(node.outputs)
- self.parent_operations.append(node.id)
- self.dependencies = ["node_util/*"]
- def get_operation(self) -> GenericOperation:
- self.env_vars = []
- if self.fail: # NODE_FILENAME is required, so skip if triggering failure
- if "NODE_FILENAME" in os.environ: # remove entry if present
- os.environ.pop("NODE_FILENAME")
- else:
- self.env_vars.append(f"NODE_FILENAME={self.filename}")
- if self.inputs:
- self.env_vars.append(f"INPUT_FILENAMES={';'.join(self.inputs)}")
- if self.outputs:
- self.env_vars.append(f"OUTPUT_FILENAMES={';'.join(self.outputs)}")
- # Convey the pipeline name
- assert self.pipeline_name is not None, "Pipeline name has not been set during construction!"
- self.env_vars.append(f"PIPELINE_NAME={self.pipeline_name}")
- # Add system-owned here with bogus or no value...
- self.env_vars.append("ELYRA_RUNTIME_ENV=bogus_runtime")
- component_parameters = {
- "filename": self.filename,
- "runtime_image": self.image_name or "NA",
- "dependencies": self.dependencies,
- "env_vars": self.env_vars,
- "inputs": self.inputs,
- "outputs": self.outputs,
- }
- return GenericOperation(
- self.id,
- "execution_node",
- self.name,
- self.classifier,
- parent_operation_ids=self.parent_operations,
- component_params=component_parameters,
- )
- class NotebookNode(NodeBase):
- def __init__(
- self,
- name: str,
- num_outputs: Optional[int] = 0,
- input_nodes: Optional[List[Any]] = None,
- image_name: Optional[str] = None,
- fail: Optional[bool] = False,
- ):
- super().__init__(name, num_outputs=num_outputs, input_nodes=input_nodes, image_name=image_name, fail=fail)
- self.classifier = "execute-notebook-node"
- self.filename = f"{self.name}.ipynb"
- class PythonNode(NodeBase):
- def __init__(
- self,
- name: str,
- num_outputs: Optional[int] = 0,
- input_nodes: Optional[List[Any]] = None,
- image_name: Optional[str] = None,
- fail: Optional[bool] = False,
- ):
- super().__init__(name, num_outputs=num_outputs, input_nodes=input_nodes, image_name=image_name, fail=fail)
- self.classifier = "execute-python-node"
- self.filename = f"{self.name}.py"
- def construct_pipeline(
- name: str,
- nodes: List[NodeBase],
- location: str,
- runtime_type: Optional[str] = "local",
- runtime_config: Optional[str] = "local",
- ) -> Pipeline:
- """Returns an instance of a local Pipeline consisting of each node and populates the
- specified location with the necessary files to run the pipeline from that location.
- """
- pipeline = Pipeline(str(uuid.uuid4()), name, runtime_type, runtime_config)
- for node in nodes:
- node.pipeline_name = name
- pipeline.operations[node.id] = node.get_operation()
- # copy the node file into the "working directory"
- if isinstance(node, NotebookNode):
- src_file = os.path.join(os.path.dirname(__file__), "resources/node_util/node.ipynb")
- elif isinstance(node, PythonNode):
- src_file = os.path.join(os.path.dirname(__file__), "resources/node_util/node.py")
- else:
- assert False, f"Invalid node type detected: {node.__class__.__name__}"
- shutil.copy(src_file, os.path.join(location, node.filename))
- # copy the node_util directory into the "working directory"
- shutil.copytree(os.path.join(os.path.dirname(__file__), "resources/node_util"), os.path.join(location, "node_util"))
- return pipeline
|