"""DAG management API routes: execution, debug status/logs, debug results and
per-DAG Python requirement installation."""
import json

from sqlalchemy.orm import Session
from fastapi import Depends
from fastapi import APIRouter

from app.common.security.auth import verify_users
from app.services.dag import dag_job_submit, get_tmp_table_name
from app import crud, models, schemas
from app.utils.send_util import get_job_run_status, get_running_status, get_task_log, post_install_requirements
from constants.constants import RUN_STATUS
from utils.sx_time import sxtimeit
from utils.sx_web import web_try
from app.common.hive import hiveDs
from app import get_db
from configs.settings import DefaultOption, config

# Hive / HDFS locations used to describe debug result tables.
database_name = config.get('HIVE', 'database_name')
base_path = config.get('HIVE', 'base_path')
hdfs_path = config.get('HADOOP_INNER', 'default_fs')
# Directory where per-DAG requirement archives are written.
requirement_path = config.get('REQUIREMENTS_CONFIG', 'path')

router = APIRouter(
    prefix="/jpt/dag",
    tags=["dag-dag管理"],
)


def _get_debug_job_run(db: Session, dag_uuid: str):
    """Return the debug-mode Airflow run record linked to ``dag_uuid``.

    Raises:
        Exception: if the DAG has no 'debug' Airflow relation, or no debug run
            record exists for it. Previously this surfaced as an opaque
            ``AttributeError`` on ``None``.
    """
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    if relation is None:
        # "This DAG has no debug record"
        raise Exception('该DAG暂无调试记录')
    af_job_run = crud.get_airflow_run_once_debug_mode(db, relation.af_id)
    if af_job_run is None:
        # "This DAG has no debug run record"
        raise Exception('该DAG暂无调试运行记录')
    return af_job_run


def _first_task(af_job):
    """Return the first task of ``af_job``, or ``None`` when it has no tasks."""
    tasks = list(af_job.tasks)  # materialize once instead of twice
    return tasks[0] if tasks else None


def _get_node_log(af_job, af_job_run, task, node_id):
    """Fetch the log payload for one sub-node of ``task`` within a debug run.

    Returns:
        The ``'data'`` entry of the log response, or ``None`` if it is absent.
    """
    # Sub-node logs are addressed by "<task id>_<node id>".
    task_id = f"{task['id']}_{node_id}"
    task_log_res = get_task_log(af_job.id, af_job_run.af_run_id, task_id)
    return task_log_res.get('data')


@router.post("/execute")
@web_try()
@sxtimeit
def execute_dag(dag: schemas.Dag,
                token_data: schemas.TokenData = Depends(verify_users),
                db: Session = Depends(get_db)):
    """Submit a DAG for execution once its requirements are ready.

    Raises:
        Exception: if the DAG's requirements are still being installed
            (status 0 or 1) or their installation failed (status 3).
    """
    relation = crud.get_requirements_status(db, dag.dag_uuid)
    if relation and relation.status in (0, 1):
        raise Exception('依赖正在安装中')
    if relation and relation.status == 3:
        raise Exception('依赖安装失败,请重新安装')
    return dag_job_submit(dag.dag_uuid, dag.dag_script, db)


@router.get("/debug_execute")
@web_try()
@sxtimeit
def debug_execute(dag_uuid: str,
                  token_data: schemas.TokenData = Depends(verify_users),
                  db: Session = Depends(get_db)):
    """Return whether a 'debug' Airflow relation exists for ``dag_uuid``."""
    return crud.get_dag_af_id(db, dag_uuid, 'debug') is not None


@router.get("/debug_status")
@web_try()
@sxtimeit
def get_dag_debug_status(dag_uuid: str,
                         token_data: schemas.TokenData = Depends(verify_users),
                         db: Session = Depends(get_db)):
    """Report the status of the DAG's debug run and of each of its sub-nodes.

    Returns:
        ``{"job": <run status>, "nodes": {node_id: status}}``. The job status
        falls back to the stored run status when the live query returns a
        falsy value. A node appears only if a log was returned for it, and a
        falsy node status is reported as ``'running'``.
    """
    af_job_run = _get_debug_job_run(db, dag_uuid)
    job_run_status = get_job_run_status(af_job_run.id)['data']['status']
    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
    task = _first_task(af_job)

    nodes = {}
    if task:
        task_script = json.loads(task['script'])
        for node in task_script['sub_nodes']:
            task_log = _get_node_log(af_job, af_job_run, task, node['id'])
            if task_log:
                nodes[node['id']] = task_log['status'] or 'running'

    return {
        "job": job_run_status or af_job_run.status,
        "nodes": nodes,
    }


@router.get("/node_log")
@web_try()
@sxtimeit
def get_dag_node_log(dag_uuid: str, node_id: str,
                     token_data: schemas.TokenData = Depends(verify_users),
                     db: Session = Depends(get_db)):
    """Return the log text of one sub-node from the DAG's debug run.

    Previously this handler was also named ``get_dag_debug_status``, which
    shadowed the ``/debug_status`` handler at module level.

    Returns:
        The node's ``'log'`` entry, or ``None`` if the job has no task or no
        log is available for the node.
    """
    af_job_run = _get_debug_job_run(db, dag_uuid)
    af_job = crud.get_airflow_job_once(db, af_job_run.job_id)
    task = _first_task(af_job)
    if task:
        task_log = _get_node_log(af_job, af_job_run, task, node_id)
        if task_log:
            return task_log['log']
    return None


@router.get("/node_result")
@web_try()
@sxtimeit
def get_dag_debug_result(dag_uuid: str, node_id: str, out_pin: int,
                         token_data: schemas.TokenData = Depends(verify_users),
                         db: Session = Depends(get_db)):
    """Preview the temporary Hive table produced by a node's output pin.

    The preview comes from ``hiveDs.get_preview_data`` with a limit argument of
    500. It is augmented with the database-qualified table name and the
    table's HDFS path.
    """
    table_name = get_tmp_table_name(dag_uuid, node_id, str(out_pin), db)
    result = hiveDs.get_preview_data(table_name, 500)
    result.update({
        'table_name': f'{database_name}.{table_name}',
        'table_path': f'{hdfs_path}{base_path}{table_name}',
    })
    return result


@router.post("/install_requirements")
@web_try()
@sxtimeit
def install_requirements(item: schemas.Requirements,
                         token_data: schemas.TokenData = Depends(verify_users),
                         db: Session = Depends(get_db)):
    """Start installing a DAG's requirements and record the install state.

    The archive is targeted at ``{requirement_path}/dag_<lowercased uuid>.zip``.
    The current running status of the install run is mapped through
    ``RUN_STATUS`` and stored via ``crud.create_or_update_requirements_relation``.
    """
    res = post_install_requirements(item.requirements, f'{requirement_path}/dag_{item.dag_uuid.lower()}.zip')
    af_run_id = res['data']['af_run_id']
    # NOTE(review): "-1" is passed as the job id here; presumably a sentinel
    # for requirement installs -- confirm against send_util.get_running_status.
    running_status = get_running_status(str(-1), af_run_id)['data']['status']
    return crud.create_or_update_requirements_relation(db, item.dag_uuid, af_run_id, RUN_STATUS[running_status])


@router.get("/requirements_status")
@web_try()
@sxtimeit
def get_requirements_status(dag_uuid: str,
                            token_data: schemas.TokenData = Depends(verify_users),
                            db: Session = Depends(get_db)):
    """Return the DAG's requirements relation, or "暂无依赖" ("no requirements") if none."""
    relation = crud.get_requirements_status(db, dag_uuid)
    if not relation:
        return "暂无依赖"
    return relation