from typing import Optional
from fastapi import APIRouter
from fastapi import Depends
from sqlalchemy.orm import Session
from app import get_page, page_help, schemas
from app.common.security.auth import verify_users
import app.crud as crud
from app.utils.send_util import get_job_run_status, get_task_log
from constants.constants import RUN_STATUS
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from fastapi_pagination import Page, add_pagination, paginate, Params
from app import get_db


router = APIRouter(
    prefix="/jpt/joblog",
    tags=["joblog-日志管理"],
)


def _fetch_first_task_log(af_job, af_run_id):
    """Return the log payload of the first task of ``af_job`` for one run.

    Returns ``None`` when the job has no tasks or when the log response has
    no ``'data'`` key, so callers can apply their own fallbacks.
    """
    tasks = list(af_job.tasks)
    if not tasks:
        # Previously the code indexed ``None['id']`` here and raised TypeError.
        return None
    log_res = get_task_log(af_job.id, af_run_id, tasks[0]['id'])
    return log_res['data'] if 'data' in log_res else None


@router.get("/")
@web_try()
@sxtimeit
def get_job_logs(
    job_id: Optional[int] = None,
    params: Params = Depends(get_page),
    token_data: schemas.TokenData = Depends(verify_users),
    db: Session = Depends(get_db),
):
    """List run-log summaries, optionally restricted to one job.

    Args:
        job_id: When given, only runs of this job are listed; otherwise the
            runs of every job returned by ``crud.get_job_infos`` are listed.
        params: Pagination settings; read as ``params['page']`` and
            ``params['size']``.
        token_data: Result of the ``verify_users`` dependency (not read here).
        db: Database session.

    Returns:
        The value of ``page_help(items, page, size, total)`` where each item
        holds ``id``, ``job_id``, ``job_desc``, ``af_job_id``, ``run_id``,
        ``af_run_id``, ``start_time`` and ``result``.
    """
    # Is a job filter supplied?
    if job_id is None:
        job_infos = crud.get_job_infos(db)
    else:
        job_infos = [crud.get_job_info(db, job_id)]
    id_to_job = {job.id: job for job in job_infos}

    relations = crud.get_af_ids(db, id_to_job.keys(), 'datax')
    af_to_datax = {relation.af_id: relation.se_id for relation in relations}

    # Fetch the run records for the requested page.
    page = params['page']
    size = params['size']
    # NOTE(review): the last argument is ``page * size``; if the crud helper
    # treats it as a row limit rather than an end offset, pages after the
    # first would be oversized -- confirm against the crud implementation.
    af_job_runs = crud.get_airflow_runs_by_af_job_ids(
        db, af_to_datax.keys(), (page - 1) * size, page * size
    )
    total = crud.count_airflow_runs_by_job_ids(db, af_to_datax.keys())

    res = []
    # Collect the log summary of every run.
    for af_job_run in af_job_runs:
        # Use a dedicated local instead of overwriting the ``job_id`` parameter.
        datax_job_id = af_to_datax[int(af_job_run.job_id)]

        task_details = af_job_run.details['tasks']
        if len(task_details) > 0:
            # The run record already carries task details; use the first one.
            job_log = list(task_details.values())[0]
        else:
            # Otherwise look up the af_job and request its first task's log.
            af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
            job_log = _fetch_first_task_log(af_job, af_job_run.af_run_id)

        res.append({
            "id": af_job_run.id,
            "job_id": datax_job_id,
            "job_desc": id_to_job[datax_job_id].job_desc,
            "af_job_id": int(af_job_run.job_id),
            "run_id": af_job_run.id,
            "af_run_id": af_job_run.af_run_id,
            "start_time": job_log['start_time'] if job_log and 'start_time' in job_log else None,
            # Falls back to 0 when no status is available.
            "result": RUN_STATUS[job_log['status']] if job_log and 'status' in job_log else 0,
        })
    return page_help(res, page, size, total)


@router.get("/getOnce")
@web_try()
@sxtimeit
def get_job_logs_once(
    run_id: int,
    token_data: schemas.TokenData = Depends(verify_users),
    db: Session = Depends(get_db),
):
    """Return the log text and status of a single run.

    Args:
        run_id: Identifier passed to ``crud.get_airflow_run_once``.
        token_data: Result of the ``verify_users`` dependency (not read here).
        db: Database session.

    Returns:
        A dict with ``log`` and ``status``; each is ``None`` when the
        corresponding value is unavailable (including when the job has no
        tasks).
    """
    # Fetch the af_run.
    af_job_run = crud.get_airflow_run_once(db, run_id)
    # Fetch the af_job it belongs to.
    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
    # Take the job's first task and request its log.
    job_log = _fetch_first_task_log(af_job, af_job_run.af_run_id)
    return {
        "log": job_log['log'] if job_log and 'log' in job_log else None,
        "status": RUN_STATUS[job_log['status']] if job_log and 'status' in job_log else None,
    }


@router.get("/logs_status/{ids}")
@web_try()
@sxtimeit
def get_job_log_status(ids: str, token_data: schemas.TokenData = Depends(verify_users)):
    """Return the status of each run named in a comma-separated id list.

    Args:
        ids: Run ids joined by ``','``.
        token_data: Result of the ``verify_users`` dependency (not read here).

    Returns:
        A dict mapping each id string (as split from ``ids``) to
        ``get_job_run_status(id)['data']['status']``.
    """
    id_to_status = {}
    for run_id in ids.split(','):
        status_res = get_job_run_status(run_id)
        id_to_status[run_id] = status_res['data']['status']
    return id_to_status