123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140 |
- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import io
- from pathlib import Path
- import zipfile
- from elyra.pipeline.airflow.package_catalog_connector.airflow_package_catalog_connector import (
- AirflowPackageCatalogConnector, # noqa:H301
- )
- from elyra.pipeline.catalog_connector import AirflowEntryData
- AIRFLOW_1_10_15_PKG_URL = (
- "https://files.pythonhosted.org/packages/f0/3a/"
- "f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/"
- "apache_airflow-1.10.15-py2.py3-none-any.whl"
- )
- AIRFLOW_SUPPORTED_FILE_TYPES = [".py"]
- def test_empty_workdir():
- """
- Verify that the workdir isn't set by default. (The property is only set
- after 'get_catalog_entries' was invoked.)
- """
- apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
- assert hasattr(apc, "tmp_archive_dir") is False
- apc.get_hash_keys()
- assert hasattr(apc, "tmp_archive_dir") is False
- cd = apc.get_entry_data({"file": "dummyfile"}, {})
- assert cd is None
- assert hasattr(apc, "tmp_archive_dir") is False
- def test_get_hash_keys():
- """
- Verify that `get_hash_keys` returns the expected hash key.
- """
- hk = AirflowPackageCatalogConnector.get_hash_keys()
- assert len(hk) == 1
- assert hk[0] == "file"
- def test_invalid_download_input(requests_mock):
- """
- Test invalid input scenarios.
- """
- apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
- # Input is an invalid URL/host.
- requests_mock.get("http://no.such.host/a-file", real_http=True)
- ce = apc.get_catalog_entries({"airflow_package_download_url": "http://no.such.host/a-file"})
- assert len(ce) == 0
- # Input is a valid URL but does not include a filename.
- requests_mock.get("http://server.domain.com", text="a-file-content")
- ce = apc.get_catalog_entries({"airflow_package_download_url": "http://server.domain.com/"})
- assert len(ce) == 0
- # Input is a valid URL, but does not return a ZIP-compressed file.
- requests_mock.get("http://server.domain.com/a-file", text="another-file-content")
- ce = apc.get_catalog_entries({"airflow_package_download_url": "http://server.domain.com/a-file"})
- assert len(ce) == 0
- # Input is a valid URL and a ZIP-compressed file, but is not an Airflow built distribution.
- zip_buffer = io.BytesIO()
- with zipfile.ZipFile(zip_buffer, "a", zipfile.ZIP_DEFLATED) as zip_file:
- zip_file.writestr("dummy_file.txt", "I am a dummy file, living in a ZIP archive.")
- requests_mock.get("http://server.domain.com/a-zip-file", content=zip_buffer.getvalue())
- ce = apc.get_catalog_entries({"airflow_package_download_url": "http://server.domain.com/a-zip-file"})
- assert len(ce) == 0
- def test_invalid_get_entry_data():
- """
- Validate that AirflowPackageCatalogConnector.get_entry_data(...) returns
- the expected results for invalid inputs
- """
- apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
- # Test invalid "file://" inputs ...
- # ... input refers to a directory
- resource_location = Path(__file__).parent / ".." / "resources" / "components"
- resource_url = resource_location.as_uri()
- ce = apc.get_catalog_entries({"airflow_package_download_url": resource_url, "display_name": "file://is-a-dir-test"})
- assert isinstance(ce, list), resource_url
- assert len(ce) == 0
- # ... input refers to a non-existing whl file
- resource_location = Path(__file__).parent / ".." / "resources" / "components" / "no-such.whl"
- resource_url = resource_location.as_uri()
- ce = apc.get_catalog_entries(
- {"airflow_package_download_url": resource_url, "display_name": "file://no-such-file-test"}
- )
- assert isinstance(ce, list), resource_url
- assert len(ce) == 0
- # -----------------------------------
- # Long running test(s)
- # ----------------------------------
- def test_1_10_15_distribution():
- """
- Test connector using Apache Airflow 1.10.15 built distribution.
- """
- apc = AirflowPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
- # get catalog entries for the specified distribution
- ces = apc.get_catalog_entries({"airflow_package_download_url": AIRFLOW_1_10_15_PKG_URL})
- # this distribution should contain 37 Python scripts with operator definitions
- assert len(ces) == 37
- # each entry must contain two keys
- for entry in ces:
- # built distribution package file name
- assert entry.get("airflow_package") == "apache_airflow-1.10.15-py2.py3-none-any.whl"
- # a Python script
- assert entry.get("file", "").endswith(".py")
- # fetch and validate the first entry
- ce = apc.get_entry_data({"file": ces[0]["file"]}, {})
- assert ce is not None
- assert isinstance(ce, AirflowEntryData)
- assert ce.definition is not None
- assert ce.package_name.startswith("airflow.operators.")
|