瀏覽代碼

Merge branch 'master' of http://gogsb.soaringnova.com/sxwl_DL/datax-admin

Zhang Li 2 年之前
父節點
當前提交
c78f066ceb
共有 5 個文件被更改,包括 72 次插入10 次删除
  1. 19 2
      app/crud/data_management.py
  2. 5 1
      app/models/data_management.py
  3. 29 6
      app/routers/data_management.py
  4. 12 1
      app/utils/send_util.py
  5. 7 0
      data/data.sql

+ 19 - 2
app/crud/data_management.py

@@ -1,9 +1,10 @@
+from re import S
 import time
 from typing import List
 from app import models, schemas
 from sqlalchemy.orm import Session
 
-def create_data_management(db: Session, item: schemas.DataManagementCreate, table_name: str):
+def create_data_management(db: Session, item: schemas.DataManagementCreate, table_name: str, af_run_id: str):
     create_time: int = int(time.time())
     db_item = models.DataManagement(**{
         'name': item.name,
@@ -11,17 +12,33 @@ def create_data_management(db: Session, item: schemas.DataManagementCreate, tabl
         'create_time': create_time,
         'user_name': item.user_name,
         'user_id': item.user_id,
-        'project_id': item.project_id
+        'project_id': item.project_id,
+        'af_run_id': af_run_id,
+        'status': 1
     })
     db.add(db_item)
     db.commit()
     db.refresh(db_item)
     return db_item
 
+def update_data_management_status(db: Session, d_id: int, status: int):
+    db_item: models.DataManagement = db.query(models.DataManagement).filter(models.DataManagement.id == d_id).first()
+    db_item.status = status
+    db.commit()
+    db.flush()
+    db.refresh(db_item)
+    return db_item
+
 def get_data_managements(db: Session, user_id: str, project_id: str):
     res: List[models.DataManagement] = db.query(models.DataManagement).filter(models.DataManagement.project_id == project_id).all()
     return res
 
+def get_data_management_info(db: Session, d_id: int):
+    db_item: models.DataManagement = db.query(models.DataManagement).filter(models.DataManagement.id == d_id).first()
+    if not db_item:
+        raise Exception('该数据不存在')
+    return db_item
+
 def delete_data_management(db: Session, d_id: int):
     dm_item = db.query(models.DataManagement).filter(models.DataManagement.id == d_id).first()
     if not dm_item:

+ 5 - 1
app/models/data_management.py

@@ -18,4 +18,8 @@ class DataManagement(BaseModel):
     # 创建人编号
     user_id = Column(String, nullable=False)
     # 项目编号
-    project_id = Column(String, nullable=False)
+    project_id = Column(String, nullable=False)
+    # af_run_id
+    af_run_id = Column(String, nullable=False)
+    # 状态(1:转存中,2:成功,3:失败)
+    status = Column(Integer, nullable=False)

+ 29 - 6
app/routers/data_management.py

@@ -1,5 +1,5 @@
 from asyncio import current_task
-from re import A
+from re import A, I
 import time
 from typing import Optional
 from fastapi import APIRouter
@@ -10,7 +10,8 @@ from app import schemas
 
 import app.crud as crud
 from app.services.dag import get_tmp_table_name
-from app.utils.send_util import data_transfer_run
+from app.utils.send_util import data_transfer_run, get_data_transfer_run_status
+from constants.constants import RUN_STATUS
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
 from app.common.hive import hiveDs
@@ -32,9 +33,14 @@ def create_data_management(item: schemas.DataManagementCreate, db: Session = Dep
     current_time = int(time.time())
     table_name = f'project{item.project_id.lower()}_user{item.user_id.lower()}_{item.name.lower()}_{current_time}'
     tmp_table_name = get_tmp_table_name(item.dag_uuid, item.node_id, str(item.out_pin), db)
-    af_run_id = data_transfer_run(tmp_table_name, table_name)
-    res = crud.create_data_management(db, item, table_name)
-    return res
+    af_run_res = data_transfer_run(tmp_table_name, table_name)
+    af_run = af_run_res['data'] if 'data' in af_run_res.keys() else None
+    af_run_id = af_run['af_run_id'] if af_run and 'af_run_id' in af_run.keys() else None
+    if af_run_id:
+        res = crud.create_data_management(db, item, table_name, af_run_id)
+        return res
+    else:
+        raise Exception('中间结果转存失败')
 
 
 @router.get("/")
@@ -42,9 +48,26 @@ def create_data_management(item: schemas.DataManagementCreate, db: Session = Dep
 @sxtimeit
 def get_data_managements(user_id: str, project_id: str, db: Session = Depends(get_db)):
     res = crud.get_data_managements(db, user_id, project_id)
+    data_management_list = []
     for item in res:
         item.table_name = f'{database_name}.{item.table_name}'
-    return res
+        data_management_list.append(item)
+    return data_management_list
+
+@router.get("/info")
+@web_try()
+@sxtimeit
+def get_data_management_info(id: int, db: Session = Depends(get_db)):
+    item = crud.get_data_management_info(db, id)
+    if item.status == 1:
+        transfer_run_res = get_data_transfer_run_status(item.af_run_id)
+        transfer_run = transfer_run_res['data'] if 'data' in transfer_run_res.keys() else None
+        transfer_run_status = transfer_run['status'] if transfer_run and 'status' in transfer_run.keys() else None
+        if transfer_run_status:
+            item = crud.update_data_management_status(db, item.id, RUN_STATUS[transfer_run_status])
+    item.table_name = f'{database_name}.{item.table_name}'
+    return item
+
 
 @router.get("/local")
 @web_try()

+ 12 - 1
app/utils/send_util.py

@@ -113,4 +113,15 @@ def get_task_log(job_id: str, af_run_id: str, task_id: str):
         return res.json()
     else:
         msg = result['msg'] if 'msg' in result.keys() else result
-        raise Exception(f'获取task日志,请求airflow失败-->{msg}')
+        raise Exception(f'获取task日志,请求airflow失败-->{msg}')
+
+
+# 获取中间结果转存状态
+def get_data_transfer_run_status(af_run_id: str):
+    res = requests.get(url=f'http://{HOST}:{PORT}/af/af_run/data_transfer_log/{af_run_id}')
+    result = res.json()
+    if 'code' in result.keys() and result['code'] == 200:
+        return res.json()
+    else:
+        msg = result['msg'] if 'msg' in result.keys() else result
+        raise Exception(f'获取中间结果转存状态,请求airflow失败-->{msg}')

+ 7 - 0
data/data.sql

@@ -336,4 +336,11 @@ ALTER TABLE `jm_job_info`
 ADD COLUMN `create_time` int(20) NULL COMMENT '创建时间' AFTER `project_id`,
 ADD COLUMN `update_time` int(20) NULL COMMENT '修改时间' AFTER `create_time`;
 
+-- ----------------------------
+-- Alter for data_management
+-- ----------------------------
+ALTER TABLE `data_management`
+ADD COLUMN `af_run_id` varchar(100) NOT NULL COMMENT 'airflow运行id' AFTER `project_id`,
+ADD COLUMN `status` tinyint(4) NOT NULL COMMENT '状态(1:转存中,2:成功,3:失败)' AFTER `af_run_id`;
+
 SET FOREIGN_KEY_CHECKS = 1;