# datax.py
import time
from app import crud, models
from app.utils.send_util import *
from app.utils.utils import get_cmd_parameter
from sqlalchemy.orm import Session
  6. def datax_create_job(job_info: models.JobInfo, db: Session):
  7. af_task = datax_create_task(job_info)
  8. cron: str = job_info.job_cron
  9. cron = cron.replace('?','*')
  10. af_job = {
  11. "tasks": [af_task],
  12. "name": job_info.job_desc,
  13. "dependence": [],
  14. "cron": cron,
  15. "desc": job_info.job_desc,
  16. "route_strategy": job_info.executor_route_strategy,
  17. "block_strategy": job_info.executor_block_strategy,
  18. "executor_timeout": job_info.executor_timeout,
  19. "executor_fail_retry_count": job_info.executor_fail_retry_count,
  20. "trigger_status": job_info.trigger_status,
  21. "job_mode":1,
  22. "job_type": 0,
  23. "user_id": 0,
  24. }
  25. res = send_post('/af/af_job', af_job)
  26. af_job = res['data']
  27. send_submit(af_job['id'])
  28. return af_job
  29. def datax_create_task(job_info: models.JobInfo):
  30. cmd_parameter = get_cmd_parameter(job_info.jvm_param)
  31. partition_list = []
  32. if job_info.partition_info is not None and job_info.partition_info != '':
  33. partition_list = job_info.partition_info.split(',')
  34. first_begin_time = int(time.time())
  35. if job_info.inc_start_time is not None and job_info.inc_start_time != '':
  36. first_begin_time = job_info.inc_start_time
  37. last_key = 'lastTime'
  38. if job_info.last_time is not None and job_info.last_time != '':
  39. last_key = job_info.last_time
  40. current_key = 'currentTime'
  41. if job_info.current_time is not None and job_info.current_time != '':
  42. current_key = job_info.current_time
  43. envs = {
  44. "first_begin_time": first_begin_time,
  45. "last_key": last_key,
  46. "current_key": current_key,
  47. "partition_key": "partition",
  48. "partition_word": partition_list[0] if len(partition_list) > 0 else 'xujiayue',
  49. "partition_format": partition_list[2] if len(partition_list) > 0 else '%Y-%m-%d',
  50. "partition_diff": partition_list[1] if len(partition_list) > 0 else 0,
  51. }
  52. if job_info.executor_timeout is not None and job_info.executor_timeout >=0:
  53. envs.update({"execution_timeout": job_info.executor_timeout * 60})
  54. if job_info.executor_fail_retry_count is not None and job_info.executor_fail_retry_count >= 0:
  55. envs.update({"retries":job_info.executor_fail_retry_count})
  56. af_task = {
  57. "name": job_info.job_desc,
  58. "file_urls": [],
  59. "script": job_info.job_json,
  60. "cmd": "",
  61. "cmd_parameters": cmd_parameter,
  62. "envs": envs,
  63. "run_image": "",
  64. "task_type": "datax",
  65. "user_id": 0,
  66. }
  67. res = send_post('/af/af_task', af_task)
  68. af_task = res['data']
  69. return af_task
  70. def datax_update_job(job_info: models.JobInfo, db: Session):
  71. relation = crud.get_af_id(db, job_info.id, 'datax')
  72. af_job_id = relation.af_id
  73. res = send_get("/af/af_job/getOnce",af_job_id)
  74. old_af_job = res['data']
  75. old_af_task = old_af_job['tasks'][0]
  76. af_task = datax_put_task(job_info,old_af_task)
  77. cron: str = job_info.job_cron
  78. cron = cron.replace('?','*')
  79. af_job = {
  80. "tasks": [af_task],
  81. "name": job_info.job_desc,
  82. "dependence": [],
  83. "cron": cron,
  84. "desc": job_info.job_desc,
  85. "route_strategy": job_info.executor_route_strategy,
  86. "block_strategy": job_info.executor_block_strategy,
  87. "executor_timeout": job_info.executor_timeout,
  88. "executor_fail_retry_count": job_info.executor_fail_retry_count,
  89. "trigger_status": job_info.trigger_status,
  90. }
  91. res = send_put('/af/af_job', old_af_job['id'], af_job)
  92. af_job = res['data']
  93. send_submit(af_job['id'])
  94. return af_job
  95. def datax_put_task(job_info: models.JobInfo,old_af_task):
  96. cmd_parameter = get_cmd_parameter(job_info.jvm_param)
  97. partition_list = []
  98. if job_info.partition_info is not None and job_info.partition_info != '':
  99. partition_list = job_info.partition_info.split(',')
  100. first_begin_time = int(time.time())
  101. if job_info.inc_start_time is not None and job_info.inc_start_time != '':
  102. first_begin_time = job_info.inc_start_time
  103. last_key = 'lastTime'
  104. if job_info.last_time is not None and job_info.last_time != '':
  105. last_key = job_info.last_time
  106. current_key = 'currentTime'
  107. if job_info.current_time is not None and job_info.current_time != '':
  108. current_key = job_info.current_time
  109. envs = {
  110. "first_begin_time": first_begin_time,
  111. "last_key": last_key,
  112. "current_key": current_key,
  113. "partition_key": "partition",
  114. "partition_word": partition_list[0] if len(partition_list) > 0 else 'xujiayue',
  115. "partition_format": partition_list[2] if len(partition_list) > 0 else '%Y-%m-%d',
  116. "partition_diff": partition_list[1] if len(partition_list) > 0 else 0
  117. }
  118. if job_info.executor_timeout is not None and job_info.executor_timeout >=0:
  119. envs.update({"execution_timeout": job_info.executor_timeout * 60})
  120. if job_info.executor_fail_retry_count is not None and job_info.executor_fail_retry_count >= 0:
  121. envs.update({"retries":job_info.executor_fail_retry_count})
  122. af_task = {
  123. "name": job_info.job_desc,
  124. "file_urls": [],
  125. "script": job_info.job_json,
  126. "cmd": "",
  127. "cmd_parameters": cmd_parameter,
  128. "envs": envs,
  129. "run_image": "",
  130. }
  131. res = send_put('/af/af_task', old_af_task['id'],af_task)
  132. af_task = res['data']
  133. return af_task
  134. def on_off_control(af_job_id: int,status: int):
  135. for i in range(0,11):
  136. parsed_res = get_job_last_parsed_time(af_job_id)
  137. last_parsed_time = parsed_res['data']['last_parsed_time']
  138. if last_parsed_time:
  139. send_pause(af_job_id,status)
  140. print(f"{af_job_id}<==状态修改成功==>{last_parsed_time}")
  141. break
  142. if i >= 10:
  143. raise Exception(f"{af_job_id}==>状态修改失败")
  144. time.sleep(2)
  145. def execute_job(af_job_id: int):
  146. res = get_job_last_parsed_time(af_job_id)
  147. current_time = res['data']['last_parsed_time'] if 'last_parsed_time' in res['data'].keys() else None
  148. send_submit(af_job_id)
  149. for i in range(0,21):
  150. parsed_res = get_job_last_parsed_time(af_job_id)
  151. last_parsed_time = parsed_res['data']['last_parsed_time']
  152. if last_parsed_time and last_parsed_time != current_time:
  153. res = send_execute(af_job_id)
  154. print(f"{af_job_id}<==任务执行成功==>{last_parsed_time}")
  155. return res
  156. if i >= 20:
  157. raise Exception(f"{af_job_id}==>文件正在转化中")
  158. time.sleep(2)