jm_job_info.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. import datetime
  2. import croniter
  3. import re
  4. from typing import Optional, List
  5. from fastapi import APIRouter
  6. from fastapi import Depends
  7. from sqlalchemy.orm import Session
  8. from app import models, schemas
  9. import app.crud as crud
  10. from app.schemas import cron_expression
  11. from app.services.jm_job import on_off_control
  12. from app.services.jm_job_info import create_jm_job_info_services, execute_job_services, update_jm_job_info_services, update_jm_job_status_services
  13. from app.utils.cron_utils import *
  14. from app.utils.send_util import send_delete, send_execute
  15. from utils.sx_time import sxtimeit
  16. from utils.sx_web import web_try
  17. from fastapi_pagination import Page, add_pagination, paginate, Params
  18. from app import get_db
  19. router = APIRouter(
  20. prefix="/jpt/jm_job_info",
  21. tags=["jm_job_info-定时任务管理"],
  22. )
  23. @router.post("/")
  24. @web_try()
  25. @sxtimeit
  26. def create_jm_job_info(item: schemas.JmJobInfoCreate, db: Session = Depends(get_db)):
  27. return create_jm_job_info_services(db, item)
  28. @router.get("/")
  29. @web_try()
  30. @sxtimeit
  31. def get_jm_job_infos(db: Session = Depends(get_db)):
  32. res_list = []
  33. jm_job_list = crud.get_jm_job_infos(db)
  34. jm_job_ids = [job.id for job in jm_job_list]
  35. relations = crud.get_af_ids(db,jm_job_ids, 'job')
  36. af_to_datax = {relation.af_id:relation.se_id for relation in relations}
  37. af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
  38. res = {}
  39. for af_job_run in af_job_runs:
  40. tasks = list(af_job_run.details['tasks'].values()) if len(list(af_job_run.details['tasks'].values()))>0 else []
  41. if len(tasks) > 0:
  42. task = tasks[-1]
  43. task.pop('log',None)
  44. job_id = af_to_datax[int(af_job_run.job_id)]
  45. log = {
  46. "id": af_job_run.id,
  47. "job_id": job_id,
  48. "af_job_id": int(af_job_run.job_id),
  49. "run_id": af_job_run.af_run_id,
  50. "trigger_time": af_job_run.start_time,
  51. "trigger_result": 1 if task else 0,
  52. "execute_time": task['start_time'] if task else 0,
  53. "execute_result": af_job_run.status,
  54. "end_time": task['end_time'] if task else 0,
  55. }
  56. if job_id in res.keys():
  57. res[job_id].append(log)
  58. else:
  59. res.update({job_id: [log]})
  60. for jm_job in jm_job_list:
  61. history = res[jm_job.id] if jm_job.id in res.keys() else []
  62. history.sort(key=lambda x: x['trigger_time'], reverse=True)
  63. jm_job_dict = jm_job.to_dict()
  64. jm_job_dict.update({'history':history[0:10]})
  65. res_list.append(jm_job_dict)
  66. return res_list
  67. @router.get("/info")
  68. @web_try()
  69. @sxtimeit
  70. def get_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
  71. jm_job = crud.get_jm_job_info(db,jm_job_id)
  72. jm_job_dict = jm_job.to_dict()
  73. nodes = crud.get_one_job_nodes(db,jm_job_id)
  74. cron_type, cron_select_type, cron_expression = jm_job_dict['cron_type'], jm_job_dict['cron_select_type'], jm_job_dict['cron_expression']
  75. cron_expression_dict = {}
  76. if cron_type == 2:
  77. cron_expression_dict = parsing_cron_expression(cron_expression)
  78. cron_expression_dict.update({
  79. 'cron_select_type': cron_select_type,
  80. 'cron_expression': cron_expression
  81. })
  82. jm_job_dict.update({
  83. 'nodes': nodes,
  84. 'cron_expression_dict': cron_expression_dict
  85. })
  86. return jm_job_dict
  87. @router.put("/")
  88. @web_try()
  89. @sxtimeit
  90. def update_jm_job_info(item: schemas.JmJobInfoUpdate, db: Session = Depends(get_db)):
  91. return update_jm_job_info_services(db, item)
  92. @router.delete("/")
  93. @web_try()
  94. @sxtimeit
  95. def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
  96. relation = crud.get_af_id(db, jm_job_id, 'job')
  97. send_delete('/af/af_job', relation.af_id)
  98. return crud.delete_jm_job_info(db,jm_job_id)
  99. @router.put("/status")
  100. @web_try()
  101. @sxtimeit
  102. def update_jm_job_status(item: schemas.JmJobInfoStatusUpdate, db: Session = Depends(get_db)):
  103. return update_jm_job_status_services(db, item.id, item.status)
  104. @router.post("/execute/{jm_job_id}")
  105. @web_try()
  106. @sxtimeit
  107. def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
  108. jm_job = crud.get_jm_job_info(db,jm_job_id)
  109. if jm_job.status == 0:
  110. raise Exception('任务已被停用')
  111. res = execute_job_services(db,jm_job_id)
  112. return res['data']
  113. @router.post("/cron_expression")
  114. @web_try()
  115. @sxtimeit
  116. def get_cron_expression(cron_expression: schemas.CronExpression):
  117. print(cron_expression)
  118. cron = joint_cron_expression(cron_expression)
  119. return cron
  120. @router.get("/cron_next_execute")
  121. @web_try()
  122. @sxtimeit
  123. def get_cron_next_execute(cron_expression: str):
  124. execute_list = run_get_next_time(cron_expression)
  125. return execute_list
  126. def run_get_next_time(cron_expression):
  127. now = datetime.datetime.now()
  128. cron_str = cron_expression.replace('?','*')
  129. cron = croniter.croniter(cron_str, now)
  130. execute_list = []
  131. for i in range(0, 5):
  132. next_time = cron.get_next(datetime.datetime).strftime("%Y-%m-%d %H:%M")
  133. execute_list.append(next_time)
  134. return execute_list