job.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. from fastapi import APIRouter, Depends
  2. from fastapi_pagination import paginate, Params
  3. from sqlalchemy.orm import Session
  4. from app import schemas, get_db, crud
  5. from app.crud import create_airflow_job_submit
  6. from utils import web_try, sxtimeit
  7. router_af_job = APIRouter(
  8. prefix="/jpt/af_job",
  9. tags=["airflow_job-任务管理"],
  10. )
  11. @router_af_job.get("/")
  12. @web_try()
  13. @sxtimeit
  14. def get_af_jobs(params: Params = Depends(), db: Session = Depends(get_db)):
  15. return paginate(crud.get_airflow_jobs(db), params)
  16. @router_af_job.post("/")
  17. @web_try()
  18. @sxtimeit
  19. def add_af_job(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
  20. return crud.create_airflow_job(db, item)
  21. @router_af_job.put("/{item_id}")
  22. @web_try()
  23. @sxtimeit
  24. def update_af_job(item_id: int, update_item: schemas.AirflowJobUpdate, db: Session = Depends(get_db)):
  25. return crud.update_airflow_job(db=db, item_id=item_id, update_item=update_item)
  26. @router_af_job.post("/submit")
  27. @web_try()
  28. @sxtimeit
  29. def add_dag_submit(id: int, db: Session = Depends(get_db)):
  30. item = crud.get_airflow_job_once(db, id)
  31. create_airflow_job_submit(schemas.AirflowJob(**item.to_dict()))
  32. # return crud.create_airflow_job(item)