import json
from typing import Optional

from fastapi import APIRouter, Depends
from fastapi_pagination import Params
from sqlalchemy.orm import Session

import app.crud as crud
from app import get_db, get_page, page_help
from app.common.decorators import verify_all
from app.utils.send_util import get_job_run_status, get_task_log
from constants.constants import RUN_STATUS
from utils.sx_time import sxtimeit
from utils.sx_web import web_try

router = APIRouter(
    prefix="/jpt/jm_job_log",
    tags=["jm_job_log - scheduled job log management"],
)
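# Mounting sketch (illustrative; the real application wiring lives elsewhere in
# the project, so treat the names below as assumptions):
#
#   from fastapi import FastAPI
#   app = FastAPI()
#   app.include_router(router)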
- @router.get("/", dependencies=[Depends(verify_all)])
- @web_try()
- @sxtimeit
- def get_job_logs(job_id: int = None, params: Params=Depends(get_page), db: Session = Depends(get_db)):
- jm_job_list = []
- # 是否有任务筛选
- if job_id is not None:
- jm_job_list = [crud.get_jm_job_info(db, job_id)]
- else:
- jm_job_list = crud.get_jm_job_infos(db)
- id_to_job = {job.id:job for job in jm_job_list}
- relations = crud.get_af_ids(db,id_to_job.keys(), 'job')
- af_to_datax = {relation.af_id:relation.se_id for relation in relations}
- # 获取任务运行记录
- af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys(),(params['page'] - 1) * params['size'],params['page'] * params['size'])
- total = crud.count_airflow_runs_by_job_ids(db, af_to_datax.keys())
- res = []
- for af_job_run in af_job_runs:
- job_id = af_to_datax[int(af_job_run.job_id)]
- execute_result = None
- # 若该记录未运行完成,获取运行的状态
- if af_job_run.status <= 1:
- run_status = get_job_run_status(af_job_run.id)
- execute_result = run_status['data']['status']
- log = {
- "id": af_job_run.id,
- "job_id": job_id,
- "job_name": id_to_job[job_id].name,
- "job_type": id_to_job[job_id].type,
- "job_tag": id_to_job[job_id].tag,
- "af_job_id": int(af_job_run.job_id),
- "run_id": af_job_run.af_run_id,
- "start_time": af_job_run.start_time,
- "result": execute_result if execute_result else af_job_run.status,
- }
- res.append(log)
- return page_help(res,params['page'],params['size'],total)
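# Usage sketch (assumes the service at http://localhost:8080 and the `requests`
# package; whatever credentials verify_all expects are omitted -- all of these
# are assumptions, not part of this module):
#
#   import requests
#   resp = requests.get(
#       "http://localhost:8080/jpt/jm_job_log/",
#       params={"job_id": 3, "page": 1, "size": 20},
#   )
#   # web_try wraps the payload, so the paginated logs live under resp.json()["data"]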
- @router.get("/all_task", dependencies=[Depends(verify_all)])
- @web_try()
- @sxtimeit
- def get_job_all_task(run_id: str, db: Session = Depends(get_db)):
- af_job_run = crud.get_airflow_run_once(db, run_id)
- af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
- tasks = list(af_job.tasks) if len(list(af_job.tasks))>0 else []
- res = []
- for task in tasks:
- if task['task_type'] == 'sparks':
- task_script = json.loads(task['script'])
- for node in task_script['sub_nodes']:
- task_id = str(task['id'])+'_'+node['id']
- log ={
- 'name':task['name']+'-'+node['name'],
- 'job_id':af_job.id,
- 'af_run_id':af_job_run.af_run_id,
- 'task_id': task_id,
- }
- task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
- task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
- if task_log:
- log.update({
- 'execute_result': RUN_STATUS[task_log['status']] if task_log['status'] else 0,
- 'execute_time':task_log['start_time'],
- 'log': task_log['log']
- })
- res.append(log)
- else:
- log ={
- 'name':task['name'],
- 'job_id':af_job.id,
- 'af_run_id':af_job_run.af_run_id,
- 'task_id':task['id'],
- }
- task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task['id'])
- task_log = task_log_res['data'] if 'data' in task_log_res.keys() else None
- if task_log:
- log.update({
- 'execute_result':RUN_STATUS[task_log['status']] if task_log['status'] else 0,
- 'execute_time':task_log['start_time'],
- 'log': task_log['log']
- })
- res.append(log)
- return res
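# Usage sketch (host and run_id are illustrative assumptions):
#
#   import requests
#   resp = requests.get(
#       "http://localhost:8080/jpt/jm_job_log/all_task",
#       params={"run_id": "<af_run_id>"},
#   )
#   # Each "sparks" task expands to one entry per sub-node, with a task_id of
#   # the form "<task_id>_<node_id>"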
- @router.get("/task_log/{job_id}/{af_run_id}/{task_id}", dependencies=[Depends(verify_all)])
- @web_try()
- @sxtimeit
- def get_job_task_log(job_id: str, af_run_id: str, task_id: str, db: Session = Depends(get_db)):
- res = get_task_log(job_id, af_run_id, task_id)
- log = res['data'] if 'data' in res else None
- if log:
- log['status'] = RUN_STATUS[log['status']] if log['status'] else 0
- return log
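# Usage sketch (path parameters are illustrative assumptions):
#
#   import requests
#   resp = requests.get(
#       "http://localhost:8080/jpt/jm_job_log/task_log/1/some_run_id/2"
#   )
#   # resp.json()["data"] carries the log with its status already mapped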
- @router.get("/logs_status/{ids}", dependencies=[Depends(verify_all)])
- @web_try()
- @sxtimeit
- def get_job_log_status(ids: str):
- run_ids = ids.split(',')
- id_to_status = {}
- for run_id in run_ids:
- res = get_job_run_status(run_id)
- # id_to_status.update({run_id:RUN_STATUS[res['data']['status']]})
- id_to_status.update({run_id:res['data']['status']})
- return id_to_status
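# Usage sketch (ids is a comma-separated list of run ids; values are illustrative):
#
#   import requests
#   resp = requests.get("http://localhost:8080/jpt/jm_job_log/logs_status/12,13,14")
#   # -> data maps each run id to its raw status code, e.g. {"12": 1, "13": 2, ...}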