"""FastAPI routes for scheduled-job (jm_job_info) management under ``/jpt/jm_job_info``."""
# NOTE: the import order below is intentionally kept as-is. The wildcard import
# from ``app.utils.cron_utils`` may rebind names imported before it, so moving
# it could change which objects names such as ``datetime`` resolve to.
import base64
from datetime import datetime
from datetime import timedelta
from datetime import timezone
import croniter
import re
from typing import Optional, List
from fastapi import APIRouter
from fastapi import Depends
from sqlalchemy.orm import Session
from app import models, schemas
from app.common.security.auth import verify_users
import app.crud as crud
from app.schemas import cron_expression
from app.services.jm_job import on_off_control
from app.services.jm_job_info import create_jm_job_info_services, execute_job_services, update_jm_job_info_services, update_jm_job_status_services
from app.utils.cron_utils import *
from app.utils.send_util import send_delete, send_execute
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from fastapi_pagination import Page, add_pagination, paginate, Params
from app import get_db


router = APIRouter(
    prefix="/jpt/jm_job_info",
    tags=["jm_job_info-定时任务管理"],
)


@router.post("/")
@web_try()
@sxtimeit
def create_jm_job_info(item: schemas.JmJobInfoCreate, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Create a scheduled job from the request payload.

    Delegates entirely to ``create_jm_job_info_services`` and returns its result.
    """
    return create_jm_job_info_services(db, item)


@router.get("/")
@web_try()
@sxtimeit
def get_jm_job_infos(token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """List all scheduled jobs, each with its most recent run history.

    Returns a list of job dicts (``jm_job.to_dict()``) extended with a
    ``history`` key: at most the 10 latest runs whose status is 2 or 3,
    ordered by ``start_time`` ascending.
    """
    jm_job_list = crud.get_jm_job_infos(db)
    jm_job_ids = [job.id for job in jm_job_list]
    relations = crud.get_af_ids(db, jm_job_ids, 'job')
    # Maps the airflow-side job id back to the id of the job it belongs to.
    af_to_datax = {relation.af_id: relation.se_id for relation in relations}
    af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())

    # Group the relevant runs by job id.
    runs_by_job = {}
    for af_job_run in af_job_runs:
        # NOTE(review): 2 and 3 are presumably the terminal (success/failure)
        # run states -- confirm against the status definitions.
        if af_job_run.status not in (2, 3):
            continue
        job_id = af_to_datax[int(af_job_run.job_id)]
        runs_by_job.setdefault(job_id, []).append({
            "start_time": af_job_run.start_time,
            "execute_result": af_job_run.status,
        })

    res_list = []
    for jm_job in jm_job_list:
        # Newest first so the slice keeps the 10 latest runs ...
        history = sorted(
            runs_by_job.get(jm_job.id, []),
            key=lambda x: x['start_time'],
            reverse=True,
        )[0:10]
        # ... then present those runs in chronological order.
        history.sort(key=lambda x: x['start_time'], reverse=False)
        jm_job_dict = jm_job.to_dict()
        jm_job_dict.update({'history': history})
        res_list.append(jm_job_dict)
    return res_list


@router.get("/info")
@web_try()
@sxtimeit
def get_jm_job_info(jm_job_id: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Return one job as a dict, with its nodes and cron settings.

    The result is ``jm_job.to_dict()`` extended with ``nodes`` and
    ``cron_expression_dict``. The latter is populated from
    ``parsing_cron_expression`` only when ``cron_type`` is 2; it always
    carries ``cron_select_type`` and ``cron_expression``.
    """
    jm_job = crud.get_jm_job_info(db, jm_job_id)
    jm_job_dict = jm_job.to_dict()
    nodes = crud.get_one_job_nodes(db, jm_job_id)

    cron_type = jm_job_dict['cron_type']
    cron_select_type = jm_job_dict['cron_select_type']
    # Local name differs from the imported ``cron_expression`` to avoid shadowing it.
    cron_expr = jm_job_dict['cron_expression']

    cron_expression_dict = {}
    if cron_type == 2:
        cron_expression_dict = parsing_cron_expression(cron_expr)
    cron_expression_dict.update({
        'cron_select_type': cron_select_type,
        'cron_expression': cron_expr,
    })

    jm_job_dict.update({
        'nodes': nodes,
        'cron_expression_dict': cron_expression_dict,
    })
    return jm_job_dict


@router.put("/")
@web_try()
@sxtimeit
def update_jm_job_info(item: schemas.JmJobInfoUpdate, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Update a scheduled job from the request payload.

    Delegates entirely to ``update_jm_job_info_services`` and returns its result.
    """
    return update_jm_job_info_services(db, item)


# NOTE(review): unlike the other mutating routes, this one has no
# ``Depends(verify_users)`` dependency -- confirm whether that is intentional.
@router.delete("/")
@web_try()
@sxtimeit
def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
    """Delete a job and its remote airflow counterpart.

    Raises:
        Exception: if the job's status is 1 (per the error message, the job
            has not been disabled yet).
    """
    jm_job = crud.get_jm_job_info(db, jm_job_id)
    if jm_job.status == 1:
        raise Exception('任务未停用,不可删除')
    relation = crud.get_af_id(db, jm_job_id, 'job')
    # The remote delete is sent first; the local row is removed afterwards.
    send_delete('/af/af_job', relation.af_id)
    return crud.delete_jm_job_info(db, jm_job_id)


@router.put("/status")
@web_try()
@sxtimeit
def update_jm_job_status(item: schemas.JmJobInfoStatusUpdate, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Set a job's status to ``item.status``.

    Delegates to ``update_jm_job_status_services`` and returns its result.
    """
    return update_jm_job_status_services(db, item.id, item.status)


# NOTE(review): no ``Depends(verify_users)`` dependency here either -- confirm
# whether manual execution is meant to be callable without a token.
@router.post("/execute/{jm_job_id}")
@web_try()
@sxtimeit
def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
    """Trigger one execution of the job and return the ``data`` field of the result.

    Raises:
        Exception: if the job's status is 0 (per the error message, disabled).
    """
    jm_job = crud.get_jm_job_info(db, jm_job_id)
    if jm_job.status == 0:
        raise Exception('任务已被停用')
    res = execute_job_services(db, jm_job_id)
    return res['data']


@router.get("/api/{jm_job_id}")
@web_try()
@sxtimeit
def copy_api_path(jm_job_id: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Return the ``api_execute`` path for a job.

    The job id is base64-encoded into the path. This is plain encoding, not
    encryption: anyone can decode or construct such a path.
    """
    initial = str(jm_job_id).encode('utf-8')
    encryption_id = base64.b64encode(initial).decode('utf-8')
    return f'/jpt/jm_job_info/api_execute/{encryption_id}'


@router.get("/api_execute/{encryption_id}")
@web_try()
@sxtimeit
def api_execute_jm_job(encryption_id: str, db: Session = Depends(get_db)):
    """Execute the job identified by a base64-encoded id.

    ``encryption_id`` is expected in the format produced by ``copy_api_path``.

    Raises:
        Exception: if the id cannot be decoded to an integer, or if the job's
            status is 0 (per the error message, disabled).
    """
    try:
        jm_job_id = int(base64.b64decode(encryption_id).decode('utf-8'))
    except ValueError as err:
        # binascii.Error, UnicodeDecodeError and int() failures are all
        # ValueError subclasses; keep the cause attached for debugging.
        raise Exception('任务路径解析失败') from err
    jm_job = crud.get_jm_job_info(db, jm_job_id)
    if jm_job.status == 0:
        raise Exception('任务已被停用')
    res = execute_job_services(db, jm_job_id)
    return res['data']


@router.post("/cron_expression")
@web_try()
@sxtimeit
def get_cron_expression(cron_expression: schemas.CronExpression, token_data: schemas.TokenData = Depends(verify_users), ):
    """Build a cron expression from the structured request payload.

    Returns whatever ``joint_cron_expression`` produces for the payload.
    """
    return joint_cron_expression(cron_expression)


@router.get("/cron_next_execute")
@web_try()
@sxtimeit
def get_cron_next_execute(cron_expression: str, token_data: schemas.TokenData = Depends(verify_users), ):
    """Return the next five execution times for a cron expression."""
    return run_get_next_time(cron_expression)


def run_get_next_time(cron_expression):
    """Compute the next five fire times of ``cron_expression``.

    Times are calculated from the current moment in a fixed UTC+8 zone and
    returned as ``"%Y-%m-%d %H:%M"`` strings, earliest first. Any ``?`` in
    the expression is treated as ``*`` before it is handed to croniter.
    """
    SHA_TZ = timezone(timedelta(hours=8), name='Asia/Shanghai')
    # Aware "now" taken directly in the target zone; avoids the deprecated
    # naive ``datetime.utcnow()`` round trip.
    now = datetime.now(SHA_TZ)
    cron_str = cron_expression.replace('?', '*')
    cron = croniter.croniter(cron_str, now)
    return [
        cron.get_next(datetime).strftime("%Y-%m-%d %H:%M")
        for _ in range(5)
    ]