import json

import requests
from fastapi import APIRouter, Depends
from fastapi_pagination import paginate, Params
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app import schemas, get_db, crud
from app.core.airflow.uri import get_airflow_api_info
from app.core.k8s.k8s_client import KubernetesTools
from utils import web_try, sxtimeit


class Item(BaseModel):
    """Generic request wrapper: callers pass an arbitrary payload under `data`."""
    data: dict


router_af_run = APIRouter(
    prefix="/jpt/af_run",
    tags=["airflow_run - run management"],
)


@router_af_run.get("/")
@web_try()
@sxtimeit
def get_tasks(params: Params = Depends(), db: Session = Depends(get_db)):
    """Return a paginated list of all Airflow tasks."""
    return paginate(crud.get_airflow_tasks(db), params)
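
# Usage sketch (assumption: fastapi_pagination's default query parameters):
#
#   GET /jpt/af_run/?page=1&size=50  ->  one page of Airflow task records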


@router_af_run.get("/{run_id}/status")
@web_try()
@sxtimeit
def get_airflow_run_status(run_id: int, db: Session = Depends(get_db)):
    """Return a run's status, refreshing it from the Airflow REST API when needed."""
    item = crud.get_airflow_run_once(item_id=run_id, db=db)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.job_id)
    if job_item.job_mode == 1 and item.status in (2, 3):  # normal mode, terminal state
        # success (2) and failed (3) are final, so the stored status is authoritative
        return {"status": item.status}
    # otherwise ask Airflow for the current DAG-run state
    uri_prefix, headers = get_airflow_api_info()
    url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
    state = requests.get(url, headers=headers).json().get('state', None)
    status = {'queued': 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
    print(f'status is {status}, with state {state}')
    if job_item.job_mode == 1 and item.status != status:  # still queued or running: persist progress
        item.status = status
        item.update(db)
        print(f'after update {item.status}')
    return {"status": status}
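
# Usage sketch (assumptions: the service listens on localhost:8000 and run id 42
# exists; the exact response envelope depends on the web_try decorator):
#
#   import requests
#   resp = requests.get("http://localhost:8000/jpt/af_run/42/status")
#   print(resp.json())  # status: 0 queued, 1 running, 2 success, 3 failed, -1 unknown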


@router_af_run.post("/")
@web_try()
@sxtimeit
def add_airflow_run(item: Item, db: Session = Depends(get_db)):
    """Create a run record (normal mode) or refresh the single debug run (debug mode)."""
    print(item.data)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
    sparks_dependence = {}
    debug_run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
    if job_item.job_mode == 1 or (job_item.job_mode == 2 and debug_run is None):  # normal mode
        # collect the intra-task edges of every `sparks` task into a dependence map
        for task in schemas.AirflowJob(**job_item.to_dict()).tasks:
            if task.task_type == 'sparks':
                sparks = json.loads(task.script)
                dependence = []
                for (source, sink) in sparks['edges']:
                    dependence.append([f'{task.id}_{source}', f'{task.id}_{sink}'])
                sparks_dependence[task.id] = dependence
        item = schemas.AirflowRunCreate(**{"start_time": item.data["start_time"],
                                           "job_id": int(item.data["job_id"]),
                                           "run_ts": item.data['run_ts'],
                                           "af_run_id": item.data['af_run_id'],
                                           "details": {"tasks": {}, "dependence": {"tasks": job_item.dependence,
                                                                                   "sparks": sparks_dependence}},
                                           "status": 0},
                                          )
        crud.create_airflow_run(db, item)
    elif job_item.job_mode == 2:  # debug mode: one job has exactly one run, which is reused
        run = debug_run
        sparks_task = schemas.AirflowTask(**job_item.tasks[0])
        assert sparks_task.task_type == 'sparks'
        sparks = json.loads(sparks_task.script)
        sparks_dependence[sparks_task.id] = [[f'{sparks_task.id}_{s}', f'{sparks_task.id}_{k}'] for (s, k) in
                                             sparks['edges']]
        spark_nodes = [sub_node['id'] for sub_node in sparks['sub_nodes']]
        # keep only the task entries that still exist in the current sparks graph
        run.details['dependence']['sparks'] = sparks_dependence
        run.details['tasks'] = {k: v for k, v in run.details['tasks'].items() if k in spark_nodes}
        update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": 1})
        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
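
# A hedged example body for POST /jpt/af_run/ (the field names are the ones the
# handler reads above; the concrete values are illustrative assumptions):
#
#   {"data": {"job_id": 7,
#             "start_time": 1650000000,
#             "run_ts": "20220415T120000",
#             "af_run_id": "manual__2022-04-15T12:00:00+00:00"}}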


@router_af_run.post("/notification")
@web_try()
@sxtimeit
def add_notification(item: Item, db: Session = Depends(get_db)):
    """Attach a finished task's pod logs and timing info to its run record."""
    k8s_tool = KubernetesTools()
    labels = {"dag_id": item.data['dag_id'], "task_id": item.data['task_id'], "run_ts": item.data['run_ts']}
    logs = k8s_tool.get_pod_logs(namespaces="airflow", labels=labels)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
    if job_item.job_mode == 1:  # normal mode: one job -> many runs
        run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], db=db)
    elif job_item.job_mode == 2:  # debug mode: one job -> one run
        run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
    else:
        run = None
    if run is not None:
        update_run = schemas.AirflowRunUpdate(**{"details": run.details, "status": run.status})
        update_run.details['tasks'][item.data['task_id']] = {"log": logs,
                                                             "start_time": item.data["start_time"],
                                                             "end_time": item.data["end_time"],
                                                             "status": item.data['status']}
        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
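
# A hedged example body for POST /jpt/af_run/notification (field names from the
# handler above; values, including the timestamp formats, are illustrative):
#
#   {"data": {"job_id": 7, "dag_id": "dag_7", "task_id": "task_1",
#             "run_ts": "20220415T120000", "af_run_id": "manual__2022-04-15T12:00:00+00:00",
#             "start_time": 1650000000, "end_time": 1650000060, "status": 2}}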


@router_af_run.post("/signal")
@web_try()
@sxtimeit
def add_signal(item: Item):
    """Log a received signal payload; no state is persisted."""
    print(f'receive signal: {item.data}')
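
# A minimal mounting sketch (assumption: the FastAPI app object below is
# illustrative and not part of this module):
#
#   from fastapi import FastAPI
#   from fastapi_pagination import add_pagination
#
#   app = FastAPI()
#   app.include_router(router_af_run)
#   add_pagination(app)  # required so fastapi_pagination's Params/paginate work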