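"""Service layer for importing, sharing, and querying data lake / AI Lab Hive tables."""
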
import re
import time
from typing import List

from sqlalchemy.orm import Session

from app import models, schemas
from app.common.hive import hiveDs
from configs.globals import g
from configs.settings import config

database_name = config.get('HIVE', 'DATABASE_NAME')

def import_datalake(db: Session, item: schemas.ImportDataLake):
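    """Register an existing data lake table in the metadata store after verifying it exists in Hive."""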
    try:
        ailab_table_names = hiveDs.list_tables(item.database_name)
    except Exception:
        raise Exception('Failed to list tables; please check the connection settings')
    if item.table_name not in ailab_table_names:
        raise Exception(f'Table {item.table_name} does not exist in the data lake')
    data_table = models.DataTable(**{
        'database_name': item.database_name,
        'table_name': item.table_name,
        'table_path': item.table_path,
        'source': 0,  # 0 = data lake table (cf. the source == 0 filter in get_lake_table)
        'type': 2,    # 2 = imported
        'project_id': 1,
        'create_time': int(time.time()),
        'user_id': g.user_id
    })
    db.add(data_table)
    db.commit()
    db.refresh(data_table)
    return data_table

def update_datalake(db: Session, datalake_id: int, item: schemas.ImportDataLake):
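    """Update the database, table name, and path of an imported data lake record."""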
    # The original code listed tables without a database argument; passing
    # item.database_name keeps the existence check consistent with import_datalake.
    ailab_table_names = hiveDs.list_tables(item.database_name)
    if item.table_name not in ailab_table_names:
        raise Exception(f'Table {item.table_name} does not exist in the data lake')
    db_item: models.DataTable = db.query(models.DataTable).filter(models.DataTable.id == datalake_id).first()
    if not db_item:
        raise Exception('Record not found')
    db_item.database_name = item.database_name
    db_item.table_name = item.table_name
    db_item.table_path = item.table_path
    db.commit()
    db.refresh(db_item)
    return db_item

def delete_datalake(db: Session, datalake_id: int):
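    """Delete an imported data lake record by id and return the deleted row."""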
    db_item: models.DataTable = db.query(models.DataTable).filter(models.DataTable.id == datalake_id).first()
    if not db_item:
        raise Exception('Record not found')
    db.delete(db_item)
    db.commit()
    return db_item

def share_ailab(db: Session, item: schemas.ShareAilab):
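    """Share AI Lab tables with the given projects, skipping (table, project) pairs that are already shared."""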
    # Verify that each table exists in AI Lab
    ailab_table_names = hiveDs.list_tables()
    table_names = item.table_names
    project_ids = item.project_ids
    db_items = []
    for table_name in table_names:
        if table_name not in ailab_table_names:
            raise Exception(f'Table {table_name} does not exist in the data lake')
        data_tables: List[models.DataTable] = db.query(models.DataTable)\
            .filter(models.DataTable.table_name == table_name)\
            .filter(models.DataTable.type == 0).all()
        p_ids = [d.project_id for d in data_tables]
        for project_id in project_ids:
            if project_id in p_ids:
                continue  # already shared with this project
            data_table = models.DataTable(**{
                'table_name': table_name,
                'source': 1,  # 1 = AI Lab table
                'type': 0,    # 0 = shared
                'project_id': project_id,
                'create_time': int(time.time()),
                'user_id': g.user_id
            })
            db_items.append(data_table)
    res = []
    for db_item in db_items:
        db.add(db_item)
        db.commit()
        db.refresh(db_item)
        res.append(db_item.to_dict())
    return res

def create_table(db: Session, item: schemas.CreateAilab):
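    """Create a Hive table in the AI Lab database and register it.

    The generated DDL has the form:
        create table if not exists `db`.`t` (`col1` type1, ...)
        [PARTITIONED BY (`partition_column` string)]
        ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE
    """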
    table_name = item.table_name
    columns = item.columns
    partition_column = item.partition_column
    # Check for a duplicate table name
    table_names = hiveDs.list_tables()
    if table_name in table_names:
        raise Exception(f'Table name {table_name} already exists')
    if len(columns) == 0:
        raise Exception('A table must have at least one column')
    column_defs = ','.join(f"`{column.column_name}` {column.Column_type}" for column in columns)
    create_table_str = f"create table if not exists `{hiveDs.database_name}`.`{table_name}` ({column_defs})"
    if partition_column:
        create_table_str += f" PARTITIONED BY (`{partition_column}` string)"
    create_table_str += " ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"
    hiveDs._execute_create_sql([create_table_str])
    # Confirm the table was actually created before recording it
    res = hiveDs._execute_sql([f"show tables like '{table_name}'"])
    db_item = models.DataTable(**{
        'database_name': hiveDs.database_name,
        'table_name': table_name,
        'source': 1,  # 1 = AI Lab table
        'type': 1,    # 1 = created in AI Lab
        'project_id': item.project_id,
        'create_time': int(time.time()),
        'user_id': g.user_id
    })
    if len(res[0][0]) > 0:
        db.add(db_item)
        db.commit()
        db.refresh(db_item)
        return {
            'status': True,
            'sql': create_table_str
        }
    else:
        raise Exception('Failed to create table')

def filter_by_name(table_name):
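    """Return False for temporary tables produced by jobs and tasks (names matching job*_tmp / task*_tmp)."""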
    return not bool(re.search(r'(?:job.*_tmp|task.*_tmp)', table_name))

def get_ailab_table(db: Session, project_id: int):
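    """List the table names visible to a project: all non-temporary Hive tables for type-0 projects, otherwise the project's registered AI Lab tables."""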
    project: models.Project = db.query(models.Project)\
        .filter(models.Project.id == project_id).first()
    if not project:
        raise Exception('Project does not exist')
    res = []
    if project.type == 0:
        table_list = hiveDs.list_tables()
        for table_name in table_list:
            if filter_by_name(table_name):
                res.append(table_name)
    else:
        data_tables: List[models.DataTable] = db.query(models.DataTable)\
            .filter(models.DataTable.project_id == project_id)\
            .filter(models.DataTable.source == 1)\
            .order_by(models.DataTable.create_time.desc()).all()
        res = [t.table_name for t in data_tables]
    return {"table_list": res, "database_name": database_name}

def get_ailab_table_schema(db: Session, table_name: str):
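    """Return the column schema of an AI Lab table."""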
    table_schema = hiveDs.get_table_schema(table_name)
    return table_schema

def get_preview_ailab_table(db: Session, table_name: str):
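    """Return preview rows from an AI Lab table."""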
    preview_data = hiveDs.get_preview_data(table_name)
    return preview_data

def get_lake_table_schema(db: Session, db_name: str, table_name: str):
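    """Return the column schema of a table in the given data lake database."""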
    table_schema = hiveDs.get_table_schema(table_name, db_name=db_name)
    return table_schema

def get_preview_lake_table(db: Session, db_name: str, table_name: str):
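    """Return preview rows from a table in the given data lake database."""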
    preview_data = hiveDs.get_preview_data(table_name, db_name=db_name)
    return preview_data

def get_lake_table(db: Session, project_id: int):
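    """List a project's imported data lake tables, newest first."""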
    data_tables: List[models.DataTable] = db.query(models.DataTable)\
        .filter(models.DataTable.project_id == project_id)\
        .filter(models.DataTable.source == 0)\
        .order_by(models.DataTable.create_time.desc()).all()
    return data_tables

def get_lake_table_info(db: Session, dl_id: int):
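    """Fetch a single imported data lake record by id."""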
    data_table: models.DataTable = db.query(models.DataTable)\
        .filter(models.DataTable.id == dl_id).first()
    if not data_table:
        raise Exception('Data lake table not found')
    return data_table

def get_table_info(db: Session, db_name: str, table_name: str):
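    """Return Hive table metadata for the given database and table."""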
    table_info = hiveDs.get_table_info(table_name, db_name)
    return table_info

def check_share(db: Session, table_name: str):
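    """Return 1 if the table has been shared with any project (a type-0 record exists), else 0."""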
    data_tables: List[models.DataTable] = db.query(models.DataTable)\
        .filter(models.DataTable.table_name == table_name)\
        .filter(models.DataTable.type == 0).all()
    if len(data_tables) > 0:
        return 1
    return 0