import json
from sqlalchemy.orm import Session
from fastapi import Depends
from fastapi import APIRouter
from app.common.security.auth import verify_users
from app.services.dag import dag_job_submit, get_tmp_table_name
from app import crud, models, schemas
from app.utils.send_util import get_job_run_status, get_task_log
from constants.constants import RUN_STATUS
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from app.common.hive import hiveDs
from app import get_db


router = APIRouter(
    prefix="/jpt/dag",
    tags=["dag-dag管理"],
)


def _get_debug_run(db: Session, dag_uuid: str):
    """Return the debug-mode Airflow run record linked to ``dag_uuid``.

    Looks up the 'debug' relation of the DAG and then the run registered for
    the relation's ``af_id``.
    """
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    # NOTE(review): ``relation`` is None when the DAG has no debug relation
    # (``debug_execute`` tests exactly that). The attribute access below then
    # raises AttributeError; presumably ``web_try`` converts it into an error
    # response -- confirm before adding an explicit guard here.
    return crud.get_airflow_run_once_debug_mode(db, relation.af_id)


def _first_task(af_job):
    """Return the first entry of ``af_job.tasks``, or None when it is empty."""
    tasks = list(af_job.tasks)
    return tasks[0] if tasks else None


def _fetch_task_log(af_job, af_job_run, task_id: str):
    """Return the 'data' payload of the task-log response, or None if absent."""
    task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
    return task_log_res.get('data')


@router.post("/execute")
@web_try()
@sxtimeit
def execute_dag(dag: schemas.Dag, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Submit the DAG script for execution and return the submitted job.

    ``token_data`` is not read in the body; it is declared so that the
    ``verify_users`` dependency runs for this route.
    """
    return dag_job_submit(dag.dag_uuid, dag.dag_script, db)


@router.get("/debug_execute")
@web_try()
@sxtimeit
def debug_execute(dag_uuid: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Return True when a 'debug' relation exists for ``dag_uuid``, else False."""
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    return relation is not None


@router.get("/debug_status")
@web_try()
@sxtimeit
def get_dag_debug_status(dag_uuid: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Return the debug run status of a DAG and the status of its sub-nodes.

    Returns:
        A dict ``{"job": <status>, "nodes": {<node id>: <status>}}``. The job
        status is the one reported by ``get_job_run_status``; when that is
        falsy the status stored on the run record is used instead. A node is
        listed only when a log payload exists for it, and a falsy node status
        is reported as ``'running'``.
    """
    af_job_run = _get_debug_run(db, dag_uuid)
    job_run_res = get_job_run_status(af_job_run.id)
    job_run_status = job_run_res['data']['status']
    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)

    nodes = {}
    task = _first_task(af_job)
    if task:
        # The task script is a JSON document whose 'sub_nodes' entries each
        # carry an 'id'; a node's log is addressed as "<task id>_<node id>".
        task_script = json.loads(task['script'])
        for node in task_script['sub_nodes']:
            task_id = str(task['id']) + '_' + node['id']
            task_log = _fetch_task_log(af_job, af_job_run, task_id)
            if task_log:
                nodes[node['id']] = task_log['status'] or 'running'

    return {
        "job": job_run_status or af_job_run.status,
        "nodes": nodes,
    }


@router.get("/node_log")
@web_try()
@sxtimeit
def get_dag_node_log(dag_uuid: str, node_id: str, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Return the log text of one node of the DAG's debug run.

    Returns:
        The ``'log'`` field of the node's task-log payload, or None when the
        job has no task or no log payload is available.
    """
    af_job_run = _get_debug_run(db, dag_uuid)
    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
    task = _first_task(af_job)
    if not task:
        return None

    task_id = str(task['id']) + '_' + node_id
    task_log = _fetch_task_log(af_job, af_job_run, task_id)
    if task_log:
        return task_log['log']
    return None


@router.get("/node_result")
@web_try()
@sxtimeit
def get_dag_debug_result(dag_uuid: str, node_id: str, out_pin: int, token_data: schemas.TokenData = Depends(verify_users), db: Session = Depends(get_db)):
    """Return preview data for the temporary table behind a node output pin.

    The table name is resolved from the DAG uuid, node id and output pin, and
    ``hiveDs.get_preview_data`` is called with 500 as its second argument
    (presumably a row limit -- confirm against ``hiveDs``).
    """
    table_name = get_tmp_table_name(dag_uuid, node_id, str(out_pin), db)
    return hiveDs.get_preview_data(table_name, 500)