123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- import datetime
- import json
- import os
- import requests
- from fastapi import APIRouter, Depends
- from fastapi_pagination import paginate, Params
- from sqlalchemy.orm import Session
- from app import schemas, get_db, crud
- from app.core.airflow.uri import get_job_path, get_airflow_api_info
- from app.crud import create_airflow_job_submit
- from utils import web_try, sxtimeit
# Router that groups every Airflow-job endpoint under the /jpt/af_job prefix.
router_af_job = APIRouter(prefix="/jpt/af_job", tags=["airflow_job-任务管理"])
@router_af_job.get("/")
@web_try()
@sxtimeit
def get_af_jobs(params: Params = Depends(), db: Session = Depends(get_db)):
    """Return one page of the jobs produced by ``crud.get_airflow_jobs``.

    Args:
        params: Pagination parameters resolved by FastAPI.
        db: Database session injected through ``get_db``.

    Returns:
        The result of ``paginate`` applied to the job collection.
    """
    all_jobs = crud.get_airflow_jobs(db)
    return paginate(all_jobs, params)
def _parse_last_parsed_time(raw):
    """Convert an ISO-8601 timestamp string to epoch seconds.

    Returns None when ``raw`` is missing or empty. Raises ValueError when the
    string matches none of the known layouts.
    """
    if not raw:
        return None
    # The second layout covers timestamps whose microsecond part is zero and
    # was therefore left out when the value was serialized.
    for layout in ('%Y-%m-%dT%H:%M:%S.%f%z', '%Y-%m-%dT%H:%M:%S%z'):
        try:
            return datetime.datetime.strptime(raw, layout).timestamp()
        except ValueError:
            continue
    raise ValueError(f'unrecognized last_parsed_time value: {raw!r}')


# NOTE(review): this function shares its name with the list endpoint defined
# earlier in this module. The route is registered when the decorator runs, so
# both endpoints exist, but the module attribute ends up bound to this one.
# The name is kept to avoid changing the module's interface.
@router_af_job.get("/{item_id}/last_parsed_time")
@web_try()
@sxtimeit
def get_af_jobs(item_id: int):
    """Return the time the DAG ``dag_{item_id}`` was last parsed.

    The DAG description is fetched from the API described by
    ``get_airflow_api_info`` and its ``last_parsed_time`` field is converted
    to a POSIX timestamp.

    Args:
        item_id: Job id; the DAG is addressed as ``dag_{item_id}``.

    Returns:
        ``{"last_parsed_time": <float or None>}``. The value is None when the
        field is absent or null in the response.
    """
    uri_prefix, headers = get_airflow_api_info()
    url = f'{uri_prefix}/dags/dag_{item_id}'
    rets = requests.get(url, headers=headers).json()
    # A non-object payload cannot carry the field; treat it as "not available".
    raw_value = rets.get('last_parsed_time') if isinstance(rets, dict) else None
    return {"last_parsed_time": _parse_last_parsed_time(raw_value)}
@router_af_job.get("/getOnce/{item_id}")
@web_try()
@sxtimeit
def get_af_jobs_once(item_id: int, db: Session = Depends(get_db)):
    """Fetch a single job by id through ``crud.get_airflow_job_once``.

    Args:
        item_id: Id of the job to look up.
        db: Database session injected through ``get_db``.
    """
    job = crud.get_airflow_job_once(db, item_id)
    return job
