send_util.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. from unittest import result
  2. import requests
  3. from configs.settings import config
  4. AF_HOST = config.get('AF_BACKEND', 'host')
  5. AF_PORT = config.get('AF_BACKEND', 'port')
  6. PROGRAMME_URL = config.get('PROGRAMME', 'url')
  7. AI_YIQI_URL = config.get('AI_YIQI', 'url')
  8. def send_post(uri,data):
  9. res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}{uri}', json=data)
  10. result = res.json()
  11. if 'code' in result.keys() and result['code'] == 200:
  12. return res.json()
  13. else:
  14. msg = result['msg'] if 'msg' in result.keys() else result
  15. raise Exception(f'{uri}-->请求airflow失败-->{msg}')
  16. def send_submit(af_job_id):
  17. res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/submit?id='+str(af_job_id))
  18. result = res.json()
  19. if 'code' in result.keys() and result['code'] == 200:
  20. return res.json()
  21. else:
  22. msg = result['msg'] if 'msg' in result.keys() else result
  23. raise Exception(f'提交任务,请求airflow失败-->{msg}')
  24. def send_put(uri,path_data,data):
  25. res = requests.put(url=f'http://{AF_HOST}:{AF_PORT}{uri}/{path_data}', json=data)
  26. result = res.json()
  27. if 'code' in result.keys() and result['code'] == 200:
  28. return res.json()
  29. else:
  30. msg = result['msg'] if 'msg' in result.keys() else result
  31. raise Exception(f'{uri}-->请求airflow失败-->{msg}')
  32. def send_get(uri,path_data):
  33. res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}{uri}/{path_data}')
  34. result = res.json()
  35. if 'code' in result.keys() and result['code'] == 200:
  36. return res.json()
  37. else:
  38. msg = result['msg'] if 'msg' in result.keys() else result
  39. raise Exception(f'{uri}-->请求airflow失败-->{msg}')
  40. # 执行任务
  41. def send_execute(path_data):
  42. res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/{str(path_data)}/run')
  43. result = res.json()
  44. if 'code' in result.keys() and result['code'] == 200:
  45. return res.json()
  46. else:
  47. msg = result['msg'] if 'msg' in result.keys() else result
  48. raise Exception(f'执行任务,请求airflow失败-->{msg}')
  49. # 起停任务
  50. def send_pause(af_job_id, status):
  51. flag = True if status == 0 else False
  52. res = requests.patch(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/{str(af_job_id)}/pause/{str(flag)}')
  53. result = res.json()
  54. if 'code' in result.keys() and result['code'] == 200:
  55. return res.json()
  56. else:
  57. msg = result['msg'] if 'msg' in result.keys() else result
  58. raise Exception(f'修改任务状态,请求airflow失败-->{msg}')
  59. # 删除任务
  60. def send_delete(uri, path_data):
  61. res = requests.delete(url=f'http://{AF_HOST}:{AF_PORT}{uri}/{path_data}')
  62. result = res.json()
  63. if 'code' in result.keys() and result['code'] == 200:
  64. return res.json()
  65. else:
  66. msg = result['msg'] if 'msg' in result.keys() else result
  67. raise Exception(f'{uri}-->请求airflow失败-->{msg}')
  68. # 获取airflow端dag文件生成时间
  69. def get_job_last_parsed_time(path_data):
  70. res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/{path_data}/last_parsed_time')
  71. result = res.json()
  72. if 'code' in result.keys() and result['code'] == 200:
  73. return res.json()
  74. else:
  75. msg = result['msg'] if 'msg' in result.keys() else result
  76. raise Exception(f'获取上次转化时间-->请求airflow失败-->{msg}')
  77. # 获取job某次运行的状态
  78. def get_job_run_status(path_data):
  79. res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_run/{path_data}/status')
  80. result = res.json()
  81. if 'code' in result.keys() and result['code'] == 200:
  82. return res.json()
  83. else:
  84. msg = result['msg'] if 'msg' in result.keys() else result
  85. raise Exception(f'获取job某次运行的状态-->请求airflow失败-->{msg}')
  86. # 中间结果转存
  87. def data_transfer_run(source_tb: str, target_tb: str):
  88. res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/000/data_transfer_run?source_tb={source_tb}&target_tb={target_tb}')
  89. result = res.json()
  90. print(result)
  91. if 'code' in result.keys() and result['code'] == 200:
  92. return res.json()
  93. else:
  94. msg = result['msg'] if 'msg' in result.keys() else result
  95. raise Exception(f'中间结果转存,请求airflow失败-->{msg}')
  96. # 获取task日志
  97. def get_task_log(job_id: str, af_run_id: str, task_id: str):
  98. res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_run/task_log/{job_id}/{af_run_id}/{task_id}')
  99. result = res.json()
  100. if 'code' in result.keys() and result['code'] == 200:
  101. return res.json()
  102. else:
  103. msg = result['msg'] if 'msg' in result.keys() else result
  104. raise Exception(f'获取task日志,请求airflow失败-->{msg}')
  105. # 获取中间结果转存状态
  106. def get_data_transfer_run_status(af_run_id: str):
  107. res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_run/data_transfer_log/{af_run_id}')
  108. result = res.json()
  109. if 'code' in result.keys() and result['code'] == 200:
  110. return res.json()
  111. else:
  112. msg = result['msg'] if 'msg' in result.keys() else result
  113. raise Exception(f'获取中间结果转存状态,请求airflow失败-->{msg}')
  114. # 创建jupyter
  115. def create_jupyter(data):
  116. res = requests.post(url=f'http://{PROGRAMME_URL}/helm/ops/start', json=data)
  117. result = res.json()
  118. if 'code' in result.keys() and result['code'] == 200:
  119. return res.json()
  120. else:
  121. stop_jupyter({'namespace': data['namespace'],'release_name': data['release_name']})
  122. msg = result['msg'] if 'msg' in result.keys() else result
  123. raise Exception(f'创建jupyter,请求jupyter端失败-->{msg}')
  124. # 获取jupyter密码
  125. def get_jupyter_password(data):
  126. res = requests.get(url=f'http://{PROGRAMME_URL}/helm/password', json=data)
  127. result = res.json()
  128. if 'code' in result.keys() and result['code'] == 200:
  129. return res.json()
  130. else:
  131. msg = result['msg'] if 'msg' in result.keys() else result
  132. raise Exception(f'获取jupyter密码,请求jupyter端失败-->{msg}')
  133. # 停止jupyter
  134. def stop_jupyter(data):
  135. res = requests.post(url=f'http://{PROGRAMME_URL}/helm/ops/stop', json=data)
  136. result = res.json()
  137. if 'code' in result.keys() and result['code'] == 200:
  138. return res.json()
  139. else:
  140. msg = result['msg'] if 'msg' in result.keys() else result
  141. raise Exception(f'停止jupyter,请求jupyter端失败-->{msg}')
  142. # 更新jupyter
  143. def update_jupyter(data):
  144. res = requests.post(url=f'http://{PROGRAMME_URL}/helm/ops/upgrade', json=data)
  145. result = res.json()
  146. if 'code' in result.keys() and result['code'] == 200:
  147. return res.json()
  148. else:
  149. msg = result['msg'] if 'msg' in result.keys() else result
  150. raise Exception(f'更新jupyter,请求jupyter端失败-->{msg}')
  151. # 获取jupyter服务状态
  152. def get_jupyter_status(data):
  153. res = requests.post(url=f'http://{PROGRAMME_URL}/helm/ops/status', json=data)
  154. result = res.json()
  155. if 'code' in result.keys() and result['code'] == 200:
  156. return res.json()
  157. else:
  158. msg = result['msg'] if 'msg' in result.keys() else result
  159. raise Exception(f'获取jupyter服务状态,请求jupyter端失败-->{msg}')
  160. # 获取jupyter服务状态
  161. def get_jupyter_html(url):
  162. res = requests.post(url=url)
  163. return res.headers
  164. # 请求airflow下载依赖
  165. def post_install_requirements(requirements_str: str,target_path: str):
  166. res = requests.post(url=f'http://{AF_HOST}:{AF_PORT}/af/af_job/special_job/install_requirements?requirements_str={requirements_str}&target_path={target_path}')
  167. result = res.json()
  168. if 'code' in result.keys() and result['code'] == 200:
  169. return res.json()
  170. else:
  171. msg = result['msg'] if 'msg' in result.keys() else result
  172. raise Exception(f'安装依赖-->请求airflow失败-->{msg}')
  173. # 根据job_id和af_run_id获取运行状态
  174. def get_running_status(job_id: str, af_run_id: str):
  175. res = requests.get(url=f'http://{AF_HOST}:{AF_PORT}/af/af_run/running_status/{job_id}/{af_run_id}')
  176. result = res.json()
  177. if 'code' in result.keys() and result['code'] == 200:
  178. return res.json()
  179. else:
  180. msg = result['msg'] if 'msg' in result.keys() else result
  181. raise Exception(f'获取任务运行状态,请求airflow失败-->{msg}')
  182. # 获取ai中台的镜像语言分类类别
  183. def get_ai_mode_language(auth_token: str):
  184. res = requests.get(url=f'http://{AI_YIQI_URL}/aiSquare/openApi/sic/environment/operate/baseinfo?type=language', headers={'Authorization': auth_token})
  185. result = res.json()
  186. if 'code' in result.keys() and result['code'] == '00000':
  187. return result
  188. else:
  189. msg = result['message'] if 'message' in result.keys() else result
  190. raise Exception(f'获取语言分类,请求一期接口失败-->{msg}')
  191. # 根据语言获取ai中台的镜像列表
  192. def get_ai_images(language: str, auth_token: str):
  193. res = requests.get(url=f'http://{AI_YIQI_URL}/aiSquare/openApi/sic/mirror?paramjson=%7B%22isPage%22:%22N%22,%22baseType%22:%22BASE,TRAIN%22,%22sourceTypes%22:%22IMPORT,ONLINEBUILD,INIT%22,%22language%22:%22{language}%22%7D', headers={'Authorization': auth_token})
  194. result = res.json()
  195. if 'code' in result.keys() and result['code'] == '00000':
  196. return result
  197. else:
  198. msg = result['message'] if 'message' in result.keys() else result
  199. raise Exception(f'获取镜像列表,请求一期接口失败-->{msg}')
  200. # 根据语言获取ai中台的镜像列表
  201. def get_ai_image_version(image_id: str, auth_token: str):
  202. res = requests.get(url=f'http://{AI_YIQI_URL}/aiSquare/openApi/sic/mirror/version?paramsJson=%7B%22mirrorId%22:%22{image_id}%22,%22versionState%22:%22SUCCESS%22%7D', headers={'Authorization': auth_token})
  203. result = res.json()
  204. if 'code' in result.keys() and result['code'] == '00000':
  205. return result
  206. else:
  207. msg = result['message'] if 'message' in result.keys() else result
  208. raise Exception(f'获取镜像版本列表,请求一期接口失败-->{msg}')
  209. # 新增结构化数据
  210. def post_structuring_data(data, auth_token: str):
  211. res = requests.post(url=f'http://{AI_YIQI_URL}/aiSquare/openApi/bc/api/datasources', json=data, headers={'Authorization': auth_token})
  212. result = res.json()
  213. if 'code' in result.keys() and result['code'] == 1:
  214. return result
  215. else:
  216. msg = result['message'] if 'message' in result.keys() else result
  217. raise Exception(f'新增结构化数据,请求一期接口失败-->{msg}')