Browse Source

update hive

Zhang Li 2 years ago
parent
commit
7d7941d389
5 changed files with 84 additions and 7 deletions
  1. 60 0
      app/core/hive.py
  2. 2 0
      app/core/mysql.py
  3. 15 6
      app/schemas/job_jdbc_datasouce.py
  4. 6 1
      requirements.txt
  5. 1 0
      utils/__init__.py

+ 60 - 0
app/core/hive.py

@@ -1,8 +1,20 @@
 from app.core.datasource import DataSourceBase
+from pyhive import hive
+from pyhive.exc import DatabaseError
+from configs.logging import logger
+from utils import flat_map
 
 class HiveDS(DataSourceBase):
     type = 'hive'
 
+    def __init__(self, host, port, username, password, database_name, type='hive'):
+        DataSourceBase.__init__(self, host, port, username, password, database_name, type)
+        self.host = host
+        self.port = port
+        self.username = username
+        self.password = password
+        self.database_name = 'default' if not database_name else database_name
+
     @property
     def jdbc_url(self):
         return f'jdbc:hive2://{self.host}:{self.port}/{self.database_name}'
@@ -11,4 +23,52 @@ class HiveDS(DataSourceBase):
     def jdbc_driver_class(self):
         return 'org.apache.hive.jdbc.HiveDriver'
 
+    @property
+    def connection_str(self):
+        pass
+
+    def _execute_sql(self, sqls):
+        conn = None
+        res = []
+        try:
+            conn = hive.Connection(host=self.host, port=self.port, username=self.username, database=self.database_name)
+
+            cursor = conn.cursor()
+            for sql in sqls:
+                cursor.execute(sql)
+                res.append(cursor.fetchall())
+            logger.info(res)
+        except Exception as e:
+            logger.error(e)
+
+        finally:
+            if conn is not None:
+                conn.close()
+        return res
+
+    def is_connect(self):
+        sql = 'select 1'
+        res = self._execute_sql([sql])
+        logger.info(res)
+        if res:
+            return True
+        else:
+            return False
+
+
+    def get_preview_data(self, table_name, limit=100):
+        sql1 = f'describe {self.database_name}.{table_name}'
+        sql2 = f"SELECT * FROM {table_name} LIMIT {limit}"
+        res = self._execute_sql([sql1, sql2])
+        logger.info(res)
+        return {
+            'header': flat_map(lambda x: [':'.join(x[:2])], res[0]),
+            'content': res[1]
+        }
+
+    def list_tables(self):
+        sql = f'show tables'
+        res = self._execute_sql([sql])
+        return flat_map(lambda x: x, res[0])
+
 

+ 2 - 0
app/core/mysql.py

@@ -1,4 +1,5 @@
 
+from dataclasses import dataclass
 from app.core.datasource import DataSourceBase
 from mysql import connector
 from mysql.connector import Error
@@ -8,6 +9,7 @@ from configs import logger
 from utils import flat_map
 
 
+@dataclass
 class MysqlDS(DataSourceBase):
     type = 'mysql'
 

+ 15 - 6
app/schemas/job_jdbc_datasouce.py

@@ -20,13 +20,22 @@ class JobJdbcDatasourceBase(BaseModel):
     comments: str
     class Config:
         schema_extra = {
+            # "example": {
+            #     "datasource_name": 'test',
+            #     "datasource": "mysql",
+            #     "database_name": 'datax_web',
+            #     "jdbc_username": 'root',
+            #     "jdbc_password": 'happylay',
+            #     "jdbc_url": '192.168.199.107:10086',
+            #     "comments": 'This is a very nice Item'
+            # }
             "example": {
-                "datasource_name": 'test',
-                "datasource": "mysql",
-                "database_name": 'datax_web',
-                "jdbc_username": 'root',
-                "jdbc_password": 'happylay',
-                "jdbc_url": '192.168.199.107:10086',
+                "datasource_name": 'testhive',
+                "datasource": "hive",
+                "database_name": 'default',
+                "jdbc_username": '',
+                "jdbc_password": '',
+                "jdbc_url": '192.168.199.107:10000',
                 "comments": 'This is a very nice Item'
             }
         }

+ 6 - 1
requirements.txt

@@ -1,2 +1,7 @@
 fastapi_pagination=0.9.3
-pymysql=1.0.2
+pymysql=1.0.2
+PyHive=0.6.5
+pure-sasl=0.6.2
+sasl=0.3.1
+thrift=0.16.0
+thrift-sasl=0.4.3

+ 1 - 0
utils/__init__.py

@@ -4,5 +4,6 @@ from utils.sx_time import *
 def flat_map(f, xs):
         ys = []
         for x in xs:
+            print(x)
             ys.extend(f(x))
         return ys