dag.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import json
  2. from sqlalchemy.orm import Session
  3. from fastapi import Depends
  4. from fastapi import APIRouter
  5. from app.common.security.auth import verify_users
  6. from app.services.dag import dag_job_submit, get_tmp_table_name
  7. from app import crud, models, schemas
  8. from app.utils.send_util import get_job_run_status, get_running_status, get_task_log, post_install_requirements
  9. from constants.constants import RUN_STATUS
  10. from utils.sx_time import sxtimeit
  11. from utils.sx_web import web_try
  12. from app.common.hive import hiveDs
  13. from app import get_db
  14. from configs.settings import DefaultOption, config
  15. database_name = config.get('HIVE', 'database_name')
  16. base_path = config.get('HIVE', 'base_path')
  17. hdfs_path = config.get('HADOOP_INNER', 'default_fs')
  18. requirement_path = config.get('REQUIREMENTS_CONFIG', 'path')
  19. router = APIRouter(
  20. prefix="/jpt/dag",
  21. tags=["dag-dag管理"],
  22. )
  23. @router.post("/execute")
  24. @web_try()
  25. @sxtimeit
  26. def execute_dag(dag: schemas.Dag, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  27. relation = crud.get_requirements_status(db, dag.dag_uuid)
  28. if relation and relation.status in [0,1]:
  29. raise Exception('依赖正在安装中')
  30. elif relation and relation.status in [3]:
  31. raise Exception('依赖安装失败,请重新安装')
  32. af_job = dag_job_submit(dag.dag_uuid, dag.dag_script,db)
  33. return af_job
  34. @router.get("/debug_execute")
  35. @web_try()
  36. @sxtimeit
  37. def debug_execute(dag_uuid: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  38. relation = crud.get_dag_af_id(db,dag_uuid, 'debug')
  39. if relation is None:
  40. return False
  41. return True
  42. @router.get("/debug_status")
  43. @web_try()
  44. @sxtimeit
  45. def get_dag_debug_status(dag_uuid: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  46. relation = crud.get_dag_af_id(db,dag_uuid, 'debug')
  47. af_job_run = crud.get_airflow_run_once_debug_mode(db,relation.af_id)
  48. job_run_res = get_job_run_status(af_job_run.id)
  49. job_run_status = job_run_res['data']['status']
  50. af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
  51. task = list(af_job.tasks)[0] if len(list(af_job.tasks))>0 else None
  52. nodes = {}
  53. if task:
  54. task_script = json.loads(task['script'])
  55. for node in task_script['sub_nodes']:
  56. task_id = str(task['id'])+'_'+node['id']
  57. task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
  58. task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
  59. if task_log:
  60. nodes.update({node['id']:task_log['status'] if task_log['status'] else 'running'})
  61. res = {
  62. "job":job_run_status if job_run_status else af_job_run.status,
  63. "nodes": nodes
  64. }
  65. return res
  66. @router.get("/node_log")
  67. @web_try()
  68. @sxtimeit
  69. def get_dag_debug_status(dag_uuid: str, node_id: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  70. relation = crud.get_dag_af_id(db,dag_uuid, 'debug')
  71. af_job_run = crud.get_airflow_run_once_debug_mode(db,relation.af_id)
  72. af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
  73. task = list(af_job.tasks)[0] if len(list(af_job.tasks))>0 else None
  74. if task:
  75. task_id = str(task['id'])+'_'+node_id
  76. task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
  77. task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
  78. if task_log:
  79. return task_log['log']
  80. return None
  81. @router.get("/node_result")
  82. @web_try()
  83. @sxtimeit
  84. def get_dag_debug_result(dag_uuid: str, node_id: str, out_pin: int , token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  85. table_name = get_tmp_table_name(dag_uuid, node_id, str(out_pin), db)
  86. result = hiveDs.get_preview_data(table_name,500)
  87. result.update({
  88. 'table_name':f'{database_name}.{table_name}',
  89. 'table_path':f'{hdfs_path}{base_path}{table_name}'
  90. })
  91. return result
  92. @router.post("/install_requirements")
  93. @web_try()
  94. @sxtimeit
  95. def install_requirements(item: schemas.Requirements, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  96. res = post_install_requirements(item.requirements, f'{requirement_path}/dag_{item.dag_uuid.lower()}.zip')
  97. af_run_id = res['data']['af_run_id']
  98. running_res = get_running_status(str(-1), af_run_id)
  99. running_status = running_res['data']['status']
  100. relation = crud.create_or_update_requirements_relation(db,item.dag_uuid, af_run_id, RUN_STATUS[running_status])
  101. return relation
  102. @router.get("/requirements_status")
  103. @web_try()
  104. @sxtimeit
  105. def get_requirements_status(dag_uuid: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  106. relation = crud.get_requirements_status(db, dag_uuid)
  107. if not relation:
  108. return "暂无依赖"
  109. return relation