# jm_job_info.py
  1. from typing import Optional, List
  2. from fastapi import APIRouter
  3. from fastapi import Depends
  4. from sqlalchemy.orm import Session
  5. from app import models, schemas
  6. import app.crud as crud
  7. from utils.sx_time import sxtimeit
  8. from utils.sx_web import web_try
  9. from fastapi_pagination import Page, add_pagination, paginate, Params
  10. from app import get_db
  11. router = APIRouter(
  12. prefix="/jpt/jm_job_info",
  13. tags=["jm_job_info-定时任务管理"],
  14. )
  15. @router.post("/")
  16. @web_try()
  17. @sxtimeit
  18. def create_jm_job_info(item: schemas.JmJobInfoCreate, db: Session = Depends(get_db)):
  19. jm_job_info,nodes,edges = crud.create_jm_job_info(db, item)
  20. job_id = jm_job_info.id
  21. if edges is not None and len(edges) > 0:
  22. nodes_dict = {node['id']:node for node in nodes}
  23. for node_id in nodes_dict:
  24. node = nodes_dict[node_id]
  25. if node['start_point'] == 0:
  26. next_nodes = format_nodes(nodes_dict,edges,node_id)
  27. node['child_nodes'] = next_nodes
  28. node_tree = node
  29. break
  30. node = models.JmJobNode(**{
  31. 'job_id': job_id,
  32. 'homework_id': node_tree['homework_id'],
  33. 'homework_name': node_tree['homework_name'],
  34. 'start_point': node['start_point'],
  35. })
  36. node = crud.create_jm_job_node(db,node)
  37. if edges is not None and len(edges) > 0:
  38. create_jm_job_node(db,node_tree['child_nodes'], job_id, node.id)
  39. else:
  40. node = models.JmJobNode(**{
  41. 'job_id': job_id,
  42. 'homework_id': nodes[0]['homework_id'],
  43. 'homework_name': nodes[0]['homework_name'],
  44. 'start_point': nodes[0]['start_point'],
  45. })
  46. node = crud.create_jm_job_node(db,node)
  47. return jm_job_info.to_dict()
  48. def create_jm_job_node(db: Session, nodes, job_id, last_node_id):
  49. if nodes is None or len(nodes) == 0:
  50. return
  51. for node in nodes:
  52. next_node = models.JmJobNode(**{
  53. 'job_id': job_id,
  54. 'homework_id': node['homework_id'],
  55. 'homework_name': node['homework_name'],
  56. 'start_point': node['start_point'],
  57. })
  58. next_node = crud.create_jm_job_node(db,next_node)
  59. out_node_id = next_node.id
  60. edge = models.JmJobEdge(**{
  61. 'in_node_id': last_node_id,
  62. 'out_node_id': out_node_id,
  63. 'job_id': job_id,
  64. })
  65. edge = crud.create_jm_job_edge(db,edge)
  66. create_jm_job_node(db, node['child_nodes'], job_id, out_node_id)
  67. return
  68. def format_nodes(nodes_dict,edges: List[schemas.JmJobEdge],last_node: int):
  69. nodes = []
  70. for edge in edges:
  71. if edge['source'] == last_node:
  72. node = nodes_dict[edge['target']]
  73. next_nodes = format_nodes(nodes_dict,edges,edge['target'])
  74. node['child_nodes'] = next_nodes
  75. nodes.append(node)
  76. return nodes
  77. @router.get("/")
  78. @web_try()
  79. @sxtimeit
  80. def get_jm_job_infos(db: Session = Depends(get_db)):
  81. res_list = []
  82. jm_job_list = crud.get_jm_job_infos(db)
  83. for jm_job in jm_job_list:
  84. history = crud.get_one_job_historys(db, jm_job.id)
  85. jm_job_dict = jm_job.to_dict()
  86. jm_job_dict.update({'history':history[0:10]})
  87. res_list.append(jm_job_dict)
  88. return res_list
  89. @router.get("/info")
  90. @web_try()
  91. @sxtimeit
  92. def get_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
  93. jm_job = crud.get_jm_job_info(db,jm_job_id)
  94. jm_job_dict = jm_job.to_dict()
  95. nodes = crud.get_one_job_nodes(db,jm_job_id)
  96. edges = crud.get_one_job_edges(db,jm_job_id)
  97. edges_list = [
  98. {
  99. 'id': edge.id,
  100. 'job_id': edge.job_id,
  101. 'source': edge.in_node_id,
  102. 'target': edge.out_node_id,
  103. }
  104. for edge in edges
  105. ]
  106. jm_job_dict.update({
  107. 'nodes': nodes,
  108. 'edges': edges_list,
  109. })
  110. return jm_job_dict
  111. @router.put("/")
  112. @web_try()
  113. @sxtimeit
  114. def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_db)):
  115. jm_job_info,nodes,edges = crud.update_jm_job_info(db, item)
  116. job_id = jm_job_info.id
  117. crud.delete_job_node(db, job_id)
  118. if edges is not None and len(edges) > 0:
  119. nodes_dict = {node['id']:node for node in nodes}
  120. for node_id in nodes_dict:
  121. node = nodes_dict[node_id]
  122. if node['start_point'] == 0:
  123. next_nodes = format_nodes(nodes_dict,edges,node_id)
  124. node['child_nodes'] = next_nodes
  125. node_tree = node
  126. break
  127. node = models.JmJobNode(**{
  128. 'job_id': job_id,
  129. 'homework_id': node_tree['homework_id'],
  130. 'homework_name': node_tree['homework_name'],
  131. 'start_point': node['start_point'],
  132. })
  133. node = crud.create_jm_job_node(db,node)
  134. if edges is not None and len(edges) > 0:
  135. create_jm_job_node(db,node_tree['child_nodes'], job_id, node.id)
  136. else:
  137. node = models.JmJobNode(**{
  138. 'job_id': job_id,
  139. 'homework_id': nodes[0]['homework_id'],
  140. 'homework_name': nodes[0]['homework_name'],
  141. 'start_point': nodes[0]['start_point'],
  142. })
  143. node = crud.create_jm_job_node(db,node)
  144. return jm_job_info.to_dict()
  145. @router.delete("/")
  146. @web_try()
  147. @sxtimeit
  148. def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
  149. return crud.delete_jm_job_info(db,jm_job_id)
  150. @router.put("/status")
  151. @web_try()
  152. @sxtimeit
  153. def update_jm_job_status(item: schemas.JmJobInfoStatusUpdate, db: Session = Depends(get_db)):
  154. return crud.update_jm_job_status(db,item)
  155. @router.post("/execute/{jm_job_id}")
  156. @web_try()
  157. @sxtimeit
  158. def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
  159. jm_job = crud.get_jm_job_info(db,jm_job_id)
  160. if jm_job.status == 0:
  161. raise Exception('任务已被停用')
  162. # 进行api调用
  163. return jm_job