job_info.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. import time
  2. from typing import Optional
  3. from fastapi import APIRouter
  4. from fastapi import Depends
  5. from sqlalchemy.orm import Session
  6. from app import models, page_help, schemas
  7. from app.common.security.auth import verify_users
  8. import app.crud as crud
  9. from app.crud import job_info
  10. from app.services.datax import datax_create_job, datax_update_job, on_off_control
  11. from app.services.job_info import create_job_info_services, execute_job_services, update_job_info_services
  12. from app.utils.cron_utils import parsing_cron_expression
  13. from app.utils.send_util import *
  14. from app.utils.utils import *
  15. from utils.sx_time import sxtimeit
  16. from utils.sx_web import web_try
  17. from fastapi_pagination import Page, add_pagination, paginate, Params
  18. from app import get_db, get_page
  19. router = APIRouter(
  20. prefix="/jpt/jobinfo",
  21. tags=["jobinfo-任务管理"],
  22. )
  23. @router.post("/")
  24. @web_try()
  25. @sxtimeit
  26. def create_job_info(item: schemas.JobInfoCreate, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  27. return create_job_info_services(db,item)
  28. @router.get("/")
  29. @web_try()
  30. @sxtimeit
  31. def get_job_infos(params: Params=Depends(get_page), token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  32. return page_help(crud.get_job_infos(db),params['page'],params['size'])
  33. @router.get("/info")
  34. @web_try()
  35. @sxtimeit
  36. def get_job_info(job_id: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  37. job_info = crud.get_job_info(db, job_id)
  38. job_info_dict = job_info.to_dict()
  39. cron_select_type, cron_expression = job_info_dict['cron_select_type'], job_info_dict['job_cron']
  40. cron_expression_dict = parsing_cron_expression(cron_expression)
  41. cron_expression_dict.update({
  42. 'cron_select_type': cron_select_type,
  43. 'cron_expression': cron_expression
  44. })
  45. job_info_dict.update({
  46. 'cron_expression_dict': cron_expression_dict
  47. })
  48. partition_info_str = job_info_dict.pop('partition_info', None)
  49. if partition_info_str:
  50. partition_list = partition_info_str.split(',')
  51. job_info_dict.update({
  52. 'partition_info': partition_list[0],
  53. 'partition_time': partition_list[2],
  54. 'partition_num': int(partition_list[1])
  55. })
  56. return job_info_dict
  57. @router.put("/{id}")
  58. @web_try()
  59. @sxtimeit
  60. def update_datasource(id: int, update_item: schemas.JobInfoUpdate, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  61. return update_job_info_services(db, id, update_item)
  62. @router.put("/update_trigger_status/")
  63. @web_try()
  64. @sxtimeit
  65. def update_trigger_status(item: schemas.JobInfoTriggerStatus, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  66. job_info: models.JobInfo = crud.get_job_info(db, item.id)
  67. relation = crud.get_af_id(db, job_info.id, 'datax')
  68. job_info.trigger_status = item.trigger_status
  69. on_off_control(relation.af_id, item.trigger_status)
  70. job_info = crud.update_job_trigger_status(db, item.id, item.trigger_status)
  71. return job_info
  72. @router.delete("/{job_id}")
  73. @web_try()
  74. @sxtimeit
  75. def delete_job_info(job_id: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  76. jm_job: models.JobInfo = crud.get_job_info(db, job_id)
  77. if jm_job.trigger_status == 1:
  78. raise Exception('任务未停用,不可删除')
  79. relation = crud.get_af_id(db, job_id, 'datax')
  80. send_delete('/af/af_job', relation.af_id)
  81. return crud.delete_job_info(db, job_id)
  82. @router.post("/execute")
  83. @web_try()
  84. @sxtimeit
  85. def execute_job_info(job_id: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
  86. jm_job: models.JobInfo = crud.get_job_info(db, job_id)
  87. if jm_job.trigger_status == 0:
  88. raise Exception('任务已被停用')
  89. res = execute_job_services(db, job_id)
  90. return res['data']
# Hand the finished router to fastapi_pagination's add_pagination; this runs
# after all routes above are declared. NOTE(review): presumably required for
# the paginated endpoints on this router — confirm against the library docs.
add_pagination(router)