"""HTTP routes for managing Airflow jobs (mounted under ``/jpt/af_job``)."""
import json

import requests
from fastapi import APIRouter, Depends
from fastapi_pagination import paginate, Params
from sqlalchemy.orm import Session

from app import schemas, get_db, crud
from app.crud import create_airflow_job_submit
from utils import web_try, sxtimeit

# Airflow REST endpoint used to trigger DAG runs. The request is sent to the
# IP address while the virtual host is selected through the ``Host`` header.
_AIRFLOW_API_BASE = 'http://192.168.199.109/api/v1'
_AIRFLOW_HOST_HEADER = 'airflow-web.sxkj.com'
# NOTE(review): hard-coded HTTP Basic credentials (base64 of "admin:admin").
# These should come from configuration / a secret store rather than source.
_AIRFLOW_AUTHORIZATION = 'basic YWRtaW46YWRtaW4='
# Upper bound (seconds) for the Airflow call. Without a timeout, ``requests``
# waits indefinitely and a stalled Airflow webserver would block this worker.
_AIRFLOW_REQUEST_TIMEOUT = 30

router_af_job = APIRouter(
    prefix="/jpt/af_job",
    tags=["airflow_job-任务管理"],
)


@router_af_job.get("/")
@web_try()
@sxtimeit
def get_af_jobs(params: Params = Depends(), db: Session = Depends(get_db)):
    """Return the result of ``crud.get_airflow_jobs`` paginated by ``params``."""
    return paginate(crud.get_airflow_jobs(db), params)


@router_af_job.get("/getOnce/{item_id}")
@web_try()
@sxtimeit
def get_af_jobs_once(item_id: int, db: Session = Depends(get_db)):
    """Return the single job that ``crud.get_airflow_job_once`` finds for ``item_id``."""
    return crud.get_airflow_job_once(db, item_id)


@router_af_job.post("/")
@web_try()
@sxtimeit
def add_af_job(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
    """Create a job from the request body via ``crud.create_airflow_job``."""
    return crud.create_airflow_job(db, item)


@router_af_job.put("/{item_id}")
@web_try()
@sxtimeit
def update_af_job(item_id: int, update_item: schemas.AirflowJobUpdate, db: Session = Depends(get_db)):
    """Apply ``update_item`` to job ``item_id`` via ``crud.update_airflow_job``."""
    return crud.update_airflow_job(db=db, item_id=item_id, update_item=update_item)


@router_af_job.post("/submit")
@web_try()
@sxtimeit
def add_dag_submit(id: int, db: Session = Depends(get_db)):
    """Load job ``id`` and hand it to ``create_airflow_job_submit``.

    The stored record is converted with ``to_dict()`` into a
    ``schemas.AirflowJob`` before submission. Nothing is returned explicitly.

    ``id`` shadows the builtin, but it is the public query-parameter name of
    this endpoint, so it is kept unchanged for API compatibility.
    """
    item = crud.get_airflow_job_once(db, id)
    create_airflow_job_submit(schemas.AirflowJob(**item.to_dict()))
    # return crud.create_airflow_job(item)


@router_af_job.post("/{item_id}/run")
@web_try()
@sxtimeit
def trigger_af_job_run(item_id: int, db: Session = Depends(get_db)):
    """Trigger a run of DAG ``dag_<item_id>`` through the Airflow REST API.

    Sends an empty JSON object to the ``dagRuns`` endpoint and returns the
    decoded JSON body of Airflow's response. The HTTP status is not checked,
    so error payloads from Airflow are returned to the caller as-is.

    Raises:
        requests.exceptions.RequestException: on connection failure or when
            Airflow does not answer within ``_AIRFLOW_REQUEST_TIMEOUT`` seconds.
    """
    # The looked-up record itself is not used below; the call is retained from
    # the original code. Presumably it guards against unknown ids -- TODO
    # confirm that crud.get_airflow_job_once raises for a missing job.
    crud.get_airflow_job_once(db=db, item_id=item_id)

    uri = f'{_AIRFLOW_API_BASE}/dags/dag_{item_id}/dagRuns'
    headers = {
        'content-type': 'application/json',
        'Authorization': _AIRFLOW_AUTHORIZATION,
        'Host': _AIRFLOW_HOST_HEADER,
    }
    response = requests.post(
        uri,
        headers=headers,
        data=json.dumps({}),
        timeout=_AIRFLOW_REQUEST_TIMEOUT,
    )
    return response.json()

#
# @router_af_job.post("/run")
# @web_try()
# @sxtimeit
# def trigger_af_job_run(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
#     return crud.create_airflow_job(db, item)