Ver código fonte

Merge remote-tracking branch 'origin/master'

luoyulong 2 anos atrás
pai
commit
ff10075502
3 arquivos alterados com 60 adições e 14 exclusões
  1. 35 1
      app/routers/jm_job_log.py
  2. 4 2
      app/services/jm_job.py
  3. 21 11
      app/utils/send_util.py

+ 35 - 1
app/routers/jm_job_log.py

@@ -6,7 +6,7 @@ from sqlalchemy.orm import Session
 from app import page_help, schemas
 
 import app.crud as crud
-from app.utils.send_util import get_job_run_status
+from app.utils.send_util import get_job_run_status, get_task_log
 from constants.constants import RUN_STATUS
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
@@ -22,6 +22,7 @@ router = APIRouter(
 )
 
 
+
 @router.get("/")
 @web_try()
 @sxtimeit
@@ -89,6 +90,39 @@ def get_job_log_once(run_id: str, db: Session = Depends(get_db)):
     res.sort(key=lambda x: x['trigger_time'], reverse=True)
     return res
 
+@router.get("/all_task")
+@web_try()
+@sxtimeit
+def get_job_all_task(run_id: str, db: Session = Depends(get_db)):
+    af_job_run = crud.get_airflow_run_once(db, run_id)
+    af_job_id = af_job_run.job_id
+    af_job = crud.get_airflow_job_once(db, af_job_id)
+    res = []
+    for task in af_job.tasks:
+        task.update({
+            'job_id':af_job_id,
+            'af_run_id':af_job_run.af_run_id,
+            'task_id':task['id'],
+            })
+        task_log_res = get_task_log(af_job_id, af_job_run.af_run_id, task['id'])
+        task_log = task_log_res['data'] if 'data' in task_log_res else None
+        if task_log:
+            task.update({
+                'execute_result':task_log['status'] if 'status' in task_log else None,
+                'execute_time':task_log['start_time'] if 'start_time' in task_log else None,
+                'log': task_log['log'] if 'log' in task_log else None
+                })
+        res.append(task)
+    return res
+
+
+@router.get("/task_log/{job_id}/{af_run_id}/{task_id}")
+@web_try()
+@sxtimeit
+def get_job_task_log(job_id: str, af_run_id: str, task_id: str, db: Session = Depends(get_db)):
+    res = get_task_log(job_id, af_run_id, task_id)
+    return res['data']
+
 @router.get("/logs_status/{ids}")
 @web_try()
 @sxtimeit

+ 4 - 2
app/services/jm_job.py

@@ -63,10 +63,11 @@ def jm_job_update_task(jm_homework: models.JmHomework, relation_list, db: Sessio
 
 def jm_job_create_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
     homework_ids = [node['homework_id'] for node in nodes]
+    node_uuid_to_h_id = {node['id']:node['homework_id'] for node in nodes}
     relations = crud.get_af_ids(db,homework_ids, 'task')
     se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
     tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
-    dependence = [[se_id_to_af_id_dict[edge['source']],se_id_to_af_id_dict[str(edge['target'])]] for edge in edges]
+    dependence = [[se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['source'])]],se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['target'])]]] for edge in edges]
     cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
     cron.replace('?','*')
     af_job = {
@@ -92,10 +93,11 @@ def jm_job_create_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
 
 def jm_job_update_job(jm_job_info: models.JmJobInfo, nodes, edges, db: Session):
     homework_ids = [node['homework_id'] for node in nodes]
+    node_uuid_to_h_id = {node['id']:node['homework_id'] for node in nodes}
     relations = crud.get_af_ids(db,homework_ids, 'task')
     se_id_to_af_id_dict = { relation.se_id:relation.af_id for relation in relations}
     tasks = [ send_get("/af/af_task/getOnce",id)['data'] for id in se_id_to_af_id_dict.values()]
-    dependence = [[se_id_to_af_id_dict[edge['source']],se_id_to_af_id_dict[str(edge['target'])]] for edge in edges]
+    dependence = [[se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['source'])]],se_id_to_af_id_dict[node_uuid_to_h_id[str(edge['target'])]]] for edge in edges]
     cron = jm_job_info.cron_expression if jm_job_info.cron_type == 2 else '@once'
     cron.replace('?','*')
     af_job = {

+ 21 - 11
app/utils/send_util.py

@@ -11,7 +11,7 @@ def send_post(uri,data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'{uri}-->请求airflow失败-->{msg}')
 
 def send_submit(af_job_id):
@@ -20,7 +20,7 @@ def send_submit(af_job_id):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'提交任务,请求airflow失败-->{msg}')
 
 
@@ -30,7 +30,7 @@ def send_put(uri,path_data,data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'{uri}-->请求airflow失败-->{msg}')
 
 def send_get(uri,path_data):
@@ -39,7 +39,7 @@ def send_get(uri,path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'{uri}-->请求airflow失败-->{msg}')
 
 
@@ -50,7 +50,7 @@ def send_execute(path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'执行任务,请求airflow失败-->{msg}')
 
 # 起停任务
@@ -61,7 +61,7 @@ def send_pause(af_job_id, status):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'修改任务状态,请求airflow失败-->{msg}')
 
 # 删除任务
@@ -71,7 +71,7 @@ def send_delete(uri, path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'{uri}-->请求airflow失败-->{msg}')
 
 # 获取airflow端dag文件生成时间
@@ -81,7 +81,7 @@ def get_job_last_parsed_time(path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'获取上次转化时间-->请求airflow失败-->{msg}')
 
 # 获取job某次运行的状态
@@ -91,7 +91,7 @@ def get_job_run_status(path_data):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
+        msg = result['msg'] if 'msg' in result.keys() else result
         raise Exception(f'获取job某次运行的状态-->请求airflow失败-->{msg}')
 
 # 中间结果转存
@@ -102,5 +102,15 @@ def data_transfer_run(source_tb: str, target_tb: str):
     if 'code' in result.keys() and result['code'] == 200:
         return res.json()
     else:
-        msg = result['msg'] if 'msf' in result.keys() else result
-        raise Exception('中间结果转存,请求airflow失败-->{msg}')
+        msg = result['msg'] if 'msg' in result.keys() else result
+        raise Exception(f'中间结果转存,请求airflow失败-->{msg}')
+
+# 获取task日志
+def get_task_log(job_id: str, af_run_id: str, task_id: str):
+    res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/task_log/{job_id}/{af_run_id}/{task_id}')
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        msg = result['msg'] if 'msg' in result.keys() else result
+        raise Exception(f'获取task日志,请求airflow失败-->{msg}')