manager.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import re
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Union

from jsonschema import draft7_format_checker
from jsonschema import validate
from jsonschema import ValidationError
from traitlets import Type  # noqa H306
from traitlets.config import LoggingConfigurable  # noqa H306

from elyra.metadata.error import SchemaNotFoundError
from elyra.metadata.metadata import Metadata
from elyra.metadata.schema import SchemaManager
from elyra.metadata.storage import FileMetadataStore
from elyra.metadata.storage import MetadataStore


  32. class MetadataManager(LoggingConfigurable):
  33. """Manages the persistence and retrieval of metadata instances"""
  34. metadata_store_class = Type(
  35. default_value=FileMetadataStore,
  36. config=True,
  37. klass=MetadataStore,
  38. help="""The metadata store class. This is configurable to allow subclassing of
  39. the MetadataStore for customized behavior.""",
  40. )
  41. def __init__(self, schemaspace: str, **kwargs: Any):
  42. """
  43. Generic object to manage metadata instances.
  44. :param schemaspace (str): the partition where metadata instances are stored
  45. :param kwargs: additional arguments to be used to instantiate a metadata manager
  46. Keyword Args:
  47. metadata_store_class (str): the name of the MetadataStore subclass to use for storing managed instances
  48. """
  49. super().__init__(**kwargs)
  50. self.schema_mgr = SchemaManager.instance()
  51. schemaspace_instance = self.schema_mgr.get_schemaspace(schemaspace)
  52. self.schemaspace = schemaspace_instance.name
  53. self.metadata_store = self.metadata_store_class(self.schemaspace, **kwargs)
  54. def schemaspace_exists(self) -> bool:
  55. """Returns True if the schemaspace for this instance exists"""
  56. return self.metadata_store.schemaspace_exists()
  57. def get_all(self, include_invalid: bool = False, of_schema: str = None) -> List[Metadata]:
  58. """Returns all metadata instances in summary form (name, display_name, location)"""
  59. instances = []
  60. instance_list = self.metadata_store.fetch_instances(include_invalid=include_invalid)
  61. for metadata_dict in instance_list:
  62. # validate the instance prior to return, include invalid instances as appropriate
  63. try:
  64. metadata = Metadata.from_dict(self.schemaspace, metadata_dict)
  65. if of_schema and metadata.schema_name != of_schema: # If provided, filter on of_schema
  66. continue
  67. metadata.on_load() # Allow class instances to handle loads
  68. # if we're including invalid and there was an issue on retrieval, add it to the list
  69. if include_invalid and metadata.reason:
  70. # If no schema-name is present, set to '{unknown}' since we can't make that determination.
  71. if not metadata.schema_name:
  72. metadata.schema_name = "{unknown}"
  73. else: # go ahead and validate against the schema
  74. self.validate(metadata.name, metadata)
  75. instances.append(metadata)
  76. except Exception as ex: # Ignore ValidationError and others when fetching all instances
  77. # Since we may not have a metadata instance due to a failure during `from_dict()`,
  78. # instantiate a bad instance directly to use in the message and invalid result.
  79. invalid_instance = Metadata(**metadata_dict)
  80. self.log.warning(
  81. f"Fetch of instance '{invalid_instance.name}' "
  82. f"of schemaspace '{self.schemaspace}' "
  83. f"encountered an exception: {ex}"
  84. )
  85. if include_invalid and (not of_schema or invalid_instance.schema_name == of_schema):
  86. # Export invalid instances if requested and if a schema was not specified
  87. # or the specified schema matches the instance's schema.
  88. invalid_instance.reason = ex.__class__.__name__
  89. instances.append(invalid_instance)
  90. return instances
  91. def get(self, name: str) -> Metadata:
  92. """Returns the metadata instance corresponding to the given name"""
  93. if name is None:
  94. raise ValueError("The 'name' parameter requires a value.")
  95. instance_list = self.metadata_store.fetch_instances(name=name)
  96. metadata_dict = instance_list[0]
  97. metadata = Metadata.from_dict(self.schemaspace, metadata_dict)
  98. # Allow class instances to alter instance
  99. metadata.on_load()
  100. # Validate the instance on load
  101. self.validate(name, metadata)
  102. return metadata
  103. def create(self, name: str, metadata: Metadata) -> Metadata:
  104. """Creates the given metadata, returning the created instance"""
  105. return self._save(name, metadata)
  106. def update(self, name: str, metadata: Metadata, for_migration: bool = False) -> Metadata:
  107. """Updates the given metadata, returning the updated instance"""
  108. return self._save(name, metadata, for_update=True, for_migration=for_migration)
  109. def remove(self, name: str) -> None:
  110. """Removes the metadata instance corresponding to the given name"""
  111. instance_list = self.metadata_store.fetch_instances(name=name)
  112. metadata_dict = instance_list[0]
  113. self.log.debug(f"Removing metadata resource '{name}' from schemaspace '{self.schemaspace}'.")
  114. metadata = Metadata.from_dict(self.schemaspace, metadata_dict)
  115. metadata.pre_delete() # Allow class instances to handle delete
  116. self.metadata_store.delete_instance(metadata_dict)
  117. try:
  118. metadata.post_delete() # Allow class instances to handle post-delete tasks (e.g., cache updates, etc.)
  119. except Exception as ex:
  120. self._rollback(name, metadata, "delete", ex)
  121. raise ex
  122. def validate(self, name: str, metadata: Metadata) -> None:
  123. """Validate metadata against its schema.
  124. Ensure metadata is valid based on its schema. If invalid or schema
  125. is not found, ValidationError will be raised.
  126. """
  127. metadata_dict = metadata.to_dict()
  128. schema_name = metadata_dict.get("schema_name")
  129. if not schema_name:
  130. raise ValueError(
  131. f"Instance '{name}' in the {self.schemaspace} schemaspace is missing a 'schema_name' field!"
  132. )
  133. schema = self._get_schema(schema_name) # returns a value or throws
  134. try:
  135. validate(instance=metadata_dict, schema=schema, format_checker=draft7_format_checker)
  136. except ValidationError as ve:
  137. # Because validation errors are so verbose, only provide the first line.
  138. first_line = str(ve).partition("\n")[0]
  139. msg = f"Validation failed for instance '{name}' using the {schema_name} schema with error: {first_line}."
  140. self.log.error(msg)
  141. raise ValidationError(msg) from ve
  142. @staticmethod
  143. def get_normalized_name(name: str) -> str:
  144. # lowercase and replaces spaces with underscore
  145. name = re.sub("\\s+", "_", name.lower())
  146. # remove all invalid characters
  147. name = re.sub("[^a-z0-9-_]+", "", name)
  148. # begin with alpha
  149. if not name[0].isalpha():
  150. name = "a_" + name
  151. # end with alpha numeric
  152. if not name[-1].isalnum():
  153. name = name + "_0"
  154. return name
  155. def _get_schema(self, schema_name: str) -> dict:
  156. """Loads the schema based on the schema_name and returns the loaded schema json.
  157. Throws ValidationError if schema file is not present.
  158. """
  159. schema_json = self.schema_mgr.get_schema(self.schemaspace, schema_name)
  160. if schema_json is None:
  161. schema_file = os.path.join(os.path.dirname(__file__), "schemas", schema_name + ".json")
  162. if not os.path.exists(schema_file):
  163. self.log.error(
  164. f"The file for schema '{schema_name}' is missing from its expected location: '{schema_file}'"
  165. )
  166. raise SchemaNotFoundError(f"The file for schema '{schema_name}' is missing!")
  167. return schema_json
  168. def _save(self, name: str, metadata: Metadata, for_update: bool = False, for_migration: bool = False) -> Metadata:
  169. if not metadata:
  170. raise ValueError("An instance of class 'Metadata' was not provided.")
  171. if not isinstance(metadata, Metadata):
  172. raise TypeError("'metadata' is not an instance of class 'Metadata'.")
  173. if not name and not for_update: # name is derived from display_name only on creates
  174. if metadata.display_name:
  175. name = MetadataManager.get_normalized_name(metadata.display_name)
  176. metadata.name = name
  177. if not name: # At this point, name must be set
  178. raise ValueError("Name of metadata was not provided.")
  179. match = re.search("^[a-z]([a-z0-9-_]*[a-z,0-9])?$", name)
  180. if match is None:
  181. raise ValueError(
  182. "Name of metadata must be lowercase alphanumeric, beginning with alpha and can include "
  183. "embedded hyphens ('-') and underscores ('_')."
  184. )
  185. orig_value = None
  186. if for_update:
  187. if for_migration: # Avoid triggering a on_load() call since migrations will likely stem from there
  188. instance_list = self.metadata_store.fetch_instances(name=name)
  189. orig_value = instance_list[0]
  190. else:
  191. orig_value = self.get(name)
  192. # Allow class instances to handle pre-save tasks
  193. metadata.pre_save(for_update=for_update)
  194. self._apply_defaults(metadata)
  195. # Validate the metadata prior to storage then store the instance.
  196. self.validate(name, metadata)
  197. metadata_dict = self.metadata_store.store_instance(name, metadata.prepare_write(), for_update=for_update)
  198. metadata_post_op = Metadata.from_dict(self.schemaspace, metadata_dict)
  199. # Allow class instances to handle post-save tasks (e.g., cache updates, etc.)
  200. # Note that this is a _different_ instance from pre-save call
  201. try:
  202. metadata_post_op.post_save(for_update=for_update)
  203. except Exception as ex:
  204. if for_update:
  205. self._rollback(name, orig_value, "update", ex)
  206. else: # Use the metadata instance prior to post op
  207. self._rollback(name, Metadata.from_dict(self.schemaspace, metadata_dict), "create", ex)
  208. raise ex
  209. return self.get(name) # Retrieve updated/new instance so load hook can be called
  210. def _rollback(self, name: str, orig_value: Union[Metadata, Dict], operation: str, exception: Exception):
  211. """Rolls back the original value depending on the operation.
  212. For rolled back creation attempts, we must remove the created instance. For rolled back
  213. update or deletion attempts, we must restore the original value. Note that these operations
  214. must call the metadata store directly so that class hooks are not called.
  215. """
  216. self.log.debug(f"Rolling back metadata operation '{operation}' for instance '{name}' due to: {exception}")
  217. if operation == "create": # remove the instance, orig_value is the newly-created instance.
  218. if isinstance(orig_value, Metadata):
  219. orig_value = orig_value.to_dict()
  220. self.metadata_store.delete_instance(orig_value)
  221. elif operation == "update": # restore original as an update
  222. if isinstance(orig_value, dict):
  223. orig_value = Metadata.from_dict(self.schemaspace, orig_value)
  224. self.metadata_store.store_instance(name, orig_value.prepare_write(), for_update=True)
  225. elif operation == "delete": # restore original as a create
  226. if isinstance(orig_value, dict):
  227. orig_value = Metadata.from_dict(self.schemaspace, orig_value)
  228. self.metadata_store.store_instance(name, orig_value.prepare_write(), for_update=False)
  229. self.log.warning(
  230. f"Rolled back metadata operation '{operation}' for instance '{name}' due to "
  231. f"failure in post-processing method: {exception}"
  232. )
  233. def _apply_defaults(self, metadata: Metadata) -> None:
  234. """If a given property has a default value defined, and that property is not currently represented,
  235. assign it the default value. We will also treat constants similarly.
  236. For schema-level properties (i.e., not application-level), we will check if such a property
  237. has a corresponding attribute and, if so, set the property to that value.
  238. Note: we only consider constants updates for schema-level properties
  239. """
  240. # Get the schema and build a dict consisting of properties and their default/const values (for those
  241. # properties that have defaults/consts defined). Then walk the metadata instance looking for missing
  242. # properties and assign the corresponding value. Note that we do not consider existing properties with
  243. # values of None for default replacement since that may be intentional (although those values will
  244. # likely fail subsequent validation). We also don't consider defaults when applying values to the
  245. # schema-level properties since these settings are function of a defined attribute.
  246. schema = self.schema_mgr.get_schema(self.schemaspace, metadata.schema_name)
  247. def _update_instance(target_prop: str, schema_properties: Dict, instance: Union[Metadata, Dict]) -> None:
  248. property_defaults = {}
  249. for name, property in schema_properties.items():
  250. if target_prop in property:
  251. property_defaults[name] = property[target_prop]
  252. if property_defaults: # schema defines defaulted properties
  253. if isinstance(instance, Metadata): # schema properties, updated constants
  254. for name, default in property_defaults.items():
  255. if hasattr(instance, name):
  256. setattr(instance, name, default)
  257. else: # instance properties, update missing defaults
  258. instance_properties = instance
  259. for name, default in property_defaults.items():
  260. if name not in instance_properties:
  261. instance_properties[name] = default
  262. # Update default properties of instance properties
  263. _update_instance("default", schema["properties"]["metadata"]["properties"], metadata.metadata)
  264. # Update const properties of schema properties
  265. _update_instance("const", schema["properties"], metadata)