"""Thin wrapper around a MinIO client for storing and fetching files in one bucket."""
import base64
import io

from minio import Minio
from minio.error import (ResponseError, BucketAlreadyOwnedByYou,
                         BucketAlreadyExists, NoSuchKey)
from PIL import Image
from io import BytesIO

# NOTE(review): endpoint and credentials are hard-coded in source; consider
# loading them from the environment or a secret store instead.
URL = 'minio-api.sxkj.com'
ACCESS_KEY = 'admin'
SECRET_KEY = 'sxkjadmin'


class FileHandler(object):
    """Upload, download, delete and list objects inside a single MinIO bucket.

    The bucket is created on construction if it does not exist yet.
    """

    def __init__(self, bucket_name):
        """Connect to MinIO and make sure ``bucket_name`` exists.

        Args:
            bucket_name: Name of the bucket every method of this instance uses.

        Raises:
            Exception: If creating the bucket fails with a ``ResponseError``.
        """
        self.bucket_name = bucket_name
        self.minio_client = Minio(URL,
                                  access_key=ACCESS_KEY,
                                  secret_key=SECRET_KEY,
                                  secure=False)
        # Always try to create the bucket; "already exists" is the normal case
        # on every run after the first, so it is deliberately ignored.
        try:
            self.minio_client.make_bucket(self.bucket_name)
        except (BucketAlreadyExists, BucketAlreadyOwnedByYou):
            pass
        except ResponseError as err:
            raise Exception("minio连接失败") from err

    def _put_bytes(self, file_name, payload, error_template):
        """Store ``payload`` (bytes) under ``file_name`` and return ``file_name``.

        ``error_template`` is a ``%s`` template that receives the file name and
        becomes the message of the ``Exception`` raised on ``ResponseError``.
        """
        try:
            self.minio_client.put_object(self.bucket_name, file_name,
                                         io.BytesIO(payload), len(payload))
        except ResponseError as err:
            raise Exception(error_template % file_name) from err
        return file_name

    # Upload an image
    def put_image(self, img_name, image):
        """Decode a base64-encoded image and store it as ``img_name``.

        Args:
            img_name: Object name to store the image under.
            image: Base64-encoded image content.

        Returns:
            The object name that was written.

        Raises:
            Exception: If the upload fails with a ``ResponseError``.
        """
        file_byte = base64.b64decode(image)
        return self._put_bytes(img_name, file_byte, "minio创建图片失败 文件名:%s")

    def put_byte_file(self, file_name, file_content):
        """Store raw bytes ``file_content`` as ``file_name``.

        Returns:
            The object name that was written.

        Raises:
            Exception: If the upload fails with a ``ResponseError``.
        """
        return self._put_bytes(file_name, file_content, "minio创建文件失败 文件名:%s")

    def put_np_image(self, img_name, np_image):
        """Encode an array as JPEG via PIL and store it as ``img_name``.

        Args:
            img_name: Object name to store the image under.
            np_image: Array accepted by ``PIL.Image.fromarray``.

        Returns:
            The object name that was written.

        Raises:
            Exception: If the upload fails with a ``ResponseError``.
        """
        byte_io = BytesIO()
        Image.fromarray(np_image).save(byte_io, format="jpeg")
        return self._put_bytes(img_name, byte_io.getvalue(),
                               "minio创建图片失败 文件名:%s")

    # Fetch a stored file
    def get_file(self, img_name):
        """Return the content of object ``img_name`` as bytes.

        This is best-effort: a missing object yields empty bytes silently, and
        any other failure is printed and also yields empty bytes.
        """
        response = None
        try:
            response = self.minio_client.get_object(self.bucket_name, img_name)
            return response.data
        except NoSuchKey:
            pass
        except Exception as e:
            print("Get minio pic failed:{}".format(e))
        finally:
            # Hand the HTTP connection back to the pool once the body is read.
            if response is not None:
                response.close()
                response.release_conn()
        return bytes()

    def del_image(self, img_name):
        """Remove object ``img_name`` from the bucket.

        Raises:
            Exception: If the removal fails with a ``ResponseError``.
        """
        try:
            self.minio_client.remove_object(self.bucket_name, img_name)
        except ResponseError as err:
            raise Exception("删除minio图片失败 文件名:%s" % img_name) from err

    def ls_file(self, filename):
        """List objects whose name starts with ``filename``, recursively.

        Returns:
            Whatever ``Minio.list_objects`` returns for the prefix.

        Raises:
            Exception: If the listing call fails with a ``ResponseError``.
        """
        # NOTE(review): list_objects presumably returns a lazy iterator, in
        # which case server errors may only surface while the caller iterates
        # -- confirm against the installed minio version.
        try:
            return self.minio_client.list_objects(self.bucket_name,
                                                  prefix=filename,
                                                  recursive=True)
        except ResponseError as err:
            # No `finally: return` here: it would swallow this exception.
            raise Exception("列出文件失败") from err