123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- from unittest import result
- import requests
- from configs.settings import config
- HOST = config.get('AF_BACKEND', 'host')
- PORT = config.get('AF_BACKEND', 'port')
- def send_post(uri,data):
- res = requests.post(url=f'http://{HOST}:{PORT}{uri}', json=data)
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- msg = result['msg'] if 'msf' in result.keys() else result
- raise Exception(f'{uri}-->请求airflow失败-->{msg}')
- def send_submit(af_job_id):
- res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/submit?id='+str(af_job_id))
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- msg = result['msg'] if 'msf' in result.keys() else result
- raise Exception(f'提交任务,请求airflow失败-->{msg}')
- def send_put(uri,path_data,data):
- res = requests.put(url=f'http://{HOST}:{PORT}{uri}/{path_data}', json=data)
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- msg = result['msg'] if 'msf' in result.keys() else result
- raise Exception(f'{uri}-->请求airflow失败-->{msg}')
- def send_get(uri,path_data):
- res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- msg = result['msg'] if 'msf' in result.keys() else result
- raise Exception(f'{uri}-->请求airflow失败-->{msg}')
- # 执行任务
- def send_execute(path_data):
- res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/{str(path_data)}/run')
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- msg = result['msg'] if 'msf' in result.keys() else result
- raise Exception(f'执行任务,请求airflow失败-->{msg}')
- # 起停任务
- def send_pause(af_job_id, status):
- flag = True if status == 0 else False
- res = requests.patch(url=f'http://{HOST}:{PORT}/af/af_job/{str(af_job_id)}/pause/{str(flag)}')
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- msg = result['msg'] if 'msf' in result.keys() else result
- raise Exception(f'修改任务状态,请求airflow失败-->{msg}')
- # 删除任务
- def send_delete(uri, path_data):
- res = requests.delete(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- msg = result['msg'] if 'msf' in result.keys() else result
- raise Exception(f'{uri}-->请求airflow失败-->{msg}')
- # 获取airflow端dag文件生成时间
- def get_job_last_parsed_time(path_data):
- res = requests.get(url=f'http://{HOST}:{PORT}/af/af_job/{path_data}/last_parsed_time')
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- msg = result['msg'] if 'msf' in result.keys() else result
- raise Exception(f'获取上次转化时间-->请求airflow失败-->{msg}')
- # 获取job某次运行的状态
- def get_job_run_status(path_data):
- res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/{path_data}/status')
- result = res.json()
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- msg = result['msg'] if 'msf' in result.keys() else result
- raise Exception(f'获取job某次运行的状态-->请求airflow失败-->{msg}')
- # 中间结果转存
- def data_transfer_run(source_tb: str, target_tb: str):
- res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/000/data_transfer_run?source_tb={source_tb}&target_tb={target_tb}')
- result = res.json()
- print(result)
- if 'code' in result.keys() and result['code'] == 200:
- return res.json()
- else:
- msg = result['msg'] if 'msf' in result.keys() else result
- raise Exception('中间结果转存,请求airflow失败-->{msg}')
|