123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- from unittest import result
- import requests
- from configs.settings import config
# Host and port of the airflow backend, read from the 'AF_BACKEND' entry of
# the project configuration.
AF_HOST = config.get('AF_BACKEND', 'host')
AF_PORT = config.get('AF_BACKEND', 'port')

# Address (without scheme) of the service behind the /helm endpoints used by
# the jupyter helpers, read from the 'PROGRAMME' entry.
PROGRAMME_URL = config.get('PROGRAMME', 'url')
def send_post(uri, data):
    """POST ``data`` as JSON to the airflow backend.

    Args:
        uri: Path appended directly after ``http://{AF_HOST}:{AF_PORT}``.
        data: Object sent as the JSON request body.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200. The message carries the
            response's ``msg`` field, or the whole response if ``msg`` is absent.
    """
    endpoint = f'http://{AF_HOST}:{AF_PORT}{uri}'
    payload = requests.post(url=endpoint, json=data).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'{uri}-->请求airflow失败-->{detail}')
    return payload
def send_submit(af_job_id):
    """Submit a job to the airflow backend via ``/af/af_job/submit``.

    Args:
        af_job_id: Job identifier; its ``str()`` form is sent as the ``id``
            query parameter.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = f'http://{AF_HOST}:{AF_PORT}/af/af_job/submit?id={af_job_id!s}'
    payload = requests.post(url=endpoint).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'提交任务,请求airflow失败-->{detail}')
    return payload
def send_put(uri, path_data, data):
    """PUT ``data`` as JSON to ``{uri}/{path_data}`` on the airflow backend.

    Args:
        uri: Path appended directly after ``http://{AF_HOST}:{AF_PORT}``.
        path_data: Value appended to ``uri`` as the last path segment.
        data: Object sent as the JSON request body.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = f'http://{AF_HOST}:{AF_PORT}{uri}/{path_data}'
    payload = requests.put(url=endpoint, json=data).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'{uri}-->请求airflow失败-->{detail}')
    return payload
def send_get(uri, path_data):
    """GET ``{uri}/{path_data}`` from the airflow backend.

    Args:
        uri: Path appended directly after ``http://{AF_HOST}:{AF_PORT}``.
        path_data: Value appended to ``uri`` as the last path segment.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = f'http://{AF_HOST}:{AF_PORT}{uri}/{path_data}'
    payload = requests.get(url=endpoint).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'{uri}-->请求airflow失败-->{detail}')
    return payload
def send_execute(path_data):
    """Trigger a run of a job via ``POST /af/af_job/{path_data}/run``.

    Args:
        path_data: Value placed in the job segment of the path (its ``str()``
            form is used).

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = f'http://{AF_HOST}:{AF_PORT}/af/af_job/{path_data!s}/run'
    payload = requests.post(url=endpoint).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'执行任务,请求airflow失败-->{detail}')
    return payload
def send_pause(af_job_id, status):
    """Send a pause flag for a job via ``PATCH /af/af_job/{id}/pause/{flag}``.

    Args:
        af_job_id: Job identifier placed in the path.
        status: The flag sent is ``True`` when ``status == 0`` and ``False``
            for any other value.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    pause_flag = status == 0
    endpoint = (
        f'http://{AF_HOST}:{AF_PORT}/af/af_job/{af_job_id!s}/pause/{pause_flag!s}'
    )
    payload = requests.patch(url=endpoint).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'修改任务状态,请求airflow失败-->{detail}')
    return payload
def send_delete(uri, path_data):
    """Issue ``DELETE {uri}/{path_data}`` against the airflow backend.

    Args:
        uri: Path appended directly after ``http://{AF_HOST}:{AF_PORT}``.
        path_data: Value appended to ``uri`` as the last path segment.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = f'http://{AF_HOST}:{AF_PORT}{uri}/{path_data}'
    payload = requests.delete(url=endpoint).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'{uri}-->请求airflow失败-->{detail}')
    return payload
def get_job_last_parsed_time(path_data):
    """Fetch ``/af/af_job/{path_data}/last_parsed_time`` from the airflow backend.

    Per the original author's note, this is the time the DAG file was
    generated on the airflow side.

    Args:
        path_data: Value placed in the job segment of the path.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = f'http://{AF_HOST}:{AF_PORT}/af/af_job/{path_data}/last_parsed_time'
    payload = requests.get(url=endpoint).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'获取上次转化时间-->请求airflow失败-->{detail}')
    return payload
def get_job_run_status(path_data):
    """Fetch the status of one job run via ``GET /af/af_run/{path_data}/status``.

    Args:
        path_data: Value placed in the run segment of the path.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = f'http://{AF_HOST}:{AF_PORT}/af/af_run/{path_data}/status'
    payload = requests.get(url=endpoint).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'获取job某次运行的状态-->请求airflow失败-->{detail}')
    return payload
def data_transfer_run(source_tb: str, target_tb: str):
    """Ask the airflow backend to transfer intermediate results between tables.

    Sends ``POST /af/af_job/000/data_transfer_run`` with ``source_tb`` and
    ``target_tb`` as query parameters.

    Args:
        source_tb: Value of the ``source_tb`` query parameter.
        target_tb: Value of the ``target_tb`` query parameter.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200. The message carries the
            response's ``msg`` field, or the whole response if ``msg`` is absent.
    """
    endpoint = f'http://{AF_HOST}:{AF_PORT}/af/af_job/000/data_transfer_run'
    # Pass the table names through ``params`` so requests URL-encodes them;
    # interpolating them into the URL breaks on '&', '#', '+' or spaces.
    query = {'source_tb': source_tb, 'target_tb': target_tb}
    payload = requests.post(url=endpoint, params=query).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'中间结果转存,请求airflow失败-->{detail}')
    return payload
