123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191 |
- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- import io
- from pathlib import Path
- from tempfile import TemporaryDirectory
- from urllib.parse import urlparse
- import zipfile
- from requests import get
- from elyra.pipeline.airflow.provider_package_catalog_connector.airflow_provider_package_catalog_connector import (
- AirflowProviderPackageCatalogConnector, # noqa: H301
- )
- from elyra.pipeline.catalog_connector import AirflowEntryData
# Download location of the apache-airflow-providers-http 2.0.2 wheel used by
# the tests below (assembled from host/path, hash directory and file name).
HTTP_PROVIDER_PKG_URL = "".join(
    (
        "https://files.pythonhosted.org/packages/a1/08/",
        "91653e9f394cbefe356ac07db809be7e69cc89b094379ad91d6cef3d2bc9/",
        "apache_airflow_providers_http-2.0.2-py3-none-any.whl",
    )
)
# File extensions passed to every connector instance created in this module.
AIRFLOW_SUPPORTED_FILE_TYPES = [".py"]
def test_empty_workdir():
    """
    Verify that the workdir isn't set by default. (The property is only set
    after 'get_catalog_entries' was invoked.)

    Neither `get_hash_keys` nor `get_entry_data` may create the
    `tmp_archive_dir` attribute as a side effect, and `get_entry_data`
    returns None while no package has been processed.
    """
    appc = AirflowProviderPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
    assert not hasattr(appc, "tmp_archive_dir")

    appc.get_hash_keys()
    assert not hasattr(appc, "tmp_archive_dir")

    # get_catalog_entries was never called, so there is nothing to look up
    cd = appc.get_entry_data({"file": "dummyfile"}, {})
    assert cd is None
    assert not hasattr(appc, "tmp_archive_dir")
def test_get_hash_keys():
    """
    Check that `get_hash_keys` reports exactly two hash keys:
    'provider' first, then 'file'.
    """
    expected_keys = ("provider", "file")
    hash_keys = AirflowProviderPackageCatalogConnector.get_hash_keys()
    assert len(hash_keys) == len(expected_keys)
    for position, expected in enumerate(expected_keys):
        assert hash_keys[position] == expected
def test_invalid_download_input(requests_mock):
    """
    Test invalid input scenarios.

    Each download URL below is either unreachable or serves content that is
    not an Airflow provider package; `get_catalog_entries` must return no
    entries for any of them.
    """
    appc = AirflowProviderPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)

    # Input is an invalid URL/host. real_http=True lets this request through
    # the mock so that the (non-existent) host is actually contacted.
    requests_mock.get("http://no.such.host/a-file", real_http=True)
    ce = appc.get_catalog_entries({"airflow_provider_package_download_url": "http://no.such.host/a-file"})
    assert len(ce) == 0

    # Input is a valid URL but does not include a filename.
    requests_mock.get("http://server.domain.com", text="a-file-content")
    ce = appc.get_catalog_entries({"airflow_provider_package_download_url": "http://server.domain.com/"})
    assert len(ce) == 0

    # Input is a valid URL but does not return a ZIP-compressed file.
    requests_mock.get("http://server.domain.com/a-file", text="a-file-content")
    ce = appc.get_catalog_entries({"airflow_provider_package_download_url": "http://server.domain.com/a-file"})
    assert len(ce) == 0

    # Input is a valid URL and a ZIP-compressed file, but not a provider package.
    # Create a fresh in-memory archive ("w" mode) that holds a single text file.
    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zip_file:
        zip_file.writestr("dummy_file.txt", "I am a dummy file, living in a ZIP archive.")
    requests_mock.get("http://server.domain.com/a-zip-file", content=zip_buffer.getvalue())
    ce = appc.get_catalog_entries({"airflow_provider_package_download_url": "http://server.domain.com/a-zip-file"})
    assert len(ce) == 0
