from ast import Raise
import io
import uuid
from fastapi import APIRouter
from fastapi import File, UploadFile
from fastapi.responses import StreamingResponse
from app.utils.utils import byte_conversion
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from utils.sx_image import get_b64
from app.common.minio import FileHandler
from app import get_db

# NOTE(review): Raise, io, uuid, StreamingResponse, get_b64 and get_db are not
# referenced by the code in this section; they are kept untouched in case
# something outside this view relies on them.

# Router for the intermediate-result endpoints, mounted under /jpt/intermediate.
router = APIRouter(
    prefix="/jpt/intermediate",
    tags=["intermediate-中间结果管理"],
)


@router.post("/")
@web_try()
@sxtimeit
def put_intermediate(
    project_id: str,
    user_id: str,
    dag_uuid: str,
    node_uuid: str,
    file: UploadFile = File(...),
):
    """Upload an intermediate result file for one node of a DAG.

    The object key is
    ``intermediate/<project_id>/<user_id>/<dag_uuid>/<node_uuid>/<name>``,
    where ``<name>`` is the uploaded filename truncated at its first dot
    (``"a.tar.gz"`` becomes ``"a"``).

    Args:
        project_id: Project segment of the object key.
        user_id: User segment of the object key.
        dag_uuid: DAG segment of the object key.
        node_uuid: Node segment of the object key.
        file: The uploaded file; its whole content is read into memory.

    Returns:
        Whatever ``FileHandler.put_byte_file`` returns (the original code
        names it ``url``).

    Raises:
        Exception: If ``FileHandler.get_file`` returns a non-empty result for
            the object key, i.e. the file already exists.
    """
    print("UploadFile-->", file.filename)
    # Everything from the first dot onwards is dropped from the stored name.
    file_name = file.filename.split('.', 1)[0]
    print(project_id, user_id, dag_uuid, node_uuid, file_name)

    # "datax" is presumably the storage bucket name -- TODO confirm.
    file_handler = FileHandler("datax")

    # Build the key once so the existence check and the upload cannot diverge.
    object_path = "intermediate/{}/{}/{}/{}/{}".format(
        project_id, user_id, dag_uuid, node_uuid, file_name
    )

    file_exist = file_handler.get_file(object_path)
    if len(file_exist) > 0:
        # Message reads "file already exists"; it is a runtime string and is
        # kept as-is.
        raise Exception('文件已存在')

    return file_handler.put_byte_file(object_path, file.file.read())


@router.get("/get_intermediate")
@web_try()
@sxtimeit
def get_file(project_id: str, user_id: str, dag_uuid: str):
    """List the intermediate result files stored for one DAG.

    Objects are looked up under
    ``intermediate/<project_id>/<user_id>/<dag_uuid>`` and each key is
    expected to continue with ``/<node_uuid>/<file_name>``.  Objects that are
    not located under ``<prefix>/`` or that have no ``<file_name>`` part are
    skipped instead of failing the whole listing.

    Args:
        project_id: Project segment of the object key prefix.
        user_id: User segment of the object key prefix.
        dag_uuid: DAG segment of the object key prefix.

    Returns:
        A list of dicts with the keys ``name``, ``node_uuid``,
        ``storage_path``, ``create_time`` and ``size``.
    """
    file_handler = FileHandler("datax")
    prefix = "intermediate/{}/{}/{}".format(project_id, user_id, dag_uuid)
    base = prefix + "/"

    res = []
    for obj in file_handler.ls_file(prefix):
        storage_path = obj.object_name

        # Only strip the prefix where it actually leads the key; objects
        # outside "<prefix>/" do not belong to this DAG.
        if not storage_path.startswith(base):
            continue

        # Remaining path is "<node_uuid>/<file_name>"; file_name may itself
        # contain further slashes.
        node_uuid, sep, file_name = storage_path[len(base):].partition("/")
        if not sep:
            # No node/file split possible for this key.
            continue

        res.append({
            'name': file_name,
            'node_uuid': node_uuid,
            'storage_path': storage_path,
            # NOTE(review): this renders year:day:month, which looks
            # unintended; left unchanged because clients may parse it.
            'create_time': obj.last_modified.strftime("%Y:%d:%m %H:%M"),
            'size': byte_conversion(obj.size),
        })
    return res