import datetime
import json
import os
from typing import Optional

import requests
from fastapi import APIRouter, Depends
from fastapi_pagination import paginate, Params
from sqlalchemy.orm import Session

from app import schemas, get_db, crud
from app.core.airflow.af_util import get_job_path, get_airflow_api_info, call_airflow_api
from app.crud import create_airflow_job_submit
from app.schemas import AirflowTrigger
from utils import web_try, sxtimeit

# Router grouping every Airflow-job endpoint under the /af/af_job prefix.
router_af_job = APIRouter(
    prefix="/af/af_job",
    tags=["airflow_job-任务管理"],
)


@router_af_job.get("/")
@web_try()
@sxtimeit
def get_af_jobs(params: Params = Depends(), db: Session = Depends(get_db)):
    """Return the Airflow jobs from ``crud.get_airflow_jobs``, paginated.

    Args:
        params: Pagination parameters injected by FastAPI.
        db: Database session injected through ``get_db``.

    Returns:
        The result of ``paginate`` applied to the fetched jobs.
    """
    all_jobs = crud.get_airflow_jobs(db)
    return paginate(all_jobs, params)


# NOTE(review): this handler reuses the name ``get_af_jobs`` from the handler
# above, so the module-level name is rebound to this function. The routes are
# presumably both still registered because the router decorator runs at
# definition time -- confirm before renaming, since the name may be visible
# to callers or in generated API docs.
@router_af_job.get("/{item_id}/last_parsed_time")
@web_try()
@sxtimeit
def get_af_jobs(item_id: int):
    """Fetch ``dag_{item_id}`` from the Airflow API and report its parse time.

    Args:
        item_id: Job id; the DAG queried is ``dag_{item_id}``.

    Returns:
        ``{"last_parsed_time": <POSIX timestamp>}`` when the API response
        contains a ``last_parsed_time`` key, otherwise
        ``{"last_parsed_time": None}``.
    """
    api_root, request_headers = get_airflow_api_info()
    dag_info = requests.get(
        f'{api_root}/dags/dag_{item_id}',
        headers=request_headers,
    ).json()

    parsed_at = None
    if 'last_parsed_time' in dag_info:
        # The value is parsed with fractional seconds and a UTC offset, then
        # converted to seconds since the epoch.
        parsed_at = datetime.datetime.strptime(
            dag_info['last_parsed_time'],
            '%Y-%m-%dT%H:%M:%S.%f%z',
        ).timestamp()
    return {"last_parsed_time": parsed_at}


@router_af_job.get("/getOnce/{item_id}")
@web_try()
@sxtimeit
def get_af_jobs_once(item_id: int, db: Session = Depends(get_db)):
    """Return the single job that ``crud.get_airflow_job_once`` finds.

    Args:
        item_id: Id of the job to look up.
        db: Database session injected through ``get_db``.
    """
    job = crud.get_airflow_job_once(db, item_id)
    return job


@router_af_job.post("/")
@web_try()
@sxtimeit
def add_af_job(item: schemas.AirflowJobCreate, db: Session = Depends(get_db)):
    """Create a job from the request body via ``crud.create_airflow_job``.

    Args:
        item: Payload describing the job to create.
        db: Database session injected through ``get_db``.

    Returns:
        Whatever ``crud.create_airflow_job`` returns.
    """
    created = crud.create_airflow_job(db, item)
    return created


@router_af_job.put("/{item_id}")
@web_try()
@sxtimeit
def update_af_job(item_id: int, update_item: schemas.AirflowJobUpdate, db: Session = Depends(get_db)):
    """Update job ``item_id`` via ``crud.update_airflow_job``.

    Args:
        item_id: Id of the job to update.
        update_item: Payload carrying the new values.
        db: Database session injected through ``get_db``.

    Returns:
        Whatever ``crud.update_airflow_job`` returns.
    """
    updated = crud.update_airflow_job(
        db=db,
        item_id=item_id,
        update_item=update_item,
    )
    return updated


@router_af_job.post("/submit")
@web_try()
@sxtimeit
def add_dag_submit(id: int, db: Session = Depends(get_db)):
    """Load job ``id`` and hand it to ``create_airflow_job_submit``.

    The stored row is converted with ``to_dict()`` and rebuilt as a
    ``schemas.AirflowJob`` before submission. Nothing is returned.

    Args:
        id: Id of the job to submit. The name shadows the builtin but is
            kept because it is the request parameter name seen by callers.
        db: Database session injected through ``get_db``.
    """
    job_row = crud.get_airflow_job_once(db, id)
    job_schema = schemas.AirflowJob(**job_row.to_dict())
    create_airflow_job_submit(job_schema)
