from app import crud, models
from app.utils.send_util import *
from app.utils.utils import get_cmd_parameter
from sqlalchemy.orm import Session


def _normalize_cron(cron: str) -> str:
    """Return *cron* with every '?' replaced by '*'.

    ``str.replace`` returns a new string (strings are immutable), so the
    result must be used rather than discarded.
    NOTE(review): presumably the scheduler behind ``/jpt/af_job`` does not
    accept the '?' placeholder -- confirm against the service.
    """
    return cron.replace('?', '*')


def _build_task_payload(job_info: models.JobInfo) -> dict:
    """Build the task fields shared by the create and update requests."""
    cmd_parameter = get_cmd_parameter(
        job_info.jvm_param,
        job_info.inc_start_time,
        job_info.replace_param,
        job_info.partition_info,
    )
    return {
        "name": job_info.job_desc,
        "file_urls": [],
        "script": job_info.job_json,
        "cmd": "",
        "cmd_parameters": cmd_parameter,
        "envs": {},
        "run_image": "",
    }


def _build_job_payload(job_info: models.JobInfo, af_task) -> dict:
    """Build the job fields shared by the create and update requests.

    The job wraps the single task ``af_task`` and carries the normalized
    cron expression.
    """
    return {
        "tasks": [af_task],
        "name": job_info.job_desc,
        "dependence": [],
        "cron": _normalize_cron(job_info.job_cron),
        "desc": job_info.job_desc,
        "route_strategy": job_info.executor_route_strategy,
        "block_strategy": job_info.executor_block_strategy,
        "executor_timeout": job_info.executor_timeout,
        "executor_fail_retry_count": job_info.executor_fail_retry_count,
        "trigger_status": job_info.trigger_status,
    }


def datax_create_job(job_info: models.JobInfo, db: Session):
    """Create the remote task and job for ``job_info`` and record the link.

    Steps: create the task, POST the job to ``/jpt/af_job``, store the
    relation between ``job_info.id`` and the returned job id under the
    'datax' key, then call ``send_submit`` and ``send_pause`` for that job.

    Args:
        job_info: Job definition to publish.
        db: Database session used to persist the relation.
    """
    af_task = datax_create_task(job_info)
    # The job is named after the job description and uses the normalized
    # cron, matching what datax_update_job sends.
    af_job = _build_job_payload(job_info, af_task)
    af_job["job_mode"] = 1
    af_job["job_type"] = 0
    af_job["user_id"] = 0
    res = send_post('/jpt/af_job', af_job)
    af_job = res['data']
    crud.create_relation(db, job_info.id, 'datax', af_job['id'])
    send_submit(af_job['id'])
    send_pause(af_job['id'], job_info.trigger_status)


def datax_create_task(job_info: models.JobInfo):
    """POST a new 'datax' task built from ``job_info`` to ``/jpt/af_task``.

    Returns:
        The ``'data'`` entry of the response.
    """
    af_task = _build_task_payload(job_info)
    af_task["task_type"] = "datax"
    af_task["user_id"] = 0
    res = send_post('/jpt/af_task', af_task)
    return res['data']


def datax_update_job(job_info: models.JobInfo, db: Session):
    """Update the remote job (and its first task) linked to ``job_info``.

    Looks up the remote job id through the stored 'datax' relation, fetches
    the current job, updates its first task, PUTs the new job definition,
    then calls ``send_submit`` and ``send_pause`` for the returned job id.

    Args:
        job_info: Job definition carrying the new values.
        db: Database session used to look up the relation.
    """
    relation = crud.get_af_id(db, job_info.id, 'datax')
    af_job_id = relation.af_id
    res = send_get("/jpt/af_job/getOnce", af_job_id)
    old_af_job = res['data']
    # Only the first task of the existing job is updated.
    old_af_task = old_af_job['tasks'][0]
    af_task = datax_put_task(job_info, old_af_task)
    af_job = _build_job_payload(job_info, af_task)
    res = send_put('/jpt/af_job', old_af_job['id'], af_job)
    af_job = res['data']
    send_submit(af_job['id'])
    send_pause(af_job['id'], job_info.trigger_status)


def datax_put_task(job_info: models.JobInfo, old_af_task):
    """PUT the task fields from ``job_info`` onto the task ``old_af_task``.

    Args:
        job_info: Job definition carrying the new task values.
        old_af_task: Existing task mapping; only its ``'id'`` is read.

    Returns:
        The ``'data'`` entry of the response.
    """
    af_task = _build_task_payload(job_info)
    res = send_put('/jpt/af_task', old_af_task['id'], af_task)
    return res['data']