123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- from pyexpat import model
- import time
- from typing import Optional
- from fastapi import APIRouter
- from fastapi import Depends
- from sqlalchemy.orm import Session
- from app import models, schemas
- import app.crud as crud
- from app.crud import job_info
- from app.services.datax import datax_create_job, datax_update_job
- from app.utils.cron_utils import parsing_cron_expression
- from app.utils.send_util import *
- from app.utils.utils import *
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from fastapi_pagination import Page, add_pagination, paginate, Params
- from app import get_db
- router = APIRouter(
- prefix="/jpt/jobinfo",
- tags=["jobinfo-任务管理"],
- )
- @router.post("/")
- @web_try()
- @sxtimeit
- def create_job_info(item: schemas.JobInfoCreate, db: Session = Depends(get_db)):
- return crud.create_job_info(db, item)
- @router.get("/")
- @web_try()
- @sxtimeit
- def get_job_infos(params: Params=Depends(), db: Session = Depends(get_db)):
- return paginate(crud.get_job_infos(db), params)
- @router.get("/info")
- @web_try()
- @sxtimeit
- def get_job_info(job_id: int, db: Session = Depends(get_db)):
- job_info = crud.get_job_info(db, job_id)
- job_info_dict = job_info.to_dict()
- cron_select_type, cron_expression = job_info_dict['cron_select_type'], job_info_dict['job_cron']
- cron_expression_dict = parsing_cron_expression(cron_expression)
- cron_expression_dict.update({
- 'cron_select_type': cron_select_type,
- 'cron_expression': cron_expression
- })
- job_info_dict.update({
- 'cron_expression_dict': cron_expression_dict
- })
- partition_info_str = job_info_dict.pop('partition_info', None)
- if partition_info_str:
- partition_list = partition_info_str.split(',')
- job_info_dict.update({
- 'partition_info': partition_list[0],
- 'partition_time': partition_list[2],
- 'partition_num': int(partition_list[1])
- })
- return job_info_dict
- @router.put("/{id}")
- @web_try()
- @sxtimeit
- def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)):
- job_info = crud.update_job_info(db, id, update_item)
- relation = crud.get_af_id(db, job_info.id, 'datax')
- if not relation:
- datax_create_job(job_info,db)
- else:
- datax_update_job(job_info,db)
- return job_info
- @router.put("/update_trigger_status/")
- @web_try()
- @sxtimeit
- def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depends(get_db)):
- job_info = crud.get_job_info(db, item.id)
- relation = crud.get_af_id(db, job_info.id, 'datax')
- job_info.trigger_status = item.trigger_status
- if not relation:
- datax_create_job(job_info,db)
- else:
- datax_update_job(job_info,db)
- job_info = crud.update_job_trigger_status(db, item.id, item.trigger_status)
- return job_info
- @router.delete("/{job_id}")
- @web_try()
- @sxtimeit
- def delete_job_info(job_id: int, db: Session = Depends(get_db)):
- return crud.delete_job_info(db, job_id)
- add_pagination(router)
|