intermediate.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. from ast import Raise
  2. import io
  3. import uuid
  4. from fastapi import APIRouter
  5. from fastapi import File, UploadFile
  6. from fastapi.responses import StreamingResponse
  7. from app.utils.utils import byte_conversion
  8. from utils.sx_time import sxtimeit
  9. from utils.sx_web import web_try
  10. from utils.sx_image import get_b64
  11. from app.common.minio import FileHandler
  12. from app import get_db
  13. router = APIRouter(
  14. prefix="/jpt/intermediate",
  15. tags=["intermediate-中间结果管理"],
  16. )
  17. @router.post("/")
  18. @web_try()
  19. @sxtimeit
  20. def put_intermediate(project_id: str, user_id: str, dag_uuid: str, node_uuid: str, file: UploadFile = File(...),):
  21. print("UploadFile-->",file.filename)
  22. file_name = file.filename.split('.', 1 )[0]
  23. print(project_id, user_id, dag_uuid, node_uuid,file_name)
  24. file_handler = FileHandler("datax")
  25. file_exist = file_handler.get_file("intermediate/{}/{}/{}/{}/{}".format(project_id, user_id, dag_uuid, node_uuid, file_name))
  26. if len(file_exist) > 0:
  27. raise Exception('文件已存在')
  28. url = file_handler.put_byte_file("intermediate/{}/{}/{}/{}/{}".format(project_id, user_id, dag_uuid, node_uuid, file_name), file.file.read())
  29. return url
  30. @router.get("/get_intermediate")
  31. @web_try()
  32. @sxtimeit
  33. def get_file(project_id: str, user_id: str, dag_uuid: str):
  34. file_handler = FileHandler("datax")
  35. prefix = "intermediate/{}/{}/{}".format(project_id, user_id, dag_uuid)
  36. objects = file_handler.ls_file(prefix)
  37. res = []
  38. for obj in objects:
  39. intermediate = {}
  40. storage_path = obj.object_name
  41. file_path = storage_path.replace(prefix+"/", "")
  42. n_n = file_path.split("/",1)
  43. node_uuid = n_n[0]
  44. file_name = n_n[1]
  45. create_time = obj.last_modified.strftime("%Y:%d:%m %H:%M")
  46. size = byte_conversion(obj.size)
  47. intermediate.update({'name': file_name,
  48. 'node_uuid': node_uuid,
  49. 'storage_path':storage_path,
  50. 'create_time': create_time,
  51. 'size': size})
  52. res.append(intermediate)
  53. return res