conftest.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import pytest
  17. from elyra.metadata.metadata import Metadata
  18. from elyra.metadata.schemaspaces import ComponentCatalogs
  19. from elyra.metadata.manager import MetadataManager
  20. from elyra.pipeline.component_catalog import ComponentCache
  21. pytest_plugins = ["jupyter_server.pytest_plugin"]
  22. TEST_CATALOG_NAME = "new_test_catalog"
  23. KFP_COMPONENT_CACHE_INSTANCE = {
  24. "display_name": "KFP Example Components",
  25. "metadata": {"runtime_type": "KUBEFLOW_PIPELINES", "categories": ["examples"]},
  26. "schema_name": "elyra-kfp-examples-catalog",
  27. }
  28. AIRFLOW_COMPONENT_CACHE_INSTANCE = {
  29. "display_name": "Airflow Example Components",
  30. "metadata": {
  31. "runtime_type": "APACHE_AIRFLOW",
  32. "categories": ["examples"],
  33. "paths": [
  34. "https://raw.githubusercontent.com/elyra-ai/elyra/master/elyra/tests/pipeline/resources/components/bash_operator.py" # noqa
  35. ],
  36. },
  37. "schema_name": "url-catalog",
  38. }
  39. @pytest.fixture
  40. def component_cache(jp_environ):
  41. """
  42. Initialize a component cache
  43. """
  44. # Create new instance and load the cache
  45. component_cache = ComponentCache.instance(emulate_server_app=True)
  46. component_cache.load()
  47. yield component_cache
  48. component_cache.cache_manager.stop()
  49. ComponentCache.clear_instance()
  50. @pytest.fixture
  51. def catalog_instance(component_cache, request):
  52. """Creates an instance of a component catalog and removes after test."""
  53. instance_metadata = request.param
  54. instance_name = "component_cache"
  55. md_mgr = MetadataManager(schemaspace=ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID)
  56. catalog = md_mgr.create(instance_name, Metadata(**instance_metadata))
  57. component_cache.wait_for_all_cache_tasks()
  58. yield catalog
  59. md_mgr.remove(instance_name)
  60. @pytest.fixture
  61. def metadata_manager_with_teardown(jp_environ):
  62. """
  63. This fixture provides a MetadataManager instance for certain tests that modify the component
  64. catalog. This ensures the catalog instance is removed even when the test fails
  65. """
  66. metadata_manager = MetadataManager(schemaspace=ComponentCatalogs.COMPONENT_CATALOGS_SCHEMASPACE_ID)
  67. # Run test with provided metadata manager
  68. yield metadata_manager
  69. # Remove test catalog
  70. try:
  71. if metadata_manager.get(TEST_CATALOG_NAME):
  72. metadata_manager.remove(TEST_CATALOG_NAME)
  73. except Exception:
  74. pass
  75. # Set Elyra server extension as enabled (overriding server_config fixture from jupyter_server)
  76. @pytest.fixture
  77. def jp_server_config():
  78. return {"ServerApp": {"jpserver_extensions": {"elyra": True}}}