"""Thin HTTP client for the airflow backend service.

Every public function sends one request to ``http://{HOST}:{PORT}...``,
decodes the JSON body and returns it when its ``code`` field equals 200.
Any other body raises a plain ``Exception`` whose text carries the
backend's ``msg`` field (or the whole body when ``msg`` is absent).

NOTE(review): none of the requests pass ``timeout=``, so a stalled backend
blocks the caller indefinitely -- consider adding one.
"""
# NOTE(review): `result` is not used anywhere in this module and looks like an
# accidental auto-import; kept because other code may import it from here.
from unittest import result

import requests

from configs.settings import config

HOST = config.get('AF_BACKEND', 'host')
PORT = config.get('AF_BACKEND', 'port')


def _url(path):
    """Return the absolute backend URL for *path* (which starts with '/')."""
    # Built at call time so later changes to HOST / PORT are still honoured.
    return f'http://{HOST}:{PORT}{path}'


def _check_result(payload, error_prefix):
    """Return *payload* if it reports success, otherwise raise.

    Args:
        payload: Decoded JSON body of a backend response.
        error_prefix: Text placed before ``-->{msg}`` in the error message.

    Returns:
        ``payload`` unchanged when it is a dict whose ``code`` equals 200.

    Raises:
        Exception: For any other payload. The message ends with the
            payload's ``msg`` value, or with the payload itself when it has
            no ``msg`` key or is not a dict.
    """
    if isinstance(payload, dict):
        if payload.get('code') == 200:
            return payload
        # The original looked up the misspelled key 'msf', so the backend's
        # message was never extracted; read the real 'msg' key instead.
        msg = payload.get('msg', payload)
    else:
        # A non-dict body (e.g. a JSON list) cannot carry code/msg fields.
        msg = payload
    raise Exception(f'{error_prefix}-->{msg}')


def send_post(uri, data):
    """POST *data* as JSON to ``{uri}`` and return the decoded response.

    Raises:
        Exception: If the response does not report ``code`` 200.
    """
    res = requests.post(url=_url(uri), json=data)
    return _check_result(res.json(), f'{uri}-->请求airflow失败')


def send_submit(af_job_id):
    """POST to ``/af/af_job/submit?id={af_job_id}`` and return the response.

    Raises:
        Exception: If the response does not report ``code`` 200.
    """
    res = requests.post(url=_url(f'/af/af_job/submit?id={af_job_id}'))
    return _check_result(res.json(), '提交任务,请求airflow失败')


def send_put(uri, path_data, data):
    """PUT *data* as JSON to ``{uri}/{path_data}`` and return the response.

    Raises:
        Exception: If the response does not report ``code`` 200.
    """
    res = requests.put(url=_url(f'{uri}/{path_data}'), json=data)
    return _check_result(res.json(), f'{uri}-->请求airflow失败')


def send_get(uri, path_data):
    """GET ``{uri}/{path_data}`` and return the decoded response.

    Raises:
        Exception: If the response does not report ``code`` 200.
    """
    res = requests.get(url=_url(f'{uri}/{path_data}'))
    return _check_result(res.json(), f'{uri}-->请求airflow失败')


# Execute a job
def send_execute(path_data):
    """POST to ``/af/af_job/{path_data}/run`` and return the response.

    Raises:
        Exception: If the response does not report ``code`` 200.
    """
    res = requests.post(url=_url(f'/af/af_job/{path_data}/run'))
    return _check_result(res.json(), '执行任务,请求airflow失败')


# Start / stop a job
def send_pause(af_job_id, status):
    """PATCH ``/af/af_job/{af_job_id}/pause/{flag}`` and return the response.

    ``flag`` is rendered as ``True`` when ``status == 0`` and ``False``
    otherwise.
    NOTE(review): presumably status 0 means "pause the job" -- confirm
    against the backend API.

    Raises:
        Exception: If the response does not report ``code`` 200.
    """
    flag = status == 0
    res = requests.patch(url=_url(f'/af/af_job/{af_job_id}/pause/{flag}'))
    return _check_result(res.json(), '修改任务状态,请求airflow失败')


# Delete a job
def send_delete(uri, path_data):
    """DELETE ``{uri}/{path_data}`` and return the decoded response.

    Raises:
        Exception: If the response does not report ``code`` 200.
    """
    res = requests.delete(url=_url(f'{uri}/{path_data}'))
    return _check_result(res.json(), f'{uri}-->请求airflow失败')


# Get the time the DAG file was generated on the airflow side
def get_job_last_parsed_time(path_data):
    """GET ``/af/af_job/{path_data}/last_parsed_time`` and return the response.

    Raises:
        Exception: If the response does not report ``code`` 200.
    """
    res = requests.get(url=_url(f'/af/af_job/{path_data}/last_parsed_time'))
    return _check_result(res.json(), '获取上次转化时间-->请求airflow失败')


# Get the status of one particular run of a job
def get_job_run_status(path_data):
    """GET ``/af/af_run/{path_data}/status`` and return the response.

    Raises:
        Exception: If the response does not report ``code`` 200.
    """
    res = requests.get(url=_url(f'/af/af_run/{path_data}/status'))
    return _check_result(res.json(), '获取job某次运行的状态-->请求airflow失败')


# Transfer intermediate results
def data_transfer_run(source_tb: str, target_tb: str):
    """POST to ``/af/af_job/000/data_transfer_run`` with both table names.

    The table names are passed as the ``source_tb`` and ``target_tb`` query
    parameters. The decoded response is printed before it is validated.

    Raises:
        Exception: If the response does not report ``code`` 200.
    """
    res = requests.post(
        url=_url(
            f'/af/af_job/000/data_transfer_run'
            f'?source_tb={source_tb}&target_tb={target_tb}'
        )
    )
    payload = res.json()
    # Kept from the original: the raw response is echoed even on failure.
    print(payload)
    # The original message lacked the f-prefix and emitted a literal '{msg}'.
    return _check_result(payload, '中间结果转存,请求airflow失败')