import time

from app import crud, models
from app.utils.send_util import *
from app.utils.utils import get_cmd_parameter
from sqlalchemy.orm import Session

# Polling policy used by on_off_control: number of polls and the pause
# (in seconds) between two consecutive polls.
_PARSE_POLL_ATTEMPTS = 11
_PARSE_POLL_INTERVAL_SECONDS = 2


def _normalize_cron(cron: str) -> str:
    """Return *cron* with every ``?`` replaced by ``*``.

    ``str.replace`` returns a new string; the result must be used, the
    input string is never modified in place.
    """
    return cron.replace('?', '*')


def _build_task_envs(job_info: models.JobInfo) -> dict:
    """Build the ``envs`` mapping of a datax task payload.

    Returns an empty dict unless ``inc_start_time``, ``last_time``,
    ``current_time`` and a non-empty ``partition_info`` are all set on
    *job_info*.

    ``partition_info`` is split on ``,``; field 0 is used as
    ``partition_word``, field 1 as ``partition_diff`` and field 2 as
    ``partition_format``. Missing fields fall back to ``''`` instead of
    raising ``IndexError``.
    """
    partition_info = job_info.partition_info
    partition_list = partition_info.split(',') if partition_info else []

    incremental = (
        job_info.inc_start_time
        and job_info.last_time
        and partition_list
        and job_info.current_time
    )
    if not incremental:
        return {}

    # Pad so that a partition_info with fewer than three fields is tolerated.
    word, diff, fmt = (partition_list + ['', '', ''])[:3]
    return {
        "first_begin_time": job_info.inc_start_time,
        "last_key": job_info.last_time,
        "current_key": job_info.current_time,
        "partition_key": "partition",
        "partition_word": word,
        "partition_format": fmt,
        "partition_diff": diff,
    }


def _build_task_payload(job_info: models.JobInfo) -> dict:
    """Return the task fields shared by the create and update requests."""
    cmd_parameter = get_cmd_parameter(job_info.jvm_param)
    envs = _build_task_envs(job_info)
    return {
        "name": job_info.job_desc,
        "file_urls": [],
        "script": job_info.job_json,
        "cmd": "",
        "cmd_parameters": cmd_parameter,
        "envs": envs,
        "run_image": "",
    }


def _build_job_payload(job_info: models.JobInfo, af_task) -> dict:
    """Return the job fields shared by the create and update requests.

    *af_task* is the task object returned by the task endpoint; it becomes
    the single entry of the job's ``tasks`` list.
    """
    return {
        "tasks": [af_task],
        "name": job_info.job_desc,
        "dependence": [],
        "cron": _normalize_cron(job_info.job_cron),
        "desc": job_info.job_desc,
        "route_strategy": job_info.executor_route_strategy,
        "block_strategy": job_info.executor_block_strategy,
        "executor_timeout": job_info.executor_timeout,
        "executor_fail_retry_count": job_info.executor_fail_retry_count,
        "trigger_status": job_info.trigger_status,
    }


def datax_create_job(job_info: models.JobInfo, db: Session):
    """Create a new remote job (and its task) for *job_info*.

    Steps: create the task, POST the job to ``/af/af_job``, record the
    relation between ``job_info.id`` and the returned job id via
    ``crud.create_relation`` (type ``'datax'``), then submit the job.

    Args:
        job_info: Job definition to publish.
        db: Database session used to store the relation.
    """
    af_task = datax_create_task(job_info)

    af_job = _build_job_payload(job_info, af_task)
    af_job.update({
        "job_mode": 1,
        "job_type": 0,
        "user_id": 0,
    })

    res = send_post('/af/af_job', af_job)
    af_job = res['data']
    crud.create_relation(db, job_info.id, 'datax', af_job['id'])
    send_submit(af_job['id'])
    # Pass the job id, consistent with the call made in on_off_control.
    # NOTE(review): the returned value is not used here - confirm whether
    # this call is still needed.
    get_job_last_parsed_time(af_job['id'])
    # on_off_control(af_job['id'], job_info.trigger_status)


def datax_create_task(job_info: models.JobInfo):
    """Create the remote datax task for *job_info*.

    POSTs the task payload to ``/af/af_task``.

    Returns:
        The ``data`` member of the response.
    """
    af_task = _build_task_payload(job_info)
    af_task.update({
        "task_type": "datax",
        "user_id": 0,
    })
    res = send_post('/af/af_task', af_task)
    return res['data']


def datax_update_job(job_info: models.JobInfo, db: Session):
    """Update the remote job already related to *job_info*.

    Looks up the related job id with ``crud.get_af_id``, fetches the
    current job, updates its first task, PUTs the new job definition,
    submits it and finally applies ``job_info.trigger_status`` through
    ``on_off_control``.

    Args:
        job_info: Job definition carrying the new values.
        db: Database session used to look up the relation.
    """
    relation = crud.get_af_id(db, job_info.id, 'datax')
    af_job_id = relation.af_id

    res = send_get("/af/af_job/getOnce", af_job_id)
    old_af_job = res['data']
    old_af_task = old_af_job['tasks'][0]

    af_task = datax_put_task(job_info, old_af_task)
    af_job = _build_job_payload(job_info, af_task)

    res = send_put('/af/af_job', old_af_job['id'], af_job)
    af_job = res['data']
    send_submit(af_job['id'])
    on_off_control(af_job['id'], job_info.trigger_status)


def datax_put_task(job_info: models.JobInfo, old_af_task):
    """Update the existing remote task *old_af_task* from *job_info*.

    PUTs the task payload to ``/af/af_task`` using ``old_af_task['id']``.

    Returns:
        The ``data`` member of the response.
    """
    af_task = _build_task_payload(job_info)
    res = send_put('/af/af_task', old_af_task['id'], af_task)
    return res['data']


def datax_job_submit(job_info: models.JobInfo, db: Session):
    """Create or update the remote job for *job_info*.

    A job is created when ``crud.get_af_id`` finds no ``'datax'`` relation
    for ``job_info.id``; otherwise the related job is updated.
    """
    relation = crud.get_af_id(db, job_info.id, 'datax')
    if relation:
        datax_update_job(job_info, db)
    else:
        datax_create_job(job_info, db)


def on_off_control(af_job_id: int, status: int):
    """Apply *status* to a job once it reports a last-parsed time.

    Polls ``get_job_last_parsed_time`` up to ``_PARSE_POLL_ATTEMPTS`` times,
    sleeping ``_PARSE_POLL_INTERVAL_SECONDS`` between polls. As soon as
    ``last_parsed_time`` is truthy, ``send_pause(af_job_id, status)`` is
    called and the function returns.

    Raises:
        Exception: If no poll returned a truthy ``last_parsed_time``.
    """
    final_attempt = _PARSE_POLL_ATTEMPTS - 1
    for attempt in range(_PARSE_POLL_ATTEMPTS):
        parsed_res = get_job_last_parsed_time(af_job_id)
        last_parsed_time = parsed_res['data']['last_parsed_time']
        if last_parsed_time:
            send_pause(af_job_id, status)
            print(f"{af_job_id}<==状态修改成功==>{last_parsed_time}")
            return
        # No sleep after the final poll: fail immediately instead.
        if attempt < final_attempt:
            time.sleep(_PARSE_POLL_INTERVAL_SECONDS)
    raise Exception(f"{af_job_id}==>状态修改失败")