job_log.py

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from fastapi_pagination import Page, add_pagination, paginate, Params

from app import schemas, get_db
import app.crud as crud
from utils.sx_time import sxtimeit
from utils.sx_web import web_try

router = APIRouter(
    prefix="/jpt/joblog",
    tags=["joblog - log management"],
)


@router.post("/")
@web_try()
@sxtimeit
def create_job_log(item: schemas.JobLogCreate, db: Session = Depends(get_db)):
    """Create a job log record."""
    return crud.create_job_log(db, item)


@router.get("/")
@web_try()
@sxtimeit
def get_job_logs(params: Params = Depends(), db: Session = Depends(get_db)):
    """Return a paginated list of job logs, one entry per Airflow job run."""
    # Map Airflow job ids back to the datax job ids they were created from.
    job_infos = crud.get_job_infos(db)
    job_ids = [job.id for job in job_infos]
    relations = crud.get_af_ids(db, job_ids, 'datax')
    af_to_datax = {relation.af_id: relation.se_id for relation in relations}

    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
    res = []
    for af_job_run in af_job_runs:
        # Each run is summarized by its first task, if it has any.
        tasks = list(af_job_run.details['tasks'].values())
        task = tasks[0] if tasks else None
        log = {
            "id": af_job_run.id,
            "job_id": af_to_datax[int(af_job_run.job_id)],
            "af_job_id": int(af_job_run.job_id),
            "run_id": af_job_run.run_id,
            "trigger_time": af_job_run.start_time,
            "trigger_result": 1 if task else 0,
            "execute_time": task['start_time'] if task else 0,
            "execute_result": 1 if task and task['status'] == 'success' else 0,
            "end_time": task['end_time'] if task else 0,
            "log": task['log'] if task else None,
        }
        res.append(log)
    return paginate(res, params)


@router.get("/getOnce")
@web_try()
@sxtimeit
def get_job_logs_once(run_id: str, job_id: int, db: Session = Depends(get_db)):
    """Return the log summary for a single Airflow run."""
    af_job_run = crud.get_airflow_run_once(db, run_id, job_id)
    tasks = list(af_job_run.details['tasks'].values())
    task = tasks[0] if tasks else None
    log = {
        "id": af_job_run.id,
        "af_job_id": int(af_job_run.job_id),
        "run_id": af_job_run.run_id,
        "trigger_time": af_job_run.start_time,
        "trigger_result": 1 if task else 0,
        "execute_time": task['start_time'] if task else 0,
        "execute_result": 1 if task and task['status'] == 'success' else 0,
        "end_time": task['end_time'] if task else 0,
        "log": task['log'] if task else None,
    }
    return log
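

# --- Usage note (a sketch, not part of the original module) ---
# The paginated "/" endpoint relies on fastapi_pagination, whose
# add_pagination() is normally called once on the application object.
# Assuming the FastAPI app lives in app.main and this file is importable
# as a router module (the import path below is hypothetical):
#
#     from fastapi import FastAPI
#     from fastapi_pagination import add_pagination
#     from app.routers import job_log  # hypothetical import path
#
#     app = FastAPI()
#     app.include_router(job_log.router)
#     add_pagination(app)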