1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- from typing import Optional
- from fastapi import APIRouter
- from fastapi import Depends
- from sqlalchemy.orm import Session
- from app import schemas
- import app.crud as crud
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from fastapi_pagination import Page, add_pagination, paginate, Params
- from app import get_db
- router = APIRouter(
- prefix="/jpt/jobinfo",
- tags=["jobinfo-任务管理"],
- )
- @router.post("/")
- @web_try()
- @sxtimeit
- def create_job_info(item: schemas.JobInfoCreate, db: Session = Depends(get_db)):
- return crud.create_job_info(db, item)
- @router.get("/")
- @web_try()
- @sxtimeit
- def get_job_infos(params: Params=Depends(), db: Session = Depends(get_db)):
- return paginate(crud.get_job_infos(db), params)
- @router.put("/{id}")
- @web_try()
- @sxtimeit
- def update_datasource(id: int, update_item: schemas.JobInfoUpdate, db: Session = Depends(get_db)):
- return crud.update_job_info(db, id, update_item)
- @router.delete("/{job_id}")
- @web_try()
- @sxtimeit
- def delete_job_info(job_id: int, db: Session = Depends(get_db)):
- return crud.delete_job_info(db, job_id)
- add_pagination(router)
|