datax.py

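"""Glue between datax JobInfo records and the /jpt/af_job / /jpt/af_task HTTP API.

datax_job_submit() is the entry point: it creates an af_job (with its single
af_task) the first time a job is submitted and updates the existing one on
later submits; on_off_control() waits for the scheduler to parse the job
before toggling its pause state.
"""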
import time

from sqlalchemy.orm import Session

from app import crud, models
from app.utils.send_util import (get_job_last_parsed_time, send_get, send_pause,
                                 send_post, send_put, send_submit)
from app.utils.utils import get_cmd_parameter

def datax_create_job(job_info: models.JobInfo, db: Session):
    # Build the task first, then wrap it in the af_job definition.
    af_task = datax_create_task(job_info)
    # Normalize the cron expression: replace '?' with '*'
    # (str.replace returns a new string, so the result must be kept).
    cron: str = job_info.job_cron.replace('?', '*')
    af_job = {
        "tasks": [af_task],
        "name": job_info.job_desc,
        "dependence": [],
        "cron": cron,
        "desc": job_info.job_desc,
        "route_strategy": job_info.executor_route_strategy,
        "block_strategy": job_info.executor_block_strategy,
        "executor_timeout": job_info.executor_timeout,
        "executor_fail_retry_count": job_info.executor_fail_retry_count,
        "trigger_status": job_info.trigger_status,
        "job_mode": 1,
        "job_type": 0,
        "user_id": 0,
    }
    res = send_post('/jpt/af_job', af_job)
    af_job = res['data']
    # Remember which af_job belongs to this datax job.
    crud.create_relation(db, job_info.id, 'datax', af_job['id'])
    send_submit(af_job['id'])
    # Fetch the last parse time once (result unused); full polling lives in on_off_control.
    get_job_last_parsed_time(af_job['id'])
    # on_off_control(af_job['id'], job_info.trigger_status)

def datax_create_task(job_info: models.JobInfo):
    cmd_parameter = get_cmd_parameter(job_info.jvm_param)
    # partition_info is a comma-separated triple: word, diff, format.
    partition_list = []
    if job_info.partition_info is not None and job_info.partition_info != '':
        partition_list = job_info.partition_info.split(',')
    envs = {}
    # Only incremental jobs (start/last/current markers plus a partition) get env vars.
    if job_info.inc_start_time and job_info.last_time and len(partition_list) > 0 and job_info.current_time:
        envs = {
            "first_begin_time": job_info.inc_start_time,
            "last_key": job_info.last_time,
            "current_key": job_info.current_time,
            "partition_key": "partition",
            "partition_word": partition_list[0] if len(partition_list) > 0 else '',
            "partition_format": partition_list[2] if len(partition_list) > 2 else '',
            "partition_diff": partition_list[1] if len(partition_list) > 1 else '',
        }
    af_task = {
        "name": job_info.job_desc,
        "file_urls": [],
        "script": job_info.job_json,
        "cmd": "",
        "cmd_parameters": cmd_parameter,
        "envs": envs,
        "run_image": "",
        "task_type": "datax",
        "user_id": 0,
    }
    res = send_post('/jpt/af_task', af_task)
    af_task = res['data']
    return af_task

def datax_update_job(job_info: models.JobInfo, db: Session):
    # Look up the af_job previously created for this datax job.
    relation = crud.get_af_id(db, job_info.id, 'datax')
    af_job_id = relation.af_id
    res = send_get("/jpt/af_job/getOnce", af_job_id)
    old_af_job = res['data']
    old_af_task = old_af_job['tasks'][0]
    af_task = datax_put_task(job_info, old_af_task)
    # Normalize the cron expression (str.replace returns a new string).
    cron: str = job_info.job_cron.replace('?', '*')
    af_job = {
        "tasks": [af_task],
        "name": job_info.job_desc,
        "dependence": [],
        "cron": cron,
        "desc": job_info.job_desc,
        "route_strategy": job_info.executor_route_strategy,
        "block_strategy": job_info.executor_block_strategy,
        "executor_timeout": job_info.executor_timeout,
        "executor_fail_retry_count": job_info.executor_fail_retry_count,
        "trigger_status": job_info.trigger_status,
    }
    res = send_put('/jpt/af_job', old_af_job['id'], af_job)
    af_job = res['data']
    send_submit(af_job['id'])
    on_off_control(af_job['id'], job_info.trigger_status)

def datax_put_task(job_info: models.JobInfo, old_af_task):
    cmd_parameter = get_cmd_parameter(job_info.jvm_param)
    # Same env-building logic as datax_create_task, but issued as an update.
    partition_list = []
    if job_info.partition_info is not None and job_info.partition_info != '':
        partition_list = job_info.partition_info.split(',')
    envs = {}
    if job_info.inc_start_time and job_info.last_time and len(partition_list) > 0 and job_info.current_time:
        envs = {
            "first_begin_time": job_info.inc_start_time,
            "last_key": job_info.last_time,
            "current_key": job_info.current_time,
            "partition_key": "partition",
            "partition_word": partition_list[0] if len(partition_list) > 0 else '',
            "partition_format": partition_list[2] if len(partition_list) > 2 else '',
            "partition_diff": partition_list[1] if len(partition_list) > 1 else '',
        }
    af_task = {
        "name": job_info.job_desc,
        "file_urls": [],
        "script": job_info.job_json,
        "cmd": "",
        "cmd_parameters": cmd_parameter,
        "envs": envs,
        "run_image": "",
    }
    res = send_put('/jpt/af_task', old_af_task['id'], af_task)
    af_task = res['data']
    return af_task

def datax_job_submit(job_info: models.JobInfo, db: Session):
    relation = crud.get_af_id(db, job_info.id, 'datax')
    if not relation:
        datax_create_job(job_info, db)
    else:
        datax_update_job(job_info, db)

def on_off_control(af_job_id: int, status: int):
    # The scheduler must have parsed the job before it can be paused/unpaused,
    # so poll up to 11 times, 2 seconds apart, for a last_parsed_time.
    for i in range(0, 11):
        parsed_res = get_job_last_parsed_time(af_job_id)
        last_parsed_time = parsed_res['data']['last_parsed_time']
        if last_parsed_time:
            send_pause(af_job_id, status)
            print(f"{af_job_id}<==status updated successfully==>{last_parsed_time}")
            break
        if i >= 10:
            raise Exception(f"{af_job_id}==>status update failed")
        time.sleep(2)
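
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module): how a
# caller might drive datax_job_submit(). SessionLocal and the JobInfo lookup
# are assumptions about the surrounding app, not guaranteed names.
#
#   from app.models.database import SessionLocal  # assumed session factory
#
#   db = SessionLocal()
#   try:
#       job_info = db.query(models.JobInfo).filter(models.JobInfo.id == job_id).first()
#       datax_job_submit(job_info, db)
#   finally:
#       db.close()
# ---------------------------------------------------------------------------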