test_component_parser_airflow.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import json
  17. import os
  18. from subprocess import CompletedProcess
  19. from subprocess import run
  20. from conftest import AIRFLOW_TEST_OPERATOR_CATALOG
  21. from conftest import TEST_CATALOG_NAME
  22. import jupyter_core.paths
  23. import pytest
  24. from elyra.metadata.metadata import Metadata
  25. from elyra.pipeline.catalog_connector import CatalogEntry
  26. from elyra.pipeline.catalog_connector import FilesystemComponentCatalogConnector
  27. from elyra.pipeline.catalog_connector import UrlComponentCatalogConnector
  28. from elyra.pipeline.component import ComponentParser
  29. from elyra.pipeline.component_catalog import ComponentCache
  30. from elyra.pipeline.component_metadata import ComponentCatalogMetadata
  31. from elyra.pipeline.runtime_type import RuntimeProcessorType
# Path of a "components" directory beneath the first entry of jupyter_core's ENV_JUPYTER_PATH.
# NOTE(review): not referenced by any test in this module — confirm it is still needed.
COMPONENT_CATALOG_DIRECTORY = os.path.join(jupyter_core.paths.ENV_JUPYTER_PATH[0], "components")
# Runtime processor type used for every cache query, catalog definition and parser in this module.
RUNTIME_PROCESSOR = RuntimeProcessorType.APACHE_AIRFLOW
  34. @pytest.fixture
  35. def invalid_url(request):
  36. return request.param
  37. def _get_resource_path(filename):
  38. root = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
  39. resource_path = os.path.join(root, "..", "..", "..", "tests/pipeline", "resources", "components", filename)
  40. resource_path = os.path.normpath(resource_path)
  41. return resource_path
  42. @pytest.mark.parametrize("catalog_instance", [AIRFLOW_TEST_OPERATOR_CATALOG], indirect=True)
  43. def test_component_catalog_can_load_components_from_registries(catalog_instance, component_cache):
  44. components = component_cache.get_all_components(RUNTIME_PROCESSOR)
  45. assert len(components) > 0
  46. @pytest.mark.parametrize("create_inprocess", [True, False])
  47. async def test_modify_component_catalogs(component_cache, metadata_manager_with_teardown, create_inprocess):
  48. # Get initial set of components
  49. initial_components = component_cache.get_all_components(RUNTIME_PROCESSOR)
  50. # Create new registry instance with a single URL-based component
  51. urls = [
  52. "https://raw.githubusercontent.com/elyra-ai/elyra/main/elyra/tests/pipeline/resources/components/"
  53. "airflow_test_operator.py"
  54. ]
  55. instance_metadata = {
  56. "description": "A test registry",
  57. "runtime_type": RUNTIME_PROCESSOR.name,
  58. "categories": ["New Components"],
  59. "paths": urls,
  60. }
  61. registry_instance = Metadata(
  62. schema_name="url-catalog", name=TEST_CATALOG_NAME, display_name="New Test Registry", metadata=instance_metadata
  63. )
  64. if create_inprocess:
  65. metadata_manager_with_teardown.create(TEST_CATALOG_NAME, registry_instance)
  66. else:
  67. res: CompletedProcess = run(
  68. [
  69. "elyra-metadata",
  70. "install",
  71. "component-catalogs",
  72. f"--schema_name={registry_instance.schema_name}",
  73. f"--json={registry_instance.to_json()}",
  74. f"--name={TEST_CATALOG_NAME}",
  75. ]
  76. )
  77. assert res.returncode == 0
  78. # Wait for update to complete
  79. component_cache.wait_for_all_cache_tasks()
  80. # Get new set of components from all active registries, including added test registry
  81. components_after_create = component_cache.get_all_components(RUNTIME_PROCESSOR)
  82. assert len(components_after_create) == len(initial_components) + 3
  83. added_component_names = [component.name for component in components_after_create]
  84. assert "TestOperator" in added_component_names
  85. assert "TestOperatorNoInputs" not in added_component_names
  86. # Modify the test registry to add an additional path to
  87. urls.append(
  88. "https://raw.githubusercontent.com/elyra-ai/elyra/main/elyra/tests/pipeline/resources/components"
  89. "/airflow_test_operator_no_inputs.py"
  90. )
  91. metadata_manager_with_teardown.update(TEST_CATALOG_NAME, registry_instance)
  92. # Wait for update to complete
  93. component_cache.wait_for_all_cache_tasks()
  94. # Get set of components from all active registries, including modified test registry
  95. components_after_update = component_cache.get_all_components(RUNTIME_PROCESSOR)
  96. assert len(components_after_update) == len(initial_components) + 4
  97. modified_component_names = [component.name for component in components_after_update]
  98. assert "TestOperator" in modified_component_names
  99. assert "TestOperatorNoInputs" in modified_component_names
  100. # Delete the test registry
  101. metadata_manager_with_teardown.remove(TEST_CATALOG_NAME)
  102. # Wait for update to complete
  103. component_cache.wait_for_all_cache_tasks()
  104. # Check that components remaining after delete are the same as before the new catalog was added
  105. components_after_remove = component_cache.get_all_components(RUNTIME_PROCESSOR)
  106. assert len(components_after_remove) == len(initial_components)
  107. @pytest.mark.parametrize("create_inprocess", [True, False])
  108. async def test_directory_based_component_catalog(component_cache, metadata_manager_with_teardown, create_inprocess):
  109. # Get initial set of components
  110. initial_components = component_cache.get_all_components(RUNTIME_PROCESSOR)
  111. # Create new directory-based registry instance with components in ../../test/resources/components
  112. registry_path = _get_resource_path("")
  113. instance_metadata = {
  114. "description": "A test registry",
  115. "runtime_type": RUNTIME_PROCESSOR.name,
  116. "categories": ["New Components"],
  117. "paths": [registry_path],
  118. }
  119. registry_instance = Metadata(
  120. schema_name="local-directory-catalog",
  121. name=TEST_CATALOG_NAME,
  122. display_name="New Test Registry",
  123. metadata=instance_metadata,
  124. )
  125. if create_inprocess:
  126. metadata_manager_with_teardown.create(TEST_CATALOG_NAME, registry_instance)
  127. else:
  128. res: CompletedProcess = run(
  129. [
  130. "elyra-metadata",
  131. "install",
  132. "component-catalogs",
  133. f"--schema_name={registry_instance.schema_name}",
  134. f"--json={registry_instance.to_json()}",
  135. f"--name={TEST_CATALOG_NAME}",
  136. ]
  137. )
  138. assert res.returncode == 0
  139. # Wait for update to complete
  140. component_cache.wait_for_all_cache_tasks()
  141. # Get new set of components from all active registries, including added test registry
  142. components_after_create = component_cache.get_all_components(RUNTIME_PROCESSOR)
  143. assert len(components_after_create) == len(initial_components) + 5
  144. # Check that all relevant components from the new registry have been added
  145. added_component_names = [component.name for component in components_after_create]
  146. assert "TestOperator" in added_component_names
  147. assert "TestOperatorNoInputs" in added_component_names
  148. # Delete the test registry and wait for updates to complete
  149. metadata_manager_with_teardown.remove(TEST_CATALOG_NAME)
  150. component_cache.wait_for_all_cache_tasks()
  151. def test_parse_airflow_component_file():
  152. # Define the appropriate reader for a filesystem-type component definition
  153. airflow_supported_file_types = [".py"]
  154. reader = FilesystemComponentCatalogConnector(airflow_supported_file_types)
  155. # Read contents of given path
  156. path = _get_resource_path("airflow_test_operator.py")
  157. catalog_entry_data = {"path": path}
  158. # Construct a catalog instance
  159. catalog_type = "local-file-catalog"
  160. catalog_instance = ComponentCatalogMetadata(
  161. schema_name=catalog_type, metadata={"categories": ["Test"], "runtime_type": RUNTIME_PROCESSOR.name}
  162. )
  163. # Build the catalog entry data structures required for parsing
  164. entry_data = reader.get_entry_data(catalog_entry_data, {})
  165. catalog_entry = CatalogEntry(entry_data, catalog_entry_data, catalog_instance, ["path"])
  166. # Parse the component entry
  167. parser = ComponentParser.create_instance(platform=RUNTIME_PROCESSOR)
  168. components = parser.parse(catalog_entry)
  169. assert len(components) == 3 # TestOperator, DeriveFromTestOperator, and DeriveFromImportedOperator
  170. # Split components list into its constituent operators
  171. components = sorted(components, key=lambda component: component.id)
  172. import_test_op, derive_test_op, test_op = components[0], components[1], components[2]
  173. # Helper method to retrieve the requested parameter value from the dictionary
  174. def get_parameter_value(param_name):
  175. param_name = f"elyra_{param_name}" # add elyra_prefix to param name
  176. property_dict = properties_json["current_parameters"][param_name]
  177. return property_dict[property_dict["activeControl"]]
  178. # Helper method to retrieve the requested parameter info from the dictionary
  179. def get_parameter_format(param_name, control_id="StringControl"):
  180. param_info = None
  181. param_name = f"elyra_{param_name}" # add elyra_prefix to param name
  182. for prop_info in properties_json["uihints"]["parameter_info"]:
  183. if prop_info.get("parameter_ref") == param_name:
  184. param_info = prop_info["data"]["controls"][control_id]["format"]
  185. break
  186. return param_info
  187. # Helper method to retrieve the requested parameter description from the dictionary
  188. def get_parameter_description(param_name):
  189. param_desc = None
  190. param_name = f"elyra_{param_name}" # add elyra_prefix to param name
  191. for prop_info in properties_json["uihints"]["parameter_info"]:
  192. if prop_info.get("parameter_ref") == param_name:
  193. param_desc = prop_info["description"]["default"]
  194. break
  195. return param_desc
  196. # Helper method to retrieve whether the requested parameter is required
  197. def get_parameter_required(param_name):
  198. param_info = None
  199. param_name = f"elyra_{param_name}" # add elyra_prefix to param name
  200. for prop_info in properties_json["uihints"]["parameter_info"]:
  201. if prop_info.get("parameter_ref") == param_name:
  202. param_info = prop_info["data"]["required"]
  203. break
  204. return param_info
  205. # Retrieve properties for TestOperator
  206. # Test Operator does not include type hints for the init function args
  207. properties_json = ComponentCache.to_canvas_properties(test_op)
  208. # Ensure system parameters are not prefixed and hold correct values
  209. assert properties_json["current_parameters"]["label"] == ""
  210. component_source = json.dumps({"catalog_type": catalog_type, "component_ref": catalog_entry.entry_reference})
  211. assert properties_json["current_parameters"]["component_source"] == component_source
  212. # Ensure component parameters are prefixed with 'elyra_' and values are as expected
  213. assert get_parameter_value("str_no_default") == ""
  214. assert get_parameter_value("str_default") == "default"
  215. assert get_parameter_value("str_empty") == ""
  216. assert get_parameter_value("str_not_in_docstring") == ""
  217. assert get_parameter_value("bool_no_default") is False
  218. assert get_parameter_value("bool_default_false") is False
  219. assert get_parameter_value("bool_default_true") is True
  220. assert get_parameter_value("bool_not_in_docstring") is False
  221. assert get_parameter_value("int_no_default") == 0
  222. assert get_parameter_value("int_default_zero") == 0
  223. assert get_parameter_value("int_default_non_zero") == 2
  224. assert get_parameter_value("int_not_in_docstring") == 3
  225. assert get_parameter_value("dict_default_is_none") == "{}" # {}
  226. assert get_parameter_value("list_default_is_none") == "[]" # []
  227. # Ensure that type information is inferred correctly for properties that
  228. # define 'unusual' types, such as 'a dictionary of lists'
  229. assert get_parameter_format("unusual_type_dict") == "dictionary"
  230. assert get_parameter_format("unusual_type_list") == "list"
  231. # TestOperator has a property, 'mounted_volumes', whose id/ref collides with
  232. # the system-defined property of the same id. In these cases, the parsed property
  233. # should be preferred to the system-defined property, which should not appear.
  234. # Here we ensure that the 'mounted_volumes' property is a string-type (as defined
  235. # in the Operator class) rather than the system-defined list-type
  236. assert get_parameter_format("mounted_volumes") == "string"
  237. # Ensure that type information falls back to string if no type hint present
  238. # and no ':type: <type info>' phrase found in docstring
  239. assert get_parameter_format("fallback_type") == "string"
  240. # Ensure component parameters are marked as required in the correct circumstances
  241. # (parameter is required if there is no default value provided or if a type hint
  242. # does not include 'Optional[...]')
  243. assert get_parameter_required("str_no_default") is True
  244. assert get_parameter_required("str_default") is False
  245. assert get_parameter_required("str_empty") is False
  246. # Ensure descriptions are rendered properly with type hint in parentheses
  247. assert (
  248. get_parameter_description("unusual_type_dict") == "a dictionary parameter with the "
  249. "phrase 'list' in type description "
  250. "(type: a dictionary of arrays)"
  251. )
  252. assert (
  253. get_parameter_description("unusual_type_list") == "a list parameter with the phrase "
  254. "'string' in type description "
  255. "(type: a list of strings)"
  256. )
  257. assert get_parameter_description("fallback_type") == "(type: str)"
  258. # Ensure that a long description with line wrapping and a backslash escape has rendered
  259. # (and hence did not raise an error during json.loads in the properties API request)
  260. parsed_description = """a string parameter with a very long description
  261. that wraps lines and also has an escaped underscore in it, as shown here: (\_) # noqa W605"""
  262. modified_description = parsed_description.replace("\n", " ") + " (type: str)" # modify desc acc. to parser rules
  263. assert get_parameter_description("long_description_property") == modified_description
  264. # Retrieve properties for DeriveFromTestOperator
  265. # DeriveFromTestOperator includes type hints for all init arguments
  266. properties_json = ComponentCache.to_canvas_properties(derive_test_op)
  267. # Ensure default values are parsed correct in the case where type hints are present
  268. assert get_parameter_value("str_default") == "default"
  269. assert get_parameter_value("bool_default") is True
  270. assert get_parameter_value("int_default") == 2
  271. # Ensure component parameters are prefixed with 'elyra_' and types are as expected
  272. # in the case when a type hint is provided (and regardless of whether or not the
  273. # parameter type is included in the docstring)
  274. assert get_parameter_format("str_no_default") == "string"
  275. assert get_parameter_format("str_default") == "string"
  276. assert get_parameter_format("str_optional_default") == "string"
  277. assert get_parameter_format("str_not_in_docstring") == "string"
  278. assert get_parameter_format("bool_no_default", "BooleanControl") == "boolean"
  279. assert get_parameter_format("bool_default", "BooleanControl") == "boolean"
  280. assert get_parameter_format("bool_not_in_docstring", "BooleanControl") == "boolean"
  281. assert get_parameter_format("int_no_default", "NumberControl") == "number"
  282. assert get_parameter_format("int_default", "NumberControl") == "number"
  283. assert get_parameter_format("int_not_in_docstring", "NumberControl") == "number"
  284. assert get_parameter_format("list_optional_default") == "list"
  285. # Ensure component parameters are marked as required in the correct circumstances
  286. assert get_parameter_required("str_no_default") is True
  287. assert get_parameter_required("str_default") is False
  288. assert get_parameter_required("str_optional_default") is False
  289. assert get_parameter_required("str_not_in_docstring") is True
  290. # Retrieve properties for DeriveFromImportedOperator
  291. # DeriveFromImportedOperator includes type hints for dictionary and
  292. # list values to test the more complex parsing required in this case
  293. properties_json = ComponentCache.to_canvas_properties(import_test_op)
  294. # Ensure component parameters are prefixed with '' and types are as expected
  295. assert get_parameter_format("dict_no_default") == "dictionary"
  296. assert get_parameter_format("dict_optional_no_default") == "dictionary"
  297. assert get_parameter_format("nested_dict_default") == "dictionary"
  298. assert get_parameter_format("dict_not_in_docstring") == "dictionary"
  299. assert get_parameter_format("list_no_default") == "list"
  300. assert get_parameter_format("list_optional_no_default") == "list"
  301. assert get_parameter_format("list_default") == "list"
  302. assert get_parameter_format("list_optional_default") == "list"
  303. assert get_parameter_format("list_not_in_docstring") == "list"
  304. assert get_parameter_value("dict_no_default") == "{}"
  305. assert get_parameter_value("list_no_default") == "[]"
  306. def test_parse_airflow_component_url():
  307. # Define the appropriate reader for a URL-type component definition
  308. airflow_supported_file_types = [".py"]
  309. reader = UrlComponentCatalogConnector(airflow_supported_file_types)
  310. # Read contents of given path
  311. url = "https://raw.githubusercontent.com/elyra-ai/elyra/main/elyra/tests/pipeline/resources/components/airflow_test_operator.py" # noqa: E501
  312. catalog_entry_data = {"url": url}
  313. # Construct a catalog instance
  314. catalog_type = "url-catalog"
  315. catalog_instance = ComponentCatalogMetadata(
  316. schema_name=catalog_type, metadata={"categories": ["Test"], "runtime_type": RUNTIME_PROCESSOR.name}
  317. )
  318. # Build the catalog entry data structures required for parsing
  319. entry_data = reader.get_entry_data(catalog_entry_data, {})
  320. catalog_entry = CatalogEntry(entry_data, catalog_entry_data, catalog_instance, ["url"])
  321. # Parse the component entry
  322. parser = ComponentParser.create_instance(platform=RUNTIME_PROCESSOR)
  323. component = parser.parse(catalog_entry)[0]
  324. properties_json = ComponentCache.to_canvas_properties(component)
  325. # Ensure component parameters are prefixed, and system parameters are not, and hold correct values
  326. assert properties_json["current_parameters"]["label"] == ""
  327. # Helper method to retrieve the requested parameter value from the dictionary
  328. def get_parameter(param_name):
  329. property_dict = properties_json["current_parameters"][param_name]
  330. return property_dict[property_dict["activeControl"]]
  331. component_source = json.dumps({"catalog_type": catalog_type, "component_ref": catalog_entry.entry_reference})
  332. assert properties_json["current_parameters"]["component_source"] == component_source
  333. assert get_parameter("elyra_str_no_default") == ""
  334. assert get_parameter("elyra_bool_default_true") is True
  335. assert get_parameter("elyra_int_default_non_zero") == 2
  336. assert get_parameter("elyra_unusual_type_dict") == "{}" # {}
  337. assert get_parameter("elyra_unusual_type_list") == "[]"
  338. def test_parse_airflow_component_file_no_inputs():
  339. # Define the appropriate reader for a filesystem-type component definition
  340. airflow_supported_file_types = [".py"]
  341. reader = FilesystemComponentCatalogConnector(airflow_supported_file_types)
  342. # Read contents of given path
  343. path = _get_resource_path("airflow_test_operator_no_inputs.py")
  344. catalog_entry_data = {"path": path}
  345. # Construct a catalog instance
  346. catalog_type = "local-file-catalog"
  347. catalog_instance = ComponentCatalogMetadata(
  348. schema_name=catalog_type, metadata={"categories": ["Test"], "runtime_type": RUNTIME_PROCESSOR.name}
  349. )
  350. # Build the catalog entry data structures required for parsing
  351. entry_data = reader.get_entry_data(catalog_entry_data, {})
  352. catalog_entry = CatalogEntry(entry_data, catalog_entry_data, catalog_instance, ["path"])
  353. # Parse the component entry
  354. parser = ComponentParser.create_instance(platform=RUNTIME_PROCESSOR)
  355. no_input_op = parser.parse(catalog_entry)[0]
  356. properties_json = ComponentCache.to_canvas_properties(no_input_op)
  357. # Properties JSON should only include the six parameters common to every
  358. # component: ('label', 'component_source', 'mounted_volumes',
  359. # 'kubernetes_pod_annotations', 'kubernetes_tolerations', and 'elyra_disallow_cached_output')
  360. num_common_params = 6
  361. assert len(properties_json["current_parameters"].keys()) == num_common_params
  362. assert len(properties_json["parameters"]) == num_common_params
  363. assert len(properties_json["uihints"]["parameter_info"]) == num_common_params
  364. # Total number of groups includes one for each parameter,
  365. # plus 1 for the component_source header,
  366. # plus 1 for the 'other properties' header (that includes, e.g., mounted_volumes)
  367. # (Airflow does not include an output header since there are no formally defined outputs)
  368. num_groups = num_common_params + 2
  369. assert len(properties_json["uihints"]["group_info"][0]["group_info"]) == num_groups
  370. # Ensure that template still renders the two common parameters correctly
  371. assert properties_json["current_parameters"]["label"] == ""
  372. component_source = json.dumps({"catalog_type": catalog_type, "component_ref": catalog_entry.entry_reference})
  373. assert properties_json["current_parameters"]["component_source"] == component_source
  374. @pytest.mark.parametrize(
  375. "invalid_url",
  376. [
  377. "https://nourl.py", # test an invalid host
  378. "https://raw.githubusercontent.com/elyra-ai/elyra/main/elyra/\
  379. pipeline/tests/resources/components/invalid_file.py", # test an invalid file
  380. ],
  381. indirect=True,
  382. )
  383. async def test_parse_components_invalid_url(invalid_url):
  384. # Define the appropriate reader for a Url-type component definition
  385. airflow_supported_file_types = [".py"]
  386. reader = UrlComponentCatalogConnector(airflow_supported_file_types)
  387. # Get path to an invalid component definition file and read contents
  388. entry_data = reader.get_entry_data({"url": invalid_url}, {})
  389. assert entry_data is None