test_processor_airflow.py

#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
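
# These tests exercise Elyra's AirflowPipelineProcessor: DAG file generation for
# generic (notebook/script) and custom-operator pipelines, export and overwrite
# handling, environment-variable collection, and node-name scrubbing/deduplication.
# Runtime metadata, object-storage uploads, and GitHub access are stubbed out by
# the fixtures below so that no external services are needed.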

import os
from pathlib import Path
import re
import string
import tempfile
from types import SimpleNamespace
from unittest import mock

from conftest import AIRFLOW_TEST_OPERATOR_CATALOG
import github
import pytest

from elyra.metadata.metadata import Metadata
from elyra.pipeline.airflow.processor_airflow import AirflowPipelineProcessor
from elyra.pipeline.parser import PipelineParser
from elyra.pipeline.pipeline import GenericOperation
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.tests.pipeline.test_pipeline_parser import _read_pipeline_resource
from elyra.util.github import GithubClient

PIPELINE_FILE_COMPLEX = "resources/sample_pipelines/pipeline_dependency_complex.json"
PIPELINE_FILE_CUSTOM_COMPONENTS = "resources/sample_pipelines/pipeline_with_airflow_components.json"


@pytest.fixture
def processor(monkeypatch, setup_factory_data):
    processor = AirflowPipelineProcessor(root_dir=os.getcwd())

    # Add spoofed TestOperator to class import map
    class_import_map = {
        "TestOperator": "from airflow.operators.test_operator import TestOperator",
        "DeriveFromTestOperator": "from airflow.operators.test_operator import DeriveFromTestOperator",
    }
    monkeypatch.setattr(processor, "class_import_map", class_import_map)
    return processor
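
# The fixtures that follow supply the inputs _cc_pipeline needs: a parsed Pipeline
# object, runtime and runtime-image metadata, and (via parsed_ordered_dict) the
# resulting ordered mapping of operation id to rendering parameters.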

@pytest.fixture
def parsed_pipeline(request):
    pipeline_resource = _read_pipeline_resource(request.param)
    return PipelineParser().parse(pipeline_json=pipeline_resource)


@pytest.fixture
def sample_metadata():
    return {
        "name": "airflow_test",
        "display_name": "Apache Airflow Test Endpoint",
        "metadata": {
            "github_api_endpoint": "https://api.github.com",
            "github_repo": "test/test-repo",
            "github_repo_token": "",
            "github_branch": "test",
            "api_endpoint": "http://test.example.com:30000/",
            "cos_endpoint": "http://test.example.com:30001/",
            "cos_username": "test",
            "cos_password": "test-password",
            "cos_bucket": "test-bucket",
            "tags": [],
            "user_namespace": "default",
            "runtime_type": "APACHE_AIRFLOW",
        },
        "schema_name": "airflow",
        "resource": "/User/test_directory/airflow_test.json",
    }


@pytest.fixture
def sample_image_metadata():
    image_one = {"image_name": "tensorflow/tensorflow:2.0.0-py3", "pull_policy": "IfNotPresent", "tags": []}
    image_two = {"image_name": "elyra/examples:1.0.0-py3", "pull_policy": "Always", "tags": []}
    mocked_runtime_images = [
        Metadata(name="test-image-metadata", display_name="test-image", schema_name="airflow", metadata=image_one),
        Metadata(name="test-image-metadata", display_name="test-image", schema_name="airflow", metadata=image_two),
    ]
    return mocked_runtime_images


@pytest.fixture
def parsed_ordered_dict(monkeypatch, processor, parsed_pipeline, sample_metadata, sample_image_metadata):
    mocked_runtime = Metadata(
        name="test-metadata", display_name="test", schema_name="airflow", metadata=sample_metadata["metadata"]
    )
    mocked_func = mock.Mock(return_value="default", side_effect=[mocked_runtime, sample_image_metadata])

    monkeypatch.setattr(processor, "_get_metadata_configuration", mocked_func)
    monkeypatch.setattr(processor, "_upload_dependencies_to_object_store", lambda w, x, y, prefix: True)
    monkeypatch.setattr(processor, "_get_dependency_archive_name", lambda x: True)
    monkeypatch.setattr(processor, "_verify_cos_connectivity", lambda x: True)

    return processor._cc_pipeline(parsed_pipeline, pipeline_name="some-name", pipeline_instance_id="some-instance-id")
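
# Helpers for picking apart lines of a rendered DAG file. Illustrative examples,
# derived from the implementations below:
#   read_key_pair('    namespace="default",')                 -> {"key": "namespace", "value": "default"}
#   read_key_pair('    "project_id": "some-name",', sep=":")  -> {"key": "project_id", "value": "some-name"}
#   string_to_list('["a.csv", "b.csv"]')                       -> ["a.csv", "b.csv"]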

def read_key_pair(key_pair, sep="="):
    return {"key": key_pair.split(sep)[0].strip('" '), "value": key_pair.split(sep)[1].rstrip(",").strip('" ')}


def string_to_list(stringed_list):
    return stringed_list.replace(" ", "").replace('"', "").strip("[]").split(",")


def test_processor_type(processor):
    assert processor.type == RuntimeProcessorType.APACHE_AIRFLOW


def test_fail_processor_type(processor):
    with pytest.raises(Exception):
        assert processor.type == RuntimeProcessorType.KUBEFLOW_PIPELINES


@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_COMPLEX], indirect=True)
def test_pipeline_process(monkeypatch, processor, parsed_pipeline, sample_metadata):
    mocked_runtime = Metadata(
        name="test-metadata", display_name="test", schema_name="airflow", metadata=sample_metadata["metadata"]
    )
    mocked_path = "/some-placeholder"

    monkeypatch.setattr(processor, "_get_metadata_configuration", lambda schemaspace, name: mocked_runtime)
    monkeypatch.setattr(
        processor,
        "create_pipeline_file",
        lambda pipeline, pipeline_export_format, pipeline_export_path, pipeline_name, pipeline_instance_id: mocked_path,
    )
    monkeypatch.setattr(github.Github, "get_repo", lambda x, y: True)
    monkeypatch.setattr(GithubClient, "upload_dag", lambda x, y, z: True)

    response = processor.process(pipeline=parsed_pipeline)

    assert response.run_url == sample_metadata["metadata"]["api_endpoint"]
    assert response.object_storage_url == sample_metadata["metadata"]["cos_endpoint"]
    # Verifies that only this substring is in the storage path since a timestamp is injected into the name
    assert "/" + sample_metadata["metadata"]["cos_bucket"] + "/" + "untitled" in response.object_storage_path

@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_COMPLEX], indirect=True)
def test_create_file(monkeypatch, processor, parsed_pipeline, parsed_ordered_dict, sample_metadata):
    pipeline_json = _read_pipeline_resource(PIPELINE_FILE_COMPLEX)

    export_pipeline_name = "some-name"
    export_file_type = "py"

    mocked_runtime = Metadata(
        name="test-metadata", display_name="test", schema_name="airflow", metadata=sample_metadata["metadata"]
    )
    monkeypatch.setattr(processor, "_get_metadata_configuration", lambda name=None, schemaspace=None: mocked_runtime)
    monkeypatch.setattr(processor, "_upload_dependencies_to_object_store", lambda w, x, y, prefix: True)
    monkeypatch.setattr(processor, "_cc_pipeline", lambda x, y, z: parsed_ordered_dict)

    with tempfile.TemporaryDirectory() as temp_dir:
        export_pipeline_output_path = os.path.join(temp_dir, f"{export_pipeline_name}.py")

        response = processor.create_pipeline_file(
            parsed_pipeline,
            pipeline_export_format=export_file_type,
            pipeline_export_path=export_pipeline_output_path,
            pipeline_name=export_pipeline_name,
            pipeline_instance_id=export_pipeline_name,
        )

        assert export_pipeline_output_path == response
        assert os.path.isfile(export_pipeline_output_path)

        file_as_lines = open(response).read().splitlines()

        assert "from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator" in file_as_lines

        # Check DAG project name
        for i in range(len(file_as_lines)):
            if "args = {" == file_as_lines[i]:
                assert "project_id" == read_key_pair(file_as_lines[i + 1], sep=":")["key"]
                assert export_pipeline_name == read_key_pair(file_as_lines[i + 1], sep=":")["value"]

        # For every node in the original pipeline json
        for node in pipeline_json["pipelines"][0]["nodes"]:
            component_parameters = node["app_data"]["component_parameters"]
            for i in range(len(file_as_lines)):
                # Matches a generic op with a node ID
                if f"op_{node['id'].replace('-', '_')} = KubernetesPodOperator(" in file_as_lines[i]:
                    sub_list_line_counter = 0
                    # Gets sub-list slice starting where the Notebook Op starts
                    init_line = i + 1
                    for line in file_as_lines[init_line:]:
                        if "namespace=" in line:
                            assert sample_metadata["metadata"]["user_namespace"] == read_key_pair(line)["value"]
                        elif "cos_endpoint=" in line:
                            assert sample_metadata["metadata"]["cos_endpoint"] == read_key_pair(line)["value"]
                        elif "cos_bucket=" in line:
                            assert sample_metadata["metadata"]["cos_bucket"] == read_key_pair(line)["value"]
                        elif "name=" in line:
                            assert node["app_data"]["ui_data"]["label"] == read_key_pair(line)["value"]
                        elif "notebook=" in line:
                            assert component_parameters["filename"] == read_key_pair(line)["value"]
                        elif "image=" in line:
                            assert component_parameters["runtime_image"] == read_key_pair(line)["value"]
                        elif "env_vars=" in line:
                            for env in component_parameters["env_vars"]:
                                var, value = env.split("=")
                                # Gets sub-list slice starting where the env vars starts
                                start_env = i + sub_list_line_counter + 2
                                for env_line in file_as_lines[start_env:]:
                                    if "AWS_ACCESS_KEY_ID" in env_line:
                                        assert (
                                            sample_metadata["metadata"]["cos_username"]
                                            == read_key_pair(env_line, sep=":")["value"]
                                        )
                                    elif "AWS_SECRET_ACCESS_KEY" in env_line:
                                        assert (
                                            sample_metadata["metadata"]["cos_password"]
                                            == read_key_pair(env_line, sep=":")["value"]
                                        )
                                    elif var in env_line:
                                        assert var == read_key_pair(env_line, sep=":")["key"]
                                        assert value == read_key_pair(env_line, sep=":")["value"]
                                    elif env_line.strip() == "},":  # end of env vars
                                        break
                        elif "pipeline_inputs=" in line and component_parameters.get("inputs"):
                            for input in component_parameters["inputs"]:
                                assert input in string_to_list(read_key_pair(line)["value"])
                        elif "pipeline_outputs=" in line and component_parameters.get("outputs"):
                            for output in component_parameters["outputs"]:
                                assert output in string_to_list(read_key_pair(line)["value"])
                        elif line == ")":  # End of this Notebook Op
                            break
                        sub_list_line_counter += 1
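
# For custom operators, the test below checks that every component parameter appears
# as a "<parameter>=" keyword in the emitted constructor call and, via expected_params,
# that string-typed values are re-quoted for rendering while booleans, integers, lists,
# and dictionaries are passed through as Python literals.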

@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_CUSTOM_COMPONENTS], indirect=True)
@pytest.mark.parametrize("catalog_instance", [AIRFLOW_TEST_OPERATOR_CATALOG], indirect=True)
def test_create_file_custom_components(
    monkeypatch,
    processor,
    catalog_instance,
    parsed_pipeline,
    parsed_ordered_dict,
    sample_metadata,
):
    pipeline_json = _read_pipeline_resource(PIPELINE_FILE_CUSTOM_COMPONENTS)

    export_pipeline_name = "some-name"
    export_file_type = "py"

    mocked_runtime = Metadata(
        name="test-metadata", display_name="test", schema_name="airflow", metadata=sample_metadata["metadata"]
    )
    monkeypatch.setattr(processor, "_get_metadata_configuration", lambda name=None, schemaspace=None: mocked_runtime)
    monkeypatch.setattr(processor, "_upload_dependencies_to_object_store", lambda w, x, y, prefix: True)
    monkeypatch.setattr(processor, "_cc_pipeline", lambda x, y, z: parsed_ordered_dict)

    with tempfile.TemporaryDirectory() as temp_dir:
        export_pipeline_output_path = os.path.join(temp_dir, f"{export_pipeline_name}.py")

        response = processor.create_pipeline_file(
            parsed_pipeline,
            pipeline_export_format=export_file_type,
            pipeline_export_path=export_pipeline_output_path,
            pipeline_name=export_pipeline_name,
            pipeline_instance_id=export_pipeline_name,
        )

        assert export_pipeline_output_path == response
        assert os.path.isfile(export_pipeline_output_path)

        file_as_lines = open(response).read().splitlines()

        pipeline_description = pipeline_json["pipelines"][0]["app_data"]["properties"]["description"]
        escaped_description = pipeline_description.replace('"""', '\\"\\"\\"')

        for i in range(len(file_as_lines)):
            if "args = {" == file_as_lines[i]:
                # Check DAG project name
                assert "project_id" == read_key_pair(file_as_lines[i + 1], sep=":")["key"]
                assert export_pipeline_name == read_key_pair(file_as_lines[i + 1], sep=":")["value"]
            elif 'description="""' in file_as_lines[i]:
                # Check that DAG contains the correct description
                line_no = i + 1
                description_as_lines = []
                while '"""' not in file_as_lines[line_no]:
                    description_as_lines.append(file_as_lines[line_no])
                    line_no += 1
                expected_description_lines = escaped_description.split("\n")
                assert description_as_lines == expected_description_lines

                # Nothing more to be done in file
                break

        # For every node in the original pipeline json
        for node in pipeline_json["pipelines"][0]["nodes"]:
            for op_id, op in parsed_pipeline.operations.items():
                if op_id == node["id"]:
                    # Component parameters must be compared with those on the Operation
                    # object rather than those given in the pipeline JSON, since property
                    # propagation in PipelineDefinition can result in changed parameters
                    component_parameters = op.component_params
                    break
            for i in range(len(file_as_lines)):
                # Matches custom component operators
                if f"op_{node['id'].replace('-', '_')} = " in file_as_lines[i]:
                    for parameter in component_parameters:
                        # Find 'parameter=' clause in file_as_lines list
                        r = re.compile(rf"\s*{parameter}=.*")
                        parameter_clause = i + 1
                        assert len(list(filter(r.match, file_as_lines[parameter_clause:]))) > 0

        # Test that parameter value processing proceeded as expected for each data type
        op_id = "bb9606ca-29ec-4133-a36a-67bd2a1f6dc3"
        op_params = parsed_ordered_dict[op_id].get("component_params", {})
        expected_params = {
            "mounted_volumes": '"a component-defined property"',
            "str_no_default": "\"echo 'test one'\"",
            "bool_no_default": True,
            "unusual_type_list": [1, 2],
            "unusual_type_dict": {},
            "int_default_non_zero": 2,
        }
        assert op_params == expected_params


@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_COMPLEX], indirect=True)
def test_export_overwrite(monkeypatch, processor, parsed_pipeline):
    with tempfile.TemporaryDirectory() as temp_dir:
        mocked_path = os.path.join(temp_dir, "some-name.py")
        Path(mocked_path).touch()
        assert os.path.isfile(mocked_path)

        monkeypatch.setattr(
            processor,
            "create_pipeline_file",
            lambda pipeline, pipeline_export_format, pipeline_export_path, pipeline_name, pipeline_instance_id: mocked_path,  # noqa: E501
        )

        returned_path = processor.export(parsed_pipeline, "py", mocked_path, True)
        assert returned_path == mocked_path


@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_COMPLEX], indirect=True)
def test_fail_export_overwrite(processor, parsed_pipeline):
    with tempfile.TemporaryDirectory() as temp_dir:
        Path(f"{temp_dir}/test.py").touch()
        assert os.path.isfile(f"{temp_dir}/test.py")

        export_pipeline_output_path = os.path.join(temp_dir, "test.py")
        with pytest.raises(ValueError):
            processor.export(parsed_pipeline, "py", export_pipeline_output_path, False)
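
# The complex sample pipeline is used to verify the dependency graph handed to the DAG
# template: each entry's parent_operation_ids must point at its upstream node(s), and the
# per-node runtime image, pull policy, envs, inputs, and outputs must be carried over.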

@pytest.mark.parametrize("parsed_pipeline", [PIPELINE_FILE_COMPLEX], indirect=True)
def test_pipeline_tree_creation(parsed_ordered_dict, sample_metadata, sample_image_metadata):
    pipeline_json = _read_pipeline_resource(PIPELINE_FILE_COMPLEX)

    ordered_dict = parsed_ordered_dict

    assert len(ordered_dict.keys()) == len(pipeline_json["pipelines"][0]["nodes"])

    # Verify tree structure is correct
    assert not ordered_dict["cded6818-e601-4fd8-b6b9-c9fdf1fd1fca"].get("parent_operation_ids")
    assert (
        ordered_dict["bb9606ca-29ec-4133-a36a-67bd2a1f6dc3"].get("parent_operation_ids").pop()
        == "cded6818-e601-4fd8-b6b9-c9fdf1fd1fca"
    )
    assert (
        ordered_dict["6f5c2ece-1977-48a1-847f-099b327c6ed1"].get("parent_operation_ids").pop()
        == "cded6818-e601-4fd8-b6b9-c9fdf1fd1fca"
    )
    assert (
        ordered_dict["4ef63a48-a27c-4d1e-a0ee-2fbbdbe3be74"].get("parent_operation_ids").pop()
        == "cded6818-e601-4fd8-b6b9-c9fdf1fd1fca"
    )
    assert (
        ordered_dict["4f7ae91b-682e-476c-8664-58412336b31f"].get("parent_operation_ids").pop()
        == "bb9606ca-29ec-4133-a36a-67bd2a1f6dc3"
    )
    assert (
        ordered_dict["f82c4699-b392-4a3e-92b0-45d9e11126fe"].get("parent_operation_ids").pop()
        == "bb9606ca-29ec-4133-a36a-67bd2a1f6dc3"
    )
    assert ordered_dict["137d3d2f-4224-42d9-b8c6-cbee9ff2872d"].get("parent_operation_ids") == [
        "4ef63a48-a27c-4d1e-a0ee-2fbbdbe3be74",
        "0a7eff92-fe2a-411c-92a6-73d6f3810516",
    ]
    assert not ordered_dict["779c2630-64bf-47ca-8a98-9ac8a60e85f7"].get("parent_operation_ids")
    assert (
        ordered_dict["0a7eff92-fe2a-411c-92a6-73d6f3810516"].get("parent_operation_ids").pop()
        == "779c2630-64bf-47ca-8a98-9ac8a60e85f7"
    )
    assert ordered_dict["92a7a247-1131-489c-8c3e-1e2389d4c673"].get("parent_operation_ids") == [
        "f82c4699-b392-4a3e-92b0-45d9e11126fe",
        "137d3d2f-4224-42d9-b8c6-cbee9ff2872d",
        "6f5c2ece-1977-48a1-847f-099b327c6ed1",
    ]

    for key in ordered_dict.keys():
        for node in pipeline_json["pipelines"][0]["nodes"]:
            if node["id"] == key:
                component_parameters = node["app_data"]["component_parameters"]
                assert ordered_dict[key]["runtime_image"] == component_parameters["runtime_image"]
                for image in sample_image_metadata:
                    if ordered_dict[key]["runtime_image"] == image.metadata["image_name"]:
                        assert ordered_dict[key]["image_pull_policy"] == image.metadata["pull_policy"]
                print(ordered_dict[key])
                for env in component_parameters["env_vars"]:
                    var, value = env.split("=")
                    assert ordered_dict[key]["pipeline_envs"][var] == value
                assert (
                    ordered_dict[key]["pipeline_envs"]["AWS_ACCESS_KEY_ID"]
                    == sample_metadata["metadata"]["cos_username"]
                )
                assert (
                    ordered_dict[key]["pipeline_envs"]["AWS_SECRET_ACCESS_KEY"]
                    == sample_metadata["metadata"]["cos_password"]
                )
                for arg in ["inputs", "outputs"]:
                    if node["app_data"].get(arg):
                        for file in node["app_data"][arg]:
                            assert file in ordered_dict[key]["pipeline_" + arg]
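
# _collect_envs merges user-declared env_vars with system-owned values. The checks below
# confirm that ELYRA_* values are overridden with system-derived ones, that COS credentials
# are injected only when no cos_secret is supplied, and that empty or valueless user
# entries are dropped.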

def test_collect_envs(processor):
    pipelines_test_file = "elyra/pipeline/tests/resources/archive/test.ipynb"

    # add system-owned envs with bogus values to ensure they get set to system-derived values,
    # and include some user-provided edge cases
    operation_envs = [
        'ELYRA_RUNTIME_ENV="bogus_runtime"',
        'ELYRA_ENABLE_PIPELINE_INFO="bogus_pipeline"',
        "ELYRA_WRITABLE_CONTAINER_DIR=",  # simulate operation reference in pipeline
        'AWS_ACCESS_KEY_ID="bogus_key"',
        'AWS_SECRET_ACCESS_KEY="bogus_secret"',
        "USER_EMPTY_VALUE= ",
        "USER_TWO_EQUALS=KEY=value",
        "USER_NO_VALUE=",
    ]
    component_parameters = {
        "filename": pipelines_test_file,
        "env_vars": operation_envs,
        "runtime_image": "tensorflow/tensorflow:latest",
    }
    test_operation = GenericOperation(
        id="this-is-a-test-id",
        type="execution-node",
        classifier="execute-notebook-node",
        name="test",
        component_params=component_parameters,
    )

    envs = processor._collect_envs(test_operation, cos_secret=None, cos_username="Alice", cos_password="secret")

    assert envs["ELYRA_RUNTIME_ENV"] == "airflow"
    assert envs["AWS_ACCESS_KEY_ID"] == "Alice"
    assert envs["AWS_SECRET_ACCESS_KEY"] == "secret"
    assert envs["ELYRA_ENABLE_PIPELINE_INFO"] == "True"
    assert "ELYRA_WRITABLE_CONTAINER_DIR" not in envs
    assert "USER_EMPTY_VALUE" not in envs
    assert envs["USER_TWO_EQUALS"] == "KEY=value"
    assert "USER_NO_VALUE" not in envs

    # Repeat with non-None secret - ensure user and password envs are not present, but others are
    envs = processor._collect_envs(test_operation, cos_secret="secret", cos_username="Alice", cos_password="secret")

    assert envs["ELYRA_RUNTIME_ENV"] == "airflow"
    assert "AWS_ACCESS_KEY_ID" not in envs
    assert "AWS_SECRET_ACCESS_KEY" not in envs
    assert envs["ELYRA_ENABLE_PIPELINE_INFO"] == "True"
    assert "ELYRA_WRITABLE_CONTAINER_DIR" not in envs
    assert "USER_EMPTY_VALUE" not in envs
    assert envs["USER_TWO_EQUALS"] == "KEY=value"
    assert "USER_NO_VALUE" not in envs

def test_unique_operation_name_existent(processor):
    op1 = SimpleNamespace(name="sample_operation")
    op2 = SimpleNamespace(name="sample_operation_2")
    op3 = SimpleNamespace(name="sample_operation_3")
    op4 = SimpleNamespace(name="sample_operation")
    op5 = SimpleNamespace(name="sample_operation_2")
    op6 = SimpleNamespace(name="sample_operation_3")
    sample_operation_list = [op1, op2, op3, op4, op5, op6]

    correct_name_list = [
        "sample_operation",
        "sample_operation_2",
        "sample_operation_3",
        "sample_operation_1",
        "sample_operation_2_1",
        "sample_operation_3_1",
    ]

    renamed_op_list = processor._create_unique_node_names(sample_operation_list)
    name_list = [op.name for op in renamed_op_list]

    assert name_list == correct_name_list


def test_unique_operation_name_non_existent(processor):
    operation_name = "sample_operation_foo_bar"

    op1 = SimpleNamespace(name="sample_operation")
    op2 = SimpleNamespace(name="sample_operation_2")
    op3 = SimpleNamespace(name="sample_operation_3")
    sample_operation_list = [op1, op2, op3]

    correct_name_list = ["sample_operation", "sample_operation_2", "sample_operation_3"]

    renamed_op_list = processor._create_unique_node_names(sample_operation_list)
    name_list = [op.name for op in renamed_op_list]

    assert name_list == correct_name_list
    assert operation_name not in name_list


def test_unique_operation_custom(processor):
    op1 = SimpleNamespace(name="this bash")
    op2 = SimpleNamespace(name="this@bash")
    op3 = SimpleNamespace(name="this!bash")
    op4 = SimpleNamespace(name="that^bash")
    op5 = SimpleNamespace(name="that bash")
    op6 = SimpleNamespace(name="that_bash_2")
    op7 = SimpleNamespace(name="that_bash_1")
    op8 = SimpleNamespace(name="that_bash_0")
    sample_operation_list = [op1, op2, op3, op4, op5, op6, op7, op8]

    correct_name_list = [
        "this_bash",
        "this_bash_1",
        "this_bash_2",
        "that_bash",
        "that_bash_1",
        "that_bash_2",
        "that_bash_1_1",
        "that_bash_0",
    ]

    scrubbed_list = processor._scrub_invalid_characters_from_list(sample_operation_list)
    renamed_op_list = processor._create_unique_node_names(scrubbed_list)
    name_list = [op.name for op in renamed_op_list]

    assert name_list == correct_name_list
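
# String-typed component parameters may hold list or dictionary literals; _process_list_value
# and _process_dictionary_value try to parse them into real Python objects, and anything that
# does not parse is wrapped in double quotes so it renders as a plain string in the DAG.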

def test_process_list_value_function(processor):
    # Test values that will be successfully converted to list
    assert processor._process_list_value("") == []
    assert processor._process_list_value(None) == []
    assert processor._process_list_value("[]") == []
    assert processor._process_list_value("None") == []
    assert processor._process_list_value("['elem1']") == ["elem1"]
    assert processor._process_list_value("['elem1', 'elem2', 'elem3']") == ["elem1", "elem2", "elem3"]
    assert processor._process_list_value(" ['elem1', 'elem2' , 'elem3'] ") == ["elem1", "elem2", "elem3"]
    assert processor._process_list_value("[1, 2]") == [1, 2]
    assert processor._process_list_value("[True, False, True]") == [True, False, True]
    assert processor._process_list_value("[{'obj': 'val', 'obj2': 'val2'}, {}]") == [{"obj": "val", "obj2": "val2"}, {}]

    # Test values that will not be successfully converted to list
    # Surrounding quotes are added to string values for correct DAG render
    assert processor._process_list_value("[[]") == '"[[]"'
    assert processor._process_list_value("[elem1, elem2]") == '"[elem1, elem2]"'
    assert processor._process_list_value("elem1, elem2") == '"elem1, elem2"'
    assert processor._process_list_value(" elem1, elem2 ") == '"elem1, elem2"'
    assert processor._process_list_value("'elem1', 'elem2'") == "\"'elem1', 'elem2'\""


def test_process_dictionary_value_function(processor):
    # Test values that will be successfully converted to dictionary
    assert processor._process_dictionary_value("") == {}
    assert processor._process_dictionary_value(None) == {}
    assert processor._process_dictionary_value("{}") == {}
    assert processor._process_dictionary_value("None") == {}
    assert processor._process_dictionary_value("{'key': 'value'}") == {"key": "value"}

    dict_as_str = "{'key1': 'value', 'key2': 'value'}"
    assert processor._process_dictionary_value(dict_as_str) == {"key1": "value", "key2": "value"}

    dict_as_str = " { 'key1': 'value' , 'key2' : 'value'} "
    assert processor._process_dictionary_value(dict_as_str) == {"key1": "value", "key2": "value"}

    dict_as_str = "{'key1': [1, 2, 3], 'key2': ['elem1', 'elem2']}"
    assert processor._process_dictionary_value(dict_as_str) == {"key1": [1, 2, 3], "key2": ["elem1", "elem2"]}

    dict_as_str = "{'key1': 2, 'key2': 'value', 'key3': True, 'key4': None, 'key5': [1, 2, 3]}"
    expected_value = {"key1": 2, "key2": "value", "key3": True, "key4": None, "key5": [1, 2, 3]}
    assert processor._process_dictionary_value(dict_as_str) == expected_value

    dict_as_str = "{'key1': {'key2': 2, 'key3': 3, 'key4': 4}, 'key5': {}}"
    expected_value = {
        "key1": {
            "key2": 2,
            "key3": 3,
            "key4": 4,
        },
        "key5": {},
    }
    assert processor._process_dictionary_value(dict_as_str) == expected_value

    # Test values that will not be successfully converted to dictionary
    # Surrounding quotes are added to string values for correct DAG render
    assert processor._process_dictionary_value("{{}") == '"{{}"'
    assert processor._process_dictionary_value("{key1: value, key2: value}") == '"{key1: value, key2: value}"'
    assert processor._process_dictionary_value(" { key1: value, key2: value } ") == '"{ key1: value, key2: value }"'
    assert processor._process_dictionary_value("key1: value, key2: value") == '"key1: value, key2: value"'
    assert processor._process_dictionary_value("{'key1': true}") == "\"{'key1': true}\""
    assert processor._process_dictionary_value("{'key': null}") == "\"{'key': null}\""

    dict_as_str = "{'key1': [elem1, elem2, elem3], 'key2': ['elem1', 'elem2']}"
    assert processor._process_dictionary_value(dict_as_str) == f'"{dict_as_str}"'

    dict_as_str = "{'key1': {key2: 2}, 'key3': ['elem1', 'elem2']}"
    assert processor._process_dictionary_value(dict_as_str) == f'"{dict_as_str}"'

    dict_as_str = "{'key1': {key2: 2}, 'key3': ['elem1', 'elem2']}"
    assert processor._process_dictionary_value(dict_as_str) == f'"{dict_as_str}"'
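
# When a custom-operator parameter refers to the output of an upstream task, the processor
# renders it as an Airflow XCom pull. With two same-named TestOperator nodes in the pipeline,
# the reference must target the de-duplicated task id (here "TestOperator_1").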

@pytest.mark.parametrize(
    "parsed_pipeline", ["resources/validation_pipelines/aa_operator_same_name.json"], indirect=True
)
@pytest.mark.parametrize("catalog_instance", [AIRFLOW_TEST_OPERATOR_CATALOG], indirect=True)
def test_same_name_operator_in_pipeline(monkeypatch, processor, catalog_instance, parsed_pipeline, sample_metadata):
    task_id = "e3922a29-f4c0-43d9-8d8b-4509aab80032"
    upstream_task_id = "0eb57369-99d1-4cd0-a205-8d8d96af3ad4"

    mocked_runtime = Metadata(
        name="test-metadata", display_name="test", schema_name="airflow", metadata=sample_metadata["metadata"]
    )
    monkeypatch.setattr(processor, "_get_metadata_configuration", lambda name=None, schemaspace=None: mocked_runtime)
    monkeypatch.setattr(processor, "_upload_dependencies_to_object_store", lambda w, x, y, prefix: True)

    pipeline_def_operation = parsed_pipeline.operations[task_id]
    pipeline_def_operation_parameters = pipeline_def_operation.component_params_as_dict
    pipeline_def_operation_str_param = pipeline_def_operation_parameters["str_no_default"]

    assert pipeline_def_operation_str_param["activeControl"] == "NestedEnumControl"
    assert set(pipeline_def_operation_str_param["NestedEnumControl"].keys()) == {"value", "option"}
    assert pipeline_def_operation_str_param["NestedEnumControl"]["value"] == upstream_task_id

    ordered_operations = processor._cc_pipeline(
        parsed_pipeline, pipeline_name="some-name", pipeline_instance_id="some-instance-name"
    )

    operation_parameters = ordered_operations[task_id]["component_params"]
    operation_parameter_str_command = operation_parameters["str_no_default"]

    assert operation_parameter_str_command == "\"{{ ti.xcom_pull(task_ids='TestOperator_1') }}\""


def test_scrub_invalid_characters():
    invalid_character_list_string = "[-!@#$%^&*(){};:,/<>?|`~=+ ]"
    valid_character_list_string = list(string.ascii_lowercase + string.ascii_uppercase + string.digits)

    for character in invalid_character_list_string:
        assert AirflowPipelineProcessor.scrub_invalid_characters(character) == "_"

    for character in valid_character_list_string:
        assert AirflowPipelineProcessor.scrub_invalid_characters(character) == character