123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- from abc import ABC
- from abc import abstractmethod
- import json
- import os
- from typing import List
class NodeFile:
    """Common base for the input and output node-file wrappers.

    It only records the path of the file; subclasses add reading or writing.
    """

    def __init__(self, filename: str) -> None:
        """Remember `filename` as the path this wrapper refers to."""
        self.filename = filename
class InputNodeFile(NodeFile):
    """Wrapper around an input file that must already exist.

    The constructor verifies that the path exists; `read()` loads the file's
    text and caches it so it can be retrieved again through `data`.
    """

    def __init__(self, filename: str) -> None:
        """Record `filename` and fail fast if it is missing.

        Raises:
            FileNotFoundError: if `filename` does not exist.
        """
        super().__init__(filename)
        # Kept under a private name: an instance attribute called `data` would
        # shadow the `data` accessor defined on the class and make it unreachable.
        self._data = None
        if not os.path.exists(self.filename):
            raise FileNotFoundError(f"File '{self.filename}' does not exist!")

    def read(self) -> str:
        """Read the whole file as text, cache it, and return it."""
        with open(self.filename) as f:
            self._data = f.read()
        return self._data

    @property
    def data(self) -> str:
        """Contents cached by the most recent `read()`; `None` before any read."""
        return self._data

    @data.setter
    def data(self, value) -> None:
        # `data` used to be a plain, writable instance attribute; keep it assignable.
        self._data = value
class OutputNodeFile(NodeFile):
    """Wrapper around an output file; `write()` stores data at the recorded path.

    Construction is inherited unchanged from NodeFile. Whether the file already
    exists is deliberately NOT checked: rejecting an existing output file would
    break idempotency.
    """

    def write(self, data) -> None:
        """Write `data` to the file, creating it or truncating existing contents."""
        with open(self.filename, "w+") as target:
            target.write(data)
