"""HTTP routes for jm_job_info scheduled jobs, mounted under ``/jpt/jm_job_info``."""
import base64
import hmac
import re
from typing import Optional, List

from fastapi import APIRouter, Depends, Header
from sqlalchemy.orm import Session

from app import models, schemas
from app.common.security.auth import verify_users
import app.crud as crud
from app.schemas import cron_expression
from app.services.jm_job import on_off_control
from app.services.jm_job_info import (
    create_jm_job_info_services,
    execute_job_services,
    get_all_timeout_jobs_services,
    get_requirements_status_by_job_id,
    update_jm_job_info_services,
    update_jm_job_status_services,
)
from app.utils.cron_utils import *
from app.utils.send_util import send_delete, send_execute
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from fastapi_pagination import Page, add_pagination, paginate, Params
from configs.settings import DefaultOption, config

# Default timeout and shared secret used by the /all_timeout_jobs endpoint.
job_timeout = int(config.get('JOB_CONFIG', 'timeout'))
job_auth_token = config.get('JOB_CONFIG', 'auth_token')

from app import get_db

# Airflow run statuses that are included in a job's history.
# NOTE(review): presumably the terminal success/failure states - confirm.
_HISTORY_RUN_STATUSES = (2, 3)

# Maximum number of most recent runs returned per job by the list endpoint.
_HISTORY_LIMIT = 10

router = APIRouter(
    prefix="/jpt/jm_job_info",
    tags=["jm_job_info-定时任务管理"],
)


@router.post("/")
@web_try()
@sxtimeit
def create_jm_job_info(item: schemas.JmJobInfoCreate,
                       token_data: schemas.TokenData = Depends(verify_users),
                       db: Session = Depends(get_db)):
    """Create a job by delegating to ``create_jm_job_info_services``."""
    return create_jm_job_info_services(db, item)


@router.get("/")
@web_try()
@sxtimeit
def get_jm_job_infos(token_data: schemas.TokenData = Depends(verify_users),
                     db: Session = Depends(get_db)):
    """Return every job as a dict, extended with run history and requirements status.

    Each returned dict is ``jm_job.to_dict()`` plus:

    * ``history``: up to ``_HISTORY_LIMIT`` of the job's most recent runs whose
      status is in ``_HISTORY_RUN_STATUSES``, ordered oldest first. Each entry
      has ``start_time`` and ``execute_result`` keys.
    * ``requirements_status``: ``0`` when ``get_requirements_status_by_job_id``
      returns a truthy value, otherwise ``1``.
    """
    jm_job_list = crud.get_jm_job_infos(db)
    jm_job_ids = [job.id for job in jm_job_list]

    # Map each Airflow job id back to the id stored as ``se_id`` on its relation.
    relations = crud.get_af_ids(db, jm_job_ids, 'job')
    af_to_datax = {relation.af_id: relation.se_id for relation in relations}
    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())

    # Group the qualifying runs by job id.
    runs_by_job = {}
    for af_job_run in af_job_runs:
        if af_job_run.status not in _HISTORY_RUN_STATUSES:
            continue
        job_id = af_to_datax[int(af_job_run.job_id)]
        runs_by_job.setdefault(job_id, []).append({
            "start_time": af_job_run.start_time,
            "execute_result": af_job_run.status,
        })

    res_list = []
    for jm_job in jm_job_list:
        # Run history: select the newest entries, then present them oldest first.
        history = runs_by_job.get(jm_job.id, [])
        history.sort(key=lambda x: x['start_time'], reverse=True)
        history = history[:_HISTORY_LIMIT]
        history.sort(key=lambda x: x['start_time'])

        jm_job_dict = jm_job.to_dict()
        jm_job_dict.update({'history': history})

        # NOTE(review): one service call per job; consider batching if the
        # service layer ever offers a bulk lookup.
        requirements_status = get_requirements_status_by_job_id(db, jm_job.id)
        jm_job_dict.update({'requirements_status': 0 if requirements_status else 1})
        res_list.append(jm_job_dict)
    return res_list


@router.get("/info")
@web_try()
@sxtimeit
def get_jm_job_info(jm_job_id: int,
                    token_data: schemas.TokenData = Depends(verify_users),
                    db: Session = Depends(get_db)):
    """Return one job as a dict, extended with its nodes and cron details.

    Adds two keys to ``jm_job.to_dict()``:

    * ``nodes``: the result of ``crud.get_one_job_nodes``.
    * ``cron_expression_dict``: empty unless ``cron_type == 2``, in which case
      it holds the output of ``parsing_cron_expression`` plus the job's
      ``cron_select_type`` and raw ``cron_expression``.
    """
    jm_job = crud.get_jm_job_info(db, jm_job_id)
    jm_job_dict = jm_job.to_dict()
    nodes = crud.get_one_job_nodes(db, jm_job_id)

    cron_type = jm_job_dict['cron_type']
    cron_select_type = jm_job_dict['cron_select_type']
    cron_expression = jm_job_dict['cron_expression']

    cron_expression_dict = {}
    if cron_type == 2:
        cron_expression_dict = parsing_cron_expression(cron_expression)
        cron_expression_dict.update({
            'cron_select_type': cron_select_type,
            'cron_expression': cron_expression,
        })

    jm_job_dict.update({
        'nodes': nodes,
        'cron_expression_dict': cron_expression_dict,
    })
    return jm_job_dict


