123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- import time
- import uuid
- from typing import List
- from app import models, schemas
- from sqlalchemy.orm import Session
- from app.common.hive import hiveDs
def import_datalake(db: Session, item: schemas.ImportDataLake):
    """Register an existing data-lake table in the local metadata store.

    Args:
        db: active SQLAlchemy session.
        item: import request carrying ``database_name`` and ``table_name``.

    Returns:
        The persisted ``models.DataTable`` row (refreshed, with generated id).

    Raises:
        Exception: if the table does not exist in the data lake.
    """
    # Verify the table actually exists in the data lake before recording it.
    ailab_table_names = hiveDs.list_tables()
    if item.table_name not in ailab_table_names:
        raise Exception(f'数据湖中不存在表{item.table_name}')
    data_table = models.DataTable(
        database_name=item.database_name,
        table_name=item.table_name,
        source=0,  # 0 = imported from the data lake
        type=2,
        project_id=1,  # NOTE(review): hard-coded project id — confirm this is intentional
    )
    db.add(data_table)
    db.commit()
    db.refresh(data_table)
    return data_table
def share_ailab(db: Session, item: schemas.ShareAilab):
    """Share ailab tables with one or more projects.

    Creates one ``models.DataTable`` row per (table, project) pair.

    Args:
        db: active SQLAlchemy session.
        item: request carrying ``table_names`` and ``project_ids``.

    Returns:
        List of dicts (``to_dict()``) for every row created.

    Raises:
        Exception: if any requested table is missing from the warehouse.
    """
    # Validate every table name up front so nothing is inserted on failure.
    ailab_table_names = hiveDs.list_tables()
    for table_name in item.table_names:
        if table_name not in ailab_table_names:
            raise Exception(f'数据湖中不存在表{table_name}')
    # One row per (table, project) pair, table-major order.
    db_items = [
        models.DataTable(
            table_name=table_name,
            source=1,
            type=0,
            project_id=project_id,
        )
        for table_name in item.table_names
        for project_id in item.project_ids
    ]
    for db_item in db_items:
        db.add(db_item)
    # Single commit: the whole share operation is atomic instead of
    # committing once per row as before.
    db.commit()
    res = []
    for db_item in db_items:
        db.refresh(db_item)
        res.append(db_item.to_dict())
    return res
def create_table(db: Session, item: schemas.CreateAilab):
    """Create a Hive table and record it in the metadata store.

    Args:
        db: active SQLAlchemy session.
        item: request carrying ``table_name``, ``columns`` and an optional
            ``partition_column``.

    Returns:
        Dict with ``status`` (True) and the generated ``sql`` string.

    Raises:
        Exception: on duplicate table name, empty column list, or when the
            table cannot be found after creation.
    """
    table_name = item.table_name
    columns = item.columns
    partition_column = item.partition_column
    # Reject duplicate table names.
    if table_name in hiveDs.list_tables():
        raise Exception(f'表 {table_name} 命名重复')
    if not columns:
        raise Exception('表中至少有一个字段')
    # NOTE(review): table/column names are interpolated into DDL. Hive cannot
    # bind identifiers as parameters, so they should be validated upstream
    # (e.g. ^[A-Za-z0-9_]+$) before reaching this function — confirm.
    column_defs = ",".join(
        f"`{column.column_name}` {column.Column_type}" for column in columns
    )
    create_table_str = (
        f"create table if not exists {hiveDs.database_name}.`{table_name}` "
        f"({column_defs})"
    )
    if partition_column:
        create_table_str += f" PARTITIONED BY (`{partition_column}` string)"
    create_table_str += " ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"
    hiveDs._execute_create_sql([create_table_str])
    # Confirm the table is visible before recording metadata; guard against
    # an empty result set (the old len(res[0][0]) check raised IndexError).
    res = hiveDs._execute_sql([f"show tables like '{table_name}'"])
    if not (res and res[0] and res[0][0]):
        raise Exception("建表失败")
    db_item = models.DataTable(
        database_name=hiveDs.database_name,
        table_name=table_name,
        source=1,
        type=1,
    )
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return {
        'status': True,
        'sql': create_table_str,
    }
def get_ailab_table(db: Session, project_id: int):
    """Return the ailab table names visible to a project.

    Type-0 projects see every table in the warehouse; other projects see
    only the tables shared with them (``source == 1``).

    Raises:
        Exception: if the project does not exist.
    """
    project: models.Project = (
        db.query(models.Project)
        .filter(models.Project.id == project_id)
        .first()
    )
    if not project:
        raise Exception('项目不存在')
    if project.type == 0:
        return hiveDs.list_tables()
    shared: List[models.DataTable] = (
        db.query(models.DataTable)
        .filter(models.DataTable.project_id == project_id)
        .filter(models.DataTable.source == 1)
        .all()
    )
    return [row.table_name for row in shared]
def get_ailab_table_schema(db: Session, table_name: str):
    """Fetch the column schema of *table_name* from the hive datasource."""
    return hiveDs.get_table_schema(table_name)
def get_preview_ailab_table(db: Session, table_name: str):
    """Fetch a preview sample of *table_name* from the hive datasource."""
    return hiveDs.get_preview_data(table_name)
def get_lake_table(db: Session, project_id: int):
    """List the data-lake tables (``source == 0``) registered to a project."""
    query = (
        db.query(models.DataTable)
        .filter(models.DataTable.project_id == project_id)
        .filter(models.DataTable.source == 0)
    )
    data_tables: List[models.DataTable] = query.all()
    return data_tables
|