Преглед изворни кода

中间结果另存与查看

liweiquan пре 2 година
родитељ
комит
ff5cbb486d

+ 7 - 2
app/core/datasource/hive.py

@@ -56,9 +56,9 @@ class HiveDS(DataSourceBase):
             return False
 
 
-    def get_preview_data(self, table_name, limit=100):
+    def get_preview_data(self, table_name, limit=100, page = 0):
         sql1 = f'describe {self.database_name}.{table_name}'
-        sql2 = f"SELECT * FROM {table_name} LIMIT {limit}"
+        sql2 = f"SELECT * FROM {table_name} LIMIT {page},{limit}"
         res = self._execute_sql([sql1, sql2])
         logger.info(res)
         return {
@@ -66,6 +66,11 @@ class HiveDS(DataSourceBase):
             'content': res[1]
         }
 
+    def get_data_num(self, table_name):
+        sql2 = f"SELECT 1 FROM {table_name}"
+        res = self._execute_sql([sql2])
+        return len(res[0])
+
     def list_tables(self):
         sql = f'show tables'
         res = self._execute_sql([sql])

+ 8 - 3
app/crud/data_management.py

@@ -3,10 +3,15 @@ from typing import List
 from app import models, schemas
 from sqlalchemy.orm import Session
 
-def create_data_management(db: Session, item: schemas.DataManagementCreate):
+def create_data_management(db: Session, item: schemas.DataManagementCreate, table_name: str):
     create_time: int = int(time.time())
