# jm_job_log.py — API routes for scheduled-job (jm_job) run-log management.
  1. import json
  2. from typing import List, Optional
  3. from fastapi import APIRouter
  4. from fastapi import Depends, Query
  5. from sqlalchemy.orm import Session
  6. from app import page_help, schemas
  7. import app.crud as crud
  8. from app.utils.send_util import get_job_run_status, get_task_log
  9. from constants.constants import RUN_STATUS
  10. from utils.sx_time import sxtimeit
  11. from utils.sx_web import web_try
  12. from fastapi_pagination import Page, add_pagination, paginate, Params
  13. from app import get_db, get_page
  14. router = APIRouter(
  15. prefix="/jpt/jm_job_log",
  16. tags=["jm_job_log-定时任务日志管理"],
  17. )
  18. @router.get("/")
  19. @web_try()
  20. @sxtimeit
  21. def get_job_logs(job_id: int = None, params: Params=Depends(get_page), db: Session = Depends(get_db)):
  22. jm_job_list = []
  23. # 是否有任务筛选
  24. if job_id is not None:
  25. jm_job_list = [crud.get_jm_job_info(db, job_id)]
  26. else:
  27. jm_job_list = crud.get_jm_job_infos(db)
  28. id_to_job = {job.id:job for job in jm_job_list}
  29. relations = crud.get_af_ids(db,id_to_job.keys(), 'job')
  30. af_to_datax = {relation.af_id:relation.se_id for relation in relations}
  31. # 获取任务运行记录
  32. af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys(),(params['page'] - 1) * params['size'],params['page'] * params['size'])
  33. total = crud.count_airflow_runs_by_job_ids(db, af_to_datax.keys())
  34. res = []
  35. for af_job_run in af_job_runs:
  36. job_id = af_to_datax[int(af_job_run.job_id)]
  37. execute_result = None
  38. # 若该记录未运行完成,获取运行的状态
  39. if af_job_run.status <= 1:
  40. run_status = get_job_run_status(af_job_run.id)
  41. execute_result = run_status['data']['status']
  42. log = {
  43. "id": af_job_run.id,
  44. "job_id": job_id,
  45. "job_name": id_to_job[job_id].name,
  46. "job_type": id_to_job[job_id].type,
  47. "job_tag": id_to_job[job_id].tag,
  48. "af_job_id": int(af_job_run.job_id),
  49. "run_id": af_job_run.af_run_id,
  50. "start_time": af_job_run.start_time,
  51. "result": execute_result if execute_result else af_job_run.status,
  52. }
  53. res.append(log)
  54. return page_help(res,params['page'],params['size'],total)
  55. @router.get("/all_task")
  56. @web_try()
  57. @sxtimeit
  58. def get_job_all_task(run_id: str, db: Session = Depends(get_db)):
  59. af_job_run = crud.get_airflow_run_once(db, run_id)
  60. af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
  61. tasks = list(af_job.tasks) if len(list(af_job.tasks))>0 else []
  62. res = []
  63. for task in tasks:
  64. if task['task_type'] == 'sparks':
  65. task_script = json.loads(task['script'])
  66. for node in task_script['sub_nodes']:
  67. task_id = str(task['id'])+'_'+node['id']
  68. log ={
  69. 'name':task['name']+'-'+node['name'],
  70. 'job_id':af_job.id,
  71. 'af_run_id':af_job_run.af_run_id,
  72. 'task_id': task_id,
  73. }
  74. task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
  75. task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
  76. if task_log:
  77. log.update({
  78. 'execute_result': RUN_STATUS[task_log['status']] if task_log['status'] else 0,
  79. 'execute_time':task_log['start_time'],
  80. 'log': task_log['log']
  81. })
  82. res.append(log)
  83. else:
  84. log ={
  85. 'name':task['name'],
  86. 'job_id':af_job.id,
  87. 'af_run_id':af_job_run.af_run_id,
  88. 'task_id':task['id'],
  89. }
  90. task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task['id'])
  91. task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
  92. if task_log:
  93. log.update({
  94. 'execute_result':RUN_STATUS[task_log['status']] if task_log['status'] else 0,
  95. 'execute_time':task_log['start_time'],
  96. 'log': task_log['log']
  97. })
  98. res.append(log)
  99. return res
  100. @router.get("/task_log/{job_id}/{af_run_id}/{task_id}")
  101. @web_try()
  102. @sxtimeit
  103. def get_job_task_log(job_id: str, af_run_id: str, task_id: str, db: Session = Depends(get_db)):
  104. res = get_task_log(job_id, af_run_id, task_id)
  105. log = res['data'] if 'data' in res else None
  106. return log
  107. @router.get("/logs_status/{ids}")
  108. @web_try()
  109. @sxtimeit
  110. def get_job_log_status(ids: str):
  111. run_ids = ids.split(',')
  112. id_to_status = {}
  113. for run_id in run_ids:
  114. res = get_job_run_status(run_id)
  115. id_to_status.update({run_id:res['data']['status']})
  116. return id_to_status