123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- import datetime
- import croniter
- import re
- from typing import Optional, List
- from fastapi import APIRouter
- from fastapi import Depends
- from sqlalchemy.orm import Session
- from app import models, schemas
- import app.crud as crud
- from app.schemas import cron_expression
- from app.utils.cron_utils import *
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from fastapi_pagination import Page, add_pagination, paginate, Params
- from app import get_db
- router = APIRouter(
- prefix="/jpt/jm_job_info",
- tags=["jm_job_info-定时任务管理"],
- )
- @router.post("/")
- @web_try()
- @sxtimeit
- def create_jm_job_info(item: schemas.JmJobInfoCreate, db: Session = Depends(get_db)):
- jm_job_info,nodes,edges = crud.create_jm_job_info(db, item)
- job_id = jm_job_info.id
- create_jm_job_node(db, nodes, edges, job_id)
- return jm_job_info.to_dict()
- def create_jm_job_node(db: Session, nodes, edges, job_id):
- uuid_node_id = {}
- if nodes is None or len(nodes) == 0:
- return
- for node in nodes:
- uuid = node['id']
- node_item = models.JmJobNode(**{
- 'job_id': job_id,
- 'homework_id': node['homework_id'],
- 'homework_name': node['homework_name'],
- 'start_point': 1,
- })
- node_item = crud.create_jm_job_node(db,node_item)
- node_id = node_item.id
- uuid_node_id.update({uuid:node_id})
- if nodes is None or len(nodes) == 0:
- return
- for edge in edges:
- edge_item = models.JmJobEdge(**{
- 'job_id': job_id,
- 'in_node_id': uuid_node_id[edge['source']],
- 'out_node_id': uuid_node_id[edge['target']]
- })
- edge = crud.create_jm_job_edge(db,edge_item)
- return
- @router.get("/")
- @web_try()
- @sxtimeit
- def get_jm_job_infos(db: Session = Depends(get_db)):
- res_list = []
- jm_job_list = crud.get_jm_job_infos(db)
- for jm_job in jm_job_list:
- history = crud.get_one_job_historys(db, jm_job.id)
- jm_job_dict = jm_job.to_dict()
- jm_job_dict.update({'history':history[0:10]})
- res_list.append(jm_job_dict)
- return res_list
- @router.get("/info")
- @web_try()
- @sxtimeit
- def get_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
- jm_job = crud.get_jm_job_info(db,jm_job_id)
- jm_job_dict = jm_job.to_dict()
- nodes = crud.get_one_job_nodes(db,jm_job_id)
- cron_type, cron_select_type, cron_expression = jm_job_dict['cron_type'], jm_job_dict['cron_select_type'], jm_job_dict['cron_expression']
- cron_expression_dict = {}
- if cron_type == 2:
- cron_expression_dict = parsing_cron_expression(cron_expression)
- cron_expression_dict.update({
- 'cron_select_type': cron_select_type,
- 'cron_expression': cron_expression
- })
- jm_job_dict.update({
- 'nodes': nodes,
- 'cron_expression_dict': cron_expression_dict
- })
- return jm_job_dict
- @router.put("/")
- @web_try()
- @sxtimeit
- def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_db)):
- jm_job_info,nodes,edges = crud.update_jm_job_info(db, item)
- job_id = jm_job_info.id
- crud.delete_job_node(db, job_id)
- job_id = jm_job_info.id
- create_jm_job_node(db, nodes, edges, job_id)
- return jm_job_info.to_dict()
- @router.delete("/")
- @web_try()
- @sxtimeit
- def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
- return crud.delete_jm_job_info(db,jm_job_id)
- @router.put("/status")
- @web_try()
- @sxtimeit
- def update_jm_job_status(item: schemas.JmJobInfoStatusUpdate, db: Session = Depends(get_db)):
- return crud.update_jm_job_status(db,item)
- @router.post("/execute/{jm_job_id}")
- @web_try()
- @sxtimeit
- def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
- jm_job = crud.get_jm_job_info(db,jm_job_id)
- if jm_job.status == 0:
- raise Exception('任务已被停用')
- # 进行api调用
- return jm_job
- @router.post("/cron_expression")
- @web_try()
- @sxtimeit
- def get_cron_expression(cron_expression: schemas.CronExpression):
- print(cron_expression)
- cron = joint_cron_expression(cron_expression)
- return cron
- @router.get("/cron_next_execute")
- @web_try()
- @sxtimeit
- def get_cron_next_execute(cron_expression: str):
- execute_list = run_get_next_time(cron_expression)
- return execute_list
- def run_get_next_time(cron_expression):
- now = datetime.datetime.now()
- cron_str = cron_expression.replace('?','*')
- cron = croniter.croniter(cron_str, now)
- execute_list = []
- for i in range(0, 5):
- next_time = cron.get_next(datetime.datetime).strftime("%Y-%m-%d %H:%M")
- execute_list.append(next_time)
- return execute_list
|