test_handlers.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import asyncio
  17. import json
  18. import os
  19. from conftest import KFP_COMPONENT_CACHE_INSTANCE
  20. import jupyter_core
  21. import pytest
  22. import requests
  23. from tornado.httpclient import HTTPClientError
  24. from elyra.metadata.metadata import Metadata
  25. from elyra.metadata.schemaspaces import ComponentCatalogs
  26. from elyra.pipeline.parser import PipelineParser
  27. from elyra.pipeline.processor import PipelineProcessorManager
  28. from elyra.pipeline.runtime_type import RuntimeProcessorType
  29. from elyra.pipeline.runtime_type import RuntimeTypeResources
  30. from elyra.pipeline.validation import PipelineValidationManager
  31. from elyra.pipeline.validation import ValidationResponse
  32. from elyra.pipeline.validation import ValidationSeverity
  33. from elyra.tests.pipeline import resources
  34. from elyra.tests.util.handlers_utils import expected_http_error
  35. try:
  36. import importlib.resources as pkg_resources
  37. except ImportError:
  38. # Try backported to PY<37 `importlib_resources`.
  39. import importlib_resources as pkg_resources
  40. COMPONENT_CATALOG_DIRECTORY = os.path.join(jupyter_core.paths.ENV_JUPYTER_PATH[0], "components")
  41. TEST_CATALOG_NAME = "test_handlers_catalog"
  42. def _async_return(result):
  43. # Helper function to return an arbitrary value when mocking awaits
  44. f = asyncio.Future()
  45. f.set_result(result)
  46. return f
  47. def _get_resource_path(filename):
  48. resource_path = os.path.join(os.path.dirname(__file__), "resources", "components", filename)
  49. resource_path = os.path.normpath(resource_path)
  50. return resource_path
  51. async def cli_catalog_instance(jp_fetch):
  52. # Create new registry instance with a single URL-based component
  53. # This is not a fixture because it needs to
  54. paths = [_get_resource_path("kfp_test_operator.yaml")]
  55. instance_metadata = {
  56. "description": "A test registry",
  57. "runtime_type": RuntimeProcessorType.KUBEFLOW_PIPELINES.name,
  58. "categories": ["New Components"],
  59. "paths": paths,
  60. }
  61. instance = Metadata(
  62. schema_name="local-file-catalog",
  63. name=TEST_CATALOG_NAME,
  64. display_name="New Test Catalog",
  65. metadata=instance_metadata,
  66. )
  67. body = json.dumps(instance.to_dict())
  68. r = await jp_fetch(
  69. "elyra", "metadata", ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID, body=body, method="POST"
  70. )
  71. assert r.code == 201
  72. r = await jp_fetch("elyra", "metadata", ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID)
  73. assert r.code == 200
  74. metadata = json.loads(r.body.decode())
  75. assert len(metadata) >= 1
  76. async def test_get_components(jp_fetch):
  77. # Ensure all valid components can be found
  78. runtime_type = RuntimeProcessorType.LOCAL
  79. response = await jp_fetch("elyra", "pipeline", "components", runtime_type.name)
  80. assert response.code == 200
  81. payload = json.loads(response.body.decode())
  82. palette = json.loads(pkg_resources.read_text(resources, "palette.json"))
  83. assert payload == palette
  84. async def test_get_components_runtime_name_vs_type(jp_fetch, caplog):
  85. # Ensure deprecation warning appears when querying endpoint with shorthand runtime name
  86. runtime_name = "kfp"
  87. response = await jp_fetch("elyra", "pipeline", "components", runtime_name)
  88. assert response.code == 200
  89. assert "Deprecation warning: when calling endpoint" in caplog.text
  90. caplog.clear()
  91. # Ensure no deprecation warning appears when using runtime type name. The type
  92. # is case-insensitive, e.g., a runtime type can use either lowercase 'local' or
  93. # uppercase 'LOCAL'
  94. runtime_type = RuntimeProcessorType.LOCAL # use LOCAL runtime type
  95. response = await jp_fetch("elyra", "pipeline", "components", runtime_type.name.lower()) # fetch with 'local'
  96. assert response.code == 200
  97. assert "Deprecation warning: when calling endpoint" not in caplog.text
  98. async def test_get_component_properties_config(jp_fetch):
  99. # Ensure all valid component_entry properties can be found
  100. runtime_type = RuntimeProcessorType.LOCAL
  101. response = await jp_fetch("elyra", "pipeline", "components", runtime_type.name, "notebook", "properties")
  102. assert response.code == 200
  103. payload = json.loads(response.body.decode())
  104. template = pkg_resources.read_text(resources, "generic_properties_template.jinja2")
  105. properties = json.loads(
  106. template.replace("{{ component.name }}", "Notebook").replace("{{ component.extensions|tojson }}", '[".ipynb"]')
  107. )
  108. assert payload == properties
  109. @pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
  110. async def test_get_component_properties_definition(catalog_instance, jp_fetch, caplog):
  111. # Ensure the definition for a component can be found
  112. component_url = (
  113. "https://raw.githubusercontent.com/elyra-ai/examples/main/component-catalog-connectors/"
  114. "kfp-example-components-connector/kfp_examples_connector/resources/download_data.yaml"
  115. )
  116. definition = requests.get(component_url)
  117. component_id = "elyra-kfp-examples-catalog:a08014f9252f" # static id for the 'Download Data' example component
  118. # Test with shorthand runtime (e.g. 'kfp', 'airflow') (support to be removed in later release)
  119. response = await jp_fetch("elyra", "pipeline", "components", "kfp", component_id)
  120. assert response.code == 200
  121. payload = json.loads(response.body.decode())
  122. assert payload["content"] == definition.text
  123. assert payload["mimeType"] == "text/x-yaml"
  124. assert "Deprecation warning" in caplog.text
  125. caplog.clear()
  126. # Test with runtime type name in endpoint
  127. runtime_type = RuntimeProcessorType.KUBEFLOW_PIPELINES
  128. response = await jp_fetch("elyra", "pipeline", "components", runtime_type.name, component_id)
  129. assert response.code == 200
  130. payload = json.loads(response.body.decode())
  131. assert payload["content"] == definition.text
  132. assert payload["mimeType"] == "text/x-yaml"
  133. assert "Deprecation warning" not in caplog.text
  134. async def test_runtime_types_resources(jp_fetch):
  135. # Ensure appropriate runtime types resources can be fetched
  136. response = await jp_fetch("elyra", "pipeline", "runtimes", "types")
  137. assert response.code == 200
  138. resources = json.loads(response.body.decode())
  139. runtime_types = resources["runtime_types"]
  140. assert len(runtime_types) >= 1 # We should have Local for sure
  141. for runtime_type_resources in runtime_types:
  142. assert runtime_type_resources.get("id") in ["LOCAL", "KUBEFLOW_PIPELINES", "APACHE_AIRFLOW", "ARGO"]
  143. # Acquire corresponding instance and compare that results are the same
  144. runtime_type = RuntimeProcessorType.get_instance_by_name(runtime_type_resources.get("id"))
  145. resources_instance = RuntimeTypeResources.get_instance_by_type(runtime_type)
  146. assert runtime_type_resources.get("display_name") == resources_instance.display_name
  147. assert runtime_type_resources.get("export_file_types") == resources_instance.export_file_types
  148. assert runtime_type_resources.get("icon") == resources_instance.icon_endpoint
  149. async def test_double_refresh(jp_fetch):
  150. # Ensure that attempts to refresh the component cache while another is in progress result in 409
  151. await cli_catalog_instance(jp_fetch)
  152. refresh = {"action": "refresh"}
  153. body = json.dumps(refresh)
  154. response = await jp_fetch("elyra", "pipeline", "components", "cache", body=body, method="PUT")
  155. assert response.code == 204
  156. with pytest.raises(HTTPClientError) as e:
  157. await jp_fetch("elyra", "pipeline", "components", "cache", body=body, method="PUT")
  158. assert expected_http_error(e, 409)
  159. # Give the first refresh attempt a chance to complete and try again to ensure it has
  160. await asyncio.sleep(2)
  161. response = await jp_fetch("elyra", "pipeline", "components", "cache", body=body, method="PUT")
  162. assert response.code == 204
  163. async def test_malformed_refresh(jp_fetch):
  164. # Ensure that providing the endpoints with a bad body generate 400 errors.
  165. refresh = {"no-action": "refresh"}
  166. body = json.dumps(refresh)
  167. with pytest.raises(HTTPClientError) as e:
  168. await jp_fetch("elyra", "pipeline", "components", "cache", body=body, method="PUT")
  169. assert expected_http_error(e, 400)
  170. refresh = {"action": "no-refresh"}
  171. body = json.dumps(refresh)
  172. with pytest.raises(HTTPClientError) as e:
  173. await jp_fetch("elyra", "pipeline", "components", "cache", body=body, method="PUT")
  174. assert expected_http_error(e, 400)
  175. async def test_get_pipeline_properties_definition(jp_fetch):
  176. runtime_list = ["kfp", "airflow", "local"]
  177. for runtime in runtime_list:
  178. response = await jp_fetch("elyra", "pipeline", runtime, "properties")
  179. assert response.code == 200
  180. payload = json.loads(response.body.decode())
  181. # Spot check
  182. assert payload["parameters"] == [
  183. {"id": "name"},
  184. {"id": "runtime"},
  185. {"id": "description"},
  186. {"id": "cos_object_prefix"},
  187. {"id": "elyra_runtime_image"},
  188. {"id": "elyra_env_vars"},
  189. {"id": "elyra_kubernetes_secrets"},
  190. {"id": "elyra_kubernetes_tolerations"},
  191. {"id": "elyra_mounted_volumes"},
  192. {"id": "elyra_kubernetes_pod_annotations"},
  193. {"id": "elyra_disallow_cached_output"},
  194. ]
  195. async def test_pipeline_success(jp_fetch, monkeypatch):
  196. request_body = {"pipeline": "body", "export_format": "py", "export_path": "test.py", "overwrite": True}
  197. # Create a response that will trigger the valid code path
  198. validation_response = ValidationResponse()
  199. monkeypatch.setattr(PipelineValidationManager, "validate", lambda x, y: _async_return(validation_response))
  200. monkeypatch.setattr(PipelineParser, "parse", lambda x, y: "Dummy_Data")
  201. monkeypatch.setattr(PipelineProcessorManager, "export", lambda x, y, z, aa, bb: _async_return("test.py"))
  202. json_body = json.dumps(request_body)
  203. http_response = await jp_fetch("elyra", "pipeline", "export", body=json_body, method="POST")
  204. assert http_response.code == 201
  205. async def test_pipeline_failure(jp_fetch, monkeypatch):
  206. request_body = {"pipeline": "body", "export_format": "py", "export_path": "test.py", "overwrite": True}
  207. # Create a response that will trigger the fatal code path
  208. bad_validation_response = ValidationResponse()
  209. bad_validation_response.add_message(severity=ValidationSeverity.Error, message_type="invalidJSON", message="issue")
  210. monkeypatch.setattr(PipelineValidationManager, "validate", lambda x, y: _async_return(bad_validation_response))
  211. json_body = json.dumps(request_body)
  212. # Will raise HTTP error so we need to catch with pytest
  213. with pytest.raises(HTTPClientError):
  214. await jp_fetch("elyra", "pipeline", "export", body=json_body, method="POST")
  215. async def test_validation_handler(jp_fetch, monkeypatch):
  216. request_body = {"pipeline": "body", "export_format": "py", "export_path": "test.py", "overwrite": True}
  217. monkeypatch.setattr(PipelineValidationManager, "validate", lambda x, y: _async_return(ValidationResponse()))
  218. json_body = json.dumps(request_body)
  219. http_response = await jp_fetch("elyra", "pipeline", "validate", body=json_body, method="POST")
  220. assert http_response.code == 200