123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655 |
- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- from abc import abstractmethod
- from copy import deepcopy
- import hashlib
- from http import HTTPStatus
- import os
- from pathlib import Path
- from queue import Empty
- from queue import Queue
- from threading import Thread
- from typing import Any
- from typing import Dict
- from typing import List
- from typing import Optional
- from deprecation import deprecated
- from jupyter_core.paths import ENV_JUPYTER_PATH
- import requests
- from traitlets.config import LoggingConfigurable
- from traitlets.traitlets import default
- from traitlets.traitlets import Integer
- from elyra._version import __version__
- from elyra.metadata.metadata import Metadata
- from elyra.pipeline.component import Component
- from elyra.pipeline.component import ComponentParameter
- from elyra.pipeline.runtime_type import RuntimeProcessorType
class EntryData(object):
    """
    Container for what was retrieved from one catalog entry: the component
    definition in string form and, optionally, a file extension for it.
    """

    definition: str = None
    file_extension: str = None

    def __init__(self, definition: str, file_extension: Optional[str] = None, **kwargs):
        """
        :param definition: the component definition; raw bytes are decoded as UTF-8
        :param file_extension: optional extension associated with the definition
        :param kwargs: accepted for subclass use; not read by this class
        """
        # Normalize binary payloads to text so 'definition' is always a string (or None)
        is_binary = isinstance(definition, (bytes, bytearray))
        self.definition = definition.decode("utf-8") if is_binary else definition
        self.file_extension = file_extension
class AirflowEntryData(EntryData):
    """
    EntryData variant for Airflow components. Besides the definition, it records
    the fully-qualified package name (without the class name) of the definition
    file, taken from the 'package_name' keyword argument.
    """

    package_name: str = None

    def __init__(self, definition: str, file_extension: Optional[str] = None, **kwargs):
        """
        :param definition: the component definition
        :param file_extension: optional extension associated with the definition
        :param kwargs: may carry 'package_name'; when absent the attribute is None
        """
        super().__init__(definition, file_extension, **kwargs)
        self.package_name = kwargs.get("package_name", None)
class KfpEntryData(EntryData):
    """
    EntryData variant for KFP components; adds nothing beyond the EntryData base class.
    """
class CatalogEntry(object):
    """
    A single entry of a component catalog. It bundles the entry's data (definition),
    the identifying key-value pairs used to locate it, information copied from the
    owning catalog instance, and a unique id derived from those pieces.
    """

    id: str
    entry_data: EntryData
    entry_reference: Any
    catalog_type: str
    runtime_type: RuntimeProcessorType
    categories: List[str]

    def __init__(self, entry_data: EntryData, entry_reference: Any, catalog_instance: Metadata, hash_keys: List[str]):
        """
        :param entry_data: the EntryData object holding this entry's definition
        :param entry_reference: the identifying key-value pairs for this entry
        :param catalog_instance: the catalog this entry belongs to; supplies the
            schema name, the runtime type name and the categories
        :param hash_keys: the keys of entry_reference used to build the unique id
        """
        self.entry_data = entry_data
        self.entry_reference = entry_reference

        # Information taken from the owning catalog instance
        self.catalog_type = catalog_instance.schema_name
        self.runtime_type = catalog_instance.runtime_type.name  # noqa
        self.categories = catalog_instance.metadata.get("categories", [])

        # The id depends on catalog_type and entry_reference, so it is computed last
        self.id = self.compute_unique_id(hash_keys)

    def compute_unique_id(self, hash_keys: List[str]) -> str:
        """
        Builds the unique id of this entry from the catalog type (schema name) and
        the values found in the entry reference for the given keys.

        :param hash_keys: the list of keys (present in the entry reference dict)
            whose values will be used to construct the hash
        :returns: a unique component id of the form '<catalog-type>:<hash_of_entry_info>'
        """
        reference = self.entry_reference

        # Keys that are absent (or carry a falsy value) do not contribute to the hash
        hashed_values = [str(reference[key]) for key in hash_keys if reference.get(key)]
        hash_input = ":".join(hashed_values)

        # Only the first 12 characters of the hex digest are kept
        digest = hashlib.sha256(hash_input.encode()).hexdigest()
        return f"{self.catalog_type}:{digest[:12]}"

    def get_component(
        self, id: str, name: str, description: str, properties: List[ComponentParameter], file_extension: str
    ) -> Component:
        """
        Creates a Component from the values parsed out of the definition file
        (the arguments) combined with the catalog-related information held by
        this entry.

        :param file_extension: used only when the entry data has no file extension of its own
        """
        extension = self.entry_data.file_extension or file_extension
        component_args = dict(
            id=id,
            name=name,
            description=description,
            properties=properties,
            catalog_type=self.catalog_type,
            component_reference=self.entry_reference,
            definition=self.entry_data.definition,
            runtime_type=self.runtime_type,
            categories=self.categories,
            extensions=[extension],
        )

        # Airflow entries additionally carry the package name of the definition
        if isinstance(self.entry_data, AirflowEntryData):
            component_args["package_name"] = self.entry_data.package_name

        return Component(**component_args)
