"""File upload, download and delete endpoints backed by a MinIO bucket.

encoding: UTF-8
@author:clx
@file:files.py.py
@time:2023/5/30 6:08
"""
import json
import logging
import mimetypes
import os
import random
import re
import time
import uuid
from io import BytesIO
from typing import Annotated, List, Optional

import aiofiles
from fastapi import APIRouter, File, UploadFile, Request, Response, status, HTTPException
from fastapi.responses import FileResponse, StreamingResponse
from minio import Minio

from config import config

logger = logging.getLogger(__name__)

# Reference only: the client could also be built directly instead of going
# through the project config helper, e.g.
#
#     minio_client = Minio(
#         "127.0.0.1:9000",
#         access_key="minioadmin",
#         secret_key="minioadmin",
#         secure=False,
#     )

# Bucket that every endpoint in this module reads from and writes to.
BUCKET_NAME = "file1"

# Content type served when the object's extension is unknown; this is the
# value the download endpoint historically used for every object.
DEFAULT_MEDIA_TYPE = "image/png"

# Create the MinIO helper object.
minio_class = config.MinioOperate()
# Connect to MinIO.
minio_client = minio_class.link_minio()
# Create the bucket (presumably a no-op when it already exists -- confirm in
# config.MinioOperate).
minio_class.create_bucket()

router = APIRouter()


def _new_object_name(original_name: Optional[str]) -> str:
    """Return a fresh object name that keeps the extension of *original_name*.

    Uploads with a duplicate name must be renamed, so every upload is stored
    under a generated name. The name combines a nanosecond timestamp with a
    random suffix so that two uploads arriving at the same instant cannot
    overwrite each other.
    """
    # UploadFile.filename may be None when the client sends no filename.
    _, ext = os.path.splitext(original_name or "")
    return f"{time.time_ns()}{uuid.uuid4().hex[:8]}{ext}"


# NOTE(review): the MinIO client calls below are synchronous and run inside
# ``async def`` handlers, so they block the event loop while they execute.
@router.post("/file")
async def create_file(file: UploadFile = File(...)):
    """Store the uploaded file in MinIO under a generated name.

    Returns:
        ``{"status": 200, "path": "/file/<name>"}`` on success, or
        ``{"msg": "上传失败", "status": 400}`` when the upload fails.
    """
    file_name = _new_object_name(file.filename)
    data = await file.read()
    # This is a URL path, not a filesystem path, so build it with "/" directly.
    uri = f"/file/{file_name}"

    try:
        result = minio_client.put_object(
            BUCKET_NAME, file_name, BytesIO(data), len(data)
        )
    except Exception:
        # put_object signals failure by raising; report it with the same
        # payload the falsy-result branch uses instead of an unhandled 500.
        logger.exception("Upload of %r to bucket %r failed", file_name, BUCKET_NAME)
        return {"msg": "上传失败", "status": 400}

    if not result:
        return {"msg": "上传失败", "status": 400}
    return {"status": 200, "path": uri}


@router.get("/file/{uid}")
async def download_file(uid: str):
    """Return the stored object *uid* for inline display (image preview).

    The content type is guessed from the object's extension and falls back to
    ``image/png``. To force a browser download instead, respond with
    ``media_type='application/octet-stream'`` and a
    ``Content-Disposition: attachment; filename=<uid>`` header.

    Returns:
        A ``StreamingResponse`` with the object's bytes, or
        ``{"error": "<message>"}`` when the object cannot be fetched.
    """
    try:
        file_obj = minio_client.get_object(BUCKET_NAME, uid)
        try:
            file_content = BytesIO(file_obj.read())
        finally:
            # The response holds a pooled HTTP connection until it is
            # explicitly closed and released.
            file_obj.close()
            file_obj.release_conn()
    except Exception as e:
        logger.warning("Download of %r failed", uid, exc_info=True)
        return {"error": str(e)}

    media_type = mimetypes.guess_type(uid)[0] or DEFAULT_MEDIA_TYPE
    return StreamingResponse(file_content, media_type=media_type)


@router.delete("/file/{uid}")
async def delete_file(uid: str):
    """Delete the stored object *uid*.

    Returns:
        ``{"msg": "删除图片成功!", "status": 200}`` on success, or
        ``{"msg": "Not Found", "status": 404}`` when the object is missing or
        the deletion fails.
    """
    try:
        # Check existence first so a missing object is reported as 404;
        # stat_object fetches metadata only, so no response body is left open.
        minio_client.stat_object(BUCKET_NAME, uid)
        minio_client.remove_object(BUCKET_NAME, uid)
    except Exception:
        logger.warning("Delete of %r failed", uid, exc_info=True)
        return {"msg": "Not Found", "status": 404}
    return {"msg": "删除图片成功!", "status": 200}