#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from datetime import datetime
import os
import re
import tempfile
import time
from typing import Dict
from urllib.parse import urlsplit

from kfp import Client as ArgoClient
from kfp import compiler as kfp_argo_compiler
from kfp import components as components
from kfp.dsl import PipelineConf
from kfp.aws import use_aws_secret  # noqa H306
from kubernetes import client as k8s_client
from kubernetes.client import V1PersistentVolumeClaimVolumeSource
from kubernetes.client import V1Toleration
from kubernetes.client import V1Volume
from kubernetes.client import V1VolumeMount

try:
    from kfp_tekton import compiler as kfp_tekton_compiler
    from kfp_tekton import TektonClient
except ImportError:
    # We may not have kfp-tekton available and that's okay!
    kfp_tekton_compiler = None
    TektonClient = None

from elyra._version import __version__
from elyra.kfp.operator import ExecuteFileOp
from elyra.metadata.schemaspaces import RuntimeImages
from elyra.metadata.schemaspaces import Runtimes
from elyra.pipeline.component_catalog import ComponentCache
from elyra.pipeline.kfp.kfp_authentication import AuthenticationError
from elyra.pipeline.kfp.kfp_authentication import KFPAuthenticator
from elyra.pipeline.pipeline import GenericOperation
from elyra.pipeline.pipeline import Operation
from elyra.pipeline.pipeline import Pipeline
from elyra.pipeline.pipeline_constants import COS_OBJECT_PREFIX
from elyra.pipeline.processor import PipelineProcessor
from elyra.pipeline.processor import RuntimePipelineProcessor
from elyra.pipeline.processor import RuntimePipelineProcessorResponse
from elyra.pipeline.runtime_type import RuntimeProcessorType
from elyra.util.cos import join_paths
from elyra.util.path import get_absolute_path


class KfpPipelineProcessor(RuntimePipelineProcessor):
    _type = RuntimeProcessorType.KUBEFLOW_PIPELINES
    _name = "kfp"

    # Provide users with the ability to identify a writable directory in the
    # running container where the notebook | script is executed. The location
    # must exist and be known before the container is started.
    # Defaults to `/tmp`.
    WCD = os.getenv("ELYRA_WRITABLE_CONTAINER_DIR", "/tmp").strip().rstrip("/")
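    # For example, ELYRA_WRITABLE_CONTAINER_DIR="/mnt/scratch/" yields
    # WCD == "/mnt/scratch" (the trailing slash is stripped); the default is "/tmp".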

    def process(self, pipeline):
        """
        Runs a pipeline on Kubeflow Pipelines.

        Each time a pipeline is processed, a new version
        is uploaded and run under the same experiment name.
        """
        timestamp = datetime.now().strftime("%m%d%H%M%S")
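        # e.g. "0131142503" for Jan 31, 14:25:03; the timestamp is appended
        # below to make pipeline version and run names unique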

        ################
        # Runtime Configs
        ################
        runtime_configuration = self._get_metadata_configuration(
            schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
        )

        # unpack Kubeflow Pipelines configs
        api_endpoint = runtime_configuration.metadata["api_endpoint"].rstrip("/")
        public_api_endpoint = runtime_configuration.metadata.get("public_api_endpoint", api_endpoint)
        api_username = runtime_configuration.metadata.get("api_username")
        api_password = runtime_configuration.metadata.get("api_password")
        user_namespace = runtime_configuration.metadata.get("user_namespace")
        engine = runtime_configuration.metadata.get("engine")
        if engine == "Tekton" and not TektonClient:
            raise ValueError(
                "Python package `kfp-tekton` is not installed. "
                "Please install using `elyra[kfp-tekton]` to use the Tekton engine."
            )

        # unpack Cloud Object Storage configs
        cos_endpoint = runtime_configuration.metadata["cos_endpoint"]
        cos_public_endpoint = runtime_configuration.metadata.get("public_cos_endpoint", cos_endpoint)
        cos_bucket = runtime_configuration.metadata["cos_bucket"]

        # Determine which provider to use to authenticate with Kubeflow
        auth_type = runtime_configuration.metadata.get("auth_type")

        try:
            auth_info = KFPAuthenticator().authenticate(
                api_endpoint,
                auth_type_str=auth_type,
                runtime_config_name=pipeline.runtime_config,
                auth_parm_1=api_username,
                auth_parm_2=api_password,
            )
            self.log.debug(f"Authenticator returned {auth_info}")
        except AuthenticationError as ae:
            if ae.get_request_history() is not None:
                self.log.info("An authentication error was raised. Diagnostic information follows.")
                self.log.info(ae.request_history_to_string())
            raise RuntimeError(f"Kubeflow authentication failed: {ae}")

        #############
        # Create Kubeflow Client
        #############
        try:
            if engine == "Tekton":
                client = TektonClient(
                    host=api_endpoint,
                    cookies=auth_info.get("cookies", None),
                    credentials=auth_info.get("credentials", None),
                    existing_token=auth_info.get("existing_token", None),
                    namespace=user_namespace,
                )
            else:
                client = ArgoClient(
                    host=api_endpoint,
                    cookies=auth_info.get("cookies", None),
                    credentials=auth_info.get("credentials", None),
                    existing_token=auth_info.get("existing_token", None),
                    namespace=user_namespace,
                )
        except Exception as ex:
            # a common cause of these errors is forgetting to include `/pipeline`
            # in the endpoint path, or appending an extra 's' to it
            api_endpoint_obj = urlsplit(api_endpoint)
            if api_endpoint_obj.path != "/pipeline":
                api_endpoint_tip = api_endpoint_obj._replace(path="/pipeline").geturl()
                tip_string = (
                    f" - [TIP: did you mean to set '{api_endpoint_tip}' as the endpoint? "
                    f"Take care not to include an 's' at the end]"
                )
            else:
                tip_string = ""
            raise RuntimeError(
                f"Failed to initialize `kfp.Client()` against: '{api_endpoint}' - "
                f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
                f"{tip_string}"
            ) from ex

        #############
        # Verify Namespace
        #############
        try:
            client.list_experiments(namespace=user_namespace, page_size=1)
        except Exception as ex:
            if user_namespace:
                tip_string = f"[TIP: ensure namespace '{user_namespace}' is correct]"
            else:
                tip_string = "[TIP: you probably need to set a namespace]"
            raise RuntimeError(
                f"Failed to `kfp.Client().list_experiments()` against: '{api_endpoint}' - "
                f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}' - "
                f"{tip_string}"
            ) from ex

        #############
        # Pipeline Metadata (none - inherited)
        #############
        # generate a pipeline name
        pipeline_name = pipeline.name
        # generate a pipeline description
        pipeline_description = pipeline.description
        if pipeline_description is None:
            pipeline_description = f"Created with Elyra {__version__} pipeline editor using `{pipeline.source}`."

        #############
        # Submit & Run the Pipeline
        #############
        self.log_pipeline_info(pipeline_name, "submitting pipeline")

        with tempfile.TemporaryDirectory() as temp_dir:
            self.log.debug(f"Created temporary directory at: {temp_dir}")
            pipeline_path = os.path.join(temp_dir, f"{pipeline_name}.tar.gz")

            #############
            # Get Pipeline ID
            #############
            try:
                # get the kubeflow pipeline id (returns None if not found, otherwise the ID of the pipeline)
                pipeline_id = client.get_pipeline_id(pipeline_name)

                # calculate what "pipeline version" name to use
                if pipeline_id is None:
                    # the first "pipeline version" name must be the pipeline name
                    pipeline_version_name = pipeline_name
                else:
                    # generate a unique name for a new "pipeline version" by appending the current timestamp
                    pipeline_version_name = f"{pipeline_name}-{timestamp}"
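                    # e.g. a pipeline named "analysis" would get the version
                    # name "analysis-0131142503" on a later submission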
            except Exception as ex:
                raise RuntimeError(
                    f"Failed to get ID of Kubeflow pipeline: '{pipeline_name}' - "
                    f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
                ) from ex

            #############
            # Compile the Pipeline
            #############
            try:
                t0 = time.time()

                # generate a name for the experiment (lowercase because experiment names are case insensitive)
                experiment_name = pipeline_name.lower()

                # Create an instance id that will be used to store
                # the pipelines' dependencies, if applicable
                pipeline_instance_id = f"{pipeline_name}-{timestamp}"

                pipeline_function = lambda: self._cc_pipeline(  # nopep8 E731
                    pipeline,
                    pipeline_name=pipeline_name,
                    pipeline_version=pipeline_version_name,
                    experiment_name=experiment_name,
                    pipeline_instance_id=pipeline_instance_id,
                )

                # collect pipeline configuration information
                pipeline_conf = self._generate_pipeline_conf(pipeline)

                # compile the pipeline
                if engine == "Tekton":
                    kfp_tekton_compiler.TektonCompiler().compile(
                        pipeline_function, pipeline_path, pipeline_conf=pipeline_conf
                    )
                else:
                    kfp_argo_compiler.Compiler().compile(pipeline_function, pipeline_path, pipeline_conf=pipeline_conf)
            except RuntimeError:
                raise
            except Exception as ex:
                raise RuntimeError(
                    f"Failed to compile pipeline '{pipeline_name}' with engine '{engine}' to: '{pipeline_path}'"
                ) from ex

            self.log_pipeline_info(pipeline_name, "pipeline compiled", duration=time.time() - t0)

            #############
            # Upload Pipeline Version
            #############
            try:
                t0 = time.time()

                # CASE 1: pipeline needs to be created
                if pipeline_id is None:
                    # create new pipeline (and initial "pipeline version")
                    kfp_pipeline = client.upload_pipeline(
                        pipeline_package_path=pipeline_path,
                        pipeline_name=pipeline_name,
                        description=pipeline_description,
                    )

                    # extract the ID of the pipeline we created
                    pipeline_id = kfp_pipeline.id

                    # the initial "pipeline version" has the same id as the pipeline itself
                    version_id = pipeline_id

                # CASE 2: pipeline already exists
                else:
                    # upload the "pipeline version"
                    kfp_pipeline = client.upload_pipeline_version(
                        pipeline_package_path=pipeline_path,
                        pipeline_version_name=pipeline_version_name,
                        pipeline_id=pipeline_id,
                    )

                    # extract the id of the "pipeline version" that was created
                    version_id = kfp_pipeline.id
            except Exception as ex:
                # a common cause of these errors is forgetting to include `/pipeline`
                # in the endpoint path, or appending an extra 's' to it
                api_endpoint_obj = urlsplit(api_endpoint)
                if api_endpoint_obj.path != "/pipeline":
                    api_endpoint_tip = api_endpoint_obj._replace(path="/pipeline").geturl()
                    tip_string = (
                        f" - [TIP: did you mean to set '{api_endpoint_tip}' as the endpoint? "
                        f"Take care not to include an 's' at the end]"
                    )
                else:
                    tip_string = ""
                raise RuntimeError(
                    f"Failed to upload Kubeflow pipeline '{pipeline_name}' - "
                    f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
                    f"{tip_string}"
                ) from ex

            self.log_pipeline_info(pipeline_name, "pipeline uploaded", duration=time.time() - t0)

            #############
            # Create Experiment
            #############
            try:
                t0 = time.time()

                # create a new experiment (if it already exists, this is a no-op)
                experiment = client.create_experiment(name=experiment_name, namespace=user_namespace)
            except Exception as ex:
                raise RuntimeError(
                    f"Failed to create Kubeflow experiment: '{experiment_name}' - "
                    f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
                ) from ex

            self.log_pipeline_info(pipeline_name, "created experiment", duration=time.time() - t0)

            #############
            # Create Pipeline Run
            #############
            try:
                t0 = time.time()

                # generate name for the pipeline run
                job_name = pipeline_instance_id

                # create a pipeline run (of the specified pipeline version)
                run = client.run_pipeline(
                    experiment_id=experiment.id, job_name=job_name, pipeline_id=pipeline_id, version_id=version_id
                )
            except Exception as ex:
                raise RuntimeError(
                    f"Failed to create Kubeflow pipeline run: '{job_name}' - "
                    f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
                ) from ex

            if run is None:
                # client.run_pipeline seemed to have encountered an issue
                # but didn't raise an exception
                raise RuntimeError(
                    f"Failed to create Kubeflow pipeline run: '{job_name}' - "
                    f"Check Kubeflow Pipelines runtime configuration: '{pipeline.runtime_config}'"
                )

            self.log_pipeline_info(
                pipeline_name,
                f"pipeline submitted: {public_api_endpoint}/#/runs/details/{run.id}",
                duration=time.time() - t0,
            )

        if pipeline.contains_generic_operations():
            object_storage_url = f"{cos_public_endpoint}"
            os_path = join_paths(pipeline.pipeline_parameters.get(COS_OBJECT_PREFIX), pipeline_instance_id)
            object_storage_path = f"/{cos_bucket}/{os_path}"
        else:
            object_storage_url = None
            object_storage_path = None

        return KfpPipelineProcessorResponse(
            run_id=run.id,
            run_url=f"{public_api_endpoint}/#/runs/details/{run.id}",
            object_storage_url=object_storage_url,
            object_storage_path=object_storage_path,
        )

    def export(self, pipeline: Pipeline, pipeline_export_format: str, pipeline_export_path: str, overwrite: bool):
        # Verify that the KfpPipelineProcessor supports the given export format
        self._verify_export_format(pipeline_export_format)

        t0_all = time.time()
        timestamp = datetime.now().strftime("%m%d%H%M%S")
        pipeline_name = pipeline.name
        # Create an instance id that will be used to store
        # the pipelines' dependencies, if applicable
        pipeline_instance_id = f"{pipeline_name}-{timestamp}"

        # Since pipeline_export_path may be relative to the notebook directory, ensure
        # we're using its absolute form.
        absolute_pipeline_export_path = get_absolute_path(self.root_dir, pipeline_export_path)

        runtime_configuration = self._get_metadata_configuration(
            schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
        )
        engine = runtime_configuration.metadata.get("engine")
        if engine == "Tekton" and not TektonClient:
            raise ValueError("kfp-tekton not installed. Please install using elyra[kfp-tekton] to use Tekton engine.")

        if os.path.exists(absolute_pipeline_export_path) and not overwrite:
            raise ValueError("File " + absolute_pipeline_export_path + " already exists.")

        self.log_pipeline_info(pipeline_name, f"Exporting pipeline as a .{pipeline_export_format} file")

        # Export pipeline as static configuration file (YAML formatted)
        try:
            # Exported pipeline is not associated with an experiment
            # or a version. The association is established when the
            # pipeline is imported into KFP by the user.
            pipeline_function = lambda: self._cc_pipeline(
                pipeline, pipeline_name, pipeline_instance_id=pipeline_instance_id
            )  # nopep8
            if engine == "Tekton":
                self.log.info("Compiling pipeline for Tekton engine")
                kfp_tekton_compiler.TektonCompiler().compile(pipeline_function, absolute_pipeline_export_path)
            else:
                self.log.info("Compiling pipeline for Argo engine")
                kfp_argo_compiler.Compiler().compile(pipeline_function, absolute_pipeline_export_path)
        except RuntimeError:
            raise
        except Exception as ex:
            if ex.__cause__:
                raise RuntimeError(str(ex)) from ex
            raise RuntimeError(
                f"Error pre-processing pipeline '{pipeline_name}' for export to '{absolute_pipeline_export_path}'",
                str(ex),
            ) from ex

        self.log_pipeline_info(
            pipeline_name, f"pipeline exported to '{pipeline_export_path}'", duration=(time.time() - t0_all)
        )

        return pipeline_export_path  # Return the input value, not its absolute form

    def _collect_envs(self, operation: Operation, **kwargs) -> Dict:
        """
        Amends envs collected from superclass with those pertaining to this subclass

        :return: dictionary containing environment name/value pairs
        """
        envs = super()._collect_envs(operation, **kwargs)

        # Only Unix-style path spec is supported.
        envs["ELYRA_WRITABLE_CONTAINER_DIR"] = self.WCD
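
        # With the default WCD this adds {"ELYRA_WRITABLE_CONTAINER_DIR": "/tmp"}
        # to the name/value pairs collected by the superclass.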
        return envs

    def _cc_pipeline(
        self,
        pipeline: Pipeline,
        pipeline_name: str,
        pipeline_version: str = "",
        experiment_name: str = "",
        pipeline_instance_id: str = None,
        export=False,
    ):
        runtime_configuration = self._get_metadata_configuration(
            schemaspace=Runtimes.RUNTIMES_SCHEMASPACE_ID, name=pipeline.runtime_config
        )

        cos_endpoint = runtime_configuration.metadata["cos_endpoint"]
        cos_username = runtime_configuration.metadata.get("cos_username")
        cos_password = runtime_configuration.metadata.get("cos_password")
        cos_secret = runtime_configuration.metadata.get("cos_secret")
        cos_bucket = runtime_configuration.metadata.get("cos_bucket")
        engine = runtime_configuration.metadata["engine"]

        pipeline_instance_id = pipeline_instance_id or pipeline_name

        artifact_object_prefix = join_paths(pipeline.pipeline_parameters.get(COS_OBJECT_PREFIX), pipeline_instance_id)
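        # e.g. a COS_OBJECT_PREFIX of "projects/demo" and an instance id of
        # "analysis-0131142503" yield "projects/demo/analysis-0131142503"
        # (assuming join_paths joins its arguments with "/")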

        self.log_pipeline_info(
            pipeline_name,
            f"processing pipeline dependencies for upload to '{cos_endpoint}' "
            f"bucket '{cos_bucket}' folder '{artifact_object_prefix}'",
        )
        t0_all = time.time()

        emptydir_volume_size = ""
        container_runtime = bool(os.getenv("CRIO_RUNTIME", "False").lower() == "true")

        # Create dictionary that maps component Id to its ContainerOp instance
        target_ops = {}

        # Sort operations based on dependency graph (topological order)
        sorted_operations = PipelineProcessor._sort_operations(pipeline.operations)

        # Determine whether access to cloud storage is required
        for operation in sorted_operations:
            if isinstance(operation, GenericOperation):
                self._verify_cos_connectivity(runtime_configuration)
                break

        # All previous operation outputs should be propagated throughout the pipeline.
        # In order to process this recursively, the current operation's inputs should be combined
        # from its parent's inputs (which themselves are derived from the outputs of their parents)
        # and its parent's outputs.
        PipelineProcessor._propagate_operation_inputs_outputs(pipeline, sorted_operations)

        for operation in sorted_operations:
            if container_runtime:
                # Volume size to create when using CRI-o, NOTE: IBM Cloud minimum is 20Gi
                emptydir_volume_size = "20Gi"

            sanitized_operation_name = self._sanitize_operation_name(operation.name)

            # Create pipeline operation
            # If operation is one of the "generic" set of NBs or scripts, construct custom ExecuteFileOp
            if isinstance(operation, GenericOperation):
                # Collect env variables
                pipeline_envs = self._collect_envs(
                    operation, cos_secret=cos_secret, cos_username=cos_username, cos_password=cos_password
                )

                operation_artifact_archive = self._get_dependency_archive_name(operation)

                self.log.debug(
                    f"Creating pipeline component archive '{operation_artifact_archive}' for operation '{operation}'"
                )

                target_ops[operation.id] = ExecuteFileOp(
                    name=sanitized_operation_name,
                    pipeline_name=pipeline_name,
                    experiment_name=experiment_name,
                    notebook=operation.filename,
                    cos_endpoint=cos_endpoint,
                    cos_bucket=cos_bucket,
                    cos_directory=artifact_object_prefix,
                    cos_dependencies_archive=operation_artifact_archive,
                    pipeline_version=pipeline_version,
                    pipeline_source=pipeline.source,
                    pipeline_inputs=operation.inputs,
                    pipeline_outputs=operation.outputs,
                    pipeline_envs=pipeline_envs,
                    emptydir_volume_size=emptydir_volume_size,
                    cpu_request=operation.cpu,
                    mem_request=operation.memory,
                    gpu_limit=operation.gpu,
                    workflow_engine=engine,
                    image=operation.runtime_image,
                    file_outputs={
                        "mlpipeline-metrics": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-metrics.json",  # noqa
                        "mlpipeline-ui-metadata": f"{pipeline_envs['ELYRA_WRITABLE_CONTAINER_DIR']}/mlpipeline-ui-metadata.json",  # noqa
                    },
                    volume_mounts=operation.mounted_volumes,
                    kubernetes_secrets=operation.kubernetes_secrets,
                    kubernetes_tolerations=operation.kubernetes_tolerations,
                    kubernetes_pod_annotations=operation.kubernetes_pod_annotations,
                )

                if operation.doc:
                    target_ops[operation.id].add_pod_annotation("elyra/node-user-doc", operation.doc)

                # TODO Can we move all of this to apply to non-standard components as well? Test when servers are up
                if cos_secret and not export:
                    target_ops[operation.id].apply(use_aws_secret(cos_secret))

                image_namespace = self._get_metadata_configuration(RuntimeImages.RUNTIME_IMAGES_SCHEMASPACE_ID)
                for image_instance in image_namespace:
                    if image_instance.metadata["image_name"] == operation.runtime_image and image_instance.metadata.get(
                        "pull_policy"
                    ):
                        target_ops[operation.id].container.set_image_pull_policy(image_instance.metadata["pull_policy"])

                self.log_pipeline_info(
                    pipeline_name,
                    f"processing operation dependencies for id '{operation.id}'",
                    operation_name=operation.name,
                )

                self._upload_dependencies_to_object_store(
                    runtime_configuration, pipeline_name, operation, prefix=artifact_object_prefix
                )

            # If operation is a "non-standard" component, load its spec and create operation with factory function
            else:
                # Retrieve component from cache
                component = ComponentCache.instance().get_component(self._type, operation.classifier)

                # Convert the user-entered value of certain properties according to their type
                for component_property in component.properties:
                    # Get corresponding property's value from parsed pipeline
                    property_value = operation.component_params.get(component_property.ref)

                    self.log.debug(
                        f"Processing component parameter '{component_property.name}' "
                        f"of type '{component_property.data_type}'"
                    )

                    if component_property.data_type == "inputpath":
                        output_node_id = property_value["value"]
                        output_node_parameter_key = property_value["option"].replace("elyra_output_", "")
                        operation.component_params[component_property.ref] = target_ops[output_node_id].outputs[
                            output_node_parameter_key
                        ]
                    elif component_property.data_type == "inputvalue":
                        active_property = property_value["activeControl"]
                        active_property_value = property_value.get(active_property, None)

                        # If the value is not found, assign it the default value assigned in parser
                        if active_property_value is None:
                            active_property_value = component_property.value

                        if isinstance(active_property_value, dict) and set(active_property_value.keys()) == {
                            "value",
                            "option",
                        }:
                            output_node_id = active_property_value["value"]
                            output_node_parameter_key = active_property_value["option"].replace("elyra_output_", "")
                            operation.component_params[component_property.ref] = target_ops[output_node_id].outputs[
                                output_node_parameter_key
                            ]
                        elif component_property.default_data_type == "dictionary":
                            processed_value = self._process_dictionary_value(active_property_value)
                            operation.component_params[component_property.ref] = processed_value
                        elif component_property.default_data_type == "list":
                            processed_value = self._process_list_value(active_property_value)
                            operation.component_params[component_property.ref] = processed_value
                        else:
                            operation.component_params[component_property.ref] = active_property_value

                # Build component task factory
                try:
                    factory_function = components.load_component_from_text(component.definition)
                except Exception as e:
                    # TODO Fix error messaging and break exceptions down into categories
                    self.log.error(f"Error loading component spec for {operation.name}: {str(e)}")
                    raise RuntimeError(f"Error loading component spec for {operation.name}.")

                # Add factory function, which returns a ContainerOp task instance, to pipeline operation dict
                try:
                    comp_spec_inputs = [
                        inputs.name.lower().replace(" ", "_") for inputs in factory_function.component_spec.inputs or []
                    ]

                    # Remove inputs and outputs from params dict
                    # TODO: need to have a way to retrieve only required params
                    parameter_removal_list = ["inputs", "outputs"]
                    for component_param in operation.component_params_as_dict.keys():
                        if component_param not in comp_spec_inputs:
                            parameter_removal_list.append(component_param)

                    for parameter in parameter_removal_list:
                        operation.component_params_as_dict.pop(parameter, None)

                    # Create ContainerOp instance and assign appropriate user-provided name
                    sanitized_component_params = {
                        self._sanitize_param_name(name): value
                        for name, value in operation.component_params_as_dict.items()
                    }
                    container_op = factory_function(**sanitized_component_params)
                    container_op.set_display_name(operation.name)

                    # Attach node comment
                    if operation.doc:
                        container_op.add_pod_annotation("elyra/node-user-doc", operation.doc)

                    # Add user-specified volume mounts: the referenced PVCs must exist
                    # or this operation will fail
                    if operation.mounted_volumes:
                        unique_pvcs = []
                        for volume_mount in operation.mounted_volumes:
                            if volume_mount.pvc_name not in unique_pvcs:
                                container_op.add_volume(
                                    V1Volume(
                                        name=volume_mount.pvc_name,
                                        persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                                            claim_name=volume_mount.pvc_name
                                        ),
                                    )
                                )
                                unique_pvcs.append(volume_mount.pvc_name)

                            container_op.add_volume_mount(
                                V1VolumeMount(mount_path=volume_mount.path, name=volume_mount.pvc_name)
                            )

                    # Add user-specified Kubernetes tolerations
                    if operation.kubernetes_tolerations:
                        unique_tolerations = []
                        for toleration in operation.kubernetes_tolerations:
                            if str(toleration) not in unique_tolerations:
                                container_op.add_toleration(
                                    V1Toleration(
                                        effect=toleration.effect,
                                        key=toleration.key,
                                        operator=toleration.operator,
                                        value=toleration.value,
                                    )
                                )
                                unique_tolerations.append(str(toleration))

                    # Add user-specified pod annotations
                    if operation.kubernetes_pod_annotations:
                        unique_annotations = []
                        for annotation in operation.kubernetes_pod_annotations:
                            if annotation.key not in unique_annotations:
                                container_op.add_pod_annotation(annotation.key, annotation.value)
                                unique_annotations.append(annotation.key)

                    # Force re-execution of the operation by setting staleness to zero days
                    # https://www.kubeflow.org/docs/components/pipelines/overview/caching/#managing-caching-staleness
                    if operation.disallow_cached_output:
                        container_op.set_caching_options(enable_caching=False)
                        container_op.execution_options.caching_strategy.max_cache_staleness = "P0D"

                    target_ops[operation.id] = container_op
                except Exception as e:
                    # TODO Fix error messaging and break exceptions down into categories
                    self.log.error(f"Error constructing component {operation.name}: {str(e)}")
                    raise RuntimeError(f"Error constructing component {operation.name}.")

        # Process dependencies after all the operations have been created
        for operation in pipeline.operations.values():
            op = target_ops[operation.id]
            for parent_operation_id in operation.parent_operation_ids:
                parent_op = target_ops[parent_operation_id]  # Parent Operation
                op.after(parent_op)

        self.log_pipeline_info(pipeline_name, "pipeline dependencies processed", duration=(time.time() - t0_all))

        return target_ops

    def _generate_pipeline_conf(self, pipeline: dict) -> PipelineConf:
        """
        Returns a KFP pipeline configuration for this pipeline, which can be empty.

        :param pipeline: pipeline dictionary
        :type pipeline: dict
        :return: https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.PipelineConf
        :rtype: kfp.dsl.PipelineConf
        """
        self.log.debug("Generating pipeline configuration ...")
        pipeline_conf = PipelineConf()

        #
        # Gather input for container image pull secrets in support of private container image registries
        # https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.PipelineConf.set_image_pull_secrets
        #
        image_namespace = self._get_metadata_configuration(schemaspace=RuntimeImages.RUNTIME_IMAGES_SCHEMASPACE_ID)

        # iterate through pipeline operations and create list of Kubernetes secret names
        # that are associated with generic components
        container_image_pull_secret_names = []
        for operation in pipeline.operations.values():
            if isinstance(operation, GenericOperation):
                for image_instance in image_namespace:
                    if image_instance.metadata["image_name"] == operation.runtime_image:
                        if image_instance.metadata.get("pull_secret"):
                            container_image_pull_secret_names.append(image_instance.metadata.get("pull_secret"))
                        break

        if len(container_image_pull_secret_names) > 0:
            # de-duplicate the pull secret name list, create Kubernetes resource
            # references and add them to the pipeline configuration
            container_image_pull_secrets = []
            for secret_name in list(set(container_image_pull_secret_names)):
                container_image_pull_secrets.append(k8s_client.V1ObjectReference(name=secret_name))
            pipeline_conf.set_image_pull_secrets(container_image_pull_secrets)
            self.log.debug(
                f"Added {len(container_image_pull_secrets)} image pull secret(s) to the pipeline configuration."
            )

        return pipeline_conf

    @staticmethod
    def _sanitize_operation_name(name: str) -> str:
        """
        In KFP, only letters, numbers, spaces, "_", and "-" are allowed in name.

        :param name: name of the operation
        """
        return re.sub("-+", "-", re.sub("[^-_0-9A-Za-z ]+", "-", name)).lstrip("-").rstrip("-")

    @staticmethod
    def _sanitize_param_name(name: str) -> str:
        """
        Sanitize a component parameter name.

        Behavior is mirrored from how Kubeflow 1.X sanitizes identifier names:
        - https://github.com/kubeflow/pipelines/blob/1.8.1/sdk/python/kfp/components/_naming.py#L32-L42
        - https://github.com/kubeflow/pipelines/blob/1.8.1/sdk/python/kfp/components/_naming.py#L49-L50
        """
        normalized_name = name.lower()

        # remove non-word characters
        normalized_name = re.sub(r"[\W_]", " ", normalized_name)

        # no double spaces, leading or trailing spaces
        normalized_name = re.sub(" +", " ", normalized_name).strip()

        # no leading digits
        if re.match(r"\d", normalized_name):
            normalized_name = "n" + normalized_name

        return normalized_name.replace(" ", "_")


class KfpPipelineProcessorResponse(RuntimePipelineProcessorResponse):
    _type = RuntimeProcessorType.KUBEFLOW_PIPELINES
    _name = "kfp"

    def __init__(self, run_id, run_url, object_storage_url, object_storage_path):
        super().__init__(run_url, object_storage_url, object_storage_path)
        self.run_id = run_id

    def to_json(self):
        response = super().to_json()
        response["run_id"] = self.run_id
        return response
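
# A serialized response might look like the following; all keys except
# "run_id" are assumed to come from the superclass serialization:
# {
#     "run_url": "https://<api-endpoint>/#/runs/details/<run-id>",
#     "object_storage_url": "<cos-endpoint>",
#     "object_storage_path": "/<bucket>/<prefix>",
#     "run_id": "<run-id>",
# }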