# data_table.py 6.0 KB
  1. import re
  2. import time
  3. import uuid
  4. from typing import List
  5. from app import models, schemas
  6. from sqlalchemy.orm import Session
  7. from app.common.hive import hiveDs
  8. from configs.globals import g
  9. def import_datalake(db: Session, item: schemas.ImportDataLake):
  10. # 校验数据湖中是否存在此库表
  11. ailab_table_names = hiveDs.list_tables()
  12. if not item.table_name in ailab_table_names: raise Exception(f'数据湖中不存在表{item.table_name}')
  13. data_table = models.DataTable(**{
  14. 'database_name': item.database_name,
  15. 'table_name': item.table_name,
  16. 'table_path': item.table_path,
  17. 'source': 0,
  18. 'type': 2,
  19. 'project_id': g.project_id,
  20. 'create_time': int(time.time()),
  21. 'user_id': g.user_id
  22. })
  23. db.add(data_table)
  24. db.commit()
  25. db.refresh(data_table)
  26. return data_table
  27. def update_datalake(db: Session, datalake_id: int, item: schemas.ImportDataLake):
  28. ailab_table_names = hiveDs.list_tables()
  29. if not item.table_name in ailab_table_names: raise Exception(f'数据湖中不存在表{item.table_name}')
  30. db_item: models.DataTable = db.query(models.DataTable).filter(models.DataTable.id == datalake_id).first()
  31. if not db_item: raise Exception('未找到此数据')
  32. db_item.database_name = item.database_name
  33. db_item.table_name = item.table_name
  34. db_item.table_path = item.table_path
  35. db.commit()
  36. db.flush()
  37. db.refresh(db_item)
  38. return db_item
  39. def delete_datalake(db: Session, datalake_id: int):
  40. db_item: models.DataTable = db.query(models.DataTable).filter(models.DataTable.id == datalake_id).first()
  41. if not db_item: raise Exception('未找到此数据')
  42. db.delete(db_item)
  43. db.commit()
  44. return db_item
  45. def share_ailab(db: Session, item: schemas.ShareAilab):
  46. # 校验ailab中是否存在此表
  47. ailab_table_names = hiveDs.list_tables()
  48. table_names = item.table_names
  49. project_ids = item.project_ids
  50. db_items = []
  51. for table_name in table_names:
  52. if not table_name in ailab_table_names: raise Exception(f'数据湖中不存在表{table_name}')
  53. data_tables: List[models.DataTable] = db.query(models.DataTable)\
  54. .filter(models.DataTable.table_name==table_name)\
  55. .filter(models.DataTable.type == 0).all()
  56. p_ids = [d.project_id for d in data_tables]
  57. for project_id in project_ids:
  58. if project_id in p_ids:
  59. continue
  60. data_table = models.DataTable(**{
  61. 'table_name': table_name,
  62. 'source': 1,
  63. 'type': 0,
  64. 'project_id': project_id,
  65. 'create_time': int(time.time()),
  66. 'user_id': g.user_id
  67. })
  68. db_items.append(data_table)
  69. res = []
  70. for db_item in db_items:
  71. db.add(db_item)
  72. db.commit()
  73. db.refresh(db_item)
  74. res.append(db_item.to_dict())
  75. return res
  76. def create_table(db: Session, item: schemas.CreateAilab):
  77. table_name = item.table_name
  78. columns = item.columns
  79. partition_column = item.partition_column
  80. # 校验表名是否重复
  81. table_names = hiveDs.list_tables()
  82. if table_name in table_names: raise Exception(f'表 {table_name} 命名重复')
  83. create_table_str = f"create table if not exists `{hiveDs.database_name}`.`{table_name}` ("
  84. if len(columns) == 0: raise Exception('表中至少有一个字段')
  85. for column in columns:
  86. create_table_str += f"`{column.column_name}` {column.Column_type},"
  87. create_table_str = create_table_str[:-1] + ")"
  88. if partition_column:
  89. create_table_str += f" PARTITIONED BY (`{partition_column}` string)"
  90. create_table_str += " ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"
  91. hiveDs._execute_create_sql([create_table_str])
  92. res = hiveDs._execute_sql([f"show tables like '{table_name}'"])
  93. db_item = models.DataTable(**{
  94. 'database_name': hiveDs.database_name,
  95. 'table_name': table_name,
  96. 'source': 1,
  97. 'type': 1,
  98. 'project_id': g.project_id,
  99. 'create_time': int(time.time()),
  100. 'user_id': g.user_id
  101. })
  102. if len(res[0][0])>0:
  103. db.add(db_item)
  104. db.commit()
  105. db.refresh(db_item)
  106. return {
  107. 'status': True,
  108. 'sql': create_table_str
  109. }
  110. else:
  111. raise Exception("建表失败")
  112. def filter_by_name(table_name):
  113. return not bool(re.search(r'(?:job.*_tmp|task.*_tmp)',table_name))
  114. def get_ailab_table(db: Session, project_id: int):
  115. project: models.Project = db.query(models.Project)\
  116. .filter(models.Project.id==project_id).first()
  117. if not project:
  118. raise Exception('项目不存在')
  119. res = []
  120. if project.type == 0:
  121. table_list = hiveDs.list_tables()
  122. for table_name in table_list:
  123. if filter_by_name(table_name): res.append(table_name)
  124. else:
  125. data_tables: List[models.DataTable] = db.query(models.DataTable)\
  126. .filter(models.DataTable.project_id==project_id)\
  127. .filter(models.DataTable.source==1)\
  128. .order_by(models.DataTable.create_time.desc()).all()
  129. res = [t.table_name for t in data_tables]
  130. return res
  131. def get_ailab_table_schema(db: Session, table_name: str):
  132. table_schema = hiveDs.get_table_schema(table_name)
  133. return table_schema
  134. def get_preview_ailab_table(db: Session, table_name: str):
  135. preview_data = hiveDs.get_preview_data(table_name)
  136. return preview_data
  137. def get_lake_table(db: Session, project_id: int):
  138. data_tables: List[models.DataTable] = db.query(models.DataTable)\
  139. .filter(models.DataTable.project_id==project_id)\
  140. .filter(models.DataTable.source==0)\
  141. .order_by(models.DataTable.create_time.desc()).all()
  142. return data_tables
  143. def get_lake_table_info(db: Session, dl_id: int):
  144. data_table: models.DataTable = db.query(models.DataTable)\
  145. .filter(models.DataTable.id==dl_id).first()
  146. if not data_table: raise Exception('未找到该数据湖表')
  147. return data_table