jm_job_log.py 4.9 KB
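"""FastAPI routes for querying run logs of scheduled (jm_job) jobs executed via Airflow.

Endpoints under /jpt/jm_job_log cover: a paginated run-log list, the task-level
logs of a single run, all tasks of the job behind a run, the raw log of a single
task, and a bulk run-status lookup.
"""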

from typing import List, Optional
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app import page_help, schemas
import app.crud as crud
from app.utils.send_util import get_job_run_status, get_task_log
from constants.constants import RUN_STATUS
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from fastapi_pagination import Page, add_pagination, paginate, Params
from app import get_db, get_page

router = APIRouter(
    prefix="/jpt/jm_job_log",
    tags=["jm_job_log - scheduled job log management"],
)
@router.get("/")
@web_try()
@sxtimeit
def get_job_logs(job_id: Optional[int] = None, params: Params = Depends(get_page), db: Session = Depends(get_db)):
    # Collect the jobs to report on: a single job if job_id is given, otherwise all jobs.
    jm_job_list = []
    if job_id is not None:
        jm_job_list = [crud.get_jm_job_info(db, job_id)]
    else:
        jm_job_list = crud.get_jm_job_infos(db)
    id_to_job = {job.id: job for job in jm_job_list}
    # Map Airflow job ids back to local job ids, then load the matching run records.
    relations = crud.get_af_ids(db, id_to_job.keys(), 'job')
    af_to_datax = {relation.af_id: relation.se_id for relation in relations}
    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
    # Newest runs first, then paginate manually.
    af_job_runs.sort(key=lambda x: x.start_time, reverse=True)
    total = len(af_job_runs)
    af_job_runs = af_job_runs[(params['page'] - 1) * params['size']:params['page'] * params['size']]
    res = []
    for af_job_run in af_job_runs:
        tasks = list(af_job_run.details['tasks'].values())
        task = None  # guard against runs whose details contain no tasks
        if len(tasks) > 0:
            task = tasks[-1]
            task.pop('log', None)  # drop the raw log text from the list view
        job_id = af_to_datax[int(af_job_run.job_id)]
        execute_result = None
        # For runs with status <= 1, re-query the current status.
        if af_job_run.status <= 1:
            run_status = get_job_run_status(af_job_run.id)
            execute_result = run_status['data']['status']
        log = {
            "id": af_job_run.id,
            "job_id": job_id,
            "job_name": id_to_job[job_id].name,
            "job_type": id_to_job[job_id].type,
            "job_tag": id_to_job[job_id].tag,
            "af_job_id": int(af_job_run.job_id),
            "run_id": af_job_run.af_run_id,
            "trigger_time": af_job_run.start_time,
            "trigger_result": 1,
            "execute_time": task['start_time'] if task else 0,
            "execute_result": execute_result if execute_result else af_job_run.status,
            "end_time": task['end_time'] if task else 0,
        }
        res.append(log)
    return page_help(res, params['page'], params['size'], total)
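# The `page_help` call above builds the pagination envelope; it is a project-internal
# helper imported from `app`. A minimal sketch of the assumed shape, for reference only
# (the real implementation may differ):
#
#     def page_help(items, page, size, total):
#         return {"items": items, "page": page, "size": size, "total": total}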
@router.get("/logs")
@web_try()
@sxtimeit
def get_job_log_once(run_id: str, db: Session = Depends(get_db)):
    # One log entry per task of a single run, including each task's log text.
    af_job_run = crud.get_airflow_run_once(db, run_id)
    tasks = list(af_job_run.details['tasks'].values())
    res = []
    for task in tasks:
        log = {
            "id": af_job_run.id,
            "af_job_id": int(af_job_run.job_id),
            "run_id": af_job_run.af_run_id,
            "trigger_time": af_job_run.start_time,
            "trigger_result": 1,
            "execute_time": task['start_time'] if task else 0,
            "execute_result": af_job_run.status,
            "end_time": task['end_time'] if task else 0,
            "log": task['log'] if task else None,
        }
        res.append(log)
    res.sort(key=lambda x: x['trigger_time'], reverse=True)
    return res
@router.get("/all_task")
@web_try()
@sxtimeit
def get_job_all_task(run_id: str, db: Session = Depends(get_db)):
    # Return every task of the job behind this run, enriched with its log info.
    af_job_run = crud.get_airflow_run_once(db, run_id)
    af_job_id = af_job_run.job_id
    af_job = crud.get_airflow_job_once(db, af_job_id)
    res = []
    for task in af_job.tasks:
        task.update({
            'job_id': af_job_id,
            'af_run_id': af_job_run.af_run_id,
            'task_id': task['id'],
        })
        # Fetch the per-task log for this run and merge the relevant fields in.
        task_log_res = get_task_log(af_job_id, af_job_run.af_run_id, task['id'])
        task_log = task_log_res['data'] if 'data' in task_log_res else None
        if task_log:
            task.update({
                'execute_result': task_log['status'] if 'status' in task_log else None,
                'execute_time': task_log['start_time'] if 'start_time' in task_log else None,
                'log': task_log['log'] if 'log' in task_log else None,
            })
        res.append(task)
    return res
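# Each entry returned above is the task definition from af_job.tasks merged with
# job_id / af_run_id / task_id and, when the log lookup succeeds, the
# execute_result / execute_time / log fields taken from get_task_log()['data'].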
@router.get("/task_log/{job_id}/{af_run_id}/{task_id}")
@web_try()
@sxtimeit
def get_job_task_log(job_id: str, af_run_id: str, task_id: str, db: Session = Depends(get_db)):
    res = get_task_log(job_id, af_run_id, task_id)
    return res['data']
@router.get("/logs_status/{ids}")
@web_try()
@sxtimeit
def get_job_log_status(ids: str):
    # `ids` is a comma-separated list of run ids; return {run_id: status} for each.
    run_ids = ids.split(',')
    id_to_status = {}
    for run_id in run_ids:
        res = get_job_run_status(run_id)
        id_to_status.update({run_id: res['data']['status']})
    return id_to_status
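# Example usage, assuming the API is served locally on port 8000, that `get_page`
# reads `page` and `size` query parameters, and that `web_try()` wraps payloads in a
# uniform JSON envelope (all assumptions, not defined in this file):
#
#     import requests
#
#     runs = requests.get(
#         "http://localhost:8000/jpt/jm_job_log/",
#         params={"job_id": 1, "page": 1, "size": 10},
#     ).json()
#
#     statuses = requests.get(
#         "http://localhost:8000/jpt/jm_job_log/logs_status/1,2,3"
#     ).json()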