test_airflow_package_connector.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import io
  17. import zipfile
  18. from elyra.pipeline.airflow.package_catalog_connector.airflow_package_catalog_connector import (
  19. AirflowPackageCatalogConnector, # noqa:H301
  20. )
  21. from elyra.pipeline.catalog_connector import AirflowEntryData
  22. AIRFLOW_1_10_15_PKG_URL = (
  23. "https://files.pythonhosted.org/packages/f0/3a/"
  24. "f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/"
  25. "apache_airflow-1.10.15-py2.py3-none-any.whl"
  26. )
  27. AIRFLOW_SUPPORTED_FILE_TYPES = [".py"]
  28. def test_empty_workdir():
  29. """
  30. Verify that the workdir isn't set by default. (The property is only set
  31. after 'get_catalog_entries' was invoked.)
  32. """
  33. apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  34. assert hasattr(apc, "tmp_archive_dir") is False
  35. apc.get_hash_keys()
  36. assert hasattr(apc, "tmp_archive_dir") is False
  37. cd = apc.get_entry_data({"file": "dummyfile"}, {})
  38. assert cd is None
  39. assert hasattr(apc, "tmp_archive_dir") is False
  40. def test_get_hash_keys():
  41. """
  42. Verify that `get_hash_keys` returns the expected hash key.
  43. """
  44. hk = AirflowPackageCatalogConnector.get_hash_keys()
  45. assert len(hk) == 1
  46. assert hk[0] == "file"
  47. def test_invalid_download_input(requests_mock):
  48. """
  49. Test invalid input scenarios.
  50. """
  51. apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  52. # Input is an invalid URL/host.
  53. requests_mock.get("http://no.such.host/a-file", real_http=True)
  54. ce = apc.get_catalog_entries({"airflow_package_download_url": "http://no.such.host/a-file"})
  55. assert len(ce) == 0
  56. # Input is a valid URL but does not include a filename.
  57. requests_mock.get("http://server.domain.com", text="a-file-content")
  58. ce = apc.get_catalog_entries({"airflow_package_download_url": "http://server.domain.com/"})
  59. assert len(ce) == 0
  60. # Input is a valid URL, but does not return a ZIP-compressed file.
  61. requests_mock.get("http://server.domain.com/a-file", text="another-file-content")
  62. ce = apc.get_catalog_entries({"airflow_package_download_url": "http://server.domain.com/a-file"})
  63. assert len(ce) == 0
  64. # Input is a valid URL and a ZIP-compressed file, but is not an Airflow built distribution.
  65. zip_buffer = io.BytesIO()
  66. with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED) as zip_file:
  67. zip_file.writestr("dummy_file.txt", "I am a dummy file, living in a ZIP archive.")
  68. requests_mock.get("http://server.domain.com/a-zip-file", content=zip_buffer.getvalue())
  69. ce = apc.get_catalog_entries({"airflow_package_download_url": "http://server.domain.com/a-zip-file"})
  70. assert len(ce) == 0
  71. # -----------------------------------
  72. # Long running test(s)
  73. # ----------------------------------
  74. def test_1_10_15_distribution():
  75. """
  76. Test connector using Apache Airflow 1.10.15 built distribution.
  77. """
  78. apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
  79. # get catalog entries for the specified distribution
  80. ces = apc.get_catalog_entries({"airflow_package_download_url": AIRFLOW_1_10_15_PKG_URL})
  81. # this distribution should contain 37 Python scripts with operator definitions
  82. assert len(ces) == 37
  83. # each entry must contain two keys
  84. for entry in ces:
  85. # built distribution package file name
  86. assert entry.get("airflow_package") == "apache_airflow-1.10.15-py2.py3-none-any.whl"
  87. # a Python script
  88. assert entry.get("file", "").endswith(".py")
  89. # fetch and validate the first entry
  90. ce = apc.get_entry_data({"file": ces[0]["file"]}, {})
  91. assert ce is not None
  92. assert isinstance(ce, AirflowEntryData)
  93. assert ce.definition is not None
  94. assert ce.package_name.startswith("airflow.operators.")