import time

from sqlalchemy.orm import Session

from app import crud, models
from app.utils.send_util import (
    get_job_last_parsed_time,
    send_execute,
    send_get,
    send_pause,
    send_post,
    send_put,
    send_submit,
)
from app.utils.utils import get_cmd_parameter


def datax_create_job(job_info: models.JobInfo, db: Session):
    """Create an Airflow job wrapping a single DataX task, then submit it."""
    af_task = datax_create_task(job_info)
    # Replace Quartz-style '?' placeholders, which the scheduler does not accept.
    cron: str = job_info.job_cron.replace('?', '*')
    af_job = {
        "tasks": [af_task],
        "name": job_info.job_desc,
        "dependence": [],
        "cron": cron,
        "desc": job_info.job_desc,
        "route_strategy": job_info.executor_route_strategy,
        "block_strategy": job_info.executor_block_strategy,
        "executor_timeout": job_info.executor_timeout,
        "executor_fail_retry_count": job_info.executor_fail_retry_count,
        "trigger_status": job_info.trigger_status,
        "job_mode": 1,
        "job_type": 0,
        "user_id": 0,
    }
    res = send_post('/af/af_job', af_job)
    af_job = res['data']
    send_submit(af_job['id'])
    return af_job


def datax_create_task(job_info: models.JobInfo):
    """Build the DataX task payload and register it via the /af/af_task API."""
    cmd_parameter = get_cmd_parameter(job_info.jvm_param)
    # partition_info is a comma-separated triple: "word,diff,format".
    partition_list = []
    if job_info.partition_info:
        partition_list = job_info.partition_info.split(',')
    first_begin_time = int(time.time())
    if job_info.inc_start_time:
        first_begin_time = job_info.inc_start_time
    last_key = job_info.last_time or 'lastTime'
    current_key = job_info.current_time or 'currentTime'
    envs = {
        "first_begin_time": first_begin_time,
        "last_key": last_key,
        "current_key": current_key,
        "partition_key": "partition",
        # Guard on the full triple so a malformed partition_info cannot raise IndexError.
        "partition_word": partition_list[0] if len(partition_list) > 2 else 'xujiayue',
        "partition_format": partition_list[2] if len(partition_list) > 2 else '%Y-%m-%d',
        "partition_diff": partition_list[1] if len(partition_list) > 2 else 0,
    }
    # executor_timeout is stored in minutes; execution_timeout is passed in seconds.
    if job_info.executor_timeout is not None and job_info.executor_timeout >= 0:
        envs.update({"execution_timeout": job_info.executor_timeout * 60})
    if job_info.executor_fail_retry_count is not None and job_info.executor_fail_retry_count >= 0:
        envs.update({"retries": job_info.executor_fail_retry_count})
    af_task = {
        "name": job_info.job_desc,
        "file_urls": [],
        "script": job_info.job_json,
        "cmd": "",
        "cmd_parameters": cmd_parameter,
        "envs": envs,
        "run_image": "",
        "task_type": "datax",
        "user_id": 0,
    }
    res = send_post('/af/af_task', af_task)
    return res['data']


def datax_update_job(job_info: models.JobInfo, db: Session):
    """Update the Airflow job bound to this JobInfo record and resubmit it."""
    relation = crud.get_af_id(db, job_info.id, 'datax')
    af_job_id = relation.af_id
    res = send_get("/af/af_job/getOnce", af_job_id)
    old_af_job = res['data']
    old_af_task = old_af_job['tasks'][0]
    af_task = datax_put_task(job_info, old_af_task)
    # Replace Quartz-style '?' placeholders, which the scheduler does not accept.
    cron: str = job_info.job_cron.replace('?', '*')
    af_job = {
        "tasks": [af_task],
        "name": job_info.job_desc,
        "dependence": [],
        "cron": cron,
        "desc": job_info.job_desc,
        "route_strategy": job_info.executor_route_strategy,
        "block_strategy": job_info.executor_block_strategy,
        "executor_timeout": job_info.executor_timeout,
        "executor_fail_retry_count": job_info.executor_fail_retry_count,
        "trigger_status": job_info.trigger_status,
    }
    res = send_put('/af/af_job', old_af_job['id'], af_job)
    af_job = res['data']
    send_submit(af_job['id'])
    return af_job


def datax_put_task(job_info: models.JobInfo, old_af_task):
    """Rebuild the DataX task payload and update the existing task in place."""
    cmd_parameter = get_cmd_parameter(job_info.jvm_param)
    # partition_info is a comma-separated triple: "word,diff,format".
    partition_list = []
    if job_info.partition_info:
        partition_list = job_info.partition_info.split(',')
    first_begin_time = int(time.time())
    if job_info.inc_start_time:
        first_begin_time = job_info.inc_start_time
    last_key = job_info.last_time or 'lastTime'
    current_key = job_info.current_time or 'currentTime'
    envs = {
        "first_begin_time": first_begin_time,
        "last_key": last_key,
        "current_key": current_key,
        "partition_key": "partition",
        # Guard on the full triple so a malformed partition_info cannot raise IndexError.
        "partition_word": partition_list[0] if len(partition_list) > 2 else 'xujiayue',
        "partition_format": partition_list[2] if len(partition_list) > 2 else '%Y-%m-%d',
        "partition_diff": partition_list[1] if len(partition_list) > 2 else 0,
    }
    # executor_timeout is stored in minutes; execution_timeout is passed in seconds.
    if job_info.executor_timeout is not None and job_info.executor_timeout >= 0:
        envs.update({"execution_timeout": job_info.executor_timeout * 60})
    if job_info.executor_fail_retry_count is not None and job_info.executor_fail_retry_count >= 0:
        envs.update({"retries": job_info.executor_fail_retry_count})
    af_task = {
        "name": job_info.job_desc,
        "file_urls": [],
        "script": job_info.job_json,
        "cmd": "",
        "cmd_parameters": cmd_parameter,
        "envs": envs,
        "run_image": "",
    }
    res = send_put('/af/af_task', old_af_task['id'], af_task)
    return res['data']


def on_off_control(af_job_id: int, status: int):
    """Pause or resume a job, polling until the scheduler has parsed it."""
    for i in range(11):
        parsed_res = get_job_last_parsed_time(af_job_id)
        last_parsed_time = parsed_res['data']['last_parsed_time']
        if last_parsed_time:
            send_pause(af_job_id, status)
            print(f"{af_job_id}<==status updated successfully==>{last_parsed_time}")
            break
        if i >= 10:
            raise Exception(f"{af_job_id}==>failed to update status")
        time.sleep(2)


def execute_job(af_job_id: int):
    """Resubmit a job, wait for the scheduler to re-parse it, then trigger a run."""
    res = get_job_last_parsed_time(af_job_id)
    current_time = res['data'].get('last_parsed_time')
    send_submit(af_job_id)
    for i in range(21):
        parsed_res = get_job_last_parsed_time(af_job_id)
        last_parsed_time = parsed_res['data']['last_parsed_time']
        # A new parse timestamp means the resubmitted DAG file has been picked up.
        if last_parsed_time and last_parsed_time != current_time:
            res = send_execute(af_job_id)
            print(f"{af_job_id}<==task executed successfully==>{last_parsed_time}")
            return res
        if i >= 20:
            raise Exception(f"{af_job_id}==>the DAG file is still being parsed")
        time.sleep(2)
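

# A minimal usage sketch chaining the helpers above: create the Airflow job,
# switch it on, then trigger an immediate run. Assumptions (not defined in this
# module): `SessionLocal` is the app's SQLAlchemy session factory at the path
# below, the JobInfo id is hypothetical, and `status=1` is taken to mean
# "enabled" for on_off_control.
if __name__ == "__main__":
    from app.db.session import SessionLocal  # assumed location of the session factory

    db = SessionLocal()
    try:
        # Hypothetical JobInfo row used purely for illustration.
        job_info = db.query(models.JobInfo).filter(models.JobInfo.id == 1).first()
        af_job = datax_create_job(job_info, db)
        on_off_control(af_job['id'], 1)  # assumption: 1 == "on" in this API
        execute_job(af_job['id'])
    finally:
        db.close()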