1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- import json
- from fastapi import APIRouter, Depends
- from fastapi_pagination import paginate, Params
- from pydantic import BaseModel
- from sqlalchemy.orm import Session
- from app import schemas, get_db, crud
- from app.core.k8s.k8s_client import KubernetesTools
- from utils import web_try, sxtimeit
class Item(BaseModel):
    """Generic request-body wrapper shared by the POST endpoints below."""

    # Free-form payload; expected keys differ per endpoint
    # (e.g. job_id / run_ts / dag_id / task_id — see each handler).
    data: dict
# Router for Airflow run management; every path below is mounted under /jpt/jpt_run.
router_af_run = APIRouter(
    prefix="/jpt/jpt_run",
    tags=["airflow_run-运行管理"],  # OpenAPI tag (Chinese suffix: "run management")
)
@router_af_run.get("/")
@web_try()
@sxtimeit
def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
    """Return the Airflow task list as one page selected by *params*."""
    all_tasks = crud.get_airflow_tasks(db)
    return paginate(all_tasks, params)
@router_af_run.post("/")
@web_try()
@sxtimeit
def add_airflow_run(item: Item, db: Session = Depends(get_db)):
    """Create an AirflowRun row for the job named in the payload.

    For every task of type 'sparks' the intra-task edge list embedded in the
    task script is expanded into run-level dependence pairs, each endpoint
    qualified with the owning task id.
    """
    print(item.data)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
    sparks_dependence = {}
    if job_item is not None:
        job = schemas.AirflowJob(**job_item.to_dict())
        for task in job.tasks:
            if task.task_type != 'sparks':
                continue
            sparks_info = json.loads(task.script)
            # Qualify each (source, sink) edge with the task id so edges from
            # different tasks cannot collide in the merged dependence map.
            sparks_dependence[task.id] = [
                [f'{task.id}_{source}', f'{task.id}_{sink}']
                for (source, sink) in sparks_info['edges']
            ]
        run_create = schemas.AirflowRunCreate(
            **{
                "start_time": int(item.data["start_time"]),
                "job_id": item.data["job_id"],
                "run_id": item.data['run_ts'],
                "details": {
                    "tasks": {},
                    "dependence": {
                        "tasks": job_item.dependence,
                        "sparks": sparks_dependence,
                    },
                },
                "status": "1",
            },
        )
        crud.create_airflow_run(db, run_create)
@router_af_run.post("/notification")
@web_try()
@sxtimeit
def add_notification(item: Item, db: Session = Depends(get_db)):
    """Handle a task-completion callback: collect the task pod's logs and
    store them, with timing and status, on the matching AirflowRun record."""
    print(f'input : {item.data} ')
    pod_labels = {
        "dag_id": item.data['dag_id'],
        "task_id": item.data['task_id'],
        "run_ts": item.data['run_ts'],
    }
    logs = KubernetesTools().get_pod_logs(namespaces="airflow", labels=pod_labels)
    run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], db=db)
    if run is None:
        # Unknown run id: nothing to update, silently acknowledge the callback.
        return
    update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": run.status})
    update_run.details['tasks'][item.data['task_id']] = {
        "log": logs,
        "start_time": item.data["start_time"],
        "end_time": item.data["end_time"],
        "status": item.data['status'],
    }
    crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
@router_af_run.post("/sigal")
@web_try()
@sxtimeit
def add_sigal(item: Item, db: Session = Depends(get_db)):
    """Acknowledge a 'sigal' (signal) callback.

    Currently only logs the incoming payload; no state is persisted.
    """
    # Fix: this handler was also named `add_notification`, silently
    # redefining the /notification handler's module-level name (flake8 F811).
    # FastAPI registered both routes, but the duplicate name made the first
    # handler unreachable by name; renamed without touching the route path.
    print(f'receive sigal: {item.data} ')