import time

from sqlalchemy.orm import Session

from app import crud, models
from app.utils.send_util import (
    get_job_last_parsed_time,
    send_execute,
    send_get,
    send_pause,
    send_post,
    send_put,
    send_submit,
)
from app.utils.utils import get_cmd_parameter


def datax_create_job(job_info: models.JobInfo, db: Session):
    af_task = datax_create_task(job_info)
    # Replace the Quartz-style '?' with '*' so the cron expression is valid
    # for the scheduler.
    cron: str = job_info.job_cron.replace('?', '*')
    af_job = {
        "tasks": [af_task],
        "name": job_info.job_desc,
        "dependence": [],
        "cron": cron,
        "desc": job_info.job_desc,
        "route_strategy": job_info.executor_route_strategy,
        "block_strategy": job_info.executor_block_strategy,
        "executor_timeout": job_info.executor_timeout,
        "executor_fail_retry_count": job_info.executor_fail_retry_count,
        "trigger_status": job_info.trigger_status,
        "job_mode": 1,
        "job_type": 0,
        "user_id": 0,
    }
    res = send_post('/af/af_job', af_job)
    af_job = res['data']
    send_submit(af_job['id'])
    return af_job


def datax_create_task(job_info: models.JobInfo):
    cmd_parameter = get_cmd_parameter(job_info.jvm_param)
    # partition_info is expected as "word,diff,format"; each index is guarded
    # separately below so a short list cannot raise an IndexError.
    partition_list = []
    if job_info.partition_info is not None and job_info.partition_info != '':
        partition_list = job_info.partition_info.split(',')
    # Default the incremental start time to the current Unix timestamp.
    first_begin_time = int(time.time())
    if job_info.inc_start_time is not None and job_info.inc_start_time != '':
        first_begin_time = job_info.inc_start_time
    last_key = 'lastTime'
    if job_info.last_time is not None and job_info.last_time != '':
        last_key = job_info.last_time
    current_key = 'currentTime'
    if job_info.current_time is not None and job_info.current_time != '':
        current_key = job_info.current_time
    envs = {
        "first_begin_time": first_begin_time,
        "last_key": last_key,
        "current_key": current_key,
        "partition_key": "partition",
        "partition_word": partition_list[0] if len(partition_list) > 0 else 'xujiayue',
        "partition_format": partition_list[2] if len(partition_list) > 2 else '%Y-%m-%d',
        "partition_diff": partition_list[1] if len(partition_list) > 1 else 0,
    }
    if job_info.executor_timeout is not None and job_info.executor_timeout >= 0:
        # executor_timeout is stored in minutes; the scheduler expects seconds.
        envs.update({"execution_timeout": job_info.executor_timeout * 60})
    if job_info.executor_fail_retry_count is not None and job_info.executor_fail_retry_count >= 0:
        envs.update({"retries": job_info.executor_fail_retry_count})
    af_task = {
        "name": job_info.job_desc,
        "file_urls": [],
        "script": job_info.job_json,
        "cmd": "",
        "cmd_parameters": cmd_parameter,
        "envs": envs,
        "run_image": "",
        "task_type": "datax",
        "user_id": 0,
    }
    res = send_post('/af/af_task', af_task)
    af_task = res['data']
    return af_task


def datax_update_job(job_info: models.JobInfo, db: Session):
    relation = crud.get_af_id(db, job_info.id, 'datax')
    af_job_id = relation.af_id
    res = send_get("/af/af_job/getOnce", af_job_id)
    old_af_job = res['data']
    old_af_task = old_af_job['tasks'][0]
    af_task = datax_put_task(job_info, old_af_task)
    cron: str = job_info.job_cron.replace('?', '*')
    af_job = {
        "tasks": [af_task],
        "name": job_info.job_desc,
        "dependence": [],
        "cron": cron,
        "desc": job_info.job_desc,
        "route_strategy": job_info.executor_route_strategy,
        "block_strategy": job_info.executor_block_strategy,
        "executor_timeout": job_info.executor_timeout,
        "executor_fail_retry_count": job_info.executor_fail_retry_count,
        "trigger_status": job_info.trigger_status,
    }
    res = send_put('/af/af_job', old_af_job['id'], af_job)
    af_job = res['data']
    send_submit(af_job['id'])
    return af_job


def datax_put_task(job_info: models.JobInfo, old_af_task):
    cmd_parameter = get_cmd_parameter(job_info.jvm_param)
    partition_list = []
    if job_info.partition_info is not None and job_info.partition_info != '':
        partition_list = job_info.partition_info.split(',')
    first_begin_time = int(time.time())
    if job_info.inc_start_time is not None and job_info.inc_start_time != '':
        first_begin_time = job_info.inc_start_time
    last_key = 'lastTime'
    if job_info.last_time is not None and job_info.last_time != '':
        last_key = job_info.last_time
    current_key = 'currentTime'
    if job_info.current_time is not None and job_info.current_time != '':
        current_key = job_info.current_time
    envs = {
        "first_begin_time": first_begin_time,
        "last_key": last_key,
        "current_key": current_key,
        "partition_key": "partition",
        "partition_word": partition_list[0] if len(partition_list) > 0 else 'xujiayue',
        "partition_format": partition_list[2] if len(partition_list) > 2 else '%Y-%m-%d',
        "partition_diff": partition_list[1] if len(partition_list) > 1 else 0,
    }
    if job_info.executor_timeout is not None and job_info.executor_timeout >= 0:
        envs.update({"execution_timeout": job_info.executor_timeout * 60})
    if job_info.executor_fail_retry_count is not None and job_info.executor_fail_retry_count >= 0:
        envs.update({"retries": job_info.executor_fail_retry_count})
    af_task = {
        "name": job_info.job_desc,
        "file_urls": [],
        "script": job_info.job_json,
        "cmd": "",
        "cmd_parameters": cmd_parameter,
        "envs": envs,
        "run_image": "",
    }
    res = send_put('/af/af_task', old_af_task['id'], af_task)
    af_task = res['data']
    return af_task


def on_off_control(af_job_id: int, status: int):
    # Poll until the scheduler has parsed the job, then flip its pause status.
    for i in range(0, 11):
        parsed_res = get_job_last_parsed_time(af_job_id)
        last_parsed_time = parsed_res['data']['last_parsed_time']
        if last_parsed_time:
            send_pause(af_job_id, status)
            print(f"{af_job_id}<==status updated successfully==>{last_parsed_time}")
            break
        if i >= 10:
            raise Exception(f"{af_job_id}==>failed to update status")
        time.sleep(2)


def execute_job(af_job_id: int):
    # Remember the current parse time, resubmit the job, then wait for the
    # scheduler to re-parse it before triggering a run.
    res = get_job_last_parsed_time(af_job_id)
    current_time = res['data'].get('last_parsed_time')
    send_submit(af_job_id)
    for i in range(0, 21):
        parsed_res = get_job_last_parsed_time(af_job_id)
        last_parsed_time = parsed_res['data']['last_parsed_time']
        if last_parsed_time and last_parsed_time != current_time:
            res = send_execute(af_job_id)
            print(f"{af_job_id}<==job executed successfully==>{last_parsed_time}")
            return res
        if i >= 20:
            raise Exception(f"{af_job_id}==>the file is still being converted")
        time.sleep(2)
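# ---------------------------------------------------------------------------
# Usage sketch (kept as a comment so it never runs on import): one plausible
# way a caller might drive the create -> enable -> run lifecycle above.
# `SessionLocal` is a hypothetical name for this app's session factory and
# the JobInfo lookup is illustrative; neither is defined by this module.
#
#   from app.database import SessionLocal        # hypothetical session factory
#
#   db = SessionLocal()
#   job_info = db.query(models.JobInfo).get(1)   # fetch an existing JobInfo row
#   af_job = datax_create_job(job_info, db)      # create the task + job, then submit
#   on_off_control(af_job['id'], 1)              # wait for parse, then set trigger status
#   execute_job(af_job['id'])                    # trigger a manual run
# ---------------------------------------------------------------------------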