1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- import io
- import json
- import time
- from datetime import datetime, timedelta, timezone
- from app.common.security.auth import verify_users
- import app.crud as crud
- from fastapi import APIRouter, File, UploadFile, Form, Depends
- from sqlalchemy.orm import Session
- from fastapi.responses import StreamingResponse
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from app.common.minio import minio_client
- from app import get_db, schemas
- router = APIRouter(
- prefix="/jpt/files",
- tags=["files-文件管理"],
- )
- @router.delete("/dag")
- @web_try()
- @sxtimeit
- def delete_dag_file(uri: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
- res = crud.get_jm_homework_by_dag_url(db,uri)
- if len(res) == 0:
- minio_client.del_file(uri)
- else:
- raise Exception("该算子正在被作业使用,不可删除")
- @router.post("/upload_file")
- @web_try()
- @sxtimeit
- def upload_file(file: UploadFile = File(...), file_type: str=Form(...), token_data: schemas.TokenData = Depends(verify_users)):
- print("UploadFile-->",file.filename)
- file_name = str(int(time.time()))+'_'+file.filename
- url = minio_client.put_byte_file(f"{token_data.project_id}/{file_type}/"+file_name, file.file.read())
- return url
- @router.get("/directory")
- @web_try()
- @sxtimeit
- def get_directory(file_type: str, token_data: schemas.TokenData = Depends(verify_users)):
- files = minio_client.ls_file(f'{token_data.project_id}/{file_type}/')
- res = []
- td = timedelta(hours=8)
- tz = timezone(td)
- for file in files:
- timestamp = file.object_name.split('_',1)[0].split('/')[-1]
- dt = datetime.fromtimestamp(int(timestamp), tz)
- time_str = dt.strftime('%Y-%m-%d %H:%M:%S')
- dag_name = file.object_name.split('_',1)[1]
- res.append({'name':time_str+'-'+dag_name, 'uri':file.object_name,'timestamp':timestamp})
- res.sort(key=lambda x: x['timestamp'],reverse=True)
- return res
- @router.get("/dag_content")
- @sxtimeit
- @web_try()
- def get_dag_content(uri: str, token_data: schemas.TokenData = Depends(verify_users)):
- file = minio_client.get_file(uri)
- if len(file) == 0:
- raise Exception('No file found')
- res = json.loads(file)
- return res
- # 此接口置于末尾,勿乱动,乱动者砍死
- @router.get("/{uri:path}",name="path-convertor")
- @sxtimeit
- def get_file(uri: str):
- file = minio_client.get_file(uri)
- code = 200
- if len(file) == 0:
- code = 404
- response = StreamingResponse(io.BytesIO(file), status_code=code, media_type="application/octet-stream")
- # 在请求头进行配置
- response.headers["Content-Disposition"] = "attachment; filename="+uri
- return response
|