1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192 |
import base64
import io
import os
from io import BytesIO

from minio import Minio
from minio.error import (ResponseError, BucketAlreadyOwnedByYou,
                         BucketAlreadyExists, NoSuchKey)
from PIL import Image
# Connection settings for the MinIO server used by FileHandler.
#
# Each value may be overridden through an environment variable. The fallbacks
# are the values that used to be hard-coded here, so behaviour is unchanged
# when the variables are not set.
# NOTE(review): credentials should not live in source control. Set the
# environment variables in deployment and rotate/remove these defaults.
URL = os.environ.get('MINIO_URL', 'minio-api.sxkj.com')
ACCESS_KEY = os.environ.get('MINIO_ACCESS_KEY', 'admin')
SECRET_KEY = os.environ.get('MINIO_SECRET_KEY', 'sxkjadmin')
class FileHandler(object):
    """Small convenience wrapper around a MinIO client bound to one bucket.

    The constructor connects with the module-level ``URL`` / ``ACCESS_KEY`` /
    ``SECRET_KEY`` settings and makes sure the bucket exists. The remaining
    methods upload, download, delete and list objects in that bucket.
    """

    def __init__(self, bucket_name):
        """Create the client and ensure ``bucket_name`` exists.

        Args:
            bucket_name: Name of the bucket every method of this instance
                operates on.

        Raises:
            Exception: If ``make_bucket`` fails with a ``ResponseError``.
        """
        self.bucket_name = bucket_name
        # NOTE(review): secure=False means plain HTTP is used to talk to the
        # server; confirm this is intended outside a trusted network.
        self.minio_client = Minio(URL,
                                  access_key=ACCESS_KEY,
                                  secret_key=SECRET_KEY,
                                  secure=False)
        # Create the bucket unconditionally; "already exists" is not an error.
        try:
            self.minio_client.make_bucket(self.bucket_name)
        except (BucketAlreadyExists, BucketAlreadyOwnedByYou):
            pass
        except ResponseError as err:
            raise Exception("minio连接失败") from err

    def _put_bytes(self, file_name, data, error_template):
        """Upload ``data`` as object ``file_name`` and return ``file_name``.

        Shared implementation of the public ``put_*`` methods.

        Args:
            file_name: Object name inside the bucket.
            data: Bytes-like payload to store.
            error_template: ``%``-format string (one ``%s`` for the file
                name) used for the exception message on failure.

        Raises:
            Exception: If ``put_object`` fails with a ``ResponseError``.
        """
        stream = io.BytesIO(data)
        try:
            self.minio_client.put_object(self.bucket_name, file_name,
                                         stream, len(data))
        except ResponseError as err:
            raise Exception(error_template % file_name) from err
        return file_name

    # Upload an image.
    def put_image(self, img_name, image):
        """Decode a base64-encoded image and upload it as ``img_name``.

        Args:
            img_name: Object name to store the image under.
            image: Base64-encoded image content.

        Returns:
            ``img_name``.

        Raises:
            Exception: If the upload fails with a ``ResponseError``.
        """
        return self._put_bytes(img_name, base64.b64decode(image),
                               "minio创建图片失败 文件名:%s")

    def put_byte_file(self, file_name, file_content):
        """Upload raw bytes as ``file_name``.

        Args:
            file_name: Object name to store the content under.
            file_content: Bytes-like payload.

        Returns:
            ``file_name``.

        Raises:
            Exception: If the upload fails with a ``ResponseError``.
        """
        return self._put_bytes(file_name, file_content,
                               "minio创建文件失败 文件名:%s")

    def put_np_image(self, img_name, np_image):
        """Encode an array as JPEG and upload it as ``img_name``.

        Args:
            img_name: Object name to store the image under.
            np_image: Array accepted by ``PIL.Image.fromarray``.

        Returns:
            ``img_name``.

        Raises:
            Exception: If the upload fails with a ``ResponseError``.
        """
        encoded = io.BytesIO()
        Image.fromarray(np_image).save(encoded, format="jpeg")
        return self._put_bytes(img_name, encoded.getvalue(),
                               "minio创建图片失败 文件名:%s")

    # Fetch a stored file.
    def get_file(self, img_name):
        """Return the content of object ``img_name``, or ``b''`` on failure.

        This is deliberately best-effort: a missing key yields empty bytes
        silently, any other failure is printed and also yields empty bytes.

        Args:
            img_name: Object name to download.

        Returns:
            The response's ``data``, or empty ``bytes`` if it could not be
            fetched.
        """
        response = None
        try:
            response = self.minio_client.get_object(self.bucket_name, img_name)
            return response.data
        except NoSuchKey:
            pass
        except Exception as e:
            print("Get minio pic failed:{}".format(e))
        finally:
            # Always hand the HTTP connection back to the pool; otherwise
            # every download leaks one connection.
            if response is not None:
                response.close()
                response.release_conn()
        return bytes()

    def del_image(self, img_name):
        """Delete object ``img_name`` from the bucket.

        Args:
            img_name: Object name to remove.

        Raises:
            Exception: If ``remove_object`` fails with a ``ResponseError``.
        """
        try:
            self.minio_client.remove_object(self.bucket_name, img_name)
        except ResponseError as err:
            raise Exception("删除minio图片失败 文件名:%s" % img_name) from err

    def ls_file(self, filename):
        """List objects whose name starts with ``filename`` (recursively).

        Args:
            filename: Name prefix to filter on.

        Returns:
            Whatever ``list_objects`` returns for this bucket and prefix.
            NOTE(review): this is presumably a lazy iterator, in which case
            server errors may only surface while iterating it - confirm
            against the installed minio version.

        Raises:
            Exception: If ``list_objects`` fails with a ``ResponseError``.
        """
        # No ``finally: return`` here: returning from ``finally`` would
        # discard the exception raised below and hide listing failures.
        try:
            return self.minio_client.list_objects(self.bucket_name,
                                                  prefix=filename,
                                                  recursive=True)
        except ResponseError as err:
            raise Exception("列出文件失败") from err
|