import re
import time
from typing import List

from sqlalchemy.orm import Session

from app import models, schemas
from app.common.hive import hiveDs
from configs.globals import g
from configs.settings import config

database_name = config.get('HIVE', 'DATABASE_NAME')
special_project_id = config.get('PERMISSIONS', 'special_project_id')


def import_datalake(db: Session, item: schemas.ImportDataLake):
    try:
        ailab_table_names = hiveDs.list_tables(item.database_name)
    except Exception as e:
        raise Exception('Please check the connection info') from e
    if item.table_name not in ailab_table_names:
        raise Exception(f'Table {item.table_name} does not exist in the data lake')
    # source=0 marks tables imported from the data lake (see get_lake_table)
    data_table = models.DataTable(**{
        'database_name': item.database_name,
        'table_name': item.table_name,
        'table_path': item.table_path,
        'source': 0,
        'type': 2,
        'project_id': g.project_id,
        'create_time': int(time.time()),
        'user_id': g.user_id
    })
    db.add(data_table)
    db.commit()
    db.refresh(data_table)
    return data_table


def update_datalake(db: Session, datalake_id: int, item: schemas.ImportDataLake):
    ailab_table_names = hiveDs.list_tables(item.database_name)
    if item.table_name not in ailab_table_names:
        raise Exception(f'Table {item.table_name} does not exist in the data lake')
    db_item: models.DataTable = db.query(models.DataTable).filter(models.DataTable.id == datalake_id).first()
    if not db_item:
        raise Exception('Record not found')
    db_item.database_name = item.database_name
    db_item.table_name = item.table_name
    db_item.table_path = item.table_path
    db.commit()
    db.refresh(db_item)
    return db_item


def delete_datalake(db: Session, datalake_id: int):
    db_item: models.DataTable = db.query(models.DataTable).filter(models.DataTable.id == datalake_id).first()
    if not db_item:
        raise Exception('Record not found')
    db.delete(db_item)
    db.commit()
    return db_item


def share_ailab(db: Session, item: schemas.ShareAilab):
    # Verify that every requested table exists in ailab
    ailab_table_names = hiveDs.list_tables()
    table_names = item.table_names
    project_ids = item.project_ids
    db_items = []
    for table_name in table_names:
        if table_name not in ailab_table_names:
            raise Exception(f'Table {table_name} does not exist in ailab')
        data_tables: List[models.DataTable] = db.query(models.DataTable)\
            .filter(models.DataTable.table_name == table_name)\
            .filter(models.DataTable.type == 0).all()
        # Skip projects the table has already been shared with
        p_ids = [d.project_id for d in data_tables]
        for project_id in project_ids:
            if project_id in p_ids:
                continue
            data_table = models.DataTable(**{
                'table_name': table_name,
                'source': 1,
                'type': 0,
                'project_id': project_id,
                'create_time': int(time.time()),
                'user_id': g.user_id
            })
            db_items.append(data_table)
    res = []
    for db_item in db_items:
        db.add(db_item)
        db.commit()
        db.refresh(db_item)
        res.append(db_item.to_dict())
    return res


def create_table(db: Session, item: schemas.CreateAilab):
    table_name = item.table_name
    columns = item.columns
    partition_column = item.partition_column
    # Reject duplicate table names
    table_names = hiveDs.list_tables()
    if table_name in table_names:
        raise Exception(f'Table name {table_name} already exists')
    if len(columns) == 0:
        raise Exception('A table needs at least one column')
    create_table_str = f"create table if not exists `{hiveDs.database_name}`.`{table_name}` ("
    for column in columns:
        create_table_str += f"`{column.column_name}` {column.Column_type},"
    create_table_str = create_table_str[:-1] + ")"
    if partition_column:
        create_table_str += f" PARTITIONED BY (`{partition_column}` string)"
    create_table_str += " ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"
    hiveDs._execute_create_sql([create_table_str])
    # Confirm the table actually exists before recording it
    res = hiveDs._execute_sql([f"show tables like '{table_name}'"])
    db_item = models.DataTable(**{
        'database_name': hiveDs.database_name,
        'table_name': table_name,
        'source': 1,
        'type': 1,
        'project_id': g.project_id,
        'create_time': int(time.time()),
        'user_id': g.user_id
    })
    if len(res[0][0]) > 0:
        db.add(db_item)
        db.commit()
        db.refresh(db_item)
        return {
            'status': True,
            'sql': create_table_str
        }
    else:
        raise Exception('Failed to create table')


def filter_by_name(table_name):
    # Hide intermediate job/task tmp tables from listings
    return not bool(re.search(r'(?:job.*_tmp|task.*_tmp)', table_name))


def get_ailab_table(db: Session, project_id: str):
    res = []
    if project_id == special_project_id:
        # The special project sees every non-tmp table in ailab
        table_list = hiveDs.list_tables()
        for table_name in table_list:
            if filter_by_name(table_name):
                res.append(table_name)
    else:
        data_tables: List[models.DataTable] = db.query(models.DataTable)\
            .filter(models.DataTable.project_id == project_id)\
            .filter(models.DataTable.source == 1)\
            .order_by(models.DataTable.create_time.desc()).all()
        res = [t.table_name for t in data_tables]
    return {"table_list": res, "database_name": database_name}


def get_ailab_table_schema(db: Session, table_name: str):
    return hiveDs.get_table_schema(table_name)


def get_preview_ailab_table(db: Session, table_name: str):
    return hiveDs.get_preview_data(table_name)


def get_lake_table_schema(db: Session, db_name: str, table_name: str):
    return hiveDs.get_table_schema(table_name, db_name=db_name)


def get_preview_lake_table(db: Session, db_name: str, table_name: str):
    return hiveDs.get_preview_data(table_name, db_name=db_name)


def get_lake_table(db: Session, project_id: int):
    data_tables: List[models.DataTable] = db.query(models.DataTable)\
        .filter(models.DataTable.project_id == project_id)\
        .filter(models.DataTable.source == 0)\
        .order_by(models.DataTable.create_time.desc()).all()
    return data_tables


def get_lake_table_info(db: Session, dl_id: int):
    data_table: models.DataTable = db.query(models.DataTable)\
        .filter(models.DataTable.id == dl_id).first()
    if not data_table:
        raise Exception('Data lake table not found')
    return data_table


def get_table_info(db: Session, db_name: str, table_name: str):
    return hiveDs.get_table_info(table_name, db_name)


def check_share(db: Session, table_name: str):
    # 1 if the table has been shared with at least one project, else 0
    data_tables: List[models.DataTable] = db.query(models.DataTable)\
        .filter(models.DataTable.table_name == table_name)\
        .filter(models.DataTable.type == 0).all()
    return 1 if len(data_tables) > 0 else 0
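
# A minimal sketch, using hypothetical values, of the DDL that create_table
# emits for a table `demo_table` with columns (`id` bigint, `name` string),
# partition column `dt`, and hiveDs.database_name == 'ailab':
#
#   create table if not exists `ailab`.`demo_table` (`id` bigint,`name` string)
#   PARTITIONED BY (`dt` string)
#   ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE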