test_airflow_provider_package_connector.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import io
  17. import zipfile
  18. from elyra.pipeline.airflow.provider_package_catalog_connector.airflow_provider_package_catalog_connector import (
  19. AirflowProviderPackageCatalogConnector, # noqa: H301
  20. )
  21. from elyra.pipeline.catalog_connector import AirflowEntryData
  22. HTTP_PROVIDER_PKG_URL = (
  23. "https://files.pythonhosted.org/packages/a1/08/"
  24. "91653e9f394cbefe356ac07db809be7e69cc89b094379ad91d6cef3d2bc9/"
  25. "apache_airflow_providers_http-2.0.2-py3-none-any.whl"
  26. )
  27. AIRFLOW_SUPPORTED_FILE_TYPES = [".py"]
  28. def test_empty_workdir():
  29. """
  30. Verify that the workdir isn't set by default. (The property is only set
  31. after 'get_catalog_entries' was invoked.)
  32. """
  33. appc = AirflowProviderPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  34. assert hasattr(appc, "tmp_archive_dir") is False
  35. appc.get_hash_keys()
  36. assert hasattr(appc, "tmp_archive_dir") is False
  37. cd = appc.get_entry_data({"file": "dummyfile"}, {})
  38. assert cd is None
  39. assert hasattr(appc, "tmp_archive_dir") is False
  40. def test_get_hash_keys():
  41. """
  42. Verify that `get_hash_keys` returns the expected hash key.
  43. """
  44. hk = AirflowProviderPackageCatalogConnector.get_hash_keys()
  45. assert len(hk) == 2
  46. assert hk[0] == "provider"
  47. assert hk[1] == "file"
  48. def test_invalid_download_input(requests_mock):
  49. """
  50. Test invalid input scenarios.
  51. """
  52. appc = AirflowProviderPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  53. # Input is an invalid URL/host.
  54. requests_mock.get("http://no.such.host/a-file", real_http=True)
  55. ce = appc.get_catalog_entries({"airflow_provider_package_download_url": "http://no.such.host/a-file"})
  56. assert len(ce) == 0
  57. # Input is a valid URL but does not include a filename.
  58. requests_mock.get("http://server.domain.com", text="a-file-content")
  59. ce = appc.get_catalog_entries({"airflow_provider_package_download_url": "http://server.domain.com/"})
  60. assert len(ce) == 0
  61. # Input is a valid URL but but does not return a ZIP-compressed file.
  62. requests_mock.get("http://server.domain.com/a-file", text="a-file-content")
  63. ce = appc.get_catalog_entries({"airflow_provider_package_download_url": "http://server.domain.com/a-file"})
  64. assert len(ce) == 0
  65. # Input is a valid URL and a ZIP-compressed file, but not a provider package
  66. zip_buffer = io.BytesIO()
  67. with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED) as zip_file:
  68. zip_file.writestr("dummy_file.txt", "I am a dummy file, living in a ZIP archive.")
  69. requests_mock.get("http://server.domain.com/a-zip-file", content=zip_buffer.getvalue())
  70. ce = appc.get_catalog_entries({"airflow_provider_package_download_url": "http://server.domain.com/a-zip-file"})
  71. assert len(ce) == 0
# -----------------------------------
# Long-running test(s); these download a real provider package over the network
# -----------------------------------
  75. def test_http_provider_package():
  76. """
  77. Test connector using HTTP provider package
  78. """
  79. appc = AirflowProviderPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  80. # get catalog entries for the specified provider package
  81. ces = appc.get_catalog_entries({"airflow_provider_package_download_url": HTTP_PROVIDER_PKG_URL})
  82. # this package should contain 1 Python scripts with operator definitions
  83. assert len(ces) == 1
  84. # each entry must contain three keys
  85. for entry in ces:
  86. # provider package file name
  87. assert entry.get("provider_package") == "apache_airflow_providers_http-2.0.2-py3-none-any.whl"
  88. # provider name
  89. assert entry.get("provider") == "apache_airflow_providers_http"
  90. # a Python script
  91. assert entry.get("file", "").endswith(".py")
  92. # fetch and validate the first entry
  93. ce = appc.get_entry_data({"file": ces[0]["file"]}, {})
  94. assert ce is not None
  95. assert isinstance(ce, AirflowEntryData)
  96. assert ce.definition is not None
  97. assert ce.package_name == "airflow.providers.http.operators.http"