processor.py

#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from abc import abstractmethod
import ast
import asyncio
import functools
import os
from pathlib import Path
import time
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Union

import entrypoints
from minio.error import S3Error
from traitlets.config import Bool
from traitlets.config import LoggingConfigurable
from traitlets.config import SingletonConfigurable
from traitlets.config import Unicode
from urllib3.exceptions import MaxRetryError

from elyra.metadata.manager import MetadataManager
from elyra.pipeline.component import Component
from elyra.pipeline.component_catalog import ComponentCache
from elyra.pipeline.pipeline import GenericOperation
from elyra.pipeline.pipeline import Operation
from elyra.pipeline.pipeline import Pipeline
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.pipeline.runtime_type import RuntimeTypeResources
from elyra.util.archive import create_temp_archive
from elyra.util.cos import CosClient
from elyra.util.path import get_expanded_path

elyra_log_pipeline_info = os.getenv("ELYRA_LOG_PIPELINE_INFO", True)


class PipelineProcessorRegistry(SingletonConfigurable):
    _processors: Dict[str, "PipelineProcessor"] = {}

    def __init__(self, **kwargs):
        root_dir: Optional[str] = kwargs.pop("root_dir", None)
        super().__init__(**kwargs)
        self.root_dir = get_expanded_path(root_dir)

        # Register all known processors based on entrypoint configuration
        for processor in entrypoints.get_group_all("elyra.pipeline.processors"):
            try:
                # instantiate an actual instance of the processor
                processor_instance = processor.load()(root_dir=self.root_dir, parent=kwargs.get("parent"))
                self.log.info(
                    f"Registering {processor.name} processor '{processor.module_name}.{processor.object_name}'..."
                )
                self.add_processor(processor_instance)
            except Exception as err:
                # log and ignore initialization errors
                self.log.error(
                    f"Error registering {processor.name} processor "
                    f'"{processor.module_name}.{processor.object_name}" - {err}'
                )

    def add_processor(self, processor):
        self.log.debug(f"Registering {processor.type.value} runtime processor '{processor.name}'")
        self._processors[processor.name] = processor

    def get_processor(self, processor_name: str):
        if self.is_valid_processor(processor_name):
            return self._processors[processor_name]
        else:
            raise RuntimeError(f"Could not find pipeline processor '{processor_name}'")

    def is_valid_processor(self, processor_name: str) -> bool:
        return processor_name in self._processors.keys()

    def is_valid_runtime_type(self, runtime_type_name: str) -> bool:
        for processor in self._processors.values():
            if processor.type.name == runtime_type_name.upper():
                return True
        return False

    def get_runtime_types_resources(self) -> List[RuntimeTypeResources]:
        """Returns the set of resource instances for each active runtime type"""

        # Build set of active runtime types, then build list of resources instances
        runtime_types: Set[RuntimeProcessorType] = set()
        for name, processor in self._processors.items():
            runtime_types.add(processor.type)

        resources: List[RuntimeTypeResources] = list()
        for runtime_type in runtime_types:
            resources.append(RuntimeTypeResources.get_instance_by_type(runtime_type))

        return resources
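
# Illustrative sketch (not part of the original module): processors are discovered through
# the "elyra.pipeline.processors" entrypoint group, so a runtime package would typically
# advertise its processor class in its packaging metadata. The package, entrypoint, and
# class names below are hypothetical.
#
#   [options.entry_points]
#   elyra.pipeline.processors =
#       my_runtime = my_package.processor:MyRuntimePipelineProcessor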


class PipelineProcessorManager(SingletonConfigurable):
    _registry: PipelineProcessorRegistry

    def __init__(self, **kwargs):
        root_dir: Optional[str] = kwargs.pop("root_dir", None)
        super().__init__(**kwargs)
        self.root_dir = get_expanded_path(root_dir)
        self._registry = PipelineProcessorRegistry.instance(root_dir=self.root_dir)

    def get_processor_for_runtime(self, runtime_name: str):
        processor = self._registry.get_processor(runtime_name)
        return processor

    def is_supported_runtime(self, runtime_name: str) -> bool:
        return self._registry.is_valid_processor(runtime_name)

    def is_supported_runtime_type(self, runtime_type_name) -> bool:
        return self._registry.is_valid_runtime_type(runtime_type_name)

    def get_runtime_type(self, runtime_name: str) -> RuntimeProcessorType:
        processor = self.get_processor_for_runtime(runtime_name)
        return processor.type

    async def get_components(self, runtime):
        processor = self.get_processor_for_runtime(runtime_name=runtime)
        res = await asyncio.get_event_loop().run_in_executor(None, processor.get_components)
        return res

    async def get_component(self, runtime, component_id):
        processor = self.get_processor_for_runtime(runtime_name=runtime)
        res = await asyncio.get_event_loop().run_in_executor(
            None, functools.partial(processor.get_component, component_id=component_id)
        )
        return res

    async def process(self, pipeline):
        processor = self.get_processor_for_runtime(pipeline.runtime)
        res = await asyncio.get_event_loop().run_in_executor(None, processor.process, pipeline)
        return res

    async def export(self, pipeline: Pipeline, pipeline_export_format: str, pipeline_export_path: str, overwrite: bool):
        processor = self.get_processor_for_runtime(pipeline.runtime)
        res = await asyncio.get_event_loop().run_in_executor(
            None, processor.export, pipeline, pipeline_export_format, pipeline_export_path, overwrite
        )
        return res
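
# Illustrative sketch (not part of the original module): the manager is a singleton that runs
# each processor's blocking call in an executor, so callers await it from async handlers.
# The runtime name "kfp" below is only an example.
#
#   manager = PipelineProcessorManager.instance(root_dir="/path/to/workspace")
#   components = await manager.get_components("kfp")
#   response = await manager.process(pipeline)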


class PipelineProcessorResponse:
    _type: RuntimeProcessorType = None
    _name: str = None

    @property
    def type(self) -> str:  # Return the string value of the name so that JSON serialization works
        if self._type is None:
            raise NotImplementedError("_type must have a value!")
        return self._type.name

    @property
    def name(self) -> str:
        if self._name is None:
            raise NotImplementedError("_name must have a value!")
        return self._name

    def to_json(self):
        return {
            "platform": self.type,
        }


class RuntimePipelineProcessorResponse(PipelineProcessorResponse):
    def __init__(self, run_url, object_storage_url, object_storage_path):
        super().__init__()
        self._run_url = run_url
        self._object_storage_url = object_storage_url
        self._object_storage_path = object_storage_path

    @property
    def run_url(self):
        """
        :return: The runtime URL to access the pipeline experiment
        """
        return self._run_url

    @property
    def object_storage_url(self):
        """
        :return: The object storage URL to access the pipeline outputs
                 and processed notebooks
        """
        return self._object_storage_url

    @property
    def object_storage_path(self):
        """
        :return: The object storage working directory path where the pipeline outputs
                 and processed notebooks are located
        """
        return self._object_storage_path

    def to_json(self):
        return {
            "platform": self.type,
            "run_url": self.run_url,
            "object_storage_url": self.object_storage_url,
            "object_storage_path": self.object_storage_path,
        }
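
# Illustrative sketch (not part of the original module): to_json() of a runtime response
# yields a flat dictionary; all values below are hypothetical.
#
#   {
#       "platform": "KUBEFLOW_PIPELINES",
#       "run_url": "https://kubeflow.example.com/#/runs/details/abc123",
#       "object_storage_url": "https://minio.example.com/minio/my-bucket",
#       "object_storage_path": "/my-pipeline-0523",
#   }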


class PipelineProcessor(LoggingConfigurable):  # ABC
    _type: RuntimeProcessorType = None
    _name: str = None

    root_dir: str = Unicode(allow_none=True)

    enable_pipeline_info: bool = Bool(
        config=True,
        default_value=(os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true"),
        help="""Produces formatted logging of informational messages with durations
                (default=True). (ELYRA_ENABLE_PIPELINE_INFO env var)""",
    )

    @property
    def type(self):
        if self._type is None:
            raise NotImplementedError("_type must have a value!")
        return self._type

    @property
    def name(self):
        if self._name is None:
            raise NotImplementedError("_name must have a value!")
        return self._name

    def get_components(self) -> List[Component]:
        """
        Retrieve components common to all runtimes
        """
        components: List[Component] = ComponentCache.get_generic_components()

        # Retrieve runtime-specific components
        components.extend(ComponentCache.instance().get_all_components(platform=self._type))

        return components

    def get_component(self, component_id: str) -> Optional[Component]:
        """
        Retrieve runtime-specific component details if component_id is not one of the generic set
        """
        if component_id not in ("notebook", "python-script", "r-script"):
            return ComponentCache.instance().get_component(platform=self._type, component_id=component_id)

        return ComponentCache.get_generic_component(component_id)

    @abstractmethod
    def process(self, pipeline) -> PipelineProcessorResponse:
        raise NotImplementedError()

    @abstractmethod
    def export(self, pipeline, pipeline_export_format, pipeline_export_path, overwrite):
        raise NotImplementedError()

    def log_pipeline_info(self, pipeline_name: str, action_clause: str, **kwargs):
        """
        Produces a formatted log INFO message used entirely for support purposes.

        This method is intended to be called for any entries that should be captured across aggregated
        log files to identify steps within a given pipeline and each of its operations. As a result,
        calls to this method should produce single-line entries in the log (no embedded newlines).
        Each entry is prefixed with the pipeline name. This functionality can be disabled by setting
        PipelineProcessor.enable_pipeline_info = False (or via env ELYRA_ENABLE_PIPELINE_INFO).

        General logging should NOT use this method but use logger.<level>() statements directly.

        :param pipeline_name: str representing the name of the pipeline that is being executed
        :param action_clause: str representing the action that is being logged
        :param **kwargs: dict representing the keyword arguments. Recognized keywords include:
               operation_name: str representing the name of the operation applicable for this entry
               duration: float value representing the duration of the action being logged
        """
        if self.enable_pipeline_info:
            duration = kwargs.get("duration")
            duration_clause = f"({duration:.3f} secs)" if duration else ""

            operation_name = kwargs.get("operation_name")
            op_clause = f":'{operation_name}'" if operation_name else ""

            self.log.info(f"{self._name} '{pipeline_name}'{op_clause} - {action_clause} {duration_clause}")
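
    # Illustrative sketch (not part of the original module): with pipeline info enabled, a
    # hypothetical call such as
    #   self.log_pipeline_info("my-pipeline", "uploaded archive", operation_name="load_data", duration=1.234)
    # produces a single-line INFO entry resembling
    #   kfp 'my-pipeline':'load_data' - uploaded archive (1.234 secs)
    # where the leading token is the processor's _name ("kfp" is only an example).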

    @staticmethod
    def _propagate_operation_inputs_outputs(pipeline: Pipeline, sorted_operations: List[Operation]) -> None:
        """
        All previous operation outputs should be propagated throughout the pipeline.
        In order to process this recursively, the current operation's inputs should be combined
        from its parent's inputs (which, themselves are derived from the outputs of their parent)
        and its parent's outputs.
        """
        for operation in sorted_operations:
            parent_io = set()  # gathers inputs & outputs relative to parent
            for parent_operation_id in operation.parent_operation_ids:
                parent_operation = pipeline.operations[parent_operation_id]
                if parent_operation.inputs:
                    parent_io.update(parent_operation.inputs)
                if parent_operation.outputs:
                    parent_io.update(parent_operation.outputs)

            if parent_io:
                parent_io.update(operation.inputs)
                operation.inputs = list(parent_io)
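
    # Illustrative sketch (not part of the original module): if operation B's parent A has
    # inputs ["raw.csv"] and outputs ["clean.csv"], then after propagation B.inputs contains
    # B's own inputs plus ["raw.csv", "clean.csv"], so each node sees every artifact produced
    # or consumed upstream. This relies on sorted_operations being topologically ordered
    # (see _sort_operations below).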

    @staticmethod
    def _sort_operations(operations_by_id: dict) -> List[Operation]:
        """
        Sort the list of operations based on its dependency graph
        """
        ordered_operations = []

        for operation in operations_by_id.values():
            PipelineProcessor._sort_operation_dependencies(operations_by_id, ordered_operations, operation)

        return ordered_operations

    @staticmethod
    def _sort_operation_dependencies(operations_by_id: dict, ordered_operations: list, operation: Operation) -> None:
        """
        Helper method to the main sort operation function
        """
        # Optimization: check if already processed
        if operation not in ordered_operations:
            # process each of the dependencies that needs to be executed first
            for parent_operation_id in operation.parent_operation_ids:
                parent_operation = operations_by_id[parent_operation_id]
                if parent_operation not in ordered_operations:
                    PipelineProcessor._sort_operation_dependencies(
                        operations_by_id, ordered_operations, parent_operation
                    )
            ordered_operations.append(operation)
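
    # Illustrative sketch (not part of the original module): the sort is a depth-first
    # traversal of parent links, so for a hypothetical graph where C depends on B and B
    # depends on A, _sort_operations() yields [A, B, C]; operations already present in
    # ordered_operations are skipped, which also avoids re-visiting shared parents.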


class RuntimePipelineProcessor(PipelineProcessor):
    def _get_dependency_archive_name(self, operation: Operation) -> str:
        return f"{Path(operation.filename).stem}-{operation.id}.tar.gz"

    def _get_dependency_source_dir(self, operation: Operation) -> str:
        return str(Path(self.root_dir) / Path(operation.filename).parent)

    def _generate_dependency_archive(self, operation: Operation) -> Optional[str]:
        archive_artifact_name = self._get_dependency_archive_name(operation)
        archive_source_dir = self._get_dependency_source_dir(operation)

        dependencies = [os.path.basename(operation.filename)]
        dependencies.extend(operation.dependencies)

        archive_artifact = create_temp_archive(
            archive_name=archive_artifact_name,
            source_dir=archive_source_dir,
            filenames=dependencies,
            recursive=operation.include_subdirectories,
            require_complete=True,
        )

        return archive_artifact
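
    # Illustrative sketch (not part of the original module): for a hypothetical operation
    # with filename "analysis/load_data.ipynb" and id "1234", the archive name resolves to
    # "load_data-1234.tar.gz" and the archive is built from "<root_dir>/analysis",
    # containing the file itself plus everything listed in operation.dependencies.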

    def _upload_dependencies_to_object_store(
        self, runtime_configuration: str, pipeline_name: str, operation: Operation, prefix: str = ""
    ) -> None:
        """
        Create dependency archive for the generic operation identified by operation
        and upload it to object storage.
        """
        operation_artifact_archive = self._get_dependency_archive_name(operation)

        # object prefix
        object_prefix = prefix.strip("/")

        # upload operation dependencies to object store
        try:
            t0 = time.time()
            dependency_archive_path = self._generate_dependency_archive(operation)
            self.log_pipeline_info(
                pipeline_name,
                f"generated dependency archive '{dependency_archive_path}'",
                operation_name=operation.name,
                duration=(time.time() - t0),
            )

            cos_client = CosClient(config=runtime_configuration)

            t0 = time.time()
            uploaded_object_name = cos_client.upload_file(
                local_file_path=dependency_archive_path,
                object_name=operation_artifact_archive,
                object_prefix=object_prefix,
            )
            self.log_pipeline_info(
                pipeline_name,
                f"uploaded dependency archive to '{uploaded_object_name}' in bucket '{cos_client.bucket}'",
                operation_name=operation.name,
                duration=(time.time() - t0),
            )
        except FileNotFoundError as ex:
            self.log.error(
                f"Dependencies were not found building archive for operation: {operation.name}", exc_info=True
            )
            raise FileNotFoundError(
                f"Node '{operation.name}' referenced dependencies that were not found: {ex}"
            ) from ex
        except MaxRetryError as ex:
            cos_endpoint = runtime_configuration.metadata.get("cos_endpoint")
            self.log.error(f"Connection was refused when attempting to connect to : {cos_endpoint}", exc_info=True)
            raise RuntimeError(
                f"Connection was refused when attempting to upload artifacts to : '{cos_endpoint}'. "
                "Please check your object storage settings."
            ) from ex
        except S3Error as ex:
            msg_prefix = f"Error connecting to object storage: {ex.code}."
            if ex.code == "SignatureDoesNotMatch":
                # likely cause: incorrect password
                raise RuntimeError(
                    f"{msg_prefix} Verify the password "
                    f"in runtime configuration '{runtime_configuration.display_name}' "
                    "and try again."
                ) from ex
            elif ex.code == "InvalidAccessKeyId":
                # likely cause: incorrect user id
                raise RuntimeError(
                    f"{msg_prefix} Verify the username "
                    f"in runtime configuration '{runtime_configuration.display_name}' "
                    "and try again."
                ) from ex
            else:
                raise RuntimeError(
                    f"{msg_prefix} Verify "
                    f"runtime configuration '{runtime_configuration.display_name}' "
                    "and try again."
                ) from ex
        except BaseException as ex:
            self.log.error(
                f"Error uploading artifacts to object storage for operation: {operation.name}", exc_info=True
            )
            raise ex from ex

    def _verify_cos_connectivity(self, runtime_configuration) -> None:
        self.log.debug(
            "Verifying cloud storage connectivity using runtime configuration "
            f"'{runtime_configuration.display_name}'."
        )
        try:
            CosClient(runtime_configuration)
        except Exception as ex:
            raise RuntimeError(
                f"Error connecting to cloud storage: {ex}. Update runtime configuration "
                f"'{runtime_configuration.display_name}' and try again."
            )

    def _get_metadata_configuration(self, schemaspace, name=None):
        """
        Retrieve associated metadata configuration based on schemaspace provided and optional instance name
        :return: metadata in json format
        """
        try:
            if not name:
                return MetadataManager(schemaspace=schemaspace).get_all()
            else:
                return MetadataManager(schemaspace=schemaspace).get(name)
        except BaseException as err:
            self.log.error(f"Error retrieving metadata configuration for {name}", exc_info=True)
            raise RuntimeError(f"Error retrieving metadata configuration for {name}", err) from err

    def _verify_export_format(self, pipeline_export_format: str) -> None:
        """
        Check that the given pipeline_export_format is supported by the runtime type;
        otherwise, raise a ValueError
        """
        export_extensions = RuntimeTypeResources.get_instance_by_type(self._type).get_export_extensions()
        if pipeline_export_format not in export_extensions:
            raise ValueError(f"Pipeline export format '{pipeline_export_format}' not recognized.")

    def _collect_envs(self, operation: GenericOperation, **kwargs) -> Dict:
        """
        Collect the envs stored on the Operation and set the system-defined ELYRA_RUNTIME_ENV
        Note: subclasses should call their superclass (this) method first.
        :return: dictionary containing environment name/value pairs
        """
        envs: Dict = operation.env_vars.to_dict()
        envs["ELYRA_RUNTIME_ENV"] = self.name

        # set environment variables for Minio/S3 access, in the following order of precedence:
        #  1. use `cos_secret`
        #  2. use `cos_username` and `cos_password`
        if "cos_secret" in kwargs and kwargs["cos_secret"]:
            # ensure the AWS_ACCESS_* envs are NOT set
            envs.pop("AWS_ACCESS_KEY_ID", None)
            envs.pop("AWS_SECRET_ACCESS_KEY", None)
        else:
            # set AWS_ACCESS_KEY_ID, if defined
            if "cos_username" in kwargs and kwargs["cos_username"]:
                envs["AWS_ACCESS_KEY_ID"] = kwargs["cos_username"]
            else:
                envs.pop("AWS_ACCESS_KEY_ID", None)

            # set AWS_SECRET_ACCESS_KEY, if defined
            if "cos_password" in kwargs and kwargs["cos_password"]:
                envs["AWS_SECRET_ACCESS_KEY"] = kwargs["cos_password"]
            else:
                envs.pop("AWS_SECRET_ACCESS_KEY", None)

        # Convey pipeline logging enablement to operation
        envs["ELYRA_ENABLE_PIPELINE_INFO"] = str(self.enable_pipeline_info)
        return envs
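
    # Illustrative sketch (not part of the original module): given a hypothetical call
    #   self._collect_envs(op, cos_secret=None, cos_username="minio", cos_password="minio123")
    # the returned dict carries AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY taken from the
    # cos_* kwargs plus ELYRA_RUNTIME_ENV and ELYRA_ENABLE_PIPELINE_INFO; when cos_secret
    # is supplied instead, both AWS_* variables are removed so the mounted secret is used.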

    def _process_dictionary_value(self, value: str) -> Union[Dict, str]:
        """
        For component parameters of type dictionary, the user-entered string value given in the pipeline
        JSON should be converted to the appropriate Dict format, if possible. If a Dict cannot be formed,
        log and return stripped string value.
        """
        if not value:
            return {}

        value = value.strip()
        if value == "None":
            return {}

        converted_dict = None
        if value.startswith("{") and value.endswith("}"):
            try:
                converted_dict = ast.literal_eval(value)
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                pass  # Can raise any of these exceptions

        # Value could not be successfully converted to dictionary
        if not isinstance(converted_dict, dict):
            self.log.debug(f"Could not convert entered parameter value `{value}` to dictionary")
            return value

        return converted_dict
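
    # Illustrative sketch (not part of the original module): a value of '{"lr": 0.01}' is
    # converted to {"lr": 0.01}; a value that is not a literal dict (or fails
    # ast.literal_eval) is returned stripped and unchanged; "None" and empty values yield {}.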

    def _process_list_value(self, value: str) -> Union[List, str]:
        """
        For component parameters of type list, the user-entered string value given in the pipeline JSON
        should be converted to the appropriate List format, if possible. If a List cannot be formed,
        log and return stripped string value.
        """
        if not value:
            return []

        value = value.strip()
        if value == "None":
            return []

        converted_list = None
        if value.startswith("[") and value.endswith("]"):
            try:
                converted_list = ast.literal_eval(value)
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                pass  # Can raise any of these exceptions

        # Value could not be successfully converted to list
        if not isinstance(converted_list, list):
            self.log.debug(f"Could not convert entered parameter value `{value}` to list")
            return value

        return converted_list
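
    # Illustrative sketch (not part of the original module): a value of "[1, 2, 3]" is
    # converted to [1, 2, 3]; a value that is not a literal list (or fails ast.literal_eval)
    # is returned stripped and unchanged; "None" and empty values yield [].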