def test_invalid_input_get_catalog_entries():
    """
    Check that AirflowProviderPackageCatalogConnector.get_catalog_entries(...)
    yields an empty list for "file://" download URLs that do not point at a
    usable provider package.
    """
    apc = AirflowProviderPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
    components_dir = Path(__file__).parent / ".." / "resources" / "components"

    # (location, display name) pairs for the invalid "file://" inputs
    invalid_inputs = [
        # the URL refers to a directory
        (components_dir, "file://is-a-dir-test"),
        # the URL refers to a whl file that does not exist
        (components_dir / "no-such.whl", "file://no-such-file-test"),
    ]

    for location, display_name in invalid_inputs:
        resource_url = location.as_uri()
        ce = apc.get_catalog_entries(
            {"airflow_provider_package_download_url": resource_url, "display_name": display_name}
        )
        assert isinstance(ce, list), resource_url
        assert len(ce) == 0
# -----------------------------------
# Long-running tests (these download the HTTP provider package from PyPI)
# -----------------------------------
def test_valid_url_http_provider_package():
    """
    Test connector using HTTP provider package

    The package is downloaded from HTTP_PROVIDER_PKG_URL, so this test
    requires network access.
    """
    appc = AirflowProviderPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
    # get catalog entries for the specified provider package
    ces = appc.get_catalog_entries({"airflow_provider_package_download_url": HTTP_PROVIDER_PKG_URL})
    # this package should contain exactly one Python script with operator definitions
    assert len(ces) == 1
    # each entry must contain the following three keys
    for entry in ces:
        # provider package file name
        assert entry.get("provider_package") == "apache_airflow_providers_http-2.0.2-py3-none-any.whl"
        # provider name
        assert entry.get("provider") == "apache_airflow_providers_http"
        # a Python script
        assert entry.get("file", "").endswith(".py")
    # fetch and validate the first entry
    ce = appc.get_entry_data({"file": ces[0]["file"]}, {})
    assert ce is not None
    assert isinstance(ce, AirflowEntryData)
    assert ce.definition is not None
    assert ce.package_name == "airflow.providers.http.operators.http"
def test_valid_file_http_provider_package():
    """
    Test connector using a local copy of the HTTP provider package

    The package is first downloaded from HTTP_PROVIDER_PKG_URL into a
    temporary directory and then handed to the connector as a "file://" URL.
    This test requires network access.
    """
    # Download the test provider package. An explicit timeout keeps the test
    # from hanging forever if the server stops responding (requests has no
    # default timeout).
    resp = get(HTTP_PROVIDER_PKG_URL, timeout=60)
    assert resp.status_code == 200

    # file name component of the download URL, e.g. "<name>-<version>-py3-none-any.whl"
    package_file_name = Path(urlparse(HTTP_PROVIDER_PKG_URL).path).name

    with TemporaryDirectory() as dirpath:
        # Store the downloaded package in the local file system
        local_file_copy = Path(dirpath) / package_file_name
        local_file_copy.write_bytes(resp.content)
        local_provider_package_file_copy_url = local_file_copy.as_uri()

        appc = AirflowProviderPackageCatalogConnector(AIRFLOW_SUPPORTED_FILE_TYPES)
        # get catalog entries for the specified provider package
        ces = appc.get_catalog_entries(
            {
                "airflow_provider_package_download_url": local_provider_package_file_copy_url,
                "display_name": "file://local-provider-package",
            }
        )
        # this package should contain exactly one Python script with operator definitions
        assert len(ces) == 1
        # each entry must contain the following three keys
        for entry in ces:
            # provider package file name
            assert entry.get("provider_package") == package_file_name
            # provider name
            assert entry.get("provider") == "apache_airflow_providers_http"
            # a Python script
            assert entry.get("file", "").endswith(".py")
        # fetch and validate the first entry
        ce = appc.get_entry_data({"file": ces[0]["file"]}, {})
        assert ce is not None
        assert isinstance(ce, AirflowEntryData)
        assert ce.definition is not None
        assert ce.package_name == "airflow.providers.http.operators.http"
|