catalog_connector.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from abc import abstractmethod
  17. from copy import deepcopy
  18. import hashlib
  19. from http import HTTPStatus
  20. import os
  21. from pathlib import Path
  22. from queue import Empty
  23. from queue import Queue
  24. from threading import Thread
  25. from typing import Any
  26. from typing import Dict
  27. from typing import List
  28. from typing import Optional
  29. from urllib.parse import urlparse
  30. from deprecation import deprecated
  31. from jupyter_core.paths import ENV_JUPYTER_PATH
  32. from requests import session
  33. from requests.auth import HTTPBasicAuth
  34. from traitlets.config import LoggingConfigurable
  35. from traitlets.traitlets import default
  36. from traitlets.traitlets import Integer
  37. from elyra._version import __version__
  38. from elyra.metadata.metadata import Metadata
  39. from elyra.pipeline.component import Component
  40. from elyra.pipeline.component import ComponentParameter
  41. from elyra.pipeline.runtime_type import RuntimeProcessorType
  42. from elyra.util.url import FileTransportAdapter
  43. class EntryData(object):
  44. """
  45. An object representing the data retrieved from a single entry of a catalog, which,
  46. at minimum, includes the string definition of the corresponding component(s)
  47. """
  48. definition: str = None
  49. file_extension: str = None
  50. def __init__(self, definition: str, file_extension: Optional[str] = None, **kwargs):
  51. if isinstance(definition, (bytes, bytearray)):
  52. definition = definition.decode("utf-8")
  53. self.definition = definition
  54. self.file_extension = file_extension
  55. class AirflowEntryData(EntryData):
  56. """
  57. An Airflow-specific EntryData object that includes the fully-qualified package
  58. name (excluding class name) that represents the definition file.
  59. """
  60. package_name: str = None
  61. def __init__(self, definition: str, file_extension: Optional[str] = None, **kwargs):
  62. super().__init__(definition, file_extension, **kwargs)
  63. self.package_name = kwargs.get("package_name")
  64. class KfpEntryData(EntryData):
  65. """
  66. A KFP-specific EntryData object
  67. """
  68. pass
  69. class CatalogEntry(object):
  70. """
  71. An object corresponding to a single entry of a component catalog, which has a
  72. unique id, a string definition, a dict of identifying key-value pairs, and
  73. other associated metadata.
  74. """
  75. id: str
  76. entry_data: EntryData
  77. entry_reference: Any
  78. catalog_type: str
  79. runtime_type: RuntimeProcessorType
  80. categories: List[str]
  81. def __init__(self, entry_data: EntryData, entry_reference: Any, catalog_instance: Metadata, hash_keys: List[str]):
  82. self.entry_data = entry_data
  83. self.entry_reference = entry_reference
  84. self.catalog_type = catalog_instance.schema_name
  85. self.runtime_type = catalog_instance.runtime_type.name # noqa
  86. self.categories = catalog_instance.metadata.get("categories", [])
  87. self.id = self.compute_unique_id(hash_keys)
  88. def compute_unique_id(self, hash_keys: List[str]) -> str:
  89. """
  90. Computes a unique id for the given component based on the schema name of the
  91. catalog connector type and any information specific to that component-catalog-type
  92. combination as given in hash_keys.
  93. :param hash_keys: the list of keys (present in the catalog_entry_data dict)
  94. whose values will be used to construct the hash
  95. :returns: a unique component id of the form '<catalog-type>:<hash_of_entry_info>'
  96. """
  97. hash_str = ""
  98. for key in hash_keys:
  99. if not self.entry_reference.get(key):
  100. # Catalog entry does not have key - build hash without it
  101. continue
  102. hash_str = hash_str + str(self.entry_reference[key]) + ":"
  103. hash_str = hash_str[:-1]
  104. # Use only the first 12 characters of the resulting hash
  105. hash_digest = f"{hashlib.sha256(hash_str.encode()).hexdigest()[:12]}"
  106. return f"{self.catalog_type}:{hash_digest}"
  107. def get_component(
  108. self, id: str, name: str, description: str, properties: List[ComponentParameter], file_extension: str
  109. ) -> Component:
  110. """
  111. Construct a Component object given the arguments (as parsed from the definition file)
  112. and the relevant information from the catalog from which the component originates.
  113. """
  114. params = {
  115. "id": id,
  116. "name": name,
  117. "description": description,
  118. "properties": properties,
  119. "catalog_type": self.catalog_type,
  120. "component_reference": self.entry_reference,
  121. "definition": self.entry_data.definition,
  122. "runtime_type": self.runtime_type,
  123. "categories": self.categories,
  124. "extensions": [self.entry_data.file_extension or file_extension],
  125. }
  126. if isinstance(self.entry_data, AirflowEntryData):
  127. params["package_name"] = self.entry_data.package_name
  128. return Component(**params)
  129. class ComponentCatalogConnector(LoggingConfigurable):
  130. """
  131. Abstract class to model component_entry readers that can read components from different locations
  132. """
  133. max_threads_default = 3
  134. max_readers_env = "ELYRA_CATALOG_CONNECTOR_MAX_READERS"
  135. max_readers = Integer(
  136. max_threads_default,
  137. help="""Sets the maximum number of reader threads to be used to read
  138. catalog entries in parallel""",
  139. ).tag(config=True)
  140. @default("max_readers")
  141. def max_readers_default(self):
  142. max_reader_threads = ComponentCatalogConnector.max_threads_default
  143. try:
  144. max_reader_threads = int(os.getenv(self.max_readers_env, max_reader_threads))
  145. except ValueError:
  146. self.log.info(
  147. f"Unable to parse environmental variable {self.max_readers_env}, "
  148. f"using the default value of {self.max_threads_default}"
  149. )
  150. return max_reader_threads
  151. def __init__(self, file_types: List[str], **kwargs):
  152. super().__init__(**kwargs)
  153. self._file_types = file_types
  154. @abstractmethod
  155. def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
  156. """
  157. Returns a list of catalog_entry_data dictionary instances, one per entry in the given catalog.
  158. Each catalog_entry_data dictionary contains the information needed to access a single component
  159. definition. The form that each catalog_entry_data takes is determined by the unique requirements
  160. of the reader class.
  161. For example, the FilesystemCatalogConnector includes both a base directory ('base_dir') key-value
  162. pair and a relative path ('path') key-value pair in its 'catalog_entry_data' dict. Both fields
  163. are needed in order to access the corresponding definition in get_entry_data().
  164. Every catalog_entry_data should contain each of the keys returned in get_hash_keys() to ensure
  165. uniqueness and portability among entries. For the same reason, no two catalog entries should have
  166. equivalent catalog_entry_data dictionaries.
  167. :param catalog_metadata: the dictionary form of the metadata associated with a single catalog;
  168. the general structure is given in the example below
  169. example:
  170. {
  171. "description": "...", # only present if a description is added
  172. "runtime_type": "...", # must be present
  173. "categories": ["category1", "category2", ...], # may be an empty array
  174. "your_property1": value1,
  175. "your_property2": value2,
  176. ...
  177. }
  178. :returns: a list of catalog entry dictionaries, each of which contains the information
  179. needed to access a component definition in get_entry_data()
  180. """
  181. raise NotImplementedError("abstract method 'get_catalog_entries()' must be implemented")
  182. @deprecated(
  183. deprecated_in="3.7.0",
  184. removed_in="4.0",
  185. current_version=__version__,
  186. details="Implement the get_entry_data function instead",
  187. )
  188. def read_catalog_entry(self, catalog_entry_data: Dict[str, Any], catalog_metadata: Dict[str, Any]) -> Optional[str]:
  189. """
  190. DEPRECATED. Will be removed in 4.0. get_entry_data() must be implemented instead.
  191. Reads a component definition for a single catalog entry using the catalog_entry_data returned
  192. from get_catalog_entries() and, if needed, the catalog metadata.
  193. :param catalog_entry_data: a dictionary that contains the information needed to read the content
  194. of the component definition; below is an example data structure returned
  195. from get_catalog_entries()
  196. example:
  197. {
  198. "directory_path": "/Users/path/to/directory",
  199. "relative_path": "subdir/file.py"
  200. }
  201. :param catalog_metadata: the metadata associated with the catalog in which this catalog entry is
  202. stored; this is the same dictionary that is passed into get_catalog_entries();
  203. in addition to catalog_entry_data, catalog_metadata may also be
  204. needed to read the component definition for certain types of catalogs
  205. :returns: the content of the given catalog entry's definition in string form, if found, or None;
  206. if None is returned, this catalog entry is skipped and a warning message logged
  207. """
  208. raise NotImplementedError("abstract method 'read_catalog_entry()' must be implemented")
  209. def get_entry_data(
  210. self, catalog_entry_data: Dict[str, Any], catalog_metadata: Dict[str, Any]
  211. ) -> Optional[EntryData]:
  212. """
  213. Reads a component definition (and other information-of-interest) for a single catalog entry and
  214. creates an EntryData object to represent it. Uses the catalog_entry_data returned from
  215. get_catalog_entries() and, if needed, the catalog metadata to retrieve the definition.
  216. :param catalog_entry_data: a dictionary that contains the information needed to read the content of
  217. the component definition; below is an example data structure returned from get_catalog_entries()
  218. example:
  219. {
  220. "directory_path": "/Users/path/to/directory",
  221. "relative_path": "subdir/file.py"
  222. }
  223. :param catalog_metadata: the metadata associated with the catalog in which this catalog entry is
  224. stored; this is the same dictionary that is passed into get_catalog_entries(); in addition to
  225. catalog_entry_data, catalog_metadata may also be needed to read the component definition for
  226. certain types of catalogs
  227. :returns: an EntryData object representing the definition (and other identifying info) for a single
  228. catalog entry; if None is returned, this catalog entry is skipped and a warning message logged
  229. """
  230. raise NotImplementedError("method 'get_entry_data()' must be overridden")
  231. @classmethod
  232. def get_hash_keys(cls) -> List[Any]:
  233. """
  234. Provides a list of keys, available in the 'catalog_entry_data' dictionary, whose values
  235. will be used to construct a unique hash id for each entry with the given catalog type.
  236. This function has been changed to a class method as of version 3.7. Connectors that still
  237. implement this function as an abstract method will be supported in a fallback scenario.
  238. Besides being a means to uniquely identify a single component (catalog entry), the hash id
  239. also enables pipeline portability across installations when the keys returned here are
  240. chosen strategically. For example, the FilesystemCatalogConnector includes both a base
  241. directory key-value pair and a relative path key-value pair in its 'catalog_entry_data' dict.
  242. Both fields are required to access the component definition in get_entry_data(), but
  243. only the relative path field is used to create the unique hash. This allows a component
  244. that has the same relative path defined in two separate a catalogs in two separate
  245. installations to resolve to the same unique id in each, and therefore to be portable across
  246. pipelines in these installations.
  247. To ensure the hash is unique, no two catalog entries can have the same key-value pairs
  248. over the set of keys returned by this function. If two entries resolve to the same hash,
  249. the one whose definition is read last will overwrite the other(s).
  250. Example:
  251. Given a set of keys ['key1', 'key2', 'key3'], the below two catalog_entry_data dictionaries
  252. will produce unique hashes. The same can not be said, however, if the set of keys
  253. returned is ['key2', 'key3'].
  254. component_entry_data for entry1: component_entry_data for entry2:
  255. { {
  256. 'key1': 'value1', 'key1': 'value4',
  257. 'key2': 'value2', 'key2': 'value2',
  258. 'key3': 'value3' 'key3': 'value3'
  259. } {
  260. Additionally, every catalog_entry_data dict should include each key in the set returned
  261. here. If this is not the case, a catalog entry's portability and uniqueness may be negatively
  262. affected.
  263. :returns: a list of keys
  264. """
  265. raise NotImplementedError("abstract method 'get_hash_keys()' must be implemented")
  266. def read_component_definitions(self, catalog_instance: Metadata) -> List[CatalogEntry]:
  267. """
  268. This function compiles the definitions of all catalog entries in a given catalog.
  269. Catalog entry data is first retrieved for each entry in the given catalog. This data is added
  270. to a queue, and a number of reader threads ('max_reader' or fewer) are started.
  271. Each reader thread pulls the data for a singe catalog entry from the queue and uses it to read
  272. the definition associated with that entry.
  273. As a mutable object, the 'catalog_entry_map' provides a means to retrieve a return value for
  274. each thread. If a thread is able to successfully read the content of the given catalog entry,
  275. a unique hash is created for the entry and a mapping is added to the catalog_entry_map.
  276. The catalog_instance Metadata parameter will have the following attributes of interest in
  277. addition to a few additional attributes used internally:
  278. :param catalog_instance: the Metadata instance for this catalog; below is an example instance
  279. example:
  280. display_name: str = "Catalog Name"
  281. schema_name: str = "connector-type"
  282. metadata: Dict[str, Any] = {
  283. "description": "...", # only present if a description is added
  284. "runtime": "...", # must be present
  285. "categories": ["category1", "category2", ...], # may be an empty array
  286. "your_property1": value1,
  287. "your_property2": value2,
  288. ...
  289. }
  290. :returns: a mapping of a unique component ids to their definition and identifying data
  291. """
  292. catalog_entry_q = Queue()
  293. catalog_entries: List[CatalogEntry] = []
  294. try:
  295. # Retrieve list of keys that will be used to construct
  296. # the catalog entry hash for each entry in the catalog
  297. try:
  298. # Attempt to use get_hash_keys as class method (Elyra version 3.7+)
  299. keys_to_hash = ComponentCatalogConnector.get_hash_keys()
  300. except Exception:
  301. # Fall back to using abstract method (version 3.6 and earlier)
  302. keys_to_hash = self.get_hash_keys()
  303. # Add display_name attribute to the metadata dictionary
  304. catalog_metadata = deepcopy(catalog_instance.metadata)
  305. catalog_metadata["display_name"] = catalog_instance.display_name
  306. # Add catalog entry data dictionaries to the thread queue
  307. for entry in self.get_catalog_entries(catalog_metadata):
  308. catalog_entry_q.put_nowait(entry)
  309. except NotImplementedError as e:
  310. err_msg = f"{self.__class__.__name__} does not meet the requirements of a catalog connector class: {e}"
  311. self.log.error(err_msg)
  312. except Exception as e:
  313. err_msg = f"Could not get catalog entry information for catalog '{catalog_instance.display_name}': {e}"
  314. # Dump stack trace with error message
  315. self.log.exception(err_msg)
  316. def read_with_thread():
  317. """
  318. Gets a catalog entry data dictionary from the queue and attempts to read corresponding definition
  319. """
  320. while not catalog_entry_q.empty():
  321. try:
  322. # Pull a catalog entry dictionary from the queue
  323. catalog_entry_data = catalog_entry_q.get(timeout=0.1)
  324. except Empty:
  325. continue
  326. try:
  327. # Read the entry definition given its returned data and the catalog entry data
  328. self.log.debug(
  329. f"Attempting read of definition for catalog entry with identifying information: "
  330. f"{str(catalog_entry_data)}..."
  331. )
  332. try:
  333. # Attempt to get an EntryData object from get_entry_data first
  334. entry_data: EntryData = self.get_entry_data(
  335. catalog_entry_data=catalog_entry_data, catalog_metadata=catalog_metadata
  336. )
  337. except NotImplementedError:
  338. # Connector class does not implement get_catalog_definition and we must
  339. # manually coerce this entry's returned values into a EntryData object
  340. definition = self.read_catalog_entry(
  341. catalog_entry_data=catalog_entry_data, catalog_metadata=catalog_metadata
  342. )
  343. entry_data: EntryData = EntryData(definition=definition)
  344. # Ignore this entry if no definition content is returned
  345. if not entry_data or not entry_data.definition:
  346. self.log.warning(
  347. f"No definition content found for catalog entry with identifying information: "
  348. f"{catalog_entry_data}. Skipping..."
  349. )
  350. catalog_entry_q.task_done()
  351. continue
  352. # Create a CatalogEntry object with the returned EntryData and other
  353. # necessary information from the catalog instance and connector class
  354. catalog_entry = CatalogEntry(
  355. entry_data=entry_data,
  356. entry_reference=catalog_entry_data,
  357. catalog_instance=catalog_instance,
  358. hash_keys=keys_to_hash,
  359. )
  360. catalog_entries.append(catalog_entry)
  361. except NotImplementedError as e:
  362. msg = f"{self.__class__.__name__} does not meet the requirements of a catalog connector class: {e}."
  363. self.log.error(msg)
  364. except Exception as e:
  365. # Dump stack trace with error message and continue
  366. self.log.exception(
  367. f"Could not read definition for catalog entry with identifying information: "
  368. f"{str(catalog_entry_data)}: {e}"
  369. )
  370. # Mark this thread's read as complete
  371. catalog_entry_q.task_done()
  372. # Start 'max_reader' reader threads if catalog includes more than 'max_reader'
  373. # number of catalog entries, else start one thread per entry
  374. num_threads = min(catalog_entry_q.qsize(), self.max_readers)
  375. for i in range(num_threads):
  376. Thread(target=read_with_thread).start()
  377. # Wait for all queued entries to be processed
  378. catalog_entry_q.join()
  379. return catalog_entries
  380. class FilesystemComponentCatalogConnector(ComponentCatalogConnector):
  381. """
  382. Read a singular component definition from the local filesystem
  383. """
  384. def get_absolute_path(self, path: str) -> str:
  385. """
  386. Determines the absolute location of a given path. Error checking is delegated to
  387. the calling function
  388. """
  389. # Expand path to include user home if necessary
  390. path = os.path.expanduser(path)
  391. # Check for absolute path
  392. if os.path.isabs(path):
  393. return path
  394. # If path is still not absolute, default to the Jupyter share location
  395. return os.path.join(ENV_JUPYTER_PATH[0], "components", path)
  396. def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
  397. """
  398. Returns a list of catalog_entry_data dictionary instances, one per entry in the given catalog.
  399. :returns: a list of component_entry_data; for the FilesystemComponentCatalogConnector class this
  400. takes the form:
  401. {
  402. 'base_dir': 'base/directory/for/file', # can be empty
  403. 'path': 'path/to/definition_in_local_fs.ext' # may be relative or absolute
  404. }
  405. """
  406. catalog_entry_data = []
  407. base_dir = catalog_metadata.get("base_path", "")
  408. if base_dir:
  409. base_dir = self.get_absolute_path(base_dir)
  410. if not os.path.exists(base_dir):
  411. # If the base directory is not found, skip this catalog
  412. self.log.warning(f"Base directory does not exist -> {base_dir}")
  413. return catalog_entry_data
  414. for path in catalog_metadata.get("paths"):
  415. path = os.path.expanduser(path)
  416. if not base_dir and not os.path.isabs(path):
  417. base_dir = os.path.join(ENV_JUPYTER_PATH[0], "components")
  418. catalog_entry_data.append({"base_dir": base_dir, "path": path})
  419. return catalog_entry_data
  420. def get_entry_data(
  421. self, catalog_entry_data: Dict[str, Any], catalog_metadata: Dict[str, Any]
  422. ) -> Optional[EntryData]:
  423. """
  424. Reads a component definition (and other information-of-interest) for a single catalog entry and
  425. creates an EntryData object to represent it. Uses the catalog_entry_data returned from
  426. get_catalog_entries() and, if needed, the catalog metadata to retrieve the definition.
  427. :param catalog_entry_data: for the Filesystem- and DirectoryComponentCatalogConnector classes,
  428. this includes 'path' and 'base_dir' keys
  429. :param catalog_metadata: Filesystem- and DirectoryComponentCatalogConnector classes do not need this
  430. field to read individual catalog entries
  431. """
  432. path = os.path.join(catalog_entry_data.get("base_dir", ""), catalog_entry_data.get("path"))
  433. if not os.path.exists(path):
  434. self.log.warning(f"Invalid location for component: {path}")
  435. else:
  436. with open(path, "r") as f:
  437. return EntryData(definition=f.read())
  438. return None
  439. @classmethod
  440. def get_hash_keys(cls) -> List[Any]:
  441. """
  442. For the Filesystem- and DirectoryComponentCatalogConnector classes, only the
  443. 'path' value is needed from the catalog_entry_data dictionary to construct a
  444. unique hash id for a single catalog entry
  445. """
  446. return ["path"]
  447. class DirectoryComponentCatalogConnector(FilesystemComponentCatalogConnector):
  448. """
  449. Read component definitions from a local directory
  450. """
  451. def get_relative_path_from_base(self, base_dir: str, file_path: str) -> str:
  452. """
  453. Determines the relative portion of a path from the given base directory.
  454. :param base_dir: the absolute path to a base directory to compare against
  455. :param file_path: the absolute path to a file within the given base directory
  456. :returns: the path to the given file relative to the given base directory
  457. Example:
  458. given: base_path = "/path/to/folder"
  459. given: absolute_path = "/path/to/folder/nested/file.py"
  460. returns: 'nested/file.py'
  461. """
  462. base_list = base_dir.split("/")
  463. absolute_list = file_path.split("/")
  464. while base_list:
  465. base_list = base_list[1:]
  466. absolute_list = absolute_list[1:]
  467. return "/".join(absolute_list)
  468. def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
  469. """
  470. Returns a list of catalog_entry_data dictionary instances, one per entry in the given catalog.
  471. :returns: a list of component_entry_data; for the DirectoryComponentCatalogConnector class this
  472. takes the form
  473. {
  474. 'base_dir': 'base/directory/for/files', # given in base_path
  475. 'path': 'path/to/definition_in_local_fs.ext' # may be relative or absolute
  476. }
  477. """
  478. catalog_entry_data = []
  479. for dir_path in catalog_metadata.get("paths"):
  480. base_dir = self.get_absolute_path(dir_path)
  481. if not os.path.exists(base_dir):
  482. self.log.warning(f"Invalid directory -> {base_dir}")
  483. continue
  484. # Include '**/' in the glob pattern if files in subdirectories should be included
  485. recursive_flag = "**/" if catalog_metadata.get("include_subdirs", False) else ""
  486. patterns = [f"{recursive_flag}*{file_type}" for file_type in self._file_types]
  487. for file_pattern in patterns:
  488. catalog_entry_data.extend(
  489. [
  490. {"base_dir": base_dir, "path": self.get_relative_path_from_base(base_dir, str(absolute_path))}
  491. for absolute_path in Path(base_dir).glob(file_pattern)
  492. ]
  493. )
  494. return catalog_entry_data
  495. class UrlComponentCatalogConnector(ComponentCatalogConnector):
  496. """
  497. Read a singular component definition from a url
  498. """
  499. REQUEST_TIMEOUT = 30
  500. def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
  501. """
  502. Returns a list of catalog_entry_data dictionary instances, one per entry in the given catalog.
  503. :returns: a list of component_entry_data; for the UrlComponentCatalogConnector class this takes
  504. the form:
  505. {
  506. 'url': 'url_of_remote_component_definition'
  507. }
  508. """
  509. return [{"url": url} for url in catalog_metadata.get("paths")]
  510. def get_entry_data(
  511. self, catalog_entry_data: Dict[str, Any], catalog_metadata: Dict[str, Any]
  512. ) -> Optional[EntryData]:
  513. """
  514. Reads a component definition (and other information-of-interest) for a single catalog entry and
  515. creates an EntryData object to represent it. Uses the catalog_entry_data returned from
  516. get_catalog_entries() and, if needed, the catalog metadata to retrieve the definition.
  517. :param catalog_entry_data: for the UrlComponentCatalogConnector class this includes a 'url' key
  518. :param catalog_metadata: UrlComponentCatalogConnector does not need this field to read
  519. individual catalog entries
  520. """
  521. url = catalog_entry_data.get("url")
  522. pr = urlparse(url)
  523. auth = None
  524. if pr.scheme != "file":
  525. # determine whether authentication needs to be performed
  526. auth_id = catalog_metadata.get("auth_id")
  527. auth_password = catalog_metadata.get("auth_password")
  528. if auth_id and auth_password:
  529. auth = HTTPBasicAuth(auth_id, auth_password)
  530. elif auth_id or auth_password:
  531. self.log.error(
  532. f"Error. URL catalog connector '{catalog_metadata.get('display_name')}' "
  533. "is not configured properly. "
  534. "Authentication requires a user id and password or API key."
  535. )
  536. return None
  537. try:
  538. requests_session = session()
  539. if pr.scheme == "file":
  540. requests_session.mount("file://", FileTransportAdapter())
  541. res = requests_session.get(
  542. url,
  543. timeout=UrlComponentCatalogConnector.REQUEST_TIMEOUT,
  544. allow_redirects=True,
  545. auth=auth,
  546. )
  547. except Exception as e:
  548. self.log.error(
  549. f"Error. The URL catalog connector '{catalog_metadata.get('display_name')}' "
  550. f"encountered an issue downloading '{url}': {e} "
  551. )
  552. else:
  553. if res.status_code != HTTPStatus.OK:
  554. self.log.error(
  555. f"Error. The URL catalog connector '{catalog_metadata.get('display_name')}' "
  556. f"encountered an issue downloading '{url}'. "
  557. f"HTTP response code: {res.status_code}"
  558. )
  559. else:
  560. return EntryData(definition=res.text)
  561. return None
  562. @classmethod
  563. def get_hash_keys(cls) -> List[Any]:
  564. """
  565. For the UrlComponentCatalogConnector class, only the 'url' value is needed
  566. from the catalog_entry_data dictionary to construct a unique hash id for a
  567. single catalog entry
  568. """
  569. return ["url"]