processor_local.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from abc import ABC
  17. from abc import abstractmethod
  18. from datetime import datetime
  19. import os
  20. from subprocess import CalledProcessError
  21. from subprocess import PIPE
  22. from subprocess import run
  23. import sys
  24. import time
  25. from typing import Dict
  26. from typing import List
  27. from typing import Optional
  28. from jupyter_server.gateway.managers import GatewayClient
  29. import papermill
  30. from traitlets import log
  31. from elyra.pipeline.component_catalog import ComponentCache
  32. from elyra.pipeline.pipeline import GenericOperation
  33. from elyra.pipeline.processor import PipelineProcessor
  34. from elyra.pipeline.processor import PipelineProcessorResponse
  35. from elyra.pipeline.runtime_type import RuntimeProcessorType
  36. from elyra.util.path import get_absolute_path
  37. class LocalPipelineProcessor(PipelineProcessor):
  38. """
  39. Local pipeline processor runs a pipeline locally. The scope of this runtime is
  40. simply to automate the execution of multiple notebooks from a pipeline as if they
  41. were being executed manually. Any additional support is out of the scope of this
  42. processor. If a notebook doesn't run using `run all` it will fail in the
  43. same way when processed by this processor. Also, any relationship or specific capabilities
  44. associated with a particular runtime is not supported by local mode and any additional properties
  45. other then the specific file to be executed is ignored by the local processor.
  46. Note: Execution happens in-place and a ledger of runs will be available at $TMPFILE/elyra/pipeline-name-<timestamp>
  47. """
  48. _operation_processor_catalog: Dict
  49. _type = RuntimeProcessorType.LOCAL
  50. _name = "local"
  51. def __init__(self, **kwargs):
  52. super().__init__(**kwargs)
  53. notebook_op_processor = NotebookOperationProcessor(self.root_dir)
  54. python_op_processor = PythonScriptOperationProcessor(self.root_dir)
  55. r_op_processor = RScriptOperationProcessor(self.root_dir)
  56. self._operation_processor_catalog = {
  57. notebook_op_processor.operation_name: notebook_op_processor,
  58. python_op_processor.operation_name: python_op_processor,
  59. r_op_processor.operation_name: r_op_processor,
  60. }
  61. def get_components(self):
  62. return ComponentCache.get_generic_components()
  63. def process(self, pipeline):
  64. """
  65. Process a pipeline locally.
  66. The pipeline execution consists on properly ordering the operations
  67. based on it's dependency graph and than delegating the execution
  68. to proper executor (e.g. papermill to notebooks)
  69. """
  70. self.log_pipeline_info(pipeline.name, "processing pipeline")
  71. t0_all = time.time()
  72. # This unique run identifier is made available to all
  73. # notebooks | scripts in environment variable ELYRA_RUN_NAME
  74. # during pipeline execution. The id itself is not persisted
  75. # after the run completes.
  76. # The motivation is that notebooks | scripts might want to generate
  77. # artifact names that are unique for each run to avoid overwriting.
  78. # For example, a saved model file might be named
  79. # `my-trained-model-{ELYRA_RUN_NAME}.ext`.
  80. elyra_run_name = f'{pipeline.name}-{datetime.now().strftime("%m%d%H%M%S")}'
  81. # Sort operations based on dependency graph (topological order)
  82. operations = PipelineProcessor._sort_operations(pipeline.operations)
  83. for operation in operations:
  84. assert isinstance(operation, GenericOperation)
  85. try:
  86. t0 = time.time()
  87. operation_processor = self._operation_processor_catalog[operation.classifier]
  88. operation_processor.process(operation, elyra_run_name)
  89. self.log_pipeline_info(
  90. pipeline.name,
  91. f"completed {operation.filename}",
  92. operation_name=operation.name,
  93. duration=(time.time() - t0),
  94. )
  95. except Exception as ex:
  96. raise RuntimeError(f"Error processing operation {operation.name} {str(ex)}") from ex
  97. self.log_pipeline_info(pipeline.name, "pipeline processed", duration=(time.time() - t0_all))
  98. return LocalPipelineProcessorResponse()
  99. def export(self, pipeline, pipeline_export_format, pipeline_export_path, overwrite):
  100. raise NotImplementedError("Local pipelines does not support export functionality")
  101. class LocalPipelineProcessorResponse(PipelineProcessorResponse):
  102. _type = RuntimeProcessorType.LOCAL
  103. _name = "local"
  104. class OperationProcessor(ABC):
  105. _operation_name: str = None
  106. def __init__(self):
  107. self.log = log.get_logger()
  108. @property
  109. def operation_name(self) -> str:
  110. return self._operation_name
  111. @abstractmethod
  112. def process(self, operation: GenericOperation, elyra_run_name: str):
  113. raise NotImplementedError
  114. @staticmethod
  115. def _collect_envs(operation: GenericOperation, elyra_run_name: str) -> Dict:
  116. envs = os.environ.copy() # Make sure this process's env is "available" in the kernel subprocess
  117. envs.update(operation.env_vars.to_dict())
  118. envs["ELYRA_RUNTIME_ENV"] = "local" # Special case
  119. envs["ELYRA_RUN_NAME"] = elyra_run_name
  120. return envs
  121. class FileOperationProcessor(OperationProcessor):
  122. MAX_ERROR_LEN: int = 80
  123. def __init__(self, root_dir: str):
  124. super().__init__()
  125. self._root_dir = root_dir
  126. @property
  127. def operation_name(self) -> str:
  128. return self._operation_name
  129. @abstractmethod
  130. def process(self, operation: GenericOperation, elyra_run_name: str):
  131. raise NotImplementedError
  132. def get_valid_filepath(self, op_filename: str) -> str:
  133. filepath = get_absolute_path(self._root_dir, op_filename)
  134. if not os.path.exists(filepath):
  135. raise FileNotFoundError(f"Could not find {filepath}")
  136. if not os.path.isfile(filepath):
  137. raise ValueError(f"Not a file: {filepath}")
  138. return filepath
  139. def log_and_raise(self, file_name: str, ex: Exception, data_capture_msg: Optional[str] = None) -> None:
  140. """Log and raise the exception that occurs when processing file_name.
  141. If the exception's message is longer than MAX_ERROR_LEN, it will be
  142. truncated with an ellipses (...) when raised. The complete message
  143. will be logged.
  144. """
  145. self.log.error(f"Error executing {file_name}: {str(ex)}")
  146. if data_capture_msg:
  147. self.log.info(data_capture_msg)
  148. truncated_msg = FileOperationProcessor._truncate_msg(str(ex))
  149. raise RuntimeError(f"({file_name}): {truncated_msg}") from ex
  150. @staticmethod
  151. def _truncate_msg(msg: str) -> str:
  152. """Truncates the msg string to be less that MAX_ERROR_LEN.
  153. If msg is longer than MAX_ERROR_LEN, the first space is found from the right,
  154. then ellipses (...) are appended to that location so that they don't appear
  155. in the middle of a word. As a result, the truncation could result in lengths
  156. less than the max+2.
  157. """
  158. if len(msg) < FileOperationProcessor.MAX_ERROR_LEN:
  159. return msg
  160. # locate the first whitespace from the 80th character and truncate from there
  161. last_space = msg.rfind(" ", 0, FileOperationProcessor.MAX_ERROR_LEN)
  162. if last_space >= 0:
  163. return msg[:last_space] + "..."
  164. return msg[: FileOperationProcessor.MAX_ERROR_LEN]
  165. class NotebookOperationProcessor(FileOperationProcessor):
  166. _operation_name = "execute-notebook-node"
  167. def process(self, operation: GenericOperation, elyra_run_name: str):
  168. filepath = self.get_valid_filepath(operation.filename)
  169. file_dir = os.path.dirname(filepath)
  170. file_name = os.path.basename(filepath)
  171. self.log.debug(f"Processing notebook: {filepath}")
  172. # We'll always use the ElyraEngine. This engine is essentially the default Papermill engine
  173. # but allows for environment variables to be passed to the kernel process (via 'kernel_env').
  174. # If the current notebook server is running with Enterprise Gateway configured, we will also
  175. # point the 'kernel_manager_class' to GatewayKernelManager so that notebooks run as they
  176. # would outside of Elyra. Current working directory (cwd) is specified both for where papermill
  177. # runs the notebook (cwd) and where the directory of the kernel process (kernel_cwd). The latter
  178. # of which is important when EG is configured.
  179. additional_kwargs = dict()
  180. additional_kwargs["engine_name"] = "ElyraEngine"
  181. additional_kwargs["cwd"] = file_dir # For local operations, papermill runs from this dir
  182. additional_kwargs["kernel_cwd"] = file_dir
  183. additional_kwargs["kernel_env"] = OperationProcessor._collect_envs(operation, elyra_run_name)
  184. if GatewayClient.instance().gateway_enabled:
  185. additional_kwargs["kernel_manager_class"] = "jupyter_server.gateway.managers.GatewayKernelManager"
  186. t0 = time.time()
  187. try:
  188. papermill.execute_notebook(filepath, filepath, **additional_kwargs)
  189. except papermill.PapermillExecutionError as pmee:
  190. self.log.error(
  191. f"Error executing {file_name} in cell {pmee.exec_count}: " + f"{str(pmee.ename)} {str(pmee.evalue)}"
  192. )
  193. raise RuntimeError(
  194. f"({file_name}) in cell {pmee.exec_count}: " + f"{str(pmee.ename)} {str(pmee.evalue)}"
  195. ) from pmee
  196. except Exception as ex:
  197. self.log_and_raise(file_name, ex)
  198. t1 = time.time()
  199. duration = t1 - t0
  200. self.log.debug(f"Execution of {file_name} took {duration:.3f} secs.")
  201. class ScriptOperationProcessor(FileOperationProcessor):
  202. _script_type: str = None
  203. def get_argv(self, filepath) -> List[str]:
  204. raise NotImplementedError
  205. def process(self, operation: GenericOperation, elyra_run_name: str):
  206. filepath = self.get_valid_filepath(operation.filename)
  207. file_dir = os.path.dirname(filepath)
  208. file_name = os.path.basename(filepath)
  209. self.log.debug(f"Processing {self._script_type} script: {filepath}")
  210. argv = self.get_argv(filepath)
  211. envs = OperationProcessor._collect_envs(operation, elyra_run_name)
  212. data_capture_msg = (
  213. f"Data capture for previous error:\n"
  214. f" command: {' '.join(argv)}\n"
  215. f" working directory: {file_dir}\n"
  216. f" environment variables: {envs}"
  217. )
  218. t0 = time.time()
  219. try:
  220. run(argv, cwd=file_dir, env=envs, check=True, stderr=PIPE)
  221. except CalledProcessError as cpe:
  222. error_msg = str(cpe.stderr.decode())
  223. self.log.error(f"Error executing {file_name}: {error_msg}")
  224. # Log process information to aid with troubleshooting
  225. self.log.info(data_capture_msg)
  226. error_trim_index = error_msg.rfind("\n", 0, error_msg.rfind("Error"))
  227. if error_trim_index != -1:
  228. raise RuntimeError(f"({file_name}): {error_msg[error_trim_index:].strip()}") from cpe
  229. else:
  230. raise RuntimeError(f"({file_name})") from cpe
  231. except Exception as ex:
  232. self.log_and_raise(file_name, ex, data_capture_msg)
  233. t1 = time.time()
  234. duration = t1 - t0
  235. self.log.debug(f"Execution of {file_name} took {duration:.3f} secs.")
  236. class PythonScriptOperationProcessor(ScriptOperationProcessor):
  237. _operation_name = "execute-python-node"
  238. _script_type = "Python"
  239. def get_argv(self, file_path) -> List[str]:
  240. return [f"{sys.executable}", file_path]
  241. class RScriptOperationProcessor(ScriptOperationProcessor):
  242. _operation_name = "execute-r-node"
  243. _script_type = "R"
  244. def get_argv(self, file_path) -> List[str]:
  245. return ["Rscript", file_path]