bootstrapper.py

#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from abc import ABC
from abc import abstractmethod
import glob
import logging
import os
import subprocess
import sys
import time
from typing import Any
from typing import Optional
from typing import Type
from typing import TypeVar
from urllib.parse import urlparse

from packaging import version

# Inputs and Outputs separator character.  If updated,
# same-named variable in _notebook_op.py must be updated!
INOUT_SEPARATOR = ";"

# Setup forward reference for type hint on return from class factory method.  See
# https://stackoverflow.com/questions/39205527/can-you-annotate-return-type-when-value-is-instance-of-cls/39205612#39205612
F = TypeVar("F", bound="FileOpBase")

logger = logging.getLogger("elyra")
enable_pipeline_info = os.getenv("ELYRA_ENABLE_PIPELINE_INFO", "true").lower() == "true"
pipeline_name = None  # global used in formatted logging
operation_name = None  # global used in formatted logging


class FileOpBase(ABC):
    """Abstract base class for file-based operations"""

    filepath = None
    cos_client = None
    cos_bucket = None

    @classmethod
    def get_instance(cls: Type[F], **kwargs: Any) -> F:
        """Creates an appropriate subclass instance based on the extension of the filepath (-f) argument"""
        filepath = kwargs["filepath"]
        if ".ipynb" in filepath:
            return NotebookFileOp(**kwargs)
        elif ".py" in filepath:
            return PythonFileOp(**kwargs)
        elif ".r" in filepath:
            return RFileOp(**kwargs)
        else:
            raise ValueError(f"Unsupported file type: {filepath}")
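
    # Example (illustrative): get_instance(filepath="analysis.ipynb", ...) returns a
    # NotebookFileOp; ".py" and ".r" filepaths yield PythonFileOp and RFileOp respectively.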

    def __init__(self, **kwargs: Any) -> None:
        """Initializes the FileOpBase instance"""
        import minio
        from minio.credentials import providers

        self.filepath = kwargs["filepath"]
        self.input_params = kwargs or {}
        self.cos_endpoint = urlparse(self.input_params.get("cos-endpoint"))
        self.cos_bucket = self.input_params.get("cos-bucket")

        # Infer secure (TLS) from the endpoint's scheme.
        self.secure = self.cos_endpoint.scheme == "https"

        # get minio credentials provider
        if "cos-user" in self.input_params and "cos-password" in self.input_params:
            cred_provider = providers.StaticProvider(
                access_key=self.input_params.get("cos-user"),
                secret_key=self.input_params.get("cos-password"),
            )
        elif "AWS_ACCESS_KEY_ID" in os.environ and "AWS_SECRET_ACCESS_KEY" in os.environ:
            cred_provider = providers.EnvAWSProvider()
        elif "AWS_ROLE_ARN" in os.environ and "AWS_WEB_IDENTITY_TOKEN_FILE" in os.environ:
            cred_provider = providers.IamAwsProvider()
        else:
            raise RuntimeError(
                "No minio credentials provider can be initialised for current configs. "
                "Please validate your runtime configuration details and retry."
            )

        # get minio client
        self.cos_client = minio.Minio(self.cos_endpoint.netloc, secure=self.secure, credentials=cred_provider)

    @abstractmethod
    def execute(self) -> None:
        """Execute the operation relative to derived class"""
        raise NotImplementedError("Method 'execute()' must be implemented by subclasses!")

    def process_dependencies(self) -> None:
        """Process dependencies

        If a dependency archive is present, it will be downloaded from object storage
        and expanded into the local directory.

        This method can be overridden by subclasses, although overrides should first
        call the superclass method.
        """
        OpUtil.log_operation_info("processing dependencies")
        t0 = time.time()

        archive_file = self.input_params.get("cos-dependencies-archive")
        self.get_file_from_object_storage(archive_file)

        inputs = self.input_params.get("inputs")
        if inputs:
            input_list = inputs.split(INOUT_SEPARATOR)
            for file in input_list:
                self.get_file_from_object_storage(file.strip())

        subprocess.call(["tar", "-zxvf", archive_file])

        duration = time.time() - t0
        OpUtil.log_operation_info("dependencies processed", duration)
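
    # Note (illustrative): "inputs" arrives as one semicolon-separated string, e.g.
    # "data.csv;model.pkl", which is split on INOUT_SEPARATOR so each file can be
    # downloaded from object storage before the dependency archive is untarred.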

    def process_outputs(self) -> None:
        """Process outputs

        If outputs have been specified, it will upload the appropriate files to object storage

        This method can be overridden by subclasses, although overrides should first
        call the superclass method.
        """
        OpUtil.log_operation_info("processing outputs")
        t0 = time.time()

        outputs = self.input_params.get("outputs")
        if outputs:
            output_list = outputs.split(INOUT_SEPARATOR)
            for file in output_list:
                self.process_output_file(file.strip())
            duration = time.time() - t0
            OpUtil.log_operation_info("outputs processed", duration)
        else:
            OpUtil.log_operation_info("No outputs found in this operation")

    def get_object_storage_filename(self, filename: str) -> str:
        """Function to prepend the cloud storage working directory to the file name

        :param filename: the local file
        :return: the full path of the object storage file
        """
        return os.path.join(self.input_params.get("cos-directory", ""), filename)
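
    # Example (illustrative values): with "cos-directory" set to "my-pipeline-0815",
    # get_object_storage_filename("data.csv") returns "my-pipeline-0815/data.csv".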

    def get_file_from_object_storage(self, file_to_get: str) -> None:
        """Utility function to get files from an object storage

        :param file_to_get: filename
        """
        object_to_get = self.get_object_storage_filename(file_to_get)
        t0 = time.time()
        self.cos_client.fget_object(bucket_name=self.cos_bucket, object_name=object_to_get, file_path=file_to_get)
        duration = time.time() - t0
        OpUtil.log_operation_info(
            f"downloaded {file_to_get} from bucket: {self.cos_bucket}, object: {object_to_get}", duration
        )

    def put_file_to_object_storage(self, file_to_upload: str, object_name: Optional[str] = None) -> None:
        """Utility function to put files into an object storage

        :param file_to_upload: filename
        :param object_name: remote filename (used to rename)
        """
        object_to_upload = object_name
        if not object_to_upload:
            object_to_upload = file_to_upload

        object_to_upload = self.get_object_storage_filename(object_to_upload)
        t0 = time.time()
        self.cos_client.fput_object(bucket_name=self.cos_bucket, object_name=object_to_upload, file_path=file_to_upload)
        duration = time.time() - t0
        OpUtil.log_operation_info(
            f"uploaded {file_to_upload} to bucket: {self.cos_bucket} object: {object_to_upload}", duration
        )

    def has_wildcard(self, filename):
        wildcards = ["*", "?"]
        return bool(any(c in filename for c in wildcards))

    def process_output_file(self, output_file):
        """Puts the file to object storage.  Handles wildcards and directories."""
        matched_files = [output_file]
        if self.has_wildcard(output_file):  # explode the wildcarded file
            matched_files = glob.glob(output_file)

        for matched_file in matched_files:
            if os.path.isdir(matched_file):
                for file in os.listdir(matched_file):
                    self.process_output_file(os.path.join(matched_file, file))
            else:
                self.put_file_to_object_storage(matched_file)
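
    # Example (illustrative): an output of "results/*.csv" is expanded with glob and each
    # match is uploaded; a matched directory is walked recursively, file by file.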


class NotebookFileOp(FileOpBase):
    """Perform Notebook File Operation"""

    def execute(self) -> None:
        """Execute the Notebook and upload results to object storage"""
        notebook = os.path.basename(self.filepath)
        notebook_name = notebook.replace(".ipynb", "")
        notebook_output = notebook_name + "-output.ipynb"
        notebook_html = notebook_name + ".html"

        try:
            OpUtil.log_operation_info(f"executing notebook using 'papermill {notebook} {notebook_output}'")
            t0 = time.time()
            # Really hate to do this but have to invoke Papermill via library as workaround
            import papermill

            papermill.execute_notebook(notebook, notebook_output)
            duration = time.time() - t0
            OpUtil.log_operation_info("notebook execution completed", duration)

            NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
            self.put_file_to_object_storage(notebook_output, notebook)
            self.put_file_to_object_storage(notebook_html)
            self.process_outputs()
        except Exception as ex:
            # log in case of errors
            logger.error(f"Unexpected error: {sys.exc_info()[0]}")

            NotebookFileOp.convert_notebook_to_html(notebook_output, notebook_html)
            self.put_file_to_object_storage(notebook_output, notebook)
            self.put_file_to_object_storage(notebook_html)
            raise ex

    @staticmethod
    def convert_notebook_to_html(notebook_file: str, html_file: str) -> str:
        """Function to convert a Jupyter notebook file (.ipynb) into an html file

        :param notebook_file: name of the notebook file to convert
        :param html_file: name of what the html output file should be
        :return: html_file: the converted notebook in html format
        """
        import nbconvert
        import nbformat

        OpUtil.log_operation_info(f"converting from {notebook_file} to {html_file}")
        t0 = time.time()
        nb = nbformat.read(notebook_file, as_version=4)
        html_exporter = nbconvert.HTMLExporter()
        data, resources = html_exporter.from_notebook_node(nb)
        with open(html_file, "w") as f:
            f.write(data)
        duration = time.time() - t0
        OpUtil.log_operation_info(f"{notebook_file} converted to {html_file}", duration)
        return html_file


class PythonFileOp(FileOpBase):
    """Perform Python File Operation"""

    def execute(self) -> None:
        """Execute the Python script and upload results to object storage"""
        python_script = os.path.basename(self.filepath)
        python_script_name = python_script.replace(".py", "")
        python_script_output = python_script_name + ".log"

        try:
            OpUtil.log_operation_info(
                f"executing python script using 'python3 {python_script}' to '{python_script_output}'"
            )
            t0 = time.time()
            with open(python_script_output, "w") as log_file:
                subprocess.run(["python3", python_script], stdout=log_file, stderr=subprocess.STDOUT, check=True)

            duration = time.time() - t0
            OpUtil.log_operation_info("python script execution completed", duration)

            self.put_file_to_object_storage(python_script_output, python_script_output)
            self.process_outputs()
        except Exception as ex:
            # log in case of errors
            logger.error(f"Unexpected error: {sys.exc_info()[0]}")
            logger.error(f"Error details: {ex}")

            self.put_file_to_object_storage(python_script_output, python_script_output)
            raise ex


class RFileOp(FileOpBase):
    """Perform R File Operation"""

    def execute(self) -> None:
        """Execute the R script and upload results to object storage"""
        r_script = os.path.basename(self.filepath)
        r_script_name = r_script.replace(".r", "")
        r_script_output = r_script_name + ".log"

        try:
            OpUtil.log_operation_info(f"executing R script using 'Rscript {r_script}' to '{r_script_output}'")
            t0 = time.time()
            with open(r_script_output, "w") as log_file:
                subprocess.run(["Rscript", r_script], stdout=log_file, stderr=subprocess.STDOUT, check=True)

            duration = time.time() - t0
            OpUtil.log_operation_info("R script execution completed", duration)

            self.put_file_to_object_storage(r_script_output, r_script_output)
            self.process_outputs()
        except Exception as ex:
            # log in case of errors
            logger.error(f"Unexpected error: {sys.exc_info()[0]}")
            logger.error(f"Error details: {ex}")

            self.put_file_to_object_storage(r_script_output, r_script_output)
            raise ex


class OpUtil(object):
    """Utility functions for preparing file execution."""

    @classmethod
    def package_install(cls) -> None:
        OpUtil.log_operation_info("Installing packages")
        t0 = time.time()
        requirements_file = cls.determine_elyra_requirements()
        elyra_packages = cls.package_list_to_dict(requirements_file)
        current_packages = cls.package_list_to_dict("requirements-current.txt")
        to_install_list = []
        for package, ver in elyra_packages.items():
            if package in current_packages:
                if current_packages[package] is None:
                    logger.warning(
                        f"WARNING: Source package '{package}' found already installed as an "
                        "editable package. This may conflict with the required version: "
                        f"{ver} . Skipping..."
                    )
                elif "git+" in current_packages[package]:
                    logger.warning(
                        f"WARNING: Source package '{package}' found already installed from "
                        f"{current_packages[package]}. This may conflict with the required "
                        f"version: {ver} . Skipping..."
                    )
                elif isinstance(version.parse(current_packages[package]), version.LegacyVersion):
                    logger.warning(
                        f"WARNING: Package '{package}' found with unsupported Legacy version "
                        f"scheme {current_packages[package]} already installed. Skipping..."
                    )
                elif version.parse(ver) > version.parse(current_packages[package]):
                    logger.info(f"Updating {package} package from version {current_packages[package]} to {ver}...")
                    to_install_list.append(f"{package}=={ver}")
                elif version.parse(ver) < version.parse(current_packages[package]):
                    logger.info(
                        f"Newer {package} package with version {current_packages[package]} "
                        f"already installed. Skipping..."
                    )
            else:
                logger.info(f"Package not found. Installing {package} package with version {ver}...")
                to_install_list.append(f"{package}=={ver}")

        if to_install_list:
            subprocess.run([sys.executable, "-m", "pip", "install"] + to_install_list, check=True)

        subprocess.run([sys.executable, "-m", "pip", "freeze"])
        duration = time.time() - t0
        OpUtil.log_operation_info("Packages installed", duration)
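
    # Example (illustrative): if the requirements file pins "ipykernel==6.9.1" and the
    # environment reports "6.7.0", "ipykernel==6.9.1" is added to to_install_list; if the
    # environment already has a newer version installed, the pin is skipped.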

    @classmethod
    def determine_elyra_requirements(cls) -> Any:
        if sys.version_info.major == 3:
            if sys.version_info.minor == 7:
                return "requirements-elyra-py37.txt"
            elif sys.version_info.minor in [8, 9, 10]:
                return "requirements-elyra.txt"
        logger.error(
            f"This version of Python '{sys.version_info.major}.{sys.version_info.minor}' "
            "is not supported for Elyra generic components"
        )
        return None

    @classmethod
    def package_list_to_dict(cls, filename: str) -> dict:
        package_dict = {}
        with open(filename) as fh:
            for line in fh:
                if line[0] != "#":
                    if " @ " in line:
                        package_name, package_version = line.strip("\n").split(sep=" @ ")
                    elif "===" in line:
                        package_name, package_version = line.strip("\n").split(sep="===")
                    elif "==" in line:
                        package_name, package_version = line.strip("\n").split(sep="==")
                    elif line.startswith("-e ") or line.startswith("--editable "):
                        package_name = line.strip("\n").replace("-e ", "").replace("--editable ", "")
                        if "#egg=" in package_name:  # editable package from version control system
                            package_name = package_name.split("=")[-1]
                        elif "/" in package_name:  # editable package from local directory
                            package_name = os.path.basename(package_name)
                        package_version = None
                    else:
                        # Tolerate other formats but do not add to package list
                        continue

                    package_dict[package_name] = package_version

        return package_dict
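
    # Example (illustrative): a requirements file containing the lines
    #   numpy==1.22.3
    #   -e git+https://github.com/example/pkg.git#egg=pkg
    # would yield {"numpy": "1.22.3", "pkg": None}.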

    @classmethod
    def parse_arguments(cls, args) -> dict:
        import argparse

        global pipeline_name, operation_name

        logger.debug("Parsing Arguments.....")
        parser = argparse.ArgumentParser()
        parser.add_argument(
            "-e", "--cos-endpoint", dest="cos-endpoint", help="Cloud object storage endpoint", required=True
        )
        parser.add_argument(
            "-b", "--cos-bucket", dest="cos-bucket", help="Cloud object storage bucket to use", required=True
        )
        parser.add_argument(
            "-d",
            "--cos-directory",
            dest="cos-directory",
            help="Working directory in cloud object storage bucket to use",
            required=True,
        )
        parser.add_argument(
            "-t",
            "--cos-dependencies-archive",
            dest="cos-dependencies-archive",
            help="Archive containing notebook and dependency artifacts",
            required=True,
        )
        parser.add_argument(
            "-n",
            "--pipeline-name",
            dest="pipeline-name",
            help="Pipeline name",
            required=True,
        )
        parser.add_argument("-f", "--file", dest="filepath", help="File to execute", required=True)
        parser.add_argument("-o", "--outputs", dest="outputs", help="Files to output to object store", required=False)
        parser.add_argument("-i", "--inputs", dest="inputs", help="Files to pull in from parent node", required=False)
        parsed_args = vars(parser.parse_args(args))

        # set pipeline name as global
        pipeline_name = parsed_args.get("pipeline-name")
        # operation/node name is the basename of the non-suffixed filepath, set as global
        operation_name = os.path.basename(os.path.splitext(parsed_args.get("filepath"))[0])

        return parsed_args
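
    # Example invocation (illustrative values; flags as defined above):
    #   python3 bootstrapper.py -e https://minio.example.com:9000 -b my-bucket \
    #       -d my-pipeline-0815 -t my-notebook-0815.tar.gz -n my-pipeline \
    #       -f my-notebook.ipynb -o "results.csv;plots" -i data.csv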

    @classmethod
    def log_operation_info(cls, action_clause: str, duration_secs: Optional[float] = None) -> None:
        """Produces a formatted log INFO message used entirely for support purposes.

        This method is intended to be called for any entries that should be captured across aggregated
        log files to identify steps within a given pipeline and each of its operations.  As a result,
        calls to this method should produce single-line entries in the log (no embedded newlines).
        Each entry is prefixed with the pipeline name.

        General logging should NOT use this method but use logger.<level>() statements directly.

        :param action_clause: str representing the action that is being logged
        :param duration_secs: optional float value representing the duration of the action being logged
        """
        global pipeline_name, operation_name
        if enable_pipeline_info:
            duration_clause = f"({duration_secs:.3f} secs)" if duration_secs else ""
            logger.info(f"'{pipeline_name}':'{operation_name}' - {action_clause} {duration_clause}")
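
    # Sample entry (illustrative): 'my-pipeline':'my-notebook' - outputs processed (0.123 secs)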


def main():
    # Configure logger format, level
    logging.basicConfig(
        format="[%(levelname)1.1s %(asctime)s.%(msecs).03d] %(message)s", datefmt="%H:%M:%S", level=logging.INFO
    )
    # Setup packages and gather arguments
    input_params = OpUtil.parse_arguments(sys.argv[1:])
    OpUtil.log_operation_info("starting operation")
    t0 = time.time()
    OpUtil.package_install()

    # Create the appropriate instance, process dependencies and execute the operation
    file_op = FileOpBase.get_instance(**input_params)
    file_op.process_dependencies()
    file_op.execute()

    duration = time.time() - t0
    OpUtil.log_operation_info("operation completed", duration)


if __name__ == "__main__":
    main()