@router_af_job.post("/{job_id}/run")
@web_try()
@sxtimeit
def trigger_af_job_run(job_id: int):
    """Request a run of ``dag_{job_id}`` with an empty JSON payload.

    Args:
        job_id: Job id; the DAG targeted is ``dag_{job_id}``.

    Returns:
        The decoded JSON body of the ``call_airflow_api`` response.
    """
    empty_payload = json.dumps({})
    response = call_airflow_api(
        method='post',
        uri=f'dags/dag_{job_id}/dagRuns',
        args_dict={"data": empty_payload},
    )
    return response.json()


@router_af_job.post("/{job_id}/adv_run")
@web_try()
@sxtimeit
def trigger_af_job_adv_run(job_id: int, parameters: Optional[AirflowTrigger]):
    """Request a run of ``dag_{job_id}``, passing caller-supplied conf.

    Args:
        job_id: Job id; the DAG targeted is ``dag_{job_id}``.
        parameters: Optional trigger payload; its ``parameters`` attribute
            becomes the run ``conf``. A falsy value yields an empty conf.

    Returns:
        The decoded JSON body of the ``call_airflow_api`` response.
    """
    if parameters:
        run_conf = parameters.parameters
    else:
        run_conf = {}

    response = call_airflow_api(
        method='post',
        uri=f'dags/dag_{job_id}/dagRuns',
        args_dict={"json": {"conf": run_conf}},
    )
    return response.json()


@router_af_job.post("/000/data_transfer_run")
@web_try()
@sxtimeit
def trigger_af_data_transfer_job_run(source_tb: str, target_tb: str):
    """Request a run of the fixed DAG ``dag_0`` with input/output conf.

    Args:
        source_tb: Value sent as ``conf["input"]``.
        target_tb: Value sent as ``conf["output"]``.

    Returns:
        ``{"af_run_id": <dag_run_id or None>, "dag_id": "dag_0"}``.
    """
    run_conf = {"input": source_tb, "output": target_tb}
    reply = call_airflow_api(
        method='post',
        uri='dags/dag_0/dagRuns',
        args_dict={"json": {"conf": run_conf}},
    ).json()
    # A reply without 'dag_run_id' is reported as None.
    run_id = reply.get('dag_run_id')
    return {"af_run_id": run_id, "dag_id": "dag_0"}


@router_af_job.post("/special_job/install_requirements")
@web_try()
@sxtimeit
def trigger_af_requirements_install_job_run(requirements_str: str, target_path: str):
    """Request a run of the fixed DAG ``dag_-1`` with requirements conf.

    Args:
        requirements_str: Value sent as ``conf["requirements_str"]``.
        target_path: Value sent as ``conf["target_path"]``.

    Returns:
        ``{"af_run_id": <dag_run_id or None>, "dag_id": "dag_-1"}``.
    """
    run_conf = {
        "requirements_str": requirements_str,
        "target_path": target_path,
    }
    reply = call_airflow_api(
        method='post',
        uri='dags/dag_-1/dagRuns',
        args_dict={"json": {"conf": run_conf}},
    ).json()
    # A reply without 'dag_run_id' is reported as None.
    run_id = reply.get('dag_run_id')
    return {"af_run_id": run_id, "dag_id": "dag_-1"}


@router_af_job.delete("/{item_id}")
@web_try()
@sxtimeit
def delete_af_job(item_id: int, db: Session = Depends(get_db)):
    """Delete job ``item_id`` from Airflow, from disk and from the database.

    The three steps run in this order: an HTTP DELETE for ``dag_{item_id}``,
    removal of the file at ``get_job_path(job_id=item_id)``, then
    ``crud.delete_airflow_job``. Nothing is returned.

    Args:
        item_id: Id of the job to delete.
        db: Database session injected through ``get_db``.
    """
    api_root, request_headers = get_airflow_api_info()
    dag_url = f'{api_root}/dags/dag_{item_id}'
    # The HTTP response is not inspected.
    requests.delete(dag_url, headers=request_headers)

    # NOTE(review): os.remove raises if the file is absent, in which case the
    # database row below is not deleted by this function -- confirm this is
    # the intended behaviour.
    job_file = get_job_path(job_id=item_id)
    os.remove(job_file)

    crud.delete_airflow_job(db, item_id)


@router_af_job.patch("/{item_id}/pause/{pause}")
@web_try()
@sxtimeit
def pause_af_job(pause: bool, item_id: int):
    """PATCH ``dag_{item_id}`` in Airflow, setting ``is_paused`` to ``pause``.

    Args:
        pause: Value sent as ``is_paused``.
        item_id: Job id; the DAG targeted is ``dag_{item_id}``.

    Returns:
        The decoded JSON body of the PATCH response.
    """
    api_root, request_headers = get_airflow_api_info()
    dag_url = f'{api_root}/dags/dag_{item_id}'
    body = json.dumps({"is_paused": pause})
    response = requests.patch(dag_url, headers=request_headers, data=body)
    return response.json()