123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 |
- import time
- from typing import Optional
- from fastapi import APIRouter
- from fastapi import Depends
- from sqlalchemy.orm import Session
- from app import models, page_help, schemas
- from app.common.decorators import verify_all
- import app.crud as crud
- from app.crud import job_info
- from app.services.datax import datax_create_job, datax_update_job, on_off_control
- from app.services.job_info import create_job_info_services, execute_job_services, update_job_info_services
- from app.utils.cron_utils import parsing_cron_expression
- from app.utils.send_util import *
- from app.utils.utils import *
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from fastapi_pagination import Page, add_pagination, paginate, Params
- from app import get_db, get_page
- router = APIRouter(
- prefix="/jpt/jobinfo",
- tags=["jobinfo-任务管理"],
- )
- @router.post("/", dependencies=[Depends(verify_all)])
- @web_try()
- @sxtimeit
- def create_job_info(item: schemas.JobInfoCreate, db: Session = Depends(get_db)):
- return create_job_info_services(db,item)
- @router.get("/", dependencies=[Depends(verify_all)])
- @web_try()
- @sxtimeit
- def get_job_infos(params: Params=Depends(get_page), db: Session = Depends(get_db)):
- return page_help(crud.get_job_infos(db),params['page'],params['size'])
- @router.get("/info", dependencies=[Depends(verify_all)])
- @web_try()
- @sxtimeit
- def get_job_info(job_id: int, db: Session = Depends(get_db)):
- job_info = crud.get_job_info(db, job_id)
- job_info_dict = job_info.to_dict()
- cron_select_type, cron_expression = job_info_dict['cron_select_type'], job_info_dict['job_cron']
- cron_expression_dict = parsing_cron_expression(cron_expression)
- cron_expression_dict.update({
- 'cron_select_type': cron_select_type,
- 'cron_expression': cron_expression
- })
- job_info_dict.update({
- 'cron_expression_dict': cron_expression_dict
- })
- partition_info_str = job_info_dict.pop('partition_info', None)
- if partition_info_str:
- partition_list = partition_info_str.split(',')
- job_info_dict.update({
- 'partition_info': partition_list[0],
- 'partition_time': partition_list[2],
- 'partition_num': int(partition_list[1])
- })
- return job_info_dict
- @router.put("/{id}", dependencies=[Depends(verify_all)])
- @web_try()
- @sxtimeit
- def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)):
- return update_job_info_services(db, id, update_item)
- @router.put("/update_trigger_status/", dependencies=[Depends(verify_all)])
- @web_try()
- @sxtimeit
- def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depends(get_db)):
- job_info: models.JobInfo = crud.get_job_info(db, item.id)
- relation = crud.get_af_id(db, job_info.id, 'datax')
- job_info.trigger_status = item.trigger_status
- on_off_control(relation.af_id, item.trigger_status)
- job_info = crud.update_job_trigger_status(db, item.id, item.trigger_status)
- return job_info
- @router.delete("/{job_id}", dependencies=[Depends(verify_all)])
- @web_try()
- @sxtimeit
- def delete_job_info(job_id: int, db: Session = Depends(get_db)):
- jm_job: models.JobInfo = crud.get_job_info(db, job_id)
- if jm_job.trigger_status == 1:
- raise Exception('任务未停用,不可删除')
- relation = crud.get_af_id(db, job_id, 'datax')
- send_delete('/af/af_job', relation.af_id)
- return crud.delete_job_info(db, job_id)
- @router.post("/execute", dependencies=[Depends(verify_all)])
- @web_try()
- @sxtimeit
- def execute_job_info(job_id: int, db: Session = Depends(get_db)):
- jm_job: models.JobInfo = crud.get_job_info(db, job_id)
- if jm_job.trigger_status == 0:
- raise Exception('任务已被停用')
- res = execute_job_services(db, job_id)
- return res['data']
- add_pagination(router)
|