dag.py

import json

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session

from app import crud, get_db, models, schemas
from app.common.decorators import verify_all
from app.common.hive import hiveDs
from app.services.dag import dag_job_submit, get_tmp_table_name
from app.utils.send_util import get_job_run_status, get_task_log
from configs.settings import DefaultOption, config
from constants.constants import RUN_STATUS
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
# Hive/HDFS locations used to build fully qualified table names and paths.
database_name = config.get('HIVE', 'database_name')
base_path = config.get('HIVE', 'base_path')
hdfs_path = config.get('HADOOP_INNER', 'default_fs')
router = APIRouter(
    prefix="/jpt/dag",
    tags=["dag - DAG management"],
)
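# A minimal mounting sketch (assumed, not part of this module): the router is
# exposed by including it in the FastAPI application, e.g.:
#
#     from fastapi import FastAPI
#     from app.routers.dag import router as dag_router  # hypothetical import path
#
#     app = FastAPI()
#     app.include_router(dag_router)  # serves the /jpt/dag/* endpoints below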
@router.post("/execute", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def execute_dag(dag: schemas.Dag, db: Session = Depends(get_db)):
    # Submit the DAG script to the scheduler and return the created job.
    af_job = dag_job_submit(dag.dag_uuid, dag.dag_script, db)
    return af_job
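# Example call (hypothetical payload; schemas.Dag is assumed to expose the
# dag_uuid and dag_script fields used above):
#
#     POST /jpt/dag/execute
#     {"dag_uuid": "uuid-123", "dag_script": "{...}"}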
@router.get("/debug_execute", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def debug_execute(dag_uuid: str, db: Session = Depends(get_db)):
    # A DAG is debuggable only if a debug-mode job relation exists for it.
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    return relation is not None
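# Example: GET /jpt/dag/debug_execute?dag_uuid=uuid-123 yields True or False
# (web_try is assumed to wrap the value in its usual response envelope).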
@router.get("/debug_status", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_dag_debug_status(dag_uuid: str, db: Session = Depends(get_db)):
    # Resolve the debug-mode Airflow job and its latest run for this DAG.
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    af_job_run = crud.get_airflow_run_once_debug_mode(db, relation.af_id)
    job_run_res = get_job_run_status(af_job_run.id)
    job_run_status = job_run_res['data']['status']
    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
    tasks = list(af_job.tasks)
    task = tasks[0] if tasks else None
    # Collect per-node status from each sub-node's task log.
    nodes = {}
    if task:
        task_script = json.loads(task['script'])
        for node in task_script['sub_nodes']:
            task_id = f"{task['id']}_{node['id']}"
            task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
            task_log = task_log_res.get('data')
            if task_log:
                nodes[node['id']] = task_log['status'] if task_log['status'] else 'running'
    return {
        "job": job_run_status if job_run_status else af_job_run.status,
        "nodes": nodes,
    }
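# Illustrative response payload built above (status strings are made up; the
# actual values come from the task logs, with 'running' as the fallback):
#
#     {"job": "RUNNING", "nodes": {"node_1": "success", "node_2": "running"}}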
@router.get("/node_log", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_dag_node_log(dag_uuid: str, node_id: str, db: Session = Depends(get_db)):
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    af_job_run = crud.get_airflow_run_once_debug_mode(db, relation.af_id)
    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
    tasks = list(af_job.tasks)
    task = tasks[0] if tasks else None
    if task:
        # Task IDs are composed as '<task id>_<node id>', matching debug_status.
        task_id = f"{task['id']}_{node_id}"
        task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
        task_log = task_log_res.get('data')
        if task_log:
            return task_log['log']
    return None
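# Example: GET /jpt/dag/node_log?dag_uuid=uuid-123&node_id=node_1 returns the
# raw log text for that node's task, or None when no log is available yet.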
@router.get("/node_result", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_dag_debug_result(dag_uuid: str, node_id: str, out_pin: int, db: Session = Depends(get_db)):
    # Each debug node writes its output pin to a temporary Hive table;
    # preview up to 500 rows and attach the fully qualified name and path.
    table_name = get_tmp_table_name(dag_uuid, node_id, str(out_pin), db)
    result = hiveDs.get_preview_data(table_name, 500)
    result.update({
        'table_name': f'{database_name}.{table_name}',
        'table_path': f'{hdfs_path}{base_path}{table_name}',
    })
    return result
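# Sketch of the enriched result (the preview fields depend on
# hiveDs.get_preview_data; names and values below are illustrative only):
#
#     {..., 'table_name': '<database_name>.<tmp_table>',
#           'table_path': '<default_fs><base_path><tmp_table>'}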