def get_task_log(job_id: str, af_run_id: str, task_id: str):
    """Fetch a task log via ``GET /af/af_run/task_log/{job_id}/{af_run_id}/{task_id}``.

    Args:
        job_id: First path segment after ``task_log``.
        af_run_id: Second path segment after ``task_log``.
        task_id: Third path segment after ``task_log``.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = (
        f'http://{AF_HOST}:{AF_PORT}/af/af_run/task_log/{job_id}/{af_run_id}/{task_id}'
    )
    payload = requests.get(url=endpoint).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'获取task日志,请求airflow失败-->{detail}')
    return payload
def get_data_transfer_run_status(af_run_id: str):
    """Fetch the status of an intermediate-result transfer.

    Sends ``GET /af/af_run/data_transfer_log/{af_run_id}``.

    Args:
        af_run_id: Run identifier placed in the last path segment.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = f'http://{AF_HOST}:{AF_PORT}/af/af_run/data_transfer_log/{af_run_id}'
    payload = requests.get(url=endpoint).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'获取中间结果转存状态,请求airflow失败-->{detail}')
    return payload
def create_jupyter(data):
    """Create a jupyter release via ``POST /helm/ops/start``.

    If creation fails, a best-effort ``stop_jupyter`` call is made for the same
    ``namespace`` / ``release_name`` before the creation error is raised.

    Args:
        data: Mapping sent as the JSON request body. Its ``namespace`` and
            ``release_name`` entries are used for the rollback call.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200. The message carries the
            response's ``msg`` field, or the whole response if ``msg`` is
            absent. When the rollback itself failed, that error is attached
            as ``__cause__``.
    """
    payload = requests.post(url=f'http://{PROGRAMME_URL}/helm/ops/start', json=data).json()
    if payload.get('code') == 200:
        return payload

    # Rollback is best-effort: a failure here must not replace the creation
    # error, which is what the caller needs to see.
    rollback_error = None
    try:
        stop_jupyter({'namespace': data['namespace'], 'release_name': data['release_name']})
    except Exception as exc:
        rollback_error = exc

    detail = payload.get('msg', payload)
    raise Exception(f'创建jupyter,请求jupyter端失败-->{detail}') from rollback_error
def get_jupyter_password(data):
    """Fetch a jupyter password via ``GET /helm/password``.

    Args:
        data: Object sent as the JSON body of the GET request.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = f'http://{PROGRAMME_URL}/helm/password'
    payload = requests.get(url=endpoint, json=data).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'获取jupyter密码,请求jupyter端失败-->{detail}')
    return payload
def stop_jupyter(data):
    """Stop a jupyter release via ``POST /helm/ops/stop``.

    Args:
        data: Object sent as the JSON request body.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = f'http://{PROGRAMME_URL}/helm/ops/stop'
    payload = requests.post(url=endpoint, json=data).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'停止jupyter,请求jupyter端失败-->{detail}')
    return payload
def update_jupyter(data):
    """Update a jupyter release via ``POST /helm/ops/upgrade``.

    Args:
        data: Object sent as the JSON request body.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = f'http://{PROGRAMME_URL}/helm/ops/upgrade'
    payload = requests.post(url=endpoint, json=data).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'更新jupyter,请求jupyter端失败-->{detail}')
    return payload
def get_jupyter_status(data):
    """Query the jupyter service status via ``POST /helm/ops/status``.

    Args:
        data: Object sent as the JSON request body.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = f'http://{PROGRAMME_URL}/helm/ops/status'
    payload = requests.post(url=endpoint, json=data).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'获取jupyter服务状态,请求jupyter端失败-->{detail}')
    return payload
def get_jupyter_html(url):
    """POST to ``url`` with no body and return the response headers.

    Despite the name, the response body is not read; only ``headers`` is
    returned, and no status or ``code`` check is performed.

    Args:
        url: Full URL to send the POST request to.

    Returns:
        The ``headers`` attribute of the ``requests`` response.
    """
    return requests.post(url=url).headers
def post_install_requirements(requirements_str: str, target_path: str):
    """Ask the airflow backend to install dependencies.

    Sends ``POST /af/af_job/special_job/install_requirements`` with
    ``requirements_str`` and ``target_path`` as query parameters.

    Args:
        requirements_str: Value of the ``requirements_str`` query parameter.
        target_path: Value of the ``target_path`` query parameter.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200. The message carries the
            response's ``msg`` field, or the whole response if ``msg`` is absent.
    """
    endpoint = f'http://{AF_HOST}:{AF_PORT}/af/af_job/special_job/install_requirements'
    # Let requests URL-encode the values: interpolating them into the URL
    # corrupts the query when they contain '&', '#', '+', '=' or spaces.
    query = {'requirements_str': requirements_str, 'target_path': target_path}
    payload = requests.post(url=endpoint, params=query).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'安装依赖-->请求airflow失败-->{detail}')
    return payload
def get_running_status(job_id: str, af_run_id: str):
    """Fetch a run's status via ``GET /af/af_run/running_status/{job_id}/{af_run_id}``.

    Args:
        job_id: First path segment after ``running_status``.
        af_run_id: Second path segment after ``running_status``.

    Returns:
        The decoded JSON response when its ``code`` field equals 200.

    Raises:
        Exception: If ``code`` is missing or not 200.
    """
    endpoint = f'http://{AF_HOST}:{AF_PORT}/af/af_run/running_status/{job_id}/{af_run_id}'
    payload = requests.get(url=endpoint).json()
    if payload.get('code') != 200:
        detail = payload.get('msg', payload)
        raise Exception(f'获取任务运行状态,请求airflow失败-->{detail}')
    return payload
|