@router.put("/")
@web_try()
@sxtimeit
def update_jm_job_info(item: schemas.JmJobInfoUpdate,
                       token_data: schemas.TokenData = Depends(verify_users),
                       db: Session = Depends(get_db)):
    """Update a job by delegating to ``update_jm_job_info_services``."""
    return update_jm_job_info_services(db, item)


# NOTE(review): unlike the other management routes, this one has no
# ``verify_users`` dependency - confirm that unauthenticated deletion is intended.
@router.delete("/")
@web_try()
@sxtimeit
def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
    """Delete a job and its associated Airflow job.

    Raises:
        Exception: if the job's status is ``1`` (the job has not been disabled).
    """
    jm_job = crud.get_jm_job_info(db, jm_job_id)
    if jm_job.status == 1:
        raise Exception('任务未停用,不可删除')
    # Remove the Airflow-side job first, then the local record.
    relation = crud.get_af_id(db, jm_job_id, 'job')
    send_delete('/af/af_job', relation.af_id)
    return crud.delete_jm_job_info(db, jm_job_id)


@router.put("/status")
@web_try()
@sxtimeit
def update_jm_job_status(item: schemas.JmJobInfoStatusUpdate,
                         token_data: schemas.TokenData = Depends(verify_users),
                         db: Session = Depends(get_db)):
    """Change a job's status by delegating to ``update_jm_job_status_services``."""
    return update_jm_job_status_services(db, item.id, item.status)


# NOTE(review): no ``verify_users`` dependency here either - confirm intent.
@router.post("/execute/{jm_job_id}")
@web_try()
@sxtimeit
def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
    """Trigger a job run and return the ``data`` field of the service response.

    Raises:
        Exception: if the job's status is ``0`` (the job has been disabled).
    """
    jm_job = crud.get_jm_job_info(db, jm_job_id)
    if jm_job.status == 0:
        raise Exception('任务已被停用')
    res = execute_job_services(db, jm_job_id)
    return res['data']


@router.get("/api/{jm_job_id}")
@web_try()
@sxtimeit
def copy_api_path(jm_job_id: int,
                  token_data: schemas.TokenData = Depends(verify_users),
                  db: Session = Depends(get_db)):
    """Return the ``api_execute`` path for a job, with the id base64-encoded.

    NOTE(review): base64 is a reversible encoding, not encryption, so anyone
    can construct a valid path for any job id.
    """
    initial = str(jm_job_id).encode('utf-8')
    encryption_id = base64.b64encode(initial).decode('utf-8')
    return f'/jpt/jm_job_info/api_execute/{encryption_id}'


@router.get("/api_execute/{encryption_id}")
@web_try()
@sxtimeit
def api_execute_jm_job(encryption_id: str, db: Session = Depends(get_db)):
    """Trigger a job run from the base64-encoded id produced by ``copy_api_path``.

    Returns the ``data`` field of the service response.

    Raises:
        Exception: if ``encryption_id`` cannot be decoded into an integer id,
            or if the job's status is ``0`` (the job has been disabled).
    """
    try:
        initial = base64.b64decode(encryption_id)
        jm_job_id = int(initial.decode('utf-8'))
    except ValueError as e:
        # binascii.Error, UnicodeDecodeError and int() failures are all
        # ValueError subclasses; keep the original cause for debugging.
        raise Exception('任务路径解析失败') from e

    jm_job = crud.get_jm_job_info(db, jm_job_id)
    if jm_job.status == 0:
        raise Exception('任务已被停用')
    res = execute_job_services(db, jm_job_id)
    return res['data']


@router.post("/cron_expression")
@web_try()
@sxtimeit
def get_cron_expression(cron_expression: schemas.CronExpression,
                        token_data: schemas.TokenData = Depends(verify_users)):
    """Return the result of ``joint_cron_expression`` for the submitted payload."""
    cron = joint_cron_expression(cron_expression)
    return cron


@router.get("/cron_next_execute")
@web_try()
@sxtimeit
def get_cron_next_execute(cron_expression: str,
                          token_data: schemas.TokenData = Depends(verify_users)):
    """Return the result of ``run_get_next_time`` for the given cron expression."""
    execute_list = run_get_next_time(cron_expression)
    return execute_list


@router.get("/all_timeout_jobs")
@web_try()
@sxtimeit
def get_all_timeout_jobs(timeout: Optional[int] = job_timeout,
                         auth_token: str = Header(),
                         db: Session = Depends(get_db)):
    """Return the result of ``get_all_timeout_jobs_services`` for ``timeout``.

    Access is guarded by a shared secret sent in the ``auth_token`` header
    rather than by ``verify_users``.

    Raises:
        Exception: if the header does not match the configured ``auth_token``.
    """
    # Constant-time comparison so response timing does not leak the secret;
    # comparing bytes also accepts non-ASCII header values without a TypeError.
    token_matches = hmac.compare_digest(
        auth_token.encode('utf-8'),
        job_auth_token.encode('utf-8'),
    )
    if not token_matches:
        raise Exception('Token校验失败')
    return get_all_timeout_jobs_services(db, timeout)