processor_kfp.py

#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from datetime import datetime
import os
import re
import tempfile
import time
from typing import Dict
from urllib.parse import urlsplit

from kfp import Client as ArgoClient
from kfp import compiler as kfp_argo_compiler
from kfp import components as components
from kfp.dsl import PipelineConf
from kfp.aws import use_aws_secret  # noqa H306
from kubernetes import client as k8s_client

try:
    from kfp_tekton import compiler as kfp_tekton_compiler
    from kfp_tekton import TektonClient
except ImportError:
    # We may not have kfp-tekton available and that's okay!
    kfp_tekton_compiler = None
    TektonClient = None

from elyra._version import __version__
from elyra.kfp.operator import ExecuteFileOp
from elyra.metadata.schemaspaces import RuntimeImages
from elyra.metadata.schemaspaces import Runtimes
from elyra.pipeline.component_catalog import ComponentCache
from elyra.pipeline.kfp.kfp_authentication import AuthenticationError
from elyra.pipeline.kfp.kfp_authentication import KFPAuthenticator
from elyra.pipeline.pipeline import GenericOperation
from elyra.pipeline.pipeline import Operation
from elyra.pipeline.pipeline import Pipeline
from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
from elyra.pipeline.pipeline_constants import KUBERNETES_SECRETS
from elyra.pipeline.pipeline_constants import MOUNTED_VOLUMES
from elyra.pipeline.processor import PipelineProcessor
from elyra.pipeline.processor import PipelineProcessorResponse
from elyra.pipeline.processor import RuntimePipelineProcessor
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.util.cos import join_paths
from elyra.util.path import get_absolute_path


class KfpPipelineProcessor(RuntimePipelineProcessor):
    _type = RuntimeProcessorType.KUBEFLOW_PIPELINES
    _name = "kfp"

    # Provide users with the ability to identify a writable directory in the
    # running container where the notebook | script is executed. The location
    # must exist and be known before the container is started.
    # Defaults to `/tmp`
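    # For example, ELYRA_WRITABLE_CONTAINER_DIR="/opt/app-root/scratch/" yields
    # WCD == "/opt/app-root/scratch" (surrounding whitespace and any trailing "/" are stripped).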
    WCD = os.getenv("ELYRA_WRITABLE_CONTAINER_DIR", "/tmp").strip().rstrip("/")

    def __init__(self, root_dir, **kwargs):
        super().__init__(root_dir, **kwargs)

    def process(self, pipeline):
        """
        Runs a pipeline on Kubeflow Pipelines

        Each time a pipeline is processed, a new version
        is uploaded and run under the same experiment name.
        """
        timestamp = datetime.now().strftime("%m%d%H%M%S")

        ################
        # Runtime Configs
        ################
        runtime_configuration = self._get_metadata_configuration(
            schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
        )

        # unpack Kubeflow Pipelines configs
        api_endpoint = runtime_configuration.metadata["api_endpoint"].rstrip("/")
        public_api_endpoint = runtime_configuration.metadata.get("public_api_endpoint", api_endpoint)
        api_username = runtime_configuration.metadata.get("api_username")
        api_password = runtime_configuration.metadata.get("api_password")
        user_namespace = runtime_configuration.metadata.get("user_namespace")
        engine = runtime_configuration.metadata.get("engine")
        if engine == "Tekton" and not TektonClient:
            raise ValueError(
                "Python package `kfp-tekton` is not installed. "
                "Please install using `elyra[kfp-tekton]` to use Tekton engine."
            )

        # unpack Cloud Object Storage configs
        cos_endpoint = runtime_configuration.metadata["cos_endpoint"]
        cos_bucket = runtime_configuration.metadata["cos_bucket"]

        # Determine which provider to use to authenticate with Kubeflow
        auth_type = runtime_configuration.metadata.get("auth_type")

        try:
            auth_info = KFPAuthenticator().authenticate(
                api_endpoint,
                auth_type_str=auth_type,
                runtime_config_name=pipeline.runtime_config,
                auth_parm_1=api_username,
                auth_parm_2=api_password,
            )
            self.log.debug(f"Authenticator returned {auth_info}")
        except AuthenticationError as ae:
            if ae.get_request_history() is not None:
                self.log.info("An authentication error was raised. Diagnostic information follows.")
                self.log.info(ae.request_history_to_string())
            raise RuntimeError(f"Kubeflow authentication failed: {ae}")

        #############
        # Create Kubeflow Client
        #############
        try:
            if engine == "Tekton":
                client = TektonClient(
                    host=api_endpoint,
                    cookies=auth_info.get("cookies", None),
                    credentials=auth_info.get("credentials", None),
                    existing_token=auth_info.get("existing_token", None),
                    namespace=user_namespace,
                )
            else:
                client = ArgoClient(
                    host=api_endpoint,
                    cookies=auth_info.get("cookies", None),
                    credentials=auth_info.get("credentials", None),
                    existing_token=auth_info.get("existing_token", None),
                    namespace=user_namespace,
                )
        except Exception as ex:
            # a common cause of these errors is forgetting to include `/pipeline` or including it with an 's'
            api_endpoint_obj = urlsplit(api_endpoint)
            if api_endpoint_obj.path != "/pipeline":
                api_endpoint_tip = api_endpoint_obj._replace(path="/pipeline").geturl()
                tip_string = (
                    f" - [TIP: did you mean to set '{api_endpoint_tip}' as the endpoint, "
                    f"take care not to include 's' at end]"
                )
            else:
                tip_string = ""

            raise RuntimeError(
                f"Failed to initialize `kfp.Client()` against: '{api_endpoint}' - "
                f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
                f"{tip_string}"
            ) from ex

        #############
        # Verify Namespace
        #############
        try:
            client.list_experiments(namespace=user_namespace, page_size=1)
        except Exception as ex:
            if user_namespace:
                tip_string = f"[TIP: ensure namespace '{user_namespace}' is correct]"
            else:
                tip_string = "[TIP: you probably need to set a namespace]"

            raise RuntimeError(
                f"Failed to `kfp.Client().list_experiments()` against: '{api_endpoint}' - "
                f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}' - "
                f"{tip_string}"
            ) from ex

        #############
        # Pipeline Metadata none - inherited
        #############
        # generate a pipeline name
        pipeline_name = pipeline.name

        # generate a pipeline description
        pipeline_description = pipeline.description
        if pipeline_description is None:
            pipeline_description = f"Created with Elyra {__version__} pipeline editor using `{pipeline.source}`."

        #############
        # Submit & Run the Pipeline
        #############
        self.log_pipeline_info(pipeline_name, "submitting pipeline")

        with tempfile.TemporaryDirectory() as temp_dir:
            self.log.debug(f"Created temporary directory at: {temp_dir}")
            pipeline_path = os.path.join(temp_dir, f"{pipeline_name}.tar.gz")

            #############
            # Get Pipeline ID
            #############
            try:
                # get the kubeflow pipeline id (returns None if not found, otherwise the ID of the pipeline)
                pipeline_id = client.get_pipeline_id(pipeline_name)

                # calculate what "pipeline version" name to use
                if pipeline_id is None:
                    # the first "pipeline version" name must be the pipeline name
                    pipeline_version_name = pipeline_name
                else:
                    # generate a unique name for a new "pipeline version" by appending the current timestamp
                    pipeline_version_name = f"{pipeline_name}-{timestamp}"

            except Exception as ex:
                raise RuntimeError(
                    f"Failed to get ID of Kubeflow pipeline: '{pipeline_name}' - "
                    f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
                ) from ex

            #############
            # Compile the Pipeline
            #############
            try:
                t0 = time.time()
                # generate a name for the experiment (lowercase because experiment names are case sensitive)
                experiment_name = pipeline_name.lower()

                # Create an instance id that will be used to store
                # the pipelines' dependencies, if applicable
                pipeline_instance_id = f"{pipeline_name}-{timestamp}"

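                # The KFP compilers below expect a callable that defines the pipeline;
                # wrapping _cc_pipeline() in a lambda defers building the ContainerOps
                # until compile() invokes it.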
                pipeline_function = lambda: self._cc_pipeline(  # nopep8 E731
                    pipeline,
                    pipeline_name=pipeline_name,
                    pipeline_version=pipeline_version_name,
                    experiment_name=experiment_name,
                    pipeline_instance_id=pipeline_instance_id,
                )

                # collect pipeline configuration information
                pipeline_conf = self._generate_pipeline_conf(pipeline)

                # compile the pipeline
                if engine == "Tekton":
                    kfp_tekton_compiler.TektonCompiler().compile(
                        pipeline_function, pipeline_path, pipeline_conf=pipeline_conf
                    )
                else:
                    kfp_argo_compiler.Compiler().compile(pipeline_function, pipeline_path, pipeline_conf=pipeline_conf)
            except RuntimeError:
                raise
            except Exception as ex:
                raise RuntimeError(
                    f"Failed to compile pipeline '{pipeline_name}' with engine '{engine}' to: '{pipeline_path}'"
                ) from ex

            self.log_pipeline_info(pipeline_name, "pipeline compiled", duration=time.time() - t0)

            #############
            # Upload Pipeline Version
            #############
            try:
                t0 = time.time()

                # CASE 1: pipeline needs to be created
                if pipeline_id is None:
                    # create new pipeline (and initial "pipeline version")
                    kfp_pipeline = client.upload_pipeline(
                        pipeline_package_path=pipeline_path,
                        pipeline_name=pipeline_name,
                        description=pipeline_description,
                    )

                    # extract the ID of the pipeline we created
                    pipeline_id = kfp_pipeline.id

                    # the initial "pipeline version" has the same id as the pipeline itself
                    version_id = pipeline_id

                # CASE 2: pipeline already exists
                else:
                    # upload the "pipeline version"
                    kfp_pipeline = client.upload_pipeline_version(
                        pipeline_package_path=pipeline_path,
                        pipeline_version_name=pipeline_version_name,
                        pipeline_id=pipeline_id,
                    )

                    # extract the id of the "pipeline version" that was created
                    version_id = kfp_pipeline.id

            except Exception as ex:
                # a common cause of these errors is forgetting to include `/pipeline` or including it with an 's'
                api_endpoint_obj = urlsplit(api_endpoint)
                if api_endpoint_obj.path != "/pipeline":
                    api_endpoint_tip = api_endpoint_obj._replace(path="/pipeline").geturl()
                    tip_string = (
                        f" - [TIP: did you mean to set '{api_endpoint_tip}' as the endpoint, "
                        f"take care not to include 's' at end]"
                    )
                else:
                    tip_string = ""

                raise RuntimeError(
                    f"Failed to upload Kubeflow pipeline '{pipeline_name}' - "
                    f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
                    f"{tip_string}"
                ) from ex

            self.log_pipeline_info(pipeline_name, "pipeline uploaded", duration=time.time() - t0)

            #############
            # Create Experiment
            #############
            try:
                t0 = time.time()
                # create a new experiment (if it already exists, this is a no-op)
                experiment = client.create_experiment(name=experiment_name, namespace=user_namespace)

            except Exception as ex:
                raise RuntimeError(
                    f"Failed to create Kubeflow experiment: '{experiment_name}' - "
                    f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
                ) from ex

            self.log_pipeline_info(pipeline_name, "created experiment", duration=time.time() - t0)

            #############
            # Create Pipeline Run
            #############
            try:
                t0 = time.time()

                # generate name for the pipeline run
                job_name = pipeline_instance_id

                # create pipeline run (or specified pipeline version)
                run = client.run_pipeline(
                    experiment_id=experiment.id, job_name=job_name, pipeline_id=pipeline_id, version_id=version_id
                )

            except Exception as ex:
                raise RuntimeError(
                    f"Failed to create Kubeflow pipeline run: '{job_name}' - "
                    f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
                ) from ex

            if run is None:
                # client.run_pipeline seemed to have encountered an issue
                # but didn't raise an exception
                raise RuntimeError(
                    f"Failed to create Kubeflow pipeline run: '{job_name}' - "
                    f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
                )

            self.log_pipeline_info(
                pipeline_name,
                f"pipeline submitted: {public_api_endpoint}/#/runs/details/{run.id}",
                duration=time.time() - t0,
            )

        if pipeline.contains_generic_operations():
            object_storage_url = f"{cos_endpoint}"
            os_path = join_paths(pipeline.pipeline_parameters.get(COS_OBJECT_PREFIX), pipeline_instance_id)
            object_storage_path = f"/{cos_bucket}/{os_path}"
        else:
            object_storage_url = None
            object_storage_path = None

        return KfpPipelineProcessorResponse(
            run_id=run.id,
            run_url=f"{public_api_endpoint}/#/runs/details/{run.id}",
            object_storage_url=object_storage_url,
            object_storage_path=object_storage_path,
        )

    def export(self, pipeline, pipeline_export_format, pipeline_export_path, overwrite):
        # Verify that the KfpPipelineProcessor supports the given export format
        self._verify_export_format(pipeline_export_format)

        t0_all = time.time()
        timestamp = datetime.now().strftime("%m%d%H%M%S")
        pipeline_name = pipeline.name
        # Create an instance id that will be used to store
        # the pipelines' dependencies, if applicable
        pipeline_instance_id = f"{pipeline_name}-{timestamp}"

        # Since pipeline_export_path may be relative to the notebook directory, ensure
        # we're using its absolute form.
        absolute_pipeline_export_path = get_absolute_path(self.root_dir, pipeline_export_path)

        runtime_configuration = self._get_metadata_configuration(
            schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
        )

        engine = runtime_configuration.metadata.get("engine")
        if engine == "Tekton" and not TektonClient:
            raise ValueError("kfp-tekton not installed. Please install using elyra[kfp-tekton] to use Tekton engine.")

        if os.path.exists(absolute_pipeline_export_path) and not overwrite:
            raise ValueError("File " + absolute_pipeline_export_path + " already exists.")

        self.log_pipeline_info(pipeline_name, f"Exporting pipeline as a .{pipeline_export_format} file")

        # Export pipeline as static configuration file (YAML formatted)
        try:
            # Exported pipeline is not associated with an experiment
            # or a version. The association is established when the
            # pipeline is imported into KFP by the user.
            pipeline_function = lambda: self._cc_pipeline(
                pipeline, pipeline_name, pipeline_instance_id=pipeline_instance_id
            )  # nopep8
            if engine == "Tekton":
                self.log.info("Compiling pipeline for Tekton engine")
                kfp_tekton_compiler.TektonCompiler().compile(pipeline_function, absolute_pipeline_export_path)
            else:
                self.log.info("Compiling pipeline for Argo engine")
                kfp_argo_compiler.Compiler().compile(pipeline_function, absolute_pipeline_export_path)
        except RuntimeError:
            raise
        except Exception as ex:
            if ex.__cause__:
                raise RuntimeError(str(ex)) from ex
            raise RuntimeError(
                f"Error pre-processing pipeline '{pipeline_name}' for export to '{absolute_pipeline_export_path}'",
                str(ex),
            ) from ex

        self.log_pipeline_info(
            pipeline_name, f"pipeline exported to '{pipeline_export_path}'", duration=(time.time() - t0_all)
        )

        return pipeline_export_path  # Return the input value, not its absolute form

    def _collect_envs(self, operation: Operation, **kwargs) -> Dict:
        """
        Amends envs collected from superclass with those pertaining to this subclass

        :return: dictionary containing environment name/value pairs
        """
        envs = super()._collect_envs(operation, **kwargs)
        # Only Unix-style path spec is supported.
        envs["ELYRA_WRITABLE_CONTAINER_DIR"] = self.WCD
        return envs

    def _cc_pipeline(
        self,
        pipeline: Pipeline,
        pipeline_name: str,
        pipeline_version: str = "",
        experiment_name: str = "",
        pipeline_instance_id: str = None,
        export=False,
    ):
        runtime_configuration = self._get_metadata_configuration(
            schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
        )

        cos_endpoint = runtime_configuration.metadata["cos_endpoint"]
        cos_username = runtime_configuration.metadata.get("cos_username")
        cos_password = runtime_configuration.metadata.get("cos_password")
        cos_secret = runtime_configuration.metadata.get("cos_secret")
        cos_bucket = runtime_configuration.metadata.get("cos_bucket")
        engine = runtime_configuration.metadata["engine"]

        pipeline_instance_id = pipeline_instance_id or pipeline_name

        artifact_object_prefix = join_paths(pipeline.pipeline_parameters.get(COS_OBJECT_PREFIX), pipeline_instance_id)

        self.log_pipeline_info(
            pipeline_name,
            f"processing pipeline dependencies for upload to '{cos_endpoint}' "
            f"bucket '{cos_bucket}' folder '{artifact_object_prefix}'",
        )
        t0_all = time.time()

        emptydir_volume_size = ""
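        # e.g. exporting CRIO_RUNTIME=true marks a CRI-O based container runtime; each generic
        # operation is then given an emptyDir volume of the size set below (emptydir_volume_size)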
        container_runtime = bool(os.getenv("CRIO_RUNTIME", "False").lower() == "true")

        # Create dictionary that maps component Id to its ContainerOp instance
        target_ops = {}

        # Sort operations based on dependency graph (topological order)
        sorted_operations = PipelineProcessor._sort_operations(pipeline.operations)

        # Determine whether access to cloud storage is required
        for operation in sorted_operations:
            if isinstance(operation, GenericOperation):
                self._verify_cos_connectivity(runtime_configuration)
                break

        # All previous operation outputs should be propagated throughout the pipeline.
        # In order to process this recursively, the current operation's inputs should be combined
        # from its parent's inputs (which, themselves are derived from the outputs of their parent)
        # and its parent's outputs.
        PipelineProcessor._propagate_operation_inputs_outputs(pipeline, sorted_operations)

        for operation in sorted_operations:
            if container_runtime:
                # Volume size to create when using CRI-o, NOTE: IBM Cloud minimum is 20Gi
                emptydir_volume_size = "20Gi"

            sanitized_operation_name = self._sanitize_operation_name(operation.name)

            # Create pipeline operation
            # If operation is one of the "generic" set of NBs or scripts, construct custom ExecuteFileOp
            if isinstance(operation, GenericOperation):
                # Collect env variables
                pipeline_envs = self._collect_envs(
                    operation, cos_secret=cos_secret, cos_username=cos_username, cos_password=cos_password
                )

                operation_artifact_archive = self._get_dependency_archive_name(operation)

                self.log.debug(
                    f"Creating pipeline component archive '{operation_artifact_archive}' for operation '{operation}'"
                )
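                # "mlpipeline-metrics" and "mlpipeline-ui-metadata" (declared via file_outputs
                # below) are the well-known artifact names that the Kubeflow Pipelines UI
                # renders as run metrics and visualizations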
                target_ops[operation.id] = ExecuteFileOp(
                    name=sanitized_operation_name,
                    pipeline_name=pipeline_name,
                    experiment_name=experiment_name,
                    notebook=operation.filename,
                    cos_endpoint=cos_endpoint,
                    cos_bucket=cos_bucket,
                    cos_directory=artifact_object_prefix,
                    cos_dependencies_archive=operation_artifact_archive,
                    pipeline_version=pipeline_version,
                    pipeline_source=pipeline.source,
                    pipeline_inputs=operation.inputs,
                    pipeline_outputs=operation.outputs,
                    pipeline_envs=pipeline_envs,
                    emptydir_volume_size=emptydir_volume_size,
                    cpu_request=operation.cpu,
                    mem_request=operation.memory,
                    gpu_limit=operation.gpu,
                    workflow_engine=engine,
                    image=operation.runtime_image,
                    file_outputs={
                        "mlpipeline-metrics": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-metrics.json",  # noqa
                        "mlpipeline-ui-metadata": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-ui-metadata.json",  # noqa
                    },
                    volume_mounts=operation.component_params.get(MOUNTED_VOLUMES, []),
                    kubernetes_secrets=operation.component_params.get(KUBERNETES_SECRETS, []),
                )

                if operation.doc:
                    target_ops[operation.id].add_pod_annotation("elyra/node-user-doc", operation.doc)

                # TODO Can we move all of this to apply to non-standard components as well? Test when servers are up
                if cos_secret and not export:
                    target_ops[operation.id].apply(use_aws_secret(cos_secret))

                image_namespace = self._get_metadata_configuration(RuntimeImages.RUNTIME_IMAGES_SCHEMASPACE_ID)
                for image_instance in image_namespace:
                    if image_instance.metadata["image_name"] == operation.runtime_image and image_instance.metadata.get(
                        "pull_policy"
                    ):
                        target_ops[operation.id].container.set_image_pull_policy(image_instance.metadata["pull_policy"])

                self.log_pipeline_info(
                    pipeline_name,
                    f"processing operation dependencies for id '{operation.id}'",
                    operation_name=operation.name,
                )

                self._upload_dependencies_to_object_store(
                    runtime_configuration, pipeline_name, operation, prefix=artifact_object_prefix
                )
            # If operation is a "non-standard" component, load its spec and create operation with factory function
            else:
                # Retrieve component from cache
                component = ComponentCache.instance().get_component(self._type, operation.classifier)

                # Convert the user-entered value of certain properties according to their type
                for component_property in component.properties:
                    # Get corresponding property's value from parsed pipeline
                    property_value = operation.component_params.get(component_property.ref)

                    self.log.debug(
                        f"Processing component parameter '{component_property.name}' "
                        f"of type '{component_property.data_type}'"
                    )

                    if component_property.data_type == "inputpath":
                        output_node_id = property_value["value"]
                        output_node_parameter_key = property_value["option"].replace("elyra_output_", "")
                        operation.component_params[component_property.ref] = target_ops[output_node_id].outputs[
                            output_node_parameter_key
                        ]
                    elif component_property.data_type == "inputvalue":
                        active_property = property_value["activeControl"]
                        active_property_value = property_value.get(active_property, None)

                        # If the value is not found, assign it the default value assigned in parser
                        if active_property_value is None:
                            active_property_value = component_property.value

                        if isinstance(active_property_value, dict) and set(active_property_value.keys()) == {
                            "value",
                            "option",
                        }:
                            output_node_id = active_property_value["value"]
                            output_node_parameter_key = active_property_value["option"].replace("elyra_output_", "")
                            operation.component_params[component_property.ref] = target_ops[output_node_id].outputs[
                                output_node_parameter_key
                            ]
                        elif component_property.default_data_type == "dictionary":
                            processed_value = self._process_dictionary_value(active_property_value)
                            operation.component_params[component_property.ref] = processed_value
                        elif component_property.default_data_type == "list":
                            processed_value = self._process_list_value(active_property_value)
                            operation.component_params[component_property.ref] = processed_value
                        else:
                            operation.component_params[component_property.ref] = active_property_value

                # Build component task factory
                try:
                    factory_function = components.load_component_from_text(component.definition)
                except Exception as e:
                    # TODO Fix error messaging and break exceptions down into categories
                    self.log.error(f"Error loading component spec for {operation.name}: {str(e)}")
                    raise RuntimeError(f"Error loading component spec for {operation.name}.")

                # Add factory function, which returns a ContainerOp task instance, to pipeline operation dict
                try:
                    comp_spec_inputs = [
                        inputs.name.lower().replace(" ", "_")
                        for inputs in factory_function.component_spec.inputs or []
                    ]

                    # Remove inputs and outputs from params dict
                    # TODO: need to have a way to retrieve only the required params
                    parameter_removal_list = ["inputs", "outputs"]
                    for component_param in operation.component_params_as_dict.keys():
                        if component_param not in comp_spec_inputs:
                            parameter_removal_list.append(component_param)

                    for parameter in parameter_removal_list:
                        operation.component_params_as_dict.pop(parameter, None)

                    # Create ContainerOp instance and assign appropriate user-provided name
                    sanitized_component_params = {
                        self._sanitize_param_name(name): value
                        for name, value in operation.component_params_as_dict.items()
                    }
                    container_op = factory_function(**sanitized_component_params)
                    container_op.set_display_name(operation.name)

                    if operation.doc:
                        container_op.add_pod_annotation("elyra/node-user-doc", operation.doc)

                    target_ops[operation.id] = container_op
                except Exception as e:
                    # TODO Fix error messaging and break exceptions down into categories
                    self.log.error(f"Error constructing component {operation.name}: {str(e)}")
                    raise RuntimeError(f"Error constructing component {operation.name}.")

        # Process dependencies after all the operations have been created
        for operation in pipeline.operations.values():
            op = target_ops[operation.id]
            for parent_operation_id in operation.parent_operation_ids:
                parent_op = target_ops[parent_operation_id]  # Parent Operation
                op.after(parent_op)

        self.log_pipeline_info(pipeline_name, "pipeline dependencies processed", duration=(time.time() - t0_all))

        return target_ops

    def _generate_pipeline_conf(self, pipeline: dict) -> PipelineConf:
        """
        Returns a KFP pipeline configuration for this pipeline, which can be empty.

        :param pipeline: pipeline dictionary
        :type pipeline: dict
        :return: https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.PipelineConf
        :rtype: kfp.dsl.PipelineConf
  576. """
  577. self.log.debug("Generating pipeline configuration ...")
  578. pipeline_conf = PipelineConf()
  579. #
  580. # Gather input for container image pull secrets in support of private container image registries
  581. # https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.PipelineConf.set_image_pull_secrets
  582. #
  583. image_namespace = self._get_metadata_configuration(schemaspace=RuntimeImages.RUNTIME_IMAGES_SCHEMASPACE_ID)
  584. # iterate through pipeline operations and create list of Kubernetes secret names
  585. # that are associated with generic components
  586. container_image_pull_secret_names = []
  587. for operation in pipeline.operations.values():
  588. if isinstance(operation, GenericOperation):
  589. for image_instance in image_namespace:
  590. if image_instance.metadata["image_name"] == operation.runtime_image:
  591. if image_instance.metadata.get("pull_secret"):
  592. container_image_pull_secret_names.append(image_instance.metadata.get("pull_secret"))
  593. break
  594. if len(container_image_pull_secret_names) > 0:
  595. # de-duplicate the pull secret name list, create Kubernetes resource
  596. # references and add them to the pipeline configuration
  597. container_image_pull_secrets = []
  598. for secret_name in list(set(container_image_pull_secret_names)):
  599. container_image_pull_secrets.append(k8s_client.V1ObjectReference(name=secret_name))
  600. pipeline_conf.set_image_pull_secrets(container_image_pull_secrets)
  601. self.log.debug(
  602. f"Added {len(container_image_pull_secrets)}" " image pull secret(s) to the pipeline configuration."
  603. )
  604. return pipeline_conf
  605. @staticmethod
  606. def _sanitize_operation_name(name: str) -> str:
  607. """
  608. In KFP, only letters, numbers, spaces, "_", and "-" are allowed in name.
  609. :param name: name of the operation
  610. """
        return re.sub("-+", "-", re.sub("[^-_0-9A-Za-z ]+", "-", name)).lstrip("-").rstrip("-")

    @staticmethod
    def _sanitize_param_name(name: str) -> str:
        """
        Sanitize a component parameter name.

        Behavior is mirrored from how Kubeflow 1.X sanitizes identifier names:
        - https://github.com/kubeflow/pipelines/blob/1.8.1/sdk/python/kfp/components/_naming.py#L32-L42
        - https://github.com/kubeflow/pipelines/blob/1.8.1/sdk/python/kfp/components/_naming.py#L49-L50
        """
        normalized_name = name.lower()

        # remove non-word characters
        normalized_name = re.sub(r"[\W_]", " ", normalized_name)

        # no double spaces, leading or trailing spaces
        normalized_name = re.sub(" +", " ", normalized_name).strip()

        # no leading digits
        if re.match(r"\d", normalized_name):
            normalized_name = "n" + normalized_name

        return normalized_name.replace(" ", "_")


class KfpPipelineProcessorResponse(PipelineProcessorResponse):
    _type = RuntimeProcessorType.KUBEFLOW_PIPELINES
    _name = "kfp"

    def __init__(self, run_id, run_url, object_storage_url, object_storage_path):
        super().__init__(run_url, object_storage_url, object_storage_path)
        self.run_id = run_id

    def to_json(self):
        response = super().to_json()
        response["run_id"] = self.run_id
        return response