pipeline_app.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import asyncio
  17. from collections import OrderedDict
  18. import json
  19. from operator import itemgetter
  20. import os
  21. from pathlib import Path
  22. import sys
  23. from typing import Optional
  24. import warnings
  25. import click
  26. from colorama import Fore
  27. from colorama import Style
  28. from kfp import Client as ArgoClient
  29. from elyra._version import __version__
  30. from elyra.metadata.manager import MetadataManager
  31. from elyra.metadata.schema import SchemaManager
  32. from elyra.metadata.schemaspaces import Runtimes
  33. from elyra.pipeline import pipeline_constants
  34. from elyra.pipeline.component_catalog import ComponentCache
  35. from elyra.pipeline.kfp.kfp_authentication import AuthenticationError
  36. from elyra.pipeline.kfp.kfp_authentication import KFPAuthenticator
  37. from elyra.pipeline.parser import PipelineParser
  38. from elyra.pipeline.pipeline import Operation
  39. from elyra.pipeline.pipeline_definition import Pipeline
  40. from elyra.pipeline.pipeline_definition import PipelineDefinition
  41. from elyra.pipeline.processor import PipelineProcessorManager
  42. from elyra.pipeline.processor import PipelineProcessorResponse
  43. from elyra.pipeline.runtime_type import RuntimeProcessorType
  44. from elyra.pipeline.runtime_type import RuntimeTypeResources
  45. from elyra.pipeline.runtimes_metadata import RuntimesMetadata
  46. from elyra.pipeline.validation import PipelineValidationManager
  47. from elyra.pipeline.validation import ValidationSeverity
  48. if sys.stdout.isatty():
  49. from yaspin import yaspin as Spinner
  50. else:
  51. from .pipeline_app_utils import StaticTextSpinner as Spinner
  52. # custom exit code - a timeout occurred
  53. EXIT_TIMEDOUT = 124
  54. SEVERITY = {
  55. ValidationSeverity.Error: "Error",
  56. ValidationSeverity.Warning: "Warning",
  57. ValidationSeverity.Hint: "Hint",
  58. ValidationSeverity.Information: "Information",
  59. }
  60. def _get_runtime_config(runtime_config_name: Optional[str]) -> Optional[RuntimesMetadata]:
  61. """Fetch runtime configuration for the specified name"""
  62. if not runtime_config_name or runtime_config_name == "local":
  63. # No runtime configuration was specified or it is local.
  64. # Cannot use metadata manager to determine the runtime type.
  65. return None
  66. try:
  67. metadata_manager = MetadataManager(schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_NAME)
  68. return metadata_manager.get(runtime_config_name)
  69. except Exception as e:
  70. raise click.ClickException(f"Invalid runtime configuration: {runtime_config_name}\n {e}")
  71. def _get_runtime_type(runtime_config_name: Optional[str]) -> Optional[str]:
  72. """Get runtime type for the provided runtime configuration name"""
  73. runtime_config = _get_runtime_config(runtime_config_name)
  74. if runtime_config:
  75. return runtime_config.metadata.get("runtime_type")
  76. return None
  77. def _get_runtime_schema_name(runtime_config_name: Optional[str]) -> Optional[str]:
  78. """Get runtime schema name for the provided runtime configuration name"""
  79. if not runtime_config_name or runtime_config_name == "local":
  80. # No runtime configuration was specified or it is local.
  81. # Cannot use metadata manager to determine the runtime type.
  82. return "local"
  83. runtime_config = _get_runtime_config(runtime_config_name)
  84. if runtime_config:
  85. return runtime_config.schema_name
  86. return None
  87. def _get_runtime_display_name(schema_name: Optional[str]) -> Optional[str]:
  88. """Return the display name for the specified runtime schema_name"""
  89. if not schema_name or schema_name == "local":
  90. # No schame name was specified or it is local.
  91. # Cannot use metadata manager to determine the display name.
  92. return schema_name
  93. try:
  94. schema_manager = SchemaManager.instance()
  95. schema = schema_manager.get_schema(Runtimes.RUNTIMES_SCHEMASPACE_NAME, schema_name)
  96. return schema["display_name"]
  97. except Exception as e:
  98. raise click.ClickException(f"Invalid runtime configuration: {schema_name}\n {e}")
  99. def _get_pipeline_runtime_type(pipeline_definition: dict) -> Optional[str]:
  100. """Return the runtime type name associated with the given pipeline"""
  101. return pipeline_definition.get("pipelines", [{}])[0].get("app_data", {}).get("runtime_type")
  102. def _validate_pipeline_runtime(primary_pipeline: Pipeline, runtime: str) -> bool:
  103. """
  104. Generic pipelines do not have a persisted runtime type, and can be run on any runtime
  105. Runtime specific pipeline have a runtime type, and can only be run on matching runtime
  106. """
  107. is_valid = True
  108. if runtime: # Only perform validation if a target runtime has been specified
  109. pipeline_runtime = primary_pipeline.runtime
  110. if pipeline_runtime and pipeline_runtime != runtime:
  111. is_valid = False
  112. return is_valid
  113. def _preprocess_pipeline(
  114. pipeline_path: str, runtime: Optional[str] = None, runtime_config: Optional[str] = None
  115. ) -> dict:
  116. pipeline_path = os.path.expanduser(pipeline_path)
  117. pipeline_abs_path = os.path.join(os.getcwd(), pipeline_path)
  118. pipeline_dir = os.path.dirname(pipeline_abs_path)
  119. pipeline_name = os.path.splitext(os.path.basename(pipeline_abs_path))[0]
  120. if not os.path.exists(pipeline_abs_path):
  121. raise click.ClickException(f"Pipeline file not found: '{pipeline_abs_path}'\n")
  122. try:
  123. pipeline_definition = PipelineDefinition(pipeline_abs_path)
  124. except ValueError as ve:
  125. raise click.ClickException(f"Pipeline file is invalid: \n {ve}")
  126. try:
  127. primary_pipeline = pipeline_definition.primary_pipeline
  128. except Exception as e:
  129. raise click.ClickException(e)
  130. try:
  131. for pipeline in pipeline_definition.pipelines:
  132. for node in pipeline.nodes:
  133. filename = node.get_component_parameter("filename")
  134. if filename:
  135. abs_path = os.path.join(pipeline_dir, filename)
  136. node.set_component_parameter("filename", abs_path)
  137. except Exception as e:
  138. raise click.ClickException(f"Error pre-processing pipeline: \n {e}")
  139. # update pipeline transient fields
  140. primary_pipeline.set("name", pipeline_name)
  141. primary_pipeline.set("source", os.path.basename(pipeline_abs_path))
  142. # Only update the following if values were provided
  143. if runtime:
  144. primary_pipeline.set("runtime", runtime)
  145. if runtime_config:
  146. primary_pipeline.set("runtime_config", runtime_config)
  147. return pipeline_definition.to_dict()
  148. def _print_issues(issues):
  149. # print validation issues
  150. for issue in sorted(issues, key=itemgetter("severity")):
  151. severity = f" [{SEVERITY[issue.get('severity')]}]"
  152. prefix = ""
  153. postfix = ""
  154. if issue.get("data"):
  155. if issue["data"].get("nodeName"):
  156. # issue is associated with a single node; display it
  157. prefix = f"[{issue['data'].get('nodeName')}]"
  158. if issue["data"].get("propertyName"):
  159. # issue is associated with a node property; display it
  160. prefix = f"{prefix}[{issue['data'].get('propertyName')}]"
  161. if issue["data"].get("value"):
  162. # issue is caused by the value of a node property; display it
  163. postfix = f"The current property value is '{issue['data'].get('value')}'."
  164. elif issue["data"].get("nodeNames") and isinstance(issue["data"]["nodeNames"], list):
  165. # issue is associated with multiple nodes
  166. postfix = "Nodes: "
  167. separator = ""
  168. for nn in issue["data"]["nodeNames"]:
  169. postfix = f"{postfix}{separator}'{nn}'"
  170. separator = ", "
  171. output = f"{severity}{prefix} - {issue['message']} {postfix}"
  172. click.echo(output)
  173. click.echo("")
  174. def _validate_pipeline_definition(pipeline_definition: PipelineDefinition):
  175. """Validate pipeline definition and display issues"""
  176. click.echo("Validating pipeline...")
  177. # validate pipeline
  178. validation_response = asyncio.get_event_loop().run_until_complete(
  179. PipelineValidationManager.instance().validate(pipeline=pipeline_definition)
  180. )
  181. # print validation issues
  182. issues = validation_response.to_json().get("issues")
  183. _print_issues(issues)
  184. if validation_response.has_fatal:
  185. # raise an exception and let the caller decide what to do
  186. raise click.ClickException("Unable to continue due to pipeline validation issues.")
  187. def _execute_pipeline(pipeline_definition) -> PipelineProcessorResponse:
  188. try:
  189. # parse pipeline
  190. pipeline_object = PipelineParser().parse(pipeline_definition)
  191. # process pipeline
  192. with warnings.catch_warnings():
  193. warnings.simplefilter("ignore")
  194. response = asyncio.get_event_loop().run_until_complete(
  195. PipelineProcessorManager.instance().process(pipeline_object)
  196. )
  197. return response
  198. except ValueError as ve:
  199. raise click.ClickException(f"Error parsing pipeline: \n {ve}")
  200. except RuntimeError as re:
  201. raise click.ClickException(f"Error processing pipeline: \n {re} \n {re.__cause__}")
  202. def _build_component_cache():
  203. """Initialize a ComponentCache instance and wait for it to complete all tasks"""
  204. with Spinner(text="Initializing the component cache..."):
  205. component_cache = ComponentCache.instance(emulate_server_app=True)
  206. component_cache.load()
  207. component_cache.wait_for_all_cache_tasks()
  208. def validate_pipeline_path(ctx, param, value):
  209. """Callback for pipeline_path parameter"""
  210. if not value.is_file():
  211. raise click.BadParameter(f"'{value}' is not a file.")
  212. if value.suffix != ".pipeline":
  213. raise click.BadParameter(f"'{value}' is not a .pipeline file.")
  214. return value
  215. def print_banner(title):
  216. click.echo(Fore.CYAN + "────────────────────────────────────────────────────────────────" + Style.RESET_ALL)
  217. click.echo(f"{Fore.CYAN} {title}{Style.RESET_ALL}")
  218. click.echo(Fore.CYAN + "────────────────────────────────────────────────────────────────" + Style.RESET_ALL)
  219. click.echo()
  220. def print_info(title, info_list):
  221. click.echo(f"{Fore.CYAN}❯ {title}{Style.RESET_ALL}")
  222. for info_item in info_list:
  223. if isinstance(info_item, str):
  224. click.echo(f" {info_item}")
  225. else:
  226. click.echo(f" {info_item[0]}: {info_item[1]}")
  227. click.echo()
  228. def print_version():
  229. print_info("Version", [f"elyra {__version__}"])
  230. @click.group()
  231. @click.version_option(__version__, message="v%(version)s")
  232. def pipeline():
  233. """
  234. Run Elyra pipelines in your local environment or submit them to an external service,
  235. such as Kubeflow Pipelines or Apache Airflow.
  236. Find more information at: https://elyra.readthedocs.io/en/latest/
  237. """
  238. pass
  239. @click.command()
  240. @click.option("--runtime-config", required=False, help="Runtime config where the pipeline should be processed")
  241. @click.argument("pipeline_path", type=Path, callback=validate_pipeline_path)
  242. def validate(pipeline_path, runtime_config="local"):
  243. """
  244. Validate pipeline
  245. """
  246. click.echo()
  247. print_banner("Elyra Pipeline Validation")
  248. runtime = _get_runtime_schema_name(runtime_config)
  249. pipeline_definition = _preprocess_pipeline(pipeline_path, runtime=runtime, runtime_config=runtime_config)
  250. pipeline_runtime_type = _get_pipeline_runtime_type(pipeline_definition)
  251. if pipeline_runtime_type:
  252. _build_component_cache()
  253. try:
  254. _validate_pipeline_definition(pipeline_definition)
  255. except Exception:
  256. raise click.ClickException("Pipeline validation FAILED.")
  257. def validate_timeout_option(ctx, param, value):
  258. """Callback for monitor-timeout parameter validation"""
  259. try:
  260. value = int(value)
  261. if value <= 0:
  262. raise ValueError()
  263. except ValueError:
  264. raise click.BadParameter(f"'{value}' is not a positive integer.")
  265. else:
  266. return value
  267. @click.command()
  268. @click.argument("pipeline_path", type=Path, callback=validate_pipeline_path)
  269. @click.option("--json", "json_option", is_flag=True, required=False, help="Display pipeline summary in JSON format")
  270. @click.option(
  271. "--runtime-config",
  272. "runtime_config_name",
  273. required=True,
  274. help="Runtime config where the pipeline should be processed",
  275. )
  276. @click.option(
  277. "--monitor",
  278. "monitor_option",
  279. is_flag=True,
  280. required=False,
  281. help="Monitor the pipeline run (Supported for Kubeflow Pipelines only)",
  282. )
  283. @click.option(
  284. "--monitor-timeout",
  285. "timeout_option",
  286. type=int,
  287. default=60,
  288. show_default=True,
  289. required=False,
  290. help="Monitoring timeout in minutes.",
  291. callback=validate_timeout_option,
  292. )
  293. def submit(json_option, pipeline_path, runtime_config_name, monitor_option, timeout_option):
  294. """
  295. Submit a pipeline to be executed on the server
  296. """
  297. click.echo()
  298. print_banner("Elyra Pipeline Submission")
  299. runtime_config = _get_runtime_config(runtime_config_name)
  300. runtime_schema = runtime_config.schema_name
  301. pipeline_definition = _preprocess_pipeline(
  302. pipeline_path, runtime=runtime_schema, runtime_config=runtime_config_name
  303. )
  304. pipeline_runtime_type = _get_pipeline_runtime_type(pipeline_definition)
  305. if pipeline_runtime_type:
  306. _build_component_cache()
  307. try:
  308. _validate_pipeline_definition(pipeline_definition)
  309. except Exception:
  310. raise click.ClickException("Pipeline validation FAILED. The pipeline was not submitted for execution.")
  311. with Spinner(text="Submitting pipeline..."):
  312. response: PipelineProcessorResponse = _execute_pipeline(pipeline_definition)
  313. if not json_option:
  314. if response:
  315. msg = []
  316. # If there's a git_url attr, assume Apache Airflow DAG repo.
  317. # TODO: this will need to be revisited once front-end is decoupled from runtime platforms.
  318. if hasattr(response, "git_url"):
  319. msg.append(f"Apache Airflow DAG has been pushed to: {response.git_url}")
  320. msg.append(f"Check the status of your job at: {response.run_url}")
  321. if response.object_storage_path is not None and response.object_storage_url is not None:
  322. msg.append(
  323. f"The results and outputs are in the {response.object_storage_path} "
  324. f"working directory in {response.object_storage_url}"
  325. )
  326. print_info("Job submission succeeded", msg)
  327. click.echo()
  328. print_banner("Elyra Pipeline Submission Complete")
  329. else:
  330. if response:
  331. click.echo()
  332. print(json.dumps(response.to_json(), indent=4))
  333. # Start pipeline run monitoring, if requested
  334. if runtime_schema == "kfp" and monitor_option:
  335. minute_str = "minutes" if timeout_option > 1 else "minute"
  336. try:
  337. msg = (
  338. f"Monitoring status of pipeline run '{response.run_id}' for up to " f"{timeout_option} {minute_str}..."
  339. )
  340. with Spinner(text=msg):
  341. status = _monitor_kfp_submission(runtime_config, runtime_config_name, response.run_id, timeout_option)
  342. except TimeoutError:
  343. click.echo(
  344. "Monitoring was stopped because the timeout threshold "
  345. f"({timeout_option} {minute_str}) was exceeded. The pipeline is still running."
  346. )
  347. sys.exit(EXIT_TIMEDOUT)
  348. else:
  349. # The following are known KFP states: 'succeeded', 'failed', 'skipped',
  350. # 'error'. Treat 'unknown' as error. Exit with appropriate status code.
  351. click.echo(f"Monitoring ended with run status: {status}")
  352. if status.lower() not in ["succeeded", "skipped"]:
  353. # Click appears to use non-zero exit codes 1 (ClickException)
  354. # and 2 (UsageError). Terminate.
  355. sys.exit(click.ClickException.exit_code)
  356. def _monitor_kfp_submission(runtime_config: dict, runtime_config_name: str, run_id: str, timeout: int) -> str:
  357. """Monitor the status of a Kubeflow Pipelines run"""
  358. try:
  359. # Authenticate with the KFP server
  360. auth_info = KFPAuthenticator().authenticate(
  361. runtime_config.metadata["api_endpoint"].rstrip("/"),
  362. auth_type_str=runtime_config.metadata.get("auth_type"),
  363. runtime_config_name=runtime_config_name,
  364. auth_parm_1=runtime_config.metadata.get("api_username"),
  365. auth_parm_2=runtime_config.metadata.get("api_password"),
  366. )
  367. except AuthenticationError as ae:
  368. if ae.get_request_history() is not None:
  369. click.echo("An authentication error was raised. Diagnostic information follows.")
  370. click.echo(ae.request_history_to_string())
  371. raise click.ClickException(f"Kubeflow authentication failed: {ae}")
  372. try:
  373. # Create a Kubeflow Pipelines client. There is no need to use a Tekton client,
  374. # because the monitoring API is agnostic.
  375. client = ArgoClient(
  376. host=runtime_config.metadata["api_endpoint"].rstrip("/"),
  377. cookies=auth_info.get("cookies", None),
  378. credentials=auth_info.get("credentials", None),
  379. existing_token=auth_info.get("existing_token", None),
  380. namespace=runtime_config.metadata.get("user_namespace"),
  381. )
  382. # wait for the run to complete or timeout (API uses seconds as unit - convert)
  383. run_details = client.wait_for_run_completion(run_id, timeout * 60)
  384. except TimeoutError:
  385. # pipeline processing did not finish yet, stop monitoring
  386. raise
  387. except Exception as ex:
  388. # log error and return 'unknown' status
  389. click.echo(f"Monitoring failed: {type(ex)}: {ex}")
  390. return "unknown"
  391. else:
  392. return run_details.run.status
  393. @click.command()
  394. @click.option("--json", "json_option", is_flag=True, required=False, help="Display pipeline summary in JSON format")
  395. @click.argument("pipeline_path", type=Path, callback=validate_pipeline_path)
  396. def run(json_option, pipeline_path):
  397. """
  398. Run a pipeline in your local environment
  399. """
  400. click.echo()
  401. print_banner("Elyra Pipeline Local Run")
  402. pipeline_definition = _preprocess_pipeline(pipeline_path, runtime="local", runtime_config="local")
  403. try:
  404. _validate_pipeline_definition(pipeline_definition)
  405. except Exception:
  406. raise click.ClickException("Pipeline validation FAILED. The pipeline was not run.")
  407. response = _execute_pipeline(pipeline_definition)
  408. if not json_option:
  409. click.echo()
  410. print_banner("Elyra Pipeline Local Run Complete")
  411. else:
  412. click.echo()
  413. if response:
  414. print(json.dumps(response.to_json(), indent=4))
  415. @click.command()
  416. @click.option("--json", "json_option", is_flag=True, required=False, help="Display pipeline summary in JSON format")
  417. @click.argument("pipeline_path", type=Path, callback=validate_pipeline_path)
  418. def describe(json_option, pipeline_path):
  419. """
  420. Display pipeline summary and dependencies.
  421. """
  422. pipeline_definition = _preprocess_pipeline(pipeline_path, runtime="local", runtime_config="local")
  423. primary_pipeline = PipelineDefinition(pipeline_definition=pipeline_definition).primary_pipeline
  424. # Define and populate the data structure that holds the artifacts this command will
  425. # report on.
  426. describe_dict = OrderedDict()
  427. describe_dict["name"] = { # key is the artifact name, e.g. the pipeline name
  428. "display_name": "Pipeline name", # displayed in human-readable output
  429. "json_name": "name", # property name in machine-readable output
  430. "value": primary_pipeline.name, # the artifact's value
  431. }
  432. describe_dict["description"] = {
  433. "display_name": "Description",
  434. "json_name": "description",
  435. "value": primary_pipeline.get_property("description"),
  436. }
  437. describe_dict["type"] = {
  438. "display_name": "Pipeline type",
  439. "json_name": "pipeline_type",
  440. "value": primary_pipeline.type,
  441. }
  442. describe_dict["version"] = {
  443. "display_name": "Pipeline format version",
  444. "json_name": "pipeline_format_version",
  445. "value": primary_pipeline.version,
  446. }
  447. describe_dict["runtime"] = {
  448. "display_name": "Pipeline runtime",
  449. "json_name": "pipeline_runtime",
  450. "value": primary_pipeline.get_property("runtime"),
  451. }
  452. describe_dict["generic_node_count"] = {
  453. "display_name": "Number of generic nodes",
  454. "json_name": "generic_node_count",
  455. "value": 0,
  456. }
  457. describe_dict["custom_node_count"] = {
  458. "display_name": "Number of custom nodes",
  459. "json_name": "custom_node_count",
  460. "value": 0,
  461. }
  462. describe_dict["script_dependencies"] = {
  463. "display_name": "Script dependencies",
  464. "json_name": "scripts",
  465. "value": set(),
  466. }
  467. describe_dict["notebook_dependencies"] = {
  468. "display_name": "Notebook dependencies",
  469. "json_name": "notebooks",
  470. "value": set(),
  471. }
  472. describe_dict["file_dependencies"] = {
  473. "display_name": "Local file dependencies",
  474. "json_name": "files",
  475. "value": set(),
  476. }
  477. describe_dict["component_dependencies"] = {
  478. "display_name": "Component dependencies",
  479. "json_name": "custom_components",
  480. "value": set(),
  481. }
  482. describe_dict["volume_dependencies"] = {
  483. "display_name": "Volume dependencies",
  484. "json_name": "volumes",
  485. "value": set(),
  486. }
  487. describe_dict["container_image_dependencies"] = {
  488. "display_name": "Container image dependencies",
  489. "json_name": "container_images",
  490. "value": set(),
  491. }
  492. describe_dict["kubernetes_secret_dependencies"] = {
  493. "display_name": "Kubernetes secret dependencies",
  494. "json_name": "kubernetes_secrets",
  495. "value": set(),
  496. }
  497. # iterate through pipeline nodes and update the describe_dict values
  498. for node in primary_pipeline.nodes:
  499. # update describe_dict stats that take into account every operation
  500. # (... there are none today)
  501. # volumes
  502. for vm in node.get_component_parameter(pipeline_constants.MOUNTED_VOLUMES, []):
  503. describe_dict["volume_dependencies"]["value"].add(vm.pvc_name)
  504. if Operation.is_generic_operation(node.op):
  505. # update stats that are specific to generic components
  506. describe_dict["generic_node_count"]["value"] = describe_dict["generic_node_count"]["value"] + 1
  507. file_name = node.get_component_parameter("filename")
  508. if file_name:
  509. # Add notebook or script as dependency, if one was configured
  510. if Path(file_name).suffix == ".ipynb":
  511. describe_dict["notebook_dependencies"]["value"].add(file_name)
  512. else:
  513. describe_dict["script_dependencies"]["value"].add(file_name)
  514. # local files (or directories)
  515. for fd in node.get_component_parameter("dependencies", []):
  516. describe_dict["file_dependencies"]["value"].add(fd)
  517. # container image, if one was configured
  518. if node.get_component_parameter(pipeline_constants.RUNTIME_IMAGE):
  519. describe_dict["container_image_dependencies"]["value"].add(
  520. node.get_component_parameter(pipeline_constants.RUNTIME_IMAGE)
  521. )
  522. # Kubernetes secrets
  523. for ks in node.get_component_parameter(pipeline_constants.KUBERNETES_SECRETS, []):
  524. describe_dict["kubernetes_secret_dependencies"]["value"].add(ks.name)
  525. else:
  526. # update stats that are specific to custom components
  527. describe_dict["custom_node_count"]["value"] = describe_dict["custom_node_count"]["value"] + 1
  528. # component dependencies
  529. describe_dict["component_dependencies"]["value"].add(node.component_source)
  530. #
  531. # produce output in the requested human-readable or machine-readable format
  532. #
  533. indent_length = 4
  534. if json_option:
  535. # produce machine-readable output
  536. output_dict = {
  537. describe_dict["name"]["json_name"]: describe_dict["name"]["value"],
  538. describe_dict["description"]["json_name"]: describe_dict["description"]["value"],
  539. describe_dict["type"]["json_name"]: describe_dict["type"]["value"],
  540. describe_dict["version"]["json_name"]: describe_dict["version"]["value"],
  541. describe_dict["runtime"]["json_name"]: describe_dict["runtime"]["value"],
  542. describe_dict["generic_node_count"]["json_name"]: describe_dict["generic_node_count"]["value"],
  543. describe_dict["custom_node_count"]["json_name"]: describe_dict["custom_node_count"]["value"],
  544. "dependencies": {
  545. describe_dict["script_dependencies"]["json_name"]: list(describe_dict["script_dependencies"]["value"]),
  546. describe_dict["notebook_dependencies"]["json_name"]: list(
  547. describe_dict["notebook_dependencies"]["value"]
  548. ),
  549. describe_dict["file_dependencies"]["json_name"]: list(describe_dict["file_dependencies"]["value"]),
  550. describe_dict["component_dependencies"]["json_name"]: [],
  551. describe_dict["container_image_dependencies"]["json_name"]: list(
  552. describe_dict["container_image_dependencies"]["value"]
  553. ),
  554. describe_dict["volume_dependencies"]["json_name"]: list(describe_dict["volume_dependencies"]["value"]),
  555. describe_dict["kubernetes_secret_dependencies"]["json_name"]: list(
  556. describe_dict["kubernetes_secret_dependencies"]["value"]
  557. ),
  558. },
  559. }
  560. for component_dependency in describe_dict["component_dependencies"]["value"]:
  561. output_dict["dependencies"][describe_dict["component_dependencies"]["json_name"]].append(
  562. json.loads(component_dependency)
  563. )
  564. click.echo(json.dumps(output_dict, indent=indent_length))
  565. else:
  566. # produce human-readable output
  567. click.echo()
  568. print_banner("Elyra Pipeline details")
  569. for v in describe_dict.values():
  570. if v["value"] or v["value"] == 0:
  571. if isinstance(v["value"], set):
  572. click.echo(f'{v["display_name"]}:')
  573. for item in v["value"]:
  574. click.echo(f"{' ' * indent_length}- {item}")
  575. else:
  576. click.echo(f'{v["display_name"]}: {v["value"]}')
  577. else:
  578. click.echo(f'{v["display_name"]}: None specified')
  579. @click.command()
  580. @click.argument("pipeline_path", type=Path, callback=validate_pipeline_path)
  581. @click.option("--runtime-config", required=True, help="Runtime configuration name.")
  582. @click.option(
  583. "--output",
  584. required=False,
  585. type=Path,
  586. help="Exported file name (including optional path). Defaults to " " the current directory and the pipeline name.",
  587. )
  588. @click.option("--overwrite", is_flag=True, help="Overwrite output file if it already exists.")
  589. def export(pipeline_path, runtime_config, output, overwrite):
  590. """
  591. Export a pipeline to a runtime-specific format
  592. """
  593. click.echo()
  594. print_banner("Elyra pipeline export")
  595. rtc = _get_runtime_config(runtime_config)
  596. runtime_schema = rtc.schema_name
  597. runtime_type = rtc.metadata.get("runtime_type")
  598. pipeline_definition = _preprocess_pipeline(pipeline_path, runtime=runtime_schema, runtime_config=runtime_config)
  599. # Verify that the pipeline's runtime type is compatible with the
  600. # runtime configuration
  601. pipeline_runtime_type = _get_pipeline_runtime_type(pipeline_definition)
  602. if pipeline_runtime_type and pipeline_runtime_type != "Generic" and pipeline_runtime_type != runtime_type:
  603. raise click.BadParameter(
  604. f"The runtime configuration type '{runtime_type}' does not match "
  605. f"the pipeline's runtime type '{pipeline_runtime_type}'.",
  606. param_hint="--runtime-config",
  607. )
  608. resources = RuntimeTypeResources.get_instance_by_type(RuntimeProcessorType.get_instance_by_name(runtime_type))
  609. supported_export_formats = resources.get_export_extensions()
  610. if len(supported_export_formats) == 0:
  611. raise click.ClickException(f"Runtime type '{runtime_type}' does not support export.")
  612. # If, in the future, a runtime supports multiple export output formats,
  613. # the user can choose one. For now, choose the only option.
  614. selected_export_format = supported_export_formats[0]
  615. selected_export_format_suffix = f".{selected_export_format}"
  616. # generate output file name from the user-provided input
  617. if output is None:
  618. # user did not specify an output; use current directory
  619. # and derive the file name from the pipeline file name
  620. output_path = Path.cwd()
  621. filename = f"{Path(pipeline_path).stem}{selected_export_format_suffix}"
  622. else:
  623. if output.suffix == selected_export_format_suffix:
  624. # user provided a file name
  625. output_path = output.parent
  626. filename = output.name
  627. else:
  628. # user provided a directory
  629. output_path = output
  630. filename = f"{Path(pipeline_path).stem}{selected_export_format_suffix}"
  631. output_file = output_path.resolve() / filename
  632. # verify that the output path meets the prerequisites
  633. if not output_file.parent.is_dir():
  634. try:
  635. output_file.parent.mkdir(parents=True, exist_ok=True)
  636. except Exception as ex:
  637. raise click.BadParameter(f"Cannot create output directory: {ex}", param_hint="--output")
  638. # handle output overwrite
  639. if output_file.exists() and not overwrite:
  640. raise click.ClickException(
  641. f"Output file '{str(output_file)}' exists and " "option '--overwrite' was not specified."
  642. )
  643. if pipeline_runtime_type:
  644. _build_component_cache()
  645. # validate the pipeline
  646. try:
  647. _validate_pipeline_definition(pipeline_definition)
  648. except Exception:
  649. raise click.ClickException("Pipeline validation FAILED. The pipeline was not exported.")
  650. with Spinner(text="Exporting pipeline ..."):
  651. try:
  652. # parse pipeline
  653. pipeline_object = PipelineParser().parse(pipeline_definition)
  654. # process pipeline
  655. with warnings.catch_warnings():
  656. warnings.simplefilter("ignore")
  657. asyncio.get_event_loop().run_until_complete(
  658. PipelineProcessorManager.instance().export(
  659. pipeline_object, selected_export_format, str(output_file), True
  660. )
  661. )
  662. except ValueError as ve:
  663. raise click.ClickException(f"Error parsing pipeline: \n {ve}")
  664. except RuntimeError as re:
  665. raise click.ClickException(f"Error exporting pipeline: \n {re} \n {re.__cause__}")
  666. click.echo(f"Pipeline was exported to '{str(output_file)}'.")
  667. pipeline.add_command(describe)
  668. pipeline.add_command(validate)
  669. pipeline.add_command(submit)
  670. pipeline.add_command(run)
  671. pipeline.add_command(export)