jm_job_info.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. import datetime
  2. import croniter
  3. import re
  4. from typing import Optional, List
  5. from fastapi import APIRouter
  6. from fastapi import Depends
  7. from sqlalchemy.orm import Session
  8. from app import models, schemas
  9. import app.crud as crud
  10. from app.schemas import cron_expression
  11. from app.utils.cron_utils import *
  12. from utils.sx_time import sxtimeit
  13. from utils.sx_web import web_try
  14. from fastapi_pagination import Page, add_pagination, paginate, Params
  15. from app import get_db
  16. router = APIRouter(
  17. prefix="/jpt/jm_job_info",
  18. tags=["jm_job_info-定时任务管理"],
  19. )
  20. @router.post("/")
  21. @web_try()
  22. @sxtimeit
  23. def create_jm_job_info(item: schemas.JmJobInfoCreate, db: Session = Depends(get_db)):
  24. jm_job_info,nodes,edges = crud.create_jm_job_info(db, item)
  25. job_id = jm_job_info.id
  26. create_jm_job_node(db, nodes, edges, job_id)
  27. return jm_job_info.to_dict()
  28. def create_jm_job_node(db: Session, nodes, edges, job_id):
  29. uuid_node_id = {}
  30. if nodes is None or len(nodes) == 0:
  31. return
  32. for node in nodes:
  33. uuid = node['id']
  34. node_item = models.JmJobNode(**{
  35. 'job_id': job_id,
  36. 'homework_id': node['homework_id'],
  37. 'homework_name': node['homework_name'],
  38. 'start_point': 1,
  39. })
  40. node_item = crud.create_jm_job_node(db,node_item)
  41. node_id = node_item.id
  42. uuid_node_id.update({uuid:node_id})
  43. if nodes is None or len(nodes) == 0:
  44. return
  45. for edge in edges:
  46. edge_item = models.JmJobEdge(**{
  47. 'job_id': job_id,
  48. 'in_node_id': uuid_node_id[edge['source']],
  49. 'out_node_id': uuid_node_id[edge['target']]
  50. })
  51. edge = crud.create_jm_job_edge(db,edge_item)
  52. return
  53. @router.get("/")
  54. @web_try()
  55. @sxtimeit
  56. def get_jm_job_infos(db: Session = Depends(get_db)):
  57. res_list = []
  58. jm_job_list = crud.get_jm_job_infos(db)
  59. jm_job_ids = [job.id for job in jm_job_list]
  60. relations = crud.get_af_ids(db,jm_job_ids, 'job')
  61. af_to_datax = {relation.af_id:relation.se_id for relation in relations}
  62. af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
  63. res = {}
  64. for af_job_run in af_job_runs:
  65. tasks = list(af_job_run.details['tasks'].values()) if len(list(af_job_run.details['tasks'].values()))>0 else []
  66. if len(tasks) > 0:
  67. task = tasks[-1]
  68. task.pop('log',None)
  69. job_id = af_to_datax[int(af_job_run.job_id)]
  70. log = {
  71. "id": af_job_run.id,
  72. "job_id": job_id,
  73. "af_job_id": int(af_job_run.job_id),
  74. "run_id": af_job_run.run_id,
  75. "trigger_time": af_job_run.start_time,
  76. "trigger_result": 1 if task else 0,
  77. "execute_time": task['start_time'] if task else 0,
  78. "execute_result": 1 if task and task['status'] == 'success' else 0,
  79. "end_time": task['end_time'] if task else 0,
  80. }
  81. if job_id in res.keys():
  82. res[job_id].append(log)
  83. else:
  84. res.update({job_id: [log]})
  85. for jm_job in jm_job_list:
  86. history = res[jm_job.id] if jm_job.id in res.keys() else []
  87. jm_job_dict = jm_job.to_dict()
  88. jm_job_dict.update({'history':history[0:10]})
  89. res_list.append(jm_job_dict)
  90. return res_list
  91. @router.get("/info")
  92. @web_try()
  93. @sxtimeit
  94. def get_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
  95. jm_job = crud.get_jm_job_info(db,jm_job_id)
  96. jm_job_dict = jm_job.to_dict()
  97. nodes = crud.get_one_job_nodes(db,jm_job_id)
  98. cron_type, cron_select_type, cron_expression = jm_job_dict['cron_type'], jm_job_dict['cron_select_type'], jm_job_dict['cron_expression']
  99. cron_expression_dict = {}
  100. if cron_type == 2:
  101. cron_expression_dict = parsing_cron_expression(cron_expression)
  102. cron_expression_dict.update({
  103. 'cron_select_type': cron_select_type,
  104. 'cron_expression': cron_expression
  105. })
  106. jm_job_dict.update({
  107. 'nodes': nodes,
  108. 'cron_expression_dict': cron_expression_dict
  109. })
  110. return jm_job_dict
  111. @router.put("/")
  112. @web_try()
  113. @sxtimeit
  114. def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_db)):
  115. jm_job_info,nodes,edges = crud.update_jm_job_info(db, item)
  116. job_id = jm_job_info.id
  117. crud.delete_job_node(db, job_id)
  118. job_id = jm_job_info.id
  119. create_jm_job_node(db, nodes, edges, job_id)
  120. return jm_job_info.to_dict()
  121. @router.delete("/")
  122. @web_try()
  123. @sxtimeit
  124. def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
  125. return crud.delete_jm_job_info(db,jm_job_id)
  126. @router.put("/status")
  127. @web_try()
  128. @sxtimeit
  129. def update_jm_job_status(item: schemas.JmJobInfoStatusUpdate, db: Session = Depends(get_db)):
  130. return crud.update_jm_job_status(db,item)
  131. @router.post("/execute/{jm_job_id}")
  132. @web_try()
  133. @sxtimeit
  134. def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
  135. jm_job = crud.get_jm_job_info(db,jm_job_id)
  136. if jm_job.status == 0:
  137. raise Exception('任务已被停用')
  138. # 进行api调用
  139. return jm_job
  140. @router.post("/cron_expression")
  141. @web_try()
  142. @sxtimeit
  143. def get_cron_expression(cron_expression: schemas.CronExpression):
  144. print(cron_expression)
  145. cron = joint_cron_expression(cron_expression)
  146. return cron
  147. @router.get("/cron_next_execute")
  148. @web_try()
  149. @sxtimeit
  150. def get_cron_next_execute(cron_expression: str):
  151. execute_list = run_get_next_time(cron_expression)
  152. return execute_list
  153. def run_get_next_time(cron_expression):
  154. now = datetime.datetime.now()
  155. cron_str = cron_expression.replace('?','*')
  156. cron = croniter.croniter(cron_str, now)
  157. execute_list = []
  158. for i in range(0, 5):
  159. next_time = cron.get_next(datetime.datetime).strftime("%Y-%m-%d %H:%M")
  160. execute_list.append(next_time)
  161. return execute_list