operator.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import os
  17. import string
  18. from typing import Dict
  19. from typing import List
  20. from typing import Optional
  21. from kfp.dsl import ContainerOp
  22. from kfp.dsl import RUN_ID_PLACEHOLDER
  23. from kubernetes.client.models import V1EmptyDirVolumeSource
  24. from kubernetes.client.models import V1EnvVar
  25. from kubernetes.client.models import V1EnvVarSource
  26. from kubernetes.client.models import V1ObjectFieldSelector
  27. from kubernetes.client.models import V1PersistentVolumeClaimVolumeSource
  28. from kubernetes.client.models import V1SecretKeySelector
  29. from kubernetes.client.models import V1Toleration
  30. from kubernetes.client.models import V1Volume
  31. from kubernetes.client.models import V1VolumeMount
  32. from elyra._version import __version__
  33. from elyra.pipeline.pipeline import KubernetesAnnotation
  34. from elyra.pipeline.pipeline import KubernetesSecret
  35. from elyra.pipeline.pipeline import KubernetesToleration
  36. from elyra.pipeline.pipeline import VolumeMount
  37. """
  38. The ExecuteFileOp uses a python script to bootstrap the user supplied image with the required dependencies.
In order for the script to run properly, the image used must, at a minimum, have the 'curl' utility available
and have python3
  41. """
# Inputs and Outputs separator character. If updated,
# same-named variable in bootstrapper.py must be updated!
INOUT_SEPARATOR = ";"

# GitHub coordinates from which the bootstrap artifacts below are resolved.
# Both are overridable via environment variables (e.g. to point at a fork or
# a development branch). Release builds default to the matching version tag;
# dev builds track 'main'.
ELYRA_GITHUB_ORG = os.getenv("ELYRA_GITHUB_ORG", "elyra-ai")
ELYRA_GITHUB_BRANCH = os.getenv("ELYRA_GITHUB_BRANCH", "main" if "dev" in __version__ else "v" + __version__)

