node_util.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from abc import ABC
  17. from abc import abstractmethod
  18. import json
  19. import os
  20. from typing import List
  21. class NodeFile(object):
  22. """Base class for input and output node files"""
  23. def __init__(self, filename: str) -> None:
  24. self.filename = filename
  25. class InputNodeFile(NodeFile):
  26. """Given a filename, it ensures the file exists and can read its contents."""
  27. def __init__(self, filename: str) -> None:
  28. super().__init__(filename)
  29. self.data = None
  30. if not os.path.exists(self.filename):
  31. raise FileNotFoundError(f"File '{self.filename}' does not exist!")
  32. def read(self) -> str:
  33. with open(self.filename) as f:
  34. self.data = f.read()
  35. return self.data
  36. def data(self) -> str:
  37. return self.data
  38. class OutputNodeFile(NodeFile):
  39. """Given a filename, it ensures the file does not exist and will write data to that file."""
  40. def __init__(self, filename: str) -> None:
  41. super().__init__(filename)
  42. # Don't enforce output file existence here - break idempotency
  43. # if os.path.exists(self.filename):
  44. # raise FileExistsError(f"File '{self.filename}' already exists!")
  45. def write(self, data) -> None:
  46. with open(self.filename, "w+") as f:
  47. f.write(data)
  48. class ExecutionNode(ABC):
  49. """Represents an excutable node of a pipeline. This class must be subclassed."""
  50. node_name = None
  51. filename = None
  52. extension = None
  53. def __init__(self) -> None:
  54. self.filename = os.getenv("NODE_FILENAME")
  55. if not self.filename:
  56. raise ValueError("NODE_FILENAME environment variable must be set!")
  57. node_file_splits = os.path.basename(self.filename).split(".")
  58. self.node_name = node_file_splits[0]
  59. self.extension = node_file_splits[1]
  60. self.validate()
  61. def validate(self) -> None:
  62. """Validate the filename as best as possible, depending on subclass."""
  63. # Validate its extension and that the file exists.
  64. self.validate_extension()
  65. if not os.path.exists(self.filename):
  66. raise FileNotFoundError(f"ExecutionNode filename '{self.filename}' does not exist!")
  67. def run(self) -> None:
  68. self.process_inputs("INPUT_FILENAMES")
  69. self.perform_experiment()
  70. self.process_outputs("OUTPUT_FILENAMES")
  71. def perform_experiment(self) -> None:
  72. """Emulates the experiment to run."""
  73. print(f"NODE_NAME: {self.node_name}")
  74. runtime_env = os.getenv("ELYRA_RUNTIME_ENV")
  75. assert runtime_env == "local", "ELYRA_RUNTIME_ENV has not been set to 'local'!"
  76. print(f"ELYRA_RUNTIME_ENV: {runtime_env}")
  77. run_name = os.getenv("ELYRA_RUN_NAME")
  78. assert run_name is not None, "ELYRA_RUN_NAME is not set!"
  79. print(f"ELYRA_RUN_NAME: {run_name}")
  80. pipeline_name = os.getenv("PIPELINE_NAME")
  81. print(f"PIPELINE_NAME: {pipeline_name}")
  82. assert pipeline_name is not None, "PIPELINE_NAME is not set!"
  83. assert run_name.startswith(pipeline_name), "ELYRA_RUN_NAME does not start with pipeline name!"
  84. def process_inputs(self, env_var: str) -> List[InputNodeFile]:
  85. """Given an environment variable `env_var`, that contains a SEMI-COLON-separated
  86. list of filenames, it processes each entry by instantiating an instance of
  87. InputNodeFile corresponding to each entry and returns the list of instances.
  88. """
  89. inputs = []
  90. filenames = os.getenv(env_var, "").split(";")
  91. for filename in filenames:
  92. if filename:
  93. inputs.append(InputNodeFile(filename))
  94. for input_file in inputs:
  95. payload = json.loads(input_file.read())
  96. print(f"FROM: {payload.get('node')}")
  97. assert payload.get("run_name") == os.getenv("ELYRA_RUN_NAME")
  98. return inputs
  99. def process_outputs(self, env_var: str) -> List[OutputNodeFile]:
  100. """Given an environment variable `env_var`, that contains a SEMI-COLON-separated
  101. list of filenames, it processes each entry by instantiating an instance of
  102. OutputNodeFile corresponding to each entry and returns the list of instances.
  103. """
  104. outputs = []
  105. filenames = os.getenv(env_var, "").split(";")
  106. for filename in filenames:
  107. if filename:
  108. outputs.append(OutputNodeFile(filename))
  109. # Include ELYRA_RUN_NAME in all outputs - which are verified when used as inputs
  110. payload = {"node": self.node_name, "run_name": os.getenv("ELYRA_RUN_NAME")}
  111. for output_file in outputs:
  112. output_file.write(json.dumps(payload))
  113. return outputs
  114. @abstractmethod
  115. def expected_extension(self) -> str:
  116. raise NotImplementedError(
  117. f"Method 'expected_extension()' must be implemented by subclass '{self.__class__.__name__}'!"
  118. )
  119. def validate_extension(self) -> None:
  120. if self.expected_extension() != self.extension:
  121. raise ValueError(
  122. f"Filename '{self.filename}' does not have a proper extension: '{self.expected_extension()}'"
  123. )
  124. class NotebookNode(ExecutionNode):
  125. """Represents a Notebook execution node of a pipeline."""
  126. def expected_extension(self) -> str:
  127. return "ipynb"
  128. def validate(self) -> None:
  129. """For notebooks, we can also ensure the file can be loaded as JSON."""
  130. super().validate()
  131. # Confirm file can be loaded as JSON
  132. with open(self.filename) as f:
  133. json.load(f)
  134. class PythonNode(ExecutionNode):
  135. """Represents a Python file execution node of a pipeline."""
  136. def expected_extension(self) -> str:
  137. return "py"