# storage.py
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
from abc import ABC
from abc import abstractmethod
from collections import OrderedDict
import copy
from functools import wraps
import io
import json
import os
import time
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

import jupyter_core.paths
from traitlets import log  # noqa H306
from traitlets.config import LoggingConfigurable  # noqa H306
from traitlets.config import SingletonConfigurable
from traitlets.traitlets import Bool
from traitlets.traitlets import Integer
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

from elyra.metadata.error import MetadataExistsError
from elyra.metadata.error import MetadataNotFoundError
class MetadataStore(ABC):
    """Abstract base class defining the persistence contract for metadata instances.

    A concrete store manages instances for a single schemaspace and must implement
    existence checks, fetch, store, and delete operations.
    """

    def __init__(self, schemaspace, parent: Optional[LoggingConfigurable] = None, **kwargs):
        # The schemaspace this store operates against.
        self.schemaspace = schemaspace
        # Borrow the parent's logger when provided, else fall back to the application logger.
        self.log = parent.log if parent else log.get_logger()

    @abstractmethod
    def schemaspace_exists(self) -> bool:
        """Returns True if the schemaspace for this instance exists"""
        pass

    @abstractmethod
    def fetch_instances(self, name: Optional[str] = None, include_invalid: bool = False) -> List[dict]:
        """Fetch metadata instances"""
        pass

    @abstractmethod
    def store_instance(self, name: str, metadata: dict, for_update: bool = False) -> dict:
        """Stores the named metadata instance."""
        pass

    @abstractmethod
    def delete_instance(self, metadata: dict) -> None:
        """Deletes the metadata instance corresponding to the given name."""
        pass
  58. def caching_enabled(func):
  59. """Checks if file store cache is enabled. If not, just return, else perform function."""
  60. def wrapped(self, *args, **kwargs):
  61. if not self.enabled:
  62. return
  63. return func(self, *args, **kwargs)
  64. return wrapped
  65. class FileMetadataCache(SingletonConfigurable):
  66. """FileMetadataCache is used exclusively by FileMetadataStore to cache file-based metadata instances.
  67. FileMetadataCache utilizes a watchdog handler to monitor directories corresponding to
  68. any files it contains. The handler is primarily used to determine which cached entries
  69. to remove (on delete operations).
  70. The cache is implemented as a simple LRU cache using an OrderedDict.
  71. """
  72. max_size = Integer(
  73. min=1, max=1024, default_value=128, config=True, help="The maximum number of entries allowed in the cache."
  74. )
  75. enabled = Bool(default_value=True, config=True, help="Caching is enabled (True) or disabled (False).")
  76. def __init__(self, **kwargs):
  77. super().__init__(**kwargs)
  78. self.hits: int = 0
  79. self.misses: int = 0
  80. self.trims: int = 0
  81. self._entries: OrderedDict = OrderedDict()
  82. if self.enabled: # Only create and start an observer when enabled
  83. self.observed_dirs = set() # Tracks which directories are being watched
  84. self.observer = Observer()
  85. self.observer.start()
  86. else:
  87. self.log.info(
  88. "The file metadata cache is currently disabled via configuration. "
  89. "Set FileMetadataCache.enabled=True to enable instance caching."
  90. )
  91. def __len__(self) -> int:
  92. """Return the number of running kernels."""
  93. return len(self._entries)
  94. def __contains__(self, path: str) -> bool:
  95. return path in self._entries
  96. @caching_enabled
  97. def add_item(self, path: str, entry: Dict[str, Any]) -> None:
  98. """Adds the named entry and its entry to the cache.
  99. If this causes the cache to grow beyond its max size, the least recently
  100. used entry is removed.
  101. """
  102. md_dir: str = os.path.dirname(path)
  103. if md_dir not in self.observed_dirs and os.path.isdir(md_dir):
  104. self.observer.schedule(FileChangeHandler(self), md_dir, recursive=True)
  105. self.observed_dirs.add(md_dir)
  106. self._entries[path] = copy.deepcopy(entry)
  107. self._entries.move_to_end(path)
  108. if len(self._entries) > self.max_size:
  109. self.trims += 1
  110. self._entries.popitem(last=False) # pop LRU entry
  111. @caching_enabled
  112. def get_item(self, path: str) -> Optional[Dict[str, Any]]:
  113. """Gets the named entry and returns its value or None if not present."""
  114. if path in self._entries:
  115. self.hits += 1
  116. self._entries.move_to_end(path)
  117. return copy.deepcopy(self._entries[path])
  118. self.misses += 1
  119. return None
  120. @caching_enabled
  121. def remove_item(self, path: str) -> Optional[Dict[str, Any]]:
  122. """Removes the named entry and returns its value or None if not present."""
  123. if path in self._entries:
  124. return self._entries.pop(path)
  125. return None
  126. class FileChangeHandler(FileSystemEventHandler):
  127. """Watchdog handler that filters on .json files within specific metadata directories."""
  128. def __init__(self, file_metadata_cache: FileMetadataCache, **kwargs):
  129. super(FileChangeHandler, self).__init__(**kwargs)
  130. self.file_metadata_cache = file_metadata_cache
  131. self.log = file_metadata_cache.log
  132. def dispatch(self, event):
  133. """Dispatches delete and modification events pertaining to watched metadata instances."""
  134. if event.src_path.endswith(".json"):
  135. super(FileChangeHandler, self).dispatch(event)
  136. def on_deleted(self, event):
  137. """Fires when a watched file is deleted, triggering a removal of the corresponding item from the cache."""
  138. self.file_metadata_cache.remove_item(event.src_path)
  139. def on_modified(self, event):
  140. """Fires when a watched file is modified.
  141. On updates, go ahead and remove the item from the cache. It will be reloaded on next fetch.
  142. """
  143. self.file_metadata_cache.remove_item(event.src_path)
