from asyncio import current_task
import time
from app import crud, models
from app.utils.send_util import *
from app.utils.utils import get_cmd_parameter
from sqlalchemy.orm import Session
from app.common.hive import hiveDs
from configs.settings import DefaultOption, config

database_name = config.get('HIVE', 'DATABASE_NAME')
requirement_path = config.get('REQUIREMENTS_CONFIG', 'path')
requirement_prefix = config.get('REQUIREMENTS_CONFIG', 'prefix')
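
# The settings above are read from the project configuration file. A sketch of
# the relevant sections, with illustrative (not actual) values:
#
#   [HIVE]
#   DATABASE_NAME = ailab
#
#   [REQUIREMENTS_CONFIG]
#   path = /requirements
#   prefix = hdfs://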


def dag_create_job(dag_uuid: str, dag_script: str, db: Session):
    """Create a one-off debug job (with a single task) in the scheduler and
    record the dag_uuid -> af_job relation."""
    af_task = dag_create_task(dag_uuid, dag_script, db)
    af_job = {
        "tasks": [af_task],
        "name": "调试任务",  # "debug job"
        "dependence": [],
        "cron": "None",
        "desc": "调试任务",  # "debug job"
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": 1,
        "job_mode": 2,
        "job_type": 0,
        "user_id": 0,
    }
    res = send_post('/af/af_job', af_job)
    af_job = res['data']
    crud.create_debug_relation(db, dag_uuid, 'debug', af_job['id'])
    return af_job
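
# Note: dag_create_job is normally not called directly; dag_job_submit (below)
# routes here the first time a DAG draft is debugged, i.e. when no 'debug'
# relation exists yet for the dag_uuid.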


def dag_create_task(dag_uuid: str, dag_script: str, db: Session):
    """Create the debug task that carries the generated DAG script."""
    envs = {}
    requirements_relation = crud.get_requirements_relation(db, dag_uuid)
    if requirements_relation:
        requirements = crud.get_requirements_status(db, dag_uuid)
        if requirements.status != 2:  # 2 == requirements installed successfully
            raise Exception('依赖未安装成功,不可执行')  # dependencies not installed, cannot run
        envs.update({'requirement_package_path': f'{requirement_prefix}{requirement_path}/dag_{dag_uuid.lower()}.zip'})
    af_task = {
        "name": "调试作业",  # "debug task"
        "file_urls": [],
        "script": dag_script,
        "cmd": "",
        "cmd_parameters": "",
        "envs": envs,
        "run_image": "",
        "task_type": "sparks",
        "user_id": 0,
    }
    res = send_post('/af/af_task', af_task)
    af_task = res['data']
    return af_task
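
# For illustration only (config values are hypothetical): with
# requirement_prefix = 'hdfs://' and requirement_path = '/requirements', a DAG
# with uuid 'Abc123' gets
#   envs['requirement_package_path'] == 'hdfs:///requirements/dag_abc123.zip'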


def dag_update_job(dag_uuid: str, dag_script: str, db: Session):
    """Update the existing debug job (and its single task) for this DAG."""
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    af_job_id = relation.af_id
    res = send_get("/af/af_job/getOnce", af_job_id)
    old_af_job = res['data']
    old_af_task = old_af_job['tasks'][0]
    af_task = dag_put_task(dag_uuid, dag_script, db, old_af_task)
    af_job = {
        "tasks": [af_task],
        "name": "调试任务",  # "debug job"
        "dependence": [],
        "cron": "None",
        "desc": "调试任务",  # "debug job"
        "route_strategy": "",
        "block_strategy": "",
        "executor_timeout": 0,
        "executor_fail_retry_count": 0,
        "trigger_status": 1,
    }
    res = send_put('/af/af_job', old_af_job['id'], af_job)
    af_job = res['data']
    return af_job


def dag_put_task(dag_uuid: str, dag_script: str, db: Session, old_af_task):
    """Update the debug task in place with the latest script and requirements."""
    envs = {}
    requirements_relation = crud.get_requirements_relation(db, dag_uuid)
    if requirements_relation:
        requirements = crud.get_requirements_status(db, dag_uuid)
        if requirements.status != 2:  # 2 == requirements installed successfully
            raise Exception('依赖未安装成功,不可执行')  # dependencies not installed, cannot run
        envs.update({'requirement_package_path': f'{requirement_prefix}{requirement_path}/dag_{dag_uuid.lower()}.zip'})
    af_task = {
        "name": "调试作业",  # "debug task"
        "file_urls": [],
        "script": dag_script,
        "cmd": "",
        "cmd_parameters": "",
        "envs": envs,
        "run_image": "",
        "task_type": "sparks",
    }
    res = send_put('/af/af_task', old_af_task['id'], af_task)
    af_task = res['data']
    return af_task


def dag_job_submit(dag_uuid: str, dag_script: str, db: Session):
    """Create or update the debug job, submit it, and wait until the scheduler
    has re-parsed it before triggering a run."""
    job_relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    if job_relation is None:
        af_job = dag_create_job(dag_uuid, dag_script, db)
    else:
        af_job = dag_update_job(dag_uuid, dag_script, db)
    res = get_job_last_parsed_time(af_job['id'])
    current_time = res['data'].get('last_parsed_time')
    send_submit(af_job['id'])
    # Poll for up to ~42 seconds: once last_parsed_time changes, the scheduler
    # has picked up the new job definition and it can be unpaused and executed.
    for i in range(0, 21):
        time.sleep(2)
        res = get_job_last_parsed_time(af_job['id'])
        last_parsed_time = res['data'].get('last_parsed_time')
        if last_parsed_time and current_time != last_parsed_time:
            send_pause(af_job['id'], 1)
            send_execute(af_job['id'])
            print(f"{af_job['id']}<==执行成功==>{last_parsed_time}")  # executed successfully
            break
        if i >= 20:
            raise Exception(f"{af_job['id']}==>执行失败")  # execution failed
    return af_job
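
# Minimal usage sketch (names are illustrative; the call presumably comes from
# an API handler that owns the SQLAlchemy session):
#
#   af_job = dag_job_submit(dag_uuid, dag_script, db)
#   job_id = af_job['id']
#   # job_id can then be used to look up the single debug run, e.g. via
#   # crud.get_airflow_run_once_debug_mode as in get_tmp_table_name below.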


def get_tmp_table_name(dag_uuid: str, node_id: str, out_pin: str, db: Session):
    """Resolve the Hive table holding the intermediate result of one node's
    output pin in the latest debug run."""
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    job_id = relation.af_id
    af_job_run = crud.get_airflow_run_once_debug_mode(db, job_id)
    tasks = af_job_run.details['tasks'] if len(af_job_run.details['tasks']) > 0 else {}
    task_id = None
    # Task keys have the form '<task_id>_<node_id>'; maxsplit=1 keeps node ids
    # that themselves contain underscores intact.
    for task in tasks:
        t_id, n_id = task.split('_', 1)
        if n_id == node_id:
            task_id = t_id
            break
    if task_id:
        table_name = f'job{job_id}_task{task_id}_subnode{node_id}_output{out_pin}_tmp'
        table_name = table_name.lower()
        if table_name not in hiveDs.list_tables():
            raise Exception('该节点不存在中间结果')  # no intermediate result for this node
        return table_name
    else:
        raise Exception('该节点不存在中间结果')  # no intermediate result for this node
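
# For illustration only (ids are hypothetical): job_id=57, task_id='4',
# node_id='n1', out_pin='0' resolves to the Hive table
# 'job57_task4_subnoden1_output0_tmp', provided it exists in hiveDs.list_tables().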


def get_transfer_table_name(project_id: str, user_id: str, name: str):
    """Build a unique, timestamped Hive table name for a data-transfer task."""
    current_time = int(time.time())
    return f'{database_name}.project{project_id.lower()}_user{user_id.lower()}_{name.lower()}_{current_time}'
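
# For illustration only (values are hypothetical): project_id='P01', user_id='U7',
# name='Orders' at Unix time 1700000000 yields
# '<DATABASE_NAME>.projectp01_useru7_orders_1700000000'.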