from sqlalchemy.orm import Session
from fastapi import Depends
from fastapi import APIRouter
from app.services.dag import dag_job_submit, get_tmp_table_name
from app import crud, models, schemas
from app.utils.send_util import get_job_run_status
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from app.common.hive import hiveDs
from app import get_db

router = APIRouter(
    prefix="/jpt/dag",
    tags=["dag-dag管理"],
)

# Number of rows requested from Hive when previewing a node's output table.
_PREVIEW_ROW_LIMIT = 500


def _get_debug_run(db: Session, dag_uuid: str):
    """Look up the debug-mode run record associated with ``dag_uuid``."""
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    return crud.get_airflow_run_once_debug_mode(db, relation.af_id)


def _collect_by_node(af_job_run, field: str) -> dict:
    """Map each node id to ``task[field]`` for the tasks stored on the run.

    Tasks are read from ``af_job_run.details['tasks']``; a missing/empty
    value (``None``, ``{}``, ``[]``) is treated as "no tasks".
    """
    tasks = af_job_run.details['tasks'] or {}
    # The node id is taken from the second '_'-separated segment of the task
    # id. NOTE(review): assumes every task id contains at least one '_' --
    # confirm against whatever generates the task ids.
    return {
        task_id.split('_')[1]: task[field]
        for task_id, task in tasks.items()
    }


@router.post("/execute")
@web_try()
@sxtimeit
def execute_dag(dag: schemas.Dag, db: Session = Depends(get_db)):
    """Submit the DAG script through ``dag_job_submit`` and return its result."""
    return dag_job_submit(dag.dag_uuid, dag.dag_script, db)


@router.get("/debug_status")
@web_try()
@sxtimeit
def get_dag_debug_status(dag_uuid: str, db: Session = Depends(get_db)):
    """Return the overall status of a DAG's debug run plus per-node statuses.

    The result has two keys: ``"job"`` (the status reported by
    ``get_job_run_status``, falling back to the stored run status when that
    value is falsy) and ``"nodes"`` (node id -> task status).
    """
    af_job_run = _get_debug_run(db, dag_uuid)
    nodes = _collect_by_node(af_job_run, 'status')
    job_run_res = get_job_run_status(af_job_run.id)
    job_run_status = job_run_res['data']['status']
    return {
        "job": job_run_status or af_job_run.status,
        "nodes": nodes,
    }


# Previously this handler was also named ``get_dag_debug_status``, which
# rebound the module-level name of the /debug_status handler above. The HTTP
# path and query parameters are unchanged.
@router.get("/node_log")
@web_try()
@sxtimeit
def get_dag_node_log(dag_uuid: str, node_id: str, db: Session = Depends(get_db)):
    """Return the log of one node from a DAG's debug run.

    Returns ``None`` when no task of the run maps to ``node_id``.
    """
    af_job_run = _get_debug_run(db, dag_uuid)
    return _collect_by_node(af_job_run, 'log').get(node_id)


@router.get("/node_result")
@web_try()
@sxtimeit
def get_dag_debug_result(dag_uuid: str, node_id: str, out_pin: int, db: Session = Depends(get_db)):
    """Return preview rows of the temp table behind a node's output pin.

    The table name is resolved with ``get_tmp_table_name`` and up to
    ``_PREVIEW_ROW_LIMIT`` rows are requested from ``hiveDs``.
    """
    table_name = get_tmp_table_name(dag_uuid, node_id, str(out_pin), db)
    return hiveDs.get_preview_data(table_name, _PREVIEW_ROW_LIMIT)