1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 |
- from app import crud, models
- from app.utils.send_util import *
- from app.utils.utils import get_cmd_parameter
- from sqlalchemy.orm import Session
- def datax_create_job(job_info: models.JobInfo, db: Session):
- af_task = datax_create_task(job_info)
- af_job = {
- "tasks": [af_task],
- "name": job_info.job_desc,
- "dependence": [],
- "cron": job_info.job_cron,
- "desc": job_info.job_desc,
- "route_strategy": job_info.executor_route_strategy,
- "block_strategy": job_info.executor_block_strategy,
- "executor_timeout": job_info.executor_timeout,
- "executor_fail_retry_count": job_info.executor_fail_retry_count,
- "trigger_status": job_info.trigger_status,
- "job_mode":1,
- "job_type": 0,
- "user_id": 0,
- }
- res = send_post('/jpt/af_job', af_job)
- af_job = res['data']
- crud.create_relation(db, job_info.id,'datax', af_job['id'])
- send_submit(af_job['id'])
- def datax_create_task(job_info: models.JobInfo):
- cmd_parameter = get_cmd_parameter(job_info.jvm_param, job_info.inc_start_time, job_info.replace_param, job_info.partition_info)
- af_task = {
- "name": job_info.job_desc,
- "file_urls": [],
- "script": job_info.job_json,
- "cmd": "",
- "cmd_parameters": cmd_parameter,
- "envs": {},
- "run_image": "",
- "task_type": "datax",
- "user_id": 0,
- }
- res = send_post('/jpt/af_task', af_task)
- af_task = res['data']
- return af_task
- def datax_update_job(job_info: models.JobInfo, db: Session):
- relation = crud.get_af_id(db, job_info.id, 'datax')
- af_job_id = relation.af_id
- res = send_get("/jpt/af_job/getOnce",af_job_id)
- old_af_job = res['data']
- old_af_task = old_af_job['tasks'][0]
- af_task = datax_put_task(job_info,old_af_task)
- af_job = {
- "tasks": [af_task],
- "name": job_info.job_desc,
- "dependence": [],
- "cron": job_info.job_cron,
- "desc": job_info.job_desc,
- "route_strategy": job_info.executor_route_strategy,
- "block_strategy": job_info.executor_block_strategy,
- "executor_timeout": job_info.executor_timeout,
- "executor_fail_retry_count": job_info.executor_fail_retry_count,
- "trigger_status": job_info.trigger_status,
- }
- res = send_put('/jpt/af_job', old_af_job['id'], af_job)
- af_job = res['data']
- send_submit(af_job['id'])
- def datax_put_task(job_info: models.JobInfo,old_af_task):
- cmd_parameter = get_cmd_parameter(job_info.jvm_param, job_info.inc_start_time, job_info.replace_param, job_info.partition_info)
- af_task = {
- "name": job_info.job_desc,
- "file_urls": [],
- "script": job_info.job_json,
- "cmd": "",
- "cmd_parameters": cmd_parameter,
- "envs": {},
- "run_image": "",
- }
- res = send_put('/jpt/af_task', old_af_task['id'],af_task)
- af_task = res['data']
- return af_task
|