import os import re import time import datetime from io import BytesIO from fastapi import Depends, FastAPI, HTTPException, status, UploadFile, APIRouter, File from starlette.responses import StreamingResponse, FileResponse from starlette.testclient import TestClient from pydantic import BaseModel import json from minio import Minio from cacheout import Cache import os from datetime import timedelta from typing import Union from dotenv import load_dotenv from fastapi import Depends, FastAPI, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt from passlib.context import CryptContext class Item(BaseModel): id: int name: str class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): username: Union[str, None] = None class User(BaseModel): username: str email: Union[str, None] = None full_name: Union[str, None] = None disabled: Union[bool, None] = None class UserInDB(User): hashed_password: str fake_users_db = { "johndoe": { "username": "johndoe", "full_name": "John Doe", "email": "johndoe@example.com", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, } } # openssl rand -hex 32 # SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # ALGORITHM="HS256" # ACCESS_TOKEN_EXPIRE_MINUTES=30 # 加载环境变量 load_dotenv() SECRET_KEY = os.getenv("SECRET_KEY") ALGORITHM = os.getenv("ALGORITHM") ACCESS_TOKEN_EXPIRE_MINUTES = os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES") # schemes 加密方式,默认第一个 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") # 请求/token 返回一个令牌 oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # 比较哈希值是否一直,一直就返回True,否则返回False def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) # 对用户输入的密码进行hash加密 def get_password_hash(password): return pwd_context.hash(password) # 去数据库中寻找,用户是否在数据库中,能找到就返回用户的所有信息 def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) # 判断用户是否存在于数据库中,存在就比较hash密码,比对成功,返回用户信息 def authenticate_user(fake_db, username: str, password: str): user = get_user(fake_db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user # data:{"sub":user.username} datetime.utcnow():2023-05-31 23:46:27.912774 # utcnow()用于记录当前时间,datetime模块中的timedelta返回的数据可与utcnow()相加 def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) # update可以将过期时间加入到to_encode字典中 => to_encoded = {"sub":user.username,"exp":expire} to_encode.update({"exp": expire}) # jwt.encode(加密数据,密钥,加密方式) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) # 最终返回被加密后的to_encoded return encoded_jwt # 这个函数必须携带令牌才能执行,携带令牌获取用户,返回用户信息 async def get_current_user(token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: # 对token进行解码 payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = TokenData(username=username) except JWTError: raise credentials_exception user = get_user(fake_users_db, username=token_data.username) if user is None: raise credentials_exception return user # 这里接收用户信息,接收到了就返回用户信息,没接收到,就表示令牌过期了,或者是未登陆 async def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user # 从配置文件读取设置 class MinioOperate: def __init__(self): with open(r"D:\pythonProject\django\fastapi_01\config\config.json", "r") as f: self.__config = json.load(f) self.minio_client = None def link_minio(self): self.minio_client = Minio(**self.__config["minio"]) return self.minio_client def create_bucket(self, buckets: []): for bucket_name in buckets: if not self.minio_client.bucket_exists(bucket_name): try: self.minio_client.make_bucket(bucket_name) except Exception as e: print(f"Bucket creation failed: {e}") else: print(f"Bucket {bucket_name} already exists") class SetCache: def __init__(self, maxsize, ttl): self.cache = Cache(maxsize=maxsize, ttl=ttl) def get(self, uid): self.data = self.cache.get(uid) return self.data def add(self, uid, data): self.cache.add(uid, data) # 创建minio对象 minio_class = MinioOperate() # 连接minio minio_client = minio_class.link_minio() # 创建bucket minio_class.create_bucket(["file", "image"]) # 初始化缓存 cache = SetCache(maxsize=128, ttl=10) app = FastAPI() PIC_NAME = None @app.post("/file") async def create_file(file: UploadFile = File(...)): timestamp = str(time.time()).ljust(18, "0") uid = re.sub(r"\.", "", timestamp) front, ext = os.path.splitext(file.filename) file_name = uid + ext # 168549427474778.png global PIC_NAME PIC_NAME = file_name data = await file.read() file_stream = BytesIO(initial_bytes=data) size = len(data) date = str(datetime.date.today()) object_path = date + "/{}".format(file_name) if (minio_client.put_object( "image", object_path, file_stream, size )): return {"status": 200, "data": [file_name], "msg": ""} else: return {"status": 400, "data": [], "msg": "Post Failed!"} @app.get("/file/{uid}") async def download_file(uid: str): try: timestamp, ext = os.path.splitext(uid) timestamp = float(str(float(timestamp) / 10000000).ljust(18, "0")) object_path = str(time.localtime(timestamp).tm_year) + "-" + str(time.localtime(timestamp).tm_mon).rjust(2, "0") + "-" \ + str(time.localtime(timestamp).tm_mday).rjust(2, "0") + "/{}".format(uid) file_obj = minio_client.get_object("image", object_path) if not cache.get(uid): # 添加缓存 # print("第一次获取,添加到缓存") cache.add(uid, file_obj.read()) else: # print("从缓存中找到uid,获取缓存") file_bytes = cache.get(uid) return StreamingResponse(BytesIO(file_bytes), media_type="image/{}".format(ext[1:])) file_content = BytesIO(file_obj.read()) response = StreamingResponse(file_content, media_type='image/{}'.format(ext[1:])) except Exception as e: return {"status": 400, "data": [], "msg": "Get Failed!"} # return response return {"status": 200, "data": [uid], "msg": ""} # 删除 鉴权 current_user: User = Depends(get_current_active_user) @app.delete("/file/{uid}") async def delete_file(uid: str): try: timestamp, ext = os.path.splitext(uid) timestamp = float(str(float(timestamp) / 10000000).ljust(18, "0")) object_path = str(time.localtime(timestamp).tm_year) + "-" + str(time.localtime(timestamp).tm_mon).rjust(2, "0") + "-" \ + str(time.localtime(timestamp).tm_mday).rjust(2, "0") + "/{}".format(uid) minio_client.get_object("image", object_path) minio_client.remove_object("image", object_path) return {"status": 200, "data": [], "msg": "Delete Success!"} except: return {"status": 404, "data": [], "msg": "Not Found"} client = TestClient() def test_create_file(): file = {"file": open(r"E:\wallhaven_pic\wallhaven-5gw639.jpg","rb") } response = client.post(f"/file",files=file) assert response.json() == { "status": 200, "data": [ PIC_NAME ], "msg": "" } def test_download_file(): response = client.get(f"/file/{PIC_NAME}") assert response.status_code == 200 assert response.json() == { "status": 200, "data": [ PIC_NAME ], "msg": "" } def test_delete_file(): response = client.delete(f"/file/{PIC_NAME}") assert response.status_code == 200 assert response.json() == {"status": 200, "data": [], "msg": "Delete Success!"}