from asyncio import current_task import time from app import crud, models from app.utils.send_util import * from app.utils.utils import get_cmd_parameter from sqlalchemy.orm import Session from app.common.hive import hiveDs from configs.settings import DefaultOption, config database_name = config.get('HIVE', 'DATABASE_NAME') requirement_path = config.get('REQUIREMENTS_CONFIG', 'path') requirement_prefix = config.get('REQUIREMENTS_CONFIG', 'prefix') def dag_create_job(dag_uuid:str,dag_script: str,db: Session): af_task = dag_create_task(dag_uuid, dag_script, db) af_job = { "tasks": [af_task], "name": "调试任务", "dependence": [], "cron": "None", "desc": "调试任务", "route_strategy": "", "block_strategy": "", "executor_timeout": 0, "executor_fail_retry_count": 0, "trigger_status": 1, "job_mode":2, "job_type": 0, "user_id": 0, } res = send_post('/af/af_job', af_job) af_job = res['data'] crud.create_debug_relation(db,dag_uuid,'debug',af_job['id']) return af_job def dag_create_task(dag_uuid:str,dag_script: str,db: Session): envs = {} requirements_relation = crud.get_requirements_relation(db, dag_uuid) if requirements_relation: requirements = crud.get_requirements_status(db, dag_uuid) if requirements.status != 2: raise Exception('依赖未安装成功,不可执行') envs.update({'requirement_package_path': f'{requirement_prefix}{requirement_path}/dag_{dag_uuid.lower()}.zip'}) af_task = { "name": "调试作业", "file_urls": [], "script": dag_script, "cmd": "", "cmd_parameters": "", "envs": envs, "run_image": "", "task_type": "sparks", "user_id": 0, } res = send_post('/af/af_task', af_task) af_task = res['data'] return af_task def dag_update_job(dag_uuid:str, dag_script: str, db: Session): relation = crud.get_dag_af_id(db, dag_uuid, 'debug') af_job_id = relation.af_id res = send_get("/af/af_job/getOnce",af_job_id) old_af_job = res['data'] old_af_task = old_af_job['tasks'][0] af_task = dag_put_task(dag_uuid, dag_script, db, old_af_task) af_job = { "tasks": [af_task], "name": "调试任务", "dependence": [], "cron": "None", "desc": "调试任务", "route_strategy": "", "block_strategy": "", "executor_timeout": 0, "executor_fail_retry_count": 0, "trigger_status": 1, } res = send_put('/af/af_job', old_af_job['id'], af_job) af_job = res['data'] return af_job def dag_put_task(dag_uuid:str,dag_script: str,db: Session,old_af_task): envs = {} requirements_relation = crud.get_requirements_relation(db, dag_uuid) if requirements_relation: requirements = crud.get_requirements_status(db, dag_uuid) if requirements.status != 2: raise Exception('依赖未安装成功,不可执行') envs.update({'requirement_package_path': f'{requirement_prefix}{requirement_path}/dag_{dag_uuid.lower()}.zip'}) af_task = { "name": "调试作业", "file_urls": [], "script": dag_script, "cmd": "", "cmd_parameters": "", "envs": envs, "run_image": "", "task_type": "sparks", } res = send_put('/af/af_task', old_af_task['id'],af_task) af_task = res['data'] return af_task def dag_job_submit(dag_uuid:str,dag_script: str,db: Session): job_relation = crud.get_dag_af_id(db,dag_uuid,'debug') af_job = None if job_relation is None: af_job = dag_create_job(dag_uuid, dag_script, db) else: af_job = dag_update_job(dag_uuid, dag_script, db) res = get_job_last_parsed_time(af_job['id']) current_time = res['data']['last_parsed_time'] if 'last_parsed_time' in res['data'].keys() else None send_submit(af_job['id']) for i in range(0,21): time.sleep(2) res = get_job_last_parsed_time(af_job['id']) last_parsed_time = res['data']['last_parsed_time'] if last_parsed_time and current_time != last_parsed_time: send_pause(af_job['id'],1) send_execute(af_job['id']) print(f"{af_job['id']}<==执行成功==>{last_parsed_time}") break if i >= 20: raise Exception(f"{af_job['id']}==>执行失败") return af_job def get_tmp_table_name(dag_uuid: str, node_id: str, out_pin: str, db: Session): relation = crud.get_dag_af_id(db,dag_uuid, 'debug') job_id = relation.af_id af_job_run = crud.get_airflow_run_once_debug_mode(db,job_id) tasks = af_job_run.details['tasks'] if len(af_job_run.details['tasks'])>0 else {} task_id = None for task in tasks: t_id = task.split('_')[0] n_id = task.split('_')[1] if n_id == node_id: task_id = t_id break if task_id: table_name = f'job{job_id}_task{task_id}_subnode{node_id}_output{out_pin}_tmp' t_list = hiveDs.list_tables() table_name = table_name.lower() if table_name not in t_list: raise Exception('该节点不存在中间结果') return table_name else: raise Exception('该节点不存在中间结果') def get_transfer_table_name(project_id: str, user_id: str, name: str, ): current_time = int(time.time()) return f'{database_name}.project{project_id.lower()}_user{user_id.lower()}_{name.lower()}_{current_time}'