send_util.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. from unittest import result
  2. import requests
  3. from configs.settings import config
  4. HOST = config.get('AIRFLOW', 'HOST')
  5. PORT = config.get('AIRFLOW', 'PORT')
  6. def send_post(uri,data):
  7. res = requests.post(url=f'http://{HOST}:{PORT}{uri}', json=data)
  8. result = res.json()
  9. if 'code' in result.keys() and result['code'] == 200:
  10. return res.json()
  11. else:
  12. print(result)
  13. raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
  14. def send_submit(af_job_id):
  15. res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/submit?id='+str(af_job_id))
  16. result = res.json()
  17. print(result)
  18. if 'code' in result.keys() and result['code'] == 200:
  19. return res.json()
  20. else:
  21. raise Exception('提交任务,请求airflow失败-->'+result['msg'])
  22. def send_put(uri,path_data,data):
  23. res = requests.put(url=f'http://{HOST}:{PORT}{uri}/{path_data}', json=data)
  24. result = res.json()
  25. if 'code' in result.keys() and result['code'] == 200:
  26. return res.json()
  27. else:
  28. raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
  29. def send_get(uri,path_data):
  30. res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
  31. result = res.json()
  32. if 'code' in result.keys() and result['code'] == 200:
  33. return res.json()
  34. else:
  35. raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
  36. def send_execute(path_data):
  37. res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(path_data)}/run')
  38. result = res.json()
  39. if 'code' in result.keys() and result['code'] == 200:
  40. return res.json()
  41. else:
  42. raise Exception('执行一次任务,请求airflow失败-->'+result['msg'])