123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- from abc import ABC
- from abc import abstractmethod
- import copy
- import io
- import json
- from logging import getLogger
- from logging import Logger
- import os
- import re
- from typing import Dict
- from typing import List
- from typing import Optional
- from typing import Set
# `typing.final` only exists on Python 3.8 and later; on older interpreters
# substitute a do-nothing decorator so `@final` can still be applied.
try:
    from typing import final
except ImportError:

    def final(meth):
        """Fallback for `typing.final`: hand the decorated object back untouched."""
        return meth
- from entrypoints import get_group_all
- from jsonschema import draft7_format_checker
- from jsonschema import validate
- from jsonschema import ValidationError
- from traitlets.config import LoggingConfigurable
- from traitlets.config import SingletonConfigurable
- from elyra.metadata.error import SchemaNotFoundError
# Id and name of the schemaspace reserved for metadata tests.  SchemaManager
# only loads it when the METADATA_TESTING environment variable enables it.
METADATA_TEST_SCHEMASPACE_ID = "8182fc28-899a-4521-8342-1a0e218c3a4d"
METADATA_TEST_SCHEMASPACE = "metadata-tests"
class SchemaManager(SingletonConfigurable):
    """Singleton used to store all schemas for all metadata types.

    Schemaspace classes and SchemasProvider classes are discovered through
    entrypoints when the singleton is constructed; each provided schema is
    validated against the meta-schema before being added to its schemaspace.

    Note: we currently don't refresh these entries.
    """

    # Maps SchemaspaceID AND SchemaspaceName (both lower-cased) to the Schemaspace
    # instance, i.e. two entries per Schemaspace instance.
    schemaspaces: Dict[str, "Schemaspace"]

    # Maps SchemaspaceID to SchemaspaceName (both lower-cased)
    schemaspace_id_to_name: Dict[str, str]

    # Maps SchemaspaceID to mapping of schema name to SchemasProvider
    schemaspace_schemasproviders: Dict[str, Dict[str, "SchemasProvider"]]

    def __init__(self, **kwargs):
        """Initializes the lookup tables, loads the meta-schema and loads all schemaspaces and schemas."""
        super().__init__(**kwargs)
        self.schemaspaces = {}
        self.schemaspace_id_to_name = {}
        self.schemaspace_schemasproviders = {}

        # The following exposes the metadata-test schemaspace if true or 1.
        # Metadata testing will enable this env.  Note: this cannot be globally
        # defined, else the file could be loaded before the tests have enabled the env.
        # The value is parsed explicitly because bool() of any non-empty string,
        # including "0" and "false", is True.
        testing_flag = str(os.getenv("METADATA_TESTING", "")).strip().lower()
        self.metadata_testing_enabled = testing_flag not in ("", "0", "false", "no", "off")

        self._meta_schema: dict
        schema_file = os.path.join(os.path.dirname(__file__), "schemas", "meta-schema.json")
        with io.open(schema_file, "r", encoding="utf-8") as f:
            self._meta_schema = json.load(f)
        self._load_schemaspace_schemas()

    def get_schemaspace_names(self, include_deprecated: bool = False) -> List[str]:
        """Returns list of registered schemaspace names.

        Deprecated schemaspaces are omitted unless `include_deprecated` is True.
        """
        schemaspace_names: List[str] = []
        for schemaspace_id, name in self.schemaspace_id_to_name.items():
            # Filter out deprecated schemaspaces
            if not include_deprecated and self.schemaspaces[schemaspace_id].is_deprecated:
                self.log.debug(f"get_schemaspace_names: skipping deprecated schemaspace '{name}'...")
                continue
            schemaspace_names.append(name)
        return schemaspace_names

    def get_schemaspace_name(self, schemaspace_name_or_id: str) -> str:
        """Returns the human-readable name of the given schemaspace name or id.

        Note that the value returned is the case-sensitive form as stored on
        the Schemaspace instance itself.

        Raises ValueError if the schemaspace is not registered.
        """
        self._validate_schemaspace(schemaspace_name_or_id)
        return self.schemaspaces[schemaspace_name_or_id.lower()].name

    def get_schemaspace_schemas(self, schemaspace_name_or_id: str) -> dict:
        """Returns a deep copy of the non-deprecated schemas of the given schemaspace, keyed by schema name.

        Raises ValueError if the schemaspace is not registered.
        """
        self._validate_schemaspace(schemaspace_name_or_id)
        schemaspace = self.schemaspaces[schemaspace_name_or_id.lower()]
        schemas = copy.deepcopy(schemaspace.schemas)
        # Filter out deprecated schemas
        for schema_name in schemaspace.deprecated_schema_names:
            del schemas[schema_name]
        return schemas

    def get_schema(self, schemaspace_name_or_id: str, schema_name: str) -> dict:
        """Returns a deep copy of the specified schema for the specified schemaspace.

        Raises ValueError if the schemaspace is not registered and
        SchemaNotFoundError if the schemaspace has no schema of that name.
        """
        self._validate_schemaspace(schemaspace_name_or_id)
        schemas = self.schemaspaces[schemaspace_name_or_id.lower()].schemas
        if schema_name not in schemas:
            raise SchemaNotFoundError(schemaspace_name_or_id, schema_name)
        return copy.deepcopy(schemas[schema_name])

    def get_schemaspace(self, schemaspace_name_or_id: str) -> "Schemaspace":
        """Returns a deep copy of the Schemaspace instance associated with the given name or id.

        Raises ValueError if the schemaspace is not registered.
        """
        self._validate_schemaspace(schemaspace_name_or_id)
        schemaspace = self.schemaspaces[schemaspace_name_or_id.lower()]
        return copy.deepcopy(schemaspace)

    def get_schemasproviders(self, schemaspace_id: str) -> Dict[str, "SchemasProvider"]:
        """Returns a dictionary of schema name to SchemasProvider instance within a given schemaspace.

        An empty dictionary is returned when nothing is recorded for `schemaspace_id`.
        """
        return self.schemaspace_schemasproviders.get(schemaspace_id, {})

    def clear_all(self) -> None:
        """Primarily used for testing, this method reloads schemas from initial values."""
        self.log.debug("SchemaManager: Reloading all schemas for all schemaspaces.")
        self._load_schemaspace_schemas()

    def _validate_schemaspace(self, schemaspace_name_or_id: str) -> None:
        """Ensures the schemaspace is valid and raises ValueError if it is not."""
        if schemaspace_name_or_id.lower() not in self.schemaspaces:
            raise ValueError(
                f"The schemaspace name or id '{schemaspace_name_or_id}' is not "
                f"in the list of valid schemaspaces: '{self.get_schemaspace_names()}'!"
            )

    def _load_schemaspace_schemas(self):
        """Gets Schemaspaces and SchemasProviders via entrypoints and validates/loads their schemas."""
        self._load_schemaspaces()
        self._load_schemas_providers()

        # Issue a warning for any "empty" schemaspaces...
        empty_schemaspaces = [
            self.schemaspaces[schemaspace_name].name  # Preserve case
            for schemaspace_name in self.schemaspace_id_to_name.values()
            if len(self.schemaspaces[schemaspace_name].schemas) == 0
        ]
        if empty_schemaspaces:
            self.log.warning(f"The following schemaspaces have no schemas: {empty_schemaspaces}")

    def _load_schemaspaces(self):
        """Loads the Schemaspace instances from entrypoint group 'metadata.schemaspaces'.

        Errors raised while loading an individual schemaspace are logged and
        that schemaspace is skipped.
        """
        for schemaspace in SchemaManager._get_schemaspaces():
            # Record the Schemaspace instance and create the name-to-id map
            try:
                # If we're not testing, skip our test schemaspace
                if not self.metadata_testing_enabled and schemaspace.name == METADATA_TEST_SCHEMASPACE:
                    continue

                # instantiate an actual instance of the Schemaspace
                self.log.debug(f"Loading schemaspace '{schemaspace.name}'...")
                schemaspace_instance = schemaspace.load()(parent=self.parent)  # Load an instance
                if not isinstance(schemaspace_instance, Schemaspace):
                    raise ValueError(
                        f"Schemaspace instance '{schemaspace.name}' is not an instance of '{Schemaspace.__name__}'!"
                    )

                # To prevent a name-to-id lookup, just store the same instance in two locations
                self.schemaspaces[schemaspace_instance.id.lower()] = schemaspace_instance
                self.schemaspaces[schemaspace_instance.name.lower()] = schemaspace_instance

                # We'll keep a map of id-to-name, but this will be primarily used to
                # return the set of schemaspace names (via values()) and lookup a name
                # from its id.
                self.schemaspace_id_to_name[schemaspace_instance.id.lower()] = schemaspace_instance.name.lower()
            except Exception as err:
                # log and ignore initialization errors
                self.log.error(f"Error loading schemaspace '{schemaspace.name}' - {err}")

    def _load_schemas_providers(self):
        """Loads the SchemasProviders instances from entrypoint group 'metadata.schemas_providers'.

        Each schema returned by a provider is filtered by its schemaspace,
        validated against the meta-schema and added to the schemaspace.  Errors
        for an individual schema or provider are logged and that item is skipped.
        """
        for schemas_provider_ep in SchemaManager._get_schemas_providers():
            try:
                # If we're not testing, skip our test schemas
                if not self.metadata_testing_enabled and schemas_provider_ep.name == METADATA_TEST_SCHEMASPACE:
                    continue

                # instantiate an actual instance of the provider
                self.log.debug(f"Loading SchemasProvider '{schemas_provider_ep.name}'...")
                schemas_provider = schemas_provider_ep.load()()  # Load an instance
                if not isinstance(schemas_provider, SchemasProvider):
                    raise ValueError(
                        f"SchemasProvider instance '{schemas_provider_ep.name}' is not an "
                        f"instance of '{SchemasProvider.__name__}'!"
                    )

                schemas = schemas_provider.get_schemas()
                for schema in schemas:
                    try:
                        schemaspace_id = schema.get("schemaspace_id").lower()
                        schemaspace_name = schema.get("schemaspace")
                        schema_name = schema.get("name")
                        # self.schemaspaces is keyed by lower-cased ids and names, so the
                        # name must be lower-cased before any lookup.  A missing name maps
                        # to "" which is never a registered key.
                        schemaspace_key = (schemaspace_name or "").lower()

                        # Ensure that both schemaspace id and name are registered and both point to same instance
                        if schemaspace_id not in self.schemaspaces:
                            raise ValueError(
                                f"Schema '{schema_name}' references a schemaspace "
                                f"'{schemaspace_id}' that is not loaded!"
                            )
                        if schemaspace_key not in self.schemaspaces:
                            raise ValueError(
                                f"Schema '{schema_name}' references a schemaspace "
                                f"'{schemaspace_name}' that is not loaded!"
                            )
                        if self.schemaspaces[schemaspace_id] != self.schemaspaces[schemaspace_key]:
                            raise ValueError(
                                f"Schema '{schema_name}' references a schemaspace name "
                                f"'{schemaspace_name}' and a schemaspace id '{schemaspace_id}' "
                                f"that are associated with different Schemaspace instances!"
                            )

                        schema = self.schemaspaces[schemaspace_id].filter_schema(schema)
                        self._validate_schema(schemaspace_name, schema_name, schema)

                        # Only add the schema once since schemaspace_name is pointing to the same Schemaspace instance.
                        self.schemaspaces[schemaspace_id].add_schema(schema)

                        providers = self.schemaspace_schemasproviders.get(schemaspace_id, {})
                        providers[schema_name] = schemas_provider  # Capture the schemasprovider for this schema
                        self.schemaspace_schemasproviders[schemaspace_id] = providers
                    except Exception as schema_err:
                        self.log.error(
                            f"Error loading schema '{schema.get('name', '??')}' for SchemasProvider "
                            f"'{schemas_provider_ep.name}' - {schema_err}"
                        )
            except Exception as provider_err:
                # log and ignore initialization errors
                self.log.error(
                    f"Error loading schemas for SchemasProvider '{schemas_provider_ep.name}' - {provider_err}"
                )

    def _validate_schema(self, schemaspace_name: str, schema_name: str, schema: dict):
        """Validates the given schema against the meta-schema.

        Raises ValidationError, carrying a condensed one-line message, if validation fails.
        """
        try:
            self.log.debug(f"Validating schema '{schema_name}' of schemaspace {schemaspace_name}...")
            validate(instance=schema, schema=self._meta_schema, format_checker=draft7_format_checker)
        except ValidationError as ve:
            # Because validation errors are so verbose, only provide the first line.
            first_line = str(ve).partition("\n")[0]
            msg = (
                f"Validation failed for schema '{schema_name}' of "
                f"schemaspace '{schemaspace_name}' with error: {first_line}."
            )
            self.log.error(msg)
            raise ValidationError(msg) from ve

    @staticmethod
    def _get_schemaspaces():
        """Wrapper around entrypoints.get_group_all() - primarily to facilitate testing."""
        return get_group_all("metadata.schemaspaces")

    @staticmethod
    def _get_schemas_providers():
        """Wrapper around entrypoints.get_group_all() - primarily to facilitate testing."""
        return get_group_all("metadata.schemas_providers")
