dag.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. from importlib.resources import contents
  2. import json
  3. import os
  4. from fastapi import APIRouter
  5. from utils.sx_time import sxtimeit
  6. from utils.sx_web import web_try
  7. from fastapi.responses import StreamingResponse
  8. router = APIRouter(
  9. prefix="/jpt/dag",
  10. tags=["dag-dag管理"],
  11. )
  12. @router.get("/")
  13. @web_try()
  14. @sxtimeit
  15. def get_dags(project_id: str, user_id: str):
  16. path = "./dag"
  17. file_list = get_all_dags(path)
  18. return file_list
  19. def get_all_dags(path):
  20. file_list = []
  21. files = os.listdir(path)
  22. for file in files:
  23. if file == ".DS_Store":
  24. continue
  25. next_path = path + '/' + file
  26. if os.path.isdir(next_path):
  27. n_file_list = get_all_dags(next_path)
  28. file_list.extend(n_file_list)
  29. else:
  30. if file[-4:] == ".dag":
  31. file_list.append(next_path.replace('./dag', ''))
  32. return file_list
  33. @router.get("/info")
  34. # @web_try()
  35. @sxtimeit
  36. def get_file(uri: str):
  37. response = StreamingResponse(get_file_byte('./dag' + uri))
  38. return response
  39. def get_file_byte(filename, chunk_size=1024):
  40. with open(filename, "rb") as f:
  41. while True:
  42. content = f.read(chunk_size)
  43. if content:
  44. yield content
  45. else:
  46. break
  47. @router.post("/execute")
  48. @web_try()
  49. @sxtimeit
  50. def execute_dag(dag_script: str):
  51. print(dag_script)
  52. return ""