# pip configuration downloaded into the emptyDir workspace when the CRI-O
# code path is active (see ExecuteFileOp.emptydir_volume_size)
ELYRA_PIP_CONFIG_URL = os.getenv(
    "ELYRA_PIP_CONFIG_URL",
    f"https://raw.githubusercontent.com/{ELYRA_GITHUB_ORG}/elyra/{ELYRA_GITHUB_BRANCH}/etc/kfp/pip.conf",
)
# Bootstrap script curl-ed into the container and executed as the op's command
ELYRA_BOOTSTRAP_SCRIPT_URL = os.getenv(
    "ELYRA_BOOTSTRAP_SCRIPT_URL",
    f"https://raw.githubusercontent.com/{ELYRA_GITHUB_ORG}/elyra/{ELYRA_GITHUB_BRANCH}/elyra/kfp/bootstrapper.py",
)
# Requirements file downloaded alongside the bootstrapper; presumably consumed
# by bootstrapper.py at run time — not installed directly here
ELYRA_REQUIREMENTS_URL = os.getenv(
    "ELYRA_REQUIREMENTS_URL",
    f"https://raw.githubusercontent.com/{ELYRA_GITHUB_ORG}/"
    f"elyra/{ELYRA_GITHUB_BRANCH}/etc/generic/requirements-elyra.txt",
)
# Python 3.7-specific variant of the requirements file, downloaded as well
ELYRA_REQUIREMENTS_URL_PY37 = os.getenv(
    "ELYRA_REQUIREMENTS_URL_PY37",
    f"https://raw.githubusercontent.com/{ELYRA_GITHUB_ORG}/"
    f"elyra/{ELYRA_GITHUB_BRANCH}/etc/generic/requirements-elyra-py37.txt",
)
  65. class ExecuteFileOp(ContainerOp):
    def __init__(
        self,
        pipeline_name: str,
        experiment_name: str,
        notebook: str,
        cos_endpoint: str,
        cos_bucket: str,
        cos_directory: str,
        cos_dependencies_archive: str,
        pipeline_version: Optional[str] = "",
        pipeline_source: Optional[str] = None,
        pipeline_outputs: Optional[List[str]] = None,
        pipeline_inputs: Optional[List[str]] = None,
        pipeline_envs: Optional[Dict[str, str]] = None,
        requirements_url: Optional[str] = None,
        bootstrap_script_url: Optional[str] = None,
        emptydir_volume_size: Optional[str] = None,
        cpu_request: Optional[str] = None,
        mem_request: Optional[str] = None,
        gpu_limit: Optional[str] = None,
        workflow_engine: Optional[str] = "argo",
        volume_mounts: Optional[List[VolumeMount]] = None,
        kubernetes_secrets: Optional[List[KubernetesSecret]] = None,
        kubernetes_tolerations: Optional[List[KubernetesToleration]] = None,
        kubernetes_pod_annotations: Optional[List[KubernetesAnnotation]] = None,
        **kwargs,
    ):
        """Create a new instance of ContainerOp.

        Builds the shell command that downloads the bootstrapper and its
        requirements into the container, then applies volumes, env vars,
        secrets, tolerations, annotations and resource requests to the pod.

        Args:
            pipeline_name: pipeline that this op belongs to
            experiment_name: the experiment where pipeline_name is executed
            notebook: name of the notebook that will be executed per this operation
            cos_endpoint: object storage endpoint e.g weaikish1.fyre.ibm.com:30442
            cos_bucket: bucket to retrieve archive from
            cos_directory: name of the directory in the object storage bucket to pull
            cos_dependencies_archive: archive file name to get from object storage bucket e.g archive1.tar.gz
            pipeline_version: optional version identifier
            pipeline_source: pipeline source
            pipeline_outputs: comma delimited list of files produced by the notebook
            pipeline_inputs: comma delimited list of files to be consumed/are required by the notebook
            pipeline_envs: dictionary of environmental variables to set in the container prior to execution
            requirements_url: URL to a python requirements.txt file to be installed prior to running the notebook
            bootstrap_script_url: URL to a custom python bootstrap script to run
            emptydir_volume_size: Size(GB) of the volume to create for the workspace when using CRIO container runtime
            cpu_request: number of CPUs requested for the operation
            mem_request: memory requested for the operation (in Gi)
            gpu_limit: maximum number of GPUs allowed for the operation
            workflow_engine: Kubeflow workflow engine, defaults to 'argo'
            volume_mounts: data volumes to be mounted
            kubernetes_secrets: secrets to be made available as environment variables
            kubernetes_tolerations: Kubernetes tolerations to be added to the pod
            kubernetes_pod_annotations: annotations to be applied to the pod
            kwargs: additional key value pairs to pass e.g. name, image, sidecars & is_exit_handler.
                    See Kubeflow pipelines ContainerOp definition for more parameters or how to use
                    https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.ContainerOp

        Raises:
            TypeError: if 'name' is absent from kwargs
            ValueError: if 'name' is empty, 'image' is missing, notebook is empty,
                or workflow_engine is missing/unsupported (or kfp-tekton is not installed)
        """
        self.pipeline_name = pipeline_name
        self.pipeline_version = pipeline_version
        self.pipeline_source = pipeline_source
        self.experiment_name = experiment_name
        self.notebook = notebook
        self.notebook_name = os.path.basename(notebook)
        self.cos_endpoint = cos_endpoint
        self.cos_bucket = cos_bucket
        self.cos_directory = cos_directory
        self.cos_dependencies_archive = cos_dependencies_archive
        self.container_work_dir_root_path = "./"
        self.container_work_dir_name = "jupyter-work-dir/"
        self.container_work_dir = self.container_work_dir_root_path + self.container_work_dir_name
        self.bootstrap_script_url = bootstrap_script_url
        self.requirements_url = requirements_url
        self.pipeline_outputs = pipeline_outputs
        self.pipeline_inputs = pipeline_inputs
        self.pipeline_envs = pipeline_envs
        self.cpu_request = cpu_request
        self.mem_request = mem_request
        self.gpu_limit = gpu_limit
        self.volume_mounts = volume_mounts  # optional data volumes to be mounted to the pod
        self.kubernetes_secrets = kubernetes_secrets  # optional secrets to be made available as env vars
        self.kubernetes_tolerations = (
            kubernetes_tolerations  # optional Kubernetes tolerations to be attached to the pod
        )
        self.kubernetes_pod_annotations = kubernetes_pod_annotations  # optional annotations

        argument_list = []

        """ CRI-o support for kfp pipelines
        We need to attach an emptydir volume for each notebook that runs since CRI-o runtime does not allow
        us to write to the base image layer file system, only to volumes.
        """
        self.emptydir_volume_name = "workspace"
        self.emptydir_volume_size = emptydir_volume_size
        self.python_user_lib_path = ""
        self.python_user_lib_path_target = ""
        self.python_pip_config_url = ""

        # A non-empty emptydir_volume_size signals the CRI-O code path: work
        # happens inside a mounted emptyDir and pip installs into a --target dir
        if self.emptydir_volume_size:
            self.container_work_dir_root_path = "/opt/app-root/src/"
            self.container_python_dir_name = "python3/"
            self.container_work_dir = self.container_work_dir_root_path + self.container_work_dir_name
            self.python_user_lib_path = self.container_work_dir + self.container_python_dir_name
            self.python_user_lib_path_target = "--target=" + self.python_user_lib_path
            self.python_pip_config_url = ELYRA_PIP_CONFIG_URL

        if not self.bootstrap_script_url:
            self.bootstrap_script_url = ELYRA_BOOTSTRAP_SCRIPT_URL

        if not self.requirements_url:
            self.requirements_url = ELYRA_REQUIREMENTS_URL

        # Validate required kwargs before they are forwarded to ContainerOp
        if "name" not in kwargs:
            raise TypeError("You need to provide a name for the operation.")
        elif not kwargs.get("name"):
            raise ValueError("You need to provide a name for the operation.")

        if "image" not in kwargs:
            raise ValueError("You need to provide an image.")

        if not notebook:
            raise ValueError("You need to provide a notebook.")

        if "arguments" not in kwargs:
            """ If no arguments are passed, we use our own.
            If ['arguments'] are set, we assume container's ENTRYPOINT is set and dependencies are installed
            NOTE: Images being pulled must have python3 available on PATH and cURL utility
            """

            common_curl_options = '--fail -H "Cache-Control: no-cache"'

            # Stage 1: create the work dir and download bootstrapper + requirements
            argument_list.append(
                f"mkdir -p {self.container_work_dir} && cd {self.container_work_dir} && "
                f"echo 'Downloading {self.bootstrap_script_url}' && "
                f"curl {common_curl_options} -L {self.bootstrap_script_url} --output bootstrapper.py && "
                f"echo 'Downloading {self.requirements_url}' && "
                f"curl {common_curl_options} -L {self.requirements_url} --output requirements-elyra.txt && "
                f"echo 'Downloading {ELYRA_REQUIREMENTS_URL_PY37}' && "
                f"curl {common_curl_options} -L {ELYRA_REQUIREMENTS_URL_PY37} --output requirements-elyra-py37.txt && "
            )

            # CRI-O only: fetch the pip config into the python target directory
            if self.emptydir_volume_size:
                argument_list.append(
                    f"mkdir {self.container_python_dir_name} && cd {self.container_python_dir_name} && "
                    f"echo 'Downloading {self.python_pip_config_url}' && "
                    f"curl {common_curl_options} -L {self.python_pip_config_url} --output pip.conf && cd .. &&"
                )

            # Stage 2: install 'packaging', snapshot the environment and run the bootstrapper
            argument_list.append(
                f"python3 -m pip install {self.python_user_lib_path_target} packaging && "
                "python3 -m pip freeze > requirements-current.txt && "
                "python3 bootstrapper.py "
                f'--pipeline-name "{self.pipeline_name}" '
                f"--cos-endpoint {self.cos_endpoint} "
                f"--cos-bucket {self.cos_bucket} "
                f'--cos-directory "{self.cos_directory}" '
                f'--cos-dependencies-archive "{self.cos_dependencies_archive}" '
                f'--file "{self.notebook}" '
            )

            if self.pipeline_inputs:
                inputs_str = self._artifact_list_to_str(self.pipeline_inputs)
                argument_list.append(f'--inputs "{inputs_str}" ')

            if self.pipeline_outputs:
                outputs_str = self._artifact_list_to_str(self.pipeline_outputs)
                argument_list.append(f'--outputs "{outputs_str}" ')

            if self.emptydir_volume_size:
                argument_list.append(f'--user-volume-path "{self.python_user_lib_path}" ')

            kwargs["command"] = ["sh", "-c"]
            kwargs["arguments"] = "".join(argument_list)

        super().__init__(**kwargs)

        # add user-specified volume mounts: the referenced PVCs must exist
        # or this generic operation will fail
        if self.volume_mounts:
            unique_pvcs = []  # each PVC is attached as a volume only once, even if mounted at several paths
            for volume_mount in self.volume_mounts:
                if volume_mount.pvc_name not in unique_pvcs:
                    self.add_volume(
                        V1Volume(
                            name=volume_mount.pvc_name,
                            persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                                claim_name=volume_mount.pvc_name
                            ),
                        )
                    )
                    unique_pvcs.append(volume_mount.pvc_name)
                self.container.add_volume_mount(V1VolumeMount(mount_path=volume_mount.path, name=volume_mount.pvc_name))

        # We must deal with the envs after the superclass initialization since these amend the
        # container attribute that isn't available until now.
        if self.pipeline_envs:
            for key, value in self.pipeline_envs.items():  # Convert dict entries to format kfp needs
                self.container.add_env_variable(V1EnvVar(name=key, value=value))

        if self.kubernetes_secrets:
            for secret in self.kubernetes_secrets:  # Convert tuple entries to format kfp needs
                self.container.add_env_variable(
                    V1EnvVar(
                        name=secret.env_var,
                        value_from=V1EnvVarSource(secret_key_ref=V1SecretKeySelector(name=secret.name, key=secret.key)),
                    )
                )

        # add user-provided tolerations
        if self.kubernetes_tolerations:
            for toleration in self.kubernetes_tolerations:
                self.add_toleration(
                    V1Toleration(
                        effect=toleration.effect,
                        key=toleration.key,
                        operator=toleration.operator,
                        value=toleration.value,
                    )
                )

        # add user-provided annotations to pod
        if self.kubernetes_pod_annotations:
            for annotation in self.kubernetes_pod_annotations:
                self.add_pod_annotation(annotation.key, annotation.value)

        # If crio volume size is found then assume kubeflow pipelines environment is using CRI-o as
        # its container runtime
        if self.emptydir_volume_size:
            self.add_volume(
                V1Volume(
                    empty_dir=V1EmptyDirVolumeSource(medium="", size_limit=self.emptydir_volume_size),
                    name=self.emptydir_volume_name,
                )
            )
            self.container.add_volume_mount(
                V1VolumeMount(mount_path=self.container_work_dir_root_path, name=self.emptydir_volume_name)
            )
            # Append to PYTHONPATH location of elyra dependencies in installed in Volume
            self.container.add_env_variable(V1EnvVar(name="PYTHONPATH", value=self.python_user_lib_path))

        if self.cpu_request:
            self.container.set_cpu_request(cpu=str(cpu_request))

        if self.mem_request:
            self.container.set_memory_request(memory=str(mem_request) + "G")

        if self.gpu_limit:
            # NOTE(review): assumes pipeline_envs is not None whenever gpu_limit is
            # set — a None value would raise AttributeError here; verify callers
            gpu_vendor = self.pipeline_envs.get("GPU_VENDOR", "nvidia")
            self.container.set_gpu_limit(gpu=str(gpu_limit), vendor=gpu_vendor)

        # Generate unique ELYRA_RUN_NAME value and expose it as an environment
        # variable in the container
        if not workflow_engine:
            raise ValueError("workflow_engine is missing and needs to be specified.")
        if workflow_engine.lower() == "argo":
            # attach RUN_ID_PLACEHOLDER as run name
            # '{{workflow.annotations.pipelines.kubeflow.org/run_name}}' variable
            # cannot be resolved by Argo in KF 1.4
            run_name_placeholder = RUN_ID_PLACEHOLDER
            self.container.add_env_variable(V1EnvVar(name="ELYRA_RUN_NAME", value=run_name_placeholder))
        elif workflow_engine.lower() == "tekton":
            try:
                from kfp_tekton import TektonClient  # noqa: F401
            except ImportError:
                raise ValueError(
                    "kfp-tekton not installed. Please install using elyra[kfp-tekton] to use Tekton engine."
                )
            # For Tekton derive the value from the specified pod annotation
            annotation = "pipelines.kubeflow.org/run_name"
            field_path = f"metadata.annotations['{annotation}']"
            self.container.add_env_variable(
                V1EnvVar(
                    name="ELYRA_RUN_NAME",
                    value_from=V1EnvVarSource(field_ref=V1ObjectFieldSelector(field_path=field_path)),
                )
            )
        else:
            raise ValueError(f"{workflow_engine} is not a supported workflow engine.")

        # Attach metadata to the pod
        # Node type (a static type for this op)
        self.add_pod_label("elyra/node-type", ExecuteFileOp._normalize_label_value("notebook-script"))
        # Pipeline name
        self.add_pod_label("elyra/pipeline-name", ExecuteFileOp._normalize_label_value(self.pipeline_name))
        # Pipeline version
        self.add_pod_label("elyra/pipeline-version", ExecuteFileOp._normalize_label_value(self.pipeline_version))
        # Experiment name
        self.add_pod_label("elyra/experiment-name", ExecuteFileOp._normalize_label_value(self.experiment_name))
        # Pipeline node name
        self.add_pod_label("elyra/node-name", ExecuteFileOp._normalize_label_value(kwargs.get("name")))
        # Pipeline node file
        self.add_pod_annotation("elyra/node-file-name", self.notebook)

        # Identify the pipeline source, which can be a
        # pipeline file (mypipeline.pipeline), a Python
        # script or notebook that was submitted
        if self.pipeline_source is not None:
            self.add_pod_annotation("elyra/pipeline-source", self.pipeline_source)
  332. def _artifact_list_to_str(self, pipeline_array):
  333. trimmed_artifact_list = []
  334. for artifact_name in pipeline_array:
  335. if INOUT_SEPARATOR in artifact_name: # if INOUT_SEPARATOR is in name, throw since this is our separator
  336. raise ValueError(f"Illegal character ({INOUT_SEPARATOR}) found in filename '{artifact_name}'.")
  337. trimmed_artifact_list.append(artifact_name.strip())
  338. return INOUT_SEPARATOR.join(trimmed_artifact_list)
  339. @staticmethod
  340. def _normalize_label_value(value):
  341. """Produce a Kubernetes-compliant label from value
  342. Valid label values must be 63 characters or less and
  343. must be empty or begin and end with an alphanumeric
  344. character ([a-z0-9A-Z]) with dashes (-), underscores
  345. (_), dots (.), and alphanumerics between.
  346. """
  347. if value is None or len(value) == 0:
  348. return "" # nothing to do
  349. max_length = 63
  350. # This char is added at the front and/or back
  351. # of value, if the first and/or last character
  352. # is invalid. For example a value of "-abc"
  353. # is converted to "a-abc". The specified character
  354. # must meet the label value constraints.
  355. valid_char = "a"
  356. # This char is used to replace invalid characters
  357. # that are in the "middle" of value. For example
  358. # a value of "abc%def" is converted to "abc_def".
  359. # The specified character must meet the label value
  360. # constraints.
  361. valid_middle_char = "_"
  362. # must begin with [0-9a-zA-Z]
  363. valid_chars = string.ascii_letters + string.digits
  364. if value[0] not in valid_chars:
  365. value = valid_char + value
  366. value = value[:max_length] # enforce max length
  367. # must end with [0-9a-zA-Z]
  368. if value[-1] not in valid_chars:
  369. if len(value) <= max_length - 1:
  370. # append valid character if max length
  371. # would not be exceeded
  372. value = value + valid_char
  373. else:
  374. # replace with valid character
  375. value = value[:-1] + valid_char
  376. # middle chars must be [0-9a-zA-Z\-_.]
  377. valid_chars = valid_chars + "-_."
  378. newstr = ""
  379. for c in range(len(value)):
  380. if value[c] not in valid_chars:
  381. newstr = newstr + valid_middle_char
  382. else:
  383. newstr = newstr + value[c]
  384. value = newstr
  385. return value