data_table.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. import time
  2. import uuid
  3. from typing import List
  4. from app import models, schemas
  5. from sqlalchemy.orm import Session
  6. from app.common.hive import hiveDs
  7. def import_datalake(db: Session, item: schemas.ImportDataLake):
  8. # 校验数据湖中是否存在此库表
  9. ailab_table_names = hiveDs.list_tables()
  10. if not item.table_name in ailab_table_names: raise Exception(f'数据湖中不存在表{item.table_name}')
  11. data_table = models.DataTable(**{
  12. 'database_name': item.database_name,
  13. 'table_name': item.table_name,
  14. 'source': 0,
  15. 'type': 2,
  16. 'project_id': 1
  17. })
  18. db.add(data_table)
  19. db.commit()
  20. db.refresh(data_table)
  21. return data_table
  22. def share_ailab(db: Session, item: schemas.ShareAilab):
  23. # 校验ailab中是否存在此表
  24. ailab_table_names = hiveDs.list_tables()
  25. table_names = item.table_names
  26. project_ids = item.project_ids
  27. db_items = []
  28. for table_name in table_names:
  29. if not table_name in ailab_table_names: raise Exception(f'数据湖中不存在表{table_name}')
  30. for project_id in project_ids:
  31. data_table = models.DataTable(**{
  32. 'table_name': table_name,
  33. 'source': 1,
  34. 'type': 0,
  35. 'project_id': project_id
  36. })
  37. db_items.append(data_table)
  38. res = []
  39. for db_item in db_items:
  40. db.add(db_item)
  41. db.commit()
  42. db.refresh(db_item)
  43. res.append(db_item.to_dict())
  44. return res
  45. def create_table(db: Session, item: schemas.CreateAilab):
  46. table_name = item.table_name
  47. columns = item.columns
  48. partition_column = item.partition_column
  49. # 校验表名是否重复
  50. table_names = hiveDs.list_tables()
  51. if table_name in table_names: raise Exception(f'表 {table_name} 命名重复')
  52. create_table_str = f"create table if not exists {hiveDs.database_name}.`{table_name}` ("
  53. if len(columns) == 0: raise Exception('表中至少有一个字段')
  54. for column in columns:
  55. create_table_str += f"`{column.column_name}` {column.Column_type},"
  56. create_table_str = create_table_str[:-1] + ")"
  57. if partition_column:
  58. create_table_str += f" PARTITIONED BY (`{partition_column}` string)"
  59. create_table_str += " ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE"
  60. hiveDs._execute_create_sql([create_table_str])
  61. res = hiveDs._execute_sql([f"show tables like '{table_name}'"])
  62. db_item = models.DataTable(**{
  63. 'database_name': hiveDs.database_name,
  64. 'table_name': table_name,
  65. 'source': 1,
  66. 'type': 1,
  67. })
  68. if len(res[0][0])>0:
  69. db.add(db_item)
  70. db.commit()
  71. db.refresh(db_item)
  72. return {
  73. 'status': True,
  74. 'sql': create_table_str
  75. }
  76. else:
  77. raise Exception("建表失败")
  78. def get_ailab_table(db: Session, project_id: int):
  79. project: models.Project = db.query(models.Project)\
  80. .filter(models.Project.id==project_id).first()
  81. if not project:
  82. raise Exception('项目不存在')
  83. res = []
  84. if project.type == 0:
  85. res = hiveDs.list_tables()
  86. else:
  87. data_tables: List[models.DataTable] = db.query(models.DataTable)\
  88. .filter(models.DataTable.project_id==project_id)\
  89. .filter(models.DataTable.source==1).all()
  90. res = [t.table_name for t in data_tables]
  91. return res
  92. def get_ailab_table_schema(db: Session, table_name: str):
  93. table_schema = hiveDs.get_table_schema(table_name)
  94. return table_schema
  95. def get_preview_ailab_table(db: Session, table_name: str):
  96. preview_data = hiveDs.get_preview_data(table_name)
  97. return preview_data
  98. def get_lake_table(db: Session, project_id: int):
  99. data_tables: List[models.DataTable] = db.query(models.DataTable)\
  100. .filter(models.DataTable.project_id==project_id)\
  101. .filter(models.DataTable.source==0).all()
  102. return data_tables