Browse Source

fastapi+minio:v6
这应该是最终版本,加入了缓存机制

clxHardstudy 1 year ago
parent
commit
4eec388962
8 changed files with 510 additions and 8 deletions
  1. 22 0
      Dockerfile
  2. 4 0
      config/setting.env
  3. 0 1
      models/model.py
  4. 6 0
      requirements.txt
  5. 21 7
      routers/files.py
  6. 298 0
      test/test_files.py
  7. 66 0
      test/test_items.py
  8. 93 0
      utils/jwt.py

+ 22 - 0
Dockerfile

@@ -0,0 +1,22 @@
+#FROM python:3.10
+#
+#WORKDIR ./peoject
+#
+#COPY ./requirements.txt /app/requirement.txt
+#
+#
+#RUN set -eux \
+#    && pip install --upgrade pip \
+#    && pip install -r ./requirements.txt
+#
+#COPY . /project1
+FROM python:3.10
+
+WORKDIR /app
+
+COPY ./requirements.txt .
+
+
+RUN pip install -r requirements.txt
+
+COPY . .

+ 4 - 0
config/setting.env

@@ -0,0 +1,4 @@
+# openssl rand -hex 32
+SECRET_KEY=09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7
+ALGORITHM=HS256
+ACCESS_TOKEN_EXPIRE_MINUTES=30

+ 0 - 1
models/model.py

