jm_job_info.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. import base64
  2. import re
  3. from typing import Optional, List
  4. from fastapi import APIRouter, Depends, Header
  5. from sqlalchemy.orm import Session
  6. from app import models, schemas
  7. from app.common.security.auth import verify_users
  8. import app.crud as crud
  9. from app.schemas import cron_expression
  10. from app.services.jm_job import on_off_control
  11. from app.services.jm_job_info import create_jm_job_info_services, execute_job_services, get_all_timeout_jobs_services, get_requirements_status_by_job_id, update_jm_job_info_services, update_jm_job_status_services
  12. from app.utils.cron_utils import *
  13. from app.utils.send_util import send_delete, send_execute
  14. from utils.sx_time import sxtimeit
  15. from utils.sx_web import web_try
  16. from fastapi_pagination import Page, add_pagination, paginate, Params
  17. from configs.settings import DefaultOption, config
# Default value of the `timeout` query parameter of get_all_timeout_jobs,
# read from the JOB_CONFIG section (unit not visible here — TODO confirm).
job_timeout = int(config.get('JOB_CONFIG', 'timeout'))
# Shared secret that get_all_timeout_jobs compares against the auth-token header.
job_auth_token = config.get('JOB_CONFIG', 'auth_token')
  20. from app import get_db
