"""HTTP helpers for talking to the airflow backend and the jupyter/helm service.

Every helper sends one request, expects a JSON object carrying ``code == 200``
on success, and raises ``Exception`` with a formatted message otherwise.
"""
# NOTE(review): ``result`` is not used by any function in this module; the
# import is kept only in case other modules import it from here.
from unittest import result

import requests

from configs.settings import config

AF_HOST = config.get('AF_BACKEND', 'host')
AF_PORT = config.get('AF_BACKEND', 'port')
PROGRAMME_URL = config.get('PROGRAMME', 'url')

# NOTE(review): none of the requests below pass ``timeout=``, so a stalled
# server blocks the caller indefinitely. Not changed here because suitable
# limits for the long-running endpoints are unknown.


def _af_url(path):
    """Return the absolute airflow backend URL for *path*."""
    # Built at call time so that changes to AF_HOST / AF_PORT are honoured.
    return f'http://{AF_HOST}:{AF_PORT}{path}'


def _programme_url(path):
    """Return the absolute jupyter/helm service URL for *path*."""
    return f'http://{PROGRAMME_URL}{path}'


def _decode_json(res, error_prefix):
    """Parse the JSON body of *res*.

    Raises:
        Exception: if the body is not valid JSON; the message is
            *error_prefix* followed by the raw response text.
    """
    try:
        return res.json()
    except ValueError as err:
        # requests raises a ValueError subclass for undecodable bodies.
        raise Exception(f'{error_prefix}{res.text}') from err


def _ensure_success(payload, error_prefix):
    """Return *payload* if it is a JSON object whose ``code`` equals 200.

    Raises:
        Exception: otherwise; the message is *error_prefix* followed by the
            payload's ``msg`` entry when present, else by the payload itself.
    """
    is_object = isinstance(payload, dict)
    if is_object and payload.get('code') == 200:
        return payload
    msg = payload['msg'] if is_object and 'msg' in payload else payload
    raise Exception(f'{error_prefix}{msg}')


def _unwrap(res, error_prefix):
    """Decode *res* once and return the payload if it reports success."""
    return _ensure_success(_decode_json(res, error_prefix), error_prefix)


def send_post(uri, data) -> dict:
    """POST *data* as JSON to the airflow backend at *uri*.

    Returns:
        The decoded response object.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.post(url=_af_url(uri), json=data)
    return _unwrap(res, f'{uri}-->请求airflow失败-->')


def send_submit(af_job_id) -> dict:
    """POST to ``/af/af_job/submit`` with *af_job_id* as the ``id`` query value.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.post(
        url=_af_url('/af/af_job/submit'),
        params={'id': str(af_job_id)},  # let requests URL-encode the value
    )
    return _unwrap(res, '提交任务,请求airflow失败-->')


def send_put(uri, path_data, data) -> dict:
    """PUT *data* as JSON to the airflow backend at ``{uri}/{path_data}``.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.put(url=_af_url(f'{uri}/{path_data}'), json=data)
    return _unwrap(res, f'{uri}-->请求airflow失败-->')


def send_get(uri, path_data) -> dict:
    """GET ``{uri}/{path_data}`` from the airflow backend.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.get(url=_af_url(f'{uri}/{path_data}'))
    return _unwrap(res, f'{uri}-->请求airflow失败-->')


def send_execute(path_data) -> dict:
    """Run a job: POST to ``/af/af_job/{path_data}/run``.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.post(url=_af_url(f'/af/af_job/{str(path_data)}/run'))
    return _unwrap(res, '执行任务,请求airflow失败-->')


def send_pause(af_job_id, status) -> dict:
    """Start or stop a job: PATCH ``/af/af_job/{af_job_id}/pause/{flag}``.

    ``flag`` is sent as ``True`` when *status* equals 0 and ``False`` for any
    other value.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    flag = status == 0
    res = requests.patch(
        url=_af_url(f'/af/af_job/{str(af_job_id)}/pause/{str(flag)}')
    )
    return _unwrap(res, '修改任务状态,请求airflow失败-->')


def send_delete(uri, path_data) -> dict:
    """Delete a job: DELETE ``{uri}/{path_data}`` on the airflow backend.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.delete(url=_af_url(f'{uri}/{path_data}'))
    return _unwrap(res, f'{uri}-->请求airflow失败-->')


def get_job_last_parsed_time(path_data) -> dict:
    """Fetch the airflow-side DAG file generation time for a job.

    Sends GET ``/af/af_job/{path_data}/last_parsed_time``.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.get(url=_af_url(f'/af/af_job/{path_data}/last_parsed_time'))
    return _unwrap(res, '获取上次转化时间-->请求airflow失败-->')