class ComponentCatalogConnector(LoggingConfigurable):
    """
    Abstract class to model component_entry readers that can read components from different locations
    """

    max_threads_default = 3
    max_readers_env = "ELYRA_CATALOG_CONNECTOR_MAX_READERS"
    max_readers = Integer(
        max_threads_default,
        help="""Sets the maximum number of reader threads to be used to read
        catalog entries in parallel""",
    ).tag(config=True)

    @default("max_readers")
    def max_readers_default(self):
        """
        Provides the default for 'max_readers': the integer value of the environment
        variable named by 'max_readers_env' if set and parseable, else 'max_threads_default'.
        """
        max_reader_threads = ComponentCatalogConnector.max_threads_default
        try:
            max_reader_threads = int(os.getenv(self.max_readers_env, max_reader_threads))
        except ValueError:
            self.log.info(
                f"Unable to parse environmental variable {self.max_readers_env}, "
                f"using the default value of {self.max_threads_default}"
            )
        return max_reader_threads

    def __init__(self, file_types: List[str], **kwargs):
        """
        :param file_types: the list of file types stored on this connector as '_file_types'
        :param kwargs: forwarded to the LoggingConfigurable constructor
        """
        super().__init__(**kwargs)
        self._file_types = file_types

    @abstractmethod
    def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Returns a list of catalog_entry_data dictionary instances, one per entry in the given catalog.

        Each catalog_entry_data dictionary contains the information needed to access a single component
        definition. The form that each catalog_entry_data takes is determined by the unique requirements
        of the reader class.

        For example, the FilesystemCatalogConnector includes both a base directory ('base_dir') key-value
        pair and a relative path ('path') key-value pair in its 'catalog_entry_data' dict. Both fields
        are needed in order to access the corresponding definition in get_entry_data().

        Every catalog_entry_data should contain each of the keys returned in get_hash_keys() to ensure
        uniqueness and portability among entries. For the same reason, no two catalog entries should have
        equivalent catalog_entry_data dictionaries.

        :param catalog_metadata: the dictionary form of the metadata associated with a single catalog;
                                 the general structure is given in the example below

                example:
                    {
                        "description": "...",  # only present if a description is added
                        "runtime_type": "...",  # must be present
                        "categories": ["category1", "category2", ...],  # may be an empty array
                        "your_property1": value1,
                        "your_property2": value2,
                        ...
                    }

        :returns: a list of catalog entry dictionaries, each of which contains the information
                  needed to access a component definition in get_entry_data()
        """
        raise NotImplementedError("abstract method 'get_catalog_entries()' must be implemented")

    @deprecated(
        deprecated_in="3.7.0",
        removed_in="4.0",
        current_version=__version__,
        details="Implement the get_entry_data function instead",
    )
    def read_catalog_entry(self, catalog_entry_data: Dict[str, Any], catalog_metadata: Dict[str, Any]) -> Optional[str]:
        """
        DEPRECATED. Will be removed in 4.0. get_entry_data() must be implemented instead.

        Reads a component definition for a single catalog entry using the catalog_entry_data returned
        from get_catalog_entries() and, if needed, the catalog metadata.

        :param catalog_entry_data: a dictionary that contains the information needed to read the content
                                   of the component definition; below is an example data structure returned
                                   from get_catalog_entries()

                example:
                    {
                        "directory_path": "/Users/path/to/directory",
                        "relative_path": "subdir/file.py"
                    }

        :param catalog_metadata: the metadata associated with the catalog in which this catalog entry is
                                 stored; this is the same dictionary that is passed into get_catalog_entries();
                                 in addition to catalog_entry_data, catalog_metadata may also be
                                 needed to read the component definition for certain types of catalogs

        :returns: the content of the given catalog entry's definition in string form, if found, or None;
                  if None is returned, this catalog entry is skipped and a warning message logged
        """
        raise NotImplementedError("abstract method 'read_catalog_entry()' must be implemented")

    def get_entry_data(
        self, catalog_entry_data: Dict[str, Any], catalog_metadata: Dict[str, Any]
    ) -> Optional[EntryData]:
        """
        Reads a component definition (and other information-of-interest) for a single catalog entry and
        creates an EntryData object to represent it. Uses the catalog_entry_data returned from
        get_catalog_entries() and, if needed, the catalog metadata to retrieve the definition.

        :param catalog_entry_data: a dictionary that contains the information needed to read the content of
            the component definition; below is an example data structure returned from get_catalog_entries()

                example:
                    {
                        "directory_path": "/Users/path/to/directory",
                        "relative_path": "subdir/file.py"
                    }

        :param catalog_metadata: the metadata associated with the catalog in which this catalog entry is
            stored; this is the same dictionary that is passed into get_catalog_entries(); in addition to
            catalog_entry_data, catalog_metadata may also be needed to read the component definition for
            certain types of catalogs

        :returns: an EntryData object representing the definition (and other identifying info) for a single
            catalog entry; if None is returned, this catalog entry is skipped and a warning message logged
        """
        raise NotImplementedError("method 'get_entry_data()' must be overridden")

    @classmethod
    def get_hash_keys(cls) -> List[Any]:
        """
        Provides a list of keys, available in the 'catalog_entry_data' dictionary, whose values
        will be used to construct a unique hash id for each entry with the given catalog type.

        This function has been changed to a class method as of version 3.7. Connectors that still
        implement this function as an abstract method will be supported in a fallback scenario.

        Besides being a means to uniquely identify a single component (catalog entry), the hash id
        also enables pipeline portability across installations when the keys returned here are
        chosen strategically. For example, the FilesystemCatalogConnector includes both a base
        directory key-value pair and a relative path key-value pair in its 'catalog_entry_data' dict.
        Both fields are required to access the component definition in get_entry_data(), but
        only the relative path field is used to create the unique hash. This allows a component
        that has the same relative path defined in two separate catalogs in two separate
        installations to resolve to the same unique id in each, and therefore to be portable across
        pipelines in these installations.

        To ensure the hash is unique, no two catalog entries can have the same key-value pairs
        over the set of keys returned by this function. If two entries resolve to the same hash,
        the one whose definition is read last will overwrite the other(s).

        Example:
        Given a set of keys ['key1', 'key2', 'key3'], the below two catalog_entry_data dictionaries
        will produce unique hashes. The same can not be said, however, if the set of keys
        returned is ['key2', 'key3'].

            component_entry_data for entry1:        component_entry_data for entry2:
            {                                       {
                'key1': 'value1',                       'key1': 'value4',
                'key2': 'value2',                       'key2': 'value2',
                'key3': 'value3'                        'key3': 'value3'
            }                                       }

        Additionally, every catalog_entry_data dict should include each key in the set returned
        here. If this is not the case, a catalog entry's portability and uniqueness may be negatively
        affected.

        :returns: a list of keys
        """
        raise NotImplementedError("abstract method 'get_hash_keys()' must be implemented")

    def read_component_definitions(self, catalog_instance: Metadata) -> List[CatalogEntry]:
        """
        This function compiles the definitions of all catalog entries in a given catalog.

        Catalog entry data is first retrieved for each entry in the given catalog. This data is added
        to a queue, and a number of reader threads ('max_readers' or fewer, but at least one whenever
        the queue is not empty) are started.

        Each reader thread pulls the data for a single catalog entry from the queue and uses it to read
        the definition associated with that entry.

        The 'catalog_entries' list is shared with the reader threads and provides the means to collect
        their results. If a thread is able to successfully read the content of the given catalog entry,
        a CatalogEntry (which computes a unique hash id for the entry) is created and appended to it.
        Entries that cannot be read are logged and skipped.

        The catalog_instance Metadata parameter will have the following attributes of interest in
        addition to a few additional attributes used internally:

        :param catalog_instance: the Metadata instance for this catalog; below is an example instance

                example:
                    display_name: str = "Catalog Name"
                    schema_name: str = "connector-type"
                    metadata: Dict[str, Any] = {
                        "description": "...",  # only present if a description is added
                        "runtime_type": "...",  # must be present
                        "categories": ["category1", "category2", ...],  # may be an empty array
                        "your_property1": value1,
                        "your_property2": value2,
                        ...
                    }

        :returns: a list of CatalogEntry objects, one per catalog entry whose definition could be read
        """
        catalog_entry_q = Queue()
        catalog_entries: List[CatalogEntry] = []

        try:
            # Retrieve list of keys that will be used to construct the catalog entry hash for
            # each entry in the catalog. Looking the method up through the instance works both
            # for connectors that implement get_hash_keys as a class method (Elyra 3.7+) and
            # for connectors that still implement it as an instance method (3.6 and earlier).
            keys_to_hash = self.get_hash_keys()

            # Add display_name attribute to the metadata dictionary
            catalog_metadata = deepcopy(catalog_instance.metadata)
            catalog_metadata["display_name"] = catalog_instance.display_name

            # Add catalog entry data dictionaries to the thread queue
            for entry in self.get_catalog_entries(catalog_metadata):
                catalog_entry_q.put_nowait(entry)

        except NotImplementedError as e:
            err_msg = f"{self.__class__.__name__} does not meet the requirements of a catalog connector class: {e}"
            self.log.error(err_msg)
        except Exception as e:
            err_msg = f"Could not get catalog entry information for catalog '{catalog_instance.display_name}': {e}"
            # Dump stack trace with error message
            self.log.exception(err_msg)

        def read_with_thread():
            """
            Gets a catalog entry data dictionary from the queue and attempts to read corresponding definition
            """
            while not catalog_entry_q.empty():
                try:
                    # Pull a catalog entry dictionary from the queue
                    catalog_entry_data = catalog_entry_q.get(timeout=0.1)
                except Empty:
                    continue

                try:
                    # Read the entry definition given its returned data and the catalog entry data
                    self.log.debug(
                        f"Attempting read of definition for catalog entry with identifying information: "
                        f"{str(catalog_entry_data)}..."
                    )

                    try:
                        # Attempt to get an EntryData object from get_entry_data first
                        entry_data = self.get_entry_data(
                            catalog_entry_data=catalog_entry_data, catalog_metadata=catalog_metadata
                        )
                    except NotImplementedError:
                        # Connector class does not implement get_entry_data and we must
                        # manually coerce this entry's returned values into a EntryData object
                        definition = self.read_catalog_entry(
                            catalog_entry_data=catalog_entry_data, catalog_metadata=catalog_metadata
                        )
                        entry_data = EntryData(definition=definition)

                    # Ignore this entry if no definition content is returned
                    if not entry_data or not entry_data.definition:
                        self.log.warning(
                            f"No definition content found for catalog entry with identifying information: "
                            f"{catalog_entry_data}. Skipping..."
                        )
                        continue

                    # Create a CatalogEntry object with the returned EntryData and other
                    # necessary information from the catalog instance and connector class
                    catalog_entry = CatalogEntry(
                        entry_data=entry_data,
                        entry_reference=catalog_entry_data,
                        catalog_instance=catalog_instance,
                        hash_keys=keys_to_hash,
                    )
                    catalog_entries.append(catalog_entry)

                except NotImplementedError as e:
                    msg = f"{self.__class__.__name__} does not meet the requirements of a catalog connector class: {e}."
                    self.log.error(msg)
                except Exception as e:
                    # Dump stack trace with error message and continue
                    self.log.exception(
                        f"Could not read definition for catalog entry with identifying information: "
                        f"{str(catalog_entry_data)}: {e}"
                    )
                finally:
                    # Mark this entry as processed exactly once, whatever the outcome (read,
                    # skipped or failed), so that the join() below can complete
                    catalog_entry_q.task_done()

        # Start 'max_readers' reader threads if catalog includes more than 'max_readers'
        # number of catalog entries, else start one thread per entry. At least one reader is
        # started whenever entries are queued: with a 'max_readers' value of 0 or less no
        # thread would consume the queue and the join() below would block forever.
        num_threads = min(catalog_entry_q.qsize(), max(1, self.max_readers))
        for _ in range(num_threads):
            Thread(target=read_with_thread).start()

        # Wait for all queued entries to be processed
        catalog_entry_q.join()

        return catalog_entries
class FilesystemComponentCatalogConnector(ComponentCatalogConnector):
    """
    Read a singular component definition from the local filesystem
    """

    def get_absolute_path(self, path: str) -> str:
        """
        Determines the absolute location of a given path. Error checking is delegated to
        the calling function

        :param path: a path that may start with '~' and may be relative or absolute
        :returns: the user-expanded path if it is absolute, else the path joined onto the
                  'components' folder of the first Jupyter path entry (ENV_JUPYTER_PATH[0])
        """
        # Expand path to include user home if necessary
        path = os.path.expanduser(path)

        # Check for absolute path
        if os.path.isabs(path):
            return path

        # If path is still not absolute, default to the Jupyter share location
        return os.path.join(ENV_JUPYTER_PATH[0], "components", path)

    def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Returns a list of catalog_entry_data dictionary instances, one per entry in the given catalog.

        :param catalog_metadata: the catalog metadata; the optional 'base_path' and the 'paths'
                                 values are read from it
        :returns: a list of component_entry_data; for the FilesystemComponentCatalogConnector class this
                  takes the form:

                    {
                        'base_dir': 'base/directory/for/file',  # can be empty
                        'path': 'path/to/definition_in_local_fs.ext'  # may be relative or absolute
                    }

                  An empty list is returned if a base path is given but does not exist, or if
                  the catalog has no paths.
        """
        catalog_entry_data = []

        base_dir = catalog_metadata.get("base_path", "")
        if base_dir:
            base_dir = self.get_absolute_path(base_dir)
            if not os.path.exists(base_dir):
                # If the base directory is not found, skip this catalog
                self.log.warning(f"Base directory does not exist -> {base_dir}")
                return catalog_entry_data

        # A missing or null 'paths' value is treated as a catalog without entries
        for path in catalog_metadata.get("paths") or []:
            path = os.path.expanduser(path)
            if not base_dir and not os.path.isabs(path):
                # NOTE: once set here, this default base directory is also recorded for
                # every following entry of this catalog
                base_dir = os.path.join(ENV_JUPYTER_PATH[0], "components")

            catalog_entry_data.append({"base_dir": base_dir, "path": path})
        return catalog_entry_data

    def get_entry_data(
        self, catalog_entry_data: Dict[str, Any], catalog_metadata: Dict[str, Any]
    ) -> Optional[EntryData]:
        """
        Reads a component definition (and other information-of-interest) for a single catalog entry and
        creates an EntryData object to represent it. Uses the catalog_entry_data returned from
        get_catalog_entries() and, if needed, the catalog metadata to retrieve the definition.

        :param catalog_entry_data: for the Filesystem- and DirectoryComponentCatalogConnector classes,
                                   this includes 'path' and 'base_dir' keys
        :param catalog_metadata: Filesystem- and DirectoryComponentCatalogConnector classes do not need this
                                 field to read individual catalog entries

        :returns: an EntryData object holding the file content, or None (after logging a warning)
                  if the resolved location is not an existing file
        """
        path = os.path.join(catalog_entry_data.get("base_dir", ""), catalog_entry_data.get("path"))

        # Only regular files can be read; a directory would pass a plain existence
        # check and then fail in open()
        if not os.path.isfile(path):
            self.log.warning(f"Invalid location for component: {path}")
            return None

        # Decode explicitly as UTF-8 rather than with the platform's locale-dependent default
        with open(path, "r", encoding="utf-8") as f:
            return EntryData(definition=f.read())

    @classmethod
    def get_hash_keys(cls) -> List[Any]:
        """
        For the Filesystem- and DirectoryComponentCatalogConnector classes, only the
        'path' value is needed from the catalog_entry_data dictionary to construct a
        unique hash id for a single catalog entry
        """
        return ["path"]