# Router for every endpoint in this module; all paths below are mounted under
# the /jpt/jm_job_info prefix.
router = APIRouter(
    prefix="/jpt/jm_job_info",
    tags=["jm_job_info-定时任务管理"],
)
  25. @router.post("/")
  26. @web_try()
  27. @sxtimeit
  28. def create_jm_job_info(item: schemas.JmJobInfoCreate, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  29. return create_jm_job_info_services(db, item)
  30. @router.get("/")
  31. @web_try()
  32. @sxtimeit
  33. def get_jm_job_infos(token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  34. res_list = []
  35. jm_job_list = crud.get_jm_job_infos(db)
  36. jm_job_ids = [job.id for job in jm_job_list]
  37. relations = crud.get_af_ids(db,jm_job_ids, 'job')
  38. af_to_datax = {relation.af_id:relation.se_id for relation in relations}
  39. af_job_runs = crud.get_airflow_runs_by_af_job_ids(db, af_to_datax.keys())
  40. res = {}
  41. for af_job_run in af_job_runs:
  42. if af_job_run.status in [2,3]:
  43. job_id = af_to_datax[int(af_job_run.job_id)]
  44. log = {
  45. "start_time": af_job_run.start_time,
  46. "execute_result": af_job_run.status,
  47. }
  48. if job_id in res.keys():
  49. res[job_id].append(log)
  50. else:
  51. res.update({job_id: [log]})
  52. for jm_job in jm_job_list:
  53. # 历史运行
  54. history = res[jm_job.id] if jm_job.id in res.keys() else []
  55. history.sort(key=lambda x: x['start_time'], reverse=True)
  56. jm_job_dict = jm_job.to_dict()
  57. history = history[0:10]
  58. history.sort(key=lambda x: x['start_time'], reverse=False)
  59. jm_job_dict.update({'history':history})
  60. print(jm_job)
  61. requirements_status = get_requirements_status_by_job_id(db,jm_job.id)
  62. jm_job_dict.update({'requirements_status': 0 if requirements_status else 1})
  63. res_list.append(jm_job_dict)
  64. return res_list
  65. @router.get("/info")
  66. @web_try()
  67. @sxtimeit
  68. def get_jm_job_info(jm_job_id: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  69. jm_job = crud.get_jm_job_info(db,jm_job_id)
  70. jm_job_dict = jm_job.to_dict()
  71. nodes = crud.get_one_job_nodes(db,jm_job_id)
  72. cron_type, cron_select_type, cron_expression = jm_job_dict['cron_type'], jm_job_dict['cron_select_type'], jm_job_dict['cron_expression']
  73. cron_expression_dict = {}
  74. if cron_type == 2:
  75. cron_expression_dict = parsing_cron_expression(cron_expression)
  76. cron_expression_dict.update({
  77. 'cron_select_type': cron_select_type,
  78. 'cron_expression': cron_expression
  79. })
  80. jm_job_dict.update({
  81. 'nodes': nodes,
  82. 'cron_expression_dict': cron_expression_dict
  83. })
  84. return jm_job_dict
  85. @router.put("/")
  86. @web_try()
  87. @sxtimeit
  88. def update_jm_job_info(item: schemas.JmJobInfoUpdate, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  89. return update_jm_job_info_services(db, item)
  90. @router.delete("/")
  91. @web_try()
  92. @sxtimeit
  93. def delete_jm_job_info(jm_job_id: int, db: Session = Depends(get_db)):
  94. jm_job = crud.get_jm_job_info(db,jm_job_id)
  95. if jm_job.status == 1:
  96. raise Exception('任务未停用,不可删除')
  97. relation = crud.get_af_id(db, jm_job_id, 'job')
  98. send_delete('/af/af_job', relation.af_id)
  99. return crud.delete_jm_job_info(db,jm_job_id)
  100. @router.put("/status")
  101. @web_try()
  102. @sxtimeit
  103. def update_jm_job_status(item: schemas.JmJobInfoStatusUpdate, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  104. return update_jm_job_status_services(db, item.id, item.status)
  105. @router.post("/execute/{jm_job_id}")
  106. @web_try()
  107. @sxtimeit
  108. def execute_jm_job(jm_job_id: int, db: Session = Depends(get_db)):
  109. jm_job = crud.get_jm_job_info(db,jm_job_id)
  110. if jm_job.status == 0:
  111. raise Exception('任务已被停用')
  112. res = execute_job_services(db,jm_job_id)
  113. return res['data']
  114. @router.get("/api/{jm_job_id}")
  115. @web_try()
  116. @sxtimeit
  117. def copy_api_path(jm_job_id: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  118. initial = str(jm_job_id).encode('utf-8')
  119. encryption_id = base64.b64encode(initial).decode('utf-8')
  120. return f'/jpt/jm_job_info/api_execute/{encryption_id}'
  121. @router.get("/api_execute/{encryption_id}")
  122. @web_try()
  123. @sxtimeit
  124. def api_execute_jm_job(encryption_id: str, db: Session = Depends(get_db)):
  125. jm_job_id = 0
  126. try:
  127. initial = base64.b64decode(encryption_id)
  128. jm_job_id = int(initial.decode('utf-8'))
  129. except Exception as e:
  130. raise Exception('任务路径解析失败')
  131. jm_job = crud.get_jm_job_info(db,jm_job_id)
  132. if jm_job.status == 0:
  133. raise Exception('任务已被停用')
  134. res = execute_job_services(db,jm_job_id)
  135. return res['data']
  136. @router.post("/cron_expression")
  137. @web_try()
  138. @sxtimeit
  139. def get_cron_expression(cron_expression: schemas.CronExpression, token_data: schemas.TokenData = Depends(verify_users)):
  140. cron = joint_cron_expression(cron_expression)
  141. return cron
  142. @router.get("/cron_next_execute")
  143. @web_try()
  144. @sxtimeit
  145. def get_cron_next_execute(cron_expression: str, token_data: schemas.TokenData = Depends(verify_users)):
  146. execute_list = run_get_next_time(cron_expression)
  147. return execute_list
  148. @router.get("/cron_next_execute")
  149. @web_try()
  150. @sxtimeit
  151. def get_cron_next_execute(cron_expression: str, token_data: schemas.TokenData = Depends(verify_users)):
  152. execute_list = run_get_next_time(cron_expression)
  153. return execute_list
  154. @router.get("/all_timeout_jobs")
  155. @web_try()
  156. @sxtimeit
  157. def get_all_timeout_jobs(timeout: Optional[int] = job_timeout, auth_token: str = Header(), db: Session = Depends(get_db)):
  158. if auth_token != job_auth_token:
  159. raise Exception('Token校验失败')
  160. return get_all_timeout_jobs_services(db, timeout)