Pārlūkot izejas kodu

日志获取变更

liweiquan 2 gadi atpakaļ
vecāks
revīzija
223256bbe5
2 mainītis faili ar 44 papildinājumiem un 12 dzēšanām
  1. 23 1
      app/routers/jm_job_log.py
  2. 21 11
      app/utils/send_util.py

+ 23 - 1
app/routers/jm_job_log.py

@@ -6,7 +6,7 @@ from sqlalchemy.orm import Session
 from app import page_help, schemas
 
 import app.crud as crud
-from app.utils.send_util import get_job_run_status
+from app.utils.send_util import get_job_run_status, get_task_log
 from constants.constants import RUN_STATUS
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
@@ -22,6 +22,7 @@ router = APIRouter(
 )
 
 
+
 @router.get("/")
 @web_try()
 @sxtimeit
@@ -89,6 +90,27 @@ def get_job_log_once(run_id: str, db: Session = Depends(get_db)):
     res.sort(key=lambda x: x['trigger_time'], reverse=True)
     return res
 
+@router.get("/all_task")
+@web_try()
+@sxtimeit
+def get_job_all_task(run_id: str, db: Session = Depends(get_db)):
+    af_job_run = crud.get_airflow_run_once(db, run_id)
+    af_job_id = af_job_run.job_id
+    af_job = crud.get_airflow_job_once(db, af_job_id)
+    res = []
+    for task in af_job.tasks:
+        task.update({'job_id':af_job_id,'af_run_id':af_job_run.af_run_id,'task_id':task['id']})
+        res.append(task)
+    return res
+
+
+@router.get("/task_log/{job_id}/{af_run_id}/{task_id}")
+@web_try()
+@sxtimeit
+def get_job_task_log(job_id: str, af_run_id: str, task_id: str, db: Session = Depends(get_db)):
+    res = get_task_log(job_id, af_run_id, task_id)
+    return res['data']
+
 @router.get("/logs_status/{ids}")
 @web_try()
 @sxtimeit

+ 21 - 11
app/utils/send_util.py

@@ -11,7 +11,7 @@ def send_post(uri,data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'{uri}-->请求airflow失败-->{msg}')
 
 def send_submit(af_job_id):
@@ -20,7 +20,7 @@ def send_submit(af_job_id):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'提交任务,请求airflow失败-->{msg}')
 
 
@@ -30,7 +30,7 @@ def send_put(uri,path_data,data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'{uri}-->请求airflow失败-->{msg}')
 
 def send_get(uri,path_data):
@@ -39,7 +39,7 @@ def send_get(uri,path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'{uri}-->请求airflow失败-->{msg}')
 
 
@@ -50,7 +50,7 @@ def send_execute(path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'执行任务,请求airflow失败-->{msg}')
 
 # 起停任务
@@ -61,7 +61,7 @@ def send_pause(af_job_id, status):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'修改任务状态,请求airflow失败-->{msg}')
 
 # 删除任务
@@ -71,7 +71,7 @@ def send_delete(uri, path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'{uri}-->请求airflow失败-->{msg}')
 
 # 获取airflow端dag文件生成时间
@@ -81,7 +81,7 @@ def get_job_last_parsed_time(path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'获取上次转化时间-->请求airflow失败-->{msg}')
 
 # 获取job某次运行的状态
@@ -91,7 +91,7 @@ def get_job_run_status(path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'获取job某次运行的状态-->请求airflow失败-->{msg}')
 
 # 中间结果转存
@@ -102,5 +102,15 @@ def data_transfer_run(source_tb: str, target_tb: str):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
-        raise Exception('中间结果转存,请求airflow失败-->{msg}')
+        msg = result['msg'] if 'msg' in result.keys() else result
+        raise Exception(f'中间结果转存,请求airflow失败-->{msg}')
+
+# 获取task日志
+def get_task_log(job_id: str, af_run_id: str, task_id: str):
+    res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/task_log/{job_id}/{af_run_id}/{task_id}')
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        msg = result['msg'] if 'msg' in result.keys() else result
+        raise Exception(f'获取task日志,请求airflow失败-->{msg}')