|
@@ -1,11 +1,35 @@
|
|
|
|
+import base64
|
|
import time
|
|
import time
|
|
from typing import List
|
|
from typing import List
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy.orm import Session
|
|
from app.core.ds.datasource import DataSrouceFactory
|
|
from app.core.ds.datasource import DataSrouceFactory
|
|
import app.schemas as schemas
|
|
import app.schemas as schemas
|
|
import app.models as models
|
|
import app.models as models
|
|
|
|
+from configs.logging import logger
|
|
|
|
|
|
-def _format_datasource(item: schemas.JobJdbcDatasourceBase):
|
|
|
|
|
|
+
|
|
|
|
+def _decode(url, datasource, database_name):
|
|
|
|
+ return url.replace('jdbc:', '').replace('hive2://', '').replace(f'{datasource}://', '').replace(f'/{database_name}', '')
|
|
|
|
+
|
|
|
|
+def _decode_user(username, password):
|
|
|
|
+ if not username or not password:
|
|
|
|
+ return '' , ''
|
|
|
|
+ try:
|
|
|
|
+ u = base64.b64decode(username).decode('utf-8')
|
|
|
|
+ p = base64.b64decode(password).decode('utf-8')
|
|
|
|
+ return u, p
|
|
|
|
+ except Exception as e:
|
|
|
|
+ logger.error(e)
|
|
|
|
+ return username, password
|
|
|
|
+
|
|
|
|
+def _format_datasource(db: Session, item: schemas.JobJdbcDatasourceBase, ds_id: int=0):
|
|
|
|
+
|
|
|
|
+ if ds_id != 0:
|
|
|
|
+ item = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == ds_id).first()
|
|
|
|
+ if not item:
|
|
|
|
+ raise Exception('JobJdbcDatasource not found')
|
|
|
|
+ item.jdbc_url = _decode(item.jdbc_url, item.datasource, item.database_name)
|
|
|
|
+ item.jdbc_username, item.jdbc_password = _decode_user(item.jdbc_username, item.jdbc_password)
|
|
host, port = item.jdbc_url.split(':')
|
|
host, port = item.jdbc_url.split(':')
|
|
if not host or not port:
|
|
if not host or not port:
|
|
raise Exception('jdbc_url is invalid')
|
|
raise Exception('jdbc_url is invalid')
|
|
@@ -17,20 +41,20 @@ def _format_datasource(item: schemas.JobJdbcDatasourceBase):
|
|
return ds, item
|
|
return ds, item
|
|
|
|
|
|
|
|
|
|
-def test_datasource_connection(item: schemas.JobJdbcDatasourceCreate):
|
|
|
|
- ds, item = _format_datasource(item)
|
|
|
|
|
|
+def test_datasource_connection(db: Session, item: schemas.JobJdbcDatasourceCreate):
|
|
|
|
+ ds, item = _format_datasource(db, item)
|
|
return ds.is_connect()
|
|
return ds.is_connect()
|
|
|
|
|
|
-def get_table_schema(item: schemas.JobJdbcDatasourceCreate, table_name: str):
|
|
|
|
- ds, item = _format_datasource(item)
|
|
|
|
|
|
+def get_table_schema(db: Session, ds_id: int, table_name: str):
|
|
|
|
+ ds, item = _format_datasource(db, None, ds_id)
|
|
return ds.get_table_schema(table_name)
|
|
return ds.get_table_schema(table_name)
|
|
|
|
|
|
-def get_preview_data(item: schemas.JobJdbcDatasourceCreate, table_name: str, limit: int = 100):
|
|
|
|
- ds, item = _format_datasource(item)
|
|
|
|
|
|
+def get_preview_data(db: Session, ds_id: int, table_name: str, limit: int = 100):
|
|
|
|
+ ds, item = _format_datasource(db, None, ds_id)
|
|
return ds.get_preview_data(table_name, limit)
|
|
return ds.get_preview_data(table_name, limit)
|
|
|
|
|
|
-def get_table_names(item: schemas.JobJdbcDatasourceCreate):
|
|
|
|
- ds, item = _format_datasource(item)
|
|
|
|
|
|
+def get_table_names(db: Session, ds_id: int):
|
|
|
|
+ ds, item = _format_datasource(db, None, ds_id)
|
|
return ds.list_tables()
|
|
return ds.list_tables()
|
|
|
|
|
|
def create_job_jdbc_datasource(db: Session, item: schemas.JobJdbcDatasourceCreate):
|
|
def create_job_jdbc_datasource(db: Session, item: schemas.JobJdbcDatasourceCreate):
|
|
@@ -53,8 +77,6 @@ def create_job_jdbc_datasource(db: Session, item: schemas.JobJdbcDatasourceCreat
|
|
|
|
|
|
|
|
|
|
def get_job_jdbc_datasources(db: Session, skip: int = 0, limit: int = 20):
|
|
def get_job_jdbc_datasources(db: Session, skip: int = 0, limit: int = 20):
|
|
- def _decode(url, datasource, database_name):
|
|
|
|
- return url.replace('jdbc:', '').replace(f'{datasource}://', '').replace(f'/{database_name}', '')
|
|
|
|
|
|
|
|
res: List[models.JobJdbcDatasource] = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.status==1).all()
|
|
res: List[models.JobJdbcDatasource] = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.status==1).all()
|
|
for item in res:
|
|
for item in res:
|
|
@@ -65,7 +87,7 @@ def get_job_jdbc_datasources(db: Session, skip: int = 0, limit: int = 20):
|
|
|
|
|
|
|
|
|
|
def update_job_jdbc_datasources(db: Session, ds_id: int, update_item: schemas.JobJdbcDatasourceUpdate):
|
|
def update_job_jdbc_datasources(db: Session, ds_id: int, update_item: schemas.JobJdbcDatasourceUpdate):
|
|
- ds, update_item = _format_datasource(update_item)
|
|
|
|
|
|
+ ds, update_item = _format_datasource(db, update_item)
|
|
|
|
|
|
db_item = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == ds_id).first()
|
|
db_item = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == ds_id).first()
|
|
if not db_item:
|
|
if not db_item:
|