from app.core.datasource.datasource import DataSourceBase
from pyhive import hive

from configs.logging import logger
from utils import flat_map


class HiveDS(DataSourceBase):
    type = 'hive'

    def __init__(self, host, port, username, password, database_name, type='hive'):
        super().__init__(host, port, username, password, database_name, type)
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.database_name = database_name if database_name else 'default'

    @property
    def jdbc_url(self):
        return f'jdbc:hive2://{self.host}:{self.port}/{self.database_name}'

    @property
    def jdbc_driver_class(self):
        return 'org.apache.hive.jdbc.HiveDriver'

    @property
    def connection_str(self):
        # Not used for Hive: the connection is built directly via pyhive.
        pass

    def _execute_sql(self, sqls):
        """Run a list of SQL statements and return one result set per statement.

        Errors are logged and swallowed; callers detect failure by an empty
        or short result list.
        """
        conn = None
        res = []
        try:
            # Note: pyhive only sends a password when auth is 'LDAP' or
            # 'CUSTOM', so self.password is intentionally not passed here.
            conn = hive.Connection(
                host=self.host,
                port=self.port,
                username=self.username,
                database=self.database_name,
            )
            cursor = conn.cursor()
            for sql in sqls:
                cursor.execute(sql)
                res.append(cursor.fetchall())
            logger.info(res)
        except Exception as e:
            logger.error(e)
        finally:
            if conn is not None:
                conn.close()
        return res

    def is_connect(self):
        res = self._execute_sql(['select 1'])
        logger.info(res)
        return bool(res)

    def get_preview_data(self, table_name, limit=100):
        sql1 = f'describe {self.database_name}.{table_name}'
        sql2 = f'SELECT * FROM {table_name} LIMIT {limit}'
        res = self._execute_sql([sql1, sql2])
        if len(res) < 2:
            raise Exception(f'failed to preview table {table_name}')
        logger.info(res)
        return {
            # Each `describe` row is (col_name, col_type, comment);
            # join the first two into "name:type" headers.
            'header': flat_map(lambda x: [':'.join(x[:2])], res[0]),
            'content': res[1],
        }

    def list_tables(self):
        res = self._execute_sql(['show tables'])
        return flat_map(lambda x: x, res[0])

    def get_table_schema(self, table_name):
        logger.info(self.database_name)
        sql = f'describe {self.database_name}.{table_name}'
        res = self._execute_sql([sql])
        if res:
            # Drop separator rows (empty first field) that `describe` emits,
            # then prepend a positional index to each (name, type, comment) row.
            rows = [[str(i), *x] for i, x in enumerate(filter(lambda x: x[0] != '', res[0]))]
            logger.info(rows)
            return flat_map(lambda x: [':'.join(x[:3])], rows)
        else:
            raise Exception(f'table {table_name} not found')
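

# --- Usage sketch (illustrative only) ---
# A minimal example of how HiveDS might be exercised, assuming a HiveServer2
# instance is reachable at the host/port below. The connection parameters
# here are hypothetical placeholders, not part of this module.
if __name__ == '__main__':
    ds = HiveDS(host='localhost', port=10000, username='hive',
                password=None, database_name='default')
    if ds.is_connect():
        print(ds.list_tables())              # e.g. ['users', 'orders']
        print(ds.get_table_schema('users'))  # e.g. ['0:id:bigint', ...]
        print(ds.get_preview_data('users', limit=10))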