# dag_file.py (1.3 KB)
  1. import io
  2. from multiprocessing.connection import wait
  3. import os
  4. from fastapi import APIRouter
  5. from utils.sx_time import sxtimeit
  6. from utils.sx_web import web_try
  7. from fastapi.responses import StreamingResponse
  8. router = APIRouter(
  9. prefix="/jpt/dag_file",
  10. tags=["dag-dag管理"],
  11. )
  12. @router.get("/")
  13. @web_try()
  14. @sxtimeit
  15. def get_dags(project_id: str, user_id: str):
  16. path = f"./{user_id}/dag"
  17. file_list = get_all_dags(path)
  18. return file_list
  19. def get_all_dags(path):
  20. file_list = []
  21. files = os.listdir(path)
  22. for file in files:
  23. if file == ".DS_Store":
  24. continue
  25. next_path = path + '/' + file
  26. if os.path.isdir(next_path):
  27. n_file_list = get_all_dags(next_path)
  28. file_list.extend(n_file_list)
  29. else:
  30. if file[-4:] == ".dag":
  31. file_list.append(next_path.replace('./dag', ''))
  32. return file_list
  33. @router.get("/info")
  34. @sxtimeit
  35. def get_file(uri: str):
  36. response = StreamingResponse(get_file_byte('./dag' + uri))
  37. return response
  38. def get_file_byte(filename, chunk_size=1024):
  39. with open(filename, "rb") as f:
  40. while True:
  41. content = f.read(chunk_size)
  42. if content:
  43. yield content
  44. else:
  45. break