schema.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from abc import ABC
  17. from abc import abstractmethod
  18. import copy
  19. import io
  20. import json
  21. from logging import getLogger
  22. from logging import Logger
  23. import os
  24. import re
  25. from typing import Dict
  26. from typing import List
  27. from typing import Optional
  28. from typing import Set
  29. try: # typing.final is not available in python < 3.8 so create a dummy decorator in those cases
  30. from typing import final
  31. except ImportError:
  32. def final(meth):
  33. return meth
  34. from entrypoints import get_group_all
  35. from jsonschema import draft7_format_checker
  36. from jsonschema import validate
  37. from jsonschema import ValidationError
  38. from traitlets.config import LoggingConfigurable
  39. from traitlets.config import SingletonConfigurable
  40. from elyra.metadata.error import SchemaNotFoundError
# Identifier and name reserved for the schemaspace used by the metadata tests.
# SchemaManager skips entrypoints named METADATA_TEST_SCHEMASPACE unless metadata
# testing has been enabled through the METADATA_TESTING environment variable.
# NOTE(review): the id is presumably the UUID of that test schemaspace - it is not
# referenced elsewhere in this module, so confirm against the test Schemaspace class.
METADATA_TEST_SCHEMASPACE_ID = "8182fc28-899a-4521-8342-1a0e218c3a4d"
METADATA_TEST_SCHEMASPACE = "metadata-tests"  # exposed via METADATA_TESTING env
  43. class SchemaManager(SingletonConfigurable):
  44. """Singleton used to store all schemas for all metadata types.
  45. Note: we currently don't refresh these entries.
  46. """
  47. # Maps SchemaspaceID AND SchemaspaceName to Schemaspace instance (two entries from Schemaspace instance)
  48. schemaspaces: Dict[str, "Schemaspace"]
  49. # Maps SchemaspaceID to ShemaspaceName
  50. schemaspace_id_to_name: Dict[str, str]
  51. # Maps SchemaspaceID to mapping of schema name to SchemasProvider
  52. schemaspace_schemasproviders: Dict[str, Dict[str, "SchemasProvider"]]
  53. def __init__(self, **kwargs):
  54. super().__init__(**kwargs)
  55. self.schemaspaces = {}
  56. self.schemaspace_id_to_name = {}
  57. self.schemaspace_schemasproviders = {}
  58. # The following exposes the metadata-test schemaspace if true or 1.
  59. # Metadata testing will enable this env. Note: this cannot be globally
  60. # defined, else the file could be loaded before the tests have enable the env.
  61. self.metadata_testing_enabled = bool(os.getenv("METADATA_TESTING", 0))
  62. self._meta_schema: dict
  63. schema_file = os.path.join(os.path.dirname(__file__), "schemas", "meta-schema.json")
  64. with io.open(schema_file, "r", encoding="utf-8") as f:
  65. self._meta_schema = json.load(f)
  66. self._load_schemaspace_schemas()
  67. def get_schemaspace_names(self, include_deprecated: bool = False) -> List[str]:
  68. """Returns list of registered schemaspace names."""
  69. schemaspace_names: List[str] = []
  70. # Filter out deprecated schemaspaces
  71. for id, name in self.schemaspace_id_to_name.items():
  72. if not include_deprecated and self.schemaspaces.get(id).is_deprecated:
  73. self.log.debug(f"get_schemaspace_names: skipping deprecated schemaspace '{name}'...")
  74. continue
  75. schemaspace_names.append(name)
  76. return schemaspace_names
  77. def get_schemaspace_name(self, schemaspace_name_or_id: str) -> str:
  78. """Returns the human-readable name of the given schemaspace name or id.
  79. Note that the value returned is the case-sensitive form as stored on
  80. the Schemaspace instance itself.
  81. """
  82. self._validate_schemaspace(schemaspace_name_or_id)
  83. return self.schemaspaces.get(schemaspace_name_or_id.lower()).name
  84. def get_schemaspace_schemas(self, schemaspace_name_or_id: str) -> dict:
  85. self._validate_schemaspace(schemaspace_name_or_id)
  86. schemaspace = self.schemaspaces.get(schemaspace_name_or_id.lower())
  87. schemas = copy.deepcopy(schemaspace.schemas)
  88. # Filter out deprecated schemas
  89. for schema_name in schemaspace.deprecated_schema_names:
  90. del schemas[schema_name]
  91. return schemas
  92. def get_schema(self, schemaspace_name_or_id: str, schema_name: str) -> dict:
  93. """Returns the specified schema for the specified schemaspace."""
  94. self._validate_schemaspace(schemaspace_name_or_id)
  95. schemaspace = self.schemaspaces.get(schemaspace_name_or_id.lower())
  96. schemas = schemaspace.schemas
  97. if schema_name not in schemas.keys():
  98. raise SchemaNotFoundError(schemaspace_name_or_id, schema_name)
  99. schema_json = schemas.get(schema_name)
  100. return copy.deepcopy(schema_json)
  101. def get_schemaspace(self, schemaspace_name_or_id: str) -> "Schemaspace":
  102. """Returns the Schemaspace instance associated with the given name or id."""
  103. self._validate_schemaspace(schemaspace_name_or_id)
  104. schemaspace = self.schemaspaces.get(schemaspace_name_or_id.lower())
  105. return copy.deepcopy(schemaspace)
  106. def get_schemasproviders(self, schemaspace_id: str) -> Dict[str, "SchemasProvider"]:
  107. """Returns a dictionary of schema name to SchemasProvider instance within a given schemaspace."""
  108. return self.schemaspace_schemasproviders.get(schemaspace_id, {})
  109. def clear_all(self) -> None:
  110. """Primarily used for testing, this method reloads schemas from initial values."""
  111. self.log.debug("SchemaManager: Reloading all schemas for all schemaspaces.")
  112. self._load_schemaspace_schemas()
  113. def _validate_schemaspace(self, schemaspace_name_or_id: str) -> None:
  114. """Ensures the schemaspace is valid and raises ValueError if it is not."""
  115. if schemaspace_name_or_id.lower() not in self.schemaspaces:
  116. raise ValueError(
  117. f"The schemaspace name or id '{schemaspace_name_or_id}' is not "
  118. f"in the list of valid schemaspaces: '{self.get_schemaspace_names()}'!"
  119. )
  120. def _load_schemaspace_schemas(self):
  121. """Gets Schemaspaces and SchemasProviders via entrypoints and validates/loads their schemas."""
  122. self._load_schemaspaces()
  123. self._load_schemas_providers()
  124. # Issue a warning for any "empty" schemaspaces...
  125. empty_schemaspaces = []
  126. for schemaspace_name in self.schemaspace_id_to_name.values():
  127. if len(self.schemaspaces[schemaspace_name].schemas) == 0:
  128. empty_schemaspaces.append(self.schemaspaces[schemaspace_name].name) # Preserve case
  129. if len(empty_schemaspaces) > 0:
  130. self.log.warning(f"The following schemaspaces have no schemas: {empty_schemaspaces}")
  131. def _load_schemaspaces(self):
  132. """Loads the Schemaspace instances from entrypoint group 'metadata.schemaspaces'."""
  133. for schemaspace in SchemaManager._get_schemaspaces():
  134. # Record the Schemaspace instance and create the name-to-id map
  135. try:
  136. # If we're not testing, skip our test schemaspace
  137. if not self.metadata_testing_enabled and schemaspace.name == METADATA_TEST_SCHEMASPACE:
  138. continue
  139. # instantiate an actual instance of the Schemaspace
  140. self.log.debug(f"Loading schemaspace '{schemaspace.name}'...")
  141. schemaspace_instance = schemaspace.load()(parent=self.parent) # Load an instance
  142. if not isinstance(schemaspace_instance, Schemaspace):
  143. raise ValueError(
  144. f"Schemaspace instance '{schemaspace.name}' is not an " f"instance of '{Schemaspace.__name__}'!"
  145. )
  146. # validate the name
  147. # To prevent a name-to-id lookup, just store the same instance in two locations
  148. self.schemaspaces[schemaspace_instance.id.lower()] = schemaspace_instance
  149. self.schemaspaces[schemaspace_instance.name.lower()] = schemaspace_instance
  150. # We'll keep a map of id-to-name, but this will be primarily used to
  151. # return the set of schemaspace names (via values()) and lookup a name
  152. # from its id.
  153. self.schemaspace_id_to_name[schemaspace_instance.id.lower()] = schemaspace_instance.name.lower()
  154. except Exception as err:
  155. # log and ignore initialization errors
  156. self.log.error(f"Error loading schemaspace '{schemaspace.name}' - {err}")
  157. def _load_schemas_providers(self):
  158. """Loads the SchemasProviders instances from entrypoint group 'metadata.schemas'."""
  159. for schemas_provider_ep in SchemaManager._get_schemas_providers():
  160. try:
  161. # If we're not testing, skip our test schemas
  162. if not self.metadata_testing_enabled and schemas_provider_ep.name == METADATA_TEST_SCHEMASPACE:
  163. continue
  164. # instantiate an actual instance of the processor
  165. self.log.debug(f"Loading SchemasProvider '{schemas_provider_ep.name}'...")
  166. schemas_provider = schemas_provider_ep.load()() # Load an instance
  167. if not isinstance(schemas_provider, SchemasProvider):
  168. raise ValueError(
  169. f"SchemasProvider instance '{schemas_provider_ep.name}' is not an "
  170. f"instance of '{SchemasProvider.__name__}'!"
  171. )
  172. schemas = schemas_provider.get_schemas()
  173. for schema in schemas:
  174. try:
  175. schemaspace_id = schema.get("schemaspace_id").lower()
  176. schemaspace_name = schema.get("schemaspace")
  177. schema_name = schema.get("name")
  178. # Ensure that both schemaspace id and name are registered and both point to same instance
  179. if schemaspace_id not in self.schemaspaces:
  180. raise ValueError(
  181. f"Schema '{schema_name}' references a schemaspace "
  182. f"'{schemaspace_id}' that is not loaded!"
  183. )
  184. if schemaspace_name not in self.schemaspaces:
  185. raise ValueError(
  186. f"Schema '{schema_name}' references a schemaspace "
  187. f"'{schemaspace_name}' that is not loaded!"
  188. )
  189. if self.schemaspaces[schemaspace_id] != self.schemaspaces[schemaspace_name.lower()]:
  190. raise ValueError(
  191. f"Schema '{schema_name}' references a schemaspace name "
  192. f"'{schemaspace_name}' and a schemaspace id '{schemaspace_id}' "
  193. f"that are associated with different Schemaspace instances!"
  194. )
  195. schema = self.schemaspaces[schemaspace_id].filter_schema(schema)
  196. self._validate_schema(schemaspace_name, schema_name, schema)
  197. # Only add the schema once since schemaspace_name is pointing to the same Schemaspace instance.
  198. self.schemaspaces[schemaspace_id].add_schema(schema)
  199. providers = self.schemaspace_schemasproviders.get(schemaspace_id, {})
  200. providers[schema_name] = schemas_provider # Capture the schemasprovider for this schema
  201. self.schemaspace_schemasproviders[schemaspace_id] = providers
  202. except Exception as schema_err:
  203. self.log.error(
  204. f"Error loading schema '{schema.get('name', '??')}' for SchemasProvider "
  205. f"'{schemas_provider_ep.name}' - {schema_err}"
  206. )
  207. except Exception as provider_err:
  208. # log and ignore initialization errors
  209. self.log.error(
  210. f"Error loading schemas for SchemasProvider " f"'{schemas_provider_ep.name}' - {provider_err}"
  211. )
  212. def _validate_schema(self, schemaspace_name: str, schema_name: str, schema: dict):
  213. """Validates the given schema against the meta-schema."""
  214. try:
  215. self.log.debug(f"Validating schema '{schema_name}' of schemaspace {schemaspace_name}...")
  216. validate(instance=schema, schema=self._meta_schema, format_checker=draft7_format_checker)
  217. except ValidationError as ve:
  218. # Because validation errors are so verbose, only provide the first line.
  219. first_line = str(ve).partition("\n")[0]
  220. msg = (
  221. f"Validation failed for schema '{schema_name}' of "
  222. f"schemaspace '{schemaspace_name}' with error: {first_line}."
  223. )
  224. self.log.error(msg)
  225. raise ValidationError(msg) from ve
  226. @staticmethod
  227. def _get_schemaspaces():
  228. """Wrapper around entrypoints.get_group_all() - primarily to facilitate testing."""
  229. return get_group_all("metadata.schemaspaces")
  230. @staticmethod
  231. def _get_schemas_providers():
  232. """Wrapper around entrypoints.get_group_all() - primarily to facilitate testing."""
  233. return get_group_all("metadata.schemas_providers")
  234. class Schemaspace(LoggingConfigurable):
  235. _id: str
  236. _name: str
  237. _display_name: str
  238. _description: str
  239. _schemas: Dict[str, Dict] # use a dict to prevent duplicate entries
  240. _deprecated: bool
  241. _deprecated_schema_names: Set[str]
  242. def __init__(
  243. self,
  244. schemaspace_id: str,
  245. name: str,
  246. display_name: Optional[str] = None,
  247. description: Optional[str] = "",
  248. **kwargs,
  249. ):
  250. super().__init__(**kwargs)
  251. self._schemas = {}
  252. self._deprecated = False
  253. self._deprecated_schema_names = set()
  254. # Validate properties
  255. if not schemaspace_id:
  256. raise ValueError("Property 'id' requires a value!")
  257. if not Schemaspace._validate_id(schemaspace_id):
  258. raise ValueError(f"The value of property 'id' ({schemaspace_id}) does not conform to a UUID!")
  259. if not name:
  260. raise ValueError("Property 'name' requires a value!")
  261. if not Schemaspace._validate_name(name):
  262. raise ValueError(f"The 'name' property ({name}) must be alphanumeric with dash or underscore only!")
  263. self._id = schemaspace_id
  264. self._name = name
  265. self._display_name = display_name or name
  266. self._description = description
  267. @property
  268. def id(self) -> str:
  269. """The id (uuid) of the schemaspace"""
  270. return self._id
  271. @property
  272. def name(self) -> str:
  273. """The name of the schemaspace"""
  274. return self._name
  275. @property
  276. def display_name(self) -> str:
  277. """The display_name of the schemaspace"""
  278. return self._display_name
  279. @property
  280. def description(self) -> str:
  281. """The description of the schemaspace"""
  282. return self._description
  283. @property
  284. def schemas(self) -> Dict[str, Dict]:
  285. """Returns the schemas currently associated with this schemaspace"""
  286. return self._schemas
  287. @property
  288. def is_deprecated(self) -> bool:
  289. """Indicates if this schemaspace is deprecaated"""
  290. return self._deprecated
  291. @property
  292. def deprecated_schema_names(self) -> List[str]:
  293. """Returns a list of deprecated schema names associated with this schemaspace"""
  294. return list(self._deprecated_schema_names)
  295. def filter_schema(self, schema: Dict) -> Dict:
  296. """Allows Schemaspace to apply changes to a given schema prior to its validation (and add)."""
  297. return schema
  298. def add_schema(self, schema: Dict) -> None:
  299. """Associates the given schema to this schemaspace"""
  300. assert isinstance(schema, dict), "Parameter 'schema' is not a dictionary!"
  301. self._schemas[schema.get("name")] = schema
  302. if schema.get("deprecated", False):
  303. self._deprecated_schema_names.add(schema.get("name"))
  304. @final
  305. def migrate(self, *args, **kwargs) -> List[str]:
  306. """Migrate schemaspace instances. This method is `final` and should not be overridden.
  307. Its purpose is to drive migration across the Schemaspace's SchemasProviders and gather
  308. results.
  309. Returns the list of migrated instance names.
  310. """
  311. migrated_instances: List[str] = []
  312. # For each schema in this schemaspace, invoke its corresponding
  313. # SchemasProvider's migrate method and collect the results.
  314. schema_to_provider = SchemaManager.instance().get_schemasproviders(self.id)
  315. for schema_name, provider in schema_to_provider.items():
  316. instances = provider.migrate(schemaspace_name=self.name, schema_name=schema_name)
  317. migrated_instances.extend(instances)
  318. return migrated_instances
  319. @staticmethod
  320. def _validate_id(id) -> bool:
  321. """Validate that id is uuidv4 compliant"""
  322. is_valid = False
  323. uuidv4_regex = re.compile("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", re.I)
  324. if uuidv4_regex.match(id):
  325. is_valid = True
  326. return is_valid
  327. @staticmethod
  328. def _validate_name(name) -> bool:
  329. """Validate that the name adheres to the criteria (alphanumeric, dash, underscore only)"""
  330. is_valid = False
  331. name_regex = re.compile("^[A-Za-z][0-9A-Za-z_-]*[0-9A-Za-z]$", re.I)
  332. if name_regex.match(name):
  333. is_valid = True
  334. return is_valid
  335. class SchemasProvider(ABC):
  336. """Abstract base class used to obtain schema definitions from registered schema providers."""
  337. log: Logger
  338. def __init__(self, *args, **kwargs):
  339. self.log = getLogger("ElyraApp")
  340. @abstractmethod
  341. def get_schemas(self) -> List[Dict]:
  342. """Returns a list of schemas"""
  343. pass
  344. def migrate(self, *args, **kwargs) -> List[str]:
  345. """Migrate instances of schemas provided by this SchemasProvider.
  346. kwargs:
  347. schema_name: str The name of the schema
  348. schemaspace_name: str The name of the schemaspace in which this schema resides
  349. Called by Schemaspace.migrate(), this method will be called with a `schema_name`
  350. keyword argument indicating a name of schema provided by this SchemasProvider.
  351. The method will return a list of migrated instances or an empty array.
  352. """
  353. return list()