123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- from unittest import result
- import requests
- from configs.settings import config
- HOST = config.get('AIRFLOW', 'HOST')
- PORT = config.get('AIRFLOW', 'PORT')
- def send_post(uri,data):
- res = requests.post(url=f'http://{HOST}:{PORT}{uri}', json=data)
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- print(result)
- raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
- def send_submit(af_job_id):
- res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/submit?id='+str(af_job_id))
- result = res.json()
- print(result)
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- raise Exception('提交任务,请求airflow失败-->'+result['msg'])
- def send_put(uri,path_data,data):
- res = requests.put(url=f'http://{HOST}:{PORT}{uri}/{path_data}', json=data)
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
- def send_get(uri,path_data):
- res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
- def send_execute(path_data):
- res = requests.post(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(path_data)}/run')
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- raise Exception('执行一次任务,请求airflow失败-->'+result['msg'])
- # 起停任务
- def send_pause(af_job_id, status):
- res = requests.patch(url=f'http://{HOST}:{PORT}/jpt/af_job/{str(af_job_id)}/pause/{str(status)}')
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- raise Exception('修改任务状态,请求airflow失败-->'+result['msg'])
- # 删除任务
- def send_delete(uri, path_data):
- res = requests.delete(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- print(result)
- raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
|