operator.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import os
  17. import string
  18. from typing import Dict
  19. from typing import List
  20. from typing import Optional
  21. from kfp.dsl import ContainerOp
  22. from kfp.dsl import RUN_ID_PLACEHOLDER
  23. from kubernetes.client.models import V1EmptyDirVolumeSource
  24. from kubernetes.client.models import V1EnvVar
  25. from kubernetes.client.models import V1EnvVarSource
  26. from kubernetes.client.models import V1ObjectFieldSelector
  27. from kubernetes.client.models import V1PersistentVolumeClaimVolumeSource
  28. from kubernetes.client.models import V1SecretKeySelector
  29. from kubernetes.client.models import V1Volume
  30. from kubernetes.client.models import V1VolumeMount
  31. from elyra._version import __version__
  32. from elyra.pipeline.pipeline import KubernetesSecret
  33. from elyra.pipeline.pipeline import VolumeMount
  34. """
The ExecuteFileOp uses a Python script to bootstrap the user-supplied image with the required dependencies.
For the script to run properly, the image must, at a minimum, have the 'curl' utility available
and have python3.
  38. """
  39. # Inputs and Outputs separator character. If updated,
  40. # same-named variable in bootstrapper.py must be updated!
  41. INOUT_SEPARATOR = ";"
  42. ELYRA_GITHUB_ORG = os.getenv("ELYRA_GITHUB_ORG", "elyra-ai")
  43. ELYRA_GITHUB_BRANCH = os.getenv("ELYRA_GITHUB_BRANCH", "master" if "dev" in __version__ else "v" + __version__)
  44. ELYRA_PIP_CONFIG_URL = os.getenv(
  45. "ELYRA_PIP_CONFIG_URL",
  46. f"https://raw.githubusercontent.com/{ELYRA_GITHUB_ORG}/elyra/{ELYRA_GITHUB_BRANCH}/etc/kfp/pip.conf",
  47. )
  48. ELYRA_BOOTSTRAP_SCRIPT_URL = os.getenv(
  49. "ELYRA_BOOTSTRAP_SCRIPT_URL",
  50. f"https://raw.githubusercontent.com/{ELYRA_GITHUB_ORG}/elyra/{ELYRA_GITHUB_BRANCH}/elyra/kfp/bootstrapper.py",
  51. )
  52. ELYRA_REQUIREMENTS_URL = os.getenv(
  53. "ELYRA_REQUIREMENTS_URL",
  54. f"https://raw.githubusercontent.com/{ELYRA_GITHUB_ORG}/"
  55. f"elyra/{ELYRA_GITHUB_BRANCH}/etc/generic/requirements-elyra.txt",
  56. )
  57. ELYRA_REQUIREMENTS_URL_PY37 = os.getenv(
  58. "ELYRA_REQUIREMENTS_URL_PY37",
  59. f"https://raw.githubusercontent.com/{ELYRA_GITHUB_ORG}/"
  60. f"elyra/{ELYRA_GITHUB_BRANCH}/etc/generic/requirements-elyra-py37.txt",
  61. )
  62. class ExecuteFileOp(ContainerOp):
  63. def __init__(
  64. self,
  65. pipeline_name: str,
  66. experiment_name: str,
  67. notebook: str,
  68. cos_endpoint: str,
  69. cos_bucket: str,
  70. cos_directory: str,
  71. cos_dependencies_archive: str,
  72. pipeline_version: Optional[str] = "",
  73. pipeline_source: Optional[str] = None,
  74. pipeline_outputs: Optional[List[str]] = None,
  75. pipeline_inputs: Optional[List[str]] = None,
  76. pipeline_envs: Optional[Dict[str, str]] = None,
  77. requirements_url: Optional[str] = None,
  78. bootstrap_script_url: Optional[str] = None,
  79. emptydir_volume_size: Optional[str] = None,
  80. cpu_request: Optional[str] = None,
  81. mem_request: Optional[str] = None,
  82. gpu_limit: Optional[str] = None,
  83. workflow_engine: Optional[str] = "argo",
  84. volume_mounts: Optional[List[VolumeMount]] = None,
  85. kubernetes_secrets: Optional[List[KubernetesSecret]] = None,
  86. **kwargs,
  87. ):
  88. """Create a new instance of ContainerOp.
  89. Args:
  90. pipeline_name: pipeline that this op belongs to
  91. experiment_name: the experiment where pipeline_name is executed
  92. notebook: name of the notebook that will be executed per this operation
  93. cos_endpoint: object storage endpoint e.g weaikish1.fyre.ibm.com:30442
  94. cos_bucket: bucket to retrieve archive from
  95. cos_directory: name of the directory in the object storage bucket to pull
  96. cos_dependencies_archive: archive file name to get from object storage bucket e.g archive1.tar.gz
  97. pipeline_version: optional version identifier
  98. pipeline_source: pipeline source
  99. pipeline_outputs: comma delimited list of files produced by the notebook
  100. pipeline_inputs: comma delimited list of files to be consumed/are required by the notebook
  101. pipeline_envs: dictionary of environmental variables to set in the container prior to execution
  102. requirements_url: URL to a python requirements.txt file to be installed prior to running the notebook
  103. bootstrap_script_url: URL to a custom python bootstrap script to run
  104. emptydir_volume_size: Size(GB) of the volume to create for the workspace when using CRIO container runtime
  105. cpu_request: number of CPUs requested for the operation
  106. mem_request: memory requested for the operation (in Gi)
  107. gpu_limit: maximum number of GPUs allowed for the operation
  108. workflow_engine: Kubeflow workflow engine, defaults to 'argo'
  109. volume_mounts: data volumes to be mounted
  110. kubernetes_secrets: secrets to be made available as environment variables
  111. kwargs: additional key value pairs to pass e.g. name, image, sidecars & is_exit_handler.
  112. See Kubeflow pipelines ContainerOp definition for more parameters or how to use
  113. https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.dsl.html#kfp.dsl.ContainerOp
  114. """
  115. self.pipeline_name = pipeline_name
  116. self.pipeline_version = pipeline_version
  117. self.pipeline_source = pipeline_source
  118. self.experiment_name = experiment_name
  119. self.notebook = notebook
  120. self.notebook_name = os.path.basename(notebook)
  121. self.cos_endpoint = cos_endpoint
  122. self.cos_bucket = cos_bucket
  123. self.cos_directory = cos_directory
  124. self.cos_dependencies_archive = cos_dependencies_archive
  125. self.container_work_dir_root_path = "./"
  126. self.container_work_dir_name = "jupyter-work-dir/"
  127. self.container_work_dir = self.container_work_dir_root_path + self.container_work_dir_name
  128. self.bootstrap_script_url = bootstrap_script_url
  129. self.requirements_url = requirements_url
  130. self.pipeline_outputs = pipeline_outputs
  131. self.pipeline_inputs = pipeline_inputs
  132. self.pipeline_envs = pipeline_envs
  133. self.cpu_request = cpu_request
  134. self.mem_request = mem_request
  135. self.gpu_limit = gpu_limit
  136. self.volume_mounts = volume_mounts # optional data volumes to be mounted to the pod
  137. self.kubernetes_secrets = kubernetes_secrets # optional secrets to be made available as env vars
  138. argument_list = []
  139. """ CRI-o support for kfp pipelines
  140. We need to attach an emptydir volume for each notebook that runs since CRI-o runtime does not allow
  141. us to write to the base image layer file system, only to volumes.
  142. """
  143. self.emptydir_volume_name = "workspace"
  144. self.emptydir_volume_size = emptydir_volume_size
  145. self.python_user_lib_path = ""
  146. self.python_user_lib_path_target = ""
  147. self.python_pip_config_url = ""
  148. if self.emptydir_volume_size:
  149. self.container_work_dir_root_path = "/opt/app-root/src/"
  150. self.container_python_dir_name = "python3/"
  151. self.container_work_dir = self.container_work_dir_root_path + self.container_work_dir_name
  152. self.python_user_lib_path = self.container_work_dir + self.container_python_dir_name
  153. self.python_user_lib_path_target = "--target=" + self.python_user_lib_path
  154. self.python_pip_config_url = ELYRA_PIP_CONFIG_URL
  155. if not self.bootstrap_script_url:
  156. self.bootstrap_script_url = ELYRA_BOOTSTRAP_SCRIPT_URL
  157. if not self.requirements_url:
  158. self.requirements_url = ELYRA_REQUIREMENTS_URL
  159. if "name" not in kwargs:
  160. raise TypeError("You need to provide a name for the operation.")
  161. elif not kwargs.get("name"):
  162. raise ValueError("You need to provide a name for the operation.")
  163. if "image" not in kwargs:
  164. raise ValueError("You need to provide an image.")
  165. if not notebook:
  166. raise ValueError("You need to provide a notebook.")
  167. if "arguments" not in kwargs:
  168. """If no arguments are passed, we use our own.
  169. If ['arguments'] are set, we assume container's ENTRYPOINT is set and dependencies are installed
  170. NOTE: Images being pulled must have python3 available on PATH and cURL utility
  171. """
  172. common_curl_options = '--fail -H "Cache-Control: no-cache"'
  173. argument_list.append(
  174. f"mkdir -p {self.container_work_dir} && cd {self.container_work_dir} && "
  175. f"echo 'Downloading {self.bootstrap_script_url}' && "
  176. f"curl {common_curl_options} -L {self.bootstrap_script_url} --output bootstrapper.py && "
  177. f"echo 'Downloading {self.requirements_url}' && "
  178. f"curl {common_curl_options} -L {self.requirements_url} --output requirements-elyra.txt && "
  179. f"echo 'Downloading {ELYRA_REQUIREMENTS_URL_PY37}' && "
  180. f"curl {common_curl_options} -L {ELYRA_REQUIREMENTS_URL_PY37} --output requirements-elyra-py37.txt && "
  181. )
  182. if self.emptydir_volume_size:
  183. argument_list.append(
  184. f"mkdir {self.container_python_dir_name} && cd {self.container_python_dir_name} && "
  185. f"echo 'Downloading {self.python_pip_config_url}' && "
  186. f"curl {common_curl_options} -L {self.python_pip_config_url} --output pip.conf && cd .. &&"
  187. )
  188. argument_list.append(
  189. f"python3 -m pip install {self.python_user_lib_path_target} packaging && "
  190. "python3 -m pip freeze > requirements-current.txt && "
  191. "python3 bootstrapper.py "
  192. f'--pipeline-name "{self.pipeline_name}" '
  193. f"--cos-endpoint {self.cos_endpoint} "
  194. f"--cos-bucket {self.cos_bucket} "
  195. f'--cos-directory "{self.cos_directory}" '
  196. f'--cos-dependencies-archive "{self.cos_dependencies_archive}" '
  197. f'--file "{self.notebook}" '
  198. )
  199. if self.pipeline_inputs:
  200. inputs_str = self._artifact_list_to_str(self.pipeline_inputs)
  201. argument_list.append(f'--inputs "{inputs_str}" ')
  202. if self.pipeline_outputs:
  203. outputs_str = self._artifact_list_to_str(self.pipeline_outputs)
  204. argument_list.append(f'--outputs "{outputs_str}" ')
  205. if self.emptydir_volume_size:
  206. argument_list.append(f'--user-volume-path "{self.python_user_lib_path}" ')
  207. kwargs["command"] = ["sh", "-c"]
  208. kwargs["arguments"] = "".join(argument_list)
  209. super().__init__(**kwargs)
  210. # add user-specified volume mounts: the referenced PVCs must exist
  211. # or this generic operation will fail
  212. if self.volume_mounts:
  213. unique_pvcs = []
  214. for volume_mount in self.volume_mounts:
  215. if volume_mount.pvc_name not in unique_pvcs:
  216. self.add_volume(
  217. V1Volume(
  218. name=volume_mount.pvc_name,
  219. persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
  220. claim_name=volume_mount.pvc_name
  221. ),
  222. )
  223. )
  224. unique_pvcs.append(volume_mount.pvc_name)
  225. self.container.add_volume_mount(V1VolumeMount(mount_path=volume_mount.path, name=volume_mount.pvc_name))
  226. # We must deal with the envs after the superclass initialization since these amend the
  227. # container attribute that isn't available until now.
  228. if self.pipeline_envs:
  229. for key, value in self.pipeline_envs.items(): # Convert dict entries to format kfp needs
  230. self.container.add_env_variable(V1EnvVar(name=key, value=value))
  231. if self.kubernetes_secrets:
  232. for secret in self.kubernetes_secrets: # Convert tuple entries to format kfp needs
  233. self.container.add_env_variable(
  234. V1EnvVar(
  235. name=secret.env_var,
  236. value_from=V1EnvVarSource(secret_key_ref=V1SecretKeySelector(name=secret.name, key=secret.key)),
  237. )
  238. )
  239. # If crio volume size is found then assume kubeflow pipelines environment is using CRI-o as
  240. # its container runtime
  241. if self.emptydir_volume_size:
  242. self.add_volume(
  243. V1Volume(
  244. empty_dir=V1EmptyDirVolumeSource(medium="", size_limit=self.emptydir_volume_size),
  245. name=self.emptydir_volume_name,
  246. )
  247. )
  248. self.container.add_volume_mount(
  249. V1VolumeMount(mount_path=self.container_work_dir_root_path, name=self.emptydir_volume_name)
  250. )
  251. # Append to PYTHONPATH location of elyra dependencies in installed in Volume
  252. self.container.add_env_variable(V1EnvVar(name="PYTHONPATH", value=self.python_user_lib_path))
  253. if self.cpu_request:
  254. self.container.set_cpu_request(cpu=str(cpu_request))
  255. if self.mem_request:
  256. self.container.set_memory_request(memory=str(mem_request) + "G")
  257. if self.gpu_limit:
  258. gpu_vendor = self.pipeline_envs.get("GPU_VENDOR", "nvidia")
  259. self.container.set_gpu_limit(gpu=str(gpu_limit), vendor=gpu_vendor)
  260. # Generate unique ELYRA_RUN_NAME value and expose it as an environment
  261. # variable in the container
  262. if not workflow_engine:
  263. raise ValueError("workflow_engine is missing and needs to be specified.")
  264. if workflow_engine.lower() == "argo":
  265. # attach RUN_ID_PLACEHOLDER as run name
  266. # '{{workflow.annotations.pipelines.kubeflow.org/run_name}}' variable
  267. # cannot be resolved by Argo in KF 1.4
  268. run_name_placeholder = RUN_ID_PLACEHOLDER
  269. self.container.add_env_variable(V1EnvVar(name="ELYRA_RUN_NAME", value=run_name_placeholder))
  270. elif workflow_engine.lower() == "tekton":
  271. try:
  272. from kfp_tekton import TektonClient # noqa: F401
  273. except ImportError:
  274. raise ValueError(
  275. "kfp-tekton not installed. Please install using elyra[kfp-tekton] to use Tekton engine."
  276. )
  277. # For Tekton derive the value from the specified pod annotation
  278. annotation = "pipelines.kubeflow.org/run_name"
  279. field_path = f"metadata.annotations['{annotation}']"
  280. self.container.add_env_variable(
  281. V1EnvVar(
  282. name="ELYRA_RUN_NAME",
  283. value_from=V1EnvVarSource(field_ref=V1ObjectFieldSelector(field_path=field_path)),
  284. )
  285. )
  286. else:
  287. raise ValueError(f"{workflow_engine} is not a supported workflow engine.")
  288. # Attach metadata to the pod
  289. # Node type (a static type for this op)
  290. self.add_pod_label("elyra/node-type", ExecuteFileOp._normalize_label_value("notebook-script"))
  291. # Pipeline name
  292. self.add_pod_label("elyra/pipeline-name", ExecuteFileOp._normalize_label_value(self.pipeline_name))
  293. # Pipeline version
  294. self.add_pod_label("elyra/pipeline-version", ExecuteFileOp._normalize_label_value(self.pipeline_version))
  295. # Experiment name
  296. self.add_pod_label("elyra/experiment-name", ExecuteFileOp._normalize_label_value(self.experiment_name))
  297. # Pipeline node name
  298. self.add_pod_label("elyra/node-name", ExecuteFileOp._normalize_label_value(kwargs.get("name")))
  299. # Pipeline node file
  300. self.add_pod_annotation("elyra/node-file-name", self.notebook)
  301. # Identify the pipeline source, which can be a
  302. # pipeline file (mypipeline.pipeline), a Python
  303. # script or notebook that was submitted
  304. if self.pipeline_source is not None:
  305. self.add_pod_annotation("elyra/pipeline-source", self.pipeline_source)
  306. def _artifact_list_to_str(self, pipeline_array):
  307. trimmed_artifact_list = []
  308. for artifact_name in pipeline_array:
  309. if INOUT_SEPARATOR in artifact_name: # if INOUT_SEPARATOR is in name, throw since this is our separator
  310. raise ValueError(f"Illegal character ({INOUT_SEPARATOR}) found in filename '{artifact_name}'.")
  311. trimmed_artifact_list.append(artifact_name.strip())
  312. return INOUT_SEPARATOR.join(trimmed_artifact_list)
  313. @staticmethod
  314. def _normalize_label_value(value):
  315. """Produce a Kubernetes-compliant label from value
  316. Valid label values must be 63 characters or less and
  317. must be empty or begin and end with an alphanumeric
  318. character ([a-z0-9A-Z]) with dashes (-), underscores
  319. (_), dots (.), and alphanumerics between.
  320. """
  321. if value is None or len(value) == 0:
  322. return "" # nothing to do
  323. max_length = 63
  324. # This char is added at the front and/or back
  325. # of value, if the first and/or last character
  326. # is invalid. For example a value of "-abc"
  327. # is converted to "a-abc". The specified character
  328. # must meet the label value constraints.
  329. valid_char = "a"
  330. # This char is used to replace invalid characters
  331. # that are in the "middle" of value. For example
  332. # a value of "abc%def" is converted to "abc_def".
  333. # The specified character must meet the label value
  334. # constraints.
  335. valid_middle_char = "_"
  336. # must begin with [0-9a-zA-Z]
  337. valid_chars = string.ascii_letters + string.digits
  338. if value[0] not in valid_chars:
  339. value = valid_char + value
  340. value = value[:max_length] # enforce max length
  341. # must end with [0-9a-zA-Z]
  342. if value[-1] not in valid_chars:
  343. if len(value) <= max_length - 1:
  344. # append valid character if max length
  345. # would not be exceeded
  346. value = value + valid_char
  347. else:
  348. # replace with valid character
  349. value = value[:-1] + valid_char
  350. # middle chars must be [0-9a-zA-Z\-_.]
  351. valid_chars = valid_chars + "-_."
  352. newstr = ""
  353. for c in range(len(value)):
  354. if value[c] not in valid_chars:
  355. newstr = newstr + valid_middle_char
  356. else:
  357. newstr = newstr + value[c]
  358. value = newstr
  359. return value