import datetime
import json
import time
from collections import defaultdict

import requests
from fastapi import APIRouter, Depends
from fastapi_pagination import paginate, Params
from pydantic import BaseModel
from sqlalchemy.orm import Session

from app import schemas, get_db, crud
from app.core.airflow.af_util import get_airflow_api_info, call_airflow_api, datetime2timestamp
from utils import web_try, sxtimeit


class Item(BaseModel):
    data: dict


router_af_run = APIRouter(
    prefix="/af/af_run",
    tags=["airflow_run-运行管理"],
)


@router_af_run.get("/")
@web_try()
@sxtimeit
def get_runs(params: Params = Depends(), db: Session = Depends(get_db)):
    return paginate(crud.get_airflow_tasks(db), params)


@router_af_run.get("/{run_id}/status")
@web_try()
@sxtimeit
def get_airflow_run_status(run_id: int, db: Session = Depends(get_db)):
    item = crud.get_airflow_run_once(item_id=run_id, db=db)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.job_id)
    print(f'job_item.job_mode is {job_item.job_mode}, with af_run_id = {item.af_run_id}')
    if job_item.job_mode == 1:  # normal mode
        if item.status in [2, 3]:  # already success/failed, no need to ask Airflow again
            return {"status": item.status}
        else:
            # Query the Airflow REST API for the dag-run state and map it to our status codes.
            uri_prefix, headers = get_airflow_api_info()
            url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
            state = requests.get(url, headers=headers).json().get('state', None)
            status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
            print(f'status is {status}, with state {state}')
            if item.status != status:  # queued or running
                item.status = status
                item.update(db)
                print(f'after update {item.status}')
            return {"status": status}
    else:
        uri_prefix, headers = get_airflow_api_info()
        url = f'{uri_prefix}/dags/dag_{item.job_id}/dagRuns/{item.af_run_id}'
        state = requests.get(url, headers=headers).json().get('state', None)
        status = {"queued": 0, 'running': 1, 'success': 2, 'failed': 3}.get(state, -1)
        print(f'status is {status}, with state {state}')
        return {"status": status}


@router_af_run.post("/")
@web_try()
@sxtimeit
def add_airflow_run(item: Item, db: Session = Depends(get_db)):
    print(item.data)
    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
    sparks_dependence = {}
    debug_run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)

    if job_item.job_mode == 1 or (job_item.job_mode == 2 and debug_run is None):  # normal mode
        # Collect the intra-task dependencies of every sparks task from its script definition.
        for task in schemas.AirflowJob(**job_item.to_dict()).tasks:
            if task.task_type == 'sparks':
                sparks = json.loads(task.script)
                dependence = []
                for (source, sink) in sparks['edges']:
                    dependence.append([f'{task.id}_{source}', f'{task.id}_{sink}'])
                sparks_dependence[task.id] = dependence

        item = schemas.AirflowRunCreate(**{"start_time": item.data["start_time"],
                                           "job_id": int(item.data["job_id"]),
                                           "run_ts": item.data['run_ts'],
                                           "af_run_id": item.data['af_run_id'],
                                           "details": {"tasks": {},
                                                       "dependence": {"tasks": job_item.dependence,
                                                                      "sparks": sparks_dependence}},
                                           "status": 0})
        crud.create_airflow_run(db, item)
    elif job_item.job_mode == 2:  # debug mode
        run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
        sparks_task = schemas.AirflowTask(**job_item.tasks[0])
        assert sparks_task.task_type == 'sparks'
        sparks = json.loads(sparks_task.script)
        sparks_dependence[sparks_task.id] = [[f'{sparks_task.id}_{s}', f'{sparks_task.id}_{k}']
                                             for (s, k) in sparks['edges']]
        spark_nodes = [sub_node['id'] for sub_node in sparks['sub_nodes']]
        run.details['dependence']['sparks'] = sparks_dependence
        run.details['tasks'] = {k: v for k, v in run.details['tasks'].items() if k in spark_nodes}
        update_run = schemas.AirflowRunUpdate(
            **{"details": run.details, "status": 1, "af_run_id": item.data['af_run_id']})
        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)


@router_af_run.post("/notification")
@web_try()
@sxtimeit
def add_notification(item: Item, db: Session = Depends(get_db)):
    state_uri = f"dags/{item.data['dag_id']}/dagRuns/{item.data['af_run_id']}/taskInstances/{item.data['task_id']}"
    log_uri = f"{state_uri}/logs/1"

    # Poll the task-instance state for up to 10 seconds until it reaches a terminal state.
    try_count = 10
    while try_count > 0:
        state = call_airflow_api("get", state_uri, {}).json().get('state', None)
        if state in ['success', 'failed']:
            break
        time.sleep(1)
        try_count -= 1

    logs = call_airflow_api("get", log_uri, {}).text.encode('raw_unicode_escape').decode('utf-8')
    job_item = crud.get_airflow_job_once(db=db, item_id=item.data["job_id"])
    if job_item.job_mode == 1:  # normal mode: one job -> many runs
        run = crud.get_airflow_run_once_normal_mode(af_run_id=item.data['af_run_id'], job_id=job_item.id, db=db)
    elif job_item.job_mode == 2:  # debug mode: one job -> one run
        run = crud.get_airflow_run_once_debug_mode(job_id=item.data["job_id"], db=db)
    else:
        run = None

    if run is not None:
        update_run = schemas.AirflowRunUpdate(
            **{"details": run.details, "status": run.status, "af_run_id": item.data['af_run_id']})
        update_run.details['tasks'][item.data['task_id']].update({"log": logs,
                                                                  "start_time": item.data["start_time"],
                                                                  "end_time": item.data["end_time"],
                                                                  "status": item.data['status']})
        crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)


@router_af_run.post("/sigal")
@web_try()
@sxtimeit
def add_signal(item: Item):
    print(f'receive signal: {item.data}')


@router_af_run.get("/tasks_status/{job_id}/{af_run_id}")
@web_try()
@sxtimeit
def get_airflow_dagrun(job_id: int, af_run_id: str, db: Session = Depends(get_db)):
    ret = call_airflow_api(method='get', uri=f'dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances', args_dict={})
    details = defaultdict(dict)
    for task in ret.json()['task_instances']:
        details['tasks'][task['task_id']] = {
            # "log": logs,
            "start_time": datetime.datetime.strptime(task['start_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
            "end_time": datetime.datetime.strptime(task['end_date'], '%Y-%m-%dT%H:%M:%S.%f%z').timestamp(),
            "status": task['state']
        }
        # print(f"{task['task_id']}:{task['duration']}")
    return details


@router_af_run.get("/running_status/{job_id}/{af_run_id}")
@web_try()
@sxtimeit
def get_airflow_dagrun_running_status(job_id: int, af_run_id: str):
    job_uri = f'dags/dag_{job_id}/dagRuns/{af_run_id}'
    job_ret = call_airflow_api(method='get', uri=job_uri, args_dict={})
    if job_ret.status_code != 200:
        raise Exception(f'cannot find the information of this job run, please check your input: job uri is {job_uri}')
    return {
        "start_time": datetime2timestamp(job_ret.json()['start_date']),
        "end_time": datetime2timestamp(job_ret.json()['end_date']),
        "status": job_ret.json()['state']
    }


@router_af_run.get("/task_log/{job_id}/{af_run_id}/{task_id}")
@web_try()
@sxtimeit
def get_airflow_dagrun_task_log(job_id: int, af_run_id: str, task_id: str, db: Session = Depends(get_db)):
    state_uri = f"dags/dag_{job_id}/dagRuns/{af_run_id}/taskInstances/{task_id}"
    log_uri = f"{state_uri}/logs/1"
    job_item = crud.get_airflow_job_once(db=db, item_id=job_id)
    print(f'job_mode type is {job_item.job_mode}, af_run_id is {af_run_id}')
    if job_item.job_mode == 1:  # normal mode: one job -> many runs
        run = crud.get_airflow_run_once_normal_mode(af_run_id=af_run_id, job_id=job_id, db=db)
    elif job_item.job_mode == 2:  # debug mode: one job -> one run
        run = crud.get_airflow_run_once_debug_mode(job_id=job_id, db=db)
    else:
        run = None

    if run is not None:
        if run.details['tasks'].get(task_id, {}).get("status", "running") not in ["success", "failed"]:
            # Task has not finished yet (or is unknown): fetch the latest state and log from Airflow
            # and cache them in the run record.
            state_ret = call_airflow_api(method='get', uri=state_uri, args_dict={})
            log_ret = call_airflow_api(method='get', uri=log_uri, args_dict={})
            if state_ret.status_code != 200 or log_ret.status_code != 200:
                return None
            update_run = schemas.AirflowRunUpdate(
                **{"details": run.details, "status": run.status, "af_run_id": af_run_id})
            task_info = {
                "log": log_ret.text.encode('raw_unicode_escape').decode('utf-8'),
                "status": state_ret.json()['state'],
                "execution_time": datetime2timestamp(state_ret.json()['execution_date']),
                "start_time": datetime2timestamp(state_ret.json()['start_date']),
                "end_time": datetime2timestamp(state_ret.json()['end_date']),
            }
            update_run.details['tasks'][task_id] = task_info
            crud.update_airflow_run(db=db, item_id=run.id, update_item=update_run)
            return task_info
        else:
            # Task already finished: return the cached task info.
            return run.details['tasks'][task_id]


@router_af_run.get("/data_transfer_log/{af_run_id}")
@web_try()
@sxtimeit
def get_airflow_data_transfer_log(af_run_id: str):
    state_uri = f"dags/dag_0/dagRuns/{af_run_id}/taskInstances/0"
    log_uri = f"{state_uri}/logs/1"
    state_ret = call_airflow_api(method='get', uri=state_uri, args_dict={})
    log_ret = call_airflow_api(method='get', uri=log_uri, args_dict={})
    return {
        "log": log_ret.text,
        "status": state_ret.json()['state'],
        "execution_time": datetime2timestamp(state_ret.json()['execution_date']),
        "start_time": datetime2timestamp(state_ret.json()['start_date']),
        "end_time": datetime2timestamp(state_ret.json()['end_date']),
    }
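
# Usage sketch (an assumption, not taken from this repo): router_af_run is expected to be
# mounted on the application's FastAPI instance somewhere in the app factory. The import
# path below is hypothetical and only illustrates the wiring.
#
#   from fastapi import FastAPI
#   from app.routers.run import router_af_run  # hypothetical module path
#
#   app = FastAPI()
#   app.include_router(router_af_run)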