class Schemaspace(LoggingConfigurable):
    """A named, uniquely identified collection of schemas.

    Instances are created by SchemaManager from the 'metadata.schemaspaces'
    entrypoint group and populated through `add_schema`.
    """

    _id: str
    _name: str
    _display_name: str
    _description: str
    _schemas: Dict[str, Dict]  # use a dict to prevent duplicate entries
    _deprecated: bool
    _deprecated_schema_names: Set[str]

    def __init__(
        self,
        schemaspace_id: str,
        name: str,
        display_name: Optional[str] = None,
        description: Optional[str] = "",
        **kwargs,
    ):
        """Creates an empty, non-deprecated schemaspace.

        :param schemaspace_id: the UUID-formatted id of the schemaspace
        :param name: the name; must start with a letter, end with a letter or digit and
            otherwise contain only letters, digits, dashes and underscores
        :param display_name: the display name; defaults to `name` when not provided
        :param description: the description of the schemaspace
        :raises ValueError: if `schemaspace_id` or `name` is missing or malformed
        """
        super().__init__(**kwargs)

        self._schemas = {}
        self._deprecated = False
        self._deprecated_schema_names = set()

        # Validate properties
        if not schemaspace_id:
            raise ValueError("Property 'id' requires a value!")
        if not Schemaspace._validate_id(schemaspace_id):
            raise ValueError(f"The value of property 'id' ({schemaspace_id}) does not conform to a UUID!")
        if not name:
            raise ValueError("Property 'name' requires a value!")
        if not Schemaspace._validate_name(name):
            raise ValueError(f"The 'name' property ({name}) must be alphanumeric with dash or underscore only!")

        self._id = schemaspace_id
        self._name = name
        self._display_name = display_name or name
        self._description = description

    @property
    def id(self) -> str:
        """The id (uuid) of the schemaspace"""
        return self._id

    @property
    def name(self) -> str:
        """The name of the schemaspace"""
        return self._name

    @property
    def display_name(self) -> str:
        """The display_name of the schemaspace"""
        return self._display_name

    @property
    def description(self) -> str:
        """The description of the schemaspace"""
        return self._description

    @property
    def schemas(self) -> Dict[str, Dict]:
        """Returns the schemas currently associated with this schemaspace, keyed by schema name"""
        return self._schemas

    @property
    def is_deprecated(self) -> bool:
        """Indicates if this schemaspace is deprecated"""
        return self._deprecated

    @property
    def deprecated_schema_names(self) -> List[str]:
        """Returns a list of deprecated schema names associated with this schemaspace"""
        return list(self._deprecated_schema_names)

    def filter_schema(self, schema: Dict) -> Dict:
        """Allows Schemaspace to apply changes to a given schema prior to its validation (and add).

        This implementation returns the schema unchanged.
        """
        return schema

    def add_schema(self, schema: Dict) -> None:
        """Associates the given schema to this schemaspace.

        The schema is stored under its "name" entry, replacing any schema already
        stored under that name, and is recorded as deprecated when its
        "deprecated" entry is truthy.
        """
        assert isinstance(schema, dict), "Parameter 'schema' is not a dictionary!"
        schema_name = schema.get("name")
        self._schemas[schema_name] = schema
        if schema.get("deprecated", False):
            self._deprecated_schema_names.add(schema_name)

    @final
    def migrate(self, *args, **kwargs) -> List[str]:
        """Migrate schemaspace instances.  This method is `final` and should not be overridden.

        Its purpose is to drive migration across the Schemaspace's SchemasProviders and gather
        results.

        Returns the list of migrated instance names.
        """
        migrated_instances: List[str] = []
        # For each schema in this schemaspace, invoke its corresponding
        # SchemasProvider's migrate method and collect the results.
        schema_to_provider = SchemaManager.instance().get_schemasproviders(self.id)
        for schema_name, provider in schema_to_provider.items():
            instances = provider.migrate(schemaspace_name=self.name, schema_name=schema_name)
            migrated_instances.extend(instances)
        return migrated_instances

    @staticmethod
    def _validate_id(id) -> bool:
        """Validate that id has the textual form of a UUID (8-4-4-4-12 hex digits, any case).

        Note that the version and variant fields of the UUID are not checked.
        """
        uuid_regex = re.compile("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", re.I)
        # fullmatch rather than match: with match, '$' also accepts a trailing newline.
        return uuid_regex.fullmatch(id) is not None

    @staticmethod
    def _validate_name(name) -> bool:
        """Validate that the name adheres to the criteria (alphanumeric, dash, underscore only).

        The name must start with a letter and end with a letter or digit, so it
        is at least two characters long.
        """
        name_regex = re.compile("^[A-Za-z][0-9A-Za-z_-]*[0-9A-Za-z]$", re.I)
        # fullmatch rather than match: with match, '$' also accepts a trailing newline.
        return name_regex.fullmatch(name) is not None
class SchemasProvider(ABC):
    """Abstract base class used to obtain schema definitions from registered schema providers.

    Subclasses must implement `get_schemas`; overriding `migrate` is optional.
    """

    log: Logger

    def __init__(self, *args, **kwargs):
        """Attach the "ElyraApp" logger; positional and keyword arguments are accepted but unused."""
        self.log = getLogger("ElyraApp")

    @abstractmethod
    def get_schemas(self) -> List[Dict]:
        """Returns a list of schemas"""

    def migrate(self, *args, **kwargs) -> List[str]:
        """Migrate instances of schemas provided by this SchemasProvider.

        kwargs:
            schema_name: str          The name of the schema
            schemaspace_name: str     The name of the schemaspace in which this schema resides

        Called by Schemaspace.migrate(), this method will be called with a `schema_name`
        keyword argument indicating a name of schema provided by this SchemasProvider.

        The method will return a list of migrated instances or an empty array.
        This base implementation migrates nothing and returns an empty list.
        """
        return []
|