class FileMetadataStore(MetadataStore):
    """Filesystem-backed MetadataStore.

    Instances are JSON files located in a hierarchy of ``metadata/<schemaspace>``
    directories (see ``metadata_path``).  The first directory in the list is the
    preferred (writable) location; higher-precedence directories shadow same-named
    instances found in shared/factory locations.  Loaded instances are cached in
    the FileMetadataCache singleton.
    """

    def __init__(self, schemaspace: str, **kwargs):
        super().__init__(schemaspace, **kwargs)
        # Shared (singleton) cache of previously loaded instances.
        self.cache = FileMetadataCache.instance()
        # Ordered search path; index 0 is the preferred, writable directory.
        self.metadata_paths = FileMetadataStore.metadata_path(self.schemaspace.lower())
        self.preferred_metadata_dir = self.metadata_paths[0]
        self.log.debug(
            f"Schemaspace '{self.schemaspace}' is using metadata directory: "
            f"{self.preferred_metadata_dir} from list: {self.metadata_paths}"
        )

    def schemaspace_exists(self) -> bool:
        """Does the schemaspace exist in any of the dir paths?"""
        schemaspace_dir_exists = False
        for d in self.metadata_paths:
            if os.path.isdir(d):
                schemaspace_dir_exists = True
                break
        return schemaspace_dir_exists

    def fetch_instances(self, name: Optional[str] = None, include_invalid: bool = False) -> List[dict]:
        """Returns a list of metadata instances.

        If name is provided, the single instance will be returned in a list of one item.
        """
        if not self.schemaspace_exists():  # schemaspace doesn't exist - return empty list
            if name:  # If we're looking for a specific instance and there's no schemaspace, raise MetadataNotFound
                raise MetadataNotFoundError(self.schemaspace, name)
            return []

        resources = {}
        # Iterate in REVERSE precedence order so that entries from higher-precedence
        # (user-level) directories overwrite same-named entries loaded earlier.
        all_metadata_dirs = reversed(self.metadata_paths)

        for metadata_dir in all_metadata_dirs:
            if os.path.isdir(metadata_dir):
                for f in os.listdir(metadata_dir):
                    path = os.path.join(metadata_dir, f)
                    if path.endswith(".json"):
                        if name:  # if looking for a specific instance, and this is not it, continue
                            if os.path.splitext(os.path.basename(path))[0] != name:
                                continue
                        try:
                            metadata = self._load_resource(path)
                        except Exception as ex:
                            if name:  # if we're looking for this instance, re-raise exception
                                raise ex from ex
                            # else create a dictionary from what we have if including invalid, else continue
                            if include_invalid:
                                metadata = {
                                    "name": os.path.splitext(os.path.basename(path))[0],
                                    "resource": path,
                                    "reason": ex.__class__.__name__,
                                }
                            else:
                                continue

                        md_name = metadata.get("name")
                        if md_name in resources.keys():
                            # If we're replacing an instance, let that be known via debug
                            from_resource = resources[md_name].get("resource")
                            md_resource = metadata.get("resource")
                            self.log.debug(
                                f"Replacing metadata instance '{md_name}' from '{from_resource}' with '{md_resource}'."
                            )
                        resources[md_name] = metadata

        if name:
            if name in resources.keys():  # check if we have a match.
                return [resources[name]]

            # If we're looking for a single metadata and we're here, then its not found
            raise MetadataNotFoundError(self.schemaspace, name)

        # We're here only if loading all resources, so only return list of values.
        return list(resources.values())

    def store_instance(self, name: str, metadata: dict, for_update: bool = False) -> dict:
        """Store the named metadata instance

        Create is the default behavior, while updates are performed when for_update is True.
        Writes only to the preferred (user-level) directory; on failure the previous file
        (renamed aside during updates) is restored via _rollback().
        """
        metadata_resource_name = f"{name}.json"
        resource = os.path.join(self.preferred_metadata_dir, metadata_resource_name)

        # If the preferred metadata directory is not present, create it and note it.
        if not os.path.exists(self.preferred_metadata_dir):
            self.log.debug(f"Creating metadata directory: {self.preferred_metadata_dir}")
            os.makedirs(self.preferred_metadata_dir, mode=0o700, exist_ok=True)

        # Prepare for persistence, check existence, etc.
        renamed_resource = None
        if for_update:
            renamed_resource = self._prepare_update(name, resource)
        else:  # create
            self._prepare_create(name, resource)

        # Write out the instance
        try:
            with jupyter_core.paths.secure_write(resource) as f:
                json.dump(metadata, f, indent=2)  # Only persist necessary items
        except Exception as ex:
            self._rollback(resource, renamed_resource)
            raise ex from ex
        else:
            self.log.debug(f"{'Updated' if for_update else 'Created'} metadata instance: {resource}")

        # Confirm persistence so in case there are issues, we can rollback
        metadata = self._confirm_persistence(resource, renamed_resource)

        return metadata

    def delete_instance(self, metadata: dict) -> None:
        """Delete the named instance

        Raises PermissionError when the instance's resource does not reside in the
        preferred (user-level) directory.
        """
        name = metadata.get("name")
        resource = metadata.get("resource")
        if resource:
            # Since multiple folders are in play, we only allow removal if the resource is in
            # the first directory in the list (i.e., most "near" the user)
            if not self._remove_allowed(metadata):
                self.log.error(
                    f"Removal of instance '{name}' from the {self.schemaspace} schemaspace is not permitted! "
                    f"Resource conflict at '{resource}' "
                )
                raise PermissionError(
                    f"Removal of instance '{name}' from the {self.schemaspace} schemaspace is not permitted!"
                )
            os.remove(resource)
            self.cache.remove_item(resource)

    def _prepare_create(self, name: str, resource: str) -> None:
        """Prepare to create resource, ensure it doesn't exist in the hierarchy."""
        if os.path.exists(resource):
            self.log.error(
                f"An instance named '{name}' already exists in the {self.schemaspace} schemaspace at {resource}."
            )
            raise MetadataExistsError(self.schemaspace, name)

        # Although the resource doesn't exist in the preferred dir, it may exist at other levels.
        # If creating, then existence at other levels should also prevent the operation.
        try:
            resources = self.fetch_instances(name)
            # Instance exists at other (protected) level and this is a create - throw exception
            self.log.error(
                f"An instance named '{name}' already exists in the {self.schemaspace} "
                f"schemaspace at {resources[0].get('resource')}."
            )
            raise MetadataExistsError(self.schemaspace, name)
        except MetadataNotFoundError:  # doesn't exist elsewhere, so we're good.
            pass

    def _prepare_update(self, name: str, resource: str) -> str:
        """Prepare to update resource, rename current.

        Returns the renamed path (timestamp-suffixed) so the caller can restore or
        discard it, or None if the resource did not previously exist.
        """
        renamed_resource = None
        if os.path.exists(resource):
            # We're updating so we need to rename the current file to allow restore on errs
            renamed_resource = resource + str(time.time())
            os.rename(resource, renamed_resource)
            self.log.debug(f"Renamed resource for instance '{name}' to: '{renamed_resource}'")
        return renamed_resource

    def _rollback(self, resource: str, renamed_resource: str) -> None:
        """Rollback changes made during persistence (typically updates) and exceptions are encountered"""
        self.cache.remove_item(resource)  # Clear the item from the cache, let it be re-added naturally
        if os.path.exists(resource):
            os.remove(resource)
        if renamed_resource:  # Restore the renamed file
            os.rename(renamed_resource, resource)

    def _confirm_persistence(self, resource: str, renamed_resource: str) -> dict:
        """Confirms persistence by loading the instance and cleans up renamed instance, if applicable."""

        # Prior to loading from the filesystem, REMOVE any associated cache entry (likely on updates)
        # so that _load_resource() hits the filesystem - then adds the item to the cache.
        self.cache.remove_item(resource)
        try:
            metadata = self._load_resource(resource)
        except Exception as ex:
            self.log.error(f"Removing metadata instance '{resource}' due to previous error.")
            self._rollback(resource, renamed_resource)
            raise ex from ex

        if renamed_resource:  # Remove the renamed file
            os.remove(renamed_resource)
        return metadata

    def _remove_allowed(self, metadata: dict) -> bool:
        """Determines if the resource of the given instance is allowed to be removed."""
        # Compare extension-stripped resource path against the preferred-dir location;
        # only instances residing in the preferred directory may be removed.
        allowed_resource = os.path.join(self.preferred_metadata_dir, metadata.get("name"))
        current_resource = os.path.splitext(metadata.get("resource"))[0]
        return allowed_resource == current_resource

    def _load_resource(self, resource: str) -> Dict[str, Any]:
        """Load a single instance from the cache or, failing that, from the filesystem.

        Raises ValueError when the file contains invalid JSON.
        """
        # This is always called with an existing resource (path) so no need to check existence.
        metadata_json: Dict[str, Any] = self.cache.get_item(resource)
        if metadata_json is not None:
            self.log.debug(f"Loading metadata instance from cache: '{metadata_json['name']}'")
            return metadata_json

        # Always take name from resource so resources can be copied w/o having to change content
        name = os.path.splitext(os.path.basename(resource))[0]

        self.log.debug(f"Loading metadata instance from: '{resource}'")
        with io.open(resource, "r", encoding="utf-8") as f:
            try:
                metadata_json = json.load(f)
            except ValueError as jde:  # JSONDecodeError is raised, but it derives from ValueError
                # If the JSON file cannot load, there's nothing we can do other than log and raise since
                # we aren't able to even instantiate an instance of Metadata.  Because errors are ignored
                # when getting multiple items, it's okay to raise.  The singleton searches (by handlers)
                # already catch ValueError and map to 400, so we're good there as well.
                self.log.error(
                    f"JSON failed to load for resource '{resource}' in the "
                    f"{self.schemaspace} schemaspace with error: {jde}."
                )
                raise ValueError(
                    f"JSON failed to load for instance '{name}' in the "
                    f"{self.schemaspace} schemaspace with error: {jde}."
                ) from jde

            metadata_json["name"] = name
            metadata_json["resource"] = resource
            self.cache.add_item(resource, metadata_json)

        return metadata_json

    @staticmethod
    def metadata_path(*subdirs):
        """Return a list of directories to search for metadata files.

        ELYRA_METADATA_PATH environment variable has highest priority.

        This is based on jupyter_core.paths.jupyter_path, but where the python
        env-based directory is last in the list, preceded by the system shared
        locations with the user's home-based directory still first in the list.

        The first directory in the list (data_dir, if env is not set) is where files
        will be written, although files can reside at other levels as well, with
        SYSTEM_JUPYTER_PATH representing shared data and ENV_JUPYTER_PATH representing
        the location of factory data (created during installation).

        If ``*subdirs`` are given, that subdirectory will be added to each element.
        """
        paths = []
        # highest priority is env
        if os.environ.get("ELYRA_METADATA_PATH"):
            paths.extend(p.rstrip(os.sep) for p in os.environ["ELYRA_METADATA_PATH"].split(os.pathsep))
        # then user dir
        paths.append(jupyter_core.paths.jupyter_data_dir())

        system_path = jupyter_core.paths.SYSTEM_JUPYTER_PATH
        paths.extend(system_path)

        # then sys.prefix, where installed files will reside (factory data)
        env_path = jupyter_core.paths.ENV_JUPYTER_PATH
        for p in env_path:
            if p not in system_path:
                paths.append(p)

        # add subdir, if requested.
        # Note, the 'metadata' parent dir is automatically added.
        if subdirs:
            paths = [os.path.join(p, "metadata", *subdirs) for p in paths]
        return paths