jm_job_log.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. from typing import Optional
  2. from fastapi import APIRouter
  3. from fastapi import Depends
  4. from sqlalchemy.orm import Session
  5. from app import schemas
  6. import app.crud as crud
  7. from utils.sx_time import sxtimeit
  8. from utils.sx_web import web_try
  9. from fastapi_pagination import Page, add_pagination, paginate, Params
  10. from app import get_db
  # Router for the scheduled-job log endpoints; the handlers registered on it
  # are served under the /jpt/jm_job_log prefix and share one tag.
  router = APIRouter(prefix="/jpt/jm_job_log", tags=["jm_job_log-定时任务日志管理"])
  @router.get("/")
  @web_try()
  @sxtimeit
  def get_job_logs(job_id: Optional[int] = None, db: Session = Depends(get_db)):
      """Summarise the Airflow runs of one scheduled job, or of every job.

      Jobs are looked up through ``crud``, mapped to their Airflow job ids via
      ``crud.get_af_ids(..., 'job')``, and each run returned by
      ``crud.get_airflow_runs_by_af_job_ids`` is reduced to one record built
      from the last entry of ``details['tasks']``.

      Args:
          job_id: Id of the single job to report on. When ``None`` every job
              returned by ``crud.get_jm_job_infos`` is included.
          db: SQLAlchemy session supplied by the ``get_db`` dependency.

      Returns:
          A list of dicts, one per run that has at least one task, holding the
          job's name/type/tag, the run identifiers and trigger time, and the
          last task's start time, end time and success flag.
      """
      # Load the jobs exactly once so the optional ``job_id`` filter is honoured.
      if job_id is not None:
          jobs = [crud.get_jm_job_info(db, job_id)]
      else:
          jobs = crud.get_jm_job_infos(db)

      id_to_job = {job.id: job for job in jobs}
      relations = crud.get_af_ids(db, id_to_job.keys(), 'job')
      # NOTE(review): ``se_id`` is used below as a key into ``id_to_job``, so it
      # is presumably the jm job id paired with the Airflow job id -- confirm.
      af_to_datax = {relation.af_id: relation.se_id for relation in relations}
      af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())

      res = []
      for af_job_run in af_job_runs:
          tasks = list(af_job_run.details['tasks'].values())
          if not tasks:
              # A run without any task entry produces no record.
              continue

          # Work on a copy without the 'log' entry instead of popping it from
          # the run's own ``details`` data: the log is never returned here and
          # the loaded record should not be altered by a read-only endpoint.
          task = {key: value for key, value in tasks[-1].items() if key != 'log'}

          af_job_id = int(af_job_run.job_id)
          # Distinct name so the ``job_id`` query parameter is not shadowed.
          run_job_id = af_to_datax[af_job_id]
          job = id_to_job[run_job_id]
          res.append({
              "id": af_job_run.id,
              "job_id": run_job_id,
              "job_name": job.name,
              "job_type": job.type,
              "job_tag": job.tag,
              "af_job_id": af_job_id,
              "run_id": af_job_run.run_id,
              "trigger_time": af_job_run.start_time,
              "trigger_result": 1 if task else 0,
              "execute_time": task['start_time'] if task else 0,
              "execute_result": 1 if task and task['status'] == 'success' else 0,
              "end_time": task['end_time'] if task else 0,
          })
      return res
  @router.get("/logs")
  @web_try()
  @sxtimeit
  def get_job_logs(run_id: str, job_id: int, db: Session = Depends(get_db)):
      """Return one record per task of a single Airflow run, log text included.

      The run is fetched with ``crud.get_airflow_run_once(db, run_id, job_id)``
      and every value of its ``details['tasks']`` mapping becomes one dict.

      NOTE: this handler has the same Python name as the handler registered for
      "/" in this module; the name is kept as-is here.

      Args:
          run_id: Identifier of the run, passed through to ``crud``.
          job_id: Job identifier, passed through to ``crud``.
          db: SQLAlchemy session supplied by the ``get_db`` dependency.

      Returns:
          A list of dicts (empty when the run has no tasks) with the run
          identifiers, trigger time, and each task's start time, end time,
          success flag and log.
      """
      run = crud.get_airflow_run_once(db, run_id, job_id)
      return [
          {
              "id": run.id,
              "af_job_id": int(run.job_id),
              "run_id": run.run_id,
              "trigger_time": run.start_time,
              # An empty task entry is reported as "not triggered" with
              # zero/None placeholders rather than raising on missing keys.
              "trigger_result": 1 if task else 0,
              "execute_time": task['start_time'] if task else 0,
              "execute_result": 1 if task and task['status'] == 'success' else 0,
              "end_time": task['end_time'] if task else 0,
              "log": task['log'] if task else None
          }
          for task in run.details['tasks'].values()
      ]