class ExecutionNode(ABC):
    """Represents an executable node of a pipeline. This class must be subclassed.

    The node's file is taken from the NODE_FILENAME environment variable. On
    construction the filename is split into a node name and an extension and
    then validated. `run()` processes the declared inputs, emulates the
    experiment, and writes the declared outputs.
    """

    node_name = None
    filename = None
    extension = None

    def __init__(self) -> None:
        """Read NODE_FILENAME, derive `node_name`/`extension`, and validate.

        Raises:
            ValueError: if NODE_FILENAME is unset or empty, or the extension
                is not the one the subclass expects.
            FileNotFoundError: if the file does not exist.
        """
        self.filename = os.getenv("NODE_FILENAME")
        if not self.filename:
            raise ValueError("NODE_FILENAME environment variable must be set!")

        # Split on the LAST dot so multi-dot names ("my.step.py") yield the real
        # extension. A name without any dot gets an empty extension, which
        # validate_extension() rejects with a ValueError instead of an IndexError.
        stem, dot, suffix = os.path.basename(self.filename).rpartition(".")
        if dot:
            self.node_name, self.extension = stem, suffix
        else:
            # rpartition() puts the whole string in the last slot when no dot is found.
            self.node_name, self.extension = suffix, ""
        self.validate()

    @staticmethod
    def _require(condition: bool, message: str) -> None:
        """Raise AssertionError(message) unless `condition` holds.

        Used instead of the `assert` statement so the checks still run when
        Python is started with -O.
        """
        if not condition:
            raise AssertionError(message)

    def validate(self) -> None:
        """Validate the filename as best as possible, depending on subclass.

        Raises:
            ValueError: if the extension is not the expected one.
            FileNotFoundError: if the file does not exist.
        """
        # Validate its extension and that the file exists.
        self.validate_extension()
        if not os.path.exists(self.filename):
            raise FileNotFoundError(f"ExecutionNode filename '{self.filename}' does not exist!")

    def run(self) -> None:
        """Process inputs, perform the emulated experiment, then write outputs."""
        self.process_inputs("INPUT_FILENAMES")
        self.perform_experiment()
        self.process_outputs("OUTPUT_FILENAMES")

    def perform_experiment(self) -> None:
        """Emulates the experiment to run.

        Prints the node name and the ELYRA_RUNTIME_ENV, ELYRA_RUN_NAME and
        PIPELINE_NAME environment variables, checking each one on the way.

        Raises:
            AssertionError: if ELYRA_RUNTIME_ENV is not 'local', if
                ELYRA_RUN_NAME or PIPELINE_NAME is unset, or if the run name
                does not start with the pipeline name.
        """
        print(f"NODE_NAME: {self.node_name}")

        runtime_env = os.getenv("ELYRA_RUNTIME_ENV")
        self._require(runtime_env == "local", "ELYRA_RUNTIME_ENV has not been set to 'local'!")
        print(f"ELYRA_RUNTIME_ENV: {runtime_env}")

        run_name = os.getenv("ELYRA_RUN_NAME")
        self._require(run_name is not None, "ELYRA_RUN_NAME is not set!")
        print(f"ELYRA_RUN_NAME: {run_name}")

        pipeline_name = os.getenv("PIPELINE_NAME")
        print(f"PIPELINE_NAME: {pipeline_name}")
        self._require(pipeline_name is not None, "PIPELINE_NAME is not set!")
        self._require(run_name.startswith(pipeline_name), "ELYRA_RUN_NAME does not start with pipeline name!")

    # The return annotations below are quoted so they are not evaluated while
    # the class body is being executed.
    def process_inputs(self, env_var: str) -> "List[InputNodeFile]":
        """Given an environment variable `env_var`, that contains a SEMI-COLON-separated
        list of filenames, it processes each entry by instantiating an instance of
        InputNodeFile corresponding to each entry and returns the list of instances.

        Empty entries are skipped. Each input is parsed as JSON, its 'node'
        value is printed, and its 'run_name' value must equal ELYRA_RUN_NAME.

        Raises:
            FileNotFoundError: if any listed file does not exist.
            AssertionError: if an input's 'run_name' differs from ELYRA_RUN_NAME.
        """
        # Instantiate every wrapper first so a missing file is reported before
        # any input is read.
        inputs = [InputNodeFile(filename) for filename in os.getenv(env_var, "").split(";") if filename]

        expected_run_name = os.getenv("ELYRA_RUN_NAME")
        for input_file in inputs:
            payload = json.loads(input_file.read())
            print(f"FROM: {payload.get('node')}")
            self._require(
                payload.get("run_name") == expected_run_name,
                f"Input file '{input_file.filename}' has a run_name that does not match ELYRA_RUN_NAME!",
            )
        return inputs

    def process_outputs(self, env_var: str) -> "List[OutputNodeFile]":
        """Given an environment variable `env_var`, that contains a SEMI-COLON-separated
        list of filenames, it processes each entry by instantiating an instance of
        OutputNodeFile corresponding to each entry and returns the list of instances.

        Empty entries are skipped. Every output receives the same JSON payload
        holding this node's name and the ELYRA_RUN_NAME value.
        """
        outputs = [OutputNodeFile(filename) for filename in os.getenv(env_var, "").split(";") if filename]

        # Include ELYRA_RUN_NAME in all outputs - which are verified when used as inputs
        payload = json.dumps({"node": self.node_name, "run_name": os.getenv("ELYRA_RUN_NAME")})
        for output_file in outputs:
            output_file.write(payload)
        return outputs

    @abstractmethod
    def expected_extension(self) -> str:
        """Return the file extension (without the dot) this node type requires."""
        raise NotImplementedError(
            f"Method 'expected_extension()' must be implemented by subclass '{self.__class__.__name__}'!"
        )

    def validate_extension(self) -> None:
        """Check that the parsed extension equals `expected_extension()`.

        Raises:
            ValueError: if the two differ.
        """
        expected = self.expected_extension()
        if expected != self.extension:
            raise ValueError(f"Filename '{self.filename}' does not have a proper extension: '{expected}'")
class NotebookNode(ExecutionNode):
    """Execution node backed by a Jupyter notebook file."""

    def expected_extension(self) -> str:
        """Notebook files must carry the 'ipynb' extension."""
        return "ipynb"

    def validate(self) -> None:
        """Run the base-class checks, then make sure the file parses as JSON.

        The parsed document is discarded; a file that is not valid JSON makes
        the parser raise.
        """
        super().validate()
        with open(self.filename) as notebook:
            contents = notebook.read()
        # Parse purely for validation; the result is not kept.
        json.loads(contents)
class PythonNode(ExecutionNode):
    """Execution node backed by a Python script file."""

    def expected_extension(self) -> str:
        """Python script files must carry the 'py' extension."""
        return "py"
|