# NOTE: the import order below is intentionally left exactly as it was.
# The two wildcard imports can rebind any name imported before them, so
# reordering could change which object a name resolves to.
from pyexpat import model
import time
from typing import Optional
from fastapi import APIRouter
from fastapi import Depends
from sqlalchemy.orm import Session
from app import models, schemas

import app.crud as crud
from app.crud import job_info
from app.services.datax import datax_create_job, datax_update_job
from app.utils.cron_utils import parsing_cron_expression
from app.utils.send_util import *
from app.utils.utils import *
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from fastapi_pagination import Page, add_pagination, paginate, Params

from app import get_db


router = APIRouter(
    prefix="/jpt/jobinfo",
    tags=["jobinfo-任务管理"],
)


def _push_job_to_datax(job, relation, db: Session) -> None:
    """Create the DataX job when no relation exists, otherwise update it.

    ``relation`` is whatever ``crud.get_af_id(db, job.id, 'datax')`` returned;
    any falsy value is treated as "no DataX job registered yet".
    """
    if not relation:
        datax_create_job(job, db)
    else:
        datax_update_job(job, db)


def _split_partition_info(partition_info_str: str) -> dict:
    """Expand the comma-separated partition string into response fields.

    The fields are read positionally: index 0 -> ``partition_info``,
    index 1 -> ``partition_num`` (converted with ``int``), index 2 ->
    ``partition_time``. A string with fewer than three fields raises
    ``IndexError``; a non-numeric second field raises ``ValueError``.
    """
    fields = partition_info_str.split(',')
    # Key order mirrors the original literal so the same exception wins
    # when the stored string is malformed.
    return {
        'partition_info': fields[0],
        'partition_time': fields[2],
        'partition_num': int(fields[1]),
    }


@router.post("/")
@web_try()
@sxtimeit
def create_job_info(item: schemas.JobInfoCreate, db: Session = Depends(get_db)):
    """Create a job info from the request body via ``crud.create_job_info``."""
    return crud.create_job_info(db, item)


@router.get("/")
@web_try()
@sxtimeit
def get_job_infos(params: Params = Depends(), db: Session = Depends(get_db)):
    """Return one page of the job infos produced by ``crud.get_job_infos``."""
    return paginate(crud.get_job_infos(db), params)


@router.get("/info")
@web_try()
@sxtimeit
def get_job_info(job_id: int, db: Session = Depends(get_db)):
    """Return one job info as a dict, enriched with parsed cron/partition data.

    The result of ``to_dict()`` is extended with:

    * ``cron_expression_dict`` - output of ``parsing_cron_expression`` for the
      job's ``job_cron``, plus ``cron_select_type`` and ``cron_expression``.
    * ``partition_info`` / ``partition_time`` / ``partition_num`` - only when
      the stored ``partition_info`` string is non-empty; otherwise the
      ``partition_info`` key is removed from the result.
    """
    # Locals deliberately avoid the name ``job_info``, which is also a module
    # imported at file level.
    job = crud.get_job_info(db, job_id)
    job_dict = job.to_dict()

    cron_select_type = job_dict['cron_select_type']
    cron_expression = job_dict['job_cron']
    cron_expression_dict = parsing_cron_expression(cron_expression)
    cron_expression_dict.update({
        'cron_select_type': cron_select_type,
        'cron_expression': cron_expression,
    })
    job_dict.update({'cron_expression_dict': cron_expression_dict})

    # The raw string is always popped; it is replaced by the split fields
    # only when it actually holds a value.
    partition_info_str = job_dict.pop('partition_info', None)
    if partition_info_str:
        job_dict.update(_split_partition_info(partition_info_str))
    return job_dict


@router.put("/{id}")
@web_try()
@sxtimeit
def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)):
    """Update job info ``id`` and then create or update its DataX job.

    The parameter name ``id`` is bound to the ``{id}`` path segment and the
    function name is part of the existing route definition, so both are
    left unchanged here.
    """
    updated_job = crud.update_job_info(db, id, update_item)
    relation = crud.get_af_id(db, updated_job.id, 'datax')
    _push_job_to_datax(updated_job, relation, db)
    return updated_job


@router.put("/update_trigger_status/")
@web_try()
@sxtimeit
def update_trigger_status(item: schemas.JobInfoTriggerStatus, db: Session = Depends(get_db)):
    """Apply a new trigger status to a job, sync it to DataX, then persist it.

    Returns whatever ``crud.update_job_trigger_status`` returns.
    """
    job = crud.get_job_info(db, item.id)
    # Look the relation up BEFORE mutating the loaded object so the query
    # runs against unmodified session state, exactly as before.
    relation = crud.get_af_id(db, job.id, 'datax')
    # The DataX call receives the job with the new status already set.
    job.trigger_status = item.trigger_status
    _push_job_to_datax(job, relation, db)
    return crud.update_job_trigger_status(db, item.id, item.trigger_status)


@router.delete("/{job_id}")
@web_try()
@sxtimeit
def delete_job_info(job_id: int, db: Session = Depends(get_db)):
    """Delete job info ``job_id`` via ``crud.delete_job_info``."""
    return crud.delete_job_info(db, job_id)


add_pagination(router)