test_airflow_package_connector.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import io
  17. from pathlib import Path
  18. import zipfile
  19. from elyra.pipeline.airflow.package_catalog_connector.airflow_package_catalog_connector import (
  20. AirflowPackageCatalogConnector, # noqa:H301
  21. )
  22. from elyra.pipeline.catalog_connector import AirflowEntryData
# Download URL (on files.pythonhosted.org) of the Apache Airflow 1.10.15
# built distribution (wheel) that the long running test below fetches.
AIRFLOW_1_10_15_PKG_URL = (
    "https://files.pythonhosted.org/packages/f0/3a/"
    "f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/"
    "apache_airflow-1.10.15-py2.py3-none-any.whl"
)
# File extensions handed to the AirflowPackageCatalogConnector constructor
# by every test in this module.
AIRFLOW_SUPPORTED_FILE_TYPES = [".py"]
  29. def test_empty_workdir():
  30. """
  31. Verify that the workdir isn't set by default. (The property is only set
  32. after 'get_catalog_entries' was invoked.)
  33. """
  34. apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  35. assert hasattr(apc, "tmp_archive_dir") is False
  36. apc.get_hash_keys()
  37. assert hasattr(apc, "tmp_archive_dir") is False
  38. cd = apc.get_entry_data({"file": "dummyfile"}, {})
  39. assert cd is None
  40. assert hasattr(apc, "tmp_archive_dir") is False
  41. def test_get_hash_keys():
  42. """
  43. Verify that `get_hash_keys` returns the expected hash key.
  44. """
  45. hk = AirflowPackageCatalogConnector.get_hash_keys()
  46. assert len(hk) == 1
  47. assert hk[0] == "file"
  48. def test_invalid_download_input(requests_mock):
  49. """
  50. Test invalid input scenarios.
  51. """
  52. apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  53. # Input is an invalid URL/host.
  54. requests_mock.get("http://no.such.host/a-file", real_http=True)
  55. ce = apc.get_catalog_entries({"airflow_package_download_url": "http://no.such.host/a-file"})
  56. assert len(ce) == 0
  57. # Input is a valid URL but does not include a filename.
  58. requests_mock.get("http://server.domain.com", text="a-file-content")
  59. ce = apc.get_catalog_entries({"airflow_package_download_url": "http://server.domain.com/"})
  60. assert len(ce) == 0
  61. # Input is a valid URL, but does not return a ZIP-compressed file.
  62. requests_mock.get("http://server.domain.com/a-file", text="another-file-content")
  63. ce = apc.get_catalog_entries({"airflow_package_download_url": "http://server.domain.com/a-file"})
  64. assert len(ce) == 0
  65. # Input is a valid URL and a ZIP-compressed file, but is not an Airflow built distribution.
  66. zip_buffer = io.BytesIO()
  67. with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED) as zip_file:
  68. zip_file.writestr("dummy_file.txt", "I am a dummy file, living in a ZIP archive.")
  69. requests_mock.get("http://server.domain.com/a-zip-file", content=zip_buffer.getvalue())
  70. ce = apc.get_catalog_entries({"airflow_package_download_url": "http://server.domain.com/a-zip-file"})
  71. assert len(ce) == 0
  72. def test_invalid_get_entry_data():
  73. """
  74. Validate that AirflowPackageCatalogConnector.get_entry_data(...) returns
  75. the expected results for invalid inputs
  76. """
  77. apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  78. # Test invalid "file://" inputs ...
  79. # ... input refers to a directory
  80. resource_location = Path(__file__).parent / ".." / "resources" / "components"
  81. resource_url = resource_location.as_uri()
  82. ce = apc.get_catalog_entries({"airflow_package_download_url": resource_url, "display_name": "file://is-a-dir-test"})
  83. assert isinstance(ce, list), resource_url
  84. assert len(ce) == 0
  85. # ... input refers to a non-existing whl file
  86. resource_location = Path(__file__).parent / ".." / "resources" / "components" / "no-such.whl"
  87. resource_url = resource_location.as_uri()
  88. ce = apc.get_catalog_entries(
  89. {"airflow_package_download_url": resource_url, "display_name": "file://no-such-file-test"}
  90. )
  91. assert isinstance(ce, list), resource_url
  92. assert len(ce) == 0
  93. # -----------------------------------
  94. # Long running test(s)
  95. # ----------------------------------
  96. def test_1_10_15_distribution():
  97. """
  98. Test connector using Apache Airflow 1.10.15 built distribution.
  99. """
  100. apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  101. # get catalog entries for the specified distribution
  102. ces = apc.get_catalog_entries({"airflow_package_download_url": AIRFLOW_1_10_15_PKG_URL})
  103. # this distribution should contain 37 Python scripts with operator definitions
  104. assert len(ces) == 37
  105. # each entry must contain two keys
  106. for entry in ces:
  107. # built distribution package file name
  108. assert entry.get("airflow_package") == "apache_airflow-1.10.15-py2.py3-none-any.whl"
  109. # a Python script
  110. assert entry.get("file", "").endswith(".py")
  111. # fetch and validate the first entry
  112. ce = apc.get_entry_data({"file": ces[0]["file"]}, {})
  113. assert ce is not None
  114. assert isinstance(ce, AirflowEntryData)
  115. assert ce.definition is not None
  116. assert ce.package_name.startswith("airflow.operators.")