"""HTTP routes for job-info records, mounted under ``/jpt/jobinfo``.

Each handler is a thin pass-through to the function of the matching purpose in
``app.crud`` and receives its ``Session`` through the ``get_db`` dependency.
"""
# NOTE: the handlers in this module are documented with ``#`` comments rather
# than docstrings on purpose. FastAPI publishes a handler's docstring as the
# operation description in the generated OpenAPI schema, so adding docstrings
# would change the published API documentation.

from typing import Optional

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app import schemas
import app.crud as crud
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from fastapi_pagination import Page, add_pagination, paginate, Params

from app import get_db


router = APIRouter(
    prefix="/jpt/jobinfo",
    tags=["jobinfo-任务管理"],
)

# Decorators apply bottom-up on every handler: ``sxtimeit`` wraps the plain
# function first, ``web_try()`` wraps that result, and the router registers the
# outermost callable. What the two helpers do is defined in ``utils`` and is
# not visible from this module.


# POST /jpt/jobinfo/
# Hands the incoming ``item`` to ``crud.create_job_info`` and returns whatever
# the CRUD layer returns. ``item`` is presumably parsed from the request
# body -- confirm against ``schemas.JobInfoCreate``.
@router.post("/")
@web_try()
@sxtimeit
def create_job_info(item: schemas.JobInfoCreate, db: Session = Depends(get_db)):
    created = crud.create_job_info(db, item)
    return created


# GET /jpt/jobinfo/
# Fetches the job infos via ``crud.get_job_infos`` and passes the result,
# together with the injected pagination ``params``, to ``paginate``.
# NOTE(review): pagination is applied to whatever the CRUD call returns; if
# that is a fully loaded list, every row is read before slicing -- confirm.
@router.get("/")
@web_try()
@sxtimeit
def get_job_infos(params: Params = Depends(), db: Session = Depends(get_db)):
    job_infos = crud.get_job_infos(db)
    return paginate(job_infos, params)


# PUT /jpt/jobinfo/{id}
# Forwards the path ``id`` and the ``update_item`` payload to
# ``crud.update_job_info`` and returns its result.
# NOTE(review): the function is called ``update_datasource`` although it
# updates a job info -- this looks like a copy-paste leftover. It is left
# unchanged here because renaming may alter the generated OpenAPI operation
# id. The ``id`` parameter shadows the builtin but is tied to the ``{id}``
# path placeholder, so it is kept as well.
@router.put("/{id}")
@web_try()
@sxtimeit
def update_datasource(
    id: int,
    update_item: schemas.JobInfoUpdate,
    db: Session = Depends(get_db),
):
    updated = crud.update_job_info(db, id, update_item)
    return updated


# DELETE /jpt/jobinfo/{job_id}
# Delegates removal of the job info identified by ``job_id`` to
# ``crud.delete_job_info`` and returns its result.
@router.delete("/{job_id}")
@web_try()
@sxtimeit
def delete_job_info(job_id: int, db: Session = Depends(get_db)):
    outcome = crud.delete_job_info(db, job_id)
    return outcome


# Called after all routes above have been declared, as in the original file.
add_pagination(router)