123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- import datetime
- import croniter
- import re
- from typing import Optional, List
- from fastapi import APIRouter
- from fastapi import Depends
- from sqlalchemy.orm import Session
- from app import models, schemas
- import app.crud as crud
- from app.schemas import cron_expression
- from app.services.jm_job import on_off_control
- from app.services.jm_job_info import create_jm_job_info_services, execute_job_services, update_jm_job_info_services, update_jm_job_status_services
- from app.utils.cron_utils import *
- from app.utils.send_util import send_delete, send_execute
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from fastapi_pagination import Page, add_pagination, paginate, Params
- from app import get_db
# Router that groups the scheduled-job (jm_job_info) endpoints under one URL prefix.
router = APIRouter(prefix="/jpt/jm_job_info", tags=["jm_job_info-定时任务管理"])
@router.post("/")
@web_try()
@sxtimeit
def create_jm_job_info(item: schemas.JmJobInfoCreate, db: Session = Depends(get_db)):
    """Create a scheduled job from the request payload.

    All work is delegated to ``create_jm_job_info_services``; its result is
    returned unchanged.
    """
    created = create_jm_job_info_services(db, item)
    return created
@router.get("/")
@web_try()
@sxtimeit
def get_jm_job_infos(db: Session = Depends(get_db)):
    """List every scheduled job together with its most recent run records.

    Each job is returned as ``job.to_dict()`` extended with a ``history`` key
    that holds at most 10 run summaries, ordered by ``trigger_time`` descending.
    Runs whose ``details['tasks']`` mapping is empty contribute no record.
    """
    # NOTE(review): block nesting was reconstructed from a flattened source;
    # the record is assumed to be built only for runs that have tasks.
    jobs = crud.get_jm_job_infos(db)
    relations = crud.get_af_ids(db, [job.id for job in jobs], 'job')

    # Map the airflow-side job id to the local job id.
    af_to_datax = {}
    for relation in relations:
        af_to_datax[relation.af_id] = relation.se_id

    runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())

    history_by_job = {}
    for run in runs:
        tasks = list(run.details['tasks'].values())
        if not tasks:
            continue

        # Only the last task of the run is summarised; its log is dropped.
        last_task = tasks[-1]
        last_task.pop('log', None)

        job_id = af_to_datax[int(run.job_id)]
        record = {
            "id": run.id,
            "job_id": job_id,
            "af_job_id": int(run.job_id),
            "run_id": run.af_run_id,
            "trigger_time": run.start_time,
            "trigger_result": 1 if last_task else 0,
            "execute_time": last_task['start_time'] if last_task else 0,
            "execute_result": run.status,
            "end_time": last_task['end_time'] if last_task else 0,
        }
        history_by_job.setdefault(job_id, []).append(record)

    result = []
    for job in jobs:
        history = sorted(
            history_by_job.get(job.id, []),
            key=lambda record: record['trigger_time'],
            reverse=True,
        )
        job_dict = job.to_dict()
        job_dict.update({'history': history[:10]})
        result.append(job_dict)
    return result
@router.get("/info")
@web_try()
@sxtimeit
def get_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
    """Return one scheduled job as a dict, with its nodes and parsed cron data.

    The job's ``to_dict()`` output is extended with two keys: ``nodes`` (from
    ``crud.get_one_job_nodes``) and ``cron_expression_dict``.
    """
    job_dict = crud.get_jm_job_info(db, jm_job_id).to_dict()
    nodes = crud.get_one_job_nodes(db, jm_job_id)

    cron_type = job_dict['cron_type']
    cron_select_type = job_dict['cron_select_type']
    cron_text = job_dict['cron_expression']

    # NOTE(review): nesting was reconstructed from a flattened source; the two
    # extra keys are assumed to be added only when cron_type == 2 - confirm.
    if cron_type == 2:
        cron_details = parsing_cron_expression(cron_text)
        cron_details.update({
            'cron_select_type': cron_select_type,
            'cron_expression': cron_text
        })
    else:
        cron_details = {}

    job_dict.update({
        'nodes': nodes,
        'cron_expression_dict': cron_details
    })
    return job_dict
@router.put("/")
@web_try()
@sxtimeit
def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_db)):
    """Update a scheduled job from the request payload.

    All work is delegated to ``update_jm_job_info_services``; its result is
    returned unchanged.
    """
    updated = update_jm_job_info_services(db, item)
    return updated
@router.delete("/")
@web_try()
@sxtimeit
def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
    """Delete a scheduled job.

    Looks up the 'job' relation for ``jm_job_id``, issues ``send_delete`` on
    ``/af/af_job`` with the related ``af_id``, then deletes the local record
    and returns the result of ``crud.delete_jm_job_info``.
    """
    af_relation = crud.get_af_id(db, jm_job_id, 'job')
    af_job_id = af_relation.af_id
    # The remote delete is sent before the local row is removed.
    send_delete('/af/af_job', af_job_id)
    deleted = crud.delete_jm_job_info(db, jm_job_id)
    return deleted
@router.put("/status")
@web_try()
@sxtimeit
def update_jm_job_status(item: schemas.JmJobInfoStatusUpdate, db: Session = Depends(get_db)):
    """Change the status of a scheduled job.

    Passes ``item.id`` and ``item.status`` to
    ``update_jm_job_status_services`` and returns its result unchanged.
    """
    job_id, new_status = item.id, item.status
    return update_jm_job_status_services(db, job_id, new_status)
@router.post("/execute/{jm_job_id}")
@web_try()
@sxtimeit
def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
    """Trigger one execution of a scheduled job.

    Raises:
        Exception: when the job's ``status`` is 0 (the message says the task
            has been disabled).

    Returns:
        The ``'data'`` entry of the ``execute_job_services`` response.
    """
    job = crud.get_jm_job_info(db, jm_job_id)
    job_is_disabled = job.status == 0
    if job_is_disabled:
        raise Exception('任务已被停用')
    response = execute_job_services(db, jm_job_id)
    return response['data']
@router.post("/cron_expression")
@web_try()
@sxtimeit
def get_cron_expression(cron_expression: schemas.CronExpression):
    """Build a cron expression string from the structured request payload.

    The payload is passed to ``joint_cron_expression`` and its result is
    returned unchanged.
    """
    # Function-scope import keeps this block self-contained; the payload is
    # logged at DEBUG level instead of being printed to stdout on every call.
    import logging

    logging.getLogger(__name__).debug("cron_expression payload: %s", cron_expression)
    return joint_cron_expression(cron_expression)
@router.get("/cron_next_execute")
@web_try()
@sxtimeit
def get_cron_next_execute(cron_expression: str):
    """Return the upcoming execution times for a cron expression.

    Delegates to ``run_get_next_time`` and returns its list unchanged.
    """
    upcoming = run_get_next_time(cron_expression)
    return upcoming
def run_get_next_time(cron_expression, count=5):
    """Return the next ``count`` fire times of a cron expression as strings.

    Args:
        cron_expression: Cron expression text. Every ``?`` is replaced with
            ``*`` before the text is handed to croniter.
        count: Number of upcoming times to compute. Defaults to 5, the value
            that was previously hard-coded.

    Returns:
        A list of ``"%Y-%m-%d %H:%M"`` formatted strings, in the order croniter
        yields them, starting from ``datetime.datetime.now()``.
    """
    start = datetime.datetime.now()
    schedule = croniter.croniter(cron_expression.replace('?', '*'), start)
    return [
        schedule.get_next(datetime.datetime).strftime("%Y-%m-%d %H:%M")
        for _ in range(count)
    ]
|