job_info.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. from pyexpat import model
  2. import time
  3. from typing import Optional
  4. from fastapi import APIRouter
  5. from fastapi import Depends
  6. from sqlalchemy.orm import Session
  7. from app import models, page_help, schemas
  8. import app.crud as crud
  9. from app.crud import job_info
  10. from app.services.datax import datax_create_job, datax_update_job, on_off_control
  11. from app.services.job_info import create_job_info_services, execute_job_services, update_job_info_services
  12. from app.utils.cron_utils import parsing_cron_expression
  13. from app.utils.send_util import *
  14. from app.utils.utils import *
  15. from utils.sx_time import sxtimeit
  16. from utils.sx_web import web_try
  17. from fastapi_pagination import Page, add_pagination, paginate, Params
  18. from app import get_db, get_page
  19. router = APIRouter(
  20. prefix="/jpt/jobinfo",
  21. tags=["jobinfo-任务管理"],
  22. )
  23. @router.post("/")
  24. @web_try()
  25. @sxtimeit
  26. def create_job_info(item: schemas.JobInfoCreate, db: Session = Depends(get_db)):
  27. return create_job_info_services(db,item)
  28. @router.get("/")
  29. @web_try()
  30. @sxtimeit
  31. def get_job_infos(params: Params=Depends(get_page), db: Session = Depends(get_db)):
  32. return page_help(crud.get_job_infos(db),params['page'],params['size'])
  33. @router.get("/info")
  34. @web_try()
  35. @sxtimeit
  36. def get_job_info(job_id: int, db: Session = Depends(get_db)):
  37. job_info = crud.get_job_info(db, job_id)
  38. job_info_dict = job_info.to_dict()
  39. cron_select_type, cron_expression = job_info_dict['cron_select_type'], job_info_dict['job_cron']
  40. cron_expression_dict = parsing_cron_expression(cron_expression)
  41. cron_expression_dict.update({
  42. 'cron_select_type': cron_select_type,
  43. 'cron_expression': cron_expression
  44. })
  45. job_info_dict.update({
  46. 'cron_expression_dict': cron_expression_dict
  47. })
  48. partition_info_str = job_info_dict.pop('partition_info', None)
  49. if partition_info_str:
  50. partition_list = partition_info_str.split(',')
  51. job_info_dict.update({
  52. 'partition_info': partition_list[0],
  53. 'partition_time': partition_list[2],
  54. 'partition_num': int(partition_list[1])
  55. })
  56. return job_info_dict
  57. @router.put("/{id}")
  58. @web_try()
  59. @sxtimeit
  60. def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)):
  61. return update_job_info_services(db, id, update_item)
  62. @router.put("/update_trigger_status/")
  63. @web_try()
  64. @sxtimeit
  65. def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depends(get_db)):
  66. job_info = crud.get_job_info(db, item.id)
  67. relation = crud.get_af_id(db, job_info.id, 'datax')
  68. job_info.trigger_status = item.trigger_status
  69. on_off_control(relation.af_id, item.trigger_status)
  70. job_info = crud.update_job_trigger_status(db, item.id, item.trigger_status)
  71. return job_info
  72. @router.delete("/{job_id}")
  73. @web_try()
  74. @sxtimeit
  75. def delete_job_info(job_id: int, db: Session = Depends(get_db)):
  76. jm_job = crud.get_job_info(db, job_id)
  77. if jm_job.trigger_status == 1:
  78. raise Exception('任务未停用,不可删除')
  79. relation = crud.get_af_id(db, job_id, 'datax')
  80. send_delete('/af/af_job', relation.af_id)
  81. return crud.delete_job_info(db, job_id)
  82. @router.post("/execute")
  83. @web_try()
  84. @sxtimeit
  85. def execute_job_info(job_id: int, db: Session = Depends(get_db)):
  86. jm_job = crud.get_job_info(db, job_id)
  87. if jm_job.trigger_status == 0:
  88. raise Exception('任务已被停用')
  89. res = execute_job_services(db, job_id)
  90. return res['data']
# Apply fastapi_pagination's add_pagination to this router; this runs at
# import time, after all routes above have been declared.
add_pagination(router)