Просмотр исходного кода

数据预览展示额外信息、回显存储路径

liweiquan 1 год назад
Родитель
Сommit
21b3c30aa9

+ 5 - 0
app/core/datasource/hive.py

@@ -230,6 +230,11 @@ class HiveDS(DataSourceBase):
                     break
         return table_schema
 
+    def get_table_info(self,table_name: str, db_name: str = None):
+        sql1 = f"DESCRIBE FORMATTED `{table_name}`"
+        res = self._execute_sql([sql1], db_name)
+        return res
+
         # sql1 = f'show columns in {self.database_name}.{table_name}'
         # res = self._execute_sql([sql1])
         # print("===",res)

+ 12 - 0
app/crud/data_table.py

@@ -170,3 +170,15 @@ def get_lake_table_info(db: Session, dl_id: int):
     if not data_table: raise Exception('未找到该数据湖表')
     return data_table
 
+def get_table_info(db: Session, db_name: str, table_name: str):
+    table_info = hiveDs.get_table_info(table_name, db_name)
+    return table_info
+
+def check_share(db: Session, table_name: str):
+    data_tables: List[models.DataTable] = db.query(models.DataTable)\
+            .filter(models.DataTable.table_name==table_name)\
+            .filter(models.DataTable.type == 0).all()
+    if len(data_tables) > 0:
+        return 1
+    return 0
+

+ 4 - 1
app/crud/job_jdbc_datasource.py

@@ -125,7 +125,6 @@ def get_job_jdbc_datasources_info(db: Session, ds_id: int):
 def update_job_jdbc_datasources(db: Session, ds_id: int, update_item: schemas.JobJdbcDatasourceUpdate):
     if update_item.jdbc_password and update_item.jdbc_password != '':
         update_item.jdbc_password = decode_base64(update_item.jdbc_password)
-        print(update_item.jdbc_password)
     ds, update_item = _format_datasource(db, update_item)
     con_result = ds.is_connect()
     if not con_result:
@@ -166,3 +165,7 @@ def get_job_jdbc_datasource(db: Session, ds_id: int):
     if not db_item:
         raise Exception('未找到该数据源')
     return db_item
+
+def get_job_jdbc_datasource_table_location(db: Session, db_name: str, table_name: str, ds_id: int):
+    ds, item = _format_datasource(db, None, ds_id)
+    return ds.get_table_info(table_name,db_name)

+ 13 - 1
app/routers/dag.py

@@ -91,9 +91,21 @@ def get_dag_debug_status(dag_uuid: str, node_id: str,db: Session = Depends(get_d
 def get_dag_debug_result(dag_uuid: str,node_id: str,out_pin: int ,db: Session = Depends(get_db)):
     table_name = get_tmp_table_name(dag_uuid, node_id, str(out_pin), db)
     result = hiveDs.get_preview_data(table_name,500)
+    location = ''
+    owner = ''
+    res = hiveDs.get_table_info(table_name)
+    for line_list in res[0]:
+        if  line_list[0].find('Location')>=0:
+            location = line_list[1]
+        if line_list[0].find('Owner')>=0:
+            owner = line_list[1]
+    share_status = crud.check_share(db, table_name)
     result.update({
         'table_name':f'{database_name}.{table_name}',
-        'table_path':f'{hdfs_path}{base_path}{table_name}'
+        'table_path':f'{hdfs_path}{base_path}{table_name}',
+        'owner': owner,
+        'location': location,
+        'share_status': share_status
         })
     return result
 

+ 18 - 2
app/routers/data_management.py

@@ -68,8 +68,24 @@ def get_data_management_info(id: int, db: Session = Depends(get_db)):
         transfer_run_status = transfer_run['status'] if transfer_run and 'status' in transfer_run.keys() else None
         if transfer_run_status:
             item = crud.update_data_management_status(db, item.id, RUN_STATUS[transfer_run_status])
-    item.table_name = f'{database_name}.{item.table_name}'
-    return item
+    location = ''
+    owner = ''
+    if item.status == 2:
+        res = hiveDs.get_table_info(item.table_name)
+        for line_list in res[0]:
+            if  line_list[0].find('Location')>=0:
+                location = line_list[1]
+            if line_list[0].find('Owner')>=0:
+                owner = line_list[1]
+    share_status = crud.check_share(db, item.table_name)
+    item_dict = item.to_dict()
+    item_dict.update({
+        'table_name': f'{database_name}.{item.table_name}',
+        'owner': owner,
+        'location': location,
+        'share_status': share_status
+    })
+    return item_dict
 
 @router.delete("/", dependencies=[Depends(verify_all)])
 @web_try()

+ 16 - 0
app/routers/job_jdbc_datasource.py

@@ -163,6 +163,22 @@ def get_lake_table_schema(db_name: str, table_name: str, db: Session = Depends(g
 def get_preview_lake_table(db_name: str, table_name: str, db: Session = Depends(get_db)):
     return crud.get_preview_lake_table(db, db_name, table_name)
 
+@router.get("/table_location")
+@web_try()
+@sxtimeit
+def get_table_location(db_name: str, table_name: str, ds_id: Optional[int] = None, token_data: schemas.TokenData = Depends(verify_special), db: Session = Depends(get_db)):
+    res = None
+    if ds_id is None:
+        res = crud.get_table_info(db, db_name, table_name)
+    else:
+        res = crud.get_job_jdbc_datasource_table_location(db, db_name, table_name, ds_id)
+    location = ''
+    for line_list in res[0]:
+        if  line_list[0].find('Location')>=0:
+            location = line_list[1]
+            break
+    return {'location': location}
+