send_util.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. from unittest import result
  2. import requests
  3. from configs.settings import config
# Host and port used by this module to build every ``http://{AF_HOST}:{AF_PORT}/af/...``
# request URL; both are read from the ``AF_BACKEND`` config section.
AF_HOST = config.get('AF_BACKEND', 'host')
AF_PORT = config.get('AF_BACKEND', 'port')
# Address (without scheme) of the service that exposes the ``/helm/...`` endpoints
# used by the jupyter helpers; read from the ``PROGRAMME`` config section.
PROGRAMME_URL = config.get('PROGRAMME', 'url')
  7. def send_post(uri,data):
  8. res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}{uri}', json=data)
  9. result = res.json()
  10. if 'code' in result.keys() and result['code'] == 200:
  11. return res.json()
  12. else:
  13. msg = result['msg'] if 'msg' in result.keys() else result
  14. raise Exception(f'{uri}-->请求airflow失败-->{msg}')
  15. def send_submit(af_job_id):
  16. res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/submit?id='+str(af_job_id))
  17. result = res.json()
  18. if 'code' in result.keys() and result['code'] == 200:
  19. return res.json()
  20. else:
  21. msg = result['msg'] if 'msg' in result.keys() else result
  22. raise Exception(f'提交任务,请求airflow失败-->{msg}')
  23. def send_put(uri,path_data,data):
  24. res = requests.put(url=f'http://{AF_HOST}:{AF_PORT}{uri}/{path_data}', json=data)
  25. result = res.json()
  26. if 'code' in result.keys() and result['code'] == 200:
  27. return res.json()
  28. else:
  29. msg = result['msg'] if 'msg' in result.keys() else result
  30. raise Exception(f'{uri}-->请求airflow失败-->{msg}')
  31. def send_get(uri,path_data):
  32. res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}{uri}/{path_data}')
  33. result = res.json()
  34. if 'code' in result.keys() and result['code'] == 200:
  35. return res.json()
  36. else:
  37. msg = result['msg'] if 'msg' in result.keys() else result
  38. raise Exception(f'{uri}-->请求airflow失败-->{msg}')
  39. # 执行任务
  40. def send_execute(path_data):
  41. res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/{str(path_data)}/run')
  42. result = res.json()
  43. if 'code' in result.keys() and result['code'] == 200:
  44. return res.json()
  45. else:
  46. msg = result['msg'] if 'msg' in result.keys() else result
  47. raise Exception(f'执行任务,请求airflow失败-->{msg}')
  48. # 起停任务
  49. def send_pause(af_job_id, status):
  50. flag = True if status == 0 else False
  51. res = requests.patch(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/{str(af_job_id)}/pause/{str(flag)}')
  52. result = res.json()
  53. if 'code' in result.keys() and result['code'] == 200:
  54. return res.json()
  55. else:
  56. msg = result['msg'] if 'msg' in result.keys() else result
  57. raise Exception(f'修改任务状态,请求airflow失败-->{msg}')
  58. # 删除任务
  59. def send_delete(uri, path_data):
  60. res = requests.delete(url=f'http://{AF_HOST}:{AF_PORT}{uri}/{path_data}')
  61. result = res.json()
  62. if 'code' in result.keys() and result['code'] == 200:
  63. return res.json()
  64. else:
  65. msg = result['msg'] if 'msg' in result.keys() else result
  66. raise Exception(f'{uri}-->请求airflow失败-->{msg}')
  67. # 获取airflow端dag文件生成时间
  68. def get_job_last_parsed_time(path_data):
  69. res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/{path_data}/last_parsed_time')
  70. result = res.json()
  71. if 'code' in result.keys() and result['code'] == 200:
  72. return res.json()
  73. else:
  74. msg = result['msg'] if 'msg' in result.keys() else result
  75. raise Exception(f'获取上次转化时间-->请求airflow失败-->{msg}')
  76. # 获取job某次运行的状态
  77. def get_job_run_status(path_data):
  78. res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_run/{path_data}/status')
  79. result = res.json()
  80. if 'code' in result.keys() and result['code'] == 200:
  81. return res.json()
  82. else:
  83. msg = result['msg'] if 'msg' in result.keys() else result
  84. raise Exception(f'获取job某次运行的状态-->请求airflow失败-->{msg}')
  85. # 中间结果转存
  86. def data_transfer_run(source_tb: str, target_tb: str):
  87. res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/000/data_transfer_run?source_tb={source_tb}&target_tb={target_tb}')
  88. result = res.json()
  89. print(result)
  90. if 'code' in result.keys() and result['code'] == 200:
  91. return res.json()
  92. else:
  93. msg = result['msg'] if 'msg' in result.keys() else result
  94. raise Exception(f'中间结果转存,请求airflow失败-->{msg}')
  95. # 获取task日志
  96. def get_task_log(job_id: str, af_run_id: str, task_id: str):
  97. res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_run/task_log/{job_id}/{af_run_id}/{task_id}')
  98. result = res.json()
  99. if 'code' in result.keys() and result['code'] == 200:
  100. return res.json()
  101. else:
  102. msg = result['msg'] if 'msg' in result.keys() else result
  103. raise Exception(f'获取task日志,请求airflow失败-->{msg}')
  104. # 获取中间结果转存状态
  105. def get_data_transfer_run_status(af_run_id: str):
  106. res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_run/data_transfer_log/{af_run_id}')
  107. result = res.json()
  108. if 'code' in result.keys() and result['code'] == 200:
  109. return res.json()
  110. else:
  111. msg = result['msg'] if 'msg' in result.keys() else result
  112. raise Exception(f'获取中间结果转存状态,请求airflow失败-->{msg}')
  113. # 创建jupyter
  114. def create_jupyter(data):
  115. res = requests.post(url=f'http://{PROGRAMME_URL}/helm/ops/start', json=data)
  116. result = res.json()
  117. if 'code' in result.keys() and result['code'] == 200:
  118. return res.json()
  119. else:
  120. stop_jupyter({'namespace': data['namespace'],'release_name': data['release_name']})
  121. msg = result['msg'] if 'msg' in result.keys() else result
  122. raise Exception(f'创建jupyter,请求jupyter端失败-->{msg}')
  123. # 获取jupyter密码
  124. def get_jupyter_password(data):
  125. res = requests.get(url=f'http://{PROGRAMME_URL}/helm/password', json=data)
  126. result = res.json()
  127. if 'code' in result.keys() and result['code'] == 200:
  128. return res.json()
  129. else:
  130. msg = result['msg'] if 'msg' in result.keys() else result
  131. raise Exception(f'获取jupyter密码,请求jupyter端失败-->{msg}')
  132. # 停止jupyter
  133. def stop_jupyter(data):
  134. res = requests.post(url=f'http://{PROGRAMME_URL}/helm/ops/stop', json=data)
  135. result = res.json()
  136. if 'code' in result.keys() and result['code'] == 200:
  137. return res.json()
  138. else:
  139. msg = result['msg'] if 'msg' in result.keys() else result
  140. raise Exception(f'停止jupyter,请求jupyter端失败-->{msg}')
  141. # 更新jupyter
  142. def update_jupyter(data):
  143. res = requests.post(url=f'http://{PROGRAMME_URL}/helm/ops/upgrade', json=data)
  144. result = res.json()
  145. if 'code' in result.keys() and result['code'] == 200:
  146. return res.json()
  147. else:
  148. msg = result['msg'] if 'msg' in result.keys() else result
  149. raise Exception(f'更新jupyter,请求jupyter端失败-->{msg}')
  150. # 获取jupyter服务状态
  151. def get_jupyter_status(data):
  152. res = requests.post(url=f'http://{PROGRAMME_URL}/helm/ops/status', json=data)
  153. result = res.json()
  154. if 'code' in result.keys() and result['code'] == 200:
  155. return res.json()
  156. else:
  157. msg = result['msg'] if 'msg' in result.keys() else result
  158. raise Exception(f'获取jupyter服务状态,请求jupyter端失败-->{msg}')
  159. # 获取jupyter服务状态
  160. def get_jupyter_html(url):
  161. res = requests.post(url=url)
  162. return res.headers
  163. # 请求airflow下载依赖
  164. def post_install_requirements(requirements_str: str,target_path: str):
  165. res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/special_job/install_requirements?requirements_str={requirements_str}&target_path={target_path}')
  166. result = res.json()
  167. if 'code' in result.keys() and result['code'] == 200:
  168. return res.json()
  169. else:
  170. msg = result['msg'] if 'msg' in result.keys() else result
  171. raise Exception(f'安装依赖-->请求airflow失败-->{msg}')
  172. # 根据job_id和af_run_id获取运行状态
  173. def get_running_status(job_id: str, af_run_id: str):
  174. res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_run/running_status/{job_id}/{af_run_id}')
  175. result = res.json()
  176. if 'code' in result.keys() and result['code'] == 200:
  177. return res.json()
  178. else:
  179. msg = result['msg'] if 'msg' in result.keys() else result
  180. raise Exception(f'获取任务运行状态,请求airflow失败-->{msg}')