瀏覽代碼

高可用初版

liweiquan 2 年之前
父節點
當前提交
f3e1394099
共有 4 個文件被更改,包括 83 次插入72 次删除
  1. 7 1
      app/common/hive.py
  2. 2 2
      app/core/datasource/datasource.py
  3. 71 69
      app/core/datasource/hive.py
  4. 3 0
      txtest.ini

+ 7 - 1
app/common/hive.py

@@ -13,6 +13,9 @@ KEYTAB = config.get('HIVE', 'KEYTAB', vars=DefaultOption(config, 'HIVE', KEYTAB
 KRB5CONFIG = config.get('HIVE', 'KRB5CONFIG', vars=DefaultOption(config, 'HIVE', KRB5CONFIG = None))
 KERBEROS_SERVICE_NAME = config.get('HIVE', 'KERBEROS_SERVICE_NAME', vars=DefaultOption(config, 'HIVE', KERBEROS_SERVICE_NAME = None))
 PRINCIPAL = config.get('HIVE', 'PRINCIPAL', vars=DefaultOption(config, 'HIVE', PRINCIPAL = None))
+ZOOKEEPER_ENABLE = config.get('HIVE', 'ZOOKEEPER_ENABLE', vars=DefaultOption(config, 'HIVE', ZOOKEEPER_ENABLE = 0))
+ZOOKEEPER_HOSTS = config.get('HIVE', 'ZOOKEEPER_HOSTS', vars=DefaultOption(config, 'HIVE', ZOOKEEPER_HOSTS = None))
+ZOOKEEPER_NAMESPACE = config.get('HIVE', 'ZOOKEEPER_NAMESPACE', vars=DefaultOption(config, 'HIVE', ZOOKEEPER_NAMESPACE = None))
 
 hiveDs = HiveDS(**{'type': 'hive',
                     'host': HOST,
@@ -25,4 +28,7 @@ hiveDs = HiveDS(**{'type': 'hive',
                     'krb5config': KRB5CONFIG,
                     'kerberos_service_name': KERBEROS_SERVICE_NAME,
                     'principal':PRINCIPAL,
-                    'path_type': 'local'})
+                    'path_type': 'local',
+                    'zookeeper_enable': ZOOKEEPER_ENABLE,
+                    'zookeeper_hosts': ZOOKEEPER_HOSTS,
+                    'zookeeper_namespace': ZOOKEEPER_NAMESPACE})

+ 2 - 2
app/core/datasource/datasource.py

@@ -7,8 +7,8 @@ from app.utils import *
 @dataclass
 class DataSourceBase:
     type: str
-    host: str
-    port: int
+    host: Optional[str]
+    port: Optional[int]
     username: str
     password: str
     database_name: str

+ 71 - 69
app/core/datasource/hive.py

@@ -1,4 +1,6 @@
 import os
+import random
+import re
 from app.core.datasource.datasource import DataSourceBase
 from pyhive import hive
 from pyhive.exc import DatabaseError
@@ -8,6 +10,7 @@ from utils import flat_map
 import sasl
 from thrift_sasl import TSaslClientTransport
 from thrift.transport.TSocket import TSocket
+from kazoo.client import KazooClient
 
 
 def create_hive_plain_transport(host, port, username, password, timeout=10):
@@ -41,13 +44,15 @@ def create_hive_kerberos_plain_transport(host, port, kerberos_service_name, time
 
     return TSaslClientTransport(sasl_factory, sasl_auth, socket)
 
+
 class HiveDS(DataSourceBase):
     type = 'hive'
 
     def __init__(self, host, port,database_name,\
         username=None, password=None,  kerberos=0, \
         keytab=None, krb5config=None, kerberos_service_name=None, \
-        principal=None, type='hive', path_type='minio'):
+        principal=None, type='hive', path_type='minio', \
+        zookeeper_enable=0, zookeeper_hosts=None, zookeeper_namespace=None):
         DataSourceBase.__init__(self, host, port, username, password, database_name, type)
         self.host = host
         self.port = port
@@ -60,6 +65,10 @@ class HiveDS(DataSourceBase):
         self.kerberos_service_name = kerberos_service_name
         self.principal = principal
         self.path_type = path_type
+        self.zookeeper_enable = zookeeper_enable
+        self.zookeeper_hosts = zookeeper_hosts
+        self.zookeeper_namespace = zookeeper_namespace
+
 
 
     @property
@@ -74,44 +83,70 @@ class HiveDS(DataSourceBase):
     def connection_str(self):
         pass
 
-    def _execute_sql(self, sqls, db_name: str = None):
+    def _connect(self, db_name: str = None):
         conn = None
-        res = []
-        try:
-            if self.kerberos == 0:
-                # conn = hive.Connection(host=self.host, port=self.port, username=self.username, database=self.database_name)
-                conn = hive.connect(
-                    thrift_transport=create_hive_plain_transport(
-                        host=self.host,
-                        port=self.port,
-                        username=self.username,
-                        password=self.password,
-                        timeout=10
-                    ),
-                    database=db_name if db_name else self.database_name
-                )
-            else:
-                file_name = ''
-                if self.path_type == 'minio':
-                    get_kerberos_to_local(self.keytab)
-                    file_name = './assets/kerberos/'+self.keytab.split("/")[-1]
+        host_list = [f'{self.host}:{self.port}']
+        if self.zookeeper_enable == 1:
+            zk_client = KazooClient(hosts=self.zookeeper_hosts)
+            zk_client.start()
+            result = zk_client.get_children(self.zookeeper_namespace)
+            zk_client.stop()
+            host_list = []
+            for host in result:
+                if bool(re.search(r"(serverUri)",host)):
+                    host_list.append(host.split("=")[1].split(";")[0])
+        host_count = len(host_list)
+        while host_count > 0:
+            host_count -= 1
+            index = random.randint(0, host_count)
+            host_str = host_list.pop(index).split(":")
+            try:
+                if self.kerberos == 0:
+                    conn = hive.connect(
+                        thrift_transport=create_hive_plain_transport(
+                            host=host_str[0],
+                            port=host_str[1],
+                            username=self.username,
+                            password=self.password,
+                            timeout=10
+                        ),
+                        database=db_name if db_name else self.database_name
+                    )
                 else:
-                    file_name = self.keytab
-                auth_res = os.system(f'kinit -kt {file_name} {self.principal}')
-                if auth_res != 0:
-                    raise Exception('hive 连接失败')
-                # conn = hive.Connection(host=self.host, port=self.port,  auth="KERBEROS", kerberos_service_name=self.kerberos_service_name, database=self.database_name)
-                conn = hive.connect(
-                    thrift_transport=create_hive_kerberos_plain_transport(
-                        host=self.host,
-                        port=self.port,
-                        kerberos_service_name=self.kerberos_service_name,
-                        timeout=10
-                    ),
-                    database=db_name if db_name else self.database_name
-                )
+                    file_name = ''
+                    if self.path_type == 'minio':
+                        get_kerberos_to_local(self.keytab)
+                        file_name = './assets/kerberos/'+self.keytab.split("/")[-1]
+                    else:
+                        file_name = self.keytab
+                    auth_res = os.system(f'kinit -kt {file_name} {self.principal}')
+                    if auth_res != 0:
+                        raise Exception('hive 连接失败')
+                    conn = hive.connect(
+                        thrift_transport=create_hive_kerberos_plain_transport(
+                            host=host_str[0],
+                            port=host_str[1],
+                            kerberos_service_name=self.kerberos_service_name,
+                            timeout=10
+                        ),
+                        database=db_name if db_name else self.database_name
+                    )
+                cursor = conn.cursor()
+                cursor.execute('select 1')
+                result = cursor.fetchall()
+                print('获取连接,通过连接查询数据测试===>',result)
+                if result:
+                    return conn
+            except Exception as e:
+                logger.error(e)
+        raise Exception('hive 连接失败')
 
 
+    def _execute_sql(self, sqls, db_name: str = None):
+        conn = None
+        res = []
+        try:
+            conn = self._connect(db_name)
             cursor = conn.cursor()
             for sql in sqls:
                 cursor.execute(sql)
@@ -130,40 +165,7 @@ class HiveDS(DataSourceBase):
         conn = None
         res = []
         try:
-            if self.kerberos == 0:
-                # conn = hive.Connection(host=self.host, port=self.port, username=self.username, database=self.database_name)
-                conn = hive.connect(
-                    thrift_transport=create_hive_plain_transport(
-                        host=self.host,
-                        port=self.port,
-                        username=self.username,
-                        password=self.password,
-                        timeout=10
-                    ),
-                    database=self.database_name
-                )
-            else:
-                file_name = ''
-                if self.path_type == 'minio':
-                    get_kerberos_to_local(self.keytab)
-                    file_name = './assets/kerberos/'+self.keytab.split("/")[-1]
-                else:
-                    file_name = self.keytab
-                auth_res = os.system(f'kinit -kt {file_name} {self.principal}')
-                if auth_res != 0:
-                    raise Exception('hive 连接失败')
-                # conn = hive.Connection(host=self.host, port=self.port,  auth="KERBEROS", kerberos_service_name=self.kerberos_service_name, database=self.database_name)
-                conn = hive.connect(
-                    thrift_transport=create_hive_kerberos_plain_transport(
-                        host=self.host,
-                        port=self.port,
-                        kerberos_service_name=self.kerberos_service_name,
-                        timeout=10
-                    ),
-                    database=self.database_name
-                )
-
-
+            conn = self._connect()
             cursor = conn.cursor()
             for sql in sqls:
                 cursor.execute(sql)

+ 3 - 0
txtest.ini

@@ -44,6 +44,9 @@ krb5config = assets/test/krb5.conf
 kerberos_service_name = hadoop
 principal = ailab@EMR-5XJSY31F
 base_path = /usr/hive/warehouse/ailab.db/
+zookeeper_enable=1
+zookeeper_hosts = 10.254.20.23:2181,10.254.20.26:2181,10.254.20.29:2181
+zookeeper_namespace = hiveserver2
 
 
 [HIVE_METASTORE]