job_info.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. from pyexpat import model
  2. import time
  3. from typing import Optional
  4. from fastapi import APIRouter
  5. from fastapi import Depends
  6. from sqlalchemy.orm import Session
  7. from app import models, page_help, schemas
  8. import app.crud as crud
  9. from app.crud import job_info
  10. from app.services.datax import datax_create_job, datax_update_job
  11. from app.utils.cron_utils import parsing_cron_expression
  12. from app.utils.send_util import *
  13. from app.utils.utils import *
  14. from utils.sx_time import sxtimeit
  15. from utils.sx_web import web_try
  16. from fastapi_pagination import Page, add_pagination, paginate, Params
  17. from app import get_db, get_page
  18. router = APIRouter(
  19. prefix="/jpt/jobinfo",
  20. tags=["jobinfo-任务管理"],
  21. )
  22. @router.post("/")
  23. @web_try()
  24. @sxtimeit
  25. def create_job_info(item: schemas.JobInfoCreate, db: Session = Depends(get_db)):
  26. return crud.create_job_info(db, item)
  27. @router.get("/")
  28. @web_try()
  29. @sxtimeit
  30. def get_job_infos(params: Params=Depends(get_page), db: Session = Depends(get_db)):
  31. return page_help(crud.get_job_infos(db),params['page'],params['size'])
  32. @router.get("/info")
  33. @web_try()
  34. @sxtimeit
  35. def get_job_info(job_id: int, db: Session = Depends(get_db)):
  36. job_info = crud.get_job_info(db, job_id)
  37. job_info_dict = job_info.to_dict()
  38. cron_select_type, cron_expression = job_info_dict['cron_select_type'], job_info_dict['job_cron']
  39. cron_expression_dict = parsing_cron_expression(cron_expression)
  40. cron_expression_dict.update({
  41. 'cron_select_type': cron_select_type,
  42. 'cron_expression': cron_expression
  43. })
  44. job_info_dict.update({
  45. 'cron_expression_dict': cron_expression_dict
  46. })
  47. partition_info_str = job_info_dict.pop('partition_info', None)
  48. if partition_info_str:
  49. partition_list = partition_info_str.split(',')
  50. job_info_dict.update({
  51. 'partition_info': partition_list[0],
  52. 'partition_time': partition_list[2],
  53. 'partition_num': int(partition_list[1])
  54. })
  55. return job_info_dict
  56. @router.put("/{id}")
  57. @web_try()
  58. @sxtimeit
  59. def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)):
  60. job_info = crud.update_job_info(db, id, update_item)
  61. return job_info
  62. @router.put("/update_trigger_status/")
  63. @web_try()
  64. @sxtimeit
  65. def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depends(get_db)):
  66. job_info = crud.get_job_info(db, item.id)
  67. relation = crud.get_af_id(db, job_info.id, 'datax')
  68. job_info.trigger_status = item.trigger_status
  69. if not relation:
  70. datax_create_job(job_info,db)
  71. else:
  72. datax_update_job(job_info,db)
  73. job_info = crud.update_job_trigger_status(db, item.id, item.trigger_status)
  74. return job_info
  75. @router.delete("/{job_id}")
  76. @web_try()
  77. @sxtimeit
  78. def delete_job_info(job_id: int, db: Session = Depends(get_db)):
  79. relation = crud.get_af_id(db, job_id, 'datax')
  80. send_delete('/af/af_job', relation.af_id)
  81. return crud.delete_job_info(db, job_id)
  82. @router.post("/execute")
  83. @web_try()
  84. @sxtimeit
  85. def execute_job_info(job_id: int, db: Session = Depends(get_db)):
  86. jm_job = crud.get_job_info(db, job_id)
  87. if jm_job.trigger_status == 0:
  88. raise Exception('任务已被停用')
  89. relation = crud.get_af_id(db, job_id, 'datax')
  90. res = send_execute(relation.af_id)
  91. return res['data']
  92. add_pagination(router)