hive.py 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. from app.core.ds.datasource import DataSourceBase
  2. from pyhive import hive
  3. from pyhive.exc import DatabaseError
  4. from configs.logging import logger
  5. from utils import flat_map
  6. class HiveDS(DataSourceBase):
  7. type = 'hive'
  8. def __init__(self, host, port, username, password, database_name, type='hive'):
  9. DataSourceBase.__init__(self, host, port, username, password, database_name, type)
  10. self.host = host
  11. self.port = port
  12. self.username = username
  13. self.password = password
  14. self.database_name = 'default' if not database_name else database_name
  15. @property
  16. def jdbc_url(self):
  17. return f'jdbc:hive2://{self.host}:{self.port}/{self.database_name}'
  18. @property
  19. def jdbc_driver_class(self):
  20. return 'org.apache.hive.jdbc.HiveDriver'
  21. @property
  22. def connection_str(self):
  23. pass
  24. def _execute_sql(self, sqls):
  25. conn = None
  26. res = []
  27. try:
  28. conn = hive.Connection(host=self.host, port=self.port, username=self.username, database=self.database_name)
  29. cursor = conn.cursor()
  30. for sql in sqls:
  31. cursor.execute(sql)
  32. res.append(cursor.fetchall())
  33. logger.info(res)
  34. except Exception as e:
  35. logger.error(e)
  36. finally:
  37. if conn is not None:
  38. conn.close()
  39. return res
  40. def is_connect(self):
  41. sql = 'select 1'
  42. res = self._execute_sql([sql])
  43. logger.info(res)
  44. if res:
  45. return True
  46. else:
  47. return False
  48. def get_preview_data(self, table_name, limit=100):
  49. sql1 = f'describe {self.database_name}.{table_name}'
  50. sql2 = f"SELECT * FROM {table_name} LIMIT {limit}"
  51. res = self._execute_sql([sql1, sql2])
  52. logger.info(res)
  53. return {
  54. 'header': flat_map(lambda x: [':'.join(x[:2])], res[0]),
  55. 'content': res[1]
  56. }
  57. def list_tables(self):
  58. sql = f'show tables'
  59. res = self._execute_sql([sql])
  60. return flat_map(lambda x: x, res[0])
  61. def get_table_schema(self, table_name):
  62. logger.info(self.database_name)
  63. sql = f'describe {self.database_name}.{table_name}'
  64. res = self._execute_sql([sql])
  65. if res:
  66. res = [[str(i) , *x]for i, x in enumerate(res[0])]
  67. logger.info(res)
  68. return flat_map(lambda x: [':'.join(x[:3])], res)
  69. else:
  70. raise Exception('table not found')