123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- # -*- coding:utf-8 -*-
- # powered by Carlos
- # October 23, 2019
- import base64
- import io
- from minio import Minio
- from minio.error import (ResponseError, BucketAlreadyOwnedByYou,
- BucketAlreadyExists, NoSuchKey)
- from configs.settings import config
- from PIL import Image
- from io import BytesIO
- URL = config.get('MINIO', 'URL')
- ACCESS_KEY = config.get('MINIO', 'ACCESS_KEY')
- SECRET_KEY = config.get('MINIO', 'SECRET_KEY')
- class FileHandler(object):
- def __init__(self, bucket_name):
- self.bucket_name = bucket_name
- self.minio_client = Minio(URL,
- access_key=ACCESS_KEY,
- secret_key=SECRET_KEY,
- secure=False)
- # 无脑创桶
- try:
- self.minio_client.make_bucket(self.bucket_name)
- except BucketAlreadyExists:
- pass
- except BucketAlreadyOwnedByYou:
- pass
- except ResponseError:
- # print('dssds')
- raise Exception("minio连接失败")
- # 上传图片
- def put_image(self, img_name, image):
- file_name = img_name
- file_byte = base64.b64decode(image)
- f = io.BytesIO(file_byte)
- try:
- self.minio_client.put_object(
- self.bucket_name, file_name, f, len(file_byte))
- return file_name
- except ResponseError:
- raise Exception("minio创建图片失败 文件名:%s" % file_name)
- def put_byte_file(self, file_name, file_content):
- f = io.BytesIO(file_content)
- try:
- self.minio_client.put_object(
- self.bucket_name, file_name, f, len(file_content))
- return file_name
- except ResponseError:
- raise Exception("minio创建文件失败 文件名:%s" % file_name)
- def put_np_image(self, img_name, np_image):
- file_name = img_name
- img_crop_pil = Image.fromarray(np_image)
- byte_io = BytesIO()
- img_crop_pil.save(byte_io, format="jpeg")
- jpg_buffer = byte_io.getvalue()
- byte_io = BytesIO(jpg_buffer)
- try:
- self.minio_client.put_object(
- self.bucket_name, img_name, byte_io, len(jpg_buffer))
- return file_name
- except ResponseError:
- raise Exception("minio创建图片失败 文件名:%s" % file_name)
- # 获取照片
- def get_file(self, img_name):
- file_name = img_name
- try:
- d = self.minio_client.get_object(self.bucket_name, file_name)
- return d.data
- except NoSuchKey:
- pass
- except Exception as e:
- print("Get minio pic failed:{}".format(e))
- return bytes()
- def del_file(self, file_name):
- try:
- self.minio_client.remove_object(self.bucket_name, file_name)
- except ResponseError:
- raise Exception("删除minio文件失败 文件名:%s" % file_name)
- def ls_file(self, filename):
- objects = []
- try:
- objects = self.minio_client.list_objects(self.bucket_name, prefix=filename,
- recursive=True)
- return objects
- except ResponseError:
- raise Exception("列出文件失败")
- minio_client = FileHandler('datax')
- print('minio client started!!!')
|