dag.py

from sqlalchemy.orm import Session
from fastapi import Depends
from fastapi import APIRouter
from app.services.dag import dag_job_submit, get_tmp_table_name
from app import crud, models, schemas
from app.utils.send_util import get_job_run_status
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from app.common.hive import hiveDs
from app import get_db

router = APIRouter(
    prefix="/jpt/dag",
    tags=["dag-DAG management"],
)


@router.post("/execute")
@web_try()
@sxtimeit
def execute_dag(dag: schemas.Dag, db: Session = Depends(get_db)):
    """Submit the DAG script as an Airflow job and return the created job."""
    af_job = dag_job_submit(dag.dag_uuid, dag.dag_script, db)
    return af_job


@router.get("/debug_status")
@web_try()
@sxtimeit
def get_dag_debug_status(dag_uuid: str, db: Session = Depends(get_db)):
    """Return the overall debug-run status plus the status of each node."""
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    af_job_run = crud.get_airflow_run_once_debug_mode(db, relation.af_id)
    tasks = af_job_run.details['tasks'] if len(af_job_run.details['tasks']) > 0 else {}
    # Task ids have the form "<prefix>_<node_id>"; map node id -> task status.
    nodes = {}
    for task_id in tasks:
        task = tasks[task_id]
        node_id = task_id.split('_')[1]
        nodes.update({node_id: task['status']})
    job_run_res = get_job_run_status(af_job_run.id)
    job_run_status = job_run_res['data']['status']
    res = {
        # Prefer the live status from the scheduler; fall back to the stored one.
        "job": job_run_status if job_run_status else af_job_run.status,
        "nodes": nodes
    }
    return res


@router.get("/node_log")
@web_try()
@sxtimeit
def get_dag_debug_node_log(dag_uuid: str, node_id: str, db: Session = Depends(get_db)):
    """Return the debug-run log of a single node, or None if it has no log yet."""
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    af_job_run = crud.get_airflow_run_once_debug_mode(db, relation.af_id)
    tasks = af_job_run.details['tasks'] if len(af_job_run.details['tasks']) > 0 else {}
    node_log = {}
    for task_id in tasks:
        task = tasks[task_id]
        log_node_id = task_id.split('_')[1]
        node_log.update({log_node_id: task['log']})
    return node_log.get(node_id)


@router.get("/node_result")
@web_try()
@sxtimeit
def get_dag_debug_result(dag_uuid: str, node_id: str, out_pin: int, db: Session = Depends(get_db)):
    """Preview rows from the temporary Hive table written by the node's output pin."""
    table_name = get_tmp_table_name(dag_uuid, node_id, str(out_pin), db)
    result = hiveDs.get_preview_data(table_name, 500)
    return result
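

# --- Usage sketch (illustrative only, not part of the original module) ---
# A minimal example of how this router might be mounted and exercised with
# FastAPI's TestClient. The bare FastAPI() app and the "demo-uuid" value are
# assumptions for illustration; the real project presumably builds its app
# elsewhere and supplies real DAG uuids and a working database/Airflow backend.
if __name__ == "__main__":
    from fastapi import FastAPI
    from fastapi.testclient import TestClient

    app = FastAPI()
    app.include_router(router)

    client = TestClient(app)
    # Poll the debug status of a DAG run; requires the app's DB dependency to resolve.
    resp = client.get("/jpt/dag/debug_status", params={"dag_uuid": "demo-uuid"})
    print(resp.status_code, resp.json())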