12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- from typing import Optional
- from fastapi import APIRouter
- from fastapi import Depends
- from sqlalchemy.orm import Session
- from app import schemas
- import app.crud as crud
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from fastapi_pagination import Page, add_pagination, paginate, Params
- from app import get_db
# Router grouping the job-log endpoints; every route declared on it is
# served under the "/jpt/joblog" prefix.
router = APIRouter(prefix="/jpt/joblog", tags=["joblog-日志管理"])
@router.post("/")
@web_try()
@sxtimeit
def create_job_log(item: schemas.JobLogCreate, db: Session = Depends(get_db)):
    """Create a job log entry.

    Passes the database session and the request payload to
    ``crud.create_job_log`` and returns whatever that call produces.
    """
    created = crud.create_job_log(db, item)
    return created
@router.get("/")
@web_try()
@sxtimeit
def get_job_logs(params: Params = Depends(), db: Session = Depends(get_db)):
    """Return a paginated list of execution logs for 'datax' jobs.

    Loads every job info, resolves the airflow job ids related to them via
    ``crud.get_af_ids(..., 'datax')``, fetches the airflow runs for those ids
    and flattens each run into a log dict based on the first entry of
    ``details['tasks']``.

    Args:
        params: Pagination parameters forwarded to ``paginate``.
        db: Database session.

    Returns:
        The result of ``paginate`` over the list of log dicts. A run without
        any recorded task is reported with ``trigger_result`` 0,
        ``execute_time`` and ``end_time`` 0 and ``log`` None.
    """
    job_ids = [job.id for job in crud.get_job_infos(db)]
    relations = crud.get_af_ids(db, job_ids, 'datax')
    # Map airflow job id -> related id (``se_id``) so each run can be traced
    # back to the job it belongs to.
    af_to_datax = {relation.af_id: relation.se_id for relation in relations}
    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())

    res = []
    for af_job_run in af_job_runs:
        # Only the first task of a run is reported. A run whose details are
        # empty or carry no 'tasks' entry is treated like a run with zero
        # tasks instead of failing the whole listing.
        tasks = (af_job_run.details or {}).get('tasks') or {}
        task = next(iter(tasks.values()), None)
        af_job_id = int(af_job_run.job_id)
        res.append({
            "id": af_job_run.id,
            "job_id": af_to_datax[af_job_id],
            "af_job_id": af_job_id,
            "run_id": af_job_run.run_id,
            "trigger_time": af_job_run.start_time,
            "trigger_result": 1 if task else 0,
            "execute_time": task['start_time'] if task else 0,
            "execute_result": 1 if task and task['status'] == 'success' else 0,
            "end_time": task['end_time'] if task else 0,
            "log": task['log'] if task else None,
        })
    return paginate(res, params)
@router.get("/getOnce")
@web_try()
@sxtimeit
def get_job_logs_once(run_id: str, job_id: int, db: Session = Depends(get_db)):
    """Return the execution log of a single airflow run.

    Looks the run up with ``crud.get_airflow_run_once`` and flattens it into
    a log dict based on the first entry of ``details['tasks']``.

    Args:
        run_id: Run identifier forwarded to ``crud.get_airflow_run_once``.
        job_id: Job identifier forwarded to ``crud.get_airflow_run_once``.
        db: Database session.

    Returns:
        A dict with the run's id, airflow job id, run id, trigger time and
        the first task's start/end time, status flag and log. When the run
        has no recorded task, ``trigger_result`` and ``execute_result`` are 0,
        ``execute_time`` and ``end_time`` are 0 and ``log`` is None.
    """
    # NOTE(review): if the crud lookup returns None for an unknown run, the
    # attribute access below raises AttributeError - confirm how "not found"
    # is signalled by crud.get_airflow_run_once.
    af_job_run = crud.get_airflow_run_once(db, run_id, job_id)

    # Only the first task of the run is reported. Empty details or a missing
    # 'tasks' entry is treated like a run with zero tasks.
    tasks = (af_job_run.details or {}).get('tasks') or {}
    task = next(iter(tasks.values()), None)

    return {
        "id": af_job_run.id,
        "af_job_id": int(af_job_run.job_id),
        "run_id": af_job_run.run_id,
        "trigger_time": af_job_run.start_time,
        "trigger_result": 1 if task else 0,
        "execute_time": task['start_time'] if task else 0,
        "execute_result": 1 if task and task['status'] == 'success' else 0,
        "end_time": task['end_time'] if task else 0,
        "log": task['log'] if task else None,
    }
|