# job_log.py — FastAPI endpoints for job-log management (create, list, single fetch, live status).
  1. from typing import Optional
  2. from fastapi import APIRouter
  3. from fastapi import Depends
  4. from sqlalchemy.orm import Session
  5. from app import get_page, page_help, schemas
  6. import app.crud as crud
  7. from app.utils.send_util import get_job_run_status
  8. from constants.constants import RUN_STATUS
  9. from utils.sx_time import sxtimeit
  10. from utils.sx_web import web_try
  11. from fastapi_pagination import Page, add_pagination, paginate, Params
  12. from app import get_db
# Sub-router for all job-log endpoints; mounted under the /jpt/joblog path prefix.
router = APIRouter(
    prefix="/jpt/joblog",
    tags=["joblog-日志管理"],
)
  17. @router.post("/")
  18. @web_try()
  19. @sxtimeit
  20. def create_job_log(item: schemas.JobLogCreate, db: Session = Depends(get_db)):
  21. return crud.create_job_log(db, item)
  22. @router.get("/")
  23. @web_try()
  24. @sxtimeit
  25. def get_job_logs(job_id: Optional[int] = None, params: Params=Depends(get_page), db: Session = Depends(get_db)):
  26. job_infos = []
  27. if job_id is None:
  28. job_infos = crud.get_job_infos(db)
  29. else:
  30. job_infos = [crud.get_job_info(db,job_id)]
  31. id_to_job = {job.id:job for job in job_infos}
  32. relations = crud.get_af_ids(db, id_to_job.keys(), 'datax')
  33. af_to_datax = {relation.af_id:relation.se_id for relation in relations}
  34. af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
  35. af_job_runs.sort(key=lambda x: x.start_time, reverse=True)
  36. total = len(af_job_runs)
  37. af_job_runs = af_job_runs[(params['page'] - 1) * params['size']:params['page'] * params['size']]
  38. res = []
  39. for af_job_run in af_job_runs:
  40. task = list(af_job_run.details['tasks'].values())[0] if len(list(af_job_run.details['tasks'].values()))>0 else None
  41. job_id = af_to_datax[int(af_job_run.job_id)]
  42. execute_result = None
  43. if af_job_run.status <= 1:
  44. run_status = get_job_run_status(af_job_run.id)
  45. execute_result = run_status['data']['status']
  46. log = {
  47. "id": af_job_run.id,
  48. "job_id": job_id,
  49. "job_desc": id_to_job[job_id].job_desc,
  50. "af_job_id": int(af_job_run.job_id),
  51. "run_id": af_job_run.af_run_id,
  52. "trigger_time": af_job_run.start_time,
  53. "trigger_result": 1,
  54. "execute_time": task['start_time'] if task else 0,
  55. "execute_result": execute_result if execute_result else af_job_run.status,
  56. "end_time": task['end_time'] if task else 0,
  57. "log": task['log'] if task else None
  58. }
  59. res.append(log)
  60. return page_help(res,params['page'],params['size'],total)
  61. @router.get("/getOnce")
  62. @web_try()
  63. @sxtimeit
  64. def get_job_logs_once(run_id: int, db: Session = Depends(get_db)):
  65. af_job_run = crud.get_airflow_run_once(db, run_id)
  66. task = list(af_job_run.details['tasks'].values())[0] if len(list(af_job_run.details['tasks'].values()))>0 else None
  67. log = {
  68. "id": af_job_run.id,
  69. "af_job_id": int(af_job_run.job_id),
  70. "run_id": af_job_run.af_run_id,
  71. "trigger_time": af_job_run.start_time,
  72. "trigger_result": 1,
  73. "execute_time": task['start_time'] if task else 0,
  74. "execute_result": af_job_run.status,
  75. "end_time": task['end_time'] if task else 0,
  76. "log": task['log'] if task else None
  77. }
  78. return log
  79. @router.get("/logs_status/{ids}")
  80. @web_try()
  81. @sxtimeit
  82. def get_job_log_status(ids: str):
  83. run_ids = ids.split(',')
  84. id_to_status = {}
  85. for run_id in run_ids:
  86. res = get_job_run_status(run_id)
  87. id_to_status.update({run_id:res['data']['status']})
  88. return id_to_status