"""Routes under ``/jpt/jm_job_log`` for querying scheduled-job run histories and logs."""
from typing import Optional
from fastapi import APIRouter
from fastapi import Depends
from sqlalchemy.orm import Session
from app import schemas
import app.crud as crud
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from fastapi_pagination import Page, add_pagination, paginate, Params

from app import get_db


router = APIRouter(
    prefix="/jpt/jm_job_log",
    tags=["jm_job_log-定时任务日志管理"],
)


@router.get("/")
@web_try()
@sxtimeit
def get_job_logs(job_id: Optional[int] = None, db: Session = Depends(get_db)):
    """List run-history records, each enriched with its job's name, type and tag.

    Args:
        job_id: When given, only histories of this job are returned;
            otherwise histories of every job from ``crud.get_jm_job_infos``.
        db: Database session injected through ``get_db``.

    Returns:
        A list of dicts, one per history record, holding the record's
        attributes plus ``job_name``, ``job_type`` and ``job_tag``.
        (Any further wrapping is up to ``web_try`` and is not visible here.)
    """
    if job_id is not None:
        jm_job_infos = [crud.get_jm_job_info(db, job_id)]
    else:
        jm_job_infos = crud.get_jm_job_infos(db)

    job_id_to_job = {jm_job.id: jm_job for jm_job in jm_job_infos}
    # Hand crud a concrete list rather than a live dict_keys view.
    jm_job_id_list = list(job_id_to_job)
    job_history_list = crud.get_historys_by_job_ids(db, jm_job_id_list)

    res = []
    for job_history in job_history_list:
        jm_job = job_id_to_job[job_history.job_id]
        # Work on a shallow copy so the extra keys are not written into the
        # ORM instance's own attribute dict. All existing entries are kept
        # as before (including any SQLAlchemy bookkeeping entry; presumably
        # the response encoder drops that one -- verify).
        job_history_dict = dict(vars(job_history))
        job_history_dict.update(
            {
                "job_name": jm_job.name,
                "job_type": jm_job.type,
                "job_tag": jm_job.tag,
            }
        )
        res.append(job_history_dict)
    return res


# NOTE(review): this handler reuses the name ``get_job_logs``. The route
# decorators are applied when each ``def`` executes, so both routes are
# expected to stay registered, but the module-level name ends up bound to
# this second function only. The name is kept to avoid changing the module's
# public attribute and any generated operation metadata.
@router.get("/logs")
@web_try()
@sxtimeit
def get_job_logs(job_history_id: int, db: Session = Depends(get_db)):
    """Return the logs produced by one run (history record) of a job.

    Args:
        job_history_id: Identifier of the history record whose logs are wanted.
        db: Database session injected through ``get_db``.

    Returns:
        A dict with ``job_type`` and ``logs``. For a job whose type is
        ``'单作业离线任务'`` ``logs`` is the flat list of log records;
        otherwise it is a list of ``{"homework_name", "nodes"}`` groups,
        one per ``homework_id``, in order of first appearance.

    Raises:
        Exception: If no log records exist for ``job_history_id``.
    """
    job_history_info = crud.get_jm_job_history_info(db, job_history_id)
    job_info = crud.get_jm_job_info(db, job_history_info.job_id)
    job_logs = crud.get_jm_job_logs_by_history_id(db, job_history_id)
    if not job_logs:
        raise Exception("未找到该任务此次运行的日志")

    if job_info.type == '单作业离线任务':
        return {
            'job_type': job_info.type,
            'logs': job_logs,
        }

    # Group log records by homework; the group's name comes from the first
    # record seen for that homework_id, and dict order keeps first-seen order.
    grouped = {}
    for job_log in job_logs:
        group = grouped.setdefault(
            job_log.homework_id,
            {"homework_name": job_log.homework_name, "nodes": []},
        )
        group["nodes"].append(job_log)

    return {
        'job_type': job_info.type,
        'logs': list(grouped.values()),
    }