#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
from pathlib import Path
import re
import string
import tempfile
from types import SimpleNamespace
from unittest import mock

from conftest import AIRFLOW_COMPONENT_CACHE_INSTANCE
import github
import pytest

from elyra.metadata.metadata import Metadata
from elyra.pipeline.airflow.processor_airflow import AirflowPipelineProcessor
from elyra.pipeline.parser import PipelineParser
from elyra.pipeline.pipeline import GenericOperation
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.tests.pipeline.test_pipeline_parser import _read_pipeline_resource
from elyra.util.github import GithubClient
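
# Sample pipeline definitions exercised via the parametrized `parsed_pipeline` fixture below.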
PIPELINE_FILE_COMPLEX = "resources/sample_pipelines/pipeline_dependency_complex.json"
PIPELINE_FILE_CUSTOM_COMPONENTS = "resources/sample_pipelines/pipeline_with_airflow_components.json"


@pytest.fixture
def processor(setup_factory_data):
    processor = AirflowPipelineProcessor(os.getcwd())
    return processor


@pytest.fixture
def parsed_pipeline(request):
    pipeline_resource = _read_pipeline_resource(request.param)
    return PipelineParser().parse(pipeline_json=pipeline_resource)


@pytest.fixture
def sample_metadata():
    return {
        "name": "airflow_test",
        "display_name": "Apache Airflow Test Endpoint",
        "metadata": {
            "github_api_endpoint": "https://api.github.com",
            "github_repo": "test/test-repo",
            "github_repo_token": "",
            "github_branch": "test",
            "api_endpoint": "http://test.example.com:30000/",
            "cos_endpoint": "http://test.example.com:30001/",
            "cos_username": "test",
            "cos_password": "test-password",
            "cos_bucket": "test-bucket",
            "tags": [],
            "user_namespace": "default",
            "runtime_type": "APACHE_AIRFLOW",
        },
        "schema_name": "airflow",
        "resource": "/User/test_directory/airflow_test.json",
    }


@pytest.fixture
def sample_image_metadata():
    image_one = {"image_name": "tensorflow/tensorflow:2.0.0-py3", "pull_policy": "IfNotPresent", "tags": []}
    image_two = {"image_name": "elyra/examples:1.0.0-py3", "pull_policy": "Always", "tags": []}
    mocked_runtime_images = [
        Metadata(name="test-image-metadata", display_name="test-image", schema_name="airflow", metadata=image_one),
        Metadata(name="test-image-metadata", display_name="test-image", schema_name="airflow", metadata=image_two),
    ]
    return mocked_runtime_images
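
# Runs the processor's _cc_pipeline() against the parametrized pipeline, with the metadata lookup
# and the object-storage/dependency helpers monkeypatched, and returns the resulting operations dict.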
@pytest.fixture
def parsed_ordered_dict(monkeypatch, processor, parsed_pipeline, sample_metadata, sample_image_metadata):
    mocked_runtime = Metadata(
        name="test-metadata", display_name="test", schema_name="airflow", metadata=sample_metadata["metadata"]
    )

    mocked_func = mock.Mock(return_value="default", side_effect=[mocked_runtime, sample_image_metadata])

    monkeypatch.setattr(processor, "_get_metadata_configuration", mocked_func)
    monkeypatch.setattr(processor, "_upload_dependencies_to_object_store", lambda w, x, y, prefix: True)
    monkeypatch.setattr(processor, "_get_dependency_archive_name", lambda x: True)
    monkeypatch.setattr(processor, "_verify_cos_connectivity", lambda x: True)

    return processor._cc_pipeline(parsed_pipeline, pipeline_name="some-name", pipeline_instance_id="some-instance-id")
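
# Helpers for pulling values out of individual lines of a rendered DAG file.
# Illustrative (hypothetical) inputs, assuming lines rendered in the usual key=value style:
#   read_key_pair('namespace="default",')  -> {"key": "namespace", "value": "default"}
#   string_to_list('["elem1", "elem2"]')   -> ["elem1", "elem2"]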
def read_key_pair(key_pair, sep="="):
    return {"key": key_pair.split(sep)[0].strip('" '), "value": key_pair.split(sep)[1].rstrip(",").strip('" ')}


def string_to_list(stringed_list):
    return stringed_list.replace(" ", "").replace('"', "").strip("[]").split(",")


def test_processor_type(processor):
    assert processor.type == RuntimeProcessorType.APACHE_AIRFLOW


def test_fail_processor_type(processor):
    with pytest.raises(Exception):
        assert processor.type == RuntimeProcessorType.KUBEFLOW_PIPELINES


@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_COMPLEX], indirect=True)
def test_pipeline_process(monkeypatch, processor, parsed_pipeline, sample_metadata):
    mocked_runtime = Metadata(
        name="test-metadata", display_name="test", schema_name="airflow", metadata=sample_metadata["metadata"]
    )
    mocked_path = "/some-placeholder"

    monkeypatch.setattr(processor, "_get_metadata_configuration", lambda schemaspace, name: mocked_runtime)
    monkeypatch.setattr(
        processor,
        "create_pipeline_file",
        lambda pipeline, pipeline_export_format, pipeline_export_path, pipeline_name, pipeline_instance_id: mocked_path,
    )
    monkeypatch.setattr(github.Github, "get_repo", lambda x, y: True)
    monkeypatch.setattr(GithubClient, "upload_dag", lambda x, y, z: True)

    response = processor.process(pipeline=parsed_pipeline)

    assert response.run_url == sample_metadata["metadata"]["api_endpoint"]
    assert response.object_storage_url == sample_metadata["metadata"]["cos_endpoint"]
    # Only check for this substring in the storage path, since a timestamp is injected into the name
    assert "/" + sample_metadata["metadata"]["cos_bucket"] + "/" + "untitled" in response.object_storage_path
@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_COMPLEX], indirect=True)
def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dict, sample_metadata):
    pipeline_json = _read_pipeline_resource(PIPELINE_FILE_COMPLEX)

    export_pipeline_name = "some-name"
    export_file_type = "py"

    mocked_runtime = Metadata(
        name="test-metadata", display_name="test", schema_name="airflow", metadata=sample_metadata["metadata"]
    )

    monkeypatch.setattr(processor, "_get_metadata_configuration", lambda name=None, schemaspace=None: mocked_runtime)
    monkeypatch.setattr(processor, "_upload_dependencies_to_object_store", lambda w, x, y, prefix: True)
    monkeypatch.setattr(processor, "_cc_pipeline", lambda x, y, z: parsed_ordered_dict)

    with tempfile.TemporaryDirectory() as temp_dir:
        export_pipeline_output_path = os.path.join(temp_dir, f"{export_pipeline_name}.py")

        response = processor.create_pipeline_file(
            parsed_pipeline,
            pipeline_export_format=export_file_type,
            pipeline_export_path=export_pipeline_output_path,
            pipeline_name=export_pipeline_name,
            pipeline_instance_id=export_pipeline_name,
        )

        assert export_pipeline_output_path == response
        assert os.path.isfile(export_pipeline_output_path)

        file_as_lines = open(response).read().splitlines()

        assert "from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator" in file_as_lines

        # Check DAG project name
        for i in range(len(file_as_lines)):
            if "args = {" == file_as_lines[i]:
                assert "project_id" == read_key_pair(file_as_lines[i + 1], sep=":")["key"]
                assert export_pipeline_name == read_key_pair(file_as_lines[i + 1], sep=":")["value"]

        # For every node in the original pipeline json
        for node in pipeline_json["pipelines"][0]["nodes"]:
            component_parameters = node["app_data"]["component_parameters"]
            for i in range(len(file_as_lines)):
                # Matches a generic op with a node ID
                if f"op_{node['id'].replace('-', '_')} = KubernetesPodOperator(" in file_as_lines[i]:
                    sub_list_line_counter = 0
                    # Gets sub-list slice starting where the Notebook Op starts
                    init_line = i + 1
                    for line in file_as_lines[init_line:]:
                        if "namespace=" in line:
                            assert sample_metadata["metadata"]["user_namespace"] == read_key_pair(line)["value"]
                        elif "cos_endpoint=" in line:
                            assert sample_metadata["metadata"]["cos_endpoint"] == read_key_pair(line)["value"]
                        elif "cos_bucket=" in line:
                            assert sample_metadata["metadata"]["cos_bucket"] == read_key_pair(line)["value"]
                        elif "name=" in line:
                            assert node["app_data"]["ui_data"]["label"] == read_key_pair(line)["value"]
                        elif "notebook=" in line:
                            assert component_parameters["filename"] == read_key_pair(line)["value"]
                        elif "image=" in line:
                            assert component_parameters["runtime_image"] == read_key_pair(line)["value"]
                        elif "env_vars=" in line:
                            for env in component_parameters["env_vars"]:
                                var, value = env.split("=")
                                # Gets sub-list slice starting where the env vars start
                                start_env = i + sub_list_line_counter + 2
                                for env_line in file_as_lines[start_env:]:
                                    if "AWS_ACCESS_KEY_ID" in env_line:
                                        assert (
                                            sample_metadata["metadata"]["cos_username"]
                                            == read_key_pair(env_line, sep=":")["value"]
                                        )
                                    elif "AWS_SECRET_ACCESS_KEY" in env_line:
                                        assert (
                                            sample_metadata["metadata"]["cos_password"]
                                            == read_key_pair(env_line, sep=":")["value"]
                                        )
                                    elif var in env_line:
                                        assert var == read_key_pair(env_line, sep=":")["key"]
                                        assert value == read_key_pair(env_line, sep=":")["value"]
                                    elif env_line.strip() == "},":  # end of env vars
                                        break
                        elif "pipeline_inputs=" in line and component_parameters.get("inputs"):
                            for input in component_parameters["inputs"]:
                                assert input in string_to_list(read_key_pair(line)["value"])
                        elif "pipeline_outputs=" in line and component_parameters.get("outputs"):
                            for output in component_parameters["outputs"]:
                                assert output in string_to_list(read_key_pair(line)["value"])
                        elif line == ")":  # End of this Notebook Op
                            break
                        sub_list_line_counter += 1


@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_CUSTOM_COMPONENTS], indirect=True)
@pytest.mark.parametrize("catalog_instance", [AIRFLOW_COMPONENT_CACHE_INSTANCE], indirect=True)
def test_create_file_custom_components(
    monkeypatch, processor, catalog_instance, component_cache, parsed_pipeline, parsed_ordered_dict, sample_metadata
):
    pipeline_json = _read_pipeline_resource(PIPELINE_FILE_CUSTOM_COMPONENTS)

    export_pipeline_name = "some-name"
    export_file_type = "py"

    mocked_runtime = Metadata(
        name="test-metadata", display_name="test", schema_name="airflow", metadata=sample_metadata["metadata"]
    )

    monkeypatch.setattr(processor, "_get_metadata_configuration", lambda name=None, schemaspace=None: mocked_runtime)
    monkeypatch.setattr(processor, "_upload_dependencies_to_object_store", lambda w, x, y, prefix: True)
    monkeypatch.setattr(processor, "_cc_pipeline", lambda x, y, z: parsed_ordered_dict)

    with tempfile.TemporaryDirectory() as temp_dir:
        export_pipeline_output_path = os.path.join(temp_dir, f"{export_pipeline_name}.py")

        response = processor.create_pipeline_file(
            parsed_pipeline,
            pipeline_export_format=export_file_type,
            pipeline_export_path=export_pipeline_output_path,
            pipeline_name=export_pipeline_name,
            pipeline_instance_id=export_pipeline_name,
        )

        assert export_pipeline_output_path == response
        assert os.path.isfile(export_pipeline_output_path)

        file_as_lines = open(response).read().splitlines()

        pipeline_description = pipeline_json["pipelines"][0]["app_data"]["properties"]["description"]
        escaped_description = pipeline_description.replace('"""', '\\"\\"\\"')

        for i in range(len(file_as_lines)):
            if "args = {" == file_as_lines[i]:
                # Check DAG project name
                assert "project_id" == read_key_pair(file_as_lines[i + 1], sep=":")["key"]
                assert export_pipeline_name == read_key_pair(file_as_lines[i + 1], sep=":")["value"]
            elif 'description="""' in file_as_lines[i]:
                # Check that DAG contains the correct description
                line_no = i + 1
                description_as_lines = []
                while '"""' not in file_as_lines[line_no]:
                    description_as_lines.append(file_as_lines[line_no])
                    line_no += 1
                expected_description_lines = escaped_description.split("\n")
                assert description_as_lines == expected_description_lines

                # Nothing more to be done in file
                break

        # For every node in the original pipeline json
        for node in pipeline_json["pipelines"][0]["nodes"]:
            component_parameters = node["app_data"]["component_parameters"]
            for i in range(len(file_as_lines)):
                # Matches custom component operators
                if f"op_{node['id'].replace('-', '_')} = " in file_as_lines[i]:
                    for parameter in component_parameters:
                        # Find 'parameter=' clause in file_as_lines list
                        r = re.compile(rf"\s*{parameter}=.*")
                        parameter_clause = i + 1
                        assert len(list(filter(r.match, file_as_lines[parameter_clause:]))) > 0


@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_COMPLEX], indirect=True)
def test_export_overwrite(monkeypatch, processor, parsed_pipeline):
    with tempfile.TemporaryDirectory() as temp_dir:
        mocked_path = os.path.join(temp_dir, "some-name.py")
        Path(mocked_path).touch()
        assert os.path.isfile(mocked_path)

        monkeypatch.setattr(
            processor,
            "create_pipeline_file",
            lambda pipeline, pipeline_export_format, pipeline_export_path, pipeline_name, pipeline_instance_id: mocked_path,  # noqa: E501
        )

        returned_path = processor.export(parsed_pipeline, "py", mocked_path, True)
        assert returned_path == mocked_path


@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_COMPLEX], indirect=True)
def test_fail_export_overwrite(processor, parsed_pipeline):
    with tempfile.TemporaryDirectory() as temp_dir:
        Path(f"{temp_dir}/test.py").touch()
        assert os.path.isfile(f"{temp_dir}/test.py")

        export_pipeline_output_path = os.path.join(temp_dir, "test.py")
        with pytest.raises(ValueError):
            processor.export(parsed_pipeline, "py", export_pipeline_output_path, False)


@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_COMPLEX], indirect=True)
def test_pipeline_tree_creation(parsed_ordered_dict, sample_metadata, sample_image_metadata):
    pipeline_json = _read_pipeline_resource(PIPELINE_FILE_COMPLEX)

    ordered_dict = parsed_ordered_dict

    assert len(ordered_dict.keys()) == len(pipeline_json["pipelines"][0]["nodes"])

    # Verify tree structure is correct
    assert not ordered_dict["cded6818-e601-4fd8-b6b9-c9fdf1fd1fca"].get("parent_operation_ids")
    assert (
        ordered_dict["bb9606ca-29ec-4133-a36a-67bd2a1f6dc3"].get("parent_operation_ids").pop()
        == "cded6818-e601-4fd8-b6b9-c9fdf1fd1fca"
    )
    assert (
        ordered_dict["6f5c2ece-1977-48a1-847f-099b327c6ed1"].get("parent_operation_ids").pop()
        == "cded6818-e601-4fd8-b6b9-c9fdf1fd1fca"
    )
    assert (
        ordered_dict["4ef63a48-a27c-4d1e-a0ee-2fbbdbe3be74"].get("parent_operation_ids").pop()
        == "cded6818-e601-4fd8-b6b9-c9fdf1fd1fca"
    )
    assert (
        ordered_dict["4f7ae91b-682e-476c-8664-58412336b31f"].get("parent_operation_ids").pop()
        == "bb9606ca-29ec-4133-a36a-67bd2a1f6dc3"
    )
    assert (
        ordered_dict["f82c4699-b392-4a3e-92b0-45d9e11126fe"].get("parent_operation_ids").pop()
        == "bb9606ca-29ec-4133-a36a-67bd2a1f6dc3"
    )
    assert ordered_dict["137d3d2f-4224-42d9-b8c6-cbee9ff2872d"].get("parent_operation_ids") == [
        "4ef63a48-a27c-4d1e-a0ee-2fbbdbe3be74",
        "0a7eff92-fe2a-411c-92a6-73d6f3810516",
    ]
    assert not ordered_dict["779c2630-64bf-47ca-8a98-9ac8a60e85f7"].get("parent_operation_ids")
    assert (
        ordered_dict["0a7eff92-fe2a-411c-92a6-73d6f3810516"].get("parent_operation_ids").pop()
        == "779c2630-64bf-47ca-8a98-9ac8a60e85f7"
    )
    assert ordered_dict["92a7a247-1131-489c-8c3e-1e2389d4c673"].get("parent_operation_ids") == [
        "f82c4699-b392-4a3e-92b0-45d9e11126fe",
        "137d3d2f-4224-42d9-b8c6-cbee9ff2872d",
        "6f5c2ece-1977-48a1-847f-099b327c6ed1",
    ]

    for key in ordered_dict.keys():
        for node in pipeline_json["pipelines"][0]["nodes"]:
            if node["id"] == key:
                component_parameters = node["app_data"]["component_parameters"]
                assert ordered_dict[key]["runtime_image"] == component_parameters["runtime_image"]
                for image in sample_image_metadata:
                    if ordered_dict[key]["runtime_image"] == image.metadata["image_name"]:
                        assert ordered_dict[key]["image_pull_policy"] == image.metadata["pull_policy"]
                print(ordered_dict[key])
                for env in component_parameters["env_vars"]:
                    var, value = env.split("=")
                    assert ordered_dict[key]["pipeline_envs"][var] == value
                assert (
                    ordered_dict[key]["pipeline_envs"]["AWS_ACCESS_KEY_ID"]
                    == sample_metadata["metadata"]["cos_username"]
                )
                assert (
                    ordered_dict[key]["pipeline_envs"]["AWS_SECRET_ACCESS_KEY"]
                    == sample_metadata["metadata"]["cos_password"]
                )
                for arg in ["inputs", "outputs"]:
                    if node["app_data"].get(arg):
                        for file in node["app_data"][arg]:
                            assert file in ordered_dict[key]["pipeline_" + arg]


def test_collect_envs(processor):
    pipelines_test_file = "elyra/pipeline/tests/resources/archive/test.ipynb"

    # add system-owned envs with bogus values to ensure they get set to system-derived values,
    # and include some user-provided edge cases
    operation_envs = [
        'ELYRA_RUNTIME_ENV="bogus_runtime"',
        'ELYRA_ENABLE_PIPELINE_INFO="bogus_pipeline"',
        "ELYRA_WRITABLE_CONTAINER_DIR=",  # simulate operation reference in pipeline
        'AWS_ACCESS_KEY_ID="bogus_key"',
        'AWS_SECRET_ACCESS_KEY="bogus_secret"',
        "USER_EMPTY_VALUE= ",
        "USER_TWO_EQUALS=KEY=value",
        "USER_NO_VALUE=",
    ]
    component_parameters = {
        "filename": pipelines_test_file,
        "env_vars": operation_envs,
        "runtime_image": "tensorflow/tensorflow:latest",
    }
    test_operation = GenericOperation(
        id="this-is-a-test-id",
        type="execution-node",
        classifier="execute-notebook-node",
        name="test",
        component_params=component_parameters,
    )

    envs = processor._collect_envs(test_operation, cos_secret=None, cos_username="Alice", cos_password="secret")

    assert envs["ELYRA_RUNTIME_ENV"] == "airflow"
    assert envs["AWS_ACCESS_KEY_ID"] == "Alice"
    assert envs["AWS_SECRET_ACCESS_KEY"] == "secret"
    assert envs["ELYRA_ENABLE_PIPELINE_INFO"] == "True"
    assert "ELYRA_WRITABLE_CONTAINER_DIR" not in envs
    assert "USER_EMPTY_VALUE" not in envs
    assert envs["USER_TWO_EQUALS"] == "KEY=value"
    assert "USER_NO_VALUE" not in envs

    # Repeat with non-None secret - ensure user and password envs are not present, but others are
    envs = processor._collect_envs(test_operation, cos_secret="secret", cos_username="Alice", cos_password="secret")

    assert envs["ELYRA_RUNTIME_ENV"] == "airflow"
    assert "AWS_ACCESS_KEY_ID" not in envs
    assert "AWS_SECRET_ACCESS_KEY" not in envs
    assert envs["ELYRA_ENABLE_PIPELINE_INFO"] == "True"
    assert "ELYRA_WRITABLE_CONTAINER_DIR" not in envs
    assert "USER_EMPTY_VALUE" not in envs
    assert envs["USER_TWO_EQUALS"] == "KEY=value"
    assert "USER_NO_VALUE" not in envs
def test_unique_operation_name_existent(processor):
    op1 = SimpleNamespace(name="sample_operation")
    op2 = SimpleNamespace(name="sample_operation_2")
    op3 = SimpleNamespace(name="sample_operation_3")
    op4 = SimpleNamespace(name="sample_operation")
    op5 = SimpleNamespace(name="sample_operation_2")
    op6 = SimpleNamespace(name="sample_operation_3")
    sample_operation_list = [op1, op2, op3, op4, op5, op6]
    correct_name_list = [
        "sample_operation",
        "sample_operation_2",
        "sample_operation_3",
        "sample_operation_1",
        "sample_operation_2_1",
        "sample_operation_3_1",
    ]

    renamed_op_list = processor._create_unique_node_names(sample_operation_list)
    name_list = [op.name for op in renamed_op_list]

    assert name_list == correct_name_list


def test_unique_operation_name_non_existent(processor):
    operation_name = "sample_operation_foo_bar"
    op1 = SimpleNamespace(name="sample_operation")
    op2 = SimpleNamespace(name="sample_operation_2")
    op3 = SimpleNamespace(name="sample_operation_3")
    sample_operation_list = [op1, op2, op3]
    correct_name_list = ["sample_operation", "sample_operation_2", "sample_operation_3"]

    renamed_op_list = processor._create_unique_node_names(sample_operation_list)
    name_list = [op.name for op in renamed_op_list]

    assert name_list == correct_name_list
    assert operation_name not in name_list


def test_unique_operation_custom(processor):
    op1 = SimpleNamespace(name="this bash")
    op2 = SimpleNamespace(name="this@bash")
    op3 = SimpleNamespace(name="this!bash")
    op4 = SimpleNamespace(name="that^bash")
    op5 = SimpleNamespace(name="that bash")
    op6 = SimpleNamespace(name="that_bash_2")
    op7 = SimpleNamespace(name="that_bash_1")
    op8 = SimpleNamespace(name="that_bash_0")
    sample_operation_list = [op1, op2, op3, op4, op5, op6, op7, op8]
    correct_name_list = [
        "this_bash",
        "this_bash_1",
        "this_bash_2",
        "that_bash",
        "that_bash_1",
        "that_bash_2",
        "that_bash_1_1",
        "that_bash_0",
    ]

    scrubbed_list = processor._scrub_invalid_characters_from_list(sample_operation_list)
    renamed_op_list = processor._create_unique_node_names(scrubbed_list)
    name_list = [op.name for op in renamed_op_list]

    assert name_list == correct_name_list


def test_process_list_value_function(processor):
    # Test values that will be successfully converted to list
    assert processor._process_list_value("") == []
    assert processor._process_list_value(None) == []
    assert processor._process_list_value("[]") == []
    assert processor._process_list_value("None") == []
    assert processor._process_list_value("['elem1']") == ["elem1"]
    assert processor._process_list_value("['elem1', 'elem2', 'elem3']") == ["elem1", "elem2", "elem3"]
    assert processor._process_list_value(" ['elem1', 'elem2' , 'elem3'] ") == ["elem1", "elem2", "elem3"]
    assert processor._process_list_value("[1, 2]") == [1, 2]
    assert processor._process_list_value("[True, False, True]") == [True, False, True]
    assert processor._process_list_value("[{'obj': 'val', 'obj2': 'val2'}, {}]") == [{"obj": "val", "obj2": "val2"}, {}]

    # Test values that will not be successfully converted to list
    # Surrounding quotes are added to string values for correct DAG render
    assert processor._process_list_value("[[]") == '"[[]"'
    assert processor._process_list_value("[elem1, elem2]") == '"[elem1, elem2]"'
    assert processor._process_list_value("elem1, elem2") == '"elem1, elem2"'
    assert processor._process_list_value(" elem1, elem2 ") == '"elem1, elem2"'
    assert processor._process_list_value("'elem1', 'elem2'") == "\"'elem1', 'elem2'\""


def test_process_dictionary_value_function(processor):
    # Test values that will be successfully converted to dictionary
    assert processor._process_dictionary_value("") == {}
    assert processor._process_dictionary_value(None) == {}
    assert processor._process_dictionary_value("{}") == {}
    assert processor._process_dictionary_value("None") == {}
    assert processor._process_dictionary_value("{'key': 'value'}") == {"key": "value"}

    dict_as_str = "{'key1': 'value', 'key2': 'value'}"
    assert processor._process_dictionary_value(dict_as_str) == {"key1": "value", "key2": "value"}

    dict_as_str = " { 'key1': 'value' , 'key2' : 'value'} "
    assert processor._process_dictionary_value(dict_as_str) == {"key1": "value", "key2": "value"}

    dict_as_str = "{'key1': [1, 2, 3], 'key2': ['elem1', 'elem2']}"
    assert processor._process_dictionary_value(dict_as_str) == {"key1": [1, 2, 3], "key2": ["elem1", "elem2"]}

    dict_as_str = "{'key1': 2, 'key2': 'value', 'key3': True, 'key4': None, 'key5': [1, 2, 3]}"
    expected_value = {"key1": 2, "key2": "value", "key3": True, "key4": None, "key5": [1, 2, 3]}
    assert processor._process_dictionary_value(dict_as_str) == expected_value

    dict_as_str = "{'key1': {'key2': 2, 'key3': 3, 'key4': 4}, 'key5': {}}"
    expected_value = {
        "key1": {
            "key2": 2,
            "key3": 3,
            "key4": 4,
        },
        "key5": {},
    }
    assert processor._process_dictionary_value(dict_as_str) == expected_value

    # Test values that will not be successfully converted to dictionary
    # Surrounding quotes are added to string values for correct DAG render
    assert processor._process_dictionary_value("{{}") == '"{{}"'
    assert processor._process_dictionary_value("{key1: value, key2: value}") == '"{key1: value, key2: value}"'
    assert processor._process_dictionary_value(" { key1: value, key2: value } ") == '"{ key1: value, key2: value }"'
    assert processor._process_dictionary_value("key1: value, key2: value") == '"key1: value, key2: value"'
    assert processor._process_dictionary_value("{'key1': true}") == "\"{'key1': true}\""
    assert processor._process_dictionary_value("{'key': null}") == "\"{'key': null}\""

    dict_as_str = "{'key1': [elem1, elem2, elem3], 'key2': ['elem1', 'elem2']}"
    assert processor._process_dictionary_value(dict_as_str) == f'"{dict_as_str}"'

    dict_as_str = "{'key1': {key2: 2}, 'key3': ['elem1', 'elem2']}"
    assert processor._process_dictionary_value(dict_as_str) == f'"{dict_as_str}"'


@pytest.mark.parametrize(
    "parsed_pipeline", ["resources/validation_pipelines/aa_operator_same_name.json"], indirect=True
)
@pytest.mark.parametrize("catalog_instance", [AIRFLOW_COMPONENT_CACHE_INSTANCE], indirect=True)
def test_same_name_operator_in_pipeline(monkeypatch, processor, catalog_instance, parsed_pipeline, sample_metadata):
    task_id = "e3922a29-f4c0-43d9-8d8b-4509aab80032"
    upstream_task_id = "0eb57369-99d1-4cd0-a205-8d8d96af3ad4"

    mocked_runtime = Metadata(
        name="test-metadata", display_name="test", schema_name="airflow", metadata=sample_metadata["metadata"]
    )

    monkeypatch.setattr(processor, "_get_metadata_configuration", lambda name=None, schemaspace=None: mocked_runtime)
    monkeypatch.setattr(processor, "_upload_dependencies_to_object_store", lambda w, x, y, prefix: True)

    pipeline_def_operation = parsed_pipeline.operations[task_id]
    pipeline_def_operation_parameters = pipeline_def_operation.component_params_as_dict
    pipeline_def_operation_bash_param = pipeline_def_operation_parameters["bash_command"]

    assert pipeline_def_operation_bash_param["activeControl"] == "NestedEnumControl"
    assert set(pipeline_def_operation_bash_param["NestedEnumControl"].keys()) == {"value", "option"}
    assert pipeline_def_operation_bash_param["NestedEnumControl"]["value"] == upstream_task_id

    ordered_operations = processor._cc_pipeline(
        parsed_pipeline, pipeline_name="some-name", pipeline_instance_id="some-instance-name"
    )
    operation_parameters = ordered_operations[task_id]["component_params"]
    operation_parameter_bash_command = operation_parameters["bash_command"]

    assert operation_parameter_bash_command == "\"{{ ti.xcom_pull(task_ids='BashOperator_1') }}\""


def test_scrub_invalid_characters(processor):
    invalid_character_list_string = "[-!@#$%^&*(){};:,/<>?|`~=+ ]"
    valid_character_list_string = list(string.ascii_lowercase + string.ascii_uppercase + string.digits)

    for character in invalid_character_list_string:
        assert processor._scrub_invalid_characters(character) == "_"

    for character in valid_character_list_string:
        assert processor._scrub_invalid_characters(character) == character