123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- import base64
- import re
- from typing import Optional, List
- from fastapi import APIRouter, Depends, Header
- from sqlalchemy.orm import Session
- from app import models, schemas
- from app.common.decorators import verify_all
- import app.crud as crud
- from app.schemas import cron_expression
- from app.services.jm_job import on_off_control
- from app.services.jm_job_info import create_jm_job_info_services, execute_job_services, get_all_timeout_jobs_services, get_requirements_status_by_job_id, update_jm_job_info_services, update_jm_job_status_services
- from app.utils.cron_utils import *
- from app.utils.send_util import send_delete, send_execute
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from fastapi_pagination import Page, add_pagination, paginate, Params
- from configs.settings import DefaultOption, config
# Job settings read once, at import time, from the JOB_CONFIG section.
# The timeout is coerced to int; the auth token is kept as the config returns it.
job_timeout = int(config.get('JOB_CONFIG', 'timeout'))
job_auth_token = config.get('JOB_CONFIG', 'auth_token')
- from app import get_db
# Router that owns every endpoint declared in this module; all routes are
# registered under the /jpt/jm_job_info prefix.
router = APIRouter(
    prefix="/jpt/jm_job_info",
    tags=["jm_job_info-定时任务管理"],
)
@router.post("/", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def create_jm_job_info(item: schemas.JmJobInfoCreate, db: Session = Depends(get_db)):
    """Create a job from the request payload.

    Hands ``item`` and the session to ``create_jm_job_info_services`` and
    returns whatever that service returns.
    """
    created = create_jm_job_info_services(db, item)
    return created
@router.get("/", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_jm_job_infos(db: Session = Depends(get_db)):
    """List all jobs, each with a short run history and a requirements flag.

    Every element of the returned list is ``jm_job.to_dict()`` extended with:

    * ``history`` -- at most ten runs with status 2 or 3, chosen as the ones
      with the latest ``start_time`` and then ordered oldest to newest. Each
      entry is ``{"start_time": ..., "execute_result": <status>}``.
    * ``requirements_status`` -- 0 when ``get_requirements_status_by_job_id``
      returns a truthy value, 1 otherwise.
    """
    jobs = crud.get_jm_job_infos(db)
    relations = crud.get_af_ids(db, [job.id for job in jobs], 'job')
    # Maps the id used by the run records (af_id) back to the job id (se_id).
    af_to_datax = {rel.af_id: rel.se_id for rel in relations}
    runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())

    # Group the qualifying runs by the job they belong to.
    runs_by_job = {}
    for run in runs:
        if run.status not in (2, 3):
            continue
        owner_id = af_to_datax[int(run.job_id)]
        runs_by_job.setdefault(owner_id, []).append({
            "start_time": run.start_time,
            "execute_result": run.status,
        })

    def by_start_time(entry):
        return entry['start_time']

    payload = []
    for job in jobs:
        # Newest first so the slice keeps the ten latest runs ...
        newest_first = sorted(runs_by_job.get(job.id, []), key=by_start_time, reverse=True)
        # ... then back to chronological order for the response.
        history = sorted(newest_first[:10], key=by_start_time)

        job_dict = job.to_dict()
        job_dict['history'] = history
        requirements_status = get_requirements_status_by_job_id(db, job.id)
        job_dict['requirements_status'] = 0 if requirements_status else 1
        payload.append(job_dict)
    return payload
@router.get("/info", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
    """Return a single job as a dict, enriched with its nodes and cron details.

    The result is ``jm_job.to_dict()`` plus two keys:

    * ``nodes`` -- the value returned by ``crud.get_one_job_nodes``.
    * ``cron_expression_dict`` -- empty unless ``cron_type`` equals 2, in which
      case it is the output of ``parsing_cron_expression`` extended with the
      job's ``cron_select_type`` and ``cron_expression``.
    """
    job_dict = crud.get_jm_job_info(db, jm_job_id).to_dict()
    nodes = crud.get_one_job_nodes(db, jm_job_id)

    cron_type = job_dict['cron_type']
    select_type = job_dict['cron_select_type']
    cron_expr = job_dict['cron_expression']

    cron_details = {}
    if cron_type == 2:
        cron_details = parsing_cron_expression(cron_expr)
        # NOTE(review): the original indentation was lost; this update is
        # assumed to belong to the cron_type == 2 branch -- confirm.
        cron_details.update({
            'cron_select_type': select_type,
            'cron_expression': cron_expr
        })

    job_dict.update({
        'nodes': nodes,
        'cron_expression_dict': cron_details
    })
    return job_dict
@router.put("/", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_db)):
    """Update a job from the request payload.

    Hands ``item`` and the session to ``update_jm_job_info_services`` and
    returns whatever that service returns.
    """
    updated = update_jm_job_info_services(db, item)
    return updated
@router.delete("/", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
    """Delete a job, refusing while its status is 1.

    Before the local record is removed, a delete is sent to ``/af/af_job``
    for the ``af_id`` found through ``crud.get_af_id``.

    Raises:
        Exception: when the job's ``status`` equals 1.
    """
    if crud.get_jm_job_info(db, jm_job_id).status == 1:
        raise Exception('任务未停用,不可删除')

    af_id = crud.get_af_id(db, jm_job_id, 'job').af_id
    send_delete('/af/af_job', af_id)
    return crud.delete_jm_job_info(db, jm_job_id)
@router.put("/status", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def update_jm_job_status(item: schemas.JmJobInfoStatusUpdate, db: Session = Depends(get_db)):
    """Change a job's status.

    Passes ``item.id`` and ``item.status`` to ``update_jm_job_status_services``
    and returns its result.
    """
    job_id, new_status = item.id, item.status
    return update_jm_job_status_services(db, job_id, new_status)
@router.post("/execute/{jm_job_id}", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
    """Run a job on demand and return the ``data`` field of the service result.

    Raises:
        Exception: when the job's ``status`` equals 0.
    """
    if crud.get_jm_job_info(db, jm_job_id).status == 0:
        raise Exception('任务已被停用')
    return execute_job_services(db, jm_job_id)['data']
@router.get("/api/{jm_job_id}", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def copy_api_path(jm_job_id: int, db: Session = Depends(get_db)):
    """Build the ``api_execute`` path for a job.

    The job id is rendered as text and base64-encoded; the encoded text forms
    the last segment of the returned path. ``db`` is accepted but not used.
    """
    id_bytes = str(jm_job_id).encode('utf-8')
    encryption_id = base64.b64encode(id_bytes).decode('utf-8')
    return f'/jpt/jm_job_info/api_execute/{encryption_id}'
@router.get("/api_execute/{encryption_id}")
@web_try()
@sxtimeit
def api_execute_jm_job(encryption_id: str, db: Session = Depends(get_db)):
    """Run a job addressed by the base64 form of its id.

    ``encryption_id`` is base64-decoded and parsed as an integer job id; the
    job is then executed and the ``data`` field of the service result returned.

    Raises:
        Exception: when ``encryption_id`` cannot be decoded into an integer,
            or when the job's ``status`` equals 0.
    """
    # NOTE(review): this route declares no auth dependency and base64 is an
    # encoding, not encryption -- any caller who can guess a job id can build
    # a valid path. Confirm this is intended.
    try:
        jm_job_id = int(base64.b64decode(encryption_id).decode('utf-8'))
    except ValueError as err:
        # binascii.Error (bad base64), UnicodeDecodeError and int() parsing
        # failures are all ValueError subclasses; keep the original cause.
        raise Exception('任务路径解析失败') from err

    jm_job = crud.get_jm_job_info(db, jm_job_id)
    if jm_job.status == 0:
        raise Exception('任务已被停用')
    res = execute_job_services(db, jm_job_id)
    return res['data']
@router.post("/cron_expression", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_cron_expression(cron_expression: schemas.CronExpression):
    """Return the result of ``joint_cron_expression`` for the posted payload."""
    return joint_cron_expression(cron_expression)
@router.get("/cron_next_execute", dependencies=[Depends(verify_all)])
@web_try()
@sxtimeit
def get_cron_next_execute(cron_expression: str):
    """Return the result of ``run_get_next_time`` for the given cron expression."""
    return run_get_next_time(cron_expression)
@router.get("/all_timeout_jobs")
@web_try()
@sxtimeit
def get_all_timeout_jobs(timeout: Optional[int] = job_timeout, auth_token: str = Header(), db: Session = Depends(get_db)):
    """Return the result of ``get_all_timeout_jobs_services`` for ``timeout``.

    Access is guarded by a shared token supplied in the ``auth_token`` header,
    which must equal the configured ``job_auth_token``.

    Args:
        timeout: Threshold passed to the service; defaults to the configured
            ``job_timeout``.
        auth_token: Token taken from the request headers.
        db: Database session.

    Raises:
        Exception: when the supplied token does not match.
    """
    import hmac

    # Compare in constant time so response timing does not reveal how much of
    # the secret matched. Bytes are used because compare_digest rejects
    # non-ASCII str arguments. Assumes job_auth_token is a str -- TODO confirm.
    supplied = auth_token.encode('utf-8')
    expected = job_auth_token.encode('utf-8')
    if not hmac.compare_digest(supplied, expected):
        raise Exception('Token校验失败')
    return get_all_timeout_jobs_services(db, timeout)
|