cos.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. #
  2. # Copyright 2018-2022 Elyra Authors
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. #
  16. import os
  17. from pathlib import Path
  18. from typing import Optional
  19. from urllib.parse import urlparse
  20. import minio
  21. from minio.credentials import providers
  22. from minio.error import S3Error
  23. from traitlets.config import LoggingConfigurable
  24. class CosClient(LoggingConfigurable):
  25. """
  26. MinIO-based Object Storage client, enabling Elyra to upload and download
  27. files.This client is configurable via traitlets.
  28. """
  29. client = None
  30. def __init__(self, config=None, endpoint=None, access_key=None, secret_key=None, bucket=None, **kwargs):
  31. super().__init__(**kwargs)
  32. cred_provider = None
  33. if config is None:
  34. # The client was invoked by an entity that does not utilize
  35. # runtime configurations.
  36. if access_key is None or secret_key is None:
  37. # use env variables for authentication
  38. if (
  39. len(os.environ.get("AWS_ACCESS_KEY_ID", "").strip()) == 0
  40. or len(os.environ.get("AWS_SECRET_ACCESS_KEY", "").strip()) == 0
  41. ):
  42. raise RuntimeError(
  43. "Cannot connect to object storage. No credentials "
  44. " were provided and environment variables "
  45. " AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are not "
  46. " properly defined."
  47. )
  48. else:
  49. cred_provider = providers.EnvAWSProvider()
  50. else:
  51. # use provided username and password for authentication
  52. cred_provider = providers.StaticProvider(
  53. access_key=access_key,
  54. secret_key=secret_key,
  55. )
  56. self.endpoint = endpoint
  57. self.bucket = bucket
  58. else:
  59. auth_type = config.metadata["cos_auth_type"]
  60. self.endpoint = urlparse(config.metadata["cos_endpoint"])
  61. self.bucket = config.metadata["cos_bucket"]
  62. if auth_type in ["USER_CREDENTIALS", "KUBERNETES_SECRET"]:
  63. cred_provider = providers.StaticProvider(
  64. access_key=config.metadata["cos_username"],
  65. secret_key=config.metadata["cos_password"],
  66. )
  67. elif auth_type == "AWS_IAM_ROLES_FOR_SERVICE_ACCOUNTS":
  68. if os.environ.get("AWS_ROLE_ARN") is None or os.environ.get("AWS_WEB_IDENTITY_TOKEN_FILE") is None:
  69. raise RuntimeError(
  70. "Cannot connect to object storage. "
  71. f"Authentication provider '{auth_type}' requires "
  72. "environment variables AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE."
  73. )
  74. # Verify that AWS_WEB_IDENTITY_TOKEN_FILE exists
  75. if Path(os.environ["AWS_WEB_IDENTITY_TOKEN_FILE"]).is_file() is False:
  76. raise RuntimeError(
  77. "Cannot connect to object storage. The value of environment "
  78. "variable AWS_WEB_IDENTITY_TOKEN_FILE references "
  79. f"'{os.environ['AWS_WEB_IDENTITY_TOKEN_FILE']}', which is not a valid file."
  80. )
  81. cred_provider = providers.IamAwsProvider()
  82. else:
  83. raise RuntimeError(
  84. "Cannot connect to object storage. " f"Authentication provider '{auth_type}' is not supported."
  85. )
  86. # Infer secure from the endpoint's scheme.
  87. self.secure = self.endpoint.scheme == "https"
  88. # get minio client
  89. self.client = minio.Minio(self.endpoint.netloc, secure=self.secure, credentials=cred_provider)
  90. # Make a bucket with the make_bucket API call.
  91. try:
  92. if not self.client.bucket_exists(self.bucket):
  93. self.client.make_bucket(self.bucket)
  94. except S3Error as ex:
  95. # unpack the S3Error based off error codes
  96. # https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
  97. if ex.code == "BucketAlreadyOwnedByYou":
  98. self.log.warning("Object Storage bucket already owned by you", exc_info=True)
  99. elif ex.code == "BucketAlreadyExists":
  100. self.log.warning("Object Storage bucket already exists", exc_info=True)
  101. elif ex.code == "SignatureDoesNotMatch":
  102. self.log.error("Incorrect Object Storage password supplied")
  103. elif ex.code == "InvalidAccessKeyId":
  104. self.log.error("Incorrect Object Storage username supplied")
  105. else:
  106. self.log.error(f"Object Storage error: {ex.code}", exc_info=True)
  107. raise ex from ex
  108. except ValueError as ex:
  109. # providers.IamAwsProvider raises this if something bad happened
  110. if isinstance(cred_provider, providers.IamAwsProvider):
  111. raise RuntimeError(
  112. f"Cannot connect to object storage: {ex}. Verify that "
  113. f"environment variable AWS_WEB_IDENTITY_TOKEN_FILE contains a valid value."
  114. )
  115. else:
  116. raise ex
  117. def upload_file(self, local_file_path: str, object_name: str, object_prefix: str = "") -> str:
  118. """
  119. Uploads contents from a file, located on the local filesystem at `local_file_path`,
  120. as `object_name` in object storage.
  121. :param local_file_path: Path on the local filesystem from which object data will be read.
  122. :param object_name: Name of the file object in object storage
  123. :param prefix: optional prefix to be applied to object_name
  124. :return: fully qualified object name, if upload was successful
  125. """
  126. fq_object_name = join_paths(object_prefix, object_name)
  127. try:
  128. # upload local_file_path as object_name
  129. self.client.fput_object(bucket_name=self.bucket, object_name=fq_object_name, file_path=local_file_path)
  130. except BaseException as ex:
  131. self.log.error(
  132. f"Error uploading file '{local_file_path}' to bucket '{self.bucket}' as '{fq_object_name}'",
  133. exc_info=True,
  134. )
  135. raise ex from ex
  136. return fq_object_name
  137. def download_file(self, object_name: str, local_file_path: str) -> None:
  138. """
  139. Downloads and saves the object as a file in the local filesystem.
  140. :param object_name: Name of the file object in object storage
  141. :param local_file_path: Path on the local filesystem to which the object data will be written.
  142. :return:
  143. """
  144. # sanitize object name; S3 does not accept leading /
  145. fq_object_name = join_paths(object_name)
  146. try:
  147. self.client.fget_object(bucket_name=self.bucket, object_name=fq_object_name, file_path=local_file_path)
  148. except BaseException as ex:
  149. self.log.error(
  150. f"Error downloading '{fq_object_name}' from bucket '{self.bucket}' to '{local_file_path}'",
  151. exc_info=True,
  152. )
  153. raise ex from ex
  154. def join_paths(path1: Optional[str] = "", path2: Optional[str] = "") -> str:
  155. """
  156. Joins path1 and path2, returning a valid object storage path string.
  157. Example: "/p1/p2" + "p3" -> "p1/p2/p3"
  158. """
  159. path1 = path1 or ""
  160. path2 = path2 or ""
  161. # combine paths and ensure the resulting path does not start with "/" char and
  162. path = f"{path1.rstrip('/')}/{path2}".lstrip("/")
  163. if len(path) > 0:
  164. # convert to Posix
  165. return Path(path).as_posix()
  166. return path