dag.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. import json
  2. from typing import Optional
  3. from sqlalchemy.orm import Session
  4. from fastapi import Depends
  5. from fastapi import APIRouter
  6. from app.common.security.auth import verify_users
  7. from app.services.dag import dag_job_submit, get_tmp_table_name
  8. from app import crud, models, schemas
  9. from app.utils.send_util import get_job_run_status, get_running_status, get_task_log, post_install_requirements
  10. from constants.constants import RUN_STATUS
  11. from utils.sx_time import sxtimeit
  12. from utils.sx_web import web_try
  13. from app.common.hive import hiveDs
  14. from app import get_db
  15. from configs.settings import DefaultOption, config
  16. database_name = config.get('HIVE', 'database_name')
  17. base_path = config.get('HIVE', 'base_path')
  18. hdfs_path = config.get('HADOOP_INNER', 'default_fs')
  19. requirement_path = config.get('REQUIREMENTS_CONFIG', 'path')
  20. router = APIRouter(
  21. prefix="/jpt/dag",
  22. tags=["dag-dag管理"],
  23. )
  24. @router.post("/execute")
  25. @web_try()
  26. @sxtimeit
  27. def execute_dag(dag: schemas.Dag, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  28. relation = crud.get_requirements_status(db, dag.dag_uuid)
  29. if relation and relation.status in [0, 1]:
  30. raise Exception('依赖正在安装中')
  31. elif relation and relation.status in [3]:
  32. raise Exception('依赖安装失败,请重新安装')
  33. af_job = dag_job_submit(dag.dag_uuid, dag.dag_script, db)
  34. return af_job
  35. @router.get("/debug_execute")
  36. @web_try()
  37. @sxtimeit
  38. def debug_execute(dag_uuid: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  39. relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
  40. if relation is None:
  41. return False
  42. return True
  43. @router.get("/debug_status")
  44. @web_try()
  45. @sxtimeit
  46. def get_dag_debug_status(dag_uuid: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  47. relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
  48. af_job_run = crud.get_airflow_run_once_debug_mode(db, relation.af_id)
  49. job_run_res = get_job_run_status(af_job_run.id)
  50. job_run_status = job_run_res['data']['status']
  51. af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
  52. task = list(af_job.tasks)[0] if len(list(af_job.tasks)) > 0 else None
  53. nodes = {}
  54. if task:
  55. task_script = json.loads(task['script'])
  56. for node in task_script['sub_nodes']:
  57. task_id = str(task['id'])+'_'+node['id']
  58. task_log_res = get_task_log(
  59. af_job.id, af_job_run.af_run_id, task_id)
  60. task_log = task_log_res['data'] if 'data' in task_log_res.keys(
  61. ) else None
  62. if task_log:
  63. nodes.update({node['id']: task_log['status']
  64. if task_log['status'] else 'running'})
  65. res = {
  66. "job": job_run_status if job_run_status else af_job_run.status,
  67. "nodes": nodes
  68. }
  69. return res
  70. @router.get("/node_log")
  71. @web_try()
  72. @sxtimeit
  73. def get_dag_debug_status(dag_uuid: str, node_id: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  74. relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
  75. af_job_run = crud.get_airflow_run_once_debug_mode(db, relation.af_id)
  76. af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
  77. task = list(af_job.tasks)[0] if len(list(af_job.tasks)) > 0 else None
  78. if task:
  79. task_id = str(task['id'])+'_'+node_id
  80. task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
  81. task_log = task_log_res['data'] if 'data' in task_log_res.keys(
  82. ) else None
  83. if task_log:
  84. return task_log['log']
  85. return None
  86. @router.get("/node_result_info")
  87. @web_try()
  88. @sxtimeit
  89. def get_dag_debug_result_info(dag_uuid: str, node_id: str, out_pin: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  90. table_name = get_tmp_table_name(dag_uuid, node_id, str(out_pin), db)
  91. location = ''
  92. owner = ''
  93. res = hiveDs.get_table_info(table_name)
  94. for line_list in res[0]:
  95. if line_list[0].find('Location') >= 0:
  96. location = line_list[1]
  97. if line_list[0].find('Owner') >= 0:
  98. owner = line_list[1]
  99. share_status = crud.check_share(db, table_name)
  100. result = {
  101. 'table_name': f'{database_name}.{table_name}',
  102. 'table_path': f'{hdfs_path}{base_path}{table_name}',
  103. 'owner': owner,
  104. 'location': location,
  105. 'share_status': share_status
  106. }
  107. return result
  108. @router.get("/node_result_data")
  109. @web_try()
  110. @sxtimeit
  111. def get_dag_debug_result_data(dag_uuid: str, node_id: str, out_pin: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  112. table_name = get_tmp_table_name(dag_uuid, node_id, str(out_pin), db)
  113. result = hiveDs.get_preview_data(table_name, 500)
  114. return result
  115. @router.get("/dag_node_content")
  116. @web_try()
  117. @sxtimeit
  118. def get_dag_node_content(table_name: str, page: Optional[int] = 1, size: Optional[int] = 100, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  119. table_name = table_name.split('.')[-1]
  120. result = hiveDs.get_preview_data(
  121. table_name, size=size, start=(page-1)*size)
  122. return result
  123. @router.post("/install_requirements")
  124. @web_try()
  125. @sxtimeit
  126. def install_requirements(item: schemas.Requirements, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  127. res = post_install_requirements(
  128. item.requirements, f'{requirement_path}/dag_{item.dag_uuid.lower()}.zip')
  129. af_run_id = res['data']['af_run_id']
  130. running_res = get_running_status(str(-1), af_run_id)
  131. running_status = running_res['data']['status']
  132. relation = crud.create_or_update_requirements_relation(
  133. db, item.dag_uuid, af_run_id, RUN_STATUS[running_status])
  134. return relation
  135. @router.get("/requirements_status")
  136. @web_try()
  137. @sxtimeit
  138. def get_requirements_status(dag_uuid: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  139. relation = crud.get_requirements_status(db, dag_uuid)
  140. if not relation:
  141. return "暂无依赖"
  142. return relation