# -*- coding:utf-8 -*-
# powered by Carlos
# October 23, 2019
"""Small wrapper around the MinIO client for storing and fetching files in one bucket."""
import base64
import io
from io import BytesIO

from minio import Minio
from minio.error import (ResponseError, BucketAlreadyOwnedByYou,
                         BucketAlreadyExists, NoSuchKey)
from PIL import Image

from configs.settings import config

URL = config.get('MINIO', 'URL')
ACCESS_KEY = config.get('MINIO', 'ACCESS_KEY')
SECRET_KEY = config.get('MINIO', 'SECRET_KEY')


class FileHandler(object):
    """Upload, download, delete and list objects inside a single MinIO bucket."""

    def __init__(self, bucket_name):
        """Connect to MinIO and make sure ``bucket_name`` exists.

        Args:
            bucket_name: Name of the bucket every method of this handler uses.

        Raises:
            Exception: If creating the bucket fails with a ``ResponseError``.
        """
        self.bucket_name = bucket_name
        self.minio_client = Minio(
            URL,
            access_key=ACCESS_KEY,
            secret_key=SECRET_KEY,
            secure=False,
        )
        # Always attempt to create the bucket; "already exists" is the
        # expected outcome on every start after the first, so it is ignored.
        try:
            self.minio_client.make_bucket(self.bucket_name)
        except (BucketAlreadyExists, BucketAlreadyOwnedByYou):
            pass
        except ResponseError:
            raise Exception("minio连接失败")

    def _upload(self, file_name, payload, error_template):
        """Store ``payload`` (bytes) under ``file_name`` and return ``file_name``.

        ``error_template`` is a ``%s`` format string that receives the file
        name when the upload fails with a ``ResponseError``.
        """
        try:
            self.minio_client.put_object(
                self.bucket_name, file_name, io.BytesIO(payload), len(payload))
        except ResponseError:
            raise Exception(error_template % file_name)
        return file_name

    # Upload an image
    def put_image(self, img_name, image):
        """Decode the base64 ``image`` and store it as ``img_name``.

        Args:
            img_name: Object name to store the image under.
            image: Base64-encoded image content.

        Returns:
            ``img_name``.

        Raises:
            Exception: If the upload fails with a ``ResponseError``.
        """
        return self._upload(
            img_name, base64.b64decode(image), "minio创建图片失败 文件名:%s")

    def put_byte_file(self, file_name, file_content):
        """Store the raw bytes ``file_content`` as ``file_name``.

        Returns:
            ``file_name``.

        Raises:
            Exception: If the upload fails with a ``ResponseError``.
        """
        return self._upload(
            file_name, file_content, "minio创建文件失败 文件名:%s")

    def put_np_image(self, img_name, np_image):
        """Encode the array ``np_image`` as JPEG and store it as ``img_name``.

        Args:
            img_name: Object name to store the image under.
            np_image: Array accepted by ``PIL.Image.fromarray``.

        Returns:
            ``img_name``.

        Raises:
            Exception: If the upload fails with a ``ResponseError``.
        """
        jpeg_buffer = BytesIO()
        Image.fromarray(np_image).save(jpeg_buffer, format="jpeg")
        return self._upload(
            img_name, jpeg_buffer.getvalue(), "minio创建图片失败 文件名:%s")

    # Fetch a stored file
    def get_file(self, img_name):
        """Return the content of object ``img_name``.

        This is best-effort: a missing key yields empty bytes silently, and
        any other failure is printed and also yields empty bytes.
        """
        response = None
        try:
            response = self.minio_client.get_object(self.bucket_name, img_name)
            return response.data
        except NoSuchKey:
            pass
        except Exception as e:
            print("Get minio pic failed:{}".format(e))
        finally:
            # Hand the HTTP connection back to the pool, as the MinIO client
            # documentation recommends for get_object responses.
            if response is not None:
                response.close()
                response.release_conn()
        return bytes()

    def del_file(self, file_name):
        """Remove object ``file_name`` from the bucket.

        Raises:
            Exception: If the removal fails with a ``ResponseError``.
        """
        try:
            self.minio_client.remove_object(self.bucket_name, file_name)
        except ResponseError:
            raise Exception("删除minio文件失败 文件名:%s" % file_name)

    def ls_file(self, filename):
        """Return the bucket's objects whose names start with ``filename``.

        The listing is recursive. The value returned is whatever
        ``list_objects`` yields, unmodified.

        Raises:
            Exception: If ``list_objects`` raises a ``ResponseError``.
        """
        # NOTE(review): list_objects appears to return a lazy iterator in
        # minio-py, so a ResponseError may only surface while the caller
        # iterates, outside this try block -- confirm against the installed
        # client version.
        try:
            return self.minio_client.list_objects(
                self.bucket_name, prefix=filename, recursive=True)
        except ResponseError:
            raise Exception("列出文件失败")


# Shared handler bound to the 'datax' bucket; it is created at import time.
minio_client = FileHandler('datax')
print('minio client started!!!')