@router_af_job.post("/")
@web_try()
@sxtimeit
def add_af_job(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
    """Create a job from the request body and return what the CRUD layer returns.

    Args:
        item: Payload describing the job to create.
        db: Database session injected through ``get_db``.
    """
    created = crud.create_airflow_job(db, item)
    return created
@router_af_job.put("/{item_id}")
@web_try()
@sxtimeit
def update_af_job(
    item_id: int,
    update_item: schemas.AirflowJobUpdate,
    db: Session = Depends(get_db),
):
    """Apply ``update_item`` to the job identified by ``item_id``.

    Args:
        item_id: Id of the job to update.
        update_item: Payload carrying the new values.
        db: Database session injected through ``get_db``.

    Returns:
        Whatever ``crud.update_airflow_job`` returns.
    """
    updated = crud.update_airflow_job(
        db=db,
        item_id=item_id,
        update_item=update_item,
    )
    return updated
@router_af_job.post("/submit")
@web_try()
@sxtimeit
def add_dag_submit(id: int, db: Session = Depends(get_db)):
    """Load the stored job and hand it to ``create_airflow_job_submit``.

    Args:
        id: Id of the job to submit. The name shadows the builtin but is part
            of the public request interface, so it is kept.
        db: Database session injected through ``get_db``.

    Returns:
        None; the endpoint has no explicit return value.
    """
    job_record = crud.get_airflow_job_once(db, id)
    # Rebuild the schema object from the record's dict representation.
    job_schema = schemas.AirflowJob(**job_record.to_dict())
    create_airflow_job_submit(job_schema)
@router_af_job.post("/{item_id}/run")
@web_try()
@sxtimeit
def trigger_af_job_run(item_id: int, db: Session = Depends(get_db)):
    """Request a new run of the DAG ``dag_{item_id}`` and return the JSON reply.

    The endpoint address and request headers come from
    ``get_airflow_api_info`` (the same source the other endpoints in this
    module use) instead of a host and credentials embedded in the code.

    Args:
        item_id: Job id; the DAG is addressed as ``dag_{item_id}``.
        db: Database session injected through ``get_db``.

    Returns:
        The decoded JSON body of the POST response.
    """
    # The lookup result is not used. The call is kept because it may serve as
    # an existence check for the job — TODO confirm and drop it if it does not.
    crud.get_airflow_job_once(db=db, item_id=item_id)

    # NOTE(review): assumes uri_prefix points at the same API root that was
    # previously hard-coded here — confirm against get_airflow_api_info.
    uri_prefix, headers = get_airflow_api_info()
    url = f'{uri_prefix}/dags/dag_{item_id}/dagRuns'
    response = requests.post(url, headers=headers, data=json.dumps({}))
    return response.json()
- #
- # @router_af_job.post("/{item_id}/update_and_run")
- # @web_try()
- # @sxtimeit
- # def update_and_trigger_job_run(item_id: int, db: Session = Depends(get_db)):
- # job_item = crud.get_airflow_job_once(db=db, item_id=item_id)
- # uri = f'http://192.168.199.109/api/v1/dags/dag_{item_id}/dagRuns'
- # headers = {
- # 'content-type': 'application/json',
- # 'Authorization': 'basic YWRtaW46YWRtaW4=',
- # 'Host': 'airflow-web.sxkj.com'
- # }
- #
- # response = requests.post(uri, headers=headers, data=json.dumps({}))
- # return response.json()
@router_af_job.delete("/{item_id}")
@web_try()
@sxtimeit
def delete_af_job(item_id: int, db: Session = Depends(get_db)):
    """Delete a job: its remote DAG, its local job file and its database row.

    The endpoint address and request headers come from
    ``get_airflow_api_info`` (the same source the other endpoints in this
    module use) instead of a host and credentials embedded in the code.

    Args:
        item_id: Job id; the DAG is addressed as ``dag_{item_id}``.
        db: Database session injected through ``get_db``.

    Returns:
        None; the endpoint has no explicit return value.
    """
    # NOTE(review): assumes uri_prefix points at the same API root that was
    # previously hard-coded here — confirm against get_airflow_api_info.
    uri_prefix, headers = get_airflow_api_info()
    # The response is intentionally not inspected, as in the original code.
    requests.delete(f'{uri_prefix}/dags/dag_{item_id}', headers=headers)

    try:
        os.remove(get_job_path(job_id=item_id))
    except FileNotFoundError:
        # The file is already gone. Carry on so the database row is still
        # removed; otherwise the job could never be deleted.
        pass

    crud.delete_airflow_job(db, item_id)
@router_af_job.patch("/{item_id}/pause/{pause}")
@web_try()
@sxtimeit
def pause_af_job(pause: bool, item_id: int):
    """Set the ``is_paused`` flag of the DAG ``dag_{item_id}``.

    Args:
        pause: Value sent as ``is_paused`` in the PATCH body.
        item_id: Job id; the DAG is addressed as ``dag_{item_id}``.

    Returns:
        The decoded JSON body of the PATCH response.
    """
    uri_prefix, headers = get_airflow_api_info()
    dag_url = f'{uri_prefix}/dags/dag_{item_id}'
    payload = json.dumps({"is_paused": pause})
    response = requests.patch(dag_url, headers=headers, data=payload)
    return response.json()
|