import json
from typing import List, Optional

from fastapi import APIRouter, Depends, Query
from fastapi_pagination import Page, add_pagination, paginate, Params
from sqlalchemy.orm import Session

from app import get_db, get_page, page_help, schemas
from app.common.security.auth import verify_users
import app.crud as crud
from app.utils.send_util import get_job_run_status, get_task_log
from constants.constants import RUN_STATUS
from utils.sx_time import sxtimeit
from utils.sx_web import web_try

router = APIRouter(
    prefix="/jpt/jm_job_log",
    tags=["jm_job_log-定时任务日志管理"],
)


@router.get("/")
@web_try()
@sxtimeit
def get_job_logs(job_id: Optional[int] = None,
                 params: Params = Depends(get_page),
                 token_data: schemas.TokenData = Depends(verify_users),
                 db: Session = Depends(get_db)):
    """Return a paginated list of run logs for scheduled (jm) jobs.

    When ``job_id`` is given, only that job's runs are returned; otherwise
    runs of all jm jobs are listed.
    """
    # 是否有任务筛选 — restrict to one job when the caller supplies job_id.
    if job_id is not None:
        jm_job_list = [crud.get_jm_job_info(db, job_id)]
    else:
        jm_job_list = crud.get_jm_job_infos(db)
    id_to_job = {job.id: job for job in jm_job_list}
    relations = crud.get_af_ids(db, id_to_job.keys(), 'job')
    af_to_datax = {relation.af_id: relation.se_id for relation in relations}
    # 获取任务运行记录 — fetch the airflow run records for the requested page.
    # NOTE(review): the 4th argument is the page END offset (page*size), not a
    # page size — confirm crud.get_airflow_runs_by_af_job_ids expects (start, end).
    af_job_runs = crud.get_airflow_runs_by_af_job_ids(
        db, af_to_datax.keys(),
        (params['page'] - 1) * params['size'],
        params['page'] * params['size'])
    total = crud.count_airflow_runs_by_job_ids(db, af_to_datax.keys())
    res = []
    for af_job_run in af_job_runs:
        # Renamed from job_id so the filter parameter is not shadowed.
        se_job_id = af_to_datax[int(af_job_run.job_id)]
        execute_result = None
        # 若该记录未运行完成,获取运行的状态 — status <= 1 means the run has not
        # finished yet, so ask the scheduler for its live status.
        if af_job_run.status <= 1:
            run_status = get_job_run_status(af_job_run.id)
            execute_result = run_status['data']['status']
        job = id_to_job[se_job_id]
        res.append({
            "id": af_job_run.id,
            "job_id": se_job_id,
            "job_name": job.name,
            "job_type": job.type,
            "job_tag": job.tag,
            "af_job_id": int(af_job_run.job_id),
            "run_id": af_job_run.af_run_id,
            "start_time": af_job_run.start_time,
            # Fall back to the stored status when no live status was fetched.
            "result": execute_result if execute_result else af_job_run.status,
        })
    return page_help(res, params['page'], params['size'], total)
@router.get("/all_task")
@web_try()
@sxtimeit
def get_job_all_task(run_id: str,
                     token_data: schemas.TokenData = Depends(verify_users),
                     db: Session = Depends(get_db)):
    """Return per-task execution logs for one job run.

    'sparks' tasks are composite: their script describes sub-nodes, and each
    sub-node is reported as its own entry named ``<task>-<node>``.
    """
    af_job_run = crud.get_airflow_run_once(db, run_id)
    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
    tasks = list(af_job.tasks) if len(list(af_job.tasks)) > 0 else []
    res = []
    for task in tasks:
        if task['task_type'] == 'sparks':
            # Expand a sparks task into one entry per sub-node of its script.
            task_script = json.loads(task['script'])
            for node in task_script['sub_nodes']:
                sub_task_id = str(task['id']) + '_' + node['id']
                sub_name = task['name'] + '-' + node['name']
                res.append(_build_task_log(af_job, af_job_run, sub_task_id, sub_name))
        else:
            res.append(_build_task_log(af_job, af_job_run, task['id'], task['name']))
    return res


def _build_task_log(af_job, af_job_run, task_id, name):
    """Build one log entry dict for (job, run, task).

    Always returns the basic identifying fields; execution result, time and
    raw log text are merged in only when the remote log service has data.
    """
    log = {
        'name': name,
        'job_id': af_job.id,
        'af_run_id': af_job_run.af_run_id,
        'task_id': task_id,
    }
    task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
    task_log = task_log_res['data'] if 'data' in task_log_res else None
    if task_log:
        log.update({
            # NOTE(review): a falsy status (0) bypasses the RUN_STATUS mapping
            # and returns 0 directly — confirm whether status 0 should instead
            # map through RUN_STATUS.
            'execute_result': RUN_STATUS[task_log['status']] if task_log['status'] else 0,
            'execute_time': task_log['start_time'],
            'log': task_log['log'],
        })
    return log


@router.get("/task_log/{job_id}/{af_run_id}/{task_id}")
@web_try()
@sxtimeit
def get_job_task_log(job_id: str, af_run_id: str, task_id: str,
                     token_data: schemas.TokenData = Depends(verify_users),
                     db: Session = Depends(get_db)):
    """Return the raw log payload for a single task of a run.

    The numeric status is translated through RUN_STATUS before returning;
    returns None when the log service has no data for this task.
    """
    res = get_task_log(job_id, af_run_id, task_id)
    log = res['data'] if 'data' in res else None
    if log:
        # NOTE(review): same falsy-status quirk as above — a status of 0 skips
        # the RUN_STATUS mapping.
        log['status'] = RUN_STATUS[log['status']] if log['status'] else 0
    return log


@router.get("/logs_status/{ids}")
@web_try()
@sxtimeit
def get_job_log_status(ids: str,
                       token_data: schemas.TokenData = Depends(verify_users)):
    """Return the live run status for a comma-separated list of run ids."""
    id_to_status = {}
    for run_id in ids.split(','):
        res = get_job_run_status(run_id)
        # Raw scheduler status is returned; the RUN_STATUS mapping was
        # deliberately disabled here (see commented line in history).
        id_to_status[run_id] = res['data']['status']
    return id_to_status