test_airflow_provider_package_connector.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import io
  17. from pathlib import Path
  18. from tempfile import TemporaryDirectory
  19. from urllib.parse import urlparse
  20. import zipfile
  21. from requests import get
  22. from elyra.pipeline.airflow.provider_package_catalog_connector.airflow_provider_package_catalog_connector import (
  23. AirflowProviderPackageCatalogConnector, # noqa: H301
  24. )
  25. from elyra.pipeline.catalog_connector import AirflowEntryData
# Download URL of the Apache Airflow HTTP provider package wheel (version
# 2.0.2, per the file name) used as the sample package by the tests below.
HTTP_PROVIDER_PKG_URL = (
    "https://files.pythonhosted.org/packages/a1/08/"
    "91653e9f394cbefe356ac07db809be7e69cc89b094379ad91d6cef3d2bc9/"
    "apache_airflow_providers_http-2.0.2-py3-none-any.whl"
)

# File extensions handed to the connector's constructor in every test.
AIRFLOW_SUPPORTED_FILE_TYPES = [".py"]
  32. def test_empty_workdir():
  33. """
  34. Verify that the workdir isn't set by default. (The property is only set
  35. after 'get_catalog_entries' was invoked.)
  36. """
  37. appc = AirflowProviderPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  38. assert hasattr(appc, "tmp_archive_dir") is False
  39. appc.get_hash_keys()
  40. assert hasattr(appc, "tmp_archive_dir") is False
  41. cd = appc.get_entry_data({"file": "dummyfile"}, {})
  42. assert cd is None
  43. assert hasattr(appc, "tmp_archive_dir") is False
  44. def test_get_hash_keys():
  45. """
  46. Verify that `get_hash_keys` returns the expected hash key.
  47. """
  48. hk = AirflowProviderPackageCatalogConnector.get_hash_keys()
  49. assert len(hk) == 2
  50. assert hk[0] == "provider"
  51. assert hk[1] == "file"
  52. def test_invalid_download_input(requests_mock):
  53. """
  54. Test invalid input scenarios.
  55. """
  56. appc = AirflowProviderPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  57. # Input is an invalid URL/host.
  58. requests_mock.get("http://no.such.host/a-file", real_http=True)
  59. ce = appc.get_catalog_entries({"airflow_provider_package_download_url": "http://no.such.host/a-file"})
  60. assert len(ce) == 0
  61. # Input is a valid URL but does not include a filename.
  62. requests_mock.get("http://server.domain.com", text="a-file-content")
  63. ce = appc.get_catalog_entries({"airflow_provider_package_download_url": "http://server.domain.com/"})
  64. assert len(ce) == 0
  65. # Input is a valid URL but but does not return a ZIP-compressed file.
  66. requests_mock.get("http://server.domain.com/a-file", text="a-file-content")
  67. ce = appc.get_catalog_entries({"airflow_provider_package_download_url": "http://server.domain.com/a-file"})
  68. assert len(ce) == 0
  69. # Input is a valid URL and a ZIP-compressed file, but not a provider package
  70. zip_buffer = io.BytesIO()
  71. with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED) as zip_file:
  72. zip_file.writestr("dummy_file.txt", "I am a dummy file, living in a ZIP archive.")
  73. requests_mock.get("http://server.domain.com/a-zip-file", content=zip_buffer.getvalue())
  74. ce = appc.get_catalog_entries({"airflow_provider_package_download_url": "http://server.domain.com/a-zip-file"})
  75. assert len(ce) == 0
  76. def test_invalid_input_get_catalog_entries():
  77. """
  78. Validate that AirflowProviderPackageCatalogConnector.get_catalog_entries(...) returns
  79. the expected results for invalid inputs
  80. """
  81. apc = AirflowProviderPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  82. # Test invalid "file://" inputs ...
  83. # ... input refers to a directory
  84. resource_location = Path(__file__).parent / ".." / "resources" / "components"
  85. resource_url = resource_location.as_uri()
  86. ce = apc.get_catalog_entries(
  87. {"airflow_provider_package_download_url": resource_url, "display_name": "file://is-a-dir-test"}
  88. )
  89. assert isinstance(ce, list), resource_url
  90. assert len(ce) == 0
  91. # ... input refers to a non-existing whl file
  92. resource_location = Path(__file__).parent / ".." / "resources" / "components" / "no-such.whl"
  93. resource_url = resource_location.as_uri()
  94. ce = apc.get_catalog_entries(
  95. {"airflow_provider_package_download_url": resource_url, "display_name": "file://no-such-file-test"}
  96. )
  97. assert isinstance(ce, list), resource_url
  98. assert len(ce) == 0
  99. # -----------------------------------
  100. # Long running test(s)
  101. # ----------------------------------
  102. def test_valid_url_http_provider_package():
  103. """
  104. Test connector using HTTP provider package
  105. """
  106. appc = AirflowProviderPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  107. # get catalog entries for the specified provider package
  108. ces = appc.get_catalog_entries({"airflow_provider_package_download_url": HTTP_PROVIDER_PKG_URL})
  109. # this package should contain 1 Python scripts with operator definitions
  110. assert len(ces) == 1
  111. # each entry must contain three keys
  112. for entry in ces:
  113. # provider package file name
  114. assert entry.get("provider_package") == "apache_airflow_providers_http-2.0.2-py3-none-any.whl"
  115. # provider name
  116. assert entry.get("provider") == "apache_airflow_providers_http"
  117. # a Python script
  118. assert entry.get("file", "").endswith(".py")
  119. # fetch and validate the first entry
  120. ce = appc.get_entry_data({"file": ces[0]["file"]}, {})
  121. assert ce is not None
  122. assert isinstance(ce, AirflowEntryData)
  123. assert ce.definition is not None
  124. assert ce.package_name == "airflow.providers.http.operators.http"
  125. def test_valid_file_http_provider_package():
  126. """
  127. Test connector using a local copy of the HTTP provider package
  128. """
  129. # Download the test provider package and store it in the local file system
  130. resp = get(HTTP_PROVIDER_PKG_URL)
  131. assert resp.status_code == 200
  132. with TemporaryDirectory() as dirpath:
  133. local_file_copy = Path(dirpath) / Path(urlparse(HTTP_PROVIDER_PKG_URL).path).name
  134. with open(local_file_copy, "wb") as downloaded_package:
  135. downloaded_package.write(resp.content)
  136. local_provider_package_file_copy_url = local_file_copy.as_uri()
  137. appc = AirflowProviderPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  138. # get catalog entries for the specified provider package
  139. ces = appc.get_catalog_entries(
  140. {
  141. "airflow_provider_package_download_url": local_provider_package_file_copy_url,
  142. "display_name": "file://local-provider-package",
  143. }
  144. )
  145. # this package should contain 1 Python script with operator definitions
  146. assert len(ces) == 1
  147. # each entry must contain three keys
  148. for entry in ces:
  149. # provider package file name
  150. assert entry.get("provider_package") == Path(urlparse(HTTP_PROVIDER_PKG_URL).path).name
  151. # provider name
  152. assert entry.get("provider") == "apache_airflow_providers_http"
  153. # a Python script
  154. assert entry.get("file", "").endswith(".py")
  155. # fetch and validate the first entry
  156. ce = appc.get_entry_data({"file": ces[0]["file"]}, {})
  157. assert ce is not None
  158. assert isinstance(ce, AirflowEntryData)
  159. assert ce.definition is not None
  160. assert ce.package_name == "airflow.providers.http.operators.http"