123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- import os
- import re
- import time
- import datetime
- from io import BytesIO
- from fastapi import Depends, FastAPI, HTTPException, status, UploadFile, APIRouter, File
- from starlette.responses import StreamingResponse, FileResponse
- from starlette.testclient import TestClient
- from pydantic import BaseModel
- import json
- from minio import Minio
- from cacheout import Cache
- import os
- from datetime import timedelta
- from typing import Union
- from dotenv import load_dotenv
- from fastapi import Depends, FastAPI, HTTPException, status
- from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
- from jose import JWTError, jwt
- from passlib.context import CryptContext
- class Item(BaseModel):
- id: int
- name: str
- class Token(BaseModel):
- access_token: str
- token_type: str
- class TokenData(BaseModel):
- username: Union[str, None] = None
- class User(BaseModel):
- username: str
- email: Union[str, None] = None
- full_name: Union[str, None] = None
- disabled: Union[bool, None] = None
- class UserInDB(User):
- hashed_password: str
- fake_users_db = {
- "johndoe": {
- "username": "johndoe",
- "full_name": "John Doe",
- "email": "johndoe@example.com",
- "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
- "disabled": False,
- }
- }
- # openssl rand -hex 32
- # SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
- # ALGORITHM="HS256"
- # ACCESS_TOKEN_EXPIRE_MINUTES=30
- # 加载环境变量
- load_dotenv()
- SECRET_KEY = os.getenv("SECRET_KEY")
- ALGORITHM = os.getenv("ALGORITHM")
- ACCESS_TOKEN_EXPIRE_MINUTES = os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")
- # schemes 加密方式,默认第一个
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- # 请求/token 返回一个令牌
- oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
- # 比较哈希值是否一直,一直就返回True,否则返回False
- def verify_password(plain_password, hashed_password):
- return pwd_context.verify(plain_password, hashed_password)
- # 对用户输入的密码进行hash加密
- def get_password_hash(password):
- return pwd_context.hash(password)
- # 去数据库中寻找,用户是否在数据库中,能找到就返回用户的所有信息
- def get_user(db, username: str):
- if username in db:
- user_dict = db[username]
- return UserInDB(**user_dict)
- # 判断用户是否存在于数据库中,存在就比较hash密码,比对成功,返回用户信息
- def authenticate_user(fake_db, username: str, password: str):
- user = get_user(fake_db, username)
- if not user:
- return False
- if not verify_password(password, user.hashed_password):
- return False
- return user
- # data:{"sub":user.username} datetime.utcnow():2023-05-31 23:46:27.912774
- # utcnow()用于记录当前时间,datetime模块中的timedelta返回的数据可与utcnow()相加
- def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
- to_encode = data.copy()
- if expires_delta:
- expire = datetime.utcnow() + expires_delta
- else:
- expire = datetime.utcnow() + timedelta(minutes=15)
- # update可以将过期时间加入到to_encode字典中 => to_encoded = {"sub":user.username,"exp":expire}
- to_encode.update({"exp": expire})
- # jwt.encode(加密数据,密钥,加密方式)
- encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
- # 最终返回被加密后的to_encoded
- return encoded_jwt
- # 这个函数必须携带令牌才能执行,携带令牌获取用户,返回用户信息
- async def get_current_user(token: str = Depends(oauth2_scheme)):
- credentials_exception = HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="Could not validate credentials",
- headers={"WWW-Authenticate": "Bearer"},
- )
- try:
- # 对token进行解码
- payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
- username: str = payload.get("sub")
- if username is None:
- raise credentials_exception
- token_data = TokenData(username=username)
- except JWTError:
- raise credentials_exception
- user = get_user(fake_users_db, username=token_data.username)
- if user is None:
- raise credentials_exception
- return user
- # 这里接收用户信息,接收到了就返回用户信息,没接收到,就表示令牌过期了,或者是未登陆
- async def get_current_active_user(current_user: User = Depends(get_current_user)):
- if current_user.disabled:
- raise HTTPException(status_code=400, detail="Inactive user")
- return current_user
- # 从配置文件读取设置
- class MinioOperate:
- def __init__(self):
- with open(r"D:\pythonProject\django\fastapi_01\config\config.json", "r") as f:
- self.__config = json.load(f)
- self.minio_client = None
- def link_minio(self):
- self.minio_client = Minio(**self.__config["minio"])
- return self.minio_client
- def create_bucket(self, buckets: []):
- for bucket_name in buckets:
- if not self.minio_client.bucket_exists(bucket_name):
- try:
- self.minio_client.make_bucket(bucket_name)
- except Exception as e:
- print(f"Bucket creation failed: {e}")
- else:
- print(f"Bucket {bucket_name} already exists")
- class SetCache:
- def __init__(self, maxsize, ttl):
- self.cache = Cache(maxsize=maxsize, ttl=ttl)
- def get(self, uid):
- self.data = self.cache.get(uid)
- return self.data
- def add(self, uid, data):
- self.cache.add(uid, data)
- # 创建minio对象
- minio_class = MinioOperate()
- # 连接minio
- minio_client = minio_class.link_minio()
- # 创建bucket
- minio_class.create_bucket(["file", "image"])
- # 初始化缓存
- cache = SetCache(maxsize=128, ttl=10)
- app = FastAPI()
- PIC_NAME = None
- @app.post("/file")
- async def create_file(file: UploadFile = File(...)):
- timestamp = str(time.time()).ljust(18, "0")
- uid = re.sub(r"\.", "", timestamp)
- front, ext = os.path.splitext(file.filename)
- file_name = uid + ext # 168549427474778.png
- global PIC_NAME
- PIC_NAME = file_name
- data = await file.read()
- file_stream = BytesIO(initial_bytes=data)
- size = len(data)
- date = str(datetime.date.today())
- object_path = date + "/{}".format(file_name)
- if (minio_client.put_object(
- "image",
- object_path,
- file_stream,
- size
- )):
- return {"status": 200, "data": [file_name], "msg": ""}
- else:
- return {"status": 400, "data": [], "msg": "Post Failed!"}
- @app.get("/file/{uid}")
- async def download_file(uid: str):
- try:
- timestamp, ext = os.path.splitext(uid)
- timestamp = float(str(float(timestamp) / 10000000).ljust(18, "0"))
- object_path = str(time.localtime(timestamp).tm_year) + "-" + str(time.localtime(timestamp).tm_mon).rjust(2,
- "0") + "-" \
- + str(time.localtime(timestamp).tm_mday).rjust(2, "0") + "/{}".format(uid)
- file_obj = minio_client.get_object("image", object_path)
- if not cache.get(uid):
- # 添加缓存
- # print("第一次获取,添加到缓存")
- cache.add(uid, file_obj.read())
- else:
- # print("从缓存中找到uid,获取缓存")
- file_bytes = cache.get(uid)
- return StreamingResponse(BytesIO(file_bytes), media_type="image/{}".format(ext[1:]))
- file_content = BytesIO(file_obj.read())
- response = StreamingResponse(file_content, media_type='image/{}'.format(ext[1:]))
- except Exception as e:
- return {"status": 400, "data": [], "msg": "Get Failed!"}
- # return response
- return {"status": 200, "data": [uid], "msg": ""}
- # 删除 鉴权 current_user: User = Depends(get_current_active_user)
- @app.delete("/file/{uid}")
- async def delete_file(uid: str):
- try:
- timestamp, ext = os.path.splitext(uid)
- timestamp = float(str(float(timestamp) / 10000000).ljust(18, "0"))
- object_path = str(time.localtime(timestamp).tm_year) + "-" + str(time.localtime(timestamp).tm_mon).rjust(2,
- "0") + "-" \
- + str(time.localtime(timestamp).tm_mday).rjust(2, "0") + "/{}".format(uid)
- minio_client.get_object("image", object_path)
- minio_client.remove_object("image", object_path)
- return {"status": 200, "data": [], "msg": "Delete Success!"}
- except:
- return {"status": 404, "data": [], "msg": "Not Found"}
- client = TestClient()
- def test_create_file():
- file = {"file": open(r"E:\wallhaven_pic\wallhaven-5gw639.jpg","rb") }
- response = client.post(f"/file",files=file)
- assert response.json() == {
- "status": 200,
- "data": [
- PIC_NAME
- ],
- "msg": ""
- }
- def test_download_file():
- response = client.get(f"/file/{PIC_NAME}")
- assert response.status_code == 200
- assert response.json() == {
- "status": 200,
- "data": [
- PIC_NAME
- ],
- "msg": ""
- }
- def test_delete_file():
- response = client.delete(f"/file/{PIC_NAME}")
- assert response.status_code == 200
- assert response.json() == {"status": 200, "data": [], "msg": "Delete Success!"}
|