data_table.py

import re
import time
from typing import List

from sqlalchemy.orm import Session

from app import models, schemas
from app.common.hive import hiveDs
from configs.globals import g
from configs.settings import config

database_name = config.get('HIVE', 'DATABASE_NAME')
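
# The [HIVE] section of the settings file is assumed to look roughly like
# this (section and key names as read above; the value is illustrative):
#
#   [HIVE]
#   DATABASE_NAME = ailab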

def import_datalake(db: Session, item: schemas.ImportDataLake):
    try:
        ailab_table_names = hiveDs.list_tables(item.database_name)
    except Exception as e:
        raise Exception('Please check whether the connection information is correct') from e
    if item.table_name not in ailab_table_names:
        raise Exception(f'Table {item.table_name} does not exist in the data lake')
    data_table = models.DataTable(**{
        'database_name': item.database_name,
        'table_name': item.table_name,
        'table_path': item.table_path,
        'source': 0,   # 0 = imported from the data lake
        'type': 2,
        'project_id': 1,
        'create_time': int(time.time()),
        'user_id': g.user_id
    })
    db.add(data_table)
    db.commit()
    db.refresh(data_table)
    return data_table
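
# A minimal usage sketch (caller code assumed, not part of this module):
#
#   item = schemas.ImportDataLake(database_name='lake_db',
#                                 table_name='events',
#                                 table_path='/warehouse/lake_db/events')
#   data_table = import_datalake(db, item)  # `db` is an open Session
#
# Imported lake tables are recorded with source=0, which get_lake_table()
# below relies on to list them per project.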

def update_datalake(db: Session, datalake_id: int, item: schemas.ImportDataLake):
    # Verify the table still exists in its source database, as in import_datalake
    ailab_table_names = hiveDs.list_tables(item.database_name)
    if item.table_name not in ailab_table_names:
        raise Exception(f'Table {item.table_name} does not exist in the data lake')
    db_item: models.DataTable = db.query(models.DataTable).filter(models.DataTable.id == datalake_id).first()
    if not db_item:
        raise Exception('Record not found')
    db_item.database_name = item.database_name
    db_item.table_name = item.table_name
    db_item.table_path = item.table_path
    db.commit()
    db.refresh(db_item)
    return db_item

def delete_datalake(db: Session, datalake_id: int):
    db_item: models.DataTable = db.query(models.DataTable).filter(models.DataTable.id == datalake_id).first()
    if not db_item:
        raise Exception('Record not found')
    db.delete(db_item)
    db.commit()
    return db_item

def share_ailab(db: Session, item: schemas.ShareAilab):
    # Verify that each table exists in ailab
    ailab_table_names = hiveDs.list_tables()
    table_names = item.table_names
    project_ids = item.project_ids
    db_items = []
    for table_name in table_names:
        if table_name not in ailab_table_names:
            raise Exception(f'Table {table_name} does not exist in the data lake')
        data_tables: List[models.DataTable] = db.query(models.DataTable)\
            .filter(models.DataTable.table_name == table_name)\
            .filter(models.DataTable.type == 0).all()
        # Skip projects the table has already been shared with
        p_ids = [d.project_id for d in data_tables]
        for project_id in project_ids:
            if project_id in p_ids:
                continue
            data_table = models.DataTable(**{
                'table_name': table_name,
                'source': 1,
                'type': 0,   # 0 = shared table
                'project_id': project_id,
                'create_time': int(time.time()),
                'user_id': g.user_id
            })
            db_items.append(data_table)
    res = []
    for db_item in db_items:
        db.add(db_item)
        db.commit()
        db.refresh(db_item)
        res.append(db_item.to_dict())
    return res
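
# Usage sketch (schema fields as read above; values illustrative):
#
#   share = schemas.ShareAilab(table_names=['events'], project_ids=[2, 3])
#   shared = share_ailab(db, share)  # one DataTable row per (table, project)
#
# Because projects that already hold a type==0 entry for the table are
# skipped, repeating the call for the same pair does not create duplicates.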

def create_table(db: Session, item: schemas.CreateAilab):
    table_name = item.table_name
    columns = item.columns
    partition_column = item.partition_column
    # Reject duplicate table names
    table_names = hiveDs.list_tables()
    if table_name in table_names:
        raise Exception(f'Table name {table_name} already exists')
    if len(columns) == 0:
        raise Exception('A table must have at least one column')
    column_defs = ",".join(f"`{column.column_name}` {column.Column_type}" for column in columns)
    create_table_str = f"create table if not exists `{hiveDs.database_name}`.`{table_name}` ({column_defs})"
    if partition_column:
        create_table_str += f" PARTITIONED BY (`{partition_column}` string)"
    create_table_str += " ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"
    hiveDs._execute_create_sql([create_table_str])
    # Confirm the table actually exists before recording it
    res = hiveDs._execute_sql([f"show tables like '{table_name}'"])
    db_item = models.DataTable(**{
        'database_name': hiveDs.database_name,
        'table_name': table_name,
        'source': 1,
        'type': 1,   # 1 = table created in ailab
        'project_id': item.project_id,
        'create_time': int(time.time()),
        'user_id': g.user_id
    })
    if len(res[0][0]) > 0:
        db.add(db_item)
        db.commit()
        db.refresh(db_item)
        return {
            'status': True,
            'sql': create_table_str
        }
    else:
        raise Exception('Failed to create table')
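
# For illustration, a call with two columns and a partition column, e.g.
#
#   schemas.CreateAilab(table_name='demo', project_id=1, columns=[...],
#                       partition_column='dt')
#
# yields SQL of the form:
#
#   create table if not exists `<database>`.`demo` (`c1` string,`c2` int)
#   PARTITIONED BY (`dt` string) ROW FORMAT DELIMITED FIELDS
#   TERMINATED BY ',' STORED AS TEXTFILE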

def filter_by_name(table_name):
    # Filter out temporary tables generated by jobs and tasks
    return not bool(re.search(r'(?:job.*_tmp|task.*_tmp)', table_name))
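
# Examples: filter_by_name('job42_output_tmp') -> False (temporary, hidden)
#           filter_by_name('sales_2023')       -> True  (kept)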

def get_ailab_table(db: Session, project_id: int):
    project: models.Project = db.query(models.Project)\
        .filter(models.Project.id == project_id).first()
    if not project:
        raise Exception('Project does not exist')
    res = []
    if project.type == 0:
        # Type-0 projects see every non-temporary table in the ailab database
        table_list = hiveDs.list_tables()
        for table_name in table_list:
            if filter_by_name(table_name):
                res.append(table_name)
    else:
        # Other projects only see tables created in or shared with them (source=1)
        data_tables: List[models.DataTable] = db.query(models.DataTable)\
            .filter(models.DataTable.project_id == project_id)\
            .filter(models.DataTable.source == 1)\
            .order_by(models.DataTable.create_time.desc()).all()
        res = [t.table_name for t in data_tables]
    return {"table_list": res, "database_name": database_name}

def get_ailab_table_schema(db: Session, table_name: str):
    table_schema = hiveDs.get_table_schema(table_name)
    return table_schema

def get_preview_ailab_table(db: Session, table_name: str):
    preview_data = hiveDs.get_preview_data(table_name)
    return preview_data

def get_lake_table_schema(db: Session, db_name: str, table_name: str):
    table_schema = hiveDs.get_table_schema(table_name, db_name=db_name)
    return table_schema

def get_preview_lake_table(db: Session, db_name: str, table_name: str):
    preview_data = hiveDs.get_preview_data(table_name, db_name=db_name)
    return preview_data

def get_lake_table(db: Session, project_id: int):
    data_tables: List[models.DataTable] = db.query(models.DataTable)\
        .filter(models.DataTable.project_id == project_id)\
        .filter(models.DataTable.source == 0)\
        .order_by(models.DataTable.create_time.desc()).all()
    return data_tables

def get_lake_table_info(db: Session, dl_id: int):
    data_table: models.DataTable = db.query(models.DataTable)\
        .filter(models.DataTable.id == dl_id).first()
    if not data_table:
        raise Exception('Data lake table not found')
    return data_table

def get_table_info(db: Session, db_name: str, table_name: str):
    table_info = hiveDs.get_table_info(table_name, db_name)
    return table_info

def check_share(db: Session, table_name: str):
    # Returns 1 if the table has been shared with at least one project, else 0
    data_tables: List[models.DataTable] = db.query(models.DataTable)\
        .filter(models.DataTable.table_name == table_name)\
        .filter(models.DataTable.type == 0).all()
    if len(data_tables) > 0:
        return 1
    return 0
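
# --- Hedged usage sketch (assumed entry point, not part of the original) ---
# `SessionLocal` is an assumed SQLAlchemy session factory; adjust the import
# to wherever the application actually creates sessions.
if __name__ == '__main__':
    from app.database import SessionLocal  # assumed location
    db = SessionLocal()
    try:
        print(get_ailab_table(db, project_id=1))
        print(check_share(db, table_name='demo_table'))
    finally:
        db.close()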