# processor.py
#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from abc import ABC
from abc import abstractmethod
import ast
import asyncio
import functools
import os
from pathlib import Path
import time
from typing import Dict
from typing import List
from typing import Optional
from typing import Set
from typing import Union

import entrypoints
from minio.error import S3Error
from traitlets.config import Bool
from traitlets.config import LoggingConfigurable
from traitlets.config import SingletonConfigurable
from traitlets.config import Unicode
from urllib3.exceptions import MaxRetryError

from elyra.metadata.manager import MetadataManager
from elyra.pipeline.component import Component
from elyra.pipeline.component_catalog import ComponentCache
from elyra.pipeline.pipeline import GenericOperation
from elyra.pipeline.pipeline import Operation
from elyra.pipeline.pipeline import Pipeline
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.pipeline.runtime_type import RuntimeTypeResources
from elyra.util.archive import create_temp_archive
from elyra.util.cos import CosClient
from elyra.util.path import get_expanded_path

elyra_log_pipeline_info = os.getenv("ELYRA_LOG_PIPELINE_INFO", True)


class PipelineProcessorRegistry(SingletonConfigurable):
    _processors: Dict[str, "PipelineProcessor"] = {}

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.root_dir = get_expanded_path(kwargs.get("root_dir"))
        # Register all known processors based on entrypoint configuration
        for processor in entrypoints.get_group_all("elyra.pipeline.processors"):
            try:
                # instantiate an actual instance of the processor
                processor_instance = processor.load()(self.root_dir, parent=kwargs.get("parent"))  # Load an instance
                self.log.info(
                    f"Registering {processor.name} processor "
                    f'"{processor.module_name}.{processor.object_name}"...'
                )
                self.add_processor(processor_instance)
            except Exception as err:
                # log and ignore initialization errors
                self.log.error(
                    f"Error registering {processor.name} processor "
                    f'"{processor.module_name}.{processor.object_name}" - {err}'
                )

    def add_processor(self, processor):
        self.log.debug(f"Registering {processor.type.value} runtime processor '{processor.name}'")
        self._processors[processor.name] = processor

    def get_processor(self, processor_name: str):
        if self.is_valid_processor(processor_name):
            return self._processors[processor_name]
        else:
            raise RuntimeError(f"Could not find pipeline processor '{processor_name}'")

    def is_valid_processor(self, processor_name: str) -> bool:
        return processor_name in self._processors.keys()

    def is_valid_runtime_type(self, runtime_type_name: str) -> bool:
        for processor in self._processors.values():
            if processor.type.name == runtime_type_name:
                return True
        return False

    def get_runtime_types_resources(self) -> List[RuntimeTypeResources]:
        """Returns the set of resource instances for each active runtime type"""

        # Build set of active runtime types, then build list of resources instances
        runtime_types: Set[RuntimeProcessorType] = set()
        for name, processor in self._processors.items():
            runtime_types.add(processor.type)

        resources: List[RuntimeTypeResources] = list()
        for runtime_type in runtime_types:
            resources.append(RuntimeTypeResources.get_instance_by_type(runtime_type))

        return resources
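
    # Note: runtime-specific processors are discovered through the
    # "elyra.pipeline.processors" entrypoints group referenced in __init__ above.
    # An illustrative sketch of how a hypothetical package could register one via
    # its setuptools metadata (the package and class names below are not part of Elyra):
    #
    #   [options.entry_points]
    #   elyra.pipeline.processors =
    #       my_runtime = my_package.processor:MyRuntimePipelineProcessor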


class PipelineProcessorManager(SingletonConfigurable):
    _registry: PipelineProcessorRegistry

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.root_dir = get_expanded_path(kwargs.get("root_dir"))
        self._registry = PipelineProcessorRegistry.instance()

    def get_processor_for_runtime(self, runtime_name: str):
        processor = self._registry.get_processor(runtime_name)
        return processor

    def is_supported_runtime(self, runtime_name: str) -> bool:
        return self._registry.is_valid_processor(runtime_name)

    def is_supported_runtime_type(self, runtime_type_name) -> bool:
        return self._registry.is_valid_runtime_type(runtime_type_name)

    def get_runtime_type(self, runtime_name: str) -> RuntimeProcessorType:
        processor = self.get_processor_for_runtime(runtime_name)
        return processor.type

    async def get_components(self, runtime):
        processor = self.get_processor_for_runtime(runtime_name=runtime)

        res = await asyncio.get_event_loop().run_in_executor(None, processor.get_components)
        return res

    async def get_component(self, runtime, component_id):
        processor = self.get_processor_for_runtime(runtime_name=runtime)

        res = await asyncio.get_event_loop().run_in_executor(
            None, functools.partial(processor.get_component, component_id=component_id)
        )
        return res

    async def process(self, pipeline):
        processor = self.get_processor_for_runtime(pipeline.runtime)

        res = await asyncio.get_event_loop().run_in_executor(None, processor.process, pipeline)
        return res

    async def export(self, pipeline, pipeline_export_format, pipeline_export_path, overwrite):
        processor = self.get_processor_for_runtime(pipeline.runtime)

        res = await asyncio.get_event_loop().run_in_executor(
            None, processor.export, pipeline, pipeline_export_format, pipeline_export_path, overwrite
        )
        return res
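
    # Minimal usage sketch (illustrative only, not part of this module). It assumes a
    # caller running inside an event loop, a workspace root directory, and a pipeline
    # whose `runtime` matches a registered processor name:
    #
    #   manager = PipelineProcessorManager.instance(root_dir="/path/to/workspace")
    #   if manager.is_supported_runtime(pipeline.runtime):
    #       response = await manager.process(pipeline)
    #       print(response.to_json())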


class PipelineProcessorResponse(ABC):
    _type: RuntimeProcessorType = None
    _name: str = None

    def __init__(self, run_url, object_storage_url, object_storage_path):
        self._run_url = run_url
        self._object_storage_url = object_storage_url
        self._object_storage_path = object_storage_path

    @property
    def type(self) -> str:  # Return the string value of the name so that JSON serialization works
        if self._type is None:
            raise NotImplementedError("_type must have a value!")
        return self._type.name

    @property
    def name(self) -> str:
        if self._name is None:
            raise NotImplementedError("_name must have a value!")
        return self._name

    @property
    def run_url(self):
        """
        :return: The runtime URL to access the pipeline experiment
        """
        return self._run_url

    @property
    def object_storage_url(self):
        """
        :return: The object storage URL to access the pipeline outputs
                 and processed notebooks
        """
        return self._object_storage_url

    @property
    def object_storage_path(self):
        """
        :return: The object storage working directory path where the pipeline outputs
                 and processed notebooks are located
        """
        return self._object_storage_path

    def to_json(self):
        return {
            "platform": self.type,
            "run_url": self.run_url,
            "object_storage_url": self.object_storage_url,
            "object_storage_path": self.object_storage_path,
        }


class PipelineProcessor(LoggingConfigurable):  # ABC
    _type: RuntimeProcessorType = None
    _name: str = None

    root_dir = Unicode(allow_none=True)

    enable_pipeline_info = Bool(
        config=True,
        default_value=(os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true"),
        help="""Produces formatted logging of informational messages with durations
                (default=True). (ELYRA_ENABLE_PIPELINE_INFO env var)""",
    )

    def __init__(self, root_dir, **kwargs):
        super().__init__(**kwargs)
        self.root_dir = root_dir

    @property
    def type(self):
        if self._type is None:
            raise NotImplementedError("_type must have a value!")
        return self._type

    @property
    def name(self):
        if self._name is None:
            raise NotImplementedError("_name must have a value!")
        return self._name

    def get_components(self) -> List[Component]:
        """
        Retrieve components common to all runtimes
        """
        components: List[Component] = ComponentCache.get_generic_components()

        # Retrieve runtime-specific components
        components.extend(ComponentCache.instance().get_all_components(platform=self._type))

        return components

    def get_component(self, component_id: str) -> Optional[Component]:
        """
        Retrieve runtime-specific component details if component_id is not one of the generic set
        """
        if component_id not in ("notebook", "python-script", "r-script"):
            return ComponentCache.instance().get_component(platform=self._type, component_id=component_id)

        return ComponentCache.get_generic_component(component_id)

    @abstractmethod
    def process(self, pipeline) -> PipelineProcessorResponse:
        raise NotImplementedError()

    @abstractmethod
    def export(self, pipeline, pipeline_export_format, pipeline_export_path, overwrite):
        raise NotImplementedError()

    def log_pipeline_info(self, pipeline_name: str, action_clause: str, **kwargs):
        """
        Produces a formatted log INFO message used entirely for support purposes.

        This method is intended to be called for any entries that should be captured across aggregated
        log files to identify steps within a given pipeline and each of its operations. As a result,
        calls to this method should produce single-line entries in the log (no embedded newlines).
        Each entry is prefixed with the pipeline name. This functionality can be disabled by setting
        PipelineProcessor.enable_pipeline_info = False (or via env ELYRA_ENABLE_PIPELINE_INFO).

        General logging should NOT use this method but use logger.<level>() statements directly.

        :param pipeline_name: str representing the name of the pipeline that is being executed
        :param action_clause: str representing the action that is being logged
        :param **kwargs: dict representing the keyword arguments. Recognized keywords include:
               operation_name: str representing the name of the operation applicable for this entry
               duration: float value representing the duration of the action being logged
        """
        if self.enable_pipeline_info:
            duration = kwargs.get("duration")
            duration_clause = f"({duration:.3f} secs)" if duration else ""

            operation_name = kwargs.get("operation_name")
            op_clause = f":'{operation_name}'" if operation_name else ""

            self.log.info(f"{self._name} '{pipeline_name}'{op_clause} - {action_clause} {duration_clause}")
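
    # Illustrative call of log_pipeline_info (the values are hypothetical):
    #
    #   self.log_pipeline_info("my-pipeline", "uploaded dependencies",
    #                          operation_name="load-data", duration=1.234)
    #
    # which would emit a single-line entry similar to:
    #
    #   <processor name> 'my-pipeline':'load-data' - uploaded dependencies (1.234 secs)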

    @staticmethod
    def _propagate_operation_inputs_outputs(pipeline: Pipeline, sorted_operations: List[Operation]) -> None:
        """
        All previous operation outputs should be propagated throughout the pipeline.
        In order to process this recursively, the current operation's inputs should be combined
        from its parent's inputs (which, themselves are derived from the outputs of their parent)
        and its parent's outputs.
        """
        for operation in sorted_operations:
            parent_io = set()  # gathers inputs & outputs relative to parent
            for parent_operation_id in operation.parent_operation_ids:
                parent_operation = pipeline.operations[parent_operation_id]
                if parent_operation.inputs:
                    parent_io.update(parent_operation.inputs)
                if parent_operation.outputs:
                    parent_io.update(parent_operation.outputs)

            if parent_io:
                parent_io.update(operation.inputs)
                operation.inputs = list(parent_io)

    @staticmethod
    def _sort_operations(operations_by_id: dict) -> List[Operation]:
        """
        Sort the list of operations based on its dependency graph
        """
        ordered_operations = []

        for operation in operations_by_id.values():
            PipelineProcessor._sort_operation_dependencies(operations_by_id, ordered_operations, operation)

        return ordered_operations

    @staticmethod
    def _sort_operation_dependencies(operations_by_id: dict, ordered_operations: list, operation: Operation) -> None:
        """
        Helper method to the main sort operation function
        """
        # Optimization: check if already processed
        if operation not in ordered_operations:
            # process each of the dependencies that needs to be executed first
            for parent_operation_id in operation.parent_operation_ids:
                parent_operation = operations_by_id[parent_operation_id]
                if parent_operation not in ordered_operations:
                    PipelineProcessor._sort_operation_dependencies(
                        operations_by_id, ordered_operations, parent_operation
                    )
            ordered_operations.append(operation)
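
    # Ordering sketch for the two sort helpers above (operation ids are hypothetical):
    # given operations a (no parents), b (parent a) and c (parent b), _sort_operations
    # returns [a, b, c] -- a depth-first walk that places every operation after all of
    # its ancestors.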


class RuntimePipelineProcessor(PipelineProcessor):
    def __init__(self, root_dir: str, **kwargs):
        super().__init__(root_dir, **kwargs)

    def _get_dependency_archive_name(self, operation: Operation) -> str:
        return f"{Path(operation.filename).stem}-{operation.id}.tar.gz"

    def _get_dependency_source_dir(self, operation: Operation) -> str:
        return str(Path(self.root_dir) / Path(operation.filename).parent)

    def _generate_dependency_archive(self, operation: Operation) -> Optional[str]:
        archive_artifact_name = self._get_dependency_archive_name(operation)
        archive_source_dir = self._get_dependency_source_dir(operation)
        dependencies = [os.path.basename(operation.filename)]
        dependencies.extend(operation.dependencies)
        archive_artifact = create_temp_archive(
            archive_name=archive_artifact_name,
            source_dir=archive_source_dir,
            filenames=dependencies,
            recursive=operation.include_subdirectories,
            require_complete=True,
        )

        return archive_artifact

    def _upload_dependencies_to_object_store(
        self, runtime_configuration: str, pipeline_name: str, operation: Operation, prefix: str = ""
    ) -> None:
        """
        Create dependency archive for the generic operation identified by operation
        and upload it to object storage.
        """
        operation_artifact_archive = self._get_dependency_archive_name(operation)

        # object prefix
        object_prefix = prefix.strip("/")

        # upload operation dependencies to object store
        try:
            t0 = time.time()
            dependency_archive_path = self._generate_dependency_archive(operation)
            self.log_pipeline_info(
                pipeline_name,
                f"generated dependency archive '{dependency_archive_path}'",
                operation_name=operation.name,
                duration=(time.time() - t0),
            )

            cos_client = CosClient(config=runtime_configuration)

            t0 = time.time()
            uploaded_object_name = cos_client.upload_file(
                local_file_path=dependency_archive_path,
                object_name=operation_artifact_archive,
                object_prefix=object_prefix,
            )
            self.log_pipeline_info(
                pipeline_name,
                f"uploaded dependency archive to '{uploaded_object_name}' in bucket '{cos_client.bucket}'",
                operation_name=operation.name,
                duration=(time.time() - t0),
            )

        except FileNotFoundError as ex:
            self.log.error(
                f"Dependencies were not found building archive for operation: {operation.name}", exc_info=True
            )
            raise FileNotFoundError(
                f"Node '{operation.name}' referenced dependencies that were not found: {ex}"
            ) from ex
        except MaxRetryError as ex:
            cos_endpoint = runtime_configuration.metadata.get("cos_endpoint")
            self.log.error(f"Connection was refused when attempting to connect to : {cos_endpoint}", exc_info=True)
            raise RuntimeError(
                f"Connection was refused when attempting to upload artifacts to : '{cos_endpoint}'. "
                "Please check your object storage settings."
            ) from ex
        except S3Error as ex:
            msg_prefix = f"Error connecting to object storage: {ex.code}."
            if ex.code == "SignatureDoesNotMatch":
                # likely cause: incorrect password
                raise RuntimeError(
                    f"{msg_prefix} Verify the password "
                    f"in runtime configuration '{runtime_configuration.display_name}' "
                    "and try again."
                ) from ex
            elif ex.code == "InvalidAccessKeyId":
                # likely cause: incorrect user id
                raise RuntimeError(
                    f"{msg_prefix} Verify the username "
                    f"in runtime configuration '{runtime_configuration.display_name}' "
                    "and try again."
                ) from ex
            else:
                raise RuntimeError(
                    f"{msg_prefix} Verify "
                    f"runtime configuration '{runtime_configuration.display_name}' "
                    "and try again."
                ) from ex
        except BaseException as ex:
            self.log.error(
                f"Error uploading artifacts to object storage for operation: {operation.name}", exc_info=True
            )
            raise ex from ex

    def _verify_cos_connectivity(self, runtime_configuration) -> None:
        self.log.debug(
            "Verifying cloud storage connectivity using runtime configuration "
            f"'{runtime_configuration.display_name}'."
        )
        try:
            CosClient(runtime_configuration)
        except Exception as ex:
            raise RuntimeError(
                f"Error connecting to cloud storage: {ex}. Update runtime configuration "
                f"'{runtime_configuration.display_name}' and try again."
            )

    def _get_metadata_configuration(self, schemaspace, name=None):
        """
        Retrieve associated metadata configuration based on schemaspace provided and optional instance name
        :return: metadata in json format
        """
        try:
            if not name:
                return MetadataManager(schemaspace=schemaspace).get_all()
            else:
                return MetadataManager(schemaspace=schemaspace).get(name)
        except BaseException as err:
            self.log.error(f"Error retrieving metadata configuration for {name}", exc_info=True)
            raise RuntimeError(f"Error retrieving metadata configuration for {name}", err) from err

    def _verify_export_format(self, pipeline_export_format: str) -> None:
        """
        Check that the given pipeline_export_format is supported by the runtime type;
        otherwise, raise a ValueError
        """
        export_extensions = RuntimeTypeResources.get_instance_by_type(self._type).get_export_extensions()
        if pipeline_export_format not in export_extensions:
            raise ValueError(f"Pipeline export format '{pipeline_export_format}' not recognized.")

    def _collect_envs(self, operation: GenericOperation, **kwargs) -> Dict:
        """
        Collect the envs stored on the Operation and set the system-defined ELYRA_RUNTIME_ENV

        Note: subclasses should call their superclass (this) method first.
        :return: dictionary containing environment name/value pairs
        """
        envs: Dict = operation.env_vars.to_dict()
        envs["ELYRA_RUNTIME_ENV"] = self.name

        # set environment variables for Minio/S3 access, in the following order of precedence:
        # 1. use `cos_secret`
        # 2. use `cos_username` and `cos_password`
        if "cos_secret" in kwargs and kwargs["cos_secret"]:
            # ensure the AWS_ACCESS_* envs are NOT set
            envs.pop("AWS_ACCESS_KEY_ID", None)
            envs.pop("AWS_SECRET_ACCESS_KEY", None)
        else:
            # set AWS_ACCESS_KEY_ID, if defined
            if "cos_username" in kwargs and kwargs["cos_username"]:
                envs["AWS_ACCESS_KEY_ID"] = kwargs["cos_username"]
            else:
                envs.pop("AWS_ACCESS_KEY_ID", None)

            # set AWS_SECRET_ACCESS_KEY, if defined
            if "cos_password" in kwargs and kwargs["cos_password"]:
                envs["AWS_SECRET_ACCESS_KEY"] = kwargs["cos_password"]
            else:
                envs.pop("AWS_SECRET_ACCESS_KEY", None)

        # Convey pipeline logging enablement to operation
        envs["ELYRA_ENABLE_PIPELINE_INFO"] = str(self.enable_pipeline_info)

        return envs
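
    # Illustrative outcome of _collect_envs (the values are hypothetical): passing
    # cos_username="minio" and cos_password="minio123" (and no cos_secret) adds
    # AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY to the returned dict, whereas passing a
    # non-empty cos_secret removes both so that credentials can be supplied by the
    # runtime through the named secret instead.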

    def _process_dictionary_value(self, value: str) -> Union[Dict, str]:
        """
        For component parameters of type dictionary, the user-entered string value given in the pipeline
        JSON should be converted to the appropriate Dict format, if possible. If a Dict cannot be formed,
        log and return stripped string value.
        """
        if not value:
            return {}

        value = value.strip()
        if value == "None":
            return {}

        converted_dict = None
        if value.startswith("{") and value.endswith("}"):
            try:
                converted_dict = ast.literal_eval(value)
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                pass  # Can raise any of these exceptions

        # Value could not be successfully converted to dictionary
        if not isinstance(converted_dict, dict):
            self.log.debug(f"Could not convert entered parameter value `{value}` to dictionary")
            return value

        return converted_dict

    def _process_list_value(self, value: str) -> Union[List, str]:
        """
        For component parameters of type list, the user-entered string value given in the pipeline JSON
        should be converted to the appropriate List format, if possible. If a List cannot be formed,
        log and return stripped string value.
        """
        if not value:
            return []

        value = value.strip()
        if value == "None":
            return []

        converted_list = None
        if value.startswith("[") and value.endswith("]"):
            try:
                converted_list = ast.literal_eval(value)
            except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
                pass  # Can raise any of these exceptions

        # Value could not be successfully converted to list
        if not isinstance(converted_list, list):
            self.log.debug(f"Could not convert entered parameter value `{value}` to list")
            return value

        return converted_list
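
    # Behavior sketch for the two value helpers above (inputs are hypothetical):
    #
    #   self._process_dictionary_value('{"a": 1}')  ->  {"a": 1}
    #   self._process_dictionary_value("None")      ->  {}
    #   self._process_list_value("[1, 2, 3]")       ->  [1, 2, 3]
    #   self._process_list_value("not a list")      ->  "not a list"  (stripped string)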