datax.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. from app import crud, models
  2. from app.utils.send_util import *
  3. from app.utils.utils import get_cmd_parameter
  4. from sqlalchemy.orm import Session
  5. def datax_create_job(job_info: models.JobInfo, db: Session):
  6. af_task = datax_create_task(job_info)
  7. af_job = {
  8. "tasks": [af_task],
  9. "name": job_info.job_desc,
  10. "dependence": [],
  11. "cron": job_info.job_cron,
  12. "desc": job_info.job_desc,
  13. "route_strategy": job_info.executor_route_strategy,
  14. "block_strategy": job_info.executor_block_strategy,
  15. "executor_timeout": job_info.executor_timeout,
  16. "executor_fail_retry_count": job_info.executor_fail_retry_count,
  17. "trigger_status": job_info.trigger_status,
  18. "job_type": 0,
  19. "user_id": 0,
  20. }
  21. res = send_post('/jpt/af_job', af_job)
  22. af_job = res['data']
  23. crud.create_relation(db, job_info.id,'datax', af_job['id'])
  24. send_submit(af_job['id'])
  25. def datax_create_task(job_info: models.JobInfo):
  26. cmd_parameter = get_cmd_parameter(job_info.jvm_param, job_info.inc_start_time, job_info.replace_param, job_info.partition_info)
  27. af_task = {
  28. "name": job_info.job_desc,
  29. "file_urls": [],
  30. "script": job_info.job_json,
  31. "cmd": "",
  32. "cmd_parameters": cmd_parameter,
  33. "envs": {},
  34. "run_image": "",
  35. "task_type": "datax",
  36. "user_id": 0,
  37. }
  38. res = send_post('/jpt/af_task', af_task)
  39. af_task = res['data']
  40. return af_task
  41. def datax_update_job(job_info: models.JobInfo, db: Session):
  42. relation = crud.get_af_id(db, job_info.id, 'datax')
  43. af_job_id = relation.af_id
  44. res = send_get("/jpt/af_job/getOnce",af_job_id)
  45. old_af_job = res['data']
  46. old_af_task = old_af_job['tasks'][0]
  47. af_task = datax_put_task(job_info,old_af_task)
  48. af_job = {
  49. "tasks": [af_task],
  50. "name": job_info.job_desc,
  51. "dependence": [],
  52. "cron": job_info.job_cron,
  53. "desc": job_info.job_desc,
  54. "route_strategy": job_info.executor_route_strategy,
  55. "block_strategy": job_info.executor_block_strategy,
  56. "executor_timeout": job_info.executor_timeout,
  57. "executor_fail_retry_count": job_info.executor_fail_retry_count,
  58. "trigger_status": job_info.trigger_status,
  59. }
  60. res = send_put('/jpt/af_job', old_af_job['id'], af_job)
  61. af_job = res['data']
  62. send_submit(af_job['id'])
  63. def datax_put_task(job_info: models.JobInfo,old_af_task):
  64. cmd_parameter = get_cmd_parameter(job_info.jvm_param, job_info.inc_start_time, job_info.replace_param, job_info.partition_info)
  65. af_task = {
  66. "name": job_info.job_desc,
  67. "file_urls": [],
  68. "script": job_info.job_json,
  69. "cmd": "",
  70. "cmd_parameters": cmd_parameter,
  71. "envs": {},
  72. "run_image": "",
  73. }
  74. res = send_put('/jpt/af_task', old_af_task['id'],af_task)
  75. af_task = res['data']
  76. return af_task