job_log.py

from typing import Optional

from fastapi import APIRouter, Depends
from fastapi_pagination import Params
from sqlalchemy.orm import Session

import app.crud as crud
from app import get_db, get_page, page_help
from app.utils.send_util import get_job_run_status, get_task_log
from constants.constants import RUN_STATUS
from utils.sx_time import sxtimeit
from utils.sx_web import web_try

router = APIRouter(
    prefix="/jpt/joblog",
    tags=["joblog - log management"],
)
@router.get("/")
@web_try()
@sxtimeit
def get_job_logs(job_id: Optional[int] = None, params: Params = Depends(get_page), db: Session = Depends(get_db)):
    # Filter by job if a job_id was given, otherwise list every job
    if job_id is None:
        job_infos = crud.get_job_infos(db)
    else:
        job_infos = [crud.get_job_info(db, job_id)]
    id_to_job = {job.id: job for job in job_infos}
    # Map the Airflow job ids back to the datax job ids they were created from
    relations = crud.get_af_ids(db, id_to_job.keys(), 'datax')
    af_to_datax = {relation.af_id: relation.se_id for relation in relations}
    # Fetch one page of run records; params comes from the custom get_page
    # dependency and is indexed like a dict with 'page' and 'size'
    af_job_runs = crud.get_airflow_runs_by_af_job_ids(
        db, af_to_datax.keys(),
        (params['page'] - 1) * params['size'],
        params['page'] * params['size'])
    total = crud.count_airflow_runs_by_job_ids(db, af_to_datax.keys())
    res = []
    # Build a log summary for each run
    for af_job_run in af_job_runs:
        job_id = af_to_datax[int(af_job_run.job_id)]
        job_log = None
        if len(af_job_run.details['tasks']) > 0:
            # The run record already carries its task details
            job_log = list(af_job_run.details['tasks'].values())[0]
        else:
            # Otherwise look up the Airflow job and pull the log of its
            # first (and, for datax, only) task
            af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
            tasks = list(af_job.tasks)
            task = tasks[0] if len(tasks) > 0 else None
            if task is not None:
                log_res = get_task_log(af_job.id, af_job_run.af_run_id, task['id'])
                job_log = log_res['data'] if 'data' in log_res else None
        log = {
            "id": af_job_run.id,
            "job_id": job_id,
            "job_desc": id_to_job[job_id].job_desc,
            "af_job_id": int(af_job_run.job_id),
            "run_id": af_job_run.id,
            "af_run_id": af_job_run.af_run_id,
            "start_time": job_log['start_time'] if job_log and 'start_time' in job_log else None,
            "result": RUN_STATUS[job_log['status']] if job_log and 'status' in job_log else 0,
        }
        res.append(log)
    return page_help(res, params['page'], params['size'], total)
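
# Page window arithmetic used above: for page=2, size=10 the query reads rows
# from (2 - 1) * 10 = 10 up to 2 * 10 = 20, i.e. the second page of ten runs
# (assuming get_airflow_runs_by_af_job_ids takes start/end row indices rather
# than an offset/limit pair; only its call site is visible in this module).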
@router.get("/getOnce")
@web_try()
@sxtimeit
def get_job_logs_once(run_id: int, db: Session = Depends(get_db)):
    # Look up the Airflow run, then the job it belongs to
    af_job_run = crud.get_airflow_run_once(db, run_id)
    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
    # Take the job's first task and pull its log
    tasks = list(af_job.tasks)
    task = tasks[0] if len(tasks) > 0 else None
    job_log = None
    if task is not None:
        log_res = get_task_log(af_job.id, af_job_run.af_run_id, task['id'])
        job_log = log_res['data'] if 'data' in log_res else None
    log = {
        "log": job_log['log'] if job_log and 'log' in job_log else None,
        "status": RUN_STATUS[job_log['status']] if job_log and 'status' in job_log else None,
    }
    return log
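
# Illustrative payload returned by GET /jpt/joblog/getOnce (values are made
# up; the status code is whatever RUN_STATUS maps the Airflow status to):
#   {"log": "<raw task log text>", "status": 2}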
@router.get("/logs_status/{ids}")
@web_try()
@sxtimeit
def get_job_log_status(ids: str):
    # ids is a comma-separated list of run ids; return {run_id: status}
    run_ids = ids.split(',')
    id_to_status = {}
    for run_id in run_ids:
        res = get_job_run_status(run_id)
        id_to_status[run_id] = res['data']['status']
    return id_to_status
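
# Minimal wiring sketch (an assumption: this lives in the application entry
# point, not in this module) showing how the router above would be mounted:
#
#   from fastapi import FastAPI
#   from app.routers import job_log  # hypothetical module path
#
#   app = FastAPI()
#   app.include_router(job_log.router)
#
# Once mounted, GET /jpt/joblog/, /jpt/joblog/getOnce and
# /jpt/joblog/logs_status/{ids} are served as defined above.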