@@ -5,7 +5,6 @@
     @time:2023/5/30 6:29
 """
 from typing import Optional, Union
-
 from pydantic import BaseModel
 
 

+ 6 - 0
requirements.txt

@@ -0,0 +1,6 @@
+fastapi
+minio
+cacheout
+jose
+passlib
+pydantic

+ 21 - 7
routers/files.py

@@ -4,24 +4,28 @@ import time
 from datetime import timedelta
 import datetime
 from io import BytesIO
+
+import pytest
 from fastapi import Depends, FastAPI, HTTPException, status, UploadFile, APIRouter, File
 from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
+from httpx import AsyncClient
 from starlette.responses import StreamingResponse, FileResponse
-from config import config
+
+from config.config import MinioOperate ,SetCache
 from models.model import *
 from utils.jwt import authenticate_user, ACCESS_TOKEN_EXPIRE_MINUTES, create_access_token, get_current_active_user, \
     fake_users_db
 
 
 # 创建minio对象
-minio_class = config.MinioOperate()
+minio_class = MinioOperate()
 # 连接minio
 minio_client = minio_class.link_minio()
 # 创建bucket
 minio_class.create_bucket(["file", "image"])
 
 # 初始化缓存
-cache = config.SetCache(maxsize=128,ttl=10)
+cache = SetCache(maxsize=128,ttl=10)
 
 router = APIRouter()
 
@@ -87,18 +91,18 @@ async def download_file(uid: str):
 
         if not cache.get(uid):
             # 添加缓存
-            print("第一次获取,添加到缓存")
+            # print("第一次获取,添加到缓存")
             cache.add(uid, file_obj.read())
         else:
-            print("从缓存中找到uid,获取缓存")
+            # print("从缓存中找到uid,获取缓存")
             file_bytes = cache.get(uid)
             return StreamingResponse(BytesIO(file_bytes), media_type="image/{}".format(ext[1:]))
         file_content = BytesIO(file_obj.read())
         response = StreamingResponse(file_content, media_type='image/{}'.format(ext[1:]))
     except Exception as e:
         return {"status": 400, "data": [], "msg": "Get Failed!"}
-    return response
-
+    # return response
+    return {"status": 200, "data": [uid], "msg": ""}
 
 # 删除  鉴权 current_user: User = Depends(get_current_active_user)
 @router.delete("/file/{uid}")
@@ -114,3 +118,13 @@ async def delete_file(uid: str):
         return {"status": 200, "data": [], "msg": "Delete Success!"}
     except:
         return {"status": 404, "data": [], "msg": "Not Found"}
+
+
+
+
+
+
+
+
+
+

+ 298 - 0
test/test_files.py

@@ -0,0 +1,298 @@
+import os
+import re
+import time
+import datetime
+from io import BytesIO
+from fastapi import Depends, FastAPI, HTTPException, status, UploadFile, APIRouter, File
+from starlette.responses import StreamingResponse, FileResponse
+from starlette.testclient import TestClient
+from pydantic import BaseModel
+import json
+from minio import Minio
+from cacheout import Cache
+import os
+from datetime import  timedelta
+from typing import Union
+from dotenv import load_dotenv
+from fastapi import Depends, FastAPI, HTTPException, status
+from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
+from jose import JWTError, jwt
+from passlib.context import CryptContext
+
+
+class Item(BaseModel):
+    id: int
+    name: str
+
+
+class Token(BaseModel):
+    access_token: str
+    token_type: str
+
+
+class TokenData(BaseModel):
+    username: Union[str, None] = None
+
+
+class User(BaseModel):
+    username: str
+    email: Union[str, None] = None
+    full_name: Union[str, None] = None
+    disabled: Union[bool, None] = None
+
+
+class UserInDB(User):
+    hashed_password: str
+
+
+fake_users_db = {
+    "johndoe": {
+        "username": "johndoe",
+        "full_name": "John Doe",
+        "email": "johndoe@example.com",
+        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
+        "disabled": False,
+    }
+}
+
+# openssl rand -hex 32
+# SECRET_KEY="09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
+# ALGORITHM="HS256"
+# ACCESS_TOKEN_EXPIRE_MINUTES=30
+
+# 加载环境变量
+load_dotenv()
+SECRET_KEY = os.getenv("SECRET_KEY")
+ALGORITHM = os.getenv("ALGORITHM")
+ACCESS_TOKEN_EXPIRE_MINUTES = os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")
+
+# schemes 加密方式,默认第一个
+pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
+
+# 请求/token 返回一个令牌
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
+
+
+# 比较哈希值是否一直,一直就返回True,否则返回False
+def verify_password(plain_password, hashed_password):
+    return pwd_context.verify(plain_password, hashed_password)
+
+
+# 对用户输入的密码进行hash加密
+def get_password_hash(password):
+    return pwd_context.hash(password)
+
+
+# 去数据库中寻找,用户是否在数据库中,能找到就返回用户的所有信息
+def get_user(db, username: str):
+    if username in db:
+        user_dict = db[username]
+        return UserInDB(**user_dict)
+
+
+# 判断用户是否存在于数据库中,存在就比较hash密码,比对成功,返回用户信息
+def authenticate_user(fake_db, username: str, password: str):
+    user = get_user(fake_db, username)
+    if not user:
+        return False
+    if not verify_password(password, user.hashed_password):
+        return False
+    return user
+
+
+# data:{"sub":user.username} datetime.utcnow():2023-05-31 23:46:27.912774
+# utcnow()用于记录当前时间,datetime模块中的timedelta返回的数据可与utcnow()相加
+def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
+    to_encode = data.copy()
+    if expires_delta:
+        expire = datetime.utcnow() + expires_delta
+    else:
+        expire = datetime.utcnow() + timedelta(minutes=15)
+    # update可以将过期时间加入到to_encode字典中 => to_encoded = {"sub":user.username,"exp":expire}
+    to_encode.update({"exp": expire})
+    # jwt.encode(加密数据,密钥,加密方式)
+    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
+    # 最终返回被加密后的to_encoded
+    return encoded_jwt
+
+
+# 这个函数必须携带令牌才能执行,携带令牌获取用户,返回用户信息
+async def get_current_user(token: str = Depends(oauth2_scheme)):
+    credentials_exception = HTTPException(
+        status_code=status.HTTP_401_UNAUTHORIZED,
+        detail="Could not validate credentials",
+        headers={"WWW-Authenticate": "Bearer"},
+    )
+    try:
+        # 对token进行解码
+        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
+        username: str = payload.get("sub")
+        if username is None:
+            raise credentials_exception
+        token_data = TokenData(username=username)
+    except JWTError:
+        raise credentials_exception
+    user = get_user(fake_users_db, username=token_data.username)
+    if user is None:
+        raise credentials_exception
+    return user
+
+
+# 这里接收用户信息,接收到了就返回用户信息,没接收到,就表示令牌过期了,或者是未登陆
+async def get_current_active_user(current_user: User = Depends(get_current_user)):
+    if current_user.disabled:
+        raise HTTPException(status_code=400, detail="Inactive user")
+    return current_user
+
+
+# 从配置文件读取设置
+class MinioOperate:
+
+    def __init__(self):
+        with open(r"D:\pythonProject\django\fastapi_01\config\config.json", "r") as f:
+            self.__config = json.load(f)
+        self.minio_client = None
+
+    def link_minio(self):
+        self.minio_client = Minio(**self.__config["minio"])
+        return self.minio_client
+
+    def create_bucket(self, buckets: []):
+        for bucket_name in buckets:
+            if not self.minio_client.bucket_exists(bucket_name):
+                try:
+                    self.minio_client.make_bucket(bucket_name)
+                except Exception as e:
+                    print(f"Bucket creation failed: {e}")
+            else:
+                print(f"Bucket {bucket_name} already exists")
+
+
+class SetCache:
+    def __init__(self, maxsize, ttl):
+        self.cache = Cache(maxsize=maxsize, ttl=ttl)
+
+    def get(self, uid):
+        self.data = self.cache.get(uid)
+        return self.data
+
+    def add(self, uid, data):
+        self.cache.add(uid, data)
+
+
+# 创建minio对象
+minio_class = MinioOperate()
+# 连接minio
+minio_client = minio_class.link_minio()
+# 创建bucket
+minio_class.create_bucket(["file", "image"])
+
+# 初始化缓存
+cache = SetCache(maxsize=128, ttl=10)
+
+app = FastAPI()
+
+
+PIC_NAME = None
+
+
+@app.post("/file")
+async def create_file(file: UploadFile = File(...)):
+    timestamp = str(time.time()).ljust(18, "0")
+    uid = re.sub(r"\.", "", timestamp)
+    front, ext = os.path.splitext(file.filename)
+
+    file_name = uid + ext  # 168549427474778.png
+    global PIC_NAME
+    PIC_NAME = file_name
+    data = await file.read()
+    file_stream = BytesIO(initial_bytes=data)
+    size = len(data)
+
+    date = str(datetime.date.today())
+    object_path = date + "/{}".format(file_name)
+    if (minio_client.put_object(
+            "image",
+            object_path,
+            file_stream,
+            size
+    )):
+        return {"status": 200, "data": [file_name], "msg": ""}
+    else:
+        return {"status": 400, "data": [], "msg": "Post Failed!"}
+
+
+@app.get("/file/{uid}")
+async def download_file(uid: str):
+    try:
+        timestamp, ext = os.path.splitext(uid)
+        timestamp = float(str(float(timestamp) / 10000000).ljust(18, "0"))
+        object_path = str(time.localtime(timestamp).tm_year) + "-" + str(time.localtime(timestamp).tm_mon).rjust(2,
+                                                                                                                 "0") + "-" \
+                      + str(time.localtime(timestamp).tm_mday).rjust(2, "0") + "/{}".format(uid)
+        file_obj = minio_client.get_object("image", object_path)
+
+        if not cache.get(uid):
+            # 添加缓存
+            # print("第一次获取,添加到缓存")
+            cache.add(uid, file_obj.read())
+        else:
+            # print("从缓存中找到uid,获取缓存")
+            file_bytes = cache.get(uid)
+            return StreamingResponse(BytesIO(file_bytes), media_type="image/{}".format(ext[1:]))
+        file_content = BytesIO(file_obj.read())
+        response = StreamingResponse(file_content, media_type='image/{}'.format(ext[1:]))
+    except Exception as e:
+        return {"status": 400, "data": [], "msg": "Get Failed!"}
+    # return response
+    return {"status": 200, "data": [uid], "msg": ""}
+
+
+# 删除  鉴权 current_user: User = Depends(get_current_active_user)
+@app.delete("/file/{uid}")
+async def delete_file(uid: str):
+    try:
+        timestamp, ext = os.path.splitext(uid)
+        timestamp = float(str(float(timestamp) / 10000000).ljust(18, "0"))
+        object_path = str(time.localtime(timestamp).tm_year) + "-" + str(time.localtime(timestamp).tm_mon).rjust(2,
+                                                                                                                 "0") + "-" \
+                      + str(time.localtime(timestamp).tm_mday).rjust(2, "0") + "/{}".format(uid)
+        minio_client.get_object("image", object_path)
+        minio_client.remove_object("image", object_path)
+        return {"status": 200, "data": [], "msg": "Delete Success!"}
+    except:
+        return {"status": 404, "data": [], "msg": "Not Found"}
+
+
+client = TestClient(app)
+
+
+
+def test_create_file():
+    file = {"file": open(r"E:\wallhaven_pic\wallhaven-5gw639.jpg","rb") }
+    response = client.post(f"/file",files=file)
+    assert response.json() == {
+        "status": 200,
+        "data": [
+            PIC_NAME
+        ],
+        "msg": ""
+    }
+
+
+def test_download_file():
+    response = client.get(f"/file/{PIC_NAME}")
+    assert response.status_code == 200
+    assert response.json() == {
+        "status": 200,
+        "data": [
+            PIC_NAME
+        ],
+        "msg": ""
+    }
+
+
+def test_delete_file():
+    response = client.delete(f"/file/{PIC_NAME}")
+    assert response.status_code == 200
+    assert response.json() == {"status": 200, "data": [], "msg": "Delete Success!"}

+ 66 - 0
test/test_items.py

@@ -0,0 +1,66 @@
+from fastapi import FastAPI
+from starlette.testclient import TestClient
+from fastapi import APIRouter, HTTPException
+from pydantic import BaseModel
+
+class Item(BaseModel):
+    id: int
+    name: str
+
+
+app = FastAPI()
+
+res = [
+    {"id": 1, "name": "jack"},
+    {"id": 2, "name": "mark"},
+    {"id": 3, "name": "tom"},
+    {"id": 4, "name": "sery"}
+]
+
+@app.get("/item/res")
+async def get_res():
+    return res
+
+@app.get("/item/res/{res_id}")
+async def get_res_id(res_id:int):
+    for data in res:
+        if data["id"] == res_id:
+            return data
+
+@app.post("/item/res")
+async def create_res(item:Item):
+    res.append(item.dict())
+    return res
+
+@app.put("/item/res/{res_id}")
+async def update_res(res_id:int,item:Item):
+    for data in res:
+        print(data["id"])
+        print(res_id)
+        if data["id"] == res_id:
+            res.remove(data)
+            data = item.dict()
+            res.append(data)
+            return {"status":True}
+    else:
+        return HTTPException(status_code=404, detail="id:{} not found.".format(res_id))
+
+@app.delete("/item/res/{res_id}")
+async def delete_res(res_id:int):
+    for data in res:
+        if data["id"] == res_id:
+            res.remove(data)
+    return {"status":True}
+
+
+client = TestClient(app)
+
+def test_read_main():
+    response = client.get("/item/res")
+    assert response.status_code == 200
+    assert response.json() == res
+
+def test_get_res_id():
+    response = client.get(f"/item/res/1")
+    assert response.status_code == 200
+    assert response.json() == {"id": 1, "name": "jack"}

+ 93 - 0
utils/jwt.py

@@ -0,0 +1,93 @@
+import os
+from datetime import datetime, timedelta
+from typing import Union
+from dotenv import load_dotenv
+from fastapi import Depends, FastAPI, HTTPException, status
+from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
+from jose import JWTError, jwt
+from passlib.context import CryptContext
+from models.model import UserInDB, TokenData, User, Token, fake_users_db
+
+# 加载环境变量
+load_dotenv()
+SECRET_KEY = os.getenv("SECRET_KEY")
+ALGORITHM = os.getenv("ALGORITHM")
+ACCESS_TOKEN_EXPIRE_MINUTES = os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES")
+
+# schemes 加密方式,默认第一个
+pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
+
+# 请求/token 返回一个令牌
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
+
+
+# 比较哈希值是否一直,一直就返回True,否则返回False
+def verify_password(plain_password, hashed_password):
+    return pwd_context.verify(plain_password, hashed_password)
+
+
+# 对用户输入的密码进行hash加密
+def get_password_hash(password):
+    return pwd_context.hash(password)
+
+
+# 去数据库中寻找,用户是否在数据库中,能找到就返回用户的所有信息
+def get_user(db, username: str):
+    if username in db:
+        user_dict = db[username]
+        return UserInDB(**user_dict)
+
+
+# 判断用户是否存在于数据库中,存在就比较hash密码,比对成功,返回用户信息
+def authenticate_user(fake_db, username: str, password: str):
+    user = get_user(fake_db, username)
+    if not user:
+        return False
+    if not verify_password(password, user.hashed_password):
+        return False
+    return user
+
+
+# data:{"sub":user.username} datetime.utcnow():2023-05-31 23:46:27.912774
+# utcnow()用于记录当前时间,datetime模块中的timedelta返回的数据可与utcnow()相加
+def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
+    to_encode = data.copy()
+    if expires_delta:
+        expire = datetime.utcnow() + expires_delta
+    else:
+        expire = datetime.utcnow() + timedelta(minutes=15)
+    # update可以将过期时间加入到to_encode字典中 => to_encoded = {"sub":user.username,"exp":expire}
+    to_encode.update({"exp": expire})
+    # jwt.encode(加密数据,密钥,加密方式)
+    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
+    # 最终返回被加密后的to_encoded
+    return encoded_jwt
+
+
+# 这个函数必须携带令牌才能执行,携带令牌获取用户,返回用户信息
+async def get_current_user(token: str = Depends(oauth2_scheme)):
+    credentials_exception = HTTPException(
+        status_code=status.HTTP_401_UNAUTHORIZED,
+        detail="Could not validate credentials",
+        headers={"WWW-Authenticate": "Bearer"},
+    )
+    try:
+        # 对token进行解码
+        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
+        username: str = payload.get("sub")
+        if username is None:
+            raise credentials_exception
+        token_data = TokenData(username=username)
+    except JWTError:
+        raise credentials_exception
+    user = get_user(fake_users_db, username=token_data.username)
+    if user is None:
+        raise credentials_exception
+    return user
+
+
+# 这里接收用户信息,接收到了就返回用户信息,没接收到,就表示令牌过期了,或者是未登陆
+async def get_current_active_user(current_user: User = Depends(get_current_user)):
+    if current_user.disabled:
+        raise HTTPException(status_code=400, detail="Inactive user")
+    return current_user