hive.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  import os
  import subprocess

  from pyhive import hive
  from pyhive.exc import DatabaseError

  from app.core.datasource.datasource import DataSourceBase
  from configs.logging import logger
  from utils import flat_map
  class HiveDS(DataSourceBase):
      """Hive data source that talks to HiveServer2 through PyHive.

      Every query helper opens a fresh connection via ``_execute_sql``: a
      plain connection when ``kerberos == 0``, otherwise a Kerberos
      connection preceded by a best-effort ``kinit`` from ``keytab``.
      """

      type = 'hive'

      def __init__(self, host, port, database_name,
                   username=None, password=None, kerberos=0,
                   keytab=None, krb5config=None, kerberos_service_name=None,
                   principal=None, type='hive'):
          """Store connection settings.

          :param host: HiveServer2 host.
          :param port: HiveServer2 port.
          :param database_name: Database to connect to; empty/None falls
              back to ``'default'``.
          :param username: User name for the non-Kerberos connection.
          :param password: Kept on the instance and forwarded to
              ``DataSourceBase``; not passed to PyHive by this class.
          :param kerberos: ``0`` for a plain connection, anything else for
              Kerberos; coerced with ``int()``.
          :param keytab: Keytab path handed to ``kinit -kt``.
          :param krb5config: Stored only; no method shown here reads it.
              NOTE(review): presumably a krb5.conf path - confirm whether it
              should be exported as ``KRB5_CONFIG``.
          :param kerberos_service_name: Service name for PyHive's Kerberos auth.
          :param principal: Principal handed to ``kinit``.
          :param type: Type tag forwarded to ``DataSourceBase``.
          """
          DataSourceBase.__init__(self, host, port, username, password, database_name, type)
          self.host = host
          self.port = port
          self.username = username
          self.password = password
          self.database_name = 'default' if not database_name else database_name
          self.kerberos = int(kerberos)
          self.keytab = keytab
          self.krb5config = krb5config
          self.kerberos_service_name = kerberos_service_name
          self.principal = principal

      @property
      def jdbc_url(self):
          """JDBC URL for the HiveServer2 (``hive2``) driver."""
          return f'jdbc:hive2://{self.host}:{self.port}/{self.database_name}'

      @property
      def jdbc_driver_class(self):
          """Fully-qualified Hive JDBC driver class name."""
          return 'org.apache.hive.jdbc.HiveDriver'

      @property
      def connection_str(self):
          """Not implemented for Hive; always ``None``."""
          return None

      def _kinit(self):
          """Best-effort Kerberos login: ``kinit -kt <keytab> <principal>``.

          The command is run as an argument list without a shell, so keytab or
          principal values cannot inject extra shell commands. Failures are
          only logged; the caller still attempts the connection afterwards
          (for example, a ticket obtained earlier may still be usable).
          """
          # str() mirrors the old f-string interpolation (None -> 'None').
          cmd = ['kinit', '-kt', str(self.keytab), str(self.principal)]
          try:
              returncode = subprocess.run(cmd).returncode
          except OSError as e:  # e.g. the kinit binary is not on PATH
              logger.error(f'kinit could not be started: {e}')
              return
          if returncode != 0:
              logger.error(f'kinit exited with status {returncode}')

      def _execute_sql(self, sqls):
          """Run each statement in ``sqls``, in order, on one new connection.

          :param sqls: Iterable of SQL strings.
          :return: List holding one ``cursor.fetchall()`` result per statement
              that completed. Any exception is logged and swallowed, so after a
              failure only the results gathered before it are returned
              (possibly ``[]``). The connection is always closed.
          """
          conn = None
          res = []
          try:
              if self.kerberos == 0:
                  conn = hive.Connection(host=self.host, port=self.port,
                                         username=self.username,
                                         database=self.database_name)
              else:
                  self._kinit()
                  conn = hive.Connection(host=self.host, port=self.port,
                                         database=self.database_name,
                                         auth="KERBEROS",
                                         kerberos_service_name=self.kerberos_service_name)
              cursor = conn.cursor()
              for sql in sqls:
                  cursor.execute(sql)
                  res.append(cursor.fetchall())
          except Exception as e:
              logger.error(e)
          finally:
              if conn is not None:
                  conn.close()
          return res

      def is_connect(self):
          """Return True if ``select 1`` produced a result set, else False."""
          res = self._execute_sql(['select 1'])
          logger.info(res)
          return bool(res)

      def get_preview_data(self, table_name, limit=100, page=0):
          """Return the column header and up to ``limit`` rows of ``table_name``.

          NOTE(review): ``table_name`` is interpolated into SQL unescaped;
          callers must ensure it is a trusted identifier.

          :param limit: Maximum number of rows to return.
          :param page: Used directly as the row *offset* in
              ``LIMIT page,limit``. NOTE(review): the name suggests a page
              index (offset ``page * limit``) - confirm what callers pass.
          :return: ``{'header': ..., 'content': rows}``. The header is built
              with ``flat_map`` from the first two fields of each ``describe``
              row joined by ``':'`` (col_name:data_type in Hive's output).
          :raises IndexError: If either query failed, because
              ``_execute_sql`` then returns fewer result sets.
          """
          sql1 = f'describe {self.database_name}.{table_name}'
          sql2 = f"SELECT * FROM {table_name} LIMIT {page},{limit}"
          res = self._execute_sql([sql1, sql2])
          logger.info(res)
          return {
              'header': flat_map(lambda x: [':'.join(x[:2])], res[0]),
              'content': res[1],
          }

      def get_data_num(self, table_name):
          """Return the number of rows in ``table_name``.

          ``COUNT(*)`` lets the server do the counting instead of shipping one
          row per record to the client and measuring the list length.

          :raises IndexError: If the query failed (no result set returned).
          """
          res = self._execute_sql([f"SELECT COUNT(*) FROM {table_name}"])
          # res[0] is the single result set; its single row holds the count.
          return res[0][0][0]

      def list_tables(self):
          """Return the names listed by ``show tables`` on this database.

          :raises IndexError: If the query failed.
          """
          res = self._execute_sql(['show tables'])
          return flat_map(lambda x: x, res[0])

      def get_table_schema(self, table_name):
          """Describe every column of ``table_name``.

          :return: One string per column. Each string is built with ``flat_map``
              from the ``describe`` rows whose first field is non-empty, each
              turned into ``index:field0:field1`` where ``index`` is the
              column's position. Processing stops at the first column whose
              describe failed or could not be formatted, and the strings
              collected so far are returned.
          :raises Exception: If ``show columns`` returned no result set
              (e.g. the connection failed).
          """
          logger.info(self.database_name)
          qualified = f'{self.database_name}.{table_name}'
          res = self._execute_sql([f'show columns in {qualified}'])
          if not res:
              raise Exception(f'{table_name} no columns')
          columns = [row[0] for row in res[0]]
          if not columns:
              return []
          # Run all per-column DESCRIBEs on one connection instead of one
          # connection (and one kinit) per column. _execute_sql stops at the
          # first failing statement, so ``described`` covers exactly the
          # leading columns that succeeded - the same prefix the old
          # per-column loop collected before bailing out.
          described = self._execute_sql(
              [f'describe {qualified} {col}' for col in columns])
          ans = []
          for i, rows in enumerate(described):
              try:
                  tagged = [[str(i), *x] for x in rows if x[0] != '']
                  ans.append(''.join(flat_map(lambda x: ':'.join(x[:3]), tagged)))
              except Exception:
                  return ans
          return ans