- #
- # Copyright 2018-2022 Elyra Authors
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- #
- from abc import ABC
- from abc import abstractmethod
- import glob
- import json
- import logging
- import os
- from pathlib import Path
- import subprocess
- import sys
- from tempfile import TemporaryFile
- import time
- from typing import Any
- from typing import Optional
- from typing import Type
- from typing import TypeVar
- from urllib.parse import urljoin
- from urllib.parse import urlparse
- from urllib.parse import urlunparse
- from packaging import version
- # Inputs and Outputs separator character. If updated,
- # same-named variable in _notebook_op.py must be updated!
- INOUT_SEPARATOR = ";"
- # Setup forward reference for type hint on return from class factory method. See
- # https://stackoverflow.com/questions/39205527/can-you-annotate-return-type-when-value-is-instance-of-cls/39205612#39205612
- F = TypeVar("F", bound="FileOpBase")
- logger = logging.getLogger("elyra")
- enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true"
- pipeline_name = None # global used in formatted logging
- operation_name = None # global used in formatted logging
- class FileOpBase(ABC):
- """Abstract base class for file-based operations"""
- filepath = None
- cos_client = None
- cos_bucket = None
- @classmethod
- def get_instance(cls: Type[F], **kwargs: Any) -> F:
- """Creates an appropriate subclass instance based on the extension of the filepath (-f) argument"""
- filepath = kwargs["filepath"]
- if ".ipynb" in filepath:
- return NotebookFileOp(**kwargs)
- elif ".py" in filepath:
- return PythonFileOp(**kwargs)
- elif ".r" in filepath:
- return RFileOp(**kwargs)
- else:
- raise ValueError(f"Unsupported file type: {filepath}")
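- # Example dispatch (hypothetical filenames): get_instance(filepath="analysis.ipynb", ...)
- # returns a NotebookFileOp, "train.py" a PythonFileOp, and "score.r" an RFileOp.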
- def __init__(self, **kwargs: Any) -> None:
- """Initializes the FileOpBase instance"""
- import minio
- from minio.credentials import providers
- self.filepath = kwargs["filepath"]
- self.input_params = kwargs or {}
- self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint"))
- self.cos_bucket = self.input_params.get("cos-bucket")
- # Infer secure from the endpoint's scheme.
- self.secure = self.cos_endpoint.scheme == "https"
- # get minio credentials provider
- if "cos-user" in self.input_params and "cos-password" in self.input_params:
- cred_provider = providers.StaticProvider(
- access_key=self.input_params.get("cos-user"),
- secret_key=self.input_params.get("cos-password"),
- )
- elif "AWS_ACCESS_KEY_ID" in os.environ and "AWS_SECRET_ACCESS_KEY" in os.environ:
- cred_provider = providers.EnvAWSProvider()
- elif "AWS_ROLE_ARN" in os.environ and "AWS_WEB_IDENTITY_TOKEN_FILE" in os.environ:
- cred_provider = providers.IamAwsProvider()
- else:
- raise RuntimeError(
- "No minio credentials provider can be initialised for current configs. "
- "Please validate your runtime configuration details and retry."
- )
- # get minio client
- self.cos_client = minio.Minio(self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider)
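- # For illustration, a hypothetical endpoint of "https://minio.example.com:9000" parses to
- # netloc "minio.example.com:9000" with secure=True; an "http://" endpoint yields secure=False.
- # Credentials are resolved in order: explicit cos-user/cos-password, then the
- # AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY environment pair, then IAM web-identity
- # settings (AWS_ROLE_ARN / AWS_WEB_IDENTITY_TOKEN_FILE).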
- @abstractmethod
- def execute(self) -> None:
- """Execute the operation relative to derived class"""
- raise NotImplementedError("Method 'execute()' must be implemented by subclasses!")
- def process_dependencies(self) -> None:
- """Process dependencies
- If a dependency archive is present, it will be downloaded from object storage
- and expanded into the local directory.
- This method can be overridden by subclasses, although overrides should first
- call the superclass method.
- """
- OpUtil.log_operation_info("processing dependencies")
- t0 = time.time()
- archive_file = self.input_params.get("cos-dependencies-archive")
- self.get_file_from_object_storage(archive_file)
- inputs = self.input_params.get("inputs")
- if inputs:
- input_list = inputs.split(INOUT_SEPARATOR)
- for file in input_list:
- self.get_file_from_object_storage(file.strip())
- subprocess.call(["tar", "-zxvf", archive_file])
- duration = time.time() - t0
- OpUtil.log_operation_info("dependencies processed", duration)
- def process_outputs(self) -> None:
- """Process outputs
- If outputs have been specified, the appropriate files are uploaded to object storage
- This method can be overridden by subclasses, although overrides should first
- call the superclass method.
- """
- OpUtil.log_operation_info("processing outputs")
- t0 = time.time()
- outputs = self.input_params.get("outputs")
- if outputs:
- output_list = outputs.split(INOUT_SEPARATOR)
- for file in output_list:
- self.process_output_file(file.strip())
- duration = time.time() - t0
- OpUtil.log_operation_info("outputs processed", duration)
- def process_metrics_and_metadata(self) -> None:
- """Process metrics and metadata
- This method exposes metrics/metadata that the processed
- notebook | script produces in the KFP UI.
- This method should not be overridden by subclasses.
- """
- OpUtil.log_operation_info("processing metrics and metadata")
- t0 = time.time()
- # Location where the KFP specific output files will be stored
- # in the environment where the bootstrapper is running.
- # Defaults to '/tmp' if not specified.
- output_path = Path(os.getenv("ELYRA_WRITABLE_CONTAINER_DIR", "/tmp"))
- # verify that output_path exists, is a directory
- # and writable by creating a temporary file in that location
- try:
- with TemporaryFile(mode="w", dir=output_path) as t:
- t.write("can write")
- except Exception:
- # output_path doesn't meet the requirements
- # treat this as a non-fatal error and log a warning
- logger.warning(f'Cannot create files in "{output_path}".')
- OpUtil.log_operation_info("Aborted metrics and metadata processing", time.time() - t0)
- return
- # Name of the proprietary KFP UI metadata file.
- # Notebooks | scripts might (but don't have to) produce this file
- # as documented in
- # https://www.kubeflow.org/docs/pipelines/sdk/output-viewer/
- # Each ExecuteFileOp must declare this as an output file or
- # the KFP UI won't pick up the information.
- kfp_ui_metadata_filename = "mlpipeline-ui-metadata.json"
- # Name of the proprietary KFP metadata file.
- # Notebooks | scripts might (but don't have to) produce this file
- # as documented in
- # https://www.kubeflow.org/docs/pipelines/sdk/pipelines-metrics/
- # Each ExecuteFileOp must declare this as an output file or
- # the KFP UI won't pick up the information.
- kfp_metrics_filename = "mlpipeline-metrics.json"
- # If the notebook | Python script produced one of the files
- # copy it to the target location where KFP is looking for it.
- for filename in [kfp_ui_metadata_filename, kfp_metrics_filename]:
- try:
- src = Path(".") / filename
- logger.debug(f"Processing {src} ...")
- # try to load the file, if one was created by the
- # notebook or script
- with open(src, "r") as f:
- metadata = json.load(f)
- # the file exists and contains valid JSON
- logger.debug(f"File content: {json.dumps(metadata)}")
- target = output_path / filename
- # try to save the file in the destination location
- with open(target, "w") as f:
- json.dump(metadata, f)
- except FileNotFoundError:
- # The script | notebook didn't produce the file
- # we are looking for. This is not an error condition
- # that needs to be handled.
- logger.debug(f"{self.filepath} produced no file named {src}")
- except ValueError as ve:
- # The file content could not be parsed. Log a warning
- # and treat this as a non-fatal error.
- logger.warning(f"Ignoring incompatible {str(src)} produced by {self.filepath}: {ve} {str(ve)}")
- except Exception as ex:
- # Something is wrong with the user-generated metadata file.
- # Log a warning and treat this as a non-fatal error.
- logger.warning(f"Error processing {str(src)} produced by {self.filepath}: {ex} {str(ex)}")
- #
- # Augment kfp_ui_metadata_filename with Elyra-specific information:
- # - link to object storage where input and output artifacts are
- # stored
- ui_metadata_output = output_path / kfp_ui_metadata_filename
- try:
- # re-load the file
- with open(ui_metadata_output, "r") as f:
- metadata = json.load(f)
- except Exception:
- # ignore all errors
- metadata = {}
- # Ensure the 'outputs' property exists and is of the correct type
- if metadata.get("outputs", None) is None or not isinstance(metadata["outputs"], list):
- metadata["outputs"] = []
- # Define HREF for COS bucket:
- # <COS_URL>/<BUCKET_NAME>/<COS_DIRECTORY>
- bucket_url = urljoin(
- urlunparse(self.cos_endpoint), f"{self.cos_bucket}/{self.input_params.get('cos-directory', '')}/"
- )
- # add Elyra metadata to 'outputs'
- metadata["outputs"].append(
- {
- "storage": "inline",
- "source": f"## Inputs for {self.filepath}\n"
- f"[{self.input_params['cos-dependencies-archive']}]({bucket_url})",
- "type": "markdown",
- }
- )
- # print the content of the augmented metadata file
- logger.debug(f"Output UI metadata: {json.dumps(metadata)}")
- logger.debug(f"Saving UI metadata file as {ui_metadata_output} ...")
- # Save [updated] KFP UI metadata file
- with open(ui_metadata_output, "w") as f:
- json.dump(metadata, f)
- duration = time.time() - t0
- OpUtil.log_operation_info("metrics and metadata processed", duration)
- def get_object_storage_filename(self, filename: str) -> str:
- """Function to pre-pend cloud storage working dir to file name
- :param filename: the local file
- :return: the full path of the object storage file
- """
- return os.path.join(self.input_params.get("cos-directory", ""), filename)
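- # e.g. with a hypothetical cos-directory of "pipelines/run-01",
- # get_object_storage_filename("results.csv") returns "pipelines/run-01/results.csv".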
- def get_file_from_object_storage(self, file_to_get: str) -> None:
- """Utility function to get files from an object storage
- :param file_to_get: filename
- """
- object_to_get = self.get_object_storage_filename(file_to_get)
- t0 = time.time()
- self.cos_client.fget_object(bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get)
- duration = time.time() - t0
- OpUtil.log_operation_info(
- f"downloaded {file_to_get} from bucket: {self.cos_bucket}, object: {object_to_get}", duration
- )
- def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[str] = None) -> None:
- """Utility function to put files into an object storage
- :param file_to_upload: filename
- :param object_name: remote filename (used to rename)
- """
- object_to_upload = object_name
- if not object_to_upload:
- object_to_upload = file_to_upload
- object_to_upload = self.get_object_storage_filename(object_to_upload)
- t0 = time.time()
- self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload)
- duration = time.time() - t0
- OpUtil.log_operation_info(
- f"uploaded {file_to_upload} to bucket: {self.cos_bucket} object: {object_to_upload}", duration
- )
- def has_wildcard(self, filename):
- wildcards = ["*", "?"]
- return any(c in filename for c in wildcards)
- def process_output_file(self, output_file):
- """Puts the file to object storage. Handles wildcards and directories."""
- matched_files = [output_file]
- if self.has_wildcard(output_file): # explode the wildcarded file
- matched_files = glob.glob(output_file)
- for matched_file in matched_files:
- if os.path.isdir(matched_file):
- for file in os.listdir(matched_file):
- self.process_output_file(os.path.join(matched_file, file))
- else:
- self.put_file_to_object_storage(matched_file)
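- # e.g. an output spec of "plots/*.png" (illustrative) is expanded via glob, and a matched
- # directory is walked recursively so each contained file is uploaded individually.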
- class NotebookFileOp(FileOpBase):
- """Perform Notebook File Operation"""
- def execute(self) -> None:
- """Execute the Notebook and upload results to object storage"""
- notebook = os.path.basename(self.filepath)
- notebook_name = notebook.replace(".ipynb", "")
- notebook_output = notebook_name + "-output.ipynb"
- notebook_html = notebook_name + ".html"
- try:
- OpUtil.log_operation_info(f"executing notebook using 'papermill {notebook} {notebook_output}'")
- t0 = time.time()
- # Include kernel selection in execution time
- kernel_name = NotebookFileOp.find_best_kernel(notebook)
- import papermill
- papermill.execute_notebook(notebook, notebook_output, kernel_name=kernel_name)
- duration = time.time() - t0
- OpUtil.log_operation_info("notebook execution completed", duration)
- NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
- self.put_file_to_object_storage(notebook_output, notebook)
- self.put_file_to_object_storage(notebook_html)
- self.process_outputs()
- except Exception as ex:
- # log in case of errors
- logger.error(f"Unexpected error: {sys.exc_info()[0]}")
- NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
- self.put_file_to_object_storage(notebook_output, notebook)
- self.put_file_to_object_storage(notebook_html)
- raise ex
- @staticmethod
- def convert_notebook_to_html(notebook_file: str, html_file: str) -> str:
- """Function to convert a Jupyter notebook file (.ipynb) into an html file
- :param notebook_file: name of the notebook file (.ipynb) to convert
- :param html_file: name of the html output file
- :return: html_file: the converted notebook in html format
- """
- import nbconvert
- import nbformat
- OpUtil.log_operation_info(f"converting from {notebook_file} to {html_file}")
- t0 = time.time()
- nb = nbformat.read(notebook_file, as_version=4)
- html_exporter = nbconvert.HTMLExporter()
- data, resources = html_exporter.from_notebook_node(nb)
- with open(html_file, "w") as f:
- f.write(data)
- duration = time.time() - t0
- OpUtil.log_operation_info(f"{notebook_file} converted to {html_file}", duration)
- return html_file
- @staticmethod
- def find_best_kernel(notebook_file: str) -> str:
- """Determines the best kernel to use via the following algorithm:
- 1. Loads notebook and gets kernel_name and kernel_language from NB metadata.
- 2. Gets the list of configured kernels using KernelSpecManager.
- 3. If notebook kernel_name is in list, use that, else
- 4. If not found, load each configured kernel.json file and find a language match.
- 5. On first match, log info message regarding the switch and use that kernel.
- 6. If no language match is found, revert to notebook kernel and log warning message.
- """
- from jupyter_client.kernelspec import KernelSpecManager
- import nbformat
- nb = nbformat.read(notebook_file, as_version=4)
- nb_kspec = nb.metadata.kernelspec
- nb_kernel_name = nb_kspec.get("name")
- nb_kernel_lang = nb_kspec.get("language")
- kernel_specs = KernelSpecManager().find_kernel_specs()
- # see if we have a direct match...
- if nb_kernel_name in kernel_specs.keys():
- return nb_kernel_name
- # no match found for kernel, try matching language...
- for name, file in kernel_specs.items():
- # load file (JSON) and pick out language, if match, use first found
- with open(os.path.join(file, "kernel.json")) as f:
- kspec = json.load(f)
- if kspec.get("language").lower() == nb_kernel_lang.lower():
- matched_kernel = os.path.basename(file)
- logger.info(
- f"Matched kernel by language ({nb_kernel_lang}), using kernel "
- f"'{matched_kernel}' instead of the missing kernel '{nb_kernel_name}'."
- )
- return matched_kernel
- # no match found for language, return notebook kernel and let execution fail
- logger.warning(
- f"Reverting back to missing notebook kernel '{nb_kernel_name}' since no "
- f"language match ({nb_kernel_lang}) was found in current kernel specifications."
- )
- return nb_kernel_name
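- # e.g. a notebook whose metadata names a missing "conda-env-foo-py" kernel (hypothetical)
- # but whose language is "python" falls back to the first installed kernelspec whose
- # kernel.json declares "language": "python", such as "python3".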
- class PythonFileOp(FileOpBase):
- """Perform Python File Operation"""
- def execute(self) -> None:
- """Execute the Python script and upload results to object storage"""
- python_script = os.path.basename(self.filepath)
- python_script_name = python_script.replace(".py", "")
- python_script_output = python_script_name + ".log"
- try:
- OpUtil.log_operation_info(
- f"executing python script using " f"'python3 {python_script}' to '{python_script_output}'"
- )
- t0 = time.time()
- with open(python_script_output, "w") as log_file:
- subprocess.run(["python3", python_script], stdout=log_file, stderr=subprocess.STDOUT, check=True)
- duration = time.time() - t0
- OpUtil.log_operation_info("python script execution completed", duration)
- self.put_file_to_object_storage(python_script_output, python_script_output)
- self.process_outputs()
- except Exception as ex:
- # log in case of errors
- logger.error(f"Unexpected error: {sys.exc_info()[0]}")
- logger.error(f"Error details: {ex}")
- self.put_file_to_object_storage(python_script_output, python_script_output)
- raise ex
- class RFileOp(FileOpBase):
- """Perform R File Operation"""
- def execute(self) -> None:
- """Execute the R script and upload results to object storage"""
- r_script = os.path.basename(self.filepath)
- r_script_name = r_script.replace(".r", "")
- r_script_output = r_script_name + ".log"
- try:
- OpUtil.log_operation_info(f"executing R script using " f"'Rscript {r_script}' to '{r_script_output}'")
- t0 = time.time()
- with open(r_script_output, "w") as log_file:
- subprocess.run(["Rscript", r_script], stdout=log_file, stderr=subprocess.STDOUT, check=True)
- duration = time.time() - t0
- OpUtil.log_operation_info("R script execution completed", duration)
- self.put_file_to_object_storage(r_script_output, r_script_output)
- self.process_outputs()
- except Exception as ex:
- # log in case of errors
- logger.error(f"Unexpected error: {sys.exc_info()[0]}")
- logger.error(f"Error details: {ex}")
- self.put_file_to_object_storage(r_script_output, r_script_output)
- raise ex
- class OpUtil(object):
- """Utility functions for preparing file execution."""
- @classmethod
- def package_install(cls, user_volume_path) -> None:
- OpUtil.log_operation_info("Installing packages")
- t0 = time.time()
- requirements_file = cls.determine_elyra_requirements()
- elyra_packages = cls.package_list_to_dict(requirements_file)
- current_packages = cls.package_list_to_dict("requirements-current.txt")
- to_install_list = []
- for package, ver in elyra_packages.items():
- if package in current_packages:
- if current_packages[package] is None:
- logger.warning(
- f"WARNING: Source package '{package}' found already installed as an "
- "editable package. This may conflict with the required version: "
- f"{ver} . Skipping..."
- )
- elif "git+" in current_packages[package]:
- logger.warning(
- f"WARNING: Source package '{package}' found already installed from "
- f"{current_packages[package]}. This may conflict with the required "
- f"version: {ver} . Skipping..."
- )
- elif isinstance(version.parse(current_packages[package]), version.LegacyVersion):
- logger.warning(
- f"WARNING: Package '{package}' found with unsupported Legacy version "
- f"scheme {current_packages[package]} already installed. Skipping..."
- )
- elif version.parse(ver) > version.parse(current_packages[package]):
- logger.info(f"Updating {package} package from version {current_packages[package]} to {ver}...")
- to_install_list.append(f"{package}=={ver}")
- elif version.parse(ver) < version.parse(current_packages[package]):
- logger.info(
- f"Newer {package} package with version {current_packages[package]} "
- f"already installed. Skipping..."
- )
- else:
- logger.info(f"Package not found. Installing {package} package with version {ver}...")
- to_install_list.append(f"{package}=={ver}")
- if to_install_list:
- if user_volume_path:
- to_install_list.insert(0, f"--target={user_volume_path}")
- to_install_list.append("--no-cache-dir")
- subprocess.run([sys.executable, "-m", "pip", "install"] + to_install_list, check=True)
- if user_volume_path:
- os.environ["PIP_CONFIG_FILE"] = f"{user_volume_path}/pip.conf"
- subprocess.run([sys.executable, "-m", "pip", "freeze"])
- duration = time.time() - t0
- OpUtil.log_operation_info("Packages installed", duration)
- @classmethod
- def determine_elyra_requirements(cls) -> Any:
- if sys.version_info.major == 3:
- if sys.version_info.minor == 7:
- return "requirements-elyra-py37.txt"
- elif sys.version_info.minor in [8, 9, 10]:
- return "requirements-elyra.txt"
- logger.error(
- f"This version of Python '{sys.version_info.major}.{sys.version_info.minor}' "
- f"is not supported for Elyra generic components"
- )
- return None
- @classmethod
- def package_list_to_dict(cls, filename: str) -> dict:
- package_dict = {}
- with open(filename) as fh:
- for line in fh:
- if line[0] != "#":
- if " @ " in line:
- package_name, package_version = line.strip("\n").split(sep=" @ ")
- elif "===" in line:
- package_name, package_version = line.strip("\n").split(sep="===")
- elif "==" in line:
- package_name, package_version = line.strip("\n").split(sep="==")
- elif line.startswith("-e ") or line.startswith("--editable "):
- package_name = line.strip("\n").replace("-e ", "").replace("--editable ", "")
- if "#egg=" in package_name: # editable package from version control system
- package_name = package_name.split("=")[-1]
- elif "/" in package_name: # editable package from local directory
- package_name = os.path.basename(package_name)
- package_version = None
- else:
- # Tolerate other formats but do not add to package list
- continue
- package_dict[package_name] = package_version
- return package_dict
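- # For example, a requirements file containing (illustrative entries):
- #   pandas==1.4.2
- #   -e git+https://github.com/org/pkg.git#egg=pkg
- # produces {"pandas": "1.4.2", "pkg": None}.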
- @classmethod
- def parse_arguments(cls, args) -> dict:
- import argparse
- global pipeline_name, operation_name
- logger.debug("Parsing Arguments.....")
- parser = argparse.ArgumentParser()
- parser.add_argument(
- "-e", "--cos-endpoint", dest="cos-endpoint", help="Cloud object storage endpoint", required=True
- )
- parser.add_argument(
- "-b", "--cos-bucket", dest="cos-bucket", help="Cloud object storage bucket to use", required=True
- )
- parser.add_argument(
- "-d",
- "--cos-directory",
- dest="cos-directory",
- help="Working directory in cloud object storage bucket to use",
- required=True,
- )
- parser.add_argument(
- "-t",
- "--cos-dependencies-archive",
- dest="cos-dependencies-archive",
- help="Archive containing notebook and dependency artifacts",
- required=True,
- )
- parser.add_argument("-f", "--file", dest="filepath", help="File to execute", required=True)
- parser.add_argument("-o", "--outputs", dest="outputs", help="Files to output to object store", required=False)
- parser.add_argument("-i", "--inputs", dest="inputs", help="Files to pull in from parent node", required=False)
- parser.add_argument(
- "-p",
- "--user-volume-path",
- dest="user-volume-path",
- help="Directory in Volume to install python libraries into",
- required=False,
- )
- parser.add_argument(
- "-n",
- "--pipeline-name",
- dest="pipeline-name",
- help="Pipeline name",
- required=True,
- )
- parsed_args = vars(parser.parse_args(args))
- # set pipeline name as global
- pipeline_name = parsed_args.get("pipeline-name")
- # operation/node name is the basename of the non-suffixed filepath, set as global
- operation_name = os.path.basename(os.path.splitext(parsed_args.get("filepath"))[0])
- return parsed_args
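- # A typical invocation assembled by the pipeline processor might look like (hypothetical values):
- #   python3 bootstrapper.py -e https://minio.example.com:9000 -b my-bucket -d my-pipeline-run \
- #       -t analysis-deps.tar.gz -f analysis.ipynb -o "results.csv;plots/*.png" -n my-pipeline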
- @classmethod
- def log_operation_info(cls, action_clause: str, duration_secs: Optional[float] = None) -> None:
- """Produces a formatted log INFO message used entirely for support purposes.
- This method is intended to be called for any entries that should be captured across aggregated
- log files to identify steps within a given pipeline and each of its operations. As a result,
- calls to this method should produce single-line entries in the log (no embedded newlines).
- Each entry is prefixed with the pipeline name.
- General logging should NOT use this method but use logger.<level>() statements directly.
- :param action_clause: str representing the action that is being logged
- :param duration_secs: optional float value representing the duration of the action being logged
- """
- global pipeline_name, operation_name
- if enable_pipeline_info:
- duration_clause = f"({duration_secs:.3f} secs)" if duration_secs is not None else ""
- logger.info(f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}")
- def main():
- # Configure logger format, level
- logging.basicConfig(
- format="[%(levelname)1.1s %(asctime)s.%(msecs).03d] %(message)s", datefmt="%H:%M:%S", level=logging.DEBUG
- )
- # Setup packages and gather arguments
- input_params = OpUtil.parse_arguments(sys.argv[1:])
- OpUtil.log_operation_info("starting operation")
- t0 = time.time()
- OpUtil.package_install(user_volume_path=input_params.get("user-volume-path"))
- # Create the appropriate instance, process dependencies and execute the operation
- file_op = FileOpBase.get_instance(**input_params)
- file_op.process_dependencies()
- file_op.execute()
- # Process notebook | script metrics and KFP UI metadata
- file_op.process_metrics_and_metadata()
- duration = time.time() - t0
- OpUtil.log_operation_info("operation completed", duration)
- if __name__ == "__main__":
- main()