pipeline_app.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import asyncio
  17. from collections import OrderedDict
  18. import json
  19. from operator import itemgetter
  20. import os
  21. from pathlib import Path
  22. import sys
  23. from typing import Optional
  24. import warnings
  25. import click
  26. from colorama import Fore
  27. from colorama import Style
  28. from kfp import Client as ArgoClient
  29. from elyra._version import __version__
  30. from elyra.metadata.manager import MetadataManager
  31. from elyra.metadata.schema import SchemaManager
  32. from elyra.metadata.schemaspaces import Runtimes
  33. from elyra.pipeline import pipeline_constants
  34. from elyra.pipeline.component_catalog import ComponentCache
  35. from elyra.pipeline.kfp.kfp_authentication import AuthenticationError
  36. from elyra.pipeline.kfp.kfp_authentication import KFPAuthenticator
  37. from elyra.pipeline.parser import PipelineParser
  38. from elyra.pipeline.pipeline_definition import Pipeline
  39. from elyra.pipeline.pipeline_definition import PipelineDefinition
  40. from elyra.pipeline.processor import PipelineProcessorManager
  41. from elyra.pipeline.processor import PipelineProcessorResponse
  42. from elyra.pipeline.runtime_type import RuntimeProcessorType
  43. from elyra.pipeline.runtime_type import RuntimeTypeResources
  44. from elyra.pipeline.runtimes_metadata import RuntimesMetadata
  45. from elyra.pipeline.validation import PipelineValidationManager
  46. from elyra.pipeline.validation import ValidationSeverity
  47. if sys.stdout.isatty():
  48. from yaspin import yaspin as Spinner
  49. else:
  50. from .pipeline_app_utils import StaticTextSpinner as Spinner
  51. # custom exit code - a timeout occurred
  52. EXIT_TIMEDOUT = 124
  53. SEVERITY = {
  54. ValidationSeverity.Error: "Error",
  55. ValidationSeverity.Warning: "Warning",
  56. ValidationSeverity.Hint: "Hint",
  57. ValidationSeverity.Information: "Information",
  58. }
  59. def _get_runtime_config(runtime_config_name: Optional[str]) -> Optional[RuntimesMetadata]:
  60. """Fetch runtime configuration for the specified name"""
  61. if not runtime_config_name or runtime_config_name == "local":
  62. # No runtime configuration was specified or it is local.
  63. # Cannot use metadata manager to determine the runtime type.
  64. return None
  65. try:
  66. metadata_manager = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_NAME)
  67. return metadata_manager.get(runtime_config_name)
  68. except Exception as e:
  69. raise click.ClickException(f"Invalid runtime configuration: {runtime_config_name}\n {e}")
  70. def _get_runtime_type(runtime_config_name: Optional[str]) -> Optional[str]:
  71. """Get runtime type for the provided runtime configuration name"""
  72. runtime_config = _get_runtime_config(runtime_config_name)
  73. if runtime_config:
  74. return runtime_config.metadata.get("runtime_type")
  75. return None
  76. def _get_runtime_schema_name(runtime_config_name: Optional[str]) -> Optional[str]:
  77. """Get runtime schema name for the provided runtime configuration name"""
  78. if not runtime_config_name or runtime_config_name == "local":
  79. # No runtime configuration was specified or it is local.
  80. # Cannot use metadata manager to determine the runtime type.
  81. return "local"
  82. runtime_config = _get_runtime_config(runtime_config_name)
  83. if runtime_config:
  84. return runtime_config.schema_name
  85. return None
  86. def _get_runtime_display_name(schema_name: Optional[str]) -> Optional[str]:
  87. """Return the display name for the specified runtime schema_name"""
  88. if not schema_name or schema_name == "local":
  89. # No schame name was specified or it is local.
  90. # Cannot use metadata manager to determine the display name.
  91. return schema_name
  92. try:
  93. schema_manager = SchemaManager.instance()
  94. schema = schema_manager.get_schema(Runtimes.RUNTIMES_SCHEMASPACE_NAME, schema_name)
  95. return schema["display_name"]
  96. except Exception as e:
  97. raise click.ClickException(f"Invalid runtime configuration: {schema_name}\n {e}")
  98. def _get_pipeline_runtime_type(pipeline_definition: dict) -> Optional[str]:
  99. """Return the runtime type name associated with the given pipeline"""
  100. return pipeline_definition.get("pipelines", [{}])[0].get("app_data", {}).get("runtime_type")
  101. def _validate_pipeline_runtime(primary_pipeline: Pipeline, runtime: str) -> bool:
  102. """
  103. Generic pipelines do not have a persisted runtime type, and can be run on any runtime
  104. Runtime specific pipeline have a runtime type, and can only be run on matching runtime
  105. """
  106. is_valid = True
  107. if runtime: # Only perform validation if a target runtime has been specified
  108. pipeline_runtime = primary_pipeline.runtime
  109. if pipeline_runtime and pipeline_runtime != runtime:
  110. is_valid = False
  111. return is_valid
  112. def _preprocess_pipeline(
  113. pipeline_path: str, runtime: Optional[str] = None, runtime_config: Optional[str] = None
  114. ) -> dict:
  115. pipeline_path = os.path.expanduser(pipeline_path)
  116. pipeline_abs_path = os.path.join(os.getcwd(), pipeline_path)
  117. pipeline_dir = os.path.dirname(pipeline_abs_path)
  118. pipeline_name = os.path.splitext(os.path.basename(pipeline_abs_path))[0]
  119. if not os.path.exists(pipeline_abs_path):
  120. raise click.ClickException(f"Pipeline file not found: '{pipeline_abs_path}'\n")
  121. try:
  122. pipeline_definition = PipelineDefinition(pipeline_abs_path)
  123. except ValueError as ve:
  124. raise click.ClickException(f"Pipeline file is invalid: \n {ve}")
  125. try:
  126. primary_pipeline = pipeline_definition.primary_pipeline
  127. except Exception as e:
  128. raise click.ClickException(e)
  129. try:
  130. for pipeline in pipeline_definition.pipelines:
  131. for node in pipeline.nodes:
  132. filename = node.get_component_parameter("filename")
  133. if filename:
  134. abs_path = os.path.join(pipeline_dir, filename)
  135. node.set_component_parameter("filename", abs_path)
  136. except Exception as e:
  137. raise click.ClickException(f"Error pre-processing pipeline: \n {e}")
  138. # update pipeline transient fields
  139. primary_pipeline.set("name", pipeline_name)
  140. primary_pipeline.set("source", os.path.basename(pipeline_abs_path))
  141. # Only update the following if values were provided
  142. if runtime:
  143. primary_pipeline.set("runtime", runtime)
  144. if runtime_config:
  145. primary_pipeline.set("runtime_config", runtime_config)
  146. return pipeline_definition.to_dict()
  147. def _print_issues(issues):
  148. # print validation issues
  149. for issue in sorted(issues, key=itemgetter("severity")):
  150. severity = f" [{SEVERITY[issue.get('severity')]}]"
  151. prefix = ""
  152. postfix = ""
  153. if issue.get("data"):
  154. if issue["data"].get("nodeName"):
  155. # issue is associated with a single node; display it
  156. prefix = f"[{issue['data'].get('nodeName')}]"
  157. if issue["data"].get("propertyName"):
  158. # issue is associated with a node property; display it
  159. prefix = f"{prefix}[{issue['data'].get('propertyName')}]"
  160. if issue["data"].get("value"):
  161. # issue is caused by the value of a node property; display it
  162. postfix = f"The current property value is '{issue['data'].get('value')}'."
  163. elif issue["data"].get("nodeNames") and isinstance(issue["data"]["nodeNames"], list):
  164. # issue is associated with multiple nodes
  165. postfix = "Nodes: "
  166. separator = ""
  167. for nn in issue["data"]["nodeNames"]:
  168. postfix = f"{postfix}{separator}'{nn}'"
  169. separator = ", "
  170. output = f"{severity}{prefix} - {issue['message']} {postfix}"
  171. click.echo(output)
  172. click.echo("")
  173. def _validate_pipeline_definition(pipeline_definition: PipelineDefinition):
  174. """Validate pipeline definition and display issues"""
  175. click.echo("Validating pipeline...")
  176. # validate pipeline
  177. validation_response = asyncio.get_event_loop().run_until_complete(
  178. PipelineValidationManager.instance().validate(pipeline=pipeline_definition)
  179. )
  180. # print validation issues
  181. issues = validation_response.to_json().get("issues")
  182. _print_issues(issues)
  183. if validation_response.has_fatal:
  184. # raise an exception and let the caller decide what to do
  185. raise click.ClickException("Unable to continue due to pipeline validation issues.")
  186. def _execute_pipeline(pipeline_definition) -> PipelineProcessorResponse:
  187. try:
  188. # parse pipeline
  189. pipeline_object = PipelineParser().parse(pipeline_definition)
  190. # process pipeline
  191. with warnings.catch_warnings():
  192. warnings.simplefilter("ignore")
  193. response = asyncio.get_event_loop().run_until_complete(
  194. PipelineProcessorManager.instance().process(pipeline_object)
  195. )
  196. return response
  197. except ValueError as ve:
  198. raise click.ClickException(f"Error parsing pipeline: \n {ve}")
  199. except RuntimeError as re:
  200. raise click.ClickException(f"Error processing pipeline: \n {re} \n {re.__cause__}")
  201. def _build_component_cache():
  202. """Initialize a ComponentCache instance and wait for it to complete all tasks"""
  203. with Spinner(text="Initializing the component cache..."):
  204. component_cache = ComponentCache.instance(emulate_server_app=True)
  205. component_cache.load()
  206. component_cache.wait_for_all_cache_tasks()
  207. def validate_pipeline_path(ctx, param, value):
  208. """Callback for pipeline_path parameter"""
  209. if not value.is_file():
  210. raise click.BadParameter(f"'{value}' is not a file.")
  211. if value.suffix != ".pipeline":
  212. raise click.BadParameter(f"'{value}' is not a .pipeline file.")
  213. return value
  214. def print_banner(title):
  215. click.echo(Fore.CYAN + "────────────────────────────────────────────────────────────────" + Style.RESET_ALL)
  216. click.echo(f"{Fore.CYAN} {title}{Style.RESET_ALL}")
  217. click.echo(Fore.CYAN + "────────────────────────────────────────────────────────────────" + Style.RESET_ALL)
  218. click.echo()
  219. def print_info(title, info_list):
  220. click.echo(f"{Fore.CYAN}❯ {title}{Style.RESET_ALL}")
  221. for info_item in info_list:
  222. if isinstance(info_item, str):
  223. click.echo(f" {info_item}")
  224. else:
  225. click.echo(f" {info_item[0]}: {info_item[1]}")
  226. click.echo()
  227. def print_version():
  228. print_info("Version", [f"elyra {__version__}"])
  229. @click.group()
  230. @click.version_option(__version__, message="v%(version)s")
  231. def pipeline():
  232. """
  233. Run Elyra pipelines in your local environment or submit them to an external service,
  234. such as Kubeflow Pipelines or Apache Airflow.
  235. Find more information at: https://elyra.readthedocs.io/en/latest/
  236. """
  237. pass
  238. @click.command()
  239. @click.option("--runtime-config", required=False, help="Runtime config where the pipeline should be processed")
  240. @click.argument("pipeline_path", type=Path, callback=validate_pipeline_path)
  241. def validate(pipeline_path, runtime_config="local"):
  242. """
  243. Validate pipeline
  244. """
  245. click.echo()
  246. print_banner("Elyra Pipeline Validation")
  247. runtime = _get_runtime_schema_name(runtime_config)
  248. pipeline_definition = _preprocess_pipeline(pipeline_path, runtime=runtime, runtime_config=runtime_config)
  249. pipeline_runtime_type = _get_pipeline_runtime_type(pipeline_definition)
  250. if pipeline_runtime_type:
  251. _build_component_cache()
  252. try:
  253. _validate_pipeline_definition(pipeline_definition)
  254. except Exception:
  255. raise click.ClickException("Pipeline validation FAILED.")
  256. def validate_timeout_option(ctx, param, value):
  257. """Callback for monitor-timeout parameter validation"""
  258. try:
  259. value = int(value)
  260. if value <= 0:
  261. raise ValueError()
  262. except ValueError:
  263. raise click.BadParameter(f"'{value}' is not a positive integer.")
  264. else:
  265. return value
  266. @click.command()
  267. @click.argument("pipeline_path", type=Path, callback=validate_pipeline_path)
  268. @click.option("--json", "json_option", is_flag=True, required=False, help="Display pipeline summary in JSON format")
  269. @click.option(
  270. "--runtime-config",
  271. "runtime_config_name",
  272. required=True,
  273. help="Runtime config where the pipeline should be processed",
  274. )
  275. @click.option(
  276. "--monitor",
  277. "monitor_option",
  278. is_flag=True,
  279. required=False,
  280. help="Monitor the pipeline run (Supported for Kubeflow Pipelines only)",
  281. )
  282. @click.option(
  283. "--monitor-timeout",
  284. "timeout_option",
  285. type=int,
  286. default=60,
  287. show_default=True,
  288. required=False,
  289. help="Monitoring timeout in minutes.",
  290. callback=validate_timeout_option,
  291. )
  292. def submit(json_option, pipeline_path, runtime_config_name, monitor_option, timeout_option):
  293. """
  294. Submit a pipeline to be executed on the server
  295. """
  296. click.echo()
  297. print_banner("Elyra Pipeline Submission")
  298. runtime_config = _get_runtime_config(runtime_config_name)
  299. runtime_schema = runtime_config.schema_name
  300. pipeline_definition = _preprocess_pipeline(
  301. pipeline_path, runtime=runtime_schema, runtime_config=runtime_config_name
  302. )
  303. pipeline_runtime_type = _get_pipeline_runtime_type(pipeline_definition)
  304. if pipeline_runtime_type:
  305. _build_component_cache()
  306. try:
  307. _validate_pipeline_definition(pipeline_definition)
  308. except Exception:
  309. raise click.ClickException("Pipeline validation FAILED. The pipeline was not submitted for execution.")
  310. with Spinner(text="Submitting pipeline..."):
  311. response: PipelineProcessorResponse = _execute_pipeline(pipeline_definition)
  312. if not json_option:
  313. if response:
  314. msg = []
  315. # If there's a git_url attr, assume Apache Airflow DAG repo.
  316. # TODO: this will need to be revisited once front-end is decoupled from runtime platforms.
  317. if hasattr(response, "git_url"):
  318. msg.append(f"Apache Airflow DAG has been pushed to: {response.git_url}")
  319. msg.append(f"Check the status of your job at: {response.run_url}")
  320. if response.object_storage_path is not None and response.object_storage_url is not None:
  321. msg.append(
  322. f"The results and outputs are in the {response.object_storage_path} "
  323. f"working directory in {response.object_storage_url}"
  324. )
  325. print_info("Job submission succeeded", msg)
  326. click.echo()
  327. print_banner("Elyra Pipeline Submission Complete")
  328. else:
  329. if response:
  330. click.echo()
  331. print(json.dumps(response.to_json(), indent=4))
  332. # Start pipeline run monitoring, if requested
  333. if runtime_schema == "kfp" and monitor_option:
  334. minute_str = "minutes" if timeout_option > 1 else "minute"
  335. try:
  336. msg = (
  337. f"Monitoring status of pipeline run '{response.run_id}' for up to " f"{timeout_option} {minute_str}..."
  338. )
  339. with Spinner(text=msg):
  340. status = _monitor_kfp_submission(runtime_config, runtime_config_name, response.run_id, timeout_option)
  341. except TimeoutError:
  342. click.echo(
  343. "Monitoring was stopped because the timeout threshold "
  344. f"({timeout_option} {minute_str}) was exceeded. The pipeline is still running."
  345. )
  346. sys.exit(EXIT_TIMEDOUT)
  347. else:
  348. # The following are known KFP states: 'succeeded', 'failed', 'skipped',
  349. # 'error'. Treat 'unknown' as error. Exit with appropriate status code.
  350. click.echo(f"Monitoring ended with run status: {status}")
  351. if status.lower() not in ["succeeded", "skipped"]:
  352. # Click appears to use non-zero exit codes 1 (ClickException)
  353. # and 2 (UsageError). Terminate.
  354. sys.exit(click.ClickException.exit_code)
  355. def _monitor_kfp_submission(runtime_config: dict, runtime_config_name: str, run_id: str, timeout: int) -> str:
  356. """Monitor the status of a Kubeflow Pipelines run"""
  357. try:
  358. # Authenticate with the KFP server
  359. auth_info = KFPAuthenticator().authenticate(
  360. runtime_config.metadata["api_endpoint"].rstrip("/"),
  361. auth_type_str=runtime_config.metadata.get("auth_type"),
  362. runtime_config_name=runtime_config_name,
  363. auth_parm_1=runtime_config.metadata.get("api_username"),
  364. auth_parm_2=runtime_config.metadata.get("api_password"),
  365. )
  366. except AuthenticationError as ae:
  367. if ae.get_request_history() is not None:
  368. click.echo("An authentication error was raised. Diagnostic information follows.")
  369. click.echo(ae.request_history_to_string())
  370. raise click.ClickException(f"Kubeflow authentication failed: {ae}")
  371. try:
  372. # Create a Kubeflow Pipelines client. There is no need to use a Tekton client,
  373. # because the monitoring API is agnostic.
  374. client = ArgoClient(
  375. host=runtime_config.metadata["api_endpoint"].rstrip("/"),
  376. cookies=auth_info.get("cookies", None),
  377. credentials=auth_info.get("credentials", None),
  378. existing_token=auth_info.get("existing_token", None),
  379. namespace=runtime_config.metadata.get("user_namespace"),
  380. )
  381. # wait for the run to complete or timeout (API uses seconds as unit - convert)
  382. run_details = client.wait_for_run_completion(run_id, timeout * 60)
  383. except TimeoutError:
  384. # pipeline processing did not finish yet, stop monitoring
  385. raise
  386. except Exception as ex:
  387. # log error and return 'unknown' status
  388. click.echo(f"Monitoring failed: {type(ex)}: {ex}")
  389. return "unknown"
  390. else:
  391. return run_details.run.status
  392. @click.command()
  393. @click.option("--json", "json_option", is_flag=True, required=False, help="Display pipeline summary in JSON format")
  394. @click.argument("pipeline_path", type=Path, callback=validate_pipeline_path)
  395. def run(json_option, pipeline_path):
  396. """
  397. Run a pipeline in your local environment
  398. """
  399. click.echo()
  400. print_banner("Elyra Pipeline Local Run")
  401. pipeline_definition = _preprocess_pipeline(pipeline_path, runtime="local", runtime_config="local")
  402. try:
  403. _validate_pipeline_definition(pipeline_definition)
  404. except Exception:
  405. raise click.ClickException("Pipeline validation FAILED. The pipeline was not run.")
  406. response = _execute_pipeline(pipeline_definition)
  407. if not json_option:
  408. click.echo()
  409. print_banner("Elyra Pipeline Local Run Complete")
  410. else:
  411. click.echo()
  412. if response:
  413. print(json.dumps(response.to_json(), indent=4))
  414. @click.command()
  415. @click.option("--json", "json_option", is_flag=True, required=False, help="Display pipeline summary in JSON format")
  416. @click.argument("pipeline_path", type=Path, callback=validate_pipeline_path)
  417. def describe(json_option, pipeline_path):
  418. """
  419. Display pipeline summary
  420. """
  421. indent_length = 4
  422. blank_field = "Not Specified"
  423. blank_list = ["None Listed"]
  424. pipeline_definition = _preprocess_pipeline(pipeline_path, runtime="local", runtime_config="local")
  425. primary_pipeline = PipelineDefinition(pipeline_definition=pipeline_definition).primary_pipeline
  426. describe_dict = OrderedDict()
  427. describe_dict["name"] = primary_pipeline.name
  428. describe_dict["description"] = primary_pipeline.get_property("description")
  429. describe_dict["type"] = primary_pipeline.type
  430. describe_dict["version"] = primary_pipeline.version
  431. describe_dict["runtime"] = primary_pipeline.get_property("runtime")
  432. describe_dict["nodes"] = len(primary_pipeline.nodes)
  433. describe_dict["scripts"] = set()
  434. describe_dict["notebooks"] = set()
  435. describe_dict["file_dependencies"] = set()
  436. describe_dict["component_dependencies"] = set()
  437. describe_dict[pipeline_constants.MOUNTED_VOLUMES] = set()
  438. describe_dict[pipeline_constants.RUNTIME_IMAGE] = set()
  439. for node in primary_pipeline.nodes:
  440. # collect information about file dependencies
  441. for dependency in node.get_component_parameter("dependencies", []):
  442. describe_dict["file_dependencies"].add(f"{dependency}")
  443. # collect information about component dependencies
  444. if node.component_source is not None:
  445. describe_dict["component_dependencies"].add(node.component_source)
  446. # collect information of mounted volumes
  447. for mounted_volume in node.get_component_parameter(pipeline_constants.MOUNTED_VOLUMES, []):
  448. describe_dict[pipeline_constants.MOUNTED_VOLUMES].add(f"{mounted_volume.pvc_name}")
  449. # collection runtime image details
  450. temp_runtime_image_value = node.get_component_parameter(pipeline_constants.RUNTIME_IMAGE)
  451. if temp_runtime_image_value:
  452. describe_dict[pipeline_constants.RUNTIME_IMAGE].add(f"{temp_runtime_image_value}")
  453. # collect notebook / script name when pipeline is generic
  454. if describe_dict["runtime"] == "Generic":
  455. temp_value = node.get_component_parameter("filename")
  456. if not temp_value:
  457. pass
  458. elif Path(Path(temp_value).name).suffix == ".ipynb":
  459. describe_dict["notebooks"].add(temp_value)
  460. else:
  461. describe_dict["scripts"].add(temp_value)
  462. if not json_option:
  463. click.echo()
  464. print_banner("Elyra Pipeline details")
  465. for key in list(describe_dict):
  466. readable_key = " ".join(key.title().split("_"))
  467. if isinstance(describe_dict[key], set):
  468. click.echo(f"{readable_key}:")
  469. if not describe_dict.get(key):
  470. click.echo(f"{' ' * indent_length}{blank_list[0]}")
  471. else:
  472. for item in describe_dict.get(key, blank_list):
  473. click.echo(f"{' ' * indent_length}- {item}")
  474. else:
  475. click.echo(f"{readable_key}: {describe_dict.get(key, blank_field)}")
  476. else:
  477. for key in list(describe_dict):
  478. if isinstance(describe_dict[key], set):
  479. describe_dict[key] = list(describe_dict[key])
  480. value = describe_dict.get(key)
  481. if not value:
  482. describe_dict.pop(key)
  483. click.echo(json.dumps(describe_dict, indent=indent_length))
  484. @click.command()
  485. @click.argument("pipeline_path", type=Path, callback=validate_pipeline_path)
  486. @click.option("--runtime-config", required=True, help="Runtime configuration name.")
  487. @click.option(
  488. "--output",
  489. required=False,
  490. type=Path,
  491. help="Exported file name (including optional path). Defaults to " " the current directory and the pipeline name.",
  492. )
  493. @click.option("--overwrite", is_flag=True, help="Overwrite output file if it already exists.")
  494. def export(pipeline_path, runtime_config, output, overwrite):
  495. """
  496. Export a pipeline to a runtime-specific format
  497. """
  498. click.echo()
  499. print_banner("Elyra pipeline export")
  500. rtc = _get_runtime_config(runtime_config)
  501. runtime_schema = rtc.schema_name
  502. runtime_type = rtc.metadata.get("runtime_type")
  503. pipeline_definition = _preprocess_pipeline(pipeline_path, runtime=runtime_schema, runtime_config=runtime_config)
  504. # Verify that the pipeline's runtime type is compatible with the
  505. # runtime configuration
  506. pipeline_runtime_type = _get_pipeline_runtime_type(pipeline_definition)
  507. if pipeline_runtime_type and pipeline_runtime_type != "Generic" and pipeline_runtime_type != runtime_type:
  508. raise click.BadParameter(
  509. f"The runtime configuration type '{runtime_type}' does not match "
  510. f"the pipeline's runtime type '{pipeline_runtime_type}'.",
  511. param_hint="--runtime-config",
  512. )
  513. resources = RuntimeTypeResources.get_instance_by_type(RuntimeProcessorType.get_instance_by_name(runtime_type))
  514. supported_export_formats = resources.get_export_extensions()
  515. if len(supported_export_formats) == 0:
  516. raise click.ClickException(f"Runtime type '{runtime_type}' does not support export.")
  517. # If, in the future, a runtime supports multiple export output formats,
  518. # the user can choose one. For now, choose the only option.
  519. selected_export_format = supported_export_formats[0]
  520. selected_export_format_suffix = f".{selected_export_format}"
  521. # generate output file name from the user-provided input
  522. if output is None:
  523. # user did not specify an output; use current directory
  524. # and derive the file name from the pipeline file name
  525. output_path = Path.cwd()
  526. filename = f"{Path(pipeline_path).stem}{selected_export_format_suffix}"
  527. else:
  528. if output.suffix == selected_export_format_suffix:
  529. # user provided a file name
  530. output_path = output.parent
  531. filename = output.name
  532. else:
  533. # user provided a directory
  534. output_path = output
  535. filename = f"{Path(pipeline_path).stem}{selected_export_format_suffix}"
  536. output_file = output_path.resolve() / filename
  537. # verify that the output path meets the prerequisites
  538. if not output_file.parent.is_dir():
  539. try:
  540. output_file.parent.mkdir(parents=True, exist_ok=True)
  541. except Exception as ex:
  542. raise click.BadParameter(f"Cannot create output directory: {ex}", param_hint="--output")
  543. # handle output overwrite
  544. if output_file.exists() and not overwrite:
  545. raise click.ClickException(
  546. f"Output file '{str(output_file)}' exists and " "option '--overwrite' was not specified."
  547. )
  548. if pipeline_runtime_type:
  549. _build_component_cache()
  550. # validate the pipeline
  551. try:
  552. _validate_pipeline_definition(pipeline_definition)
  553. except Exception:
  554. raise click.ClickException("Pipeline validation FAILED. The pipeline was not exported.")
  555. with Spinner(text="Exporting pipeline ..."):
  556. try:
  557. # parse pipeline
  558. pipeline_object = PipelineParser().parse(pipeline_definition)
  559. # process pipeline
  560. with warnings.catch_warnings():
  561. warnings.simplefilter("ignore")
  562. asyncio.get_event_loop().run_until_complete(
  563. PipelineProcessorManager.instance().export(
  564. pipeline_object, selected_export_format, str(output_file), True
  565. )
  566. )
  567. except ValueError as ve:
  568. raise click.ClickException(f"Error parsing pipeline: \n {ve}")
  569. except RuntimeError as re:
  570. raise click.ClickException(f"Error exporting pipeline: \n {re} \n {re.__cause__}")
  571. click.echo(f"Pipeline was exported to '{str(output_file)}'.")
  572. pipeline.add_command(describe)
  573. pipeline.add_command(validate)
  574. pipeline.add_command(submit)
  575. pipeline.add_command(run)
  576. pipeline.add_command(export)