123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 |
- from typing import Optional, List
- from fastapi import APIRouter
- from fastapi import Depends
- from sqlalchemy.orm import Session
- from app import models, schemas
- import app.crud as crud
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from fastapi_pagination import Page, add_pagination, paginate, Params
- from app import get_db
- router = APIRouter(
- prefix="/jpt/jm_job_info",
- tags=["jm_job_info-定时任务管理"],
- )
- @router.post("/")
- @web_try()
- @sxtimeit
- def create_jm_job_info(item: schemas.JmJobInfoCreate, db: Session = Depends(get_db)):
- jm_job_info,nodes,edges = crud.create_jm_job_info(db, item)
- job_id = jm_job_info.id
- if edges is not None and len(edges) > 0:
- nodes_dict = {node['id']:node for node in nodes}
- for node_id in nodes_dict:
- node = nodes_dict[node_id]
- if node['start_point'] == 0:
- next_nodes = format_nodes(nodes_dict,edges,node_id)
- node['child_nodes'] = next_nodes
- node_tree = node
- break
- node = models.JmJobNode(**{
- 'job_id': job_id,
- 'homework_id': node_tree['homework_id'],
- 'homework_name': node_tree['homework_name'],
- 'start_point': node['start_point'],
- })
- node = crud.create_jm_job_node(db,node)
- if edges is not None and len(edges) > 0:
- create_jm_job_node(db,node_tree['child_nodes'], job_id, node.id)
- else:
- node = models.JmJobNode(**{
- 'job_id': job_id,
- 'homework_id': nodes[0]['homework_id'],
- 'homework_name': nodes[0]['homework_name'],
- 'start_point': nodes[0]['start_point'],
- })
- node = crud.create_jm_job_node(db,node)
- return jm_job_info.to_dict()
- def create_jm_job_node(db: Session, nodes, job_id, last_node_id):
- if nodes is None or len(nodes) == 0:
- return
- for node in nodes:
- next_node = models.JmJobNode(**{
- 'job_id': job_id,
- 'homework_id': node['homework_id'],
- 'homework_name': node['homework_name'],
- 'start_point': node['start_point'],
- })
- next_node = crud.create_jm_job_node(db,next_node)
- out_node_id = next_node.id
- edge = models.JmJobEdge(**{
- 'in_node_id': last_node_id,
- 'out_node_id': out_node_id,
- 'job_id': job_id,
- })
- edge = crud.create_jm_job_edge(db,edge)
- create_jm_job_node(db, node['child_nodes'], job_id, out_node_id)
- return
- def format_nodes(nodes_dict,edges: List[schemas.JmJobEdge],last_node: int):
- nodes = []
- for edge in edges:
- if edge['source'] == last_node:
- node = nodes_dict[edge['target']]
- next_nodes = format_nodes(nodes_dict,edges,edge['target'])
- node['child_nodes'] = next_nodes
- nodes.append(node)
- return nodes
- @router.get("/")
- @web_try()
- @sxtimeit
- def get_jm_job_infos(db: Session = Depends(get_db)):
- res_list = []
- jm_job_list = crud.get_jm_job_infos(db)
- for jm_job in jm_job_list:
- history = crud.get_one_job_historys(db, jm_job.id)
- jm_job_dict = jm_job.to_dict()
- jm_job_dict.update({'history':history[0:10]})
- res_list.append(jm_job_dict)
- return res_list
- @router.get("/info")
- @web_try()
- @sxtimeit
- def get_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
- jm_job = crud.get_jm_job_info(db,jm_job_id)
- jm_job_dict = jm_job.to_dict()
- nodes = crud.get_one_job_nodes(db,jm_job_id)
- edges = crud.get_one_job_edges(db,jm_job_id)
- edges_list = [
- {
- 'id': edge.id,
- 'job_id': edge.job_id,
- 'source': edge.in_node_id,
- 'target': edge.out_node_id,
- }
- for edge in edges
- ]
- jm_job_dict.update({
- 'nodes': nodes,
- 'edges': edges_list,
- })
- return jm_job_dict
- @router.put("/")
- @web_try()
- @sxtimeit
- def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_db)):
- jm_job_info,nodes,edges = crud.update_jm_job_info(db, item)
- job_id = jm_job_info.id
- crud.delete_job_node(db, job_id)
- if edges is not None and len(edges) > 0:
- nodes_dict = {node['id']:node for node in nodes}
- for node_id in nodes_dict:
- node = nodes_dict[node_id]
- if node['start_point'] == 0:
- next_nodes = format_nodes(nodes_dict,edges,node_id)
- node['child_nodes'] = next_nodes
- node_tree = node
- break
- node = models.JmJobNode(**{
- 'job_id': job_id,
- 'homework_id': node_tree['homework_id'],
- 'homework_name': node_tree['homework_name'],
- 'start_point': node['start_point'],
- })
- node = crud.create_jm_job_node(db,node)
- if edges is not None and len(edges) > 0:
- create_jm_job_node(db,node_tree['child_nodes'], job_id, node.id)
- else:
- node = models.JmJobNode(**{
- 'job_id': job_id,
- 'homework_id': nodes[0]['homework_id'],
- 'homework_name': nodes[0]['homework_name'],
- 'start_point': nodes[0]['start_point'],
- })
- node = crud.create_jm_job_node(db,node)
- return jm_job_info.to_dict()
- @router.delete("/")
- @web_try()
- @sxtimeit
- def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
- return crud.delete_jm_job_info(db,jm_job_id)
- @router.put("/status")
- @web_try()
- @sxtimeit
- def update_jm_job_status(item: schemas.JmJobInfoStatusUpdate, db: Session = Depends(get_db)):
- return crud.update_jm_job_status(db,item)
- @router.post("/execute/{jm_job_id}")
- @web_try()
- @sxtimeit
- def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
- jm_job = crud.get_jm_job_info(db,jm_job_id)
- if jm_job.status == 0:
- raise Exception('任务已被停用')
- # 进行api调用
- return jm_job
|