from unittest import result import requests from configs.settings import config HOST = config.get('AIRFLOW', 'HOST') PORT = config.get('AIRFLOW', 'PORT') def send_post(uri,data): res = requests.post(url=f'http://{HOST}:{PORT}{uri}', json=data) result = res.json() if 'code' in result.keys() and result['code'] == 200: return res.json() else: print(result) raise Exception(f'{uri}-->请求airflow失败-->'+result['msg']) def send_submit(af_job_id): res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/submit?id='+str(af_job_id)) result = res.json() print(result) if 'code' in result.keys() and result['code'] == 200: return res.json() else: raise Exception('提交任务,请求airflow失败-->'+result['msg']) def send_put(uri,path_data,data): res = requests.put(url=f'http://{HOST}:{PORT}{uri}/{path_data}', json=data) result = res.json() if 'code' in result.keys() and result['code'] == 200: return res.json() else: raise Exception(f'{uri}-->请求airflow失败-->'+result['msg']) def send_get(uri,path_data): res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}') result = res.json() if 'code' in result.keys() and result['code'] == 200: return res.json() else: raise Exception(f'{uri}-->请求airflow失败-->'+result['msg']) def send_execute(path_data): res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(path_data)}/run') result = res.json() if 'code' in result.keys() and result['code'] == 200: return res.json() else: raise Exception('执行一次任务,请求airflow失败-->'+result['msg']) # 起停任务 def send_pause(af_job_id, status): res = requests.patch(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(af_job_id)}/pause/{str(status)}') result = res.json() if 'code' in result.keys() and result['code'] == 200: return res.json() else: raise Exception('修改任务状态,请求airflow失败-->'+result['msg']) # 删除任务 def send_delete(uri, path_data): res = requests.delete(url=f'http://{HOST}:{PORT}{uri}/{path_data}') result = res.json() if 'code' in result.keys() and result['code'] == 200: return res.json() else: print(result) raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])