"""HTTP routes for recording Airflow job runs and per-task notifications."""
import json

from fastapi import APIRouter, Depends
from fastapi_pagination import paginate, Params
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app import schemas, get_db, crud
from app.core.k8s.k8s_client import KubernetesTools
from utils import web_try, sxtimeit


class Item(BaseModel):
    """Request body that carries an arbitrary JSON object under ``data``."""

    data: dict


router_af_run = APIRouter(
    prefix="/jpt/jpt_run",
    tags=["airflow_run-运行管理"],
)


@router_af_run.get("/")
@web_try()
@sxtimeit
def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
    """Return a paginated view of the result of ``crud.get_airflow_tasks``."""
    return paginate(crud.get_airflow_tasks(db), params)


def _collect_sparks_dependence(job_item):
    """Map each 'sparks' task id to its edge list, node names prefixed by the task id."""
    sparks_dependence = {}
    for task in schemas.AirflowJob(**job_item.to_dict()).tasks:
        if task.task_type != 'sparks':
            continue
        # The script of a 'sparks' task is a JSON document with an 'edges'
        # list of (source, sink) pairs.
        sparks_info = json.loads(task.script)
        sparks_dependence[task.id] = [
            [f'{task.id}_{source}', f'{task.id}_{sink}']
            for source, sink in sparks_info['edges']
        ]
    return sparks_dependence


@router_af_run.post("/")
@web_try()
@sxtimeit
def add_airflow_run(item: Item, db: Session = Depends(get_db)):
    """Create a run record for the job named in the request.

    Reads ``job_id``, ``start_time`` and ``run_ts`` from ``item.data``. The new
    run starts with an empty ``tasks`` detail map, the job's own dependence
    under ``dependence.tasks``, the per-'sparks'-task edge lists under
    ``dependence.sparks``, and status ``"1"``.

    Raises:
        ValueError: if no job exists for ``job_id``. Previously this case
            surfaced as an ``AttributeError`` on ``None``.
    """
    print(item.data)
    job_id = item.data["job_id"]
    job_item = crud.get_airflow_job_once(db=db, item_id=job_id)
    if job_item is None:
        # Fail with a clear message before anything is written to the DB.
        raise ValueError(f'airflow job {job_id} does not exist')

    sparks_dependence = _collect_sparks_dependence(job_item)
    run_create = schemas.AirflowRunCreate(
        start_time=int(item.data["start_time"]),
        job_id=item.data["job_id"],
        run_id=item.data['run_ts'],
        details={
            "tasks": {},
            "dependence": {
                "tasks": job_item.dependence,
                "sparks": sparks_dependence,
            },
        },
        status="1",
    )
    crud.create_airflow_run(db, run_create)


@router_af_run.post("/notification")
@web_try()
@sxtimeit
def add_notification(item: Item, db: Session = Depends(get_db)):
    """Attach a finished task's pod log, timing and status to its run record.

    Reads ``dag_id``, ``task_id``, ``run_ts``, ``af_run_id``, ``start_time``,
    ``end_time`` and ``status`` from ``item.data``. Does nothing further when
    no run matches ``af_run_id``.
    """
    print(f'input : {item.data} ')
    k8s_tool = KubernetesTools()
    labels = {
        "dag_id": item.data['dag_id'],
        "task_id": item.data['task_id'],
        "run_ts": item.data['run_ts'],
    }
    logs = k8s_tool.get_pod_logs(namespaces="airflow", labels=labels)

    # The run is looked up by af_run_id; an earlier lookup by run_ts + job_id
    # was retired.
    run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], db=db)
    if run is None:
        return

    update_run = schemas.AirflowRunUpdate(details=run.details, status=run.status)
    update_run.details['tasks'][item.data['task_id']] = {
        "log": logs,
        "start_time": item.data["start_time"],
        "end_time": item.data["end_time"],
        "status": item.data['status'],
    }
    crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
    return


# NOTE(review): this handler reuses the name of the "/notification" handler
# above, so the module-level name ``add_notification`` ends up bound to this
# function. Both routes are still registered because the decorators run at
# definition time. The name is kept because renaming would change the route
# name.
@router_af_run.post("/sigal")
@web_try()
@sxtimeit
def add_notification(item: Item, db: Session = Depends(get_db)):
    """Print the received payload; no other processing is done."""
    print(f'receive sigal: {item.data} ')