job_log.py

from typing import Optional

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from fastapi_pagination import Page, add_pagination, paginate, Params

from app import schemas, get_db
import app.crud as crud
from utils.sx_time import sxtimeit
from utils.sx_web import web_try

router = APIRouter(
    prefix="/jpt/joblog",
    tags=["joblog - log management"],
)

@router.post("/")
@web_try()
@sxtimeit
def create_job_log(item: schemas.JobLogCreate, db: Session = Depends(get_db)):
    return crud.create_job_log(db, item)

@router.get("/")
@web_try()
@sxtimeit
def get_job_logs(job_id: Optional[int] = None, params: Params = Depends(), db: Session = Depends(get_db)):
    # Look up either the one requested job or all jobs, indexed by id.
    if job_id is None:
        job_infos = crud.get_job_infos(db)
    else:
        job_infos = [crud.get_job_info(db, job_id)]
    id_to_job = {job.id: job for job in job_infos}
    # Resolve the Airflow job ids that correspond to these datax jobs.
    relations = crud.get_af_ids(db, id_to_job.keys(), 'datax')
    af_to_datax = {relation.af_id: relation.se_id for relation in relations}
    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
    res = []
    for af_job_run in af_job_runs:
        # A run that has not started yet may carry no task details.
        tasks = list(af_job_run.details['tasks'].values())
        task = tasks[0] if tasks else None
        job_id = af_to_datax[int(af_job_run.job_id)]
        log = {
            "id": af_job_run.id,
            "job_id": job_id,
            "job_desc": id_to_job[job_id].job_desc,
            "af_job_id": int(af_job_run.job_id),
            "run_id": af_job_run.run_id,
            "trigger_time": af_job_run.start_time,
            "trigger_result": 1 if task else 0,
            "execute_time": task['start_time'] if task else 0,
            "execute_result": 1 if task and task['status'] == 'success' else 0,
            "end_time": task['end_time'] if task else 0,
            "log": task['log'] if task else None,
        }
        res.append(log)
    # Newest runs first, then paginate the assembled list in memory.
    res.sort(key=lambda x: x['trigger_time'], reverse=True)
    return paginate(res, params)

@router.get("/getOnce")
@web_try()
@sxtimeit
def get_job_logs_once(run_id: str, job_id: int, db: Session = Depends(get_db)):
    af_job_run = crud.get_airflow_run_once(db, run_id, job_id)
    # Same shape as one entry of get_job_logs; task details may be absent.
    tasks = list(af_job_run.details['tasks'].values())
    task = tasks[0] if tasks else None
    log = {
        "id": af_job_run.id,
        "af_job_id": int(af_job_run.job_id),
        "run_id": af_job_run.run_id,
        "trigger_time": af_job_run.start_time,
        "trigger_result": 1 if task else 0,
        "execute_time": task['start_time'] if task else 0,
        "execute_result": 1 if task and task['status'] == 'success' else 0,
        "end_time": task['end_time'] if task else 0,
        "log": task['log'] if task else None,
    }
    return log
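
Note that `Page` and `add_pagination` are imported above but never used in this module; `paginate(res, params)` only resolves its page/size parameters once `add_pagination` has been applied to the FastAPI app that mounts this router. A minimal wiring sketch, assuming a hypothetical `app/main.py` and that this module is importable as `app.routers.job_log`:

from fastapi import FastAPI
from fastapi_pagination import add_pagination

# Hypothetical import path for the router defined in job_log.py above.
from app.routers.job_log import router as job_log_router

app = FastAPI()
app.include_router(job_log_router)
# Registers the pagination machinery used by the Params/paginate pair
# in get_job_logs.
add_pagination(app)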