component_catalog.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import json
  17. from logging import Logger
  18. import os
  19. from pathlib import Path
  20. from queue import Empty
  21. from queue import Queue
  22. from threading import Event
  23. from threading import Thread
  24. import time
  25. from typing import Dict
  26. from typing import List
  27. from typing import Optional
  28. from typing import Union
  29. import entrypoints
  30. from jinja2 import Environment
  31. from jinja2 import PackageLoader
  32. from jinja2 import Template
  33. from jupyter_core.paths import jupyter_runtime_dir
  34. from traitlets.config import SingletonConfigurable
  35. from watchdog.events import FileSystemEventHandler
  36. from watchdog.observers import Observer
  37. from elyra.metadata.manager import MetadataManager
  38. from elyra.metadata.metadata import Metadata
  39. from elyra.metadata.schemaspaces import ComponentCatalogs
  40. from elyra.pipeline.catalog_connector import ComponentCatalogConnector
  41. from elyra.pipeline.component import Component
  42. from elyra.pipeline.component import ComponentParser
  43. from elyra.pipeline.component_metadata import ComponentCatalogMetadata
  44. from elyra.pipeline.runtime_type import RuntimeProcessorType
  45. BLOCKING_TIMEOUT = 0.5
  46. NONBLOCKING_TIMEOUT = 0.10
  47. # Issue warnings if catalog update takes longer than this value in seconds
  48. CATALOG_UPDATE_TIMEOUT = int(os.getenv("ELYRA_CATALOG_UPDATE_TIMEOUT", 15))
  49. # Issue warnings when outstanding worker thread counts exceed this value
  50. WORKER_THREAD_WARNING_THRESHOLD = int(os.getenv("ELYRA_WORKER_THREAD_WARNING_THRESHOLD", 10))
  51. # Define custom type to describe the component cache
  52. ComponentCacheType = Dict[str, Dict[str, Dict[str, Dict[str, Union[Component, str, List[str]]]]]]
  53. class RefreshInProgressError(Exception):
  54. def __init__(self):
  55. super().__init__("A catalog refresh is in progress. Try the request later.")
  56. class RefreshQueue(Queue):
  57. """Entries are associated with a complete refresh of the Component Cache."""
  58. _refreshing: bool
  59. def __init__(self):
  60. super().__init__()
  61. self._refreshing = False
  62. @property
  63. def refreshing(self) -> bool:
  64. return self._refreshing
  65. @refreshing.setter
  66. def refreshing(self, value: bool) -> None:
  67. self._refreshing = value
  68. def get(self, block: bool = True, timeout: Optional[float] = None):
  69. """Overrides the superclass method to set the refreshing property to false when empty."""
  70. try:
  71. entry = super().get(block=block, timeout=timeout)
  72. except Empty:
  73. self.refreshing = False
  74. raise
  75. return entry
  76. def put(self, item, block=True, timeout=None):
  77. """Overrides the superclass method to set the refreshing property to true."""
  78. super().put(item, block=block, timeout=timeout)
  79. self.refreshing = True
  80. class UpdateQueue(Queue):
  81. """Entries are associated with a single update of the Component Cache.
  82. This class merely exists to distinguish it from the RefreshQueue instance.
  83. """
  84. pass
  85. class CacheUpdateManager(Thread):
  86. """
  87. Primary thread for maintaining consistency of the component cache.
  88. The component cache manager maintains the cache queue, whose entries are a
  89. tuple of 'catalog' and 'action'. The 'catalog' is a catalog instance against
  90. which the 'action' is applied. The 'action' is one of 'modify' or 'delete'.
  91. For 'delete' the components of the referenced catalog are removed. For 'modify'
  92. the components of the referenced catalog are inserted or updated (depending on
  93. its prior existence).
  94. """
  95. def __init__(
  96. self, log: Logger, component_cache: ComponentCacheType, refresh_queue: RefreshQueue, update_queue: UpdateQueue
  97. ):
  98. super().__init__()
  99. self.daemon = True
  100. self.name = "CacheUpdateManager"
  101. self.log: Logger = log
  102. self._component_cache: ComponentCacheType = component_cache
  103. self._refresh_queue: RefreshQueue = refresh_queue
  104. self._update_queue: UpdateQueue = update_queue
  105. self._check_refresh_queue = False
  106. self._threads: List[CacheUpdateWorker] = []
  107. self.stop_event: Event = Event() # Set when server process stops
  108. def run(self):
  109. """Process queue queue entries until server is stopped."""
  110. while not self.stop_event.is_set():
  111. self.manage_cache_tasks()
  112. def manage_cache_tasks(self):
  113. """
  114. Check the cache queue for a cache update action and start
  115. a corresponding worker thread to complete the update
  116. """
  117. outstanding_threads = self._has_outstanding_threads()
  118. try:
  119. # Get a task from the cache queue, waiting less if we have active threads.
  120. timeout = NONBLOCKING_TIMEOUT if outstanding_threads else BLOCKING_TIMEOUT
  121. # Toggle between refresh and update queues so as to prevent starvation.
  122. self._check_refresh_queue = not self._check_refresh_queue
  123. if self._check_refresh_queue:
  124. catalog, action = self._refresh_queue.get(timeout=timeout)
  125. else:
  126. catalog, action = self._update_queue.get(timeout=timeout)
  127. except Empty:
  128. # No task exists in the cache queue, proceed to check for thread execution
  129. pass
  130. else:
  131. # Create and start a thread for the task
  132. updater_thread = CacheUpdateWorker(
  133. self._component_cache,
  134. self._refresh_queue if self._check_refresh_queue else self._update_queue,
  135. catalog,
  136. action,
  137. )
  138. updater_thread.start()
  139. queue_clause = "refreshing" if self._check_refresh_queue else "updating"
  140. self.log.debug(f"CacheUpdateWorker {queue_clause} catalog: '{updater_thread.name}', action: '{action}'...")
  141. self._threads.append(updater_thread)
  142. def _has_outstanding_threads(self) -> bool:
  143. """
  144. Join finished threads and report on long-running threads as needed.
  145. """
  146. outstanding_threads = False
  147. for thread in self._threads:
  148. # Attempt to join thread within the given amount of time
  149. thread.join(timeout=NONBLOCKING_TIMEOUT)
  150. cumulative_run_time = int(time.time() - thread.task_start_time)
  151. if thread.is_alive():
  152. # Thread is still running (thread join timed out)
  153. outstanding_threads = True
  154. # Report on a long-running thread if CATALOG_UPDATE_TIMEOUT is exceeded
  155. time_since_last_check = int(time.time() - thread.last_warn_time)
  156. if time_since_last_check > CATALOG_UPDATE_TIMEOUT:
  157. thread.last_warn_time = time.time()
  158. self.log.warning(
  159. f"Cache update for catalog '{thread.name}' is still processing "
  160. f"after {cumulative_run_time} seconds ..."
  161. )
  162. else:
  163. self.log.debug(f"CacheUpdateWorker completed for catalog: '{thread.name}', action: '{thread.action}'.")
  164. # Thread has been joined and can be removed from the list
  165. self._threads.remove(thread)
  166. # Mark cache task as complete
  167. thread.queue.task_done()
  168. # Report successful join for threads that have previously logged a
  169. # cache update duration warning
  170. if thread.last_warn_time != thread.task_start_time:
  171. self.log.info(
  172. f"Cache update for catalog '{thread.name}' has "
  173. f"completed after {cumulative_run_time} seconds"
  174. )
  175. if len(self._threads) > WORKER_THREAD_WARNING_THRESHOLD:
  176. self.log.warning(
  177. f"CacheUpdateWorker outstanding threads threshold "
  178. f"({WORKER_THREAD_WARNING_THRESHOLD}) has been exceeded. "
  179. f"{len(self._threads)} threads are outstanding. This may "
  180. f"indicate a possible issue."
  181. )
  182. return outstanding_threads
  183. def is_refreshing(self) -> bool:
  184. return self._refresh_queue.refreshing
  185. def init_refresh(self) -> None:
  186. self._refresh_queue.refreshing = True
  187. def stop(self):
  188. """Trigger completion of the manager thread."""
  189. self._refresh_queue.refreshing = False
  190. self.stop_event.set()
  191. self.log.debug("CacheUpdateManager stopped.")
  192. class CacheUpdateWorker(Thread):
  193. """Spawned by the CacheUpdateManager to perform work against the component cache."""
  194. def __init__(
  195. self,
  196. component_cache: ComponentCacheType,
  197. queue: Queue,
  198. catalog: ComponentCatalogMetadata,
  199. action: Optional[str] = None,
  200. ):
  201. super().__init__()
  202. self.daemon = True
  203. self.name = catalog.name # Let the name of the thread reflect the catalog being managed
  204. self._component_cache: ComponentCacheType = component_cache
  205. # Task-specific properties
  206. self.queue: Queue = queue
  207. self.catalog: ComponentCatalogMetadata = catalog
  208. self.action: str = action
  209. # Thread metadata
  210. self.task_start_time = time.time()
  211. self.last_warn_time = self.task_start_time
  212. # Prepare component cache for modification
  213. runtime_type = None
  214. if self.catalog.metadata:
  215. runtime_type = self.catalog.runtime_type.name
  216. self.prepare_cache_for_catalog(runtime_type)
  217. def run(self):
  218. """Apply the relative action to the given catalog entry in the cache."""
  219. if self.action == "delete":
  220. # Check all runtime types in cache for an entry of the given name.
  221. # If found, remove only the components from this catalog
  222. for runtime_type in self._component_cache:
  223. if self.catalog.name in self._component_cache[runtime_type]:
  224. self._component_cache[runtime_type].pop(self.catalog.name, None)
  225. break
  226. else: # 'modify' - replace (or add) components from the given catalog an update its status
  227. runtime_type = self.catalog.runtime_type.name
  228. catalog_state = self._component_cache[runtime_type][self.catalog.name].get("status")
  229. try:
  230. # Replace all components for the given catalog
  231. self._component_cache[runtime_type][self.catalog.name][
  232. "components"
  233. ] = ComponentCache.instance().read_component_catalog(self.catalog)
  234. catalog_state["state"] = "current"
  235. catalog_state["errors"] = [] # reset any errors that may have been present
  236. except Exception as e:
  237. # Update state with an 'error' action and the relevant message
  238. catalog_state["state"] = "error"
  239. catalog_state["errors"].append(str(e))
  240. def prepare_cache_for_catalog(self, runtime_type: Optional[str] = None):
  241. """
  242. Add entries to the component cache for the runtime type and/or catalog
  243. of focus for this thread, and set the catalog state to 'updating'.
  244. """
  245. if self.action == "delete":
  246. # On 'delete' the runtime_type parameter will be None and since catalog names
  247. # are essentially unique across runtime types, we can break out of this loop
  248. # on first occurrence and let _that_ runtime type be used in the following code.
  249. for runtime_type in self._component_cache:
  250. if self.catalog.name in self._component_cache[runtime_type]:
  251. break
  252. # Add sub-dictionary for this runtime type if not present
  253. if not self._component_cache.get(runtime_type):
  254. self._component_cache[runtime_type] = {}
  255. # Add sub-dictionary for this catalog if not present - this will occur when
  256. # a catalog instance is created, so we're essentially adding a placeholder.
  257. if not self._component_cache[runtime_type].get(self.catalog.name):
  258. self._component_cache[runtime_type][self.catalog.name] = {
  259. "components": {},
  260. "status": {"state": "updating", "errors": []},
  261. }
  262. else: # Set state to 'updating' for an existing entry
  263. self._component_cache[runtime_type][self.catalog.name]["status"]["state"] = "updating"
  264. class ComponentCache(SingletonConfigurable):
  265. """Represents the cache of component definitions indexed by runtime-type, then by catalog name."""
  266. # The component_cache is indexed at the top level by runtime type name, e.g. 'APACHE_AIRFLOW',
  267. # and has as its value another dictionary. At the second level, each sub-dictionary is indexed by
  268. # a ComponentCatalogMetadata instance name; its value is also a sub-dictionary. This sub-dictionary
  269. # consists of two additional dictionaries: 1.) one with key "components" whose dictionary is
  270. # indexed by component id and maps to the corresponding Component object, and 2.) one with key
  271. # "status" and value of a final sub-dictionary with key-value pairs "state":"<current/updating/errors>"
  272. # and "errors":["<error1>", "<error2>", ...] to dynamically indicate the status of this catalog instance
  273. _component_cache: ComponentCacheType = {}
  274. _generic_category_label = "Elyra"
  275. _generic_components: Dict[str, Component] = {
  276. "notebook": Component(
  277. id="notebook",
  278. name="Notebook",
  279. description="Run notebook file",
  280. op="execute-notebook-node",
  281. catalog_type="elyra",
  282. component_reference="elyra",
  283. extensions=[".ipynb"],
  284. categories=[_generic_category_label],
  285. ),
  286. "python-script": Component(
  287. id="python-script",
  288. name="Python Script",
  289. description="Run Python script",
  290. op="execute-python-node",
  291. catalog_type="elyra",
  292. component_reference="elyra",
  293. extensions=[".py"],
  294. categories=[_generic_category_label],
  295. ),
  296. "r-script": Component(
  297. id="r-script",
  298. name="R Script",
  299. description="Run R script",
  300. op="execute-r-node",
  301. catalog_type="elyra",
  302. component_reference="elyra",
  303. extensions=[".r"],
  304. categories=[_generic_category_label],
  305. ),
  306. }
  307. def __init__(self, **kwargs):
  308. super().__init__(**kwargs)
  309. self._component_cache = {}
  310. self.is_server_process = ComponentCache._determine_server_process(**kwargs)
  311. self.manifest_dir = jupyter_runtime_dir()
  312. # Ensure queue attribute exists for non-server instances as well.
  313. self.refresh_queue: Optional[RefreshQueue] = None
  314. self.update_queue: Optional[UpdateQueue] = None
  315. if self.is_server_process:
  316. self.refresh_queue = RefreshQueue()
  317. self.update_queue = UpdateQueue()
  318. # Set up watchdog for manifest file for out-of-process updates
  319. self.observer = Observer()
  320. self.observer.schedule(ManifestFileChangeHandler(self), self.manifest_dir)
  321. # Start a thread to manage updates to the component cache
  322. manager = CacheUpdateManager(self.log, self._component_cache, self.refresh_queue, self.update_queue)
  323. self.cache_manager = manager
  324. self.cache_manager.start()
  325. self.log.debug("CacheUpdateManager started...")
  326. else:
  327. self.manifest_filename = os.path.join(self.manifest_dir, f"elyra-component-manifest-{os.getpid()}.json")
  328. @staticmethod
  329. def _determine_server_process(**kwargs) -> bool:
  330. """Determines if this process is a server (extension) process."""
  331. app_names = ["ServerApp", "ElyraApp"]
  332. is_server_process = False
  333. if "parent" in kwargs and kwargs["parent"].__class__.__name__ in app_names:
  334. is_server_process = True
  335. elif "emulate_server_app" in kwargs and kwargs["emulate_server_app"]: # Used in unittests
  336. is_server_process = True
  337. return is_server_process
  338. def load(self):
  339. """
  340. Completes a series of actions during system startup, such as creating
  341. the component manifest file and triggering the build of the component
  342. cache for existing ComponentCatalog metadata instances.
  343. """
  344. # Proceed only if singleton instance has been created
  345. if self.initialized:
  346. # The cache manager will work on manifest and cache tasks on an
  347. # in-process basis as load() is only called during startup from
  348. # the server process.
  349. if self.is_server_process:
  350. # Remove all existing manifest files from previous processes
  351. self._remove_all_manifest_files()
  352. # Start the watchdog if it's not alive, prevents redundant starts
  353. if not self.observer.is_alive():
  354. self.observer.start()
  355. # Fetch all component catalog instances and trigger their add to the
  356. # component cache if this is not already happening (it seems some server
  357. # test fixtures could be loading the server extensions multiple times).
  358. if not self.cache_manager.is_refreshing():
  359. self.refresh()
  360. def refresh(self):
  361. """Triggers a refresh of all catalogs in the component cache.
  362. Raises RefreshInProgressError if a complete refresh is in progress.
  363. Note that we do not preclude non-server processes from performing a
  364. complete refresh. In such cases, each of the catalog entries will be
  365. written to the manifest, which will be placed into the update queue.
  366. As a result, non-server applications could by-pass the "refresh in progress"
  367. constraint, but we're assuming a CLI application won't be as likely to
  368. "pound" refresh like a UI application can.
  369. """
  370. if self.is_server_process and self.cache_manager.is_refreshing():
  371. raise RefreshInProgressError()
  372. catalogs = MetadataManager(schemaspace=ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID).get_all()
  373. for catalog in catalogs:
  374. self._insert_request(self.refresh_queue, catalog, "modify")
  375. def update(self, catalog: Metadata, action: str):
  376. """
  377. Triggers an update of the component cache for the given catalog name. If this is a non-server
  378. process, the entry is written to the manifest file where it will be "processed" by the watchdog
  379. and inserted into the component cache queue, otherwise we update the cache queue directly.
  380. """
  381. self._insert_request(self.update_queue, catalog, action)
  382. def _insert_request(self, queue: Queue, catalog: Metadata, action: str):
  383. """
  384. If running as a server process, the request is submitted to the desired queue, otherwise
  385. it is posted to the manifest where the server process (if running) can detect the manifest
  386. file update and send the request to the update queue.
  387. Note that any calls to ComponentCache.refresh() from non-server processes will still
  388. perform the refresh, but via the update queue rather than the refresh queue. We could,
  389. instead, raise NotImplementedError in such cases, but we may want the ability to refresh
  390. the entire component cache from a CLI utility and the current implementation would allow that.
  391. """
  392. if self.is_server_process:
  393. queue.put((catalog, action))
  394. else:
  395. manifest: Dict[str, str] = self._load_manifest()
  396. manifest[catalog.name] = action
  397. self.update_manifest(manifest=manifest)
  398. def _remove_all_manifest_files(self):
  399. """
  400. Remove all existing manifest files in the Jupyter runtimes directory.
  401. """
  402. manifest_files = Path(self.manifest_dir).glob("**/elyra-component-manifest-*.json")
  403. for file in manifest_files:
  404. os.remove(str(file))
  405. def _load_manifest(self, filename: Optional[str] = None) -> Dict[str, str]:
  406. """Read and return the contents of a manifest file.
  407. If 'filename' is not provided, this process's manifest file will be read.
  408. """
  409. filename = filename or self.manifest_filename
  410. if not os.path.isfile(filename):
  411. self.log.debug(f"Manifest file '{filename}' doesn't exist and will be created.")
  412. return {}
  413. with open(filename, "r") as f:
  414. manifest: Dict[str, str] = json.load(f)
  415. self.log.debug(f"Reading manifest '{manifest}' from file '{filename}'")
  416. return manifest
  417. def update_manifest(self, filename: Optional[str] = None, manifest: Optional[Dict[str, str]] = None) -> None:
  418. """Update the manifest file with the given entry."""
  419. filename = filename or self.manifest_filename
  420. manifest = manifest or {}
  421. self.log.debug(f"Updating manifest '{manifest}' to file '{filename}'")
  422. with open(filename, "w") as f:
  423. json.dump(manifest, f, indent=2)
  424. def wait_for_all_cache_tasks(self):
  425. """
  426. Block execution and wait for all tasks in the cache task update queue to complete.
  427. Primarily used for testing.
  428. """
  429. if self.is_server_process:
  430. self.update_queue.join()
  431. self.refresh_queue.join()
  432. def get_all_components(self, platform: RuntimeProcessorType) -> List[Component]:
  433. """
  434. Retrieve all components from component catalog cache
  435. """
  436. components: List[Component] = []
  437. catalogs = self._component_cache.get(platform.name, {})
  438. for catalog_name, catalog_properties in catalogs.items():
  439. components.extend(list(catalog_properties.get("components", {}).values()))
  440. if not components and platform != RuntimeProcessorType.LOCAL:
  441. self.log.error(f"No components could be found in any catalog for platform type '{platform.name}'.")
  442. return components
  443. def get_component(self, platform: RuntimeProcessorType, component_id: str) -> Optional[Component]:
  444. """
  445. Retrieve the component with a given component_id from component catalog cache
  446. """
  447. component: Optional[Component] = None
  448. catalogs = self._component_cache.get(platform.name, {})
  449. for catalog_name, catalog_properties in catalogs.items():
  450. component = catalog_properties.get("components", {}).get(component_id)
  451. if component:
  452. break
  453. if not component:
  454. self.log.error(f"Component with ID '{component_id}' could not be found in any catalog.")
  455. return component
  456. def _load_catalog_reader_class(
  457. self, catalog: ComponentCatalogMetadata, file_types: List[str]
  458. ) -> Optional[ComponentCatalogConnector]:
  459. """
  460. Load the appropriate entrypoint class based on the schema name indicated in
  461. the ComponentCatalogMetadata instance and the file types associated with the component
  462. parser in use
  463. """
  464. try:
  465. catalog_reader = entrypoints.get_group_named("elyra.component.catalog_types").get(catalog.schema_name)
  466. if not catalog_reader:
  467. self.log.error(
  468. f"No entrypoint with name '{catalog.schema_name}' was found in group "
  469. f"'elyra.component.catalog_types' to match the 'schema_name' given in catalog "
  470. f"'{catalog.display_name}'. Skipping..."
  471. )
  472. return None
  473. catalog_reader = catalog_reader.load()(file_types, parent=self.parent)
  474. except Exception as e:
  475. self.log.error(f"Could not load appropriate ComponentCatalogConnector class: {e}. Skipping...")
  476. return None
  477. return catalog_reader
  478. def read_component_catalog(self, catalog: ComponentCatalogMetadata) -> Dict[str, Component]:
  479. """
  480. Read a component catalog and return a dictionary of components indexed by component_id.
  481. :param catalog: a metadata instances from which to read and construct Component objects
  482. :returns: a dictionary of component id to Component object for all read/parsed components
  483. """
  484. components: Dict[str, Component] = {}
  485. # Assign component parser based on the runtime platform type
  486. parser = ComponentParser.create_instance(platform=catalog.runtime_type)
  487. # Assign reader based on the type of the catalog (the 'schema_name')
  488. catalog_reader = self._load_catalog_reader_class(catalog, parser.file_types)
  489. if not catalog_reader:
  490. return components
  491. # Get content of component definition file for each component in this catalog
  492. self.log.debug(f"Processing components in catalog '{catalog.display_name}'")
  493. catalog_entries = catalog_reader.read_component_definitions(catalog)
  494. if not catalog_entries:
  495. return components
  496. for catalog_entry in catalog_entries:
  497. # Parse the entry to get a fully qualified Component object
  498. try:
  499. parsed_components = parser.parse(catalog_entry) or []
  500. except Exception as e:
  501. self.log.warning(
  502. f"Could not parse definition for component with identifying information: "
  503. f"'{catalog_entry.entry_reference}' -> {str(e)}"
  504. )
  505. else:
  506. for component in parsed_components:
  507. components[component.id] = component
  508. return components
  509. @staticmethod
  510. def get_generic_components() -> List[Component]:
  511. return list(ComponentCache._generic_components.values())
  512. @staticmethod
  513. def get_generic_component(component_id: str) -> Optional[Component]:
  514. return ComponentCache._generic_components.get(component_id)
  515. @staticmethod
  516. def get_generic_component_ops() -> List[str]:
  517. return [component.op for component in ComponentCache.get_generic_components()]
  518. @staticmethod
  519. def load_jinja_template(template_name: str) -> Template:
  520. """
  521. Loads the jinja template of the given name from the
  522. elyra/templates/components folder
  523. """
  524. loader = PackageLoader("elyra", "templates/components")
  525. template_env = Environment(loader=loader)
  526. return template_env.get_template(template_name)
  527. @staticmethod
  528. def to_canvas_palette(components: List[Component]) -> Dict:
  529. """
  530. Converts catalog components into appropriate canvas palette format
  531. """
  532. template = ComponentCache.load_jinja_template("canvas_palette_template.jinja2")
  533. # Define a fallback category for components with no given categories
  534. fallback_category_name = "No Category"
  535. # Convert the list of all components into a dictionary of
  536. # component lists keyed by category
  537. category_to_components: Dict[str, List[Component]] = {}
  538. for component in components:
  539. categories = component.categories
  540. # Assign a fallback category so that component is not
  541. # lost during palette render
  542. if not categories:
  543. categories = [fallback_category_name]
  544. for category in categories:
  545. if category not in category_to_components.keys():
  546. category_to_components[category] = []
  547. if component.id not in [comp.id for comp in category_to_components[category]]:
  548. category_to_components[category].append(component)
  549. # Render template
  550. canvas_palette = template.render(category_dict=category_to_components)
  551. return json.loads(canvas_palette)
  552. @staticmethod
  553. def to_canvas_properties(component: Component) -> Dict:
  554. """
  555. Converts catalog components into appropriate canvas properties format
  556. If component_id is one of the generic set, generic template is rendered,
  557. otherwise, the runtime-specific property template is rendered
  558. """
  559. if ComponentCache.get_generic_component(component.id) is not None:
  560. template = ComponentCache.load_jinja_template("generic_properties_template.jinja2")
  561. else:
  562. template = ComponentCache.load_jinja_template("canvas_properties_template.jinja2")
  563. canvas_properties = template.render(component=component)
  564. return json.loads(canvas_properties)
  565. class ManifestFileChangeHandler(FileSystemEventHandler):
  566. """Watchdog handler that filters on .json files within specific metadata directories."""
  567. def __init__(self, component_cache: ComponentCache, **kwargs):
  568. super().__init__(**kwargs)
  569. self.component_cache = component_cache
  570. self.log = component_cache.log
  571. def dispatch(self, event):
  572. """Dispatches delete and modification events pertaining to the manifest filename"""
  573. if "elyra-component-manifest" in event.src_path:
  574. super().dispatch(event)
  575. def on_modified(self, event):
  576. """Fires when the component manifest file is modified."""
  577. self.log.debug(f"ManifestFileChangeHandler: file '{event.src_path}' has been modified.")
  578. manifest = self.component_cache._load_manifest(filename=event.src_path)
  579. if manifest: # only update the manifest if there is work to do
  580. for catalog, action in manifest.items():
  581. self.log.debug(f"ManifestFileChangeHandler: inserting ({catalog},{action}) into update queue...")
  582. if action == "delete":
  583. # The metadata instance has already been deleted, so we must
  584. # fabricate an instance that only consists of a catalog name
  585. catalog_instance = ComponentCatalogMetadata(name=catalog)
  586. else: # cache_action == 'modify':
  587. # Fetch the catalog instance associated with this action
  588. catalog_instance = MetadataManager(
  589. schemaspace=ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID
  590. ).get(name=catalog)
  591. self.component_cache.update(catalog=catalog_instance, action=action)
  592. self.component_cache.update_manifest(filename=event.src_path) # clear the manifest