# jm_job_log.py — scheduled-job (jm_job) run-log endpoints.
  1. import json
  2. from typing import List, Optional
  3. from fastapi import APIRouter
  4. from fastapi import Depends, Query
  5. from sqlalchemy.orm import Session
  6. from app import page_help, schemas
  7. import app.crud as crud
  8. from app.utils.send_util import get_job_run_status, get_task_log
  9. from constants.constants import RUN_STATUS
  10. from utils.sx_time import sxtimeit
  11. from utils.sx_web import web_try
  12. from fastapi_pagination import Page, add_pagination, paginate, Params
  13. from app import get_db, get_page
# Router for the scheduled-job log management API (jm_job_log).
router = APIRouter(
    prefix="/jpt/jm_job_log",
    tags=["jm_job_log-定时任务日志管理"],
)
  18. @router.get("/")
  19. @web_try()
  20. @sxtimeit
  21. def get_job_logs(job_id: int = None, params: Params=Depends(get_page), db: Session = Depends(get_db)):
  22. jm_job_list = []
  23. # 是否有任务筛选
  24. if job_id is not None:
  25. jm_job_list = [crud.get_jm_job_info(db, job_id)]
  26. else:
  27. jm_job_list = crud.get_jm_job_infos(db)
  28. id_to_job = {job.id:job for job in jm_job_list}
  29. relations = crud.get_af_ids(db,id_to_job.keys(), 'job')
  30. af_to_datax = {relation.af_id:relation.se_id for relation in relations}
  31. # 获取任务运行记录
  32. af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
  33. # 根据时间进行排序
  34. af_job_runs.sort(key=lambda x: x.start_time, reverse=True)
  35. total = len(af_job_runs)
  36. # 进行分页
  37. af_job_runs = af_job_runs[(params['page'] - 1) * params['size']:params['page'] * params['size']]
  38. res = []
  39. for af_job_run in af_job_runs:
  40. job_id = af_to_datax[int(af_job_run.job_id)]
  41. execute_result = None
  42. # 若该记录未运行完成,获取运行的状态
  43. if af_job_run.status <= 1:
  44. run_status = get_job_run_status(af_job_run.id)
  45. execute_result = run_status['data']['status']
  46. log = {
  47. "id": af_job_run.id,
  48. "job_id": job_id,
  49. "job_name": id_to_job[job_id].name,
  50. "job_type": id_to_job[job_id].type,
  51. "job_tag": id_to_job[job_id].tag,
  52. "af_job_id": int(af_job_run.job_id),
  53. "run_id": af_job_run.af_run_id,
  54. "start_time": af_job_run.start_time,
  55. "result": execute_result if execute_result else af_job_run.status,
  56. }
  57. res.append(log)
  58. return page_help(res,params['page'],params['size'],total)
  59. @router.get("/all_task")
  60. @web_try()
  61. @sxtimeit
  62. def get_job_all_task(run_id: str, db: Session = Depends(get_db)):
  63. af_job_run = crud.get_airflow_run_once(db, run_id)
  64. af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
  65. tasks = list(af_job.tasks) if len(list(af_job.tasks))>0 else []
  66. res = []
  67. for task in tasks:
  68. if task['task_type'] == 'sparks':
  69. task_script = json.loads(task['script'])
  70. for node in task_script['sub_nodes']:
  71. task_id = str(task['id'])+'_'+node['id']
  72. log ={
  73. 'name':node['name'],
  74. 'job_id':af_job.id,
  75. 'af_run_id':af_job_run.af_run_id,
  76. 'task_id': task_id,
  77. }
  78. task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
  79. task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
  80. if task_log:
  81. log.update({
  82. 'execute_result': RUN_STATUS[task_log['status']] if 'status' in task_log.keys() and task_log['status'] else None,
  83. 'execute_time':task_log['start_time'] if 'start_time' in task_log.keys() else None,
  84. 'log': task_log['log'] if 'log' in task_log.keys() else None
  85. })
  86. res.append(log)
  87. else:
  88. log ={
  89. 'name':task['name'],
  90. 'job_id':af_job.id,
  91. 'af_run_id':af_job_run.af_run_id,
  92. 'task_id':task['id'],
  93. }
  94. task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task['id'])
  95. task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
  96. if task_log:
  97. log.update({
  98. 'execute_result':RUN_STATUS[task_log['status']] if 'status' in task_log.keys() and task_log['status'] else None,
  99. 'execute_time':task_log['start_time'] if 'start_time' in task_log.keys() else None,
  100. 'log': task_log['log'] if 'log' in task_log.keys() else None
  101. })
  102. res.append(log)
  103. return res
  104. @router.get("/task_log/{job_id}/{af_run_id}/{task_id}")
  105. @web_try()
  106. @sxtimeit
  107. def get_job_task_log(job_id: str, af_run_id: str, task_id: str, db: Session = Depends(get_db)):
  108. res = get_task_log(job_id, af_run_id, task_id)
  109. log = res['data'] if 'data' in res else None
  110. return log
  111. @router.get("/logs_status/{ids}")
  112. @web_try()
  113. @sxtimeit
  114. def get_job_log_status(ids: str):
  115. run_ids = ids.split(',')
  116. id_to_status = {}
  117. for run_id in run_ids:
  118. res = get_job_run_status(run_id)
  119. id_to_status.update({run_id:res['data']['status']})
  120. return id_to_status