def get_job_run_status(path_data) -> dict:
    """Fetch the status of one run of a job.

    Sends GET ``/af/af_run/{path_data}/status``.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.get(url=_af_url(f'/af/af_run/{path_data}/status'))
    return _unwrap(res, '获取job某次运行的状态-->请求airflow失败-->')


def data_transfer_run(source_tb: str, target_tb: str) -> dict:
    """Start an intermediate-result transfer from *source_tb* to *target_tb*.

    Sends POST ``/af/af_job/000/data_transfer_run`` with both table names as
    query parameters, and prints the decoded response before checking it.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    error_prefix = '中间结果转存,请求airflow失败-->'
    res = requests.post(
        url=_af_url('/af/af_job/000/data_transfer_run'),
        # params= URL-encodes the table names instead of pasting them raw.
        params={'source_tb': source_tb, 'target_tb': target_tb},
    )
    payload = _decode_json(res, error_prefix)
    print(payload)
    return _ensure_success(payload, error_prefix)


def get_task_log(job_id: str, af_run_id: str, task_id: str) -> dict:
    """Fetch the log of a task.

    Sends GET ``/af/af_run/task_log/{job_id}/{af_run_id}/{task_id}``.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.get(
        url=_af_url(f'/af/af_run/task_log/{job_id}/{af_run_id}/{task_id}')
    )
    return _unwrap(res, '获取task日志,请求airflow失败-->')


def get_data_transfer_run_status(af_run_id: str) -> dict:
    """Fetch the status of an intermediate-result transfer.

    Sends GET ``/af/af_run/data_transfer_log/{af_run_id}``.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.get(url=_af_url(f'/af/af_run/data_transfer_log/{af_run_id}'))
    return _unwrap(res, '获取中间结果转存状态,请求airflow失败-->')


def create_jupyter(data) -> dict:
    """Create a jupyter instance: POST *data* as JSON to ``/helm/ops/start``.

    When the service answers with a failure payload, ``stop_jupyter`` is
    called with the ``namespace`` and ``release_name`` entries of *data*
    before the error is raised.

    Raises:
        Exception: if the response does not carry ``code == 200``. A failure
            of the follow-up stop call is attached as ``__cause__`` rather
            than replacing the creation error.
    """
    error_prefix = '创建jupyter,请求jupyter端失败-->'
    res = requests.post(url=_programme_url('/helm/ops/start'), json=data)
    payload = _decode_json(res, error_prefix)
    try:
        return _ensure_success(payload, error_prefix)
    except Exception as create_err:
        try:
            stop_jupyter({
                'namespace': data['namespace'],
                'release_name': data['release_name'],
            })
        except Exception as cleanup_err:
            # Keep the creation failure as the primary error; the cleanup
            # failure must not mask it.
            raise create_err from cleanup_err
        raise


def get_jupyter_password(data) -> dict:
    """Fetch the jupyter password: GET ``/helm/password`` with *data* as JSON body.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    # NOTE(review): a JSON body on a GET request is unusual; kept as-is
    # because the service contract is not visible from this file.
    res = requests.get(url=_programme_url('/helm/password'), json=data)
    return _unwrap(res, '获取jupyter密码,请求jupyter端失败-->')


def stop_jupyter(data) -> dict:
    """Stop a jupyter instance: POST *data* as JSON to ``/helm/ops/stop``.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.post(url=_programme_url('/helm/ops/stop'), json=data)
    return _unwrap(res, '停止jupyter,请求jupyter端失败-->')


def update_jupyter(data) -> dict:
    """Update a jupyter instance: POST *data* as JSON to ``/helm/ops/upgrade``.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.post(url=_programme_url('/helm/ops/upgrade'), json=data)
    return _unwrap(res, '更新jupyter,请求jupyter端失败-->')


def get_jupyter_status(data) -> dict:
    """Fetch the jupyter service status: POST *data* as JSON to ``/helm/ops/status``.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.post(url=_programme_url('/helm/ops/status'), json=data)
    return _unwrap(res, '获取jupyter服务状态,请求jupyter端失败-->')


def get_jupyter_html(url):
    """POST to *url* and return the response headers.

    Unlike the other helpers, the body is neither parsed nor checked.
    """
    res = requests.post(url=url)
    return res.headers


def post_install_requirements(requirements_str: str, target_path: str) -> dict:
    """Ask airflow to install dependencies.

    Sends POST ``/af/af_job/special_job/install_requirements`` with
    *requirements_str* and *target_path* as query parameters.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.post(
        url=_af_url('/af/af_job/special_job/install_requirements'),
        # Requirement specifiers and paths can contain characters that are
        # significant in a query string, so let requests encode them.
        params={
            'requirements_str': requirements_str,
            'target_path': target_path,
        },
    )
    return _unwrap(res, '安装依赖-->请求airflow失败-->')


def get_running_status(job_id: str, af_run_id: str) -> dict:
    """Fetch the running status identified by *job_id* and *af_run_id*.

    Sends GET ``/af/af_run/running_status/{job_id}/{af_run_id}``.

    Raises:
        Exception: if the response does not carry ``code == 200``.
    """
    res = requests.get(
        url=_af_url(f'/af/af_run/running_status/{job_id}/{af_run_id}')
    )
    return _unwrap(res, '获取任务运行状态,请求airflow失败-->')