bootstrapper.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. from abc import ABC
  17. from abc import abstractmethod
  18. import glob
  19. import json
  20. import logging
  21. import os
  22. from pathlib import Path
  23. import subprocess
  24. import sys
  25. from tempfile import TemporaryFile
  26. import time
  27. from typing import Any
  28. from typing import Optional
  29. from typing import Type
  30. from typing import TypeVar
  31. from urllib.parse import urljoin
  32. from urllib.parse import urlparse
  33. from urllib.parse import urlunparse
  34. from packaging import version
# Inputs and Outputs separator character. If updated,
# same-named variable in _notebook_op.py must be updated!
INOUT_SEPARATOR = ";"

# Setup forward reference for type hint on return from class factory method. See
# https://stackoverflow.com/questions/39205527/can-you-annotate-return-type-when-value-is-instance-of-cls/39205612#39205612
F = TypeVar("F", bound="FileOpBase")

# Module-level logger shared by all operations in this bootstrapper.
logger = logging.getLogger("elyra")

# Pipeline-info log entries (see OpUtil.log_operation_info) can be disabled
# by setting ELYRA_ENABLE_PIPELINE_INFO to anything other than "true".
enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true"
pipeline_name = None  # global used in formatted logging
operation_name = None  # global used in formatted logging
  45. class FileOpBase(ABC):
  46. """Abstract base class for file-based operations"""
  47. filepath = None
  48. cos_client = None
  49. cos_bucket = None
  50. @classmethod
  51. def get_instance(cls: Type[F], **kwargs: Any) -> F:
  52. """Creates an appropriate subclass instance based on the extension of the filepath (-f) argument"""
  53. filepath = kwargs["filepath"]
  54. if ".ipynb" in filepath:
  55. return NotebookFileOp(**kwargs)
  56. elif ".py" in filepath:
  57. return PythonFileOp(**kwargs)
  58. elif ".r" in filepath:
  59. return RFileOp(**kwargs)
  60. else:
  61. raise ValueError(f"Unsupported file type: {filepath}")
  62. def __init__(self, **kwargs: Any) -> None:
  63. """Initializes the FileOpBase instance"""
  64. import minio
  65. from minio.credentials import providers
  66. self.filepath = kwargs["filepath"]
  67. self.input_params = kwargs or []
  68. self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint"))
  69. self.cos_bucket = self.input_params.get("cos-bucket")
  70. # Infer secure from the endpoint's scheme.
  71. self.secure = self.cos_endpoint.scheme == "https"
  72. # get minio credentials provider
  73. if "cos-user" in self.input_params and "cos-password" in self.input_params:
  74. cred_provider = providers.StaticProvider(
  75. access_key=self.input_params.get("cos-user"),
  76. secret_key=self.input_params.get("cos-password"),
  77. )
  78. elif "AWS_ACCESS_KEY_ID" in os.environ and "AWS_SECRET_ACCESS_KEY" in os.environ:
  79. cred_provider = providers.EnvAWSProvider()
  80. elif "AWS_ROLE_ARN" in os.environ and "AWS_WEB_IDENTITY_TOKEN_FILE" in os.environ:
  81. cred_provider = providers.IamAwsProvider()
  82. else:
  83. raise RuntimeError(
  84. "No minio credentials provider can be initialised for current configs. "
  85. "Please validate your runtime configuration details and retry."
  86. )
  87. # get minio client
  88. self.cos_client = minio.Minio(self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider)
  89. @abstractmethod
  90. def execute(self) -> None:
  91. """Execute the operation relative to derived class"""
  92. raise NotImplementedError("Method 'execute()' must be implemented by subclasses!")
  93. def process_dependencies(self) -> None:
  94. """Process dependencies
  95. If a dependency archive is present, it will be downloaded from object storage
  96. and expanded into the local directory.
  97. This method can be overridden by subclasses, although overrides should first
  98. call the superclass method.
  99. """
  100. OpUtil.log_operation_info("processing dependencies")
  101. t0 = time.time()
  102. archive_file = self.input_params.get("cos-dependencies-archive")
  103. self.get_file_from_object_storage(archive_file)
  104. inputs = self.input_params.get("inputs")
  105. if inputs:
  106. input_list = inputs.split(INOUT_SEPARATOR)
  107. for file in input_list:
  108. self.get_file_from_object_storage(file.strip())
  109. subprocess.call(["tar", "-zxvf", archive_file])
  110. duration = time.time() - t0
  111. OpUtil.log_operation_info("dependencies processed", duration)
  112. def process_outputs(self) -> None:
  113. """Process outputs
  114. If outputs have been specified, it will upload the appropriate files to object storage
  115. This method can be overridden by subclasses, although overrides should first
  116. call the superclass method.
  117. """
  118. OpUtil.log_operation_info("processing outputs")
  119. t0 = time.time()
  120. outputs = self.input_params.get("outputs")
  121. if outputs:
  122. output_list = outputs.split(INOUT_SEPARATOR)
  123. for file in output_list:
  124. self.process_output_file(file.strip())
  125. duration = time.time() - t0
  126. OpUtil.log_operation_info("outputs processed", duration)
  127. def process_metrics_and_metadata(self) -> None:
  128. """Process metrics and metadata
  129. This method exposes metrics/metadata that the processed
  130. notebook | script produces in the KFP UI.
  131. This method should not be overridden by subclasses.
  132. """
  133. OpUtil.log_operation_info("processing metrics and metadata")
  134. t0 = time.time()
  135. # Location where the KFP specific output files will be stored
  136. # in the environment where the bootsrapper is running.
  137. # Defaults to '/tmp' if not specified.
  138. output_path = Path(os.getenv("ELYRA_WRITABLE_CONTAINER_DIR", "/tmp"))
  139. # verify that output_path exists, is a directory
  140. # and writable by creating a temporary file in that location
  141. try:
  142. with TemporaryFile(mode="w", dir=output_path) as t:
  143. t.write("can write")
  144. except Exception:
  145. # output_path doesn't meet the requirements
  146. # treat this as a non-fatal error and log a warning
  147. logger.warning(f'Cannot create files in "{output_path}".')
  148. OpUtil.log_operation_info("Aborted metrics and metadata processing", time.time() - t0)
  149. return
  150. # Name of the proprietary KFP UI metadata file.
  151. # Notebooks | scripts might (but don't have to) produce this file
  152. # as documented in
  153. # https://www.kubeflow.org/docs/pipelines/sdk/output-viewer/
  154. # Each ExecuteFileOp must declare this as an output file or
  155. # the KFP UI won't pick up the information.
  156. kfp_ui_metadata_filename = "mlpipeline-ui-metadata.json"
  157. # Name of the proprietary KFP metadata file.
  158. # Notebooks | scripts might (but don't have to) produce this file
  159. # as documented in
  160. # https://www.kubeflow.org/docs/pipelines/sdk/pipelines-metrics/
  161. # Each ExecuteFileOp must declare this as an output file or
  162. # the KFP UI won't pick up the information.
  163. kfp_metrics_filename = "mlpipeline-metrics.json"
  164. # If the notebook | Python script produced one of the files
  165. # copy it to the target location where KFP is looking for it.
  166. for filename in [kfp_ui_metadata_filename, kfp_metrics_filename]:
  167. try:
  168. src = Path(".") / filename
  169. logger.debug(f"Processing {src} ...")
  170. # try to load the file, if one was created by the
  171. # notebook or script
  172. with open(src, "r") as f:
  173. metadata = json.load(f)
  174. # the file exists and contains valid JSON
  175. logger.debug(f"File content: {json.dumps(metadata)}")
  176. target = output_path / filename
  177. # try to save the file in the destination location
  178. with open(target, "w") as f:
  179. json.dump(metadata, f)
  180. except FileNotFoundError:
  181. # The script | notebook didn't produce the file
  182. # we are looking for. This is not an error condition
  183. # that needs to be handled.
  184. logger.debug(f"{self.filepath} produced no file named {src}")
  185. except ValueError as ve:
  186. # The file content could not be parsed. Log a warning
  187. # and treat this as a non-fatal error.
  188. logger.warning(f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}")
  189. except Exception as ex:
  190. # Something is wrong with the user-generated metadata file.
  191. # Log a warning and treat this as a non-fatal error.
  192. logger.warning(f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}")
  193. #
  194. # Augment kfp_ui_metadata_filename with Elyra-specific information:
  195. # - link to object storage where input and output artifacts are
  196. # stored
  197. ui_metadata_output = output_path / kfp_ui_metadata_filename
  198. try:
  199. # re-load the file
  200. with open(ui_metadata_output, "r") as f:
  201. metadata = json.load(f)
  202. except Exception:
  203. # ignore all errors
  204. metadata = {}
  205. # Assure the 'output' property exists and is of the correct type
  206. if metadata.get("outputs", None) is None or not isinstance(metadata["outputs"], list):
  207. metadata["outputs"] = []
  208. # Define HREF for COS bucket:
  209. # <COS_URL>/<BUCKET_NAME>/<COS_DIRECTORY>
  210. bucket_url = urljoin(
  211. urlunparse(self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/"
  212. )
  213. # add Elyra metadata to 'outputs'
  214. metadata["outputs"].append(
  215. {
  216. "storage": "inline",
  217. "source": f"## Inputs for {self.filepath}\n"
  218. f"[{self.input_params['cos-dependencies-archive']}]({bucket_url})",
  219. "type": "markdown",
  220. }
  221. )
  222. # print the content of the augmented metadata file
  223. logger.debug(f"Output UI metadata: {json.dumps(metadata)}")
  224. logger.debug(f"Saving UI metadata file as {ui_metadata_output} ...")
  225. # Save [updated] KFP UI metadata file
  226. with open(ui_metadata_output, "w") as f:
  227. json.dump(metadata, f)
  228. duration = time.time() - t0
  229. OpUtil.log_operation_info("metrics and metadata processed", duration)
  230. def get_object_storage_filename(self, filename: str) -> str:
  231. """Function to pre-pend cloud storage working dir to file name
  232. :param filename: the local file
  233. :return: the full path of the object storage file
  234. """
  235. return os.path.join(self.input_params.get("cos-directory", ""), filename)
  236. def get_file_from_object_storage(self, file_to_get: str) -> None:
  237. """Utility function to get files from an object storage
  238. :param file_to_get: filename
  239. """
  240. object_to_get = self.get_object_storage_filename(file_to_get)
  241. t0 = time.time()
  242. self.cos_client.fget_object(bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get)
  243. duration = time.time() - t0
  244. OpUtil.log_operation_info(
  245. f"downloaded {file_to_get} from bucket: {self.cos_bucket}, object: {object_to_get}", duration
  246. )
  247. def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[str] = None) -> None:
  248. """Utility function to put files into an object storage
  249. :param file_to_upload: filename
  250. :param object_name: remote filename (used to rename)
  251. """
  252. object_to_upload = object_name
  253. if not object_to_upload:
  254. object_to_upload = file_to_upload
  255. object_to_upload = self.get_object_storage_filename(object_to_upload)
  256. t0 = time.time()
  257. self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload)
  258. duration = time.time() - t0
  259. OpUtil.log_operation_info(
  260. f"uploaded {file_to_upload} to bucket: {self.cos_bucket} object: {object_to_upload}", duration
  261. )
  262. def has_wildcard(self, filename):
  263. wildcards = ["*", "?"]
  264. return bool(any(c in filename for c in wildcards))
  265. def process_output_file(self, output_file):
  266. """Puts the file to object storage. Handles wildcards and directories."""
  267. matched_files = [output_file]
  268. if self.has_wildcard(output_file): # explode the wildcarded file
  269. matched_files = glob.glob(output_file)
  270. for matched_file in matched_files:
  271. if os.path.isdir(matched_file):
  272. for file in os.listdir(matched_file):
  273. self.process_output_file(os.path.join(matched_file, file))
  274. else:
  275. self.put_file_to_object_storage(matched_file)
  276. class NotebookFileOp(FileOpBase):
  277. """Perform Notebook File Operation"""
  278. def execute(self) -> None:
  279. """Execute the Notebook and upload results to object storage"""
  280. notebook = os.path.basename(self.filepath)
  281. notebook_name = notebook.replace(".ipynb", "")
  282. notebook_output = notebook_name + "-output.ipynb"
  283. notebook_html = notebook_name + ".html"
  284. try:
  285. OpUtil.log_operation_info(f"executing notebook using 'papermill {notebook} {notebook_output}'")
  286. t0 = time.time()
  287. # Include kernel selection in execution time
  288. kernel_name = NotebookFileOp.find_best_kernel(notebook)
  289. import papermill
  290. papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name)
  291. duration = time.time() - t0
  292. OpUtil.log_operation_info("notebook execution completed", duration)
  293. NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
  294. self.put_file_to_object_storage(notebook_output, notebook)
  295. self.put_file_to_object_storage(notebook_html)
  296. self.process_outputs()
  297. except Exception as ex:
  298. # log in case of errors
  299. logger.error(f"Unexpected error: {sys.exc_info()[0]}")
  300. NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
  301. self.put_file_to_object_storage(notebook_output, notebook)
  302. self.put_file_to_object_storage(notebook_html)
  303. raise ex
  304. @staticmethod
  305. def convert_notebook_to_html(notebook_file: str, html_file: str) -> str:
  306. """Function to convert a Jupyter notebook file (.ipynb) into an html file
  307. :param notebook_file: object storage client
  308. :param html_file: name of what the html output file should be
  309. :return: html_file: the converted notebook in html format
  310. """
  311. import nbconvert
  312. import nbformat
  313. OpUtil.log_operation_info(f"converting from {notebook_file} to {html_file}")
  314. t0 = time.time()
  315. nb = nbformat.read(notebook_file, as_version=4)
  316. html_exporter = nbconvert.HTMLExporter()
  317. data, resources = html_exporter.from_notebook_node(nb)
  318. with open(html_file, "w") as f:
  319. f.write(data)
  320. f.close()
  321. duration = time.time() - t0
  322. OpUtil.log_operation_info(f"{notebook_file} converted to {html_file}", duration)
  323. return html_file
  324. @staticmethod
  325. def find_best_kernel(notebook_file: str) -> str:
  326. """Determines the best kernel to use via the following algorithm:
  327. 1. Loads notebook and gets kernel_name and kernel_language from NB metadata.
  328. 2. Gets the list of configured kernels using KernelSpecManager.
  329. 3. If notebook kernel_name is in list, use that, else
  330. 4. If not found, load each configured kernel.json file and find a language match.
  331. 5. On first match, log info message regarding the switch and use that kernel.
  332. 6. If no language match is found, revert to notebook kernel and log warning message.
  333. """
  334. from jupyter_client.kernelspec import KernelSpecManager
  335. import nbformat
  336. nb = nbformat.read(notebook_file, 4)
  337. nb_kspec = nb.metadata.kernelspec
  338. nb_kernel_name = nb_kspec.get("name")
  339. nb_kernel_lang = nb_kspec.get("language")
  340. kernel_specs = KernelSpecManager().find_kernel_specs()
  341. # see if we have a direct match...
  342. if nb_kernel_name in kernel_specs.keys():
  343. return nb_kernel_name
  344. # no match found for kernel, try matching language...
  345. for name, file in kernel_specs.items():
  346. # load file (JSON) and pick out language, if match, use first found
  347. with open(os.path.join(file, "kernel.json")) as f:
  348. kspec = json.load(f)
  349. if kspec.get("language").lower() == nb_kernel_lang.lower():
  350. matched_kernel = os.path.basename(file)
  351. logger.info(
  352. f"Matched kernel by language ({nb_kernel_lang}), using kernel "
  353. f"'{matched_kernel}' instead of the missing kernel '{nb_kernel_name}'."
  354. )
  355. return matched_kernel
  356. # no match found for language, return notebook kernel and let execution fail
  357. logger.warning(
  358. f"Reverting back to missing notebook kernel '{nb_kernel_name}' since no "
  359. f"language match ({nb_kernel_lang}) was found in current kernel specifications."
  360. )
  361. return nb_kernel_name
  362. class PythonFileOp(FileOpBase):
  363. """Perform Python File Operation"""
  364. def execute(self) -> None:
  365. """Execute the Python script and upload results to object storage"""
  366. python_script = os.path.basename(self.filepath)
  367. python_script_name = python_script.replace(".py", "")
  368. python_script_output = python_script_name + ".log"
  369. try:
  370. OpUtil.log_operation_info(
  371. f"executing python script using " f"'python3 {python_script}' to '{python_script_output}'"
  372. )
  373. t0 = time.time()
  374. with open(python_script_output, "w") as log_file:
  375. subprocess.run(["python3", python_script], stdout=log_file, stderr=subprocess.STDOUT, check=True)
  376. duration = time.time() - t0
  377. OpUtil.log_operation_info("python script execution completed", duration)
  378. self.put_file_to_object_storage(python_script_output, python_script_output)
  379. self.process_outputs()
  380. except Exception as ex:
  381. # log in case of errors
  382. logger.error(f"Unexpected error: {sys.exc_info()[0]}")
  383. logger.error(f"Error details: {ex}")
  384. self.put_file_to_object_storage(python_script_output, python_script_output)
  385. raise ex
  386. class RFileOp(FileOpBase):
  387. """Perform R File Operation"""
  388. def execute(self) -> None:
  389. """Execute the R script and upload results to object storage"""
  390. r_script = os.path.basename(self.filepath)
  391. r_script_name = r_script.replace(".r", "")
  392. r_script_output = r_script_name + ".log"
  393. try:
  394. OpUtil.log_operation_info(f"executing R script using " f"'Rscript {r_script}' to '{r_script_output}'")
  395. t0 = time.time()
  396. with open(r_script_output, "w") as log_file:
  397. subprocess.run(["Rscript", r_script], stdout=log_file, stderr=subprocess.STDOUT, check=True)
  398. duration = time.time() - t0
  399. OpUtil.log_operation_info("R script execution completed", duration)
  400. self.put_file_to_object_storage(r_script_output, r_script_output)
  401. self.process_outputs()
  402. except Exception as ex:
  403. # log in case of errors
  404. logger.error(f"Unexpected error: {sys.exc_info()[0]}")
  405. logger.error(f"Error details: {ex}")
  406. self.put_file_to_object_storage(r_script_output, r_script_output)
  407. raise ex
  408. class OpUtil(object):
  409. """Utility functions for preparing file execution."""
  410. @classmethod
  411. def package_install(cls, user_volume_path) -> None:
  412. OpUtil.log_operation_info("Installing packages")
  413. t0 = time.time()
  414. requirements_file = cls.determine_elyra_requirements()
  415. elyra_packages = cls.package_list_to_dict(requirements_file)
  416. current_packages = cls.package_list_to_dict("requirements-current.txt")
  417. to_install_list = []
  418. for package, ver in elyra_packages.items():
  419. if package in current_packages:
  420. if current_packages[package] is None:
  421. logger.warning(
  422. f"WARNING: Source package '{package}' found already installed as an "
  423. "editable package. This may conflict with the required version: "
  424. f"{ver} . Skipping..."
  425. )
  426. elif "git+" in current_packages[package]:
  427. logger.warning(
  428. f"WARNING: Source package '{package}' found already installed from "
  429. f"{current_packages[package]}. This may conflict with the required "
  430. f"version: {ver} . Skipping..."
  431. )
  432. elif isinstance(version.parse(current_packages[package]), version.LegacyVersion):
  433. logger.warning(
  434. f"WARNING: Package '{package}' found with unsupported Legacy version "
  435. f"scheme {current_packages[package]} already installed. Skipping..."
  436. )
  437. elif version.parse(ver) > version.parse(current_packages[package]):
  438. logger.info(f"Updating {package} package from version {current_packages[package]} to {ver}...")
  439. to_install_list.append(f"{package}=={ver}")
  440. elif version.parse(ver) < version.parse(current_packages[package]):
  441. logger.info(
  442. f"Newer {package} package with version {current_packages[package]} "
  443. f"already installed. Skipping..."
  444. )
  445. else:
  446. logger.info(f"Package not found. Installing {package} package with version {ver}...")
  447. to_install_list.append(f"{package}=={ver}")
  448. if to_install_list:
  449. if user_volume_path:
  450. to_install_list.insert(0, f"--target={user_volume_path}")
  451. to_install_list.append("--no-cache-dir")
  452. subprocess.run([sys.executable, "-m", "pip", "install"] + to_install_list, check=True)
  453. if user_volume_path:
  454. os.environ["PIP_CONFIG_FILE"] = f"{user_volume_path}/pip.conf"
  455. subprocess.run([sys.executable, "-m", "pip", "freeze"])
  456. duration = time.time() - t0
  457. OpUtil.log_operation_info("Packages installed", duration)
  458. @classmethod
  459. def determine_elyra_requirements(cls) -> Any:
  460. if sys.version_info.major == 3:
  461. if sys.version_info.minor == 7:
  462. return "requirements-elyra-py37.txt"
  463. elif sys.version_info.minor in [8, 9, 10]:
  464. return "requirements-elyra.txt"
  465. logger.error(
  466. f"This version of Python '{sys.version_info.major}.{sys.version_info.minor}' "
  467. f"is not supported for Elyra generic components"
  468. )
  469. return None
  470. @classmethod
  471. def package_list_to_dict(cls, filename: str) -> dict:
  472. package_dict = {}
  473. with open(filename) as fh:
  474. for line in fh:
  475. if line[0] != "#":
  476. if " @ " in line:
  477. package_name, package_version = line.strip("\n").split(sep=" @ ")
  478. elif "===" in line:
  479. package_name, package_version = line.strip("\n").split(sep="===")
  480. elif "==" in line:
  481. package_name, package_version = line.strip("\n").split(sep="==")
  482. elif line.startswith("-e ") or line.startswith("--editable "):
  483. package_name = line.strip("\n").replace("-e ", "").replace("--editable ", "")
  484. if "#egg=" in package_name: # editable package from version control system
  485. package_name = package_name.split("=")[-1]
  486. elif "/" in package_name: # editable package from local directory
  487. package_name = os.path.basename(package_name)
  488. package_version = None
  489. else:
  490. # Tolerate other formats but do not add to package list
  491. continue
  492. package_dict[package_name] = package_version
  493. return package_dict
  494. @classmethod
  495. def parse_arguments(cls, args) -> dict:
  496. import argparse
  497. global pipeline_name, operation_name
  498. logger.debug("Parsing Arguments.....")
  499. parser = argparse.ArgumentParser()
  500. parser.add_argument(
  501. "-e", "--cos-endpoint", dest="cos-endpoint", help="Cloud object storage endpoint", required=True
  502. )
  503. parser.add_argument(
  504. "-b", "--cos-bucket", dest="cos-bucket", help="Cloud object storage bucket to use", required=True
  505. )
  506. parser.add_argument(
  507. "-d",
  508. "--cos-directory",
  509. dest="cos-directory",
  510. help="Working directory in cloud object storage bucket to use",
  511. required=True,
  512. )
  513. parser.add_argument(
  514. "-t",
  515. "--cos-dependencies-archive",
  516. dest="cos-dependencies-archive",
  517. help="Archive containing notebook and dependency artifacts",
  518. required=True,
  519. )
  520. parser.add_argument("-f", "--file", dest="filepath", help="File to execute", required=True)
  521. parser.add_argument("-o", "--outputs", dest="outputs", help="Files to output to object store", required=False)
  522. parser.add_argument("-i", "--inputs", dest="inputs", help="Files to pull in from parent node", required=False)
  523. parser.add_argument(
  524. "-p",
  525. "--user-volume-path",
  526. dest="user-volume-path",
  527. help="Directory in Volume to install python libraries into",
  528. required=False,
  529. )
  530. parser.add_argument(
  531. "-n",
  532. "--pipeline-name",
  533. dest="pipeline-name",
  534. help="Pipeline name",
  535. required=True,
  536. )
  537. parsed_args = vars(parser.parse_args(args))
  538. # set pipeline name as global
  539. pipeline_name = parsed_args.get("pipeline-name")
  540. # operation/node name is the basename of the non-suffixed filepath, set as global
  541. operation_name = os.path.basename(os.path.splitext(parsed_args.get("filepath"))[0])
  542. return parsed_args
  543. @classmethod
  544. def log_operation_info(cls, action_clause: str, duration_secs: Optional[float] = None) -> None:
  545. """Produces a formatted log INFO message used entirely for support purposes.
  546. This method is intended to be called for any entries that should be captured across aggregated
  547. log files to identify steps within a given pipeline and each of its operations. As a result,
  548. calls to this method should produce single-line entries in the log (no embedded newlines).
  549. Each entry is prefixed with the pipeline name.
  550. General logging should NOT use this method but use logger.<level>() statements directly.
  551. :param action_clause: str representing the action that is being logged
  552. :param duration_secs: optional float value representing the duration of the action being logged
  553. """
  554. global pipeline_name, operation_name
  555. if enable_pipeline_info:
  556. duration_clause = f"({duration_secs:.3f} secs)" if duration_secs else ""
  557. logger.info(f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}")
  558. def main():
  559. # Configure logger format, level
  560. logging.basicConfig(
  561. format="[%(levelname)1.1s %(asctime)s.%(msecs).03d] %(message)s", datefmt="%H:%M:%S", level=logging.DEBUG
  562. )
  563. # Setup packages and gather arguments
  564. input_params = OpUtil.parse_arguments(sys.argv[1:])
  565. OpUtil.log_operation_info("starting operation")
  566. t0 = time.time()
  567. OpUtil.package_install(user_volume_path=input_params.get("user-volume-path"))
  568. # Create the appropriate instance, process dependencies and execute the operation
  569. file_op = FileOpBase.get_instance(**input_params)
  570. file_op.process_dependencies()
  571. file_op.execute()
  572. # Process notebook | script metrics and KFP UI metadata
  573. file_op.process_metrics_and_metadata()
  574. duration = time.time() - t0
  575. OpUtil.log_operation_info("operation completed", duration)
  576. if __name__ == "__main__":
  577. main()