class DirectoryComponentCatalogConnector(FilesystemComponentCatalogConnector):
    """
    Read component definitions from a local directory
    """

    def get_relative_path_from_base(self, base_dir: str, file_path: str) -> str:
        """
        Determines the relative portion of a path from the given base directory.

        The result always uses '/' as separator, independent of the platform and of
        whether base_dir is written with a trailing separator.

        :param base_dir: the absolute path to a base directory to compare against
        :param file_path: the absolute path to a file within the given base directory

        :returns: the path to the given file relative to the given base directory

        Example:
            given: base_path = "/path/to/folder"
            given: absolute_path = "/path/to/folder/nested/file.py"

            returns: 'nested/file.py'
        """
        # os.path.relpath normalizes both paths first, so a trailing separator on
        # base_dir does not swallow the first nested path segment
        relative_path = os.path.relpath(file_path, base_dir)
        return Path(relative_path).as_posix()

    def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Returns a list of catalog_entry_data dictionary instances, one per entry in the given catalog.

        :param catalog_metadata: the catalog metadata; the 'paths' (directories to search) and the
                                 optional 'include_subdirs' values are read from it
        :returns: a list of component_entry_data; for the DirectoryComponentCatalogConnector class this
                  takes the form

                    {
                        'base_dir': 'base/directory/for/files',  # resolved from an entry of 'paths'
                        'path': 'path/to/definition_in_local_fs.ext'  # relative to 'base_dir'
                    }
        """
        catalog_entry_data = []

        # Include '**/' in the glob pattern if files in subdirectories should be included.
        # The patterns are the same for every directory, so they are built only once.
        recursive_flag = "**/" if catalog_metadata.get("include_subdirs", False) else ""
        patterns = [f"{recursive_flag}*{file_type}" for file_type in self._file_types]

        # A missing or null 'paths' value is treated as a catalog without entries
        for dir_path in catalog_metadata.get("paths") or []:
            base_dir = self.get_absolute_path(dir_path)
            if not os.path.exists(base_dir):
                self.log.warning(f"Invalid directory -> {base_dir}")
                continue

            for file_pattern in patterns:
                catalog_entry_data.extend(
                    {"base_dir": base_dir, "path": self.get_relative_path_from_base(base_dir, str(absolute_path))}
                    for absolute_path in Path(base_dir).glob(file_pattern)
                )

        return catalog_entry_data
class UrlComponentCatalogConnector(ComponentCatalogConnector):
    """
    Read a singular component definition from a url
    """

    # Timeout (in seconds) passed to every HTTP request issued by this connector. Without
    # a timeout an unresponsive server can block the calling thread indefinitely.
    REQUEST_TIMEOUT = 30

    def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Returns a list of catalog_entry_data dictionary instances, one per entry in the given catalog.

        :param catalog_metadata: the catalog metadata; the 'paths' value (a list of URLs) is read from it
        :returns: a list of component_entry_data; for the UrlComponentCatalogConnector class this takes
                  the form:

                    {
                        'url': 'url_of_remote_component_definition'
                    }
        """
        # A missing or null 'paths' value is treated as a catalog without entries
        return [{"url": url} for url in catalog_metadata.get("paths") or []]

    def get_entry_data(
        self, catalog_entry_data: Dict[str, Any], catalog_metadata: Dict[str, Any]
    ) -> Optional[EntryData]:
        """
        Reads a component definition (and other information-of-interest) for a single catalog entry and
        creates an EntryData object to represent it. Uses the catalog_entry_data returned from
        get_catalog_entries() and, if needed, the catalog metadata to retrieve the definition.

        :param catalog_entry_data: for the UrlComponentCatalogConnector class this includes a 'url' key
        :param catalog_metadata: UrlComponentCatalogConnector does not need this field to read
                                 individual catalog entries

        :returns: an EntryData object holding the response text, or None (after logging a warning)
                  if the request fails, times out, or returns a status other than 200 (OK)
        """
        url = catalog_entry_data.get("url")
        try:
            res = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        except Exception as e:
            # Covers connection problems as well as timeouts
            self.log.warning(f"Failed to connect to URL for component: {url}: {e}")
            return None

        if res.status_code != HTTPStatus.OK:
            self.log.warning(f"Invalid location for component: {url} (HTTP code {res.status_code})")
            return None

        return EntryData(definition=res.text)

    @classmethod
    def get_hash_keys(cls) -> List[Any]:
        """
        For the UrlComponentCatalogConnector class, only the 'url' value is needed
        from the catalog_entry_data dictionary to construct a unique hash id for a
        single catalog entry
        """
        return ["url"]
|