data_table.py

import re
import time
import uuid
from typing import List

from app import models, schemas
from sqlalchemy.orm import Session
from app.common.hive import hiveDs
from configs.globals import g
from configs.settings import DefaultOption, config

database_name = config.get('HIVE', 'DATABASE_NAME')
special_project_id = config.get('PERMISSIONS', 'special_project_id')
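
# The settings file read above is expected to expose these keys; the values shown
# here are purely illustrative:
#   [HIVE]
#   DATABASE_NAME = ailab
#   [PERMISSIONS]
#   special_project_id = 1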


def import_datalake(db: Session, item: schemas.ImportDataLake):
    """Register an existing data lake table for the current project."""
    try:
        ailab_table_names = hiveDs.list_tables(item.database_name)
    except Exception:
        raise Exception('Please check whether the connection information is correct')
    if item.table_name not in ailab_table_names:
        raise Exception(f'Table {item.table_name} does not exist in the data lake')
    data_table = models.DataTable(**{
        'database_name': item.database_name,
        'table_name': item.table_name,
        'table_path': item.table_path,
        'source': 0,
        'type': 2,
        'project_id': g.project_id,
        'create_time': int(time.time()),
        'user_id': g.user_id
    })
    db.add(data_table)
    db.commit()
    db.refresh(data_table)
    return data_table
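
# Illustrative usage of import_datalake above (hypothetical values; assumes the
# caller has populated g.project_id / g.user_id and holds an open Session):
#   item = schemas.ImportDataLake(database_name='lake_db', table_name='orders',
#                                 table_path='/warehouse/lake_db/orders')
#   data_table = import_datalake(db, item)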


def update_datalake(db: Session, datalake_id: int, item: schemas.ImportDataLake):
    """Update the database, table name and path of an imported data lake table."""
    # Validate against the target database, as in import_datalake
    ailab_table_names = hiveDs.list_tables(item.database_name)
    if item.table_name not in ailab_table_names:
        raise Exception(f'Table {item.table_name} does not exist in the data lake')
    db_item: models.DataTable = db.query(models.DataTable).filter(models.DataTable.id == datalake_id).first()
    if not db_item:
        raise Exception('Record not found')
    db_item.database_name = item.database_name
    db_item.table_name = item.table_name
    db_item.table_path = item.table_path
    db.commit()
    db.refresh(db_item)
    return db_item


def delete_datalake(db: Session, datalake_id: int):
    db_item: models.DataTable = db.query(models.DataTable).filter(models.DataTable.id == datalake_id).first()
    if not db_item:
        raise Exception('Record not found')
    db.delete(db_item)
    db.commit()
    return db_item


def share_ailab(db: Session, item: schemas.ShareAilab):
    """Share ailab tables with other projects, skipping pairs that are already shared."""
    # Check that every requested table exists in ailab
    ailab_table_names = hiveDs.list_tables()
    table_names = item.table_names
    project_ids = item.project_ids
    db_items = []
    for table_name in table_names:
        if table_name not in ailab_table_names:
            raise Exception(f'Table {table_name} does not exist in the data lake')
        data_tables: List[models.DataTable] = db.query(models.DataTable)\
            .filter(models.DataTable.table_name == table_name)\
            .filter(models.DataTable.type == 0).all()
        p_ids = [d.project_id for d in data_tables]
        for project_id in project_ids:
            if project_id in p_ids:
                # Already shared with this project
                continue
            data_table = models.DataTable(**{
                'table_name': table_name,
                'source': 1,
                'type': 0,
                'project_id': project_id,
                'create_time': int(time.time()),
                'user_id': g.user_id
            })
            db_items.append(data_table)
    res = []
    for db_item in db_items:
        db.add(db_item)
        db.commit()
        db.refresh(db_item)
        res.append(db_item.to_dict())
    return res
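
# Illustrative call to share_ailab above (hypothetical values): one type-0
# DataTable row is created per (table, project) pair that is not shared yet:
#   item = schemas.ShareAilab(table_names=['orders', 'users'], project_ids=[3, 7])
#   shared_rows = share_ailab(db, item)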


def create_table(db: Session, item: schemas.CreateAilab):
    """Create a table in the ailab Hive database and record it for the current project."""
    table_name = item.table_name
    columns = item.columns
    partition_column = item.partition_column
    # Check that the table name is not already taken
    table_names = hiveDs.list_tables()
    if table_name in table_names:
        raise Exception(f'Table name {table_name} already exists')
    create_table_str = f"create table if not exists `{hiveDs.database_name}`.`{table_name}` ("
    if len(columns) == 0:
        raise Exception('The table must contain at least one column')
    for column in columns:
        create_table_str += f"`{column.column_name}` {column.Column_type},"
    create_table_str = create_table_str[:-1] + ")"
    if partition_column:
        create_table_str += f" PARTITIONED BY (`{partition_column}` string)"
    create_table_str += " ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"
    hiveDs._execute_create_sql([create_table_str])
    # Verify that the table was actually created before recording it
    res = hiveDs._execute_sql([f"show tables like '{table_name}'"])
    db_item = models.DataTable(**{
        'database_name': hiveDs.database_name,
        'table_name': table_name,
        'source': 1,
        'type': 1,
        'project_id': g.project_id,
        'create_time': int(time.time()),
        'user_id': g.user_id
    })
    if len(res[0][0]) > 0:
        db.add(db_item)
        db.commit()
        db.refresh(db_item)
        return {
            'status': True,
            'sql': create_table_str
        }
    else:
        raise Exception('Failed to create table')
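
# For reference, a CreateAilab item with columns (id int, name string) and
# partition_column 'dt' would make create_table above assemble DDL along these
# lines ('ailab' and 'my_table' are illustrative stand-ins):
#   create table if not exists `ailab`.`my_table` (`id` int,`name` string)
#   PARTITIONED BY (`dt` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
#   STORED AS TEXTFILE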


def filter_by_name(table_name):
    """Hide temporary tables generated by jobs and tasks (names matching job*_tmp / task*_tmp)."""
    return not bool(re.search(r'(?:job.*_tmp|task.*_tmp)', table_name))
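
# Behaviour of filter_by_name above on some hypothetical table names:
#   filter_by_name('job_20230101_tmp')  -> False  (temporary job table, hidden)
#   filter_by_name('task_export_tmp')   -> False  (temporary task table, hidden)
#   filter_by_name('sales_orders')      -> True   (regular table, kept)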


def get_ailab_table(db: Session, project_id: str):
    """List ailab tables visible to a project; the special project sees every non-temporary table."""
    res = []
    if project_id == special_project_id:
        table_list = hiveDs.list_tables()
        for table_name in table_list:
            if filter_by_name(table_name):
                res.append(table_name)
    else:
        data_tables: List[models.DataTable] = db.query(models.DataTable)\
            .filter(models.DataTable.project_id == project_id)\
            .filter(models.DataTable.source == 1)\
            .order_by(models.DataTable.create_time.desc()).all()
        res = [t.table_name for t in data_tables]
    return {"table_list": res, "database_name": database_name}


def get_ailab_table_schema(db: Session, table_name: str):
    table_schema = hiveDs.get_table_schema(table_name)
    return table_schema


def get_preview_ailab_table(db: Session, table_name: str):
    preview_data = hiveDs.get_preview_data(table_name)
    return preview_data


def get_lake_table_schema(db: Session, db_name: str, table_name: str):
    table_schema = hiveDs.get_table_schema(table_name, db_name=db_name)
    return table_schema


def get_preview_lake_table(db: Session, db_name: str, table_name: str):
    preview_data = hiveDs.get_preview_data(table_name, db_name=db_name)
    return preview_data


def get_lake_table(db: Session, project_id: int):
    data_tables: List[models.DataTable] = db.query(models.DataTable)\
        .filter(models.DataTable.project_id == project_id)\
        .filter(models.DataTable.source == 0)\
        .order_by(models.DataTable.create_time.desc()).all()
    return data_tables


def get_lake_table_info(db: Session, dl_id: int):
    data_table: models.DataTable = db.query(models.DataTable)\
        .filter(models.DataTable.id == dl_id).first()
    if not data_table:
        raise Exception('Data lake table not found')
    return data_table


def get_table_info(db: Session, db_name: str, table_name: str):
    table_info = hiveDs.get_table_info(table_name, db_name)
    return table_info


def check_share(db: Session, table_name: str):
    """Return 1 if the table has been shared (a type-0 record exists), otherwise 0."""
    data_tables: List[models.DataTable] = db.query(models.DataTable)\
        .filter(models.DataTable.table_name == table_name)\
        .filter(models.DataTable.type == 0).all()
    if len(data_tables) > 0:
        return 1
    return 0
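
# Sketch of how a caller might consume check_share above (hypothetical table name):
#   if check_share(db, 'orders'):
#       ...  # at least one project already holds a shared (type == 0) copy of the table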