task.py 1.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. from fastapi import APIRouter, Depends
  2. from fastapi_pagination import paginate, Params
  3. from sqlalchemy.orm import Session
  4. from app import schemas, get_db, crud
  5. from utils import web_try, sxtimeit
  6. router_af_task = APIRouter(
  7. prefix="/af/af_task",
  8. tags=["airflow_task-作业管理"],
  9. )
  10. @router_af_task.get("/")
  11. @web_try()
  12. @sxtimeit
  13. def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
  14. return paginate(crud.get_airflow_tasks(db), params)
  15. @router_af_task.get("/getOnce/{id}")
  16. @web_try()
  17. @sxtimeit
  18. def get_task_once(id: int, db: Session = Depends(get_db)):
  19. return crud.get_airflow_task_once(db, id)
  20. @router_af_task.put("/{item_id}")
  21. @web_try()
  22. @sxtimeit
  23. def update_task(item_id: int, update_item: schemas.AirflowTaskUpdate, db: Session = Depends(get_db)):
  24. return crud.update_airflow_task(db=db, item_id=item_id, update_item=update_item)
  25. @router_af_task.post("/")
  26. @web_try()
  27. @sxtimeit
  28. def add_task(item: schemas.AirflowTaskCreate, db: Session = Depends(get_db)):
  29. return crud.create_airflow_task(db, item)