schemasproviders.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from abc import ABCMeta
  16. import io
  17. import json
  18. import os
  19. from typing import Dict
  20. from typing import List
  21. import entrypoints
  22. from traitlets import log # noqa H306
  23. try:
  24. from kfp_tekton import TektonClient
  25. except ImportError:
  26. # We may not have kfp-tekton available and that's okay!
  27. TektonClient = None
  28. from elyra.metadata.schema import SchemasProvider
  29. from elyra.metadata.schemaspaces import CodeSnippets
  30. from elyra.metadata.schemaspaces import ComponentCatalogs
  31. from elyra.metadata.schemaspaces import RuntimeImages
  32. from elyra.metadata.schemaspaces import Runtimes
  33. from elyra.pipeline.kfp.kfp_authentication import SupportedAuthProviders
  34. from elyra.util.gitutil import SupportedGitTypes
  class ElyraSchemasProvider(SchemasProvider, metaclass=ABCMeta):
      """Base class used for retrieving Elyra-based schema files from its metadata/schemas directory.

      The JSON files located in the ``schemas`` directory next to this module are
      parsed once, when the class body is executed, and cached in ``local_schemas``.
      Subclasses select the subset they are responsible for via
      ``get_local_schemas_by_schemaspace``.
      """

      # Just read the schema files once.  Note that this list will also include the meta-schema.json.
      local_schemas = []
      schema_dir = os.path.join(os.path.dirname(__file__), "schemas")
      # os.listdir() returns entries in arbitrary order; sort the names so the order of
      # local_schemas (and of every list derived from it) is the same on every platform.
      schema_files = sorted(json_file for json_file in os.listdir(schema_dir) if json_file.endswith(".json"))
      # NOTE: this stays a plain loop on purpose - the body of a comprehension cannot see
      # class-scope names such as schema_dir.
      for json_file in schema_files:
          schema_file = os.path.join(schema_dir, json_file)
          with io.open(schema_file, "r", encoding="utf-8") as f:
              schema_json = json.load(f)
          local_schemas.append(schema_json)

      def __init__(self):
          """Set up the logger and collect the names of the registered runtime processors."""
          self.log = log.get_logger()
          # Names of the entrypoints registered in the 'elyra.pipeline.processors' group,
          # excluding 'local', which is deliberately skipped.
          self._runtime_processor_names = {
              processor.name
              for processor in entrypoints.get_group_all("elyra.pipeline.processors")
              if processor.name != "local"
          }

      def get_local_schemas_by_schemaspace(self, schemaspace_id: str) -> List[Dict]:
          """Returns a list of local schemas associated with the given schemaspace_id

          :param schemaspace_id: value compared against each schema's ``schemaspace_id`` entry
          :return: a new list holding the matching schemas; the list is empty when nothing matches.
              The dictionaries are the same objects cached in ``local_schemas`` (not copies), so
              in-place changes made by a caller are visible to every later call.
          """
          return [
              schema
              for schema in ElyraSchemasProvider.local_schemas
              if schema.get("schemaspace_id") == schemaspace_id
          ]
  class RuntimesSchemas(ElyraSchemasProvider):
      """Returns schemas relative to Runtimes schemaspace only for THIS provider."""

      def get_schemas(self) -> List[Dict]:
          """Return the Runtimes schemas that have a matching registered runtime processor.

          Schemas whose name is not among the registered runtime processor names are
          logged as errors and skipped.  The 'kfp' and 'airflow' schemas are updated
          in place before being returned: their engine / auth_type / git_type entries
          are aligned with what the current installation supports.

          :return: list of schema dictionaries, one per schema with a matching processor
          """
          runtime_schemas = []
          for schema in self.get_local_schemas_by_schemaspace(Runtimes.RUNTIMES_SCHEMASPACE_ID):
              if schema["name"] in self._runtime_processor_names:
                  runtime_schemas.append(schema)
              else:
                  # The group name reported here is the one the processor names were loaded from.
                  self.log.error(
                      f"No entrypoint with name '{schema['name']}' was found in group "
                      f"'elyra.pipeline.processors' to match the schema with the same name. Skipping..."
                  )

          # Apply the runtime-specific adjustments to the schemas that were kept.
          for schema in runtime_schemas:
              if schema["name"] == "kfp":
                  self._update_kfp_schema(schema)
              elif schema["name"] == "airflow":
                  self._update_airflow_schema(schema)

          return runtime_schemas

      @staticmethod
      def _update_kfp_schema(schema: Dict) -> None:
          """Update the engine enum and the auth_type placeholders of the kfp schema in place."""
          metadata_properties = schema["properties"]["metadata"]["properties"]

          # If the TektonClient package is missing, remove the 'Tekton' entry from the
          # engine enum (if present).  The list is modified in place, so no re-assignment
          # into the schema is required.
          if TektonClient is None:
              engine_enum: list = metadata_properties["engine"]["enum"]
              if "Tekton" in engine_enum:
                  engine_enum.remove("Tekton")

          # For KFP schemas replace placeholders:
          # - properties.metadata.properties.auth_type.enum ({AUTH_PROVIDER_PLACEHOLDERS})
          # - properties.metadata.properties.auth_type.default ({DEFAULT_AUTH_PROVIDER_PLACEHOLDER})
          auth_type = metadata_properties.get("auth_type")
          if auth_type is not None:
              auth_type["enum"] = SupportedAuthProviders.get_provider_names()
              auth_type["default"] = SupportedAuthProviders.get_default_provider().name

      @staticmethod
      def _update_airflow_schema(schema: Dict) -> None:
          """Replace the Git Type placeholders (enum and default) of the airflow schema in place."""
          git_type = schema["properties"]["metadata"]["properties"].get("git_type")
          if git_type is not None:
              git_type["enum"] = [enabled_type.name for enabled_type in SupportedGitTypes.get_enabled_types()]
              git_type["default"] = SupportedGitTypes.get_default_type().name
  class RuntimeImagesSchemas(ElyraSchemasProvider):
      """Provides the schemas that belong to the Runtime Images schemaspace."""

      def get_schemas(self) -> List[Dict]:
          """Return the local schemas associated with the Runtime Images schemaspace id."""
          schemaspace_id = RuntimeImages.RUNTIME_IMAGES_SCHEMASPACE_ID
          return self.get_local_schemas_by_schemaspace(schemaspace_id)
  class CodeSnippetsSchemas(ElyraSchemasProvider):
      """Provides the schemas that belong to the Code Snippets schemaspace."""

      def get_schemas(self) -> List[Dict]:
          """Return the local schemas associated with the Code Snippets schemaspace id."""
          schemaspace_id = CodeSnippets.CODE_SNIPPETS_SCHEMASPACE_ID
          return self.get_local_schemas_by_schemaspace(schemaspace_id)
  class ComponentCatalogsSchemas(ElyraSchemasProvider):
      """Provides the schemas that belong to the Component Catalogs schemaspace."""

      def get_schemas(self) -> List[Dict]:
          """Return the local schemas associated with the Component Catalogs schemaspace id."""
          return self.get_local_schemas_by_schemaspace(ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID)