send_util.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. from unittest import result
  2. import requests
  3. from configs.settings import config
  4. HOST = config.get('AF_BACKEND', 'host')
  5. PORT = config.get('AF_BACKEND', 'port')
  6. def send_post(uri,data):
  7. res = requests.post(url=f'http://{HOST}:{PORT}{uri}', json=data)
  8. result = res.json()
  9. if 'code' in result.keys() and result['code'] == 200:
  10. return res.json()
  11. else:
  12. print(result)
  13. raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
  14. def send_submit(af_job_id):
  15. res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/submit?id='+str(af_job_id))
  16. result = res.json()
  17. print(result)
  18. if 'code' in result.keys() and result['code'] == 200:
  19. return res.json()
  20. else:
  21. raise Exception('提交任务,请求airflow失败-->'+result['msg'])
  22. def send_put(uri,path_data,data):
  23. res = requests.put(url=f'http://{HOST}:{PORT}{uri}/{path_data}', json=data)
  24. result = res.json()
  25. if 'code' in result.keys() and result['code'] == 200:
  26. return res.json()
  27. else:
  28. raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
  29. def send_get(uri,path_data):
  30. res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
  31. result = res.json()
  32. if 'code' in result.keys() and result['code'] == 200:
  33. return res.json()
  34. else:
  35. raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
  36. # 执行任务
  37. def send_execute(path_data):
  38. res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/{str(path_data)}/run')
  39. result = res.json()
  40. print(result)
  41. if 'code' in result.keys() and result['code'] == 200:
  42. return res.json()
  43. else:
  44. raise Exception('执行一次任务,请求airflow失败-->'+result['msg'])
  45. # 起停任务
  46. def send_pause(af_job_id, status):
  47. flag = True if status == 0 else False
  48. res = requests.patch(url=f'http://{HOST}:{PORT}/af/af_job/{str(af_job_id)}/pause/{str(flag)}')
  49. result = res.json()
  50. if 'code' in result.keys() and result['code'] == 200:
  51. return res.json()
  52. else:
  53. raise Exception('修改任务状态,请求airflow失败-->'+result['msg'])
  54. # 删除任务
  55. def send_delete(uri, path_data):
  56. res = requests.delete(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
  57. result = res.json()
  58. if 'code' in result.keys() and result['code'] == 200:
  59. return res.json()
  60. else:
  61. print(result)
  62. raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
  63. # 获取airflow端dag文件生成时间
  64. def get_job_last_parsed_time(path_data):
  65. res = requests.get(url=f'http://{HOST}:{PORT}/af/af_job/{path_data}/last_parsed_time')
  66. result = res.json()
  67. print(result)
  68. if 'code' in result.keys() and result['code'] == 200:
  69. return res.json()
  70. else:
  71. raise Exception('获取上次转化时间-->请求airflow失败-->'+result['msg'])
  72. # 获取job某次运行的状态
  73. def get_job_run_status(path_data):
  74. res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/{path_data}/status')
  75. result = res.json()
  76. print(result)
  77. if 'code' in result.keys() and result['code'] == 200:
  78. return res.json()
  79. else:
  80. raise Exception('获取job某次运行的状态-->请求airflow失败-->'+result['msg'])
  81. # 中间结果转存
  82. def data_transfer_run(source_tb: str, target_tb: str):
  83. res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/000/data_transfer_run?source_tb={source_tb}&target_tb={target_tb}')
  84. result = res.json()
  85. print(result)
  86. if 'code' in result.keys() and result['code'] == 200:
  87. return res.json()
  88. else:
  89. raise Exception('中间结果转存,请求airflow失败-->'+result['msg'])