123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- import json
- from sqlalchemy.orm import Session
- from fastapi import Depends
- from fastapi import APIRouter
- from app.common.security.auth import verify_users
- from app.services.dag import dag_job_submit, get_tmp_table_name
- from app import crud, models, schemas
- from app.utils.send_util import get_job_run_status, get_task_log
- from constants.constants import RUN_STATUS
- from utils.sx_time import sxtimeit
- from utils.sx_web import web_try
- from app.common.hive import hiveDs
- from app import get_db
# Router that collects the DAG endpoints declared in this module; it is
# configured with the "/jpt/dag" path prefix and a single tag.
router = APIRouter(prefix="/jpt/dag", tags=["dag-dag管理"])
@router.post("/execute")
@web_try()
@sxtimeit
def execute_dag(
    dag: schemas.Dag,
    token_data: schemas.TokenData = Depends(verify_users),
    db: Session = Depends(get_db),
):
    """Submit a DAG script as a job and return the submitter's result.

    Args:
        dag: Request payload; its ``dag_uuid`` and ``dag_script`` fields are
            forwarded to ``dag_job_submit``.
        token_data: Resolved through ``verify_users``; not read in the body.
        db: Database session passed on to ``dag_job_submit``.

    Returns:
        Whatever ``dag_job_submit`` returns.
    """
    return dag_job_submit(dag.dag_uuid, dag.dag_script, db)
@router.get("/debug_execute")
@web_try()
@sxtimeit
def debug_execute(
    dag_uuid: str,
    token_data: schemas.TokenData = Depends(verify_users),
    db: Session = Depends(get_db),
):
    """Tell whether a 'debug' relation is recorded for the given DAG.

    Args:
        dag_uuid: Identifier of the DAG to look up.
        token_data: Resolved through ``verify_users``; not read in the body.
        db: Database session used for the lookup.

    Returns:
        ``True`` when ``crud.get_dag_af_id`` yields a relation, ``False``
        when it yields ``None``.
    """
    debug_relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    return debug_relation is not None
@router.get("/debug_status")
@web_try()
@sxtimeit
def get_dag_debug_status(
    dag_uuid: str,
    token_data: schemas.TokenData = Depends(verify_users),
    db: Session = Depends(get_db),
):
    """Return the status of a DAG's debug run together with its node statuses.

    The 'debug' relation of the DAG is resolved to a job run, the run status
    is fetched via ``get_job_run_status``, and for every entry in the first
    task's ``sub_nodes`` the task log is queried to obtain a per-node status.

    Args:
        dag_uuid: Identifier of the DAG whose debug run is inspected.
        token_data: Resolved through ``verify_users``; not read in the body.
        db: Database session used for all ``crud`` lookups.

    Returns:
        A dict with two keys: ``"job"`` (the fetched run status, or the
        stored ``af_job_run.status`` when the fetched one is falsy) and
        ``"nodes"`` (mapping of node id to its log status, ``'running'``
        when the log carries a falsy status). Nodes without log data are
        omitted.

    Raises:
        ValueError: If no 'debug' relation exists for ``dag_uuid``.
    """
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    if relation is None:
        # The lookup can return None; fail with an explicit message instead
        # of an opaque AttributeError on ``relation.af_id`` below.
        raise ValueError(f"no debug relation found for dag_uuid {dag_uuid!r}")

    af_job_run = crud.get_airflow_run_once_debug_mode(db, relation.af_id)
    job_run_res = get_job_run_status(af_job_run.id)
    job_run_status = job_run_res['data']['status']
    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)

    # Materialise the task collection once; only the first task is inspected.
    tasks = list(af_job.tasks)
    task = tasks[0] if tasks else None

    nodes = {}
    if task:
        task_script = json.loads(task['script'])
        for node in task_script['sub_nodes']:
            # Log lookups are keyed by "<task id>_<node id>".
            task_id = str(task['id']) + '_' + node['id']
            task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
            task_log = task_log_res['data'] if 'data' in task_log_res else None
            if task_log:
                nodes[node['id']] = task_log['status'] or 'running'

    return {
        "job": job_run_status or af_job_run.status,
        "nodes": nodes,
    }
# NOTE(review): this handler reuses the name of the "/debug_status" handler
# and therefore shadows it at module level. The name is kept here so the
# module's interface is unchanged; consider renaming (e.g. get_dag_node_log).
@router.get("/node_log")
@web_try()
@sxtimeit
def get_dag_debug_status(
    dag_uuid: str,
    node_id: str,
    token_data: schemas.TokenData = Depends(verify_users),
    db: Session = Depends(get_db),
):
    """Return the log text of one node from a DAG's debug run.

    Args:
        dag_uuid: Identifier of the DAG whose debug run is inspected.
        node_id: Identifier of the node; combined with the first task's id
            to form the key passed to ``get_task_log``.
        token_data: Resolved through ``verify_users``; not read in the body.
        db: Database session used for all ``crud`` lookups.

    Returns:
        The ``'log'`` entry of the task log data, or ``None`` when the job
        has no task or the log response carries no (truthy) ``'data'``.

    Raises:
        ValueError: If no 'debug' relation exists for ``dag_uuid``.
    """
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    if relation is None:
        # The lookup can return None; fail with an explicit message instead
        # of an opaque AttributeError on ``relation.af_id`` below.
        raise ValueError(f"no debug relation found for dag_uuid {dag_uuid!r}")

    af_job_run = crud.get_airflow_run_once_debug_mode(db, relation.af_id)
    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)

    # Materialise the task collection once; only the first task is inspected.
    tasks = list(af_job.tasks)
    task = tasks[0] if tasks else None
    if not task:
        return None

    # Log lookups are keyed by "<task id>_<node id>".
    task_id = str(task['id']) + '_' + node_id
    task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
    task_log = task_log_res['data'] if 'data' in task_log_res else None
    if task_log:
        return task_log['log']
    return None
@router.get("/node_result")
@web_try()
@sxtimeit
def get_dag_debug_result(
    dag_uuid: str,
    node_id: str,
    out_pin: int,
    token_data: schemas.TokenData = Depends(verify_users),
    db: Session = Depends(get_db),
):
    """Return preview data for one output pin of a node in a DAG debug run.

    Args:
        dag_uuid: Identifier of the DAG.
        node_id: Identifier of the node inside the DAG.
        out_pin: Output pin number; passed to ``get_tmp_table_name`` as a
            string.
        token_data: Resolved through ``verify_users``; not read in the body.
        db: Database session passed to ``get_tmp_table_name``.

    Returns:
        Whatever ``hiveDs.get_preview_data`` returns for the resolved table.
    """
    # Second argument handed to get_preview_data; presumably a row limit,
    # confirm against hiveDs.
    preview_size = 500
    tmp_table = get_tmp_table_name(dag_uuid, node_id, str(out_pin), db)
    return hiveDs.get_preview_data(tmp_table, preview_size)
|