-    db_item = models.DataManagement(**item.dict(), **{
+    db_item = models.DataManagement(**{
+        'name': item.name,
+        'table_name': table_name,
         'create_time': create_time,
+        'user_name': item.user_name,
+        'user_id': item.user_id,
+        'project_id': item.project_id
     })
     db.add(db_item)
     db.commit()
@@ -14,7 +19,7 @@ def create_data_management(db: Session, item: schemas.DataManagementCreate):
     return db_item
 
 def get_data_managements(db: Session, user_id: str, project_id: str):
-    res: List[models.DataManagement] = db.query(models.DataManagement).filter(models.DataManagement.project_id == project_id,models.DataManagement.user_id == user_id).all()
+    res: List[models.DataManagement] = db.query(models.DataManagement).filter(models.DataManagement.project_id == project_id).all()
     return res
 
 def delete_data_management(db: Session, d_id: int):

+ 2 - 12
app/models/data_management.py

@@ -9,18 +9,8 @@ class DataManagement(BaseModel):
     id = Column(Integer, primary_key=True, index=True)
     # 数据名称
     name = Column(String)
-    # 数据类型
-    data_type = Column(String)
-    # 数据条数
-    data_num = Column(Integer)
-    # 存储位置
-    storage_location = Column(String)
-    # 占用存储
-    storage_usage = Column(String)
-    # 存储路径
-    storage_path = Column(String)
-    # 完整性
-    integrity = Column(String)
+    # 表格名称
+    table_name = Column(String)
     # 创建时间
     create_time = Column(Integer, nullable=False)
     # 创建人名称

+ 3 - 23
app/routers/dag.py

@@ -1,17 +1,11 @@
-from cgitb import reset
-from importlib.resources import contents
-import json
-import os
 from sqlalchemy.orm import Session
 from fastapi import Depends
 from fastapi import APIRouter
-from app.services.dag import dag_job_submit
+from app.services.dag import dag_job_submit, get_tmp_table_name
 from app import crud, models, schemas
 from app.utils.send_util import get_job_run_status
-from app.common.minio import minio_client
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
-from fastapi.responses import StreamingResponse
 from app.common.hive import hiveDs
 from app import get_db
 
@@ -69,21 +63,7 @@ def get_dag_debug_status(dag_uuid: str, node_id: str,db: Session = Depends(get_d
 @web_try()
 @sxtimeit
 def get_dag_debug_result(dag_uuid: str,node_id: str,out_pin: int ,db: Session = Depends(get_db)):
-    relation = crud.get_dag_af_id(db,dag_uuid, 'debug')
-    job_id = relation.af_id
-    af_job_run = crud.get_airflow_run_once_debug_mode(db,job_id)
-    tasks = af_job_run.details['tasks'] if len(af_job_run.details['tasks'])>0 else {}
-    task_id = None
-    for task in tasks:
-        t_id = task.split('_')[0]
-        n_id = task.split('_')[1]
-        if n_id == node_id:
-            task_id = t_id
-            break
-    result = None
-    if task_id:
-        table_name = f'job{job_id}_task{task_id}_subnode{node_id}_output{out_pin}_tmp'
-        t_list = hiveDs.list_tables()
-        result = hiveDs.get_preview_data(table_name,500) if table_name.lower() in t_list else None
+    table_name = get_tmp_table_name(dag_uuid, node_id, str(out_pin), db)
+    result = hiveDs.get_preview_data(table_name,500)
     return result
 

+ 20 - 5
app/routers/data_management.py

@@ -1,15 +1,18 @@
+from asyncio import current_task
 from re import A
+from time import time
 from typing import Optional
 from fastapi import APIRouter
 
 from fastapi import Depends
 from sqlalchemy.orm import Session
 from app import schemas
-from app.common.minio import FileHandler
 
 import app.crud as crud
+from app.services.dag import get_tmp_table_name
 from utils.sx_time import sxtimeit
 from utils.sx_web import web_try
+from app.common.hive import hiveDs
 
 from app import get_db
 
@@ -23,7 +26,12 @@ router = APIRouter(
 @web_try()
 @sxtimeit
 def create_data_management(item: schemas.DataManagementCreate, db: Session = Depends(get_db)):
-    return crud.create_data_management(db, item)
+    current_time = int(time.time())
+    table_name = f'project{item.project_id.lower()}_user{item.user_id.lower()}_{item.name.lower()}_{current_time}'
+    tmp_table_name = get_tmp_table_name(item.dag_uuid, item.node_id, str(item.out_pin), db)
+    # 执行临时表的转存,目前还不能,先将临时表名存入
+    res = crud.create_data_management(db, item, tmp_table_name)
+    return res
 
 
 @router.get("/")
@@ -37,6 +45,13 @@ def get_data_managements(user_id: str, project_id: str, db: Session = Depends(ge
 @sxtimeit
 def delete_data_management(data_management_id: int, db: Session = Depends(get_db)):
     data_management = crud.delete_data_management(db, data_management_id)
-    file_handler = FileHandler("datax")
-    file_handler.del_image(data_management.storage_path)
-    return data_management
+    return data_management
+
@router.get("/table_content")
@web_try()
@sxtimeit
def get_data_management_content(table_name: str, page: Optional[int] = 0, size: Optional[int] = 100, db: Session = Depends(get_db)):
    """Page through the rows of a managed Hive table.

    Returns get_preview_data's payload for one page of rows, augmented
    with a 'total' row count so clients can render a pager.
    (*db* is unused here but kept so the dependency wiring matches the
    sibling endpoints.)
    """
    # BUG FIX: hiveDs.get_preview_data interpolates its `page` argument as
    # the row OFFSET of `LIMIT {page},{limit}`. Passing the page index
    # directly made page 1 start at row 1 instead of row `size`; convert
    # the page number to a row offset first.
    offset = page * size
    result = hiveDs.get_preview_data(table_name, limit=size, page=offset)
    data_num = hiveDs.get_data_num(table_name)
    result.update({'total': data_num})
    return result

+ 14 - 22
app/schemas/data_management.py

@@ -5,18 +5,6 @@ from pydantic import BaseModel
 class DataManagementBase(BaseModel):
     # 数据名称
     name: str
-    # 数据类型
-    data_type: str
-    # 数据条数
-    data_num: int
-    # 存储位置
-    storage_location: str
-    # 占用存储
-    storage_usage: str
-    # 存储路径
-    storage_path: str
-    # 完整性
-    integrity: str
     # 创建人名称
     user_name: str
     # 创建人编号
@@ -25,24 +13,26 @@ class DataManagementBase(BaseModel):
     project_id: str
 
class DataManagementCreate(DataManagementBase):
    """Payload for registering a debug-node output as a data set."""
    # Debug-run coordinates identifying which node output to save:
    # UUID of the DAG whose debug run produced the data
    dag_uuid: str
    # id of the node within that DAG
    node_id: str
    # output pin index of the node, as a string (e.g. "0")
    out_pin: str
    class Config:
        schema_extra = {
            "example": {
                "name": "test",
                "user_name": "test",
                "user_id": "test",
                "project_id": "test",
                "dag_uuid": "test",
                "node_id": "test",
                "out_pin": "0",
            }
        }
 
 class DataManagement(DataManagementBase):
     id: int
+    # 表格名称
+    table_name: str
     # 创建时间
     create_time: int
     class Config:
@@ -50,12 +40,14 @@ class DataManagement(DataManagementBase):
 
 
class DataManagementSelect(BaseModel):
    """Query payload for paging through a managed table's content."""
    # Hive table to read
    table_name: str
    # zero-based page index
    # NOTE(review): `Optional` must be imported from typing in this module —
    # the import is not visible here; confirm it exists.
    page: Optional[int] = 0
    # rows per page
    size: Optional[int] = 100
    class Config:
        schema_extra = {
            "example": {
                "table_name": "train",
                "page": 0,
                "size": 100
            }
        }

+ 23 - 0
app/services/dag.py

@@ -4,6 +4,7 @@ from app import crud, models
 from app.utils.send_util import *
 from app.utils.utils import get_cmd_parameter
 from sqlalchemy.orm import Session
+from app.common.hive import hiveDs
 
 def dag_create_job(dag_uuid:str,dag_script: str,db: Session):
     af_task = dag_create_task(dag_script)
@@ -102,3 +103,25 @@ def dag_job_submit(dag_uuid:str,dag_script: str,db: Session):
         if i >= 10:
             raise Exception(f"{af_job['id']}==>执行失败")
     return af_job
+
+
def get_tmp_table_name(dag_uuid: str, node_id: str, out_pin: str, db: Session):
    """Resolve the Hive temp table holding a debug node's output.

    Looks up the Airflow job bound to *dag_uuid* in debug mode, scans the
    latest debug run's tasks for the one whose `taskid_nodeid` key matches
    *node_id*, builds the conventional temp-table name, and verifies the
    table actually exists in Hive before returning it.

    Raises:
        Exception: when no matching task ran or the table is absent
            (message: '该节点不存在中间结果').
    """
    relation = crud.get_dag_af_id(db, dag_uuid, 'debug')
    job_id = relation.af_id
    af_job_run = crud.get_airflow_run_once_debug_mode(db, job_id)
    task_keys = af_job_run.details['tasks'] if len(af_job_run.details['tasks']) > 0 else {}
    # Task keys look like "<task_id>_<node_id>"; find the task for our node.
    matched_task_id = None
    for key in task_keys:
        parts = key.split('_')
        if parts[1] == node_id:
            matched_task_id = parts[0]
            break
    # Guard clause: no task produced output for this node.
    if not matched_task_id:
        raise Exception('该节点不存在中间结果')
    table_name = f'job{job_id}_task{matched_task_id}_subnode{node_id}_output{out_pin}_tmp'
    if table_name.lower() not in hiveDs.list_tables():
        raise Exception('该节点不存在中间结果')
    return table_name

+ 12 - 0
data/data.sql

@@ -299,4 +299,16 @@ CREATE TABLE `airflow_task` (
   KEY `ix_airflow_task_id` (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
 
-- ----------------------------
-- Alter for data_management
-- ----------------------------
-- Migration: data sets are now stored as Hive tables referenced by name,
-- so the old storage-metadata columns (type, row count, minio location,
-- usage, path, integrity) are dropped and a single `table_name` column
-- is added after `name`.
ALTER TABLE `data_management`
DROP COLUMN `data_type`,
DROP COLUMN `data_num`,
DROP COLUMN `storage_location`,
DROP COLUMN `storage_usage`,
DROP COLUMN `storage_path`,
DROP COLUMN `integrity`,
ADD COLUMN `table_name` varchar(255) NULL COMMENT '表名' AFTER `name`;

 SET FOREIGN_KEY_CHECKS = 1;