|
@@ -11,17 +11,17 @@ def send_post(uri,data):
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
return res.json()
|
|
return res.json()
|
|
else:
|
|
else:
|
|
- print(result)
|
|
|
|
- raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
|
|
|
|
|
|
+ msg = result['msg'] if 'msf' in result.keys() else result
|
|
|
|
+ raise Exception(f'{uri}-->请求airflow失败-->{msg}')
|
|
|
|
|
|
def send_submit(af_job_id):
|
|
def send_submit(af_job_id):
|
|
res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/submit?id='+str(af_job_id))
|
|
res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/submit?id='+str(af_job_id))
|
|
result = res.json()
|
|
result = res.json()
|
|
- print(result)
|
|
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
return res.json()
|
|
return res.json()
|
|
else:
|
|
else:
|
|
- raise Exception('提交任务,请求airflow失败-->'+result['msg'])
|
|
|
|
|
|
+ msg = result['msg'] if 'msf' in result.keys() else result
|
|
|
|
+ raise Exception(f'提交任务,请求airflow失败-->{msg}')
|
|
|
|
|
|
|
|
|
|
def send_put(uri,path_data,data):
|
|
def send_put(uri,path_data,data):
|
|
@@ -30,7 +30,8 @@ def send_put(uri,path_data,data):
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
return res.json()
|
|
return res.json()
|
|
else:
|
|
else:
|
|
- raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
|
|
|
|
|
|
+ msg = result['msg'] if 'msf' in result.keys() else result
|
|
|
|
+ raise Exception(f'{uri}-->请求airflow失败-->{msg}')
|
|
|
|
|
|
def send_get(uri,path_data):
|
|
def send_get(uri,path_data):
|
|
res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
|
|
res = requests.get(url=f'http://{HOST}:{PORT}{uri}/{path_data}')
|
|
@@ -38,18 +39,19 @@ def send_get(uri,path_data):
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
return res.json()
|
|
return res.json()
|
|
else:
|
|
else:
|
|
- raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
|
|
|
|
|
|
+ msg = result['msg'] if 'msf' in result.keys() else result
|
|
|
|
+ raise Exception(f'{uri}-->请求airflow失败-->{msg}')
|
|
|
|
|
|
|
|
|
|
# 执行任务
|
|
# 执行任务
|
|
def send_execute(path_data):
|
|
def send_execute(path_data):
|
|
res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/{str(path_data)}/run')
|
|
res = requests.post(url=f'http://{HOST}:{PORT}/af/af_job/{str(path_data)}/run')
|
|
result = res.json()
|
|
result = res.json()
|
|
- print(result)
|
|
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
return res.json()
|
|
return res.json()
|
|
else:
|
|
else:
|
|
- raise Exception('执行一次任务,请求airflow失败-->'+result['msg'])
|
|
|
|
|
|
+ msg = result['msg'] if 'msf' in result.keys() else result
|
|
|
|
+ raise Exception(f'执行任务,请求airflow失败-->{msg}')
|
|
|
|
|
|
# 起停任务
|
|
# 起停任务
|
|
def send_pause(af_job_id, status):
|
|
def send_pause(af_job_id, status):
|
|
@@ -59,7 +61,8 @@ def send_pause(af_job_id, status):
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
return res.json()
|
|
return res.json()
|
|
else:
|
|
else:
|
|
- raise Exception('修改任务状态,请求airflow失败-->'+result['msg'])
|
|
|
|
|
|
+ msg = result['msg'] if 'msf' in result.keys() else result
|
|
|
|
+ raise Exception(f'修改任务状态,请求airflow失败-->{msg}')
|
|
|
|
|
|
# 删除任务
|
|
# 删除任务
|
|
def send_delete(uri, path_data):
|
|
def send_delete(uri, path_data):
|
|
@@ -68,28 +71,28 @@ def send_delete(uri, path_data):
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
return res.json()
|
|
return res.json()
|
|
else:
|
|
else:
|
|
- print(result)
|
|
|
|
- raise Exception(f'{uri}-->请求airflow失败-->'+result['msg'])
|
|
|
|
|
|
+ msg = result['msg'] if 'msf' in result.keys() else result
|
|
|
|
+ raise Exception(f'{uri}-->请求airflow失败-->{msg}')
|
|
|
|
|
|
# 获取airflow端dag文件生成时间
|
|
# 获取airflow端dag文件生成时间
|
|
def get_job_last_parsed_time(path_data):
|
|
def get_job_last_parsed_time(path_data):
|
|
res = requests.get(url=f'http://{HOST}:{PORT}/af/af_job/{path_data}/last_parsed_time')
|
|
res = requests.get(url=f'http://{HOST}:{PORT}/af/af_job/{path_data}/last_parsed_time')
|
|
result = res.json()
|
|
result = res.json()
|
|
- print(result)
|
|
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
return res.json()
|
|
return res.json()
|
|
else:
|
|
else:
|
|
- raise Exception('获取上次转化时间-->请求airflow失败-->'+result['msg'])
|
|
|
|
|
|
+ msg = result['msg'] if 'msf' in result.keys() else result
|
|
|
|
+ raise Exception(f'获取上次转化时间-->请求airflow失败-->{msg}')
|
|
|
|
|
|
# 获取job某次运行的状态
|
|
# 获取job某次运行的状态
|
|
def get_job_run_status(path_data):
|
|
def get_job_run_status(path_data):
|
|
res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/{path_data}/status')
|
|
res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/{path_data}/status')
|
|
result = res.json()
|
|
result = res.json()
|
|
- print(result)
|
|
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
return res.json()
|
|
return res.json()
|
|
else:
|
|
else:
|
|
- raise Exception('获取job某次运行的状态-->请求airflow失败-->'+result['msg'])
|
|
|
|
|
|
+ msg = result['msg'] if 'msf' in result.keys() else result
|
|
|
|
+ raise Exception(f'获取job某次运行的状态-->请求airflow失败-->{msg}')
|
|
|
|
|
|
# 中间结果转存
|
|
# 中间结果转存
|
|
def data_transfer_run(source_tb: str, target_tb: str):
|
|
def data_transfer_run(source_tb: str, target_tb: str):
|
|
@@ -99,4 +102,5 @@ def data_transfer_run(source_tb: str, target_tb: str):
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
if 'code' in result.keys() and result['code'] == 200:
|
|
return res.json()
|
|
return res.json()
|
|
else:
|
|
else:
|
|
- raise Exception('中间结果转存,请求airflow失败-->'+result['msg'])
|
|
|
|
|
|
+ msg = result['msg'] if 'msf' in result.keys() else result
|
|
|
|
+ raise Exception('中间结果转存,请求airflow失败-->{msg}')
|