datax.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. from app import crud, models
  2. from app.utils.send_util import *
  3. from app.utils.utils import get_cmd_parameter
  4. from sqlalchemy.orm import Session
  5. def datax_create_job(job_info: models.JobInfo, db: Session):
  6. af_task = datax_create_task(job_info)
  7. cron: str = job_info.job_cron
  8. cron.replace('?','*')
  9. af_job = {
  10. "tasks": [af_task],
  11. "name": cron,
  12. "dependence": [],
  13. "cron": job_info.job_cron,
  14. "desc": job_info.job_desc,
  15. "route_strategy": job_info.executor_route_strategy,
  16. "block_strategy": job_info.executor_block_strategy,
  17. "executor_timeout": job_info.executor_timeout,
  18. "executor_fail_retry_count": job_info.executor_fail_retry_count,
  19. "trigger_status": job_info.trigger_status,
  20. "job_mode":1,
  21. "job_type": 0,
  22. "user_id": 0,
  23. }
  24. res = send_post('/jpt/af_job', af_job)
  25. af_job = res['data']
  26. crud.create_relation(db, job_info.id,'datax', af_job['id'])
  27. send_submit(af_job['id'])
  28. send_pause(af_job['id'], True if job_info.trigger_status == 1 else False)
  29. def datax_create_task(job_info: models.JobInfo):
  30. cmd_parameter = get_cmd_parameter(job_info.jvm_param, job_info.inc_start_time, job_info.replace_param, job_info.partition_info)
  31. af_task = {
  32. "name": job_info.job_desc,
  33. "file_urls": [],
  34. "script": job_info.job_json,
  35. "cmd": "",
  36. "cmd_parameters": cmd_parameter,
  37. "envs": {},
  38. "run_image": "",
  39. "task_type": "datax",
  40. "user_id": 0,
  41. }
  42. res = send_post('/jpt/af_task', af_task)
  43. af_task = res['data']
  44. return af_task
  45. def datax_update_job(job_info: models.JobInfo, db: Session):
  46. relation = crud.get_af_id(db, job_info.id, 'datax')
  47. af_job_id = relation.af_id
  48. res = send_get("/jpt/af_job/getOnce",af_job_id)
  49. old_af_job = res['data']
  50. old_af_task = old_af_job['tasks'][0]
  51. af_task = datax_put_task(job_info,old_af_task)
  52. cron: str = job_info.job_cron
  53. cron.replace('?','*')
  54. af_job = {
  55. "tasks": [af_task],
  56. "name": job_info.job_desc,
  57. "dependence": [],
  58. "cron": cron,
  59. "desc": job_info.job_desc,
  60. "route_strategy": job_info.executor_route_strategy,
  61. "block_strategy": job_info.executor_block_strategy,
  62. "executor_timeout": job_info.executor_timeout,
  63. "executor_fail_retry_count": job_info.executor_fail_retry_count,
  64. "trigger_status": job_info.trigger_status,
  65. }
  66. res = send_put('/jpt/af_job', old_af_job['id'], af_job)
  67. af_job = res['data']
  68. send_submit(af_job['id'])
  69. send_pause(af_job['id'], True if job_info.trigger_status == 1 else False)
  70. def datax_put_task(job_info: models.JobInfo,old_af_task):
  71. cmd_parameter = get_cmd_parameter(job_info.jvm_param, job_info.inc_start_time, job_info.replace_param, job_info.partition_info)
  72. af_task = {
  73. "name": job_info.job_desc,
  74. "file_urls": [],
  75. "script": job_info.job_json,
  76. "cmd": "",
  77. "cmd_parameters": cmd_parameter,
  78. "envs": {},
  79. "run_image": "",
  80. }
  81. res = send_put('/jpt/af_task', old_af_task['id'],af_task)
  82. af_task = res['data']
  83. return af_task