#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from datetime import datetime
from http.client import responses
import json
from logging import Logger
import mimetypes
from typing import List
from typing import Optional

from jupyter_server.base.handlers import APIHandler
from jupyter_server.utils import url_path_join
from tornado import web

from elyra.metadata.error import MetadataNotFoundError
from elyra.metadata.manager import MetadataManager
from elyra.metadata.schemaspaces import ComponentCatalogs
from elyra.pipeline.component import Component
from elyra.pipeline.component_catalog import ComponentCache
from elyra.pipeline.component_catalog import RefreshInProgressError
from elyra.pipeline.parser import PipelineParser
from elyra.pipeline.pipeline_definition import PipelineDefinition
from elyra.pipeline.processor import PipelineProcessorManager
from elyra.pipeline.processor import PipelineProcessorRegistry
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.pipeline.runtime_type import RuntimeTypeResources
from elyra.pipeline.validation import PipelineValidationManager
from elyra.util.http import HttpErrorMixin

MIMETYPE_MAP = {".yaml": "text/x-yaml", ".py": "text/x-python", None: "text/plain"}


def get_runtime_processor_type(runtime_type: str, log: Logger, request_path: str) -> Optional[RuntimeProcessorType]:
    """
    Gets the runtime processor type for the runtime type given in the request path.

    :param runtime_type: can be the shorthand runtime ('kfp', 'airflow') or the
        runtime type name ('KUBEFLOW_PIPELINES', 'APACHE_AIRFLOW') (preferred)
    :param log: used to log the appropriate warning for shorthand-name requests
    :param request_path: full request path of the endpoint

    :returns: the RuntimeProcessorType for the given runtime_type, or None
    """
    processor_manager = PipelineProcessorManager.instance()
    if processor_manager.is_supported_runtime(runtime_type):
        # The endpoint path contains the shorthand version of a runtime (e.g., 'kfp',
        # 'airflow'). This case and its associated functions should eventually be removed
        # in favor of using the RuntimeProcessorType name in the request path.
        log.warning(
            f"Deprecation warning: when calling endpoint '{request_path}' "
            f"use runtime type name (e.g. 'KUBEFLOW_PIPELINES', 'APACHE_AIRFLOW') "
            f"instead of shorthand name (e.g., 'kfp', 'airflow')"
        )
        return processor_manager.get_runtime_type(runtime_type)
    elif processor_manager.is_supported_runtime_type(runtime_type):
        # The request path uses the appropriate RuntimeProcessorType name. Use this
        # to get the RuntimeProcessorType instance to pass to get_all_components
        return RuntimeProcessorType.get_instance_by_name(runtime_type)
    return None
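
# Illustrative usage (comment-only sketch; not executed): both naming styles resolve to the
# same RuntimeProcessorType member, but only the shorthand form triggers the deprecation
# warning above.
#
#   get_runtime_processor_type("kfp", self.log, self.request.path)
#       -> RuntimeProcessorType.KUBEFLOW_PIPELINES  (with a deprecation warning)
#   get_runtime_processor_type("KUBEFLOW_PIPELINES", self.log, self.request.path)
#       -> RuntimeProcessorType.KUBEFLOW_PIPELINES
#   get_runtime_processor_type("no-such-runtime", self.log, self.request.path)
#       -> None  (the handlers below turn this into an HTTP 400)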


class PipelineExportHandler(HttpErrorMixin, APIHandler):
    """Handler to expose REST API to export pipelines"""

    @web.authenticated
    async def get(self):
        msg_json = dict(title="Operation not supported.")
        self.set_header("Content-Type", "application/json")
        await self.finish(msg_json)

    @web.authenticated
    async def post(self, *args, **kwargs):
        self.log.debug("Pipeline Export handler now executing post request")

        parent = self.settings.get("elyra")
        payload = self.get_json_body()
        self.log.debug(f"JSON payload: {json.dumps(payload, indent=2, separators=(',', ': '))}")

        pipeline_definition = payload["pipeline"]
        pipeline_export_format = payload["export_format"]
        pipeline_export_path = payload["export_path"]
        pipeline_overwrite = payload["overwrite"]

        response = await PipelineValidationManager.instance().validate(pipeline=pipeline_definition)
        self.log.debug(f"Validation checks completed. Results as follows: {response.to_json()}")

        if not response.has_fatal:
            pipeline = PipelineParser(root_dir=self.settings["server_root_dir"], parent=parent).parse(
                pipeline_definition
            )

            pipeline_exported_path = await PipelineProcessorManager.instance().export(
                pipeline, pipeline_export_format, pipeline_export_path, pipeline_overwrite
            )
            json_msg = json.dumps({"export_path": pipeline_export_path})
            self.set_status(201)
            self.set_header("Content-Type", "application/json")
            location = url_path_join(self.base_url, "api", "contents", pipeline_exported_path)
            self.set_header("Location", location)
        else:
            json_msg = json.dumps(
                {
                    "reason": responses.get(400),
                    "message": "Errors found in pipeline",
                    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "issues": response.to_json().get("issues"),
                }
            )
            self.set_status(400)

        self.set_header("Content-Type", "application/json")
        await self.finish(json_msg)
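
# Illustrative request/response sketch (payload shape inferred from the key accesses above,
# not from a published schema; the "yaml" value is only an example and depends on the runtime):
#
#   POST body (shown as a Python literal):
#       {
#           "pipeline": {...},                 # pipeline JSON produced by the pipeline editor
#           "export_format": "yaml",
#           "export_path": "my_pipeline.yaml",
#           "overwrite": True,
#       }
#
#   Success: HTTP 201 with {"export_path": ...} and a Location header pointing at the exported
#   file under the contents API; fatal validation issues instead yield HTTP 400 with "issues".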


class PipelineSchedulerHandler(HttpErrorMixin, APIHandler):
    """Handler to expose method calls to execute pipelines as batch jobs"""

    @web.authenticated
    async def get(self):
        msg_json = dict(title="Operation not supported.")
        self.write(msg_json)
        await self.flush()

    @web.authenticated
    async def post(self, *args, **kwargs):
        self.log.debug("Pipeline SchedulerHandler now executing post request")

        parent = self.settings.get("elyra")
        pipeline_definition = self.get_json_body()
        self.log.debug(f"JSON payload: {pipeline_definition}")

        response = await PipelineValidationManager.instance().validate(pipeline=pipeline_definition)
        self.log.debug(f"Validation checks completed. Results as follows: {response.to_json()}")

        if not response.has_fatal:
            self.log.debug("Processing the pipeline submission and executing request")
            pipeline = PipelineParser(root_dir=self.settings["server_root_dir"], parent=parent).parse(
                pipeline_definition
            )
            response = await PipelineProcessorManager.instance().process(pipeline)
            json_msg = json.dumps(response.to_json())
            self.set_status(200)
        else:
            json_msg = json.dumps(
                {
                    "reason": responses.get(400),
                    "message": "Errors found in pipeline",
                    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "issues": response.to_json().get("issues"),
                }
            )
            self.set_status(400)

        self.set_header("Content-Type", "application/json")
        await self.finish(json_msg)
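
# Illustrative sketch (assumed shapes): unlike the export handler, this endpoint receives the
# pipeline JSON itself as the POST body rather than a wrapper object. On success it answers
# HTTP 200 with the processor response serialized by response.to_json(); fatal validation
# issues yield the same 400 "reason"/"message"/"timestamp"/"issues" structure used above.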


class PipelineComponentHandler(HttpErrorMixin, APIHandler):
    """Handler to expose method calls to retrieve pipelines editor component configuration"""

    @web.authenticated
    async def get(self, runtime_type):
        self.log.debug(f"Retrieving pipeline components for runtime type: {runtime_type}")

        runtime_processor_type = get_runtime_processor_type(runtime_type, self.log, self.request.path)
        if not runtime_processor_type:
            raise web.HTTPError(400, f"Invalid runtime type '{runtime_type}'")

        # Include generic components for all runtime types
        components: List[Component] = ComponentCache.get_generic_components()

        # Add additional runtime-type-specific components, if present
        components.extend(ComponentCache.instance().get_all_components(platform=runtime_processor_type))

        palette_json = ComponentCache.to_canvas_palette(components=components)

        self.set_status(200)
        self.set_header("Content-Type", "application/json")
        await self.finish(palette_json)
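
# Illustrative sketch (the URL pattern is registered outside this module, so the exact path is
# an assumption): a GET such as
#   /elyra/pipeline/components/KUBEFLOW_PIPELINES
# returns the canvas palette JSON assembled above: generic components plus any components
# contributed by catalogs associated with the requested runtime type.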


class PipelinePropertiesHandler(HttpErrorMixin, APIHandler):
    """Handler to expose method calls to retrieve pipeline properties"""

    @web.authenticated
    async def get(self, runtime_type):
        self.log.debug(f"Retrieving pipeline properties for runtime type: {runtime_type}")

        runtime_processor_type = get_runtime_processor_type(runtime_type, self.log, self.request.path)
        if not runtime_processor_type:
            raise web.HTTPError(400, f"Invalid runtime type '{runtime_type}'")

        # Get pipeline properties json
        pipeline_properties_json = PipelineDefinition.get_canvas_properties_from_template(
            package_name="templates/pipeline", template_name="pipeline_properties_template.jinja2"
        )

        self.set_status(200)
        self.set_header("Content-Type", "application/json")
        await self.finish(pipeline_properties_json)
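
# Note (sketch): unlike the component palette above, the pipeline-level properties are not
# taken from the component cache; they are rendered from the packaged Jinja template named in
# the call above and returned as-is as canvas properties JSON.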


class PipelineComponentPropertiesHandler(HttpErrorMixin, APIHandler):
    """Handler to expose method calls to retrieve pipeline component_id properties"""

    def get_mimetype(self, ext: Optional[str]) -> str:
        """
        Get the MIME type for the component definition content.
        """
        if ext == ".yml":
            ext = ".yaml"

        # Get mimetype from mimetypes map or built-in mimetypes package; default to plaintext
        mimetype = MIMETYPE_MAP.get(ext, mimetypes.guess_type(f"file{ext}")[0]) or "text/plain"
        return mimetype
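
    # Illustrative resolution (derived from MIMETYPE_MAP and the mimetypes fallback above):
    #   ".yaml" / ".yml"  -> "text/x-yaml"
    #   ".py"             -> "text/x-python"
    #   ".json"           -> "application/json"   (via mimetypes.guess_type)
    #   None or unknown   -> "text/plain"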

    @web.authenticated
    async def get(self, runtime_type, component_id):
        self.log.debug(f"Retrieving pipeline component properties for component: {component_id}")

        if not component_id:
            raise web.HTTPError(400, "Missing component ID")

        runtime_processor_type = get_runtime_processor_type(runtime_type, self.log, self.request.path)
        if not runtime_processor_type:
            raise web.HTTPError(400, f"Invalid runtime type '{runtime_type}'")

        # Try to get component_id as a generic component; assigns None if id is not a generic component
        component: Optional[Component] = ComponentCache.get_generic_component(component_id)

        # Try to retrieve a runtime-type-specific component; assigns None if not found
        if not component:
            component = ComponentCache.instance().get_component(
                platform=runtime_processor_type, component_id=component_id
            )

        if not component:
            raise web.HTTPError(404, f"Component '{component_id}' not found")

        if self.request.path.endswith("/properties"):
            # Return complete set of component properties
            json_response = ComponentCache.to_canvas_properties(component)
        else:
            # Return component definition content
            json_response = json.dumps(
                {"content": component.definition, "mimeType": self.get_mimetype(component.file_extension)}
            )

        self.set_status(200)
        self.set_header("Content-Type", "application/json")
        await self.finish(json_response)
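
# Illustrative sketch (route shapes are assumptions; the URL patterns are registered outside
# this module): the same handler serves two closely related requests --
#   .../components/<runtime_type>/<component_id>/properties -> canvas properties JSON
#   .../components/<runtime_type>/<component_id>            -> {"content": <definition>, "mimeType": ...}
# The branch on self.request.path.endswith("/properties") above selects between the two.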


class PipelineValidationHandler(HttpErrorMixin, APIHandler):
    """Handler to expose method calls to validate pipeline payloads for errors"""

    @web.authenticated
    async def get(self):
        msg_json = dict(title="GET requests are not supported.")
        self.write(msg_json)
        await self.flush()

    @web.authenticated
    async def post(self):
        self.log.debug("Pipeline Validation Handler now executing post request")

        pipeline_definition = self.get_json_body()
        self.log.debug(f"Pipeline payload: {pipeline_definition}")

        response = await PipelineValidationManager.instance().validate(pipeline=pipeline_definition)
        json_msg = response.to_json()

        self.set_status(200)
        self.set_header("Content-Type", "application/json")
        await self.finish(json_msg)
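
# Illustrative sketch (shape inferred from response.to_json() usage here and in the handlers
# above): validation itself answers HTTP 200 regardless of findings; clients inspect the
# returned "issues" list to decide whether the pipeline can be run or exported.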


class PipelineRuntimeTypesHandler(HttpErrorMixin, APIHandler):
    """Handler to get static information relative to the set of configured runtime types"""

    @web.authenticated
    async def get(self):
        self.log.debug("Retrieving active runtime information from PipelineProcessorRegistry...")
        resources: List[RuntimeTypeResources] = PipelineProcessorRegistry.instance().get_runtime_types_resources()

        runtime_types = []
        for runtime_type in resources:
            runtime_types.append(runtime_type.to_dict())

        self.set_status(200)
        self.set_header("Content-Type", "application/json")
        await self.finish({"runtime_types": runtime_types})
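
# Illustrative response shape (inferred from the loop above):
#   {"runtime_types": [<RuntimeTypeResources.to_dict() result>, ...]}
# with one entry per runtime type known to the PipelineProcessorRegistry.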


class ComponentCacheHandler(HttpErrorMixin, APIHandler):
    """Handler to trigger a complete refresh of all component catalogs."""

    @web.authenticated
    async def put(self):
        # Validate the body
        cache_refresh = self.get_json_body()
        if "action" not in cache_refresh or cache_refresh["action"] != "refresh":
            raise web.HTTPError(400, reason="A body of {'action': 'refresh'} is required!")

        try:
            self.log.debug("Refreshing component cache for all catalog instances...")
            ComponentCache.instance().refresh()
            self.set_status(204)
        except RefreshInProgressError as ripe:
            self.set_status(409, str(ripe))

        await self.finish()
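
# Illustrative usage (body shape taken from the validation check above; the endpoint path is
# registered outside this module):
#   PUT <cache endpoint>   body: {"action": "refresh"}
# answers 204 when a refresh has been started, or 409 when a refresh is already in progress.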


class ComponentCacheCatalogHandler(HttpErrorMixin, APIHandler):
    """Handler to trigger a refresh of a single component catalog with the given name."""

    @web.authenticated
    async def put(self, catalog):
        # Validate the body
        cache_refresh = self.get_json_body()
        if "action" not in cache_refresh or cache_refresh["action"] != "refresh":
            raise web.HTTPError(400, reason="A body of {'action': 'refresh'} is required.")

        try:
            # Ensure the given catalog name corresponds to an existing metadata instance
            catalog_instance = MetadataManager(schemaspace=ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID).get(
                name=catalog
            )
        except MetadataNotFoundError:
            raise web.HTTPError(404, f"Catalog '{catalog}' cannot be found.")

        self.log.debug(f"Refreshing component cache for catalog with name '{catalog}'...")
        ComponentCache.instance().update(catalog=catalog_instance, action="modify")
        self.set_status(204)
        await self.finish()
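
# Illustrative usage (assumed; the endpoint path is registered outside this module): the
# per-catalog variant names the catalog's metadata instance in the URL, e.g.
#   PUT <cache endpoint>/<catalog-name>   body: {"action": "refresh"}
# answering 404 when no ComponentCatalogs instance with that name exists, 204 otherwise.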