component_catalog.py 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import json
  17. from logging import Logger
  18. import os
  19. from pathlib import Path
  20. from queue import Empty
  21. from queue import Queue
  22. from threading import Event
  23. from threading import Thread
  24. import time
  25. from typing import Dict
  26. from typing import List
  27. from typing import Optional
  28. from typing import Union
  29. import entrypoints
  30. from jinja2 import Environment
  31. from jinja2 import PackageLoader
  32. from jinja2 import Template
  33. from jupyter_core.paths import jupyter_runtime_dir
  34. from traitlets.config import SingletonConfigurable
  35. from watchdog.events import FileSystemEventHandler
  36. from watchdog.observers import Observer
  37. from elyra.metadata.manager import MetadataManager
  38. from elyra.metadata.metadata import Metadata
  39. from elyra.metadata.schemaspaces import ComponentCatalogs
  40. from elyra.pipeline.catalog_connector import ComponentCatalogConnector
  41. from elyra.pipeline.component import Component
  42. from elyra.pipeline.component import ComponentParser
  43. from elyra.pipeline.component_metadata import ComponentCatalogMetadata
  44. from elyra.pipeline.pipeline_constants import ELYRA_COMPONENT_PROPERTIES
  45. from elyra.pipeline.runtime_type import RuntimeProcessorType
  46. BLOCKING_TIMEOUT = 0.5
  47. NONBLOCKING_TIMEOUT = 0.10
  48. # Issue warnings if catalog update takes longer than this value in seconds
  49. CATALOG_UPDATE_TIMEOUT = int(os.getenv("ELYRA_CATALOG_UPDATE_TIMEOUT", 15))
  50. # Issue warnings when outstanding worker thread counts exceed this value
  51. WORKER_THREAD_WARNING_THRESHOLD = int(os.getenv("ELYRA_WORKER_THREAD_WARNING_THRESHOLD", 10))
# Define custom type to describe the component cache. Reading the nesting from
# the outside in: runtime type name -> catalog name -> "components" | "status"
# -> either component id -> Component, or "state" -> str / "errors" -> List[str].
ComponentCacheType = Dict[str, Dict[str, Dict[str, Dict[str, Union[Component, str, List[str]]]]]]
  54. class RefreshInProgressError(Exception):
  55. def __init__(self):
  56. super().__init__("A catalog refresh is in progress. Try the request later.")
  57. class RefreshQueue(Queue):
  58. """Entries are associated with a complete refresh of the Component Cache."""
  59. _refreshing: bool
  60. def __init__(self):
  61. super().__init__()
  62. self._refreshing = False
  63. @property
  64. def refreshing(self) -> bool:
  65. return self._refreshing
  66. @refreshing.setter
  67. def refreshing(self, value: bool) -> None:
  68. self._refreshing = value
  69. def get(self, block: bool = True, timeout: Optional[float] = None):
  70. """Overrides the superclass method to set the refreshing property to false when empty."""
  71. try:
  72. entry = super().get(block=block, timeout=timeout)
  73. except Empty:
  74. self.refreshing = False
  75. raise
  76. return entry
  77. def put(self, item, block=True, timeout=None):
  78. """Overrides the superclass method to set the refreshing property to true."""
  79. super().put(item, block=block, timeout=timeout)
  80. self.refreshing = True
  81. class UpdateQueue(Queue):
  82. """Entries are associated with a single update of the Component Cache.
  83. This class merely exists to distinguish it from the RefreshQueue instance.
  84. """
  85. pass
  86. class CacheUpdateManager(Thread):
  87. """
  88. Primary thread for maintaining consistency of the component cache.
  89. The component cache manager maintains the cache queue, whose entries are a
  90. tuple of 'catalog' and 'action'. The 'catalog' is a catalog instance against
  91. which the 'action' is applied. The 'action' is one of 'modify' or 'delete'.
  92. For 'delete' the components of the referenced catalog are removed. For 'modify'
  93. the components of the referenced catalog are inserted or updated (depending on
  94. its prior existence).
  95. """
  96. def __init__(
  97. self, log: Logger, component_cache: ComponentCacheType, refresh_queue: RefreshQueue, update_queue: UpdateQueue
  98. ):
  99. super().__init__()
  100. self.daemon = True
  101. self.name = "CacheUpdateManager"
  102. self.log: Logger = log
  103. self._component_cache: ComponentCacheType = component_cache
  104. self._refresh_queue: RefreshQueue = refresh_queue
  105. self._update_queue: UpdateQueue = update_queue
  106. self._check_refresh_queue = False
  107. self._threads: List[CacheUpdateWorker] = []
  108. self.stop_event: Event = Event() # Set when server process stops
  109. def run(self):
  110. """Process queue queue entries until server is stopped."""
  111. while not self.stop_event.is_set():
  112. self.manage_cache_tasks()
  113. def manage_cache_tasks(self):
  114. """
  115. Check the cache queue for a cache update action and start
  116. a corresponding worker thread to complete the update
  117. """
  118. outstanding_threads = self._has_outstanding_threads()
  119. try:
  120. # Get a task from the cache queue, waiting less if we have active threads.
  121. timeout = NONBLOCKING_TIMEOUT if outstanding_threads else BLOCKING_TIMEOUT
  122. # Toggle between refresh and update queues so as to prevent starvation.
  123. self._check_refresh_queue = not self._check_refresh_queue
  124. if self._check_refresh_queue:
  125. catalog, action = self._refresh_queue.get(timeout=timeout)
  126. else:
  127. catalog, action = self._update_queue.get(timeout=timeout)
  128. except Empty:
  129. # No task exists in the cache queue, proceed to check for thread execution
  130. pass
  131. else:
  132. # Create and start a thread for the task
  133. updater_thread = CacheUpdateWorker(
  134. self._component_cache,
  135. self._refresh_queue if self._check_refresh_queue else self._update_queue,
  136. catalog,
  137. action,
  138. )
  139. updater_thread.start()
  140. queue_clause = "refreshing" if self._check_refresh_queue else "updating"
  141. self.log.debug(f"CacheUpdateWorker {queue_clause} catalog: '{updater_thread.name}', action: '{action}'...")
  142. self._threads.append(updater_thread)
  143. def _has_outstanding_threads(self) -> bool:
  144. """
  145. Join finished threads and report on long-running threads as needed.
  146. """
  147. outstanding_threads = False
  148. for thread in self._threads:
  149. # Attempt to join thread within the given amount of time
  150. thread.join(timeout=NONBLOCKING_TIMEOUT)
  151. cumulative_run_time = int(time.time() - thread.task_start_time)
  152. if thread.is_alive():
  153. # Thread is still running (thread join timed out)
  154. outstanding_threads = True
  155. # Report on a long-running thread if CATALOG_UPDATE_TIMEOUT is exceeded
  156. time_since_last_check = int(time.time() - thread.last_warn_time)
  157. if time_since_last_check > CATALOG_UPDATE_TIMEOUT:
  158. thread.last_warn_time = time.time()
  159. self.log.warning(
  160. f"Cache update for catalog '{thread.name}' is still processing "
  161. f"after {cumulative_run_time} seconds ..."
  162. )
  163. else:
  164. self.log.debug(f"CacheUpdateWorker completed for catalog: '{thread.name}', action: '{thread.action}'.")
  165. # Thread has been joined and can be removed from the list
  166. self._threads.remove(thread)
  167. # Mark cache task as complete
  168. thread.queue.task_done()
  169. # Report successful join for threads that have previously logged a
  170. # cache update duration warning
  171. if thread.last_warn_time != thread.task_start_time:
  172. self.log.info(
  173. f"Cache update for catalog '{thread.name}' has "
  174. f"completed after {cumulative_run_time} seconds"
  175. )
  176. if len(self._threads) > WORKER_THREAD_WARNING_THRESHOLD:
  177. self.log.warning(
  178. f"CacheUpdateWorker outstanding threads threshold "
  179. f"({WORKER_THREAD_WARNING_THRESHOLD}) has been exceeded. "
  180. f"{len(self._threads)} threads are outstanding. This may "
  181. f"indicate a possible issue."
  182. )
  183. return outstanding_threads
  184. def is_refreshing(self) -> bool:
  185. return self._refresh_queue.refreshing
  186. def init_refresh(self) -> None:
  187. self._refresh_queue.refreshing = True
  188. def stop(self):
  189. """Trigger completion of the manager thread."""
  190. self._refresh_queue.refreshing = False
  191. self.stop_event.set()
  192. self.log.debug("CacheUpdateManager stopped.")
  193. class CacheUpdateWorker(Thread):
  194. """Spawned by the CacheUpdateManager to perform work against the component cache."""
  195. def __init__(
  196. self,
  197. component_cache: ComponentCacheType,
  198. queue: Queue,
  199. catalog: ComponentCatalogMetadata,
  200. action: Optional[str] = None,
  201. ):
  202. super().__init__()
  203. self.daemon = True
  204. self.name = catalog.name # Let the name of the thread reflect the catalog being managed
  205. self._component_cache: ComponentCacheType = component_cache
  206. # Task-specific properties
  207. self.queue: Queue = queue
  208. self.catalog: ComponentCatalogMetadata = catalog
  209. self.action: str = action
  210. # Thread metadata
  211. self.task_start_time = time.time()
  212. self.last_warn_time = self.task_start_time
  213. # Prepare component cache for modification
  214. runtime_type = None
  215. if self.catalog.metadata:
  216. runtime_type = self.catalog.runtime_type.name
  217. self.prepare_cache_for_catalog(runtime_type)
  218. def run(self):
  219. """Apply the relative action to the given catalog entry in the cache."""
  220. if self.action == "delete":
  221. # Check all runtime types in cache for an entry of the given name.
  222. # If found, remove only the components from this catalog
  223. for runtime_type in self._component_cache:
  224. if self.catalog.name in self._component_cache[runtime_type]:
  225. self._component_cache[runtime_type].pop(self.catalog.name, None)
  226. break
  227. else: # 'modify' - replace (or add) components from the given catalog an update its status
  228. runtime_type = self.catalog.runtime_type.name
  229. catalog_state = self._component_cache[runtime_type][self.catalog.name].get("status")
  230. try:
  231. # Replace all components for the given catalog
  232. self._component_cache[runtime_type][self.catalog.name][
  233. "components"
  234. ] = ComponentCache.instance().read_component_catalog(self.catalog)
  235. catalog_state["state"] = "current"
  236. catalog_state["errors"] = [] # reset any errors that may have been present
  237. except Exception as e:
  238. # Update state with an 'error' action and the relevant message
  239. catalog_state["state"] = "error"
  240. catalog_state["errors"].append(str(e))
  241. def prepare_cache_for_catalog(self, runtime_type: Optional[str] = None):
  242. """
  243. Add entries to the component cache for the runtime type and/or catalog
  244. of focus for this thread, and set the catalog state to 'updating'.
  245. """
  246. if self.action == "delete":
  247. # On 'delete' the runtime_type parameter will be None and since catalog names
  248. # are essentially unique across runtime types, we can break out of this loop
  249. # on first occurrence and let _that_ runtime type be used in the following code.
  250. for runtime_type in self._component_cache:
  251. if self.catalog.name in self._component_cache[runtime_type]:
  252. break
  253. # Add sub-dictionary for this runtime type if not present
  254. if not self._component_cache.get(runtime_type):
  255. self._component_cache[runtime_type] = {}
  256. # Add sub-dictionary for this catalog if not present - this will occur when
  257. # a catalog instance is created, so we're essentially adding a placeholder.
  258. if not self._component_cache[runtime_type].get(self.catalog.name):
  259. self._component_cache[runtime_type][self.catalog.name] = {
  260. "components": {},
  261. "status": {"state": "updating", "errors": []},
  262. }
  263. else: # Set state to 'updating' for an existing entry
  264. self._component_cache[runtime_type][self.catalog.name]["status"]["state"] = "updating"
  265. class ComponentCache(SingletonConfigurable):
  266. """Represents the cache of component definitions indexed by runtime-type, then by catalog name."""
  267. # The component_cache is indexed at the top level by runtime type name, e.g. 'APACHE_AIRFLOW',
  268. # and has as its value another dictionary. At the second level, each sub-dictionary is indexed by
  269. # a ComponentCatalogMetadata instance name; its value is also a sub-dictionary. This sub-dictionary
  270. # consists of two additional dictionaries: 1.) one with key "components" whose dictionary is
  271. # indexed by component id and maps to the corresponding Component object, and 2.) one with key
  272. # "status" and value of a final sub-dictionary with key-value pairs "state":"<current/updating/errors>"
  273. # and "errors":["<error1>", "<error2>", ...] to dynamically indicate the status of this catalog instance
  274. _component_cache: ComponentCacheType = {}
  275. _generic_category_label = "Elyra"
  276. _generic_components: Dict[str, Component] = {
  277. "notebook": Component(
  278. id="notebook",
  279. name="Notebook",
  280. description="Run notebook file",
  281. op="execute-notebook-node",
  282. catalog_type="elyra",
  283. component_reference="elyra",
  284. extensions=[".ipynb"],
  285. categories=[_generic_category_label],
  286. ),
  287. "python-script": Component(
  288. id="python-script",
  289. name="Python Script",
  290. description="Run Python script",
  291. op="execute-python-node",
  292. catalog_type="elyra",
  293. component_reference="elyra",
  294. extensions=[".py"],
  295. categories=[_generic_category_label],
  296. ),
  297. "r-script": Component(
  298. id="r-script",
  299. name="R Script",
  300. description="Run R script",
  301. op="execute-r-node",
  302. catalog_type="elyra",
  303. component_reference="elyra",
  304. extensions=[".r"],
  305. categories=[_generic_category_label],
  306. ),
  307. }
  308. def __init__(self, **kwargs):
  309. emulate_server_app: bool = kwargs.pop("emulate_server_app", False)
  310. super().__init__(**kwargs)
  311. self._component_cache = {}
  312. self.is_server_process = ComponentCache._determine_server_process(emulate_server_app, **kwargs)
  313. self.manifest_dir = jupyter_runtime_dir()
  314. # Ensure queue attribute exists for non-server instances as well.
  315. self.refresh_queue: Optional[RefreshQueue] = None
  316. self.update_queue: Optional[UpdateQueue] = None
  317. if self.is_server_process:
  318. self.refresh_queue = RefreshQueue()
  319. self.update_queue = UpdateQueue()
  320. # Set up watchdog for manifest file for out-of-process updates
  321. self.observer = Observer()
  322. self.observer.schedule(ManifestFileChangeHandler(self), self.manifest_dir)
  323. # Start a thread to manage updates to the component cache
  324. manager = CacheUpdateManager(self.log, self._component_cache, self.refresh_queue, self.update_queue)
  325. self.cache_manager = manager
  326. self.cache_manager.start()
  327. self.log.debug("CacheUpdateManager started...")
  328. else:
  329. self.manifest_filename = os.path.join(self.manifest_dir, f"elyra-component-manifest-{os.getpid()}.json")
  330. @staticmethod
  331. def _determine_server_process(emulate_server_app: bool, **kwargs) -> bool:
  332. """Determines if this process is a server (extension) process."""
  333. app_names = ["ServerApp", "ElyraApp"]
  334. is_server_process = False
  335. if "parent" in kwargs and kwargs["parent"].__class__.__name__ in app_names:
  336. is_server_process = True
  337. elif emulate_server_app: # Used in unittests
  338. is_server_process = True
  339. return is_server_process
  340. def load(self):
  341. """
  342. Completes a series of actions during system startup, such as creating
  343. the component manifest file and triggering the build of the component
  344. cache for existing ComponentCatalog metadata instances.
  345. """
  346. # Proceed only if singleton instance has been created
  347. if self.initialized:
  348. # The cache manager will work on manifest and cache tasks on an
  349. # in-process basis as load() is only called during startup from
  350. # the server process.
  351. if self.is_server_process:
  352. # Remove all existing manifest files from previous processes
  353. self._remove_all_manifest_files()
  354. # Start the watchdog if it's not alive, prevents redundant starts
  355. if not self.observer.is_alive():
  356. self.observer.start()
  357. # Fetch all component catalog instances and trigger their add to the
  358. # component cache if this is not already happening (it seems some server
  359. # test fixtures could be loading the server extensions multiple times).
  360. if not self.cache_manager.is_refreshing():
  361. self.refresh()
  362. def refresh(self):
  363. """Triggers a refresh of all catalogs in the component cache.
  364. Raises RefreshInProgressError if a complete refresh is in progress.
  365. Note that we do not preclude non-server processes from performing a
  366. complete refresh. In such cases, each of the catalog entries will be
  367. written to the manifest, which will be placed into the update queue.
  368. As a result, non-server applications could by-pass the "refresh in progress"
  369. constraint, but we're assuming a CLI application won't be as likely to
  370. "pound" refresh like a UI application can.
  371. """
  372. if self.is_server_process and self.cache_manager.is_refreshing():
  373. raise RefreshInProgressError()
  374. catalogs = MetadataManager(schemaspace=ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID).get_all()
  375. for catalog in catalogs:
  376. self._insert_request(self.refresh_queue, catalog, "modify")
  377. def update(self, catalog: Metadata, action: str):
  378. """
  379. Triggers an update of the component cache for the given catalog name. If this is a non-server
  380. process, the entry is written to the manifest file where it will be "processed" by the watchdog
  381. and inserted into the component cache queue, otherwise we update the cache queue directly.
  382. """
  383. self._insert_request(self.update_queue, catalog, action)
  384. def _insert_request(self, queue: Queue, catalog: Metadata, action: str):
  385. """
  386. If running as a server process, the request is submitted to the desired queue, otherwise
  387. it is posted to the manifest where the server process (if running) can detect the manifest
  388. file update and send the request to the update queue.
  389. Note that any calls to ComponentCache.refresh() from non-server processes will still
  390. perform the refresh, but via the update queue rather than the refresh queue. We could,
  391. instead, raise NotImplementedError in such cases, but we may want the ability to refresh
  392. the entire component cache from a CLI utility and the current implementation would allow that.
  393. """
  394. if self.is_server_process:
  395. queue.put((catalog, action))
  396. else:
  397. manifest: Dict[str, str] = self._load_manifest()
  398. manifest[catalog.name] = action
  399. self.update_manifest(manifest=manifest)
  400. def _remove_all_manifest_files(self):
  401. """
  402. Remove all existing manifest files in the Jupyter runtimes directory.
  403. """
  404. manifest_files = Path(self.manifest_dir).glob("**/elyra-component-manifest-*.json")
  405. for file in manifest_files:
  406. os.remove(str(file))
  407. def _load_manifest(self, filename: Optional[str] = None) -> Dict[str, str]:
  408. """Read and return the contents of a manifest file.
  409. If 'filename' is not provided, this process's manifest file will be read.
  410. """
  411. filename = filename or self.manifest_filename
  412. if not os.path.isfile(filename):
  413. self.log.debug(f"Manifest file '{filename}' doesn't exist and will be created.")
  414. return {}
  415. with open(filename, "r") as f:
  416. manifest: Dict[str, str] = json.load(f)
  417. self.log.debug(f"Reading manifest '{manifest}' from file '{filename}'")
  418. return manifest
  419. def update_manifest(self, filename: Optional[str] = None, manifest: Optional[Dict[str, str]] = None) -> None:
  420. """Update the manifest file with the given entry."""
  421. filename = filename or self.manifest_filename
  422. manifest = manifest or {}
  423. self.log.debug(f"Updating manifest '{manifest}' to file '{filename}'")
  424. with open(filename, "w") as f:
  425. json.dump(manifest, f, indent=2)
  426. def wait_for_all_cache_tasks(self):
  427. """
  428. Block execution and wait for all tasks in the cache task update queue to complete.
  429. Primarily used for testing.
  430. """
  431. if self.is_server_process:
  432. self.update_queue.join()
  433. self.refresh_queue.join()
  434. def get_all_components(self, platform: RuntimeProcessorType) -> List[Component]:
  435. """
  436. Retrieve all components from component catalog cache
  437. """
  438. components: List[Component] = []
  439. catalogs = self._component_cache.get(platform.name, {})
  440. for catalog_name, catalog_properties in catalogs.items():
  441. components.extend(list(catalog_properties.get("components", {}).values()))
  442. if not components and platform != RuntimeProcessorType.LOCAL:
  443. self.log.error(f"No components could be found in any catalog for platform type '{platform.name}'.")
  444. return components
  445. def get_component(self, platform: RuntimeProcessorType, component_id: str) -> Optional[Component]:
  446. """
  447. Retrieve the component with a given component_id from component catalog cache
  448. """
  449. component: Optional[Component] = None
  450. catalogs = self._component_cache.get(platform.name, {})
  451. for catalog_name, catalog_properties in catalogs.items():
  452. component = catalog_properties.get("components", {}).get(component_id)
  453. if component:
  454. break
  455. if not component:
  456. self.log.error(f"Component with ID '{component_id}' could not be found in any catalog.")
  457. return component
  458. def _load_catalog_reader_class(
  459. self, catalog: ComponentCatalogMetadata, file_types: List[str]
  460. ) -> Optional[ComponentCatalogConnector]:
  461. """
  462. Load the appropriate entrypoint class based on the schema name indicated in
  463. the ComponentCatalogMetadata instance and the file types associated with the component
  464. parser in use
  465. """
  466. try:
  467. catalog_reader = entrypoints.get_group_named("elyra.component.catalog_types").get(catalog.schema_name)
  468. if not catalog_reader:
  469. self.log.error(
  470. f"No entrypoint with name '{catalog.schema_name}' was found in group "
  471. f"'elyra.component.catalog_types' to match the 'schema_name' given in catalog "
  472. f"'{catalog.display_name}'. Skipping..."
  473. )
  474. return None
  475. catalog_reader = catalog_reader.load()(file_types, parent=self.parent)
  476. except Exception as e:
  477. self.log.error(f"Could not load appropriate ComponentCatalogConnector class: {e}. Skipping...")
  478. return None
  479. return catalog_reader
  480. def read_component_catalog(self, catalog: ComponentCatalogMetadata) -> Dict[str, Component]:
  481. """
  482. Read a component catalog and return a dictionary of components indexed by component_id.
  483. :param catalog: a metadata instances from which to read and construct Component objects
  484. :returns: a dictionary of component id to Component object for all read/parsed components
  485. """
  486. components: Dict[str, Component] = {}
  487. # Assign component parser based on the runtime platform type
  488. parser = ComponentParser.create_instance(platform=catalog.runtime_type)
  489. # Assign reader based on the type of the catalog (the 'schema_name')
  490. catalog_reader = self._load_catalog_reader_class(catalog, parser.file_types)
  491. if not catalog_reader:
  492. return components
  493. # Get content of component definition file for each component in this catalog
  494. self.log.debug(f"Processing components in catalog '{catalog.display_name}'")
  495. catalog_entries = catalog_reader.read_component_definitions(catalog)
  496. if not catalog_entries:
  497. return components
  498. for catalog_entry in catalog_entries:
  499. # Parse the entry to get a fully qualified Component object
  500. try:
  501. parsed_components = parser.parse(catalog_entry) or []
  502. except Exception as e:
  503. self.log.warning(
  504. f"Could not parse definition for component with identifying information: "
  505. f"'{catalog_entry.entry_reference}' -> {str(e)}"
  506. )
  507. else:
  508. for component in parsed_components:
  509. components[component.id] = component
  510. return components
  511. @staticmethod
  512. def get_generic_components() -> List[Component]:
  513. return list(ComponentCache._generic_components.values())
  514. @staticmethod
  515. def get_generic_component(component_id: str) -> Optional[Component]:
  516. return ComponentCache._generic_components.get(component_id)
  517. @staticmethod
  518. def get_generic_component_ops() -> List[str]:
  519. return [component.op for component in ComponentCache.get_generic_components()]
  520. @staticmethod
  521. def load_jinja_template(template_name: str) -> Template:
  522. """
  523. Loads the jinja template of the given name from the
  524. elyra/templates/components folder
  525. """
  526. loader = PackageLoader("elyra", "templates/components")
  527. template_env = Environment(loader=loader)
  528. return template_env.get_template(template_name)
  529. @staticmethod
  530. def to_canvas_palette(components: List[Component]) -> Dict:
  531. """
  532. Converts catalog components into appropriate canvas palette format
  533. """
  534. template = ComponentCache.load_jinja_template("canvas_palette_template.jinja2")
  535. # Define a fallback category for components with no given categories
  536. fallback_category_name = "No Category"
  537. # Convert the list of all components into a dictionary of
  538. # component lists keyed by category
  539. category_to_components: Dict[str, List[Component]] = {}
  540. for component in components:
  541. categories = component.categories
  542. # Assign a fallback category so that component is not
  543. # lost during palette render
  544. if not categories:
  545. categories = [fallback_category_name]
  546. for category in categories:
  547. if category not in category_to_components.keys():
  548. category_to_components[category] = []
  549. if component.id not in [comp.id for comp in category_to_components[category]]:
  550. category_to_components[category].append(component)
  551. # Render template
  552. canvas_palette = template.render(category_dict=category_to_components)
  553. return json.loads(canvas_palette)
  554. @staticmethod
  555. def to_canvas_properties(component: Component) -> Dict:
  556. """
  557. Converts catalog components into appropriate canvas properties format
  558. If component_id is one of the generic set, generic template is rendered,
  559. otherwise, the runtime-specific property template is rendered
  560. """
  561. kwargs = {}
  562. if ComponentCache.get_generic_component(component.id) is not None:
  563. template = ComponentCache.load_jinja_template("generic_properties_template.jinja2")
  564. else:
  565. # Determine which component properties parsed from the definition
  566. # collide with Elyra-defined properties (in the case of a collision,
  567. # only the parsed property will be displayed)
  568. kwargs = {"elyra_property_collisions_list": []}
  569. for param in component.properties:
  570. if param.ref in ELYRA_COMPONENT_PROPERTIES:
  571. kwargs["elyra_property_collisions_list"].append(param.ref)
  572. template = ComponentCache.load_jinja_template("canvas_properties_template.jinja2")
  573. canvas_properties = template.render(component=component, **kwargs)
  574. return json.loads(canvas_properties)
  575. class ManifestFileChangeHandler(FileSystemEventHandler):
  576. """Watchdog handler that filters on .json files within specific metadata directories."""
  577. def __init__(self, component_cache: ComponentCache, **kwargs):
  578. super().__init__(**kwargs)
  579. self.component_cache = component_cache
  580. self.log = component_cache.log
  581. def dispatch(self, event):
  582. """Dispatches delete and modification events pertaining to the manifest filename"""
  583. if "elyra-component-manifest" in event.src_path:
  584. super().dispatch(event)
  585. def on_modified(self, event):
  586. """Fires when the component manifest file is modified."""
  587. self.log.debug(f"ManifestFileChangeHandler: file '{event.src_path}' has been modified.")
  588. manifest = self.component_cache._load_manifest(filename=event.src_path)
  589. if manifest: # only update the manifest if there is work to do
  590. for catalog, action in manifest.items():
  591. self.log.debug(f"ManifestFileChangeHandler: inserting ({catalog},{action}) into update queue...")
  592. if action == "delete":
  593. # The metadata instance has already been deleted, so we must
  594. # fabricate an instance that only consists of a catalog name
  595. catalog_instance = ComponentCatalogMetadata(name=catalog)
  596. else: # cache_action == 'modify':
  597. # Fetch the catalog instance associated with this action
  598. catalog_instance = MetadataManager(
  599. schemaspace=ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID
  600. ).get(name=catalog)
  601. self.component_cache.update(catalog=catalog_instance, action=action)
  602. self.component_cache.update_manifest(filename=event.src_path) # clear the manifest