send_util.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. from unittest import result
  2. import requests
  3. from configs.settings import config
  4. HOST = config.get('AF_BACKEND', 'host')
  5. PORT = config.get('AF_BACKEND', 'port')
  6. def send_post(uri,data):
  7. res = requests.post(url=f'http://{HOST}:{PORT}{uri}', json=data)
  8. result = res.json()
  9. if 'code' in result.keys() and result['code'] == 200:
  10. return res.json()
  11. else:
  12. msg = result['msg'] if 'msg' in result.keys() else result
  13. raise Exception(f'{uri}-->请求airflow失败-->{msg}')
  14. def send_submit(af_job_id):
  15. res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/submit?id='+str(af_job_id))
  16. result = res.json()
  17. if 'code' in result.keys() and result['code'] == 200:
  18. return res.json()
  19. else:
  20. msg = result['msg'] if 'msg' in result.keys() else result
  21. raise Exception(f'提交任务,请求airflow失败-->{msg}')
  22. def send_put(uri,path_data,data):
  23. res = requests.put(url=f'http://{HOST}:{PORT}{uri}/{path_data}', json=data)
  24. result = res.json()
  25. if 'code' in result.keys() and result['code'] == 200:
  26. return res.json()
  27. else:
  28. msg = result['msg'] if 'msg' in result.keys() else result
  29. raise Exception(f'{uri}-->请求airflow失败-->{msg}')
  30. def send_get(uri,path_data):
  31. res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
  32. result = res.json()
  33. if 'code' in result.keys() and result['code'] == 200:
  34. return res.json()
  35. else:
  36. msg = result['msg'] if 'msg' in result.keys() else result
  37. raise Exception(f'{uri}-->请求airflow失败-->{msg}')
  38. # 执行任务
  39. def send_execute(path_data):
  40. res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/{str(path_data)}/run')
  41. result = res.json()
  42. if 'code' in result.keys() and result['code'] == 200:
  43. return res.json()
  44. else:
  45. msg = result['msg'] if 'msg' in result.keys() else result
  46. raise Exception(f'执行任务,请求airflow失败-->{msg}')
  47. # 起停任务
  48. def send_pause(af_job_id, status):
  49. flag = True if status == 0 else False
  50. res = requests.patch(url=f'http://{HOST}:{PORT}/af/af_job/{str(af_job_id)}/pause/{str(flag)}')
  51. result = res.json()
  52. if 'code' in result.keys() and result['code'] == 200:
  53. return res.json()
  54. else:
  55. msg = result['msg'] if 'msg' in result.keys() else result
  56. raise Exception(f'修改任务状态,请求airflow失败-->{msg}')
  57. # 删除任务
  58. def send_delete(uri, path_data):
  59. res = requests.delete(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
  60. result = res.json()
  61. if 'code' in result.keys() and result['code'] == 200:
  62. return res.json()
  63. else:
  64. msg = result['msg'] if 'msg' in result.keys() else result
  65. raise Exception(f'{uri}-->请求airflow失败-->{msg}')
  66. # 获取airflow端dag文件生成时间
  67. def get_job_last_parsed_time(path_data):
  68. res = requests.get(url=f'http://{HOST}:{PORT}/af/af_job/{path_data}/last_parsed_time')
  69. result = res.json()
  70. if 'code' in result.keys() and result['code'] == 200:
  71. return res.json()
  72. else:
  73. msg = result['msg'] if 'msg' in result.keys() else result
  74. raise Exception(f'获取上次转化时间-->请求airflow失败-->{msg}')
  75. # 获取job某次运行的状态
  76. def get_job_run_status(path_data):
  77. res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/{path_data}/status')
  78. result = res.json()
  79. if 'code' in result.keys() and result['code'] == 200:
  80. return res.json()
  81. else:
  82. msg = result['msg'] if 'msg' in result.keys() else result
  83. raise Exception(f'获取job某次运行的状态-->请求airflow失败-->{msg}')
  84. # 中间结果转存
  85. def data_transfer_run(source_tb: str, target_tb: str):
  86. res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/000/data_transfer_run?source_tb={source_tb}&target_tb={target_tb}')
  87. result = res.json()
  88. print(result)
  89. if 'code' in result.keys() and result['code'] == 200:
  90. return res.json()
  91. else:
  92. msg = result['msg'] if 'msg' in result.keys() else result
  93. raise Exception(f'中间结果转存,请求airflow失败-->{msg}')
  94. # 获取task日志
  95. def get_task_log(job_id: str, af_run_id: str, task_id: str):
  96. res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/task_log/{job_id}/{af_run_id}/{task_id}')
  97. result = res.json()
  98. if 'code' in result.keys() and result['code'] == 200:
  99. return res.json()
  100. else:
  101. msg = result['msg'] if 'msg' in result.keys() else result
  102. raise Exception(f'获取task日志,请求airflow失败-->{msg}')
  103. # 获取中间结果转存状态
  104. def get_data_transfer_run_status(af_run_id: str):
  105. res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/data_transfer_log/{af_run_id}')
  106. result = res.json()
  107. if 'code' in result.keys() and result['code'] == 200:
  108. return res.json()
  109. else:
  110. msg = result['msg'] if 'msg' in result.keys() else result
  111. raise Exception(f'获取中间结果转存状态,请求airflow失败-->{msg}')