handlers.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from datetime import datetime
  17. from http.client import responses
  18. import json
  19. from logging import Logger
  20. import mimetypes
  21. from typing import List
  22. from typing import Optional
  23. from jupyter_server.base.handlers import APIHandler
  24. from jupyter_server.utils import url_path_join
  25. from tornado import web
  26. from elyra.metadata.error import MetadataNotFoundError
  27. from elyra.metadata.manager import MetadataManager
  28. from elyra.metadata.schemaspaces import ComponentCatalogs
  29. from elyra.pipeline.component import Component
  30. from elyra.pipeline.component_catalog import ComponentCache
  31. from elyra.pipeline.component_catalog import RefreshInProgressError
  32. from elyra.pipeline.parser import PipelineParser
  33. from elyra.pipeline.pipeline_definition import PipelineDefinition
  34. from elyra.pipeline.processor import PipelineProcessorManager
  35. from elyra.pipeline.processor import PipelineProcessorRegistry
  36. from elyra.pipeline.runtime_type import RuntimeProcessorType
  37. from elyra.pipeline.runtime_type import RuntimeTypeResources
  38. from elyra.pipeline.validation import PipelineValidationManager
  39. from elyra.util.http import HttpErrorMixin
  40. MIMETYPE_MAP = {".yaml": "text/x-yaml", ".py": "text/x-python", None: "text/plain"}
  41. def get_runtime_processor_type(runtime_type: str, log: Logger, request_path: str) -> Optional[RuntimeProcessorType]:
  42. """
  43. Gets the runtime processor type for the runtime type given in the request path.
  44. :param runtime_type: can be the shorthand runtime ('kfp', 'airflow') or the
  45. runtime type name ('KUBEFLOW_PIPELINES', 'APACHE_AIRFLOW') (preferred).
  46. :param log: used to log the appropriate warning for shorthand-name requests
  47. :param request_path: full request path of the endpoint
  48. :returns: the RuntimeProcessorType for the given runtime_type, or None
  49. """
  50. processor_manager = PipelineProcessorManager.instance()
  51. if processor_manager.is_supported_runtime_type(runtime_type):
  52. # The request path uses the appropriate RuntimeProcessorType name. Use this
  53. # to get the RuntimeProcessorType instance to pass to get_all_components
  54. return RuntimeProcessorType.get_instance_by_name(runtime_type)
  55. elif processor_manager.is_supported_runtime(runtime_type):
  56. # The endpoint path contains the shorthand version of a runtime (e.g., 'kfp',
  57. # 'airflow'). This case and its associated functions should eventually be removed
  58. # in favor of using the RuntimeProcessorType name in the request path.
  59. log.warning(
  60. f"Deprecation warning: when calling endpoint '{request_path}' "
  61. f"use runtime type name (e.g. 'KUBEFLOW_PIPELINES', 'APACHE_AIRFLOW') "
  62. f"instead of shorthand name (e.g., 'kfp', 'airflow')"
  63. )
  64. return processor_manager.get_runtime_type(runtime_type)
  65. return None
  66. class PipelineExportHandler(HttpErrorMixin, APIHandler):
  67. """Handler to expose REST API to export pipelines"""
  68. @web.authenticated
  69. async def get(self):
  70. msg_json = dict(title="Operation not supported.")
  71. self.set_header("Content-Type", "application/json")
  72. await self.finish(msg_json)
  73. @web.authenticated
  74. async def post(self, *args, **kwargs):
  75. self.log.debug("Pipeline Export handler now executing post request")
  76. parent = self.settings.get("elyra")
  77. payload = self.get_json_body()
  78. self.log.debug(f"JSON payload: {json.dumps(payload, indent=2, separators=(',', ': '))}")
  79. pipeline_definition = payload["pipeline"]
  80. pipeline_export_format = payload["export_format"]
  81. pipeline_export_path = payload["export_path"]
  82. pipeline_overwrite = payload["overwrite"]
  83. response = await PipelineValidationManager.instance().validate(pipeline_definition)
  84. self.log.debug(f"Validation checks completed. Results as follows: {response.to_json()}")
  85. if not response.has_fatal:
  86. pipeline = PipelineParser(root_dir=self.settings["server_root_dir"], parent=parent).parse(
  87. pipeline_definition
  88. )
  89. pipeline_exported_path = await PipelineProcessorManager.instance().export(
  90. pipeline, pipeline_export_format, pipeline_export_path, pipeline_overwrite
  91. )
  92. json_msg = json.dumps({"export_path": pipeline_export_path})
  93. self.set_status(201)
  94. self.set_header("Content-Type", "application/json")
  95. location = url_path_join(self.base_url, "api", "contents", pipeline_exported_path)
  96. self.set_header("Location", location)
  97. else:
  98. json_msg = json.dumps(
  99. {
  100. "reason": responses.get(400),
  101. "message": "Errors found in pipeline",
  102. "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  103. "issues": response.to_json().get("issues"),
  104. }
  105. )
  106. self.set_status(400)
  107. self.set_header("Content-Type", "application/json")
  108. await self.finish(json_msg)
  109. class PipelineSchedulerHandler(HttpErrorMixin, APIHandler):
  110. """Handler to expose method calls to execute pipelines as batch jobs"""
  111. @web.authenticated
  112. async def get(self):
  113. msg_json = dict(title="Operation not supported.")
  114. self.write(msg_json)
  115. await self.flush()
  116. @web.authenticated
  117. async def post(self, *args, **kwargs):
  118. self.log.debug("Pipeline SchedulerHandler now executing post request")
  119. parent = self.settings.get("elyra")
  120. pipeline_definition = self.get_json_body()
  121. self.log.debug(f"JSON payload: {pipeline_definition}")
  122. response = await PipelineValidationManager.instance().validate(pipeline=pipeline_definition)
  123. self.log.debug(f"Validation checks completed. Results as follows: {response.to_json()}")
  124. if not response.has_fatal:
  125. self.log.debug("Processing the pipeline submission and executing request")
  126. pipeline = PipelineParser(root_dir=self.settings["server_root_dir"], parent=parent).parse(
  127. pipeline_definition
  128. )
  129. response = await PipelineProcessorManager.instance().process(pipeline)
  130. json_msg = json.dumps(response.to_json())
  131. self.set_status(200)
  132. else:
  133. json_msg = json.dumps(
  134. {
  135. "reason": responses.get(400),
  136. "message": "Errors found in pipeline",
  137. "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  138. "issues": response.to_json().get("issues"),
  139. }
  140. )
  141. self.set_status(400)
  142. self.set_header("Content-Type", "application/json")
  143. await self.finish(json_msg)
  144. class PipelineComponentHandler(HttpErrorMixin, APIHandler):
  145. """Handler to expose method calls to retrieve pipelines editor component configuration"""
  146. @web.authenticated
  147. async def get(self, runtime_type):
  148. self.log.debug(f"Retrieving pipeline components for runtime type: {runtime_type}")
  149. runtime_processor_type = get_runtime_processor_type(runtime_type, self.log, self.request.path)
  150. if not runtime_processor_type:
  151. raise web.HTTPError(400, f"Invalid runtime type '{runtime_type}'")
  152. # Include generic components for all runtime types
  153. components: List[Component] = ComponentCache.get_generic_components()
  154. # Add additional runtime-type-specific components, if present
  155. components.extend(ComponentCache.instance().get_all_components(platform=runtime_processor_type))
  156. palette_json = ComponentCache.to_canvas_palette(components=components)
  157. self.set_status(200)
  158. self.set_header("Content-Type", "application/json")
  159. await self.finish(palette_json)
  160. class PipelinePropertiesHandler(HttpErrorMixin, APIHandler):
  161. """Handler to expose method calls to retrieve pipeline properties"""
  162. @web.authenticated
  163. async def get(self, runtime_type):
  164. self.log.debug(f"Retrieving pipeline components for runtime type: {runtime_type}")
  165. runtime_processor_type = get_runtime_processor_type(runtime_type, self.log, self.request.path)
  166. if not runtime_processor_type:
  167. raise web.HTTPError(400, f"Invalid runtime type '{runtime_type}'")
  168. # Get pipeline properties json
  169. pipeline_properties_json = PipelineDefinition.get_canvas_properties_from_template(
  170. package_name="templates/pipeline", template_name="pipeline_properties_template.jinja2"
  171. )
  172. self.set_status(200)
  173. self.set_header("Content-Type", "application/json")
  174. await self.finish(pipeline_properties_json)
  175. class PipelineComponentPropertiesHandler(HttpErrorMixin, APIHandler):
  176. """Handler to expose method calls to retrieve pipeline component_id properties"""
  177. def get_mimetype(self, ext: Optional[str]) -> str:
  178. """
  179. Get the MIME type for the component definition content.
  180. """
  181. if ext == ".yml":
  182. ext = ".yaml"
  183. # Get mimetype from mimetypes map or built-in mimetypes package; default to plaintext
  184. mimetype = MIMETYPE_MAP.get(ext, mimetypes.guess_type(f"file{ext}")[0]) or "text/plain"
  185. return mimetype
  186. @web.authenticated
  187. async def get(self, runtime_type, component_id):
  188. self.log.debug(f"Retrieving pipeline component properties for component: {component_id}")
  189. if not component_id:
  190. raise web.HTTPError(400, "Missing component ID")
  191. runtime_processor_type = get_runtime_processor_type(runtime_type, self.log, self.request.path)
  192. if not runtime_processor_type:
  193. raise web.HTTPError(400, f"Invalid runtime type '{runtime_type}'")
  194. # Try to get component_id as a generic component; assigns None if id is not a generic component
  195. component: Optional[Component] = ComponentCache.get_generic_component(component_id)
  196. # Try to retrieve a runtime-type-specific component; assigns None if not found
  197. if not component:
  198. component = ComponentCache.instance().get_component(
  199. platform=runtime_processor_type, component_id=component_id
  200. )
  201. if not component:
  202. raise web.HTTPError(404, f"Component '{component_id}' not found")
  203. if self.request.path.endswith("/properties"):
  204. # Return complete set of component properties
  205. json_response = ComponentCache.to_canvas_properties(component)
  206. else:
  207. # Return component definition content
  208. json_response = json.dumps(
  209. {"content": component.definition, "mimeType": self.get_mimetype(component.file_extension)}
  210. )
  211. self.set_status(200)
  212. self.set_header("Content-Type", "application/json")
  213. await self.finish(json_response)
  214. class PipelineValidationHandler(HttpErrorMixin, APIHandler):
  215. """Handler to expose method calls to validate pipeline payloads for errors"""
  216. @web.authenticated
  217. async def get(self):
  218. msg_json = dict(title="GET requests are not supported.")
  219. self.write(msg_json)
  220. await self.flush()
  221. @web.authenticated
  222. async def post(self):
  223. self.log.debug("Pipeline Validation Handler now executing post request")
  224. pipeline_definition = self.get_json_body()
  225. self.log.debug(f"Pipeline payload: {pipeline_definition}")
  226. response = await PipelineValidationManager.instance().validate(pipeline_definition)
  227. json_msg = response.to_json()
  228. self.set_status(200)
  229. self.set_header("Content-Type", "application/json")
  230. await self.finish(json_msg)
  231. class PipelineRuntimeTypesHandler(HttpErrorMixin, APIHandler):
  232. """Handler to get static information relative to the set of configured runtime types"""
  233. @web.authenticated
  234. async def get(self):
  235. self.log.debug("Retrieving active runtime information from PipelineProcessorRegistry...")
  236. resources: List[RuntimeTypeResources] = PipelineProcessorRegistry.instance().get_runtime_types_resources()
  237. runtime_types = []
  238. for runtime_type in resources:
  239. runtime_types.append(runtime_type.to_dict())
  240. self.set_status(200)
  241. self.set_header("Content-Type", "application/json")
  242. await self.finish({"runtime_types": runtime_types})
  243. class ComponentCacheHandler(HttpErrorMixin, APIHandler):
  244. """Handler to trigger a complete re-fresh of all component catalogs."""
  245. @web.authenticated
  246. async def put(self):
  247. # Validate the body
  248. cache_refresh = self.get_json_body()
  249. if "action" not in cache_refresh or cache_refresh["action"] != "refresh":
  250. raise web.HTTPError(400, reason="A body of {'action': 'refresh'} is required!")
  251. try:
  252. self.log.debug("Refreshing component cache for all catalog instances...")
  253. ComponentCache.instance().refresh()
  254. self.set_status(204)
  255. except RefreshInProgressError as ripe:
  256. self.set_status(409, str(ripe))
  257. await self.finish()
  258. class ComponentCacheCatalogHandler(HttpErrorMixin, APIHandler):
  259. """Handler to trigger a re-fresh of a single component catalog with the given name."""
  260. @web.authenticated
  261. async def put(self, catalog):
  262. # Validate the body
  263. cache_refresh = self.get_json_body()
  264. if "action" not in cache_refresh or cache_refresh["action"] != "refresh":
  265. raise web.HTTPError(400, reason="A body of {'action': 'refresh'} is required.")
  266. try:
  267. # Ensure given catalog name is a metadata instance
  268. catalog_instance = MetadataManager(schemaspace=ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID).get(
  269. name=catalog
  270. )
  271. except MetadataNotFoundError:
  272. raise web.HTTPError(404, f"Catalog '{catalog}' cannot be found.")
  273. self.log.debug(f"Refreshing component cache for catalog with name '{catalog}'...")
  274. ComponentCache.instance().update(catalog=catalog_instance, action="modify")
  275. self.set_status(204)
  276. await self.finish()