liweiquan 2 лет назад
Родитель
Сommit
ba75b9dd96
1 измененных файлов с 54 добавлено и 2 удалено
  1. 54 2
      app/core/datasource/hive.py

+ 54 - 2
app/core/datasource/hive.py

@@ -5,6 +5,40 @@ from pyhive.exc import DatabaseError
 from app.utils.get_kerberos import get_kerberos_to_local
 from configs.logging import logger
 from utils import flat_map
+import sasl
+from thrift_sasl import TSaslClientTransport
+from thrift.transport.TSocket import TSocket
+
+
+def create_hive_plain_transport(host, port, username, password, timeout=10):
+    socket = TSocket(host, port)
+    socket.setTimeout(timeout * 1000)
+
+    sasl_auth = 'PLAIN'
+
+    def sasl_factory():
+        sasl_client = sasl.Client()
+        sasl_client.setAttr('host', host)
+        sasl_client.setAttr('username', username)
+        sasl_client.setAttr('password', password)
+        sasl_client.init()
+        return sasl_client
+
+    return TSaslClientTransport(sasl_factory, sasl_auth, socket)
+
+def create_hive_kerberos_plain_transport(host, port, timeout=10):
+    socket = TSocket(host, port)
+    socket.setTimeout(timeout * 1000)
+
+    sasl_auth = 'PLAIN'
+
+    def sasl_factory():
+        sasl_client = sasl.Client()
+        sasl_client.setAttr('host', host)
+        sasl_client.init()
+        return sasl_client
+
+    return TSaslClientTransport(sasl_factory, sasl_auth, socket)
 
 class HiveDS(DataSourceBase):
     type = 'hive'
@@ -44,7 +78,17 @@ class HiveDS(DataSourceBase):
         res = []
         try:
             if self.kerberos == 0:
-                conn = hive.Connection(host=self.host, port=self.port, username=self.username, database=self.database_name)
+                # conn = hive.Connection(host=self.host, port=self.port, username=self.username, database=self.database_name)
+                conn = hive.connect(
+                    thrift_transport=create_hive_plain_transport(
+                        host=self.host,
+                        port=self.port,
+                        username=self.username,
+                        password=self.password,
+                        timeout=10
+                    ),
+                    database=self.database_name
+                )
             else:
                 file_name = ''
                 if self.path_type == 'minio':
@@ -53,7 +97,15 @@ class HiveDS(DataSourceBase):
                 else:
                     file_name = self.keytab
                 os.system(f'kinit -kt {file_name} {self.principal}')
-                conn = hive.Connection(host=self.host, database=self.database_name, port=self.port,  auth="KERBEROS", kerberos_service_name=self.kerberos_service_name)
+                # conn = hive.Connection(host=self.host, port=self.port,  auth="KERBEROS", kerberos_service_name=self.kerberos_service_name, database=self.database_name)
+                conn = hive.connect(
+                    thrift_transport=create_hive_kerberos_plain_transport(
+                        host=self.host,
+                        port=self.port,
+                        timeout=10
+                    ),
+                    auth="KERBEROS", kerberos_service_name=self.kerberos_service_name, database=self.database_name
+                )
 
 
             cursor = conn.cursor()