send_util.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. from unittest import result
  2. import requests
  3. from configs.settings import config
# Host and port of the service every helper in this module sends requests to;
# both are looked up under the 'AIRFLOW' key group of the project config.
HOST = config.get('AIRFLOW', 'HOST')
PORT = config.get('AIRFLOW', 'PORT')
  6. def send_post(uri,data):
  7. res = requests.post(url=f'http://{HOST}:{PORT}{uri}', json=data)
  8. result = res.json()
  9. if 'code' in result.keys() and result['code'] == 200:
  10. return res.json()
  11. else:
  12. print(result)
  13. raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
  14. def send_submit(af_job_id):
  15. res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/submit?id='+str(af_job_id))
  16. result = res.json()
  17. print(result)
  18. if 'code' in result.keys() and result['code'] == 200:
  19. return res.json()
  20. else:
  21. raise Exception('提交任务,请求airflow失败-->'+result['msg'])
  22. def send_put(uri,path_data,data):
  23. res = requests.put(url=f'http://{HOST}:{PORT}{uri}/{path_data}', json=data)
  24. result = res.json()
  25. if 'code' in result.keys() and result['code'] == 200:
  26. return res.json()
  27. else:
  28. raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
  29. def send_get(uri,path_data):
  30. res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
  31. result = res.json()
  32. if 'code' in result.keys() and result['code'] == 200:
  33. return res.json()
  34. else:
  35. raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
  36. def send_execute(path_data):
  37. res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(path_data)}/run')
  38. result = res.json()
  39. if 'code' in result.keys() and result['code'] == 200:
  40. return res.json()
  41. else:
  42. raise Exception('执行一次任务,请求airflow失败-->'+result['msg'])
  43. # 起停任务
  44. def send_pause(af_job_id, status):
  45. res = requests.patch(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(af_job_id)}/pause/{str(status)}')
  46. result = res.json()
  47. if 'code' in result.keys() and result['code'] == 200:
  48. return res.json()
  49. else:
  50. raise Exception('修改任务状态,请求airflow失败-->'+result['msg'])
  51. # 删除任务
  52. def send_delete(uri, path_data):
  53. res = requests.delete(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
  54. result = res.json()
  55. if 'code' in result.keys() and result['code'] == 200:
  56. return res.json()
  57. else:
  58. print(result)
  59. raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])