# test_pipeline_app.py
#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
  16. """Tests for elyra-pipeline application"""
  17. import json
  18. from pathlib import Path
  19. import shutil
  20. from click.testing import CliRunner
  21. from conftest import KFP_COMPONENT_CACHE_INSTANCE
  22. import pytest
  23. from elyra.cli.pipeline_app import pipeline
  24. from elyra.metadata.manager import MetadataManager
  25. from elyra.metadata.metadata import Metadata
  26. from elyra.metadata.schemaspaces import Runtimes
# All `elyra-pipeline` subcommands; used to drive generic parameter handling tests
# that must behave the same for every subcommand.
SUB_COMMANDS = ["run", "submit", "describe", "validate", "export"]
  29. @pytest.fixture
  30. def kubeflow_pipelines_runtime_instance():
  31. """Creates a Kubeflow Pipelines RTC and removes it after test."""
  32. instance_name = "valid_kfp_test_config"
  33. instance_config_file = Path(__file__).parent / "resources" / "runtime_configs" / f"{instance_name}.json"
  34. with open(instance_config_file, "r") as fd:
  35. instance_config = json.load(fd)
  36. md_mgr = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID)
  37. # clean possible orphaned instance...
  38. try:
  39. md_mgr.remove(instance_name)
  40. except Exception:
  41. pass
  42. runtime_instance = md_mgr.create(instance_name, Metadata(**instance_config))
  43. yield runtime_instance.name
  44. md_mgr.remove(runtime_instance.name)
  45. @pytest.fixture
  46. def airflow_runtime_instance():
  47. """Creates an airflow RTC and removes it after test."""
  48. instance_name = "valid_airflow_test_config"
  49. instance_config_file = Path(__file__).parent / "resources" / "runtime_configs" / f"{instance_name}.json"
  50. with open(instance_config_file, "r") as fd:
  51. instance_config = json.load(fd)
  52. md_mgr = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID)
  53. # clean possible orphaned instance...
  54. try:
  55. md_mgr.remove(instance_name)
  56. except Exception:
  57. pass
  58. runtime_instance = md_mgr.create(instance_name, Metadata(**instance_config))
  59. yield runtime_instance.name
  60. md_mgr.remove(runtime_instance.name)
  61. def test_no_opts():
  62. """Verify that all commands are displayed in help"""
  63. runner = CliRunner()
  64. result = runner.invoke(pipeline)
  65. assert "run Run a pipeline in your local environment" in result.output
  66. assert "submit Submit a pipeline to be executed on the server" in result.output
  67. assert "describe Display pipeline summary" in result.output
  68. assert "export Export a pipeline to a runtime-specific format" in result.output
  69. assert "validate Validate pipeline" in result.output
  70. assert result.exit_code == 0
  71. def test_bad_subcommand():
  72. runner = CliRunner()
  73. result = runner.invoke(pipeline, ["invalid_command"])
  74. assert "Error: No such command 'invalid_command'" in result.output
  75. assert result.exit_code != 0
  76. @pytest.mark.parametrize("subcommand", SUB_COMMANDS)
  77. def test_subcommand_no_opts(subcommand):
  78. runner = CliRunner()
  79. result = runner.invoke(pipeline, [subcommand])
  80. assert result.exit_code != 0
  81. assert "Error: Missing argument 'PIPELINE_PATH'" in result.output
  82. @pytest.mark.parametrize("subcommand", SUB_COMMANDS)
  83. def test_subcommand_invalid_pipeline_path(subcommand):
  84. """Verify that every command only accepts a valid pipeline_path file name"""
  85. runner = CliRunner()
  86. # test: file not found
  87. file_name = "no-such.pipeline"
  88. result = runner.invoke(pipeline, [subcommand, file_name])
  89. assert result.exit_code != 0
  90. assert f"Invalid value for 'PIPELINE_PATH': '{file_name}' is not a file." in result.output
  91. # test: file with wrong extension
  92. with runner.isolated_filesystem():
  93. file_name = "wrong.extension"
  94. with open(file_name, "w") as f:
  95. f.write("I am not a pipeline file.")
  96. result = runner.invoke(pipeline, [subcommand, file_name])
  97. assert result.exit_code != 0
  98. assert f"Invalid value for 'PIPELINE_PATH': '{file_name}' is not a .pipeline file." in result.output
  99. @pytest.mark.parametrize("subcommand", SUB_COMMANDS)
  100. def test_subcommand_with_no_pipelines_field(subcommand, kubeflow_pipelines_runtime_instance):
  101. """Verify that every command properly detects pipeline issues"""
  102. runner = CliRunner()
  103. with runner.isolated_filesystem():
  104. pipeline_file = "pipeline_without_pipelines_field.pipeline"
  105. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / pipeline_file
  106. assert pipeline_file_path.is_file()
  107. # every CLI command invocation requires these parameters
  108. invoke_parameters = [subcommand, str(pipeline_file_path)]
  109. if subcommand in ["submit", "export"]:
  110. # these commands also require a runtime configuration
  111. invoke_parameters.extend(["--runtime-config", kubeflow_pipelines_runtime_instance])
  112. result = runner.invoke(pipeline, invoke_parameters)
  113. assert result.exit_code != 0
  114. assert "Pipeline is missing 'pipelines' field." in result.output
  115. @pytest.mark.parametrize("subcommand", SUB_COMMANDS)
  116. def test_subcommand_with_zero_length_pipelines_field(subcommand, kubeflow_pipelines_runtime_instance):
  117. """Verify that every command properly detects pipeline issues"""
  118. runner = CliRunner()
  119. with runner.isolated_filesystem():
  120. pipeline_file = "pipeline_with_zero_length_pipelines_field.pipeline"
  121. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / pipeline_file
  122. assert pipeline_file_path.is_file()
  123. # every CLI command invocation requires these parameters
  124. invoke_parameters = [subcommand, str(pipeline_file_path)]
  125. if subcommand in ["submit", "export"]:
  126. # these commands also require a runtime configuration
  127. invoke_parameters.extend(["--runtime-config", kubeflow_pipelines_runtime_instance])
  128. result = runner.invoke(pipeline, invoke_parameters)
  129. assert result.exit_code != 0
  130. assert "Pipeline has zero length 'pipelines' field." in result.output
  131. @pytest.mark.parametrize("subcommand", SUB_COMMANDS)
  132. def test_subcommand_with_no_nodes(subcommand, kubeflow_pipelines_runtime_instance):
  133. """Verify that every command properly detects pipeline issues"""
  134. # don't run this test for the `describe` command
  135. # (see test_describe_with_no_nodes)
  136. if subcommand == "describe":
  137. return
  138. runner = CliRunner()
  139. with runner.isolated_filesystem():
  140. pipeline_file = "pipeline_with_zero_nodes.pipeline"
  141. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / pipeline_file
  142. assert pipeline_file_path.is_file()
  143. # every CLI command invocation requires these parameters
  144. invoke_parameters = [subcommand, str(pipeline_file_path)]
  145. if subcommand in ["submit", "export"]:
  146. # these commands also require a runtime configuration
  147. invoke_parameters.extend(["--runtime-config", kubeflow_pipelines_runtime_instance])
  148. result = runner.invoke(pipeline, invoke_parameters)
  149. assert result.exit_code != 0
  150. def test_describe_with_no_nodes():
  151. runner = CliRunner()
  152. with runner.isolated_filesystem():
  153. pipeline_file = "pipeline_with_zero_nodes.pipeline"
  154. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / pipeline_file
  155. assert pipeline_file_path.is_file()
  156. result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
  157. assert result.exit_code == 0, result.output
  158. assert "Description: None" in result.output
  159. assert "Type: KUBEFLOW_PIPELINES" in result.output
  160. assert "Nodes: 0" in result.output
  161. assert "File Dependencies:\n None Listed" in result.output
  162. assert "Component Dependencies:\n None Listed" in result.output
  163. def test_describe_with_kfp_components():
  164. runner = CliRunner()
  165. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
  166. result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
  167. assert "Description: 3-node custom component pipeline" in result.output
  168. assert "Type: KUBEFLOW_PIPELINES" in result.output
  169. assert "Nodes: 3" in result.output
  170. assert "File Dependencies:\n None Listed" in result.output
  171. assert (
  172. "- https://raw.githubusercontent.com/kubeflow/pipelines/1.6.0/components/"
  173. "basics/Calculate_hash/component.yaml" in result.output
  174. )
  175. assert (
  176. "- /opt/anaconda3/envs/elyra-dev/share/jupyter/components/"
  177. "kfp/filter_text_using_shell_and_grep.yaml" in result.output
  178. )
  179. assert (
  180. "- https://raw.githubusercontent.com/kubeflow/pipelines/1.6.0/components/"
  181. "web/Download/component.yaml" in result.output
  182. )
  183. assert result.exit_code == 0
  184. @pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
  185. def test_validate_with_kfp_components(jp_environ, kubeflow_pipelines_runtime_instance, catalog_instance):
  186. runner = CliRunner()
  187. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
  188. result = runner.invoke(
  189. pipeline, ["validate", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
  190. )
  191. assert "Validating pipeline..." in result.output
  192. assert result.exit_code == 0
  193. def test_describe_with_missing_kfp_component():
  194. runner = CliRunner()
  195. with runner.isolated_filesystem():
  196. valid_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
  197. pipeline_file_path = Path.cwd() / "foo.pipeline"
  198. with open(pipeline_file_path, "w") as pipeline_file:
  199. with open(valid_file_path) as valid_file:
  200. valid_data = json.load(valid_file)
  201. # Update known component name to trigger a missing component
  202. valid_data["pipelines"][0]["nodes"][0]["op"] = valid_data["pipelines"][0]["nodes"][0]["op"] + "Missing"
  203. pipeline_file.write(json.dumps(valid_data))
  204. result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
  205. assert "Description: 3-node custom component pipeline" in result.output
  206. assert "Type: KUBEFLOW_PIPELINES" in result.output
  207. assert "Nodes: 3" in result.output
  208. assert result.exit_code == 0
  209. def test_describe_notebook_script_report():
  210. """
  211. Test report output for notebook/script property when none, one or many instances are present
  212. :return:
  213. """
  214. runner = CliRunner()
  215. # Test report output when only notebook/s present
  216. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_notebook.pipeline"
  217. result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
  218. assert "Notebooks:\n" in result.output
  219. assert "dummy_notebook_1.ipynb" in result.output
  220. assert "dummy_notebook_2.ipynb" in result.output
  221. # Ensure no entries for scripts
  222. assert "Scripts:\n None Listed" in result.output
  223. assert result.exit_code == 0
  224. # Test report output when only script/s are present
  225. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_script.pipeline"
  226. result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
  227. assert "Scripts:\n" in result.output
  228. assert "dummy_script_1.py" in result.output
  229. assert "dummy_script_2.py" in result.output
  230. # Ensure no entries for notebooks
  231. assert "Notebooks:\n None Listed" in result.output
  232. assert result.exit_code == 0
  233. # Test report output when both notebook and script are present
  234. pipeline_file_path = (
  235. Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_notebook_and_script.pipeline"
  236. )
  237. result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
  238. assert "Notebooks:\n" in result.output
  239. assert "dummy_notebook_1.ipynb" in result.output
  240. assert "Scripts:\n" in result.output
  241. assert "dummy_script_1.py" in result.output
  242. assert result.exit_code == 0
  243. def test_describe_notebook_script_json():
  244. """
  245. Test json output for notebook/script property when none, one or many instances are present
  246. :return:
  247. """
  248. runner = CliRunner()
  249. # Test report output when only notebook/s present
  250. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_notebook.pipeline"
  251. result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
  252. result_json = json.loads(result.output)
  253. # Ensure that script entry is absent
  254. assert not result_json.get("scripts")
  255. # Tests notebook output is a list and check list count
  256. assert isinstance(result_json.get("notebooks"), list) and len(result_json.get("notebooks")) == 2
  257. # Tests the list content. Constructs below to address random sort order in list
  258. assert Path(result_json.get("notebooks")[0]).name != Path(result_json.get("notebooks")[1]).name
  259. assert Path(result_json.get("notebooks")[0]).name in list(["dummy_notebook_1.ipynb", "dummy_notebook_2.ipynb"])
  260. assert Path(result_json.get("notebooks")[1]).name in list(["dummy_notebook_1.ipynb", "dummy_notebook_2.ipynb"])
  261. assert result.exit_code == 0
  262. # Test report output when only script/s are present
  263. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_script.pipeline"
  264. result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
  265. result_json = json.loads(result.output)
  266. # Ensure that notebook entry is absent
  267. assert not result_json.get("notebooks")
  268. # Tests script output is a list and check list count
  269. assert isinstance(result_json.get("scripts"), list) and len(result_json.get("scripts")) == 2
  270. # Tests the list content. Constructs below to address random sort order in list
  271. assert Path(result_json.get("scripts")[0]).name != Path(result_json.get("scripts")[1]).name
  272. assert Path(result_json.get("scripts")[0]).name in list(["dummy_script_1.py", "dummy_script_2.py"])
  273. assert Path(result_json.get("scripts")[1]).name in list(["dummy_script_1.py", "dummy_script_2.py"])
  274. assert result.exit_code == 0
  275. # Test report output when both notebook and script are present
  276. pipeline_file_path = (
  277. Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_notebook_and_script.pipeline"
  278. )
  279. result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
  280. result_json = json.loads(result.output)
  281. # Tests output is a list.
  282. assert isinstance(result_json.get("notebooks"), list) and len(result_json.get("runtime_image")) == 1
  283. assert isinstance(result_json.get("scripts"), list) and len(result_json.get("runtime_image")) == 1
  284. # Tests the list content
  285. assert Path(result_json.get("notebooks")[0]).name == "dummy_notebook_1.ipynb"
  286. assert Path(result_json.get("scripts")[0]).name == "dummy_script_1.py"
  287. assert result.exit_code == 0
  288. def test_describe_runtime_image_report():
  289. """
  290. Test report output for runtime_image property when none, one or many instances are present
  291. :return:
  292. """
  293. runner = CliRunner()
  294. # Test report output when there are no runtime_images
  295. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_zero_runtime_image.pipeline"
  296. result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
  297. assert "Runtime Image:\n None Listed" in result.output
  298. assert result.exit_code == 0
  299. # Test report output when there is a single runtime_image
  300. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_one_runtime_image.pipeline"
  301. result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
  302. assert "Runtime Image:\n - tensorflow/tensorflow:2.0.0-py3" in result.output
  303. assert result.exit_code == 0
  304. # Test report output where there are two / multiple runtime_images
  305. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_many_runtime_image.pipeline"
  306. result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
  307. assert "Runtime Image:\n" in result.output
  308. assert "- tensorflow/tensorflow:2.0.0-py3" in result.output
  309. assert "- elyra/examples:1.0.0-py3" in result.output
  310. assert result.exit_code == 0
  311. def test_describe_runtime_image_json():
  312. """
  313. Test json format output for runtime_image property when none, one or many instances are present
  314. :return:
  315. """
  316. runner = CliRunner()
  317. # Test json output when there are no runtime_images
  318. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
  319. result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
  320. result_json = json.loads(result.output)
  321. assert not result_json.get("runtime_image")
  322. assert result.exit_code == 0
  323. # Test json output when there is a single runtime_image
  324. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_one_runtime_image.pipeline"
  325. result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
  326. result_json = json.loads(result.output)
  327. # Tests output is a list
  328. assert isinstance(result_json.get("runtime_image"), list) and len(result_json.get("runtime_image")) == 1
  329. # Tests the list content
  330. assert result_json.get("runtime_image")[0] == "tensorflow/tensorflow:2.0.0-py3"
  331. assert result.exit_code == 0
  332. # Test json output where there are two / multiple runtime_images
  333. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_many_runtime_image.pipeline"
  334. result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
  335. result_json = json.loads(result.output)
  336. # Tests output is a list. Count is one as the same value of runtime_image is repeated
  337. assert isinstance(result_json.get("runtime_image"), list) and len(result_json.get("runtime_image")) == 2
  338. # Tests the list content
  339. assert result_json.get("runtime_image")[0] != result_json.get("runtime_image")[1]
  340. assert result_json.get("runtime_image")[0] in list(["tensorflow/tensorflow:2.0.0-py3", "elyra/examples:1.0.0-py3"])
  341. assert result_json.get("runtime_image")[1] in list(["tensorflow/tensorflow:2.0.0-py3", "elyra/examples:1.0.0-py3"])
  342. assert result.exit_code == 0
  343. def test_describe_mount_report():
  344. """
  345. Test report format output for mount property when none, one or many mounts are present
  346. :return:
  347. """
  348. runner = CliRunner()
  349. # Test report output when there are no mount volumes
  350. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_zero_mount.pipeline"
  351. result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
  352. assert "Mounted Volumes:\n None Listed" in result.output
  353. assert result.exit_code == 0
  354. # Test report output when there is a single mount volume
  355. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_one_mount.pipeline"
  356. result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
  357. assert "Mounted Volumes:\n - rwx-test-claim" in result.output
  358. assert result.exit_code == 0
  359. # Test report output where there are two / multiple mount volumes
  360. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_many_mount.pipeline"
  361. result = runner.invoke(pipeline, ["describe", str(pipeline_file_path)])
  362. assert "Mounted Volumes:\n" in result.output
  363. assert " - rwx-test-claim" in result.output
  364. assert " - rwx-test-claim-1" in result.output
  365. assert result.exit_code == 0
  366. def test_describe_mount_json():
  367. """
  368. Test json output for mount property when none, one or many mounts are present
  369. :return:
  370. """
  371. runner = CliRunner()
  372. # Test json output when there are no mount volumes
  373. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_zero_mount.pipeline"
  374. result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
  375. result_json = json.loads(result.output)
  376. assert not result_json.get("mounted_volumes")
  377. assert result.exit_code == 0
  378. # Test json output when there is a single mount volume
  379. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_one_mount.pipeline"
  380. result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
  381. result_json = json.loads(result.output)
  382. assert isinstance(result_json.get("mounted_volumes"), list) and len(result_json.get("mounted_volumes")) == 1
  383. assert result_json.get("mounted_volumes")[0] == "rwx-test-claim"
  384. assert result.exit_code == 0
  385. # Test json output where there are two / multiple mount volumes
  386. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "pipeline_with_many_mount.pipeline"
  387. result = runner.invoke(pipeline, ["describe", "--json", str(pipeline_file_path)])
  388. result_json = json.loads(result.output)
  389. assert isinstance(result_json.get("mounted_volumes"), list) and len(result_json.get("mounted_volumes")) == 2
  390. assert set(result_json.get("mounted_volumes")) == set(list(["rwx-test-claim", "rwx-test-claim-1"]))
  391. assert result.exit_code == 0
  392. def test_validate_with_missing_kfp_component(jp_environ, kubeflow_pipelines_runtime_instance):
  393. runner = CliRunner()
  394. with runner.isolated_filesystem():
  395. valid_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
  396. pipeline_file_path = Path.cwd() / "foo.pipeline"
  397. with open(pipeline_file_path, "w") as pipeline_file:
  398. with open(valid_file_path) as valid_file:
  399. valid_data = json.load(valid_file)
  400. # Update known component name to trigger a missing component
  401. valid_data["pipelines"][0]["nodes"][0]["op"] = valid_data["pipelines"][0]["nodes"][0]["op"] + "Missing"
  402. pipeline_file.write(json.dumps(valid_data))
  403. result = runner.invoke(
  404. pipeline, ["validate", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
  405. )
  406. assert "Validating pipeline..." in result.output
  407. assert "[Error][Calculate data hash] - This component was not found in the catalog." in result.output
  408. assert result.exit_code != 0
  409. def test_validate_with_no_runtime_config():
  410. runner = CliRunner()
  411. with runner.isolated_filesystem():
  412. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
  413. result = runner.invoke(pipeline, ["validate", str(pipeline_file_path)])
  414. assert "Validating pipeline..." in result.output
  415. assert (
  416. "[Error] - This pipeline contains at least one runtime-specific component, "
  417. "but pipeline runtime is 'local'" in result.output
  418. )
  419. assert result.exit_code != 0
# ------------------------------------------------------------------
# tests for 'submit' command
# ------------------------------------------------------------------
  423. def test_submit_invalid_monitor_interval_option(kubeflow_pipelines_runtime_instance):
  424. """Verify that the '--monitor-timeout' option works as expected"""
  425. runner = CliRunner()
  426. with runner.isolated_filesystem():
  427. # dummy pipeline - it's not used
  428. pipeline_file_path = Path(__file__).parent / "resources" / "pipelines" / "kfp_3_node_custom.pipeline"
  429. assert pipeline_file_path.is_file()
  430. # this should fail: '--monitor-timeout' must be an integer
  431. invalid_option_value = "abc"
  432. result = runner.invoke(
  433. pipeline,
  434. [
  435. "submit",
  436. str(pipeline_file_path),
  437. "--runtime-config",
  438. kubeflow_pipelines_runtime_instance,
  439. "--monitor-timeout",
  440. invalid_option_value,
  441. ],
  442. )
  443. assert result.exit_code != 0
  444. assert (
  445. f"Invalid value for '--monitor-timeout': '{invalid_option_value}' is not "
  446. "a valid integer" in result.output
  447. )
  448. # this should fail: '--monitor-timeout' must be a positive integer
  449. invalid_option_value = 0
  450. result = runner.invoke(
  451. pipeline,
  452. [
  453. "submit",
  454. str(pipeline_file_path),
  455. "--runtime-config",
  456. kubeflow_pipelines_runtime_instance,
  457. "--monitor-timeout",
  458. invalid_option_value,
  459. ],
  460. )
  461. assert result.exit_code != 0
  462. assert (
  463. f"Invalid value for '--monitor-timeout': '{invalid_option_value}' is not "
  464. "a positive integer" in result.output
  465. )
# ------------------------------------------------------------------
# end tests for 'submit' command
# ------------------------------------------------------------------
# tests for 'export' command
# ------------------------------------------------------------------
  471. def do_mock_export(output_path: str, dir_only=False):
  472. # simulate export result
  473. p = Path(output_path)
  474. # create parent directories, if required
  475. if not p.parent.is_dir():
  476. p.parent.mkdir(parents=True, exist_ok=True)
  477. if dir_only:
  478. return
  479. # create a mock export file
  480. with open(output_path, "w") as output:
  481. output.write("dummy export output")
  482. def prepare_export_work_dir(work_dir: str, source_dir: str):
  483. """Copies the files in source_dir to work_dir"""
  484. for file in Path(source_dir).glob("*"):
  485. shutil.copy(str(file), work_dir)
  486. # print for debug purposes; this is only displayed if an assert fails
  487. print(f"Work directory content: {list(Path(work_dir).glob('*'))}")
  488. def test_export_invalid_runtime_config():
  489. """Test user error scenarios: the specified runtime configuration is 'invalid'"""
  490. runner = CliRunner()
  491. # test pipeline; it's not used in this test
  492. pipeline_file = "kubeflow_pipelines.pipeline"
  493. p = Path(__file__).parent / "resources" / "pipelines" / f"{pipeline_file}"
  494. assert p.is_file()
  495. # no runtime configuration was specified
  496. result = runner.invoke(pipeline, ["export", str(p)])
  497. assert result.exit_code != 0, result.output
  498. assert "Error: Missing option '--runtime-config'." in result.output, result.output
  499. # runtime configuration does not exist
  500. config_name = "no-such-config"
  501. result = runner.invoke(pipeline, ["export", str(p), "--runtime-config", config_name])
  502. assert result.exit_code != 0, result.output
  503. assert f"Error: Invalid runtime configuration: {config_name}" in result.output
  504. assert f"No such instance named '{config_name}' was found in the runtimes schemaspace." in result.output
  505. def test_export_incompatible_runtime_config(kubeflow_pipelines_runtime_instance, airflow_runtime_instance):
  506. """
  507. Test user error scenarios: the specified runtime configuration is not compatible
  508. with the pipeline type, e.g. KFP pipeline with Airflow runtime config
  509. """
  510. runner = CliRunner()
  511. # try exporting a KFP pipeline using an Airflow runtime configuration
  512. pipeline_file = "kubeflow_pipelines.pipeline"
  513. p = Path(__file__).parent / "resources" / "pipelines" / f"{pipeline_file}"
  514. assert p.is_file()
  515. # try export using Airflow runtime configuration
  516. result = runner.invoke(pipeline, ["export", str(p), "--runtime-config", airflow_runtime_instance])
  517. assert result.exit_code != 0, result.output
  518. assert (
  519. "The runtime configuration type 'APACHE_AIRFLOW' does not "
  520. "match the pipeline's runtime type 'KUBEFLOW_PIPELINES'." in result.output
  521. )
  522. # try exporting an Airflow pipeline using a Kubeflow Pipelines runtime configuration
  523. pipeline_file = "airflow.pipeline"
  524. p = Path(__file__).parent / "resources" / "pipelines" / f"{pipeline_file}"
  525. assert p.is_file()
  526. # try export using KFP runtime configuration
  527. result = runner.invoke(pipeline, ["export", str(p), "--runtime-config", kubeflow_pipelines_runtime_instance])
  528. assert result.exit_code != 0, result.output
  529. assert (
  530. "The runtime configuration type 'KUBEFLOW_PIPELINES' does not "
  531. "match the pipeline's runtime type 'APACHE_AIRFLOW'." in result.output
  532. )
  533. @pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
  534. def test_export_kubeflow_output_option(jp_environ, kubeflow_pipelines_runtime_instance, catalog_instance):
  535. """Verify that the '--output' option works as expected for Kubeflow Pipelines"""
  536. runner = CliRunner()
  537. with runner.isolated_filesystem():
  538. cwd = Path.cwd().resolve()
  539. # copy pipeline file and depencencies
  540. prepare_export_work_dir(str(cwd), Path(__file__).parent / "resources" / "pipelines")
  541. pipeline_file = "kfp_3_node_custom.pipeline"
  542. pipeline_file_path = cwd / pipeline_file
  543. # make sure the pipeline file exists
  544. assert pipeline_file_path.is_file() is True
  545. print(f"Pipeline file: {pipeline_file_path}")
  546. # Test: '--output' not specified; exported file is created
  547. # in current directory and named like the pipeline file with
  548. # a '.yaml' suffix
  549. expected_output_file = pipeline_file_path.with_suffix(".yaml")
  550. # this should succeed
  551. result = runner.invoke(
  552. pipeline, ["export", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
  553. )
  554. assert result.exit_code == 0, result.output
  555. assert f"was exported to '{str(expected_output_file)}" in result.output, result.output
  556. # Test: '--output' specified and ends with '.yaml'
  557. expected_output_file = cwd / "test-dir" / "output.yaml"
  558. # this should succeed
  559. result = runner.invoke(
  560. pipeline,
  561. [
  562. "export",
  563. str(pipeline_file_path),
  564. "--runtime-config",
  565. kubeflow_pipelines_runtime_instance,
  566. "--output",
  567. str(expected_output_file),
  568. ],
  569. )
  570. assert result.exit_code == 0, result.output
  571. assert f"was exported to '{str(expected_output_file)}" in result.output, result.output
  572. # Test: '--output' specified and ends with '.yml'
  573. expected_output_file = cwd / "test-dir-2" / "output.yml"
  574. # this should succeed
  575. result = runner.invoke(
  576. pipeline,
  577. [
  578. "export",
  579. str(pipeline_file_path),
  580. "--runtime-config",
  581. kubeflow_pipelines_runtime_instance,
  582. "--output",
  583. str(expected_output_file),
  584. ],
  585. )
  586. assert result.exit_code == 0, result.output
  587. assert f"was exported to '{str(expected_output_file)}" in result.output, result.output
  588. def test_export_airflow_output_option(airflow_runtime_instance):
  589. """Verify that the '--output' option works as expected for Airflow"""
  590. runner = CliRunner()
  591. with runner.isolated_filesystem():
  592. cwd = Path.cwd().resolve()
  593. # copy pipeline file and depencencies
  594. prepare_export_work_dir(str(cwd), Path(__file__).parent / "resources" / "pipelines")
  595. pipeline_file = "airflow.pipeline"
  596. pipeline_file_path = cwd / pipeline_file
  597. # make sure the pipeline file exists
  598. assert pipeline_file_path.is_file() is True
  599. print(f"Pipeline file: {pipeline_file_path}")
  600. #
  601. # Test: '--output' not specified; exported file is created
  602. # in current directory and named like the pipeline file with
  603. # a '.py' suffix
  604. #
  605. expected_output_file = pipeline_file_path.with_suffix(".py")
  606. print(f"expected_output_file -> {expected_output_file}")
  607. do_mock_export(str(expected_output_file))
  608. # this should fail: default output file already exists
  609. result = runner.invoke(
  610. pipeline, ["export", str(pipeline_file_path), "--runtime-config", airflow_runtime_instance]
  611. )
  612. assert result.exit_code != 0, result.output
  613. assert (
  614. f"Error: Output file '{expected_output_file}' exists and option '--overwrite' "
  615. "was not specified." in result.output
  616. ), result.output
  617. #
  618. # Test: '--output' specified and ends with '.py' (the value is treated
  619. # as a file name)
  620. #
  621. expected_output_file = cwd / "test-dir-2" / "output.py"
  622. do_mock_export(str(expected_output_file))
  623. # this should fail: specified output file already exists
  624. result = runner.invoke(
  625. pipeline,
  626. [
  627. "export",
  628. str(pipeline_file_path),
  629. "--runtime-config",
  630. airflow_runtime_instance,
  631. "--output",
  632. str(expected_output_file),
  633. ],
  634. )
  635. assert result.exit_code != 0, result.output
  636. assert (
  637. f"Error: Output file '{expected_output_file}' exists and option '--overwrite' "
  638. "was not specified." in result.output
  639. ), result.output
  640. #
  641. # Test: '--output' specified and does not end with '.py' (the value
  642. # is treated as a directory)
  643. #
  644. output_dir = cwd / "test-dir-3"
  645. expected_output_file = output_dir / Path(pipeline_file).with_suffix(".py")
  646. do_mock_export(str(expected_output_file))
  647. # this should fail: specified output file already exists
  648. result = runner.invoke(
  649. pipeline,
  650. [
  651. "export",
  652. str(pipeline_file_path),
  653. "--runtime-config",
  654. airflow_runtime_instance,
  655. "--output",
  656. str(output_dir),
  657. ],
  658. )
  659. assert result.exit_code != 0, result.output
  660. assert (
  661. f"Error: Output file '{expected_output_file}' exists and option '--overwrite' "
  662. "was not specified." in result.output
  663. ), result.output
  664. @pytest.mark.parametrize("catalog_instance", [KFP_COMPONENT_CACHE_INSTANCE], indirect=True)
  665. def test_export_kubeflow_overwrite_option(jp_environ, kubeflow_pipelines_runtime_instance, catalog_instance):
  666. """Verify that the '--overwrite' option works as expected for Kubeflow Pipelines"""
  667. runner = CliRunner()
  668. with runner.isolated_filesystem():
  669. cwd = Path.cwd().resolve()
  670. # copy pipeline file and depencencies
  671. prepare_export_work_dir(str(cwd), Path(__file__).parent / "resources" / "pipelines")
  672. pipeline_file = "kfp_3_node_custom.pipeline"
  673. pipeline_file_path = cwd / pipeline_file
  674. # make sure the pipeline file exists
  675. assert pipeline_file_path.is_file() is True
  676. print(f"Pipeline file: {pipeline_file_path}")
  677. # Test: '--overwrite' not specified; exported file is created
  678. # in current directory and named like the pipeline file with
  679. # a '.yaml' suffix
  680. expected_output_file = pipeline_file_path.with_suffix(".yaml")
  681. # this should succeed
  682. result = runner.invoke(
  683. pipeline, ["export", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
  684. )
  685. assert result.exit_code == 0, result.output
  686. assert f"was exported to '{str(expected_output_file)}" in result.output, result.output
  687. # Test: '--overwrite' not specified; the output already exists
  688. # this should fail
  689. result = runner.invoke(
  690. pipeline, ["export", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance]
  691. )
  692. assert result.exit_code != 0, result.output
  693. assert f"Output file '{expected_output_file}' exists and option '--overwrite' was not" in result.output
  694. # Test: '--overwrite' specified; exported file is created
  695. # in current directory and named like the pipeline file with
  696. # a '.yaml' suffix
  697. # this should succeed
  698. result = runner.invoke(
  699. pipeline,
  700. ["export", str(pipeline_file_path), "--runtime-config", kubeflow_pipelines_runtime_instance, "--overwrite"],
  701. )
  702. assert result.exit_code == 0, result.output
  703. assert f"was exported to '{str(expected_output_file)}" in result.output, result.output
# ------------------------------------------------------------------
# end tests for 'export' command
# ------------------------------------------------------------------