import time
import uuid
from typing import List

from sqlalchemy.orm import Session

from app import models, schemas
from app.common.hive import hiveDs


def import_datalake(db: Session, item: schemas.ImportDataLake):
    """Register a table as a ``DataTable`` row with ``source=0``.

    Args:
        db: SQLAlchemy session used to persist the new row.
        item: Request payload; ``database_name`` and ``table_name`` are read.

    Returns:
        The persisted ``models.DataTable`` instance, refreshed after commit.

    Raises:
        Exception: If ``item.table_name`` is not among the names returned by
            ``hiveDs.list_tables()``.
    """
    # Make sure the requested table actually exists before registering it.
    known_tables = hiveDs.list_tables()
    if item.table_name not in known_tables:
        raise Exception(f'数据湖中不存在表{item.table_name}')

    data_table = models.DataTable(
        database_name=item.database_name,
        table_name=item.table_name,
        source=0,
        type=2,
        project_id=1,
    )
    db.add(data_table)
    db.commit()
    db.refresh(data_table)
    return data_table


def share_ailab(db: Session, item: schemas.ShareAilab):
    """Share tables with projects by creating one row per (table, project).

    Every name in ``item.table_names`` is checked against
    ``hiveDs.list_tables()``; nothing is written to the database until all
    rows have been built, so a missing table aborts the call before any
    insert happens.

    Args:
        db: SQLAlchemy session used to persist the new rows.
        item: Request payload; ``table_names`` and ``project_ids`` are read.

    Returns:
        A list with the ``to_dict()`` result of every created row, in
        creation order.

    Raises:
        Exception: If any requested table name is not in the table listing.
    """
    # Make sure every requested table exists before sharing it.
    known_tables = hiveDs.list_tables()

    pending_rows = []
    for table_name in item.table_names:
        if table_name not in known_tables:
            raise Exception(f'数据湖中不存在表{table_name}')
        pending_rows.extend(
            models.DataTable(
                table_name=table_name,
                source=1,
                type=0,
                project_id=project_id,
            )
            for project_id in item.project_ids
        )

    shared = []
    for row in pending_rows:
        # Each row is committed on its own, so rows written before a failure
        # stay in the database.
        db.add(row)
        db.commit()
        db.refresh(row)
        shared.append(row.to_dict())
    return shared


def create_table(db: Session, item: schemas.CreateAilab):
    """Create a table through ``hiveDs`` and record it as a ``DataTable`` row.

    The DDL is built from ``item.table_name``, ``item.columns`` and the
    optional ``item.partition_column``, executed, and then verified with a
    ``show tables like`` query before the metadata row is written.

    Args:
        db: SQLAlchemy session used to persist the metadata row.
        item: Request payload describing the table. Each entry of
            ``item.columns`` must expose ``column_name`` and ``Column_type``.

    Returns:
        ``{'status': True, 'sql': <the executed CREATE TABLE statement>}``.

    Raises:
        Exception: If the table name already exists, if no columns are
            given, or if the table cannot be found after the DDL ran.
    """
    table_name = item.table_name
    columns = item.columns
    partition_column = item.partition_column

    # Reject duplicate table names up front.
    if table_name in hiveDs.list_tables():
        raise Exception(f'表 {table_name} 命名重复')
    # `not columns` also covers None, which len() would reject with TypeError.
    if not columns:
        raise Exception('表中至少有一个字段')

    # NOTE(review): table, column and partition names (and the unquoted
    # column type) are interpolated straight into the DDL. If these values
    # can come from untrusted callers they need validating upstream.
    column_defs = ",".join(
        f"`{column.column_name}` {column.Column_type}" for column in columns
    )
    create_table_str = (
        f"create table if not exists {hiveDs.database_name}.`{table_name}` "
        f"({column_defs})"
    )
    if partition_column:
        create_table_str += f" PARTITIONED BY (`{partition_column}` string)"
    create_table_str += (
        " ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"
    )

    hiveDs._execute_create_sql([create_table_str])

    # Confirm the table is now visible. The result is presumably one entry
    # per statement holding that statement's rows (TODO confirm); an empty
    # result at any level means the table was not created. Guarding each
    # level keeps a missing table from surfacing as an IndexError.
    res = hiveDs._execute_sql([f"show tables like '{table_name}'"])
    if not (res and res[0] and res[0][0]):
        raise Exception("建表失败")

    db_item = models.DataTable(
        database_name=hiveDs.database_name,
        table_name=table_name,
        source=1,
        type=1,
    )
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return {
        'status': True,
        'sql': create_table_str,
    }


def get_ailab_table(db: Session, project_id: int):
    """List the table names available to a project.

    Projects whose ``type`` is 0 get the full ``hiveDs.list_tables()``
    listing; every other project gets only the names of its ``DataTable``
    rows with ``source == 1``.

    Args:
        db: SQLAlchemy session used for the lookups.
        project_id: Primary key of the project.

    Returns:
        The table names visible to the project.

    Raises:
        Exception: If no project with ``project_id`` exists.
    """
    project: models.Project = (
        db.query(models.Project)
        .filter(models.Project.id == project_id)
        .first()
    )
    if not project:
        raise Exception('项目不存在')

    if project.type == 0:
        return hiveDs.list_tables()

    data_tables: List[models.DataTable] = (
        db.query(models.DataTable)
        .filter(models.DataTable.project_id == project_id)
        .filter(models.DataTable.source == 1)
        .all()
    )
    return [table.table_name for table in data_tables]


def get_ailab_table_schema(db: Session, table_name: str):
    """Return ``hiveDs.get_table_schema(table_name)``.

    ``db`` is accepted for signature parity with the other functions in
    this module but is not used.
    """
    return hiveDs.get_table_schema(table_name)


def get_preview_ailab_table(db: Session, table_name: str):
    """Return ``hiveDs.get_preview_data(table_name)``.

    ``db`` is accepted for signature parity with the other functions in
    this module but is not used.
    """
    return hiveDs.get_preview_data(table_name)


def get_lake_table(db: Session, project_id: int):
    """Return the project's ``DataTable`` rows that have ``source == 0``.

    Args:
        db: SQLAlchemy session used for the query.
        project_id: Primary key of the project whose rows are listed.

    Returns:
        A list of matching ``models.DataTable`` instances (possibly empty).
    """
    data_tables: List[models.DataTable] = (
        db.query(models.DataTable)
        .filter(models.DataTable.project_id == project_id)
        .filter(models.DataTable.source == 0)
        .all()
    )
    return data_tables