job.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. import json
  2. import requests
  3. from fastapi import APIRouter, Depends
  4. from fastapi_pagination import paginate, Params
  5. from sqlalchemy.orm import Session
  6. from app import schemas, get_db, crud
  7. from app.crud import create_airflow_job_submit
  8. from utils import web_try, sxtimeit
  # Router for the Airflow job endpoints; every route below is mounted
  # under the "/jpt/af_job" prefix and carries the tag given here.
  router_af_job = APIRouter(prefix="/jpt/af_job", tags=["airflow_job-任务管理"])
  @router_af_job.get("/")
  @web_try()
  @sxtimeit
  def get_af_jobs(params: Params = Depends(), db: Session = Depends(get_db)):
      # Fetch the jobs through crud.get_airflow_jobs, then let
      # fastapi_pagination cut the result down according to `params`.
      all_jobs = crud.get_airflow_jobs(db)
      return paginate(all_jobs, params)
  @router_af_job.get("/getOnce/{item_id}")
  @web_try()
  @sxtimeit
  def get_af_jobs_once(item_id: int, db: Session = Depends(get_db)):
      # Return whatever crud.get_airflow_job_once yields for the job id
      # taken from the URL path.
      job = crud.get_airflow_job_once(db, item_id)
      return job
  @router_af_job.post("/")
  @web_try()
  @sxtimeit
  def add_af_job(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
      # Hand the request body to crud.create_airflow_job and return its
      # result unchanged.
      created_job = crud.create_airflow_job(db, item)
      return created_job
  @router_af_job.put("/{item_id}")
  @web_try()
  @sxtimeit
  def update_af_job(
      item_id: int,
      update_item: schemas.AirflowJobUpdate,
      db: Session = Depends(get_db),
  ):
      # Apply `update_item` to the job identified by the path parameter
      # `item_id` and return the result of crud.update_airflow_job.
      updated_job = crud.update_airflow_job(
          db=db,
          item_id=item_id,
          update_item=update_item,
      )
      return updated_job
  @router_af_job.post("/submit")
  @web_try()
  @sxtimeit
  def add_dag_submit(id: int, db: Session = Depends(get_db)):
      # Load the stored job identified by `id`, rebuild it as a
      # schemas.AirflowJob from its to_dict() output, and pass that to
      # create_airflow_job_submit.
      # NOTE: `id` shadows the builtin, but it is the public parameter name
      # of this endpoint, so it is kept as-is.
      stored_job = crud.get_airflow_job_once(db, id)
      job_fields = stored_job.to_dict()
      job_schema = schemas.AirflowJob(**job_fields)
      create_airflow_job_submit(job_schema)
      # There is deliberately no return statement here: the result of the
      # submit call is not handed back by this function.
  @router_af_job.post("/{item_id}/run")
  @web_try()
  @sxtimeit
  def trigger_af_job_run(item_id: int, db: Session = Depends(get_db)):
      # Trigger a run of the Airflow DAG named "dag_<item_id>" by POSTing an
      # empty JSON object to the DAG's dagRuns endpoint, and return the
      # decoded JSON body of Airflow's response.
      #
      # item_id: job id from the URL path; also used to build the DAG name.
      # db:      SQLAlchemy session injected by FastAPI.
      #
      # Raises whatever requests raises on connection failure or timeout,
      # and whatever response.json() raises when the body is not JSON.

      # The lookup result is not used below. The call is kept because it may
      # raise or have side effects -- presumably it serves as an existence
      # check for the job; TODO confirm.
      crud.get_airflow_job_once(db=db, item_id=item_id)

      # NOTE(review): the Airflow address and the Basic credentials are
      # hard-coded in source. They should come from configuration / a secret
      # store; left unchanged here so the endpoint keeps working as before.
      uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}/dagRuns'
      headers = {
          'content-type': 'application/json',
          'Authorization': 'basic YWRtaW46YWRtaW4=',
          'Host':'airflow-web.sxkj.com'
      }

      # Without a timeout, requests waits forever on an unresponsive server
      # and ties up this worker; bound the wait instead.
      request_timeout_seconds = 30
      response = requests.post(
          uri,
          headers=headers,
          data=json.dumps({}),
          timeout=request_timeout_seconds,
      )
      return response.json()
  53. #
  54. # @router_af_job.post("/run")
  55. # @web_try()
  56. # @sxtimeit
  57. # def trigger_af_job_run(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
  58. # return crud.create_airflow_job(db, item)