job_info.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. from pyexpat import model
  2. import time
  3. from typing import Optional
  4. from fastapi import APIRouter
  5. from fastapi import Depends
  6. from sqlalchemy.orm import Session
  7. from app import models, schemas
  8. import app.crud as crud
  9. from app.crud import job_info
  10. from app.services.datax import datax_create_job, datax_update_job
  11. from app.utils.cron_utils import parsing_cron_expression
  12. from app.utils.send_util import *
  13. from app.utils.utils import *
  14. from utils.sx_time import sxtimeit
  15. from utils.sx_web import web_try
  16. from fastapi_pagination import Page, add_pagination, paginate, Params
  17. from app import get_db
  # Router for the job-info endpoints; every route registered on it is served
  # under the /jpt/jobinfo prefix.
  router = APIRouter(prefix="/jpt/jobinfo", tags=["jobinfo-任务管理"])
  @router.post("/")
  @web_try()
  @sxtimeit
  def create_job_info(item: schemas.JobInfoCreate, db: Session = Depends(get_db)):
      """Create a job from the request payload.

      Delegates to ``crud.create_job_info`` and returns its result unchanged.
      """
      created = crud.create_job_info(db, item)
      return created
  @router.get("/")
  @web_try()
  @sxtimeit
  def get_job_infos(params: Params = Depends(), db: Session = Depends(get_db)):
      """Return one page of jobs.

      The result of ``crud.get_job_infos`` is passed to ``paginate`` together
      with the pagination parameters taken from the request.
      """
      all_jobs = crud.get_job_infos(db)
      return paginate(all_jobs, params)
  @router.get("/info")
  @web_try()
  @sxtimeit
  def get_job_info(job_id: int, db: Session = Depends(get_db)):
      """Return a single job as a dict, enriched for display.

      On top of the fields produced by the record's ``to_dict()``:

      * ``cron_expression_dict`` holds the output of
        ``parsing_cron_expression`` for ``job_cron``, plus the raw
        ``cron_select_type`` and ``cron_expression`` values.
      * A non-empty ``partition_info`` string is split on commas into
        ``partition_info`` (field 0), ``partition_num`` (field 1, as int) and
        ``partition_time`` (field 2). When it is empty or absent, the
        ``partition_info`` key is removed from the result.
      """
      record = crud.get_job_info(db, job_id)
      payload = record.to_dict()

      select_type = payload['cron_select_type']
      cron_text = payload['job_cron']
      cron_details = parsing_cron_expression(cron_text)
      cron_details.update(cron_select_type=select_type, cron_expression=cron_text)
      payload.update(cron_expression_dict=cron_details)

      raw_partition = payload.pop('partition_info', None)
      if not raw_partition:
          return payload

      # Stored layout is "<info>,<num>,<time>"; fields are read by index so a
      # longer string is tolerated and a shorter one raises IndexError.
      fields = raw_partition.split(',')
      payload.update(
          partition_info=fields[0],
          partition_time=fields[2],
          partition_num=int(fields[1]),
      )
      return payload
  @router.put("/{id}")
  @web_try()
  @sxtimeit
  def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)):
      """Update a job record and push the change to datax.

      After ``crud.update_job_info`` has applied the update, the job is handed
      to ``datax_create_job`` when it has no 'datax' relation yet, otherwise to
      ``datax_update_job``. The updated job is returned.
      """
      updated_job = crud.update_job_info(db, id, update_item)
      existing_relation = crud.get_af_id(db, updated_job.id, 'datax')
      # No relation yet means the datax side has never been created.
      push_to_datax = datax_update_job if existing_relation else datax_create_job
      push_to_datax(updated_job, db)
      return updated_job
  @router.put("/update_trigger_status/")
  @web_try()
  @sxtimeit
  def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depends(get_db)):
      """Change a job's trigger status and push the change to datax.

      The new status is set on the loaded job first, so the datax call sees it.
      The job goes to ``datax_create_job`` when it has no 'datax' relation,
      otherwise to ``datax_update_job``. Only after that call returns is the
      status persisted through ``crud.update_job_trigger_status``, whose result
      is returned.
      """
      target_job = crud.get_job_info(db, item.id)
      existing_relation = crud.get_af_id(db, target_job.id, 'datax')
      target_job.trigger_status = item.trigger_status

      push_to_datax = datax_update_job if existing_relation else datax_create_job
      push_to_datax(target_job, db)

      return crud.update_job_trigger_status(db, item.id, item.trigger_status)
  @router.delete("/{job_id}")
  @web_try()
  @sxtimeit
  def delete_job_info(job_id: int, db: Session = Depends(get_db)):
      """Delete a job and, when one exists, its linked af job.

      Looks up the job's 'datax' relation. If there is one, a delete request
      for ``relation.af_id`` is sent to ``/jpt/af_job`` before the job itself
      is removed through ``crud.delete_job_info``.

      Args:
          job_id: Identifier of the job to delete.
          db: Database session injected by FastAPI.

      Returns:
          Whatever ``crud.delete_job_info`` returns.
      """
      relation = crud.get_af_id(db, job_id, 'datax')
      # crud.get_af_id can yield a falsy value when the job has no relation
      # (the update endpoints branch on exactly that). Such a job has nothing
      # to delete remotely and must still be removable locally.
      if relation:
          send_delete('/jpt/af_job', relation.af_id)
      return crud.delete_job_info(db, job_id)
  @router.post("/execute")
  @web_try()
  @sxtimeit
  def execute_job_info(job_id: int, db: Session = Depends(get_db)):
      """Trigger an execution of the job through its linked af job.

      Args:
          job_id: Identifier of the job to execute.
          db: Database session injected by FastAPI.

      Returns:
          The ``'data'`` entry of the response returned by ``send_execute``.

      Raises:
          Exception: If the job's ``trigger_status`` is 0, or if the job has no
              'datax' relation to execute against.
      """
      jm_job = crud.get_job_info(db, job_id)
      if jm_job.trigger_status == 0:
          # Message reads "the job has been disabled".
          raise Exception('任务已被停用')

      relation = crud.get_af_id(db, job_id, 'datax')
      if not relation:
          # Without this guard a missing relation surfaced as an opaque
          # AttributeError on ``relation.af_id``. Message reads "no af relation
          # record found for the job, cannot execute".
          raise Exception('未找到任务对应的 af 关联记录，无法执行')

      res = send_execute(relation.af_id)
      return res['data']
  # Hook fastapi_pagination into this router; the list endpoint returns
  # paginate(...) results.
  add_pagination(router)