@@ -4,7 +4,8 @@ from app.models import JobJdbcDatasource
 from app.schemas.datax_json import DataXJsonParam
 from app.utils import *
-
+from configs.settings import config
+import json

 {
     "writer": {
@@ -61,6 +62,12 @@ from app.utils import *
     }
 }

+
+def _build_hadoop_configs():
+    hadoop = config.get('HADOOP_INNER', 'hadoop_config')
+    kerberos = config.get('HADOOP_INNER', 'kerberos_config')
+    return json.loads(hadoop), json.loads(kerberos)
+
 class HdfsReader(WriterBase):
     def __init__(self, ds: JobJdbcDatasource):
         WriterBase.__init__(self, ds)
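For context, a minimal sketch of the configuration this change assumes. The `HADOOP_INNER` section name and its keys (`datasource`, `default_fs`, `hadoop_config`, `kerberos_config`) come from the diff itself; the concrete values below are hypothetical. `hadoop_config` and `kerberos_config` are expected to hold JSON strings, which is why `_build_hadoop_configs()` runs them through `json.loads`:

# Hypothetical [HADOOP_INNER] section in configs/settings (values are examples only):
#   datasource      = 3,7
#   default_fs      = hdfs://nameservice1
#   hadoop_config   = {"dfs.nameservices": "nameservice1",
#                      "dfs.ha.namenodes.nameservice1": "nn1,nn2"}
#   kerberos_config = {"haveKerberos": "true",
#                      "kerberosKeytabFilePath": "/etc/security/keytabs/hive.keytab",
#                      "kerberosPrincipal": "hive/_HOST@EXAMPLE.COM"}

hadoop, kerberos = _build_hadoop_configs()
# hadoop   -> dict that becomes parameter['hadoopConfig']
# kerberos -> dict whose keys are merged into the top level via parameter.update(kerberos)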
@@ -82,17 +89,25 @@ class HdfsReader(WriterBase):
         return res

     def _convert_type(self, type):
-        if type.lower() == 'int':
+        if 'int' in type.lower().strip():
             return 'long'
-        elif type.lower() == 'varchar':
+        elif 'varchar' in type.lower().strip():
             return 'string'
+        return 'string'

     def build_parameter(self, param: DataXJsonParam, is_show=True):
         parameter = dict()
-        parameter['path'] = param.hive_reader.reader_path
-        parameter['defaultFS'] = param.hive_reader.reader_default_fs
+        # Hive datasources that require Kerberos authentication
+        if str(self.ds.id) in config.get('HADOOP_INNER', 'datasource').split(','):
+            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
+            hadoop, kerberos = _build_hadoop_configs()
+            parameter['hadoopConfig'] = hadoop
+            parameter.update(kerberos)
+        else:
+            parameter['defaultFS'] = param.hive_reader.reader_default_fs
+        parameter['path'] = param.hive_reader.reader_path.strip()
         parameter['fileType'] = param.hive_reader.reader_file_type
-        parameter['fieldDelimiter'] = param.hive_reader.reader_field_delimiter
+        parameter['fieldDelimiter'] = param.hive_reader.reader_field_delimiter.strip()
         parameter['column'] = self._build_column(param.reader_columns)
         return parameter
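To make the new branch concrete, a rough sketch of the reader `parameter` dict that `build_parameter` would now return for a datasource whose id is listed in `HADOOP_INNER.datasource`. All values are illustrative, and the exact `column` shape depends on `_build_column`:

# Illustrative result only; the Kerberos keys sit at the top level
# because of parameter.update(kerberos).
{
    "defaultFS": "hdfs://nameservice1",                    # HADOOP_INNER.default_fs
    "hadoopConfig": {"dfs.nameservices": "nameservice1"},  # from _build_hadoop_configs()
    "haveKerberos": "true",
    "kerberosKeytabFilePath": "/etc/security/keytabs/hive.keytab",
    "kerberosPrincipal": "hive/_HOST@EXAMPLE.COM",
    "path": "/user/hive/warehouse/demo.db/t_user",
    "fileType": "orc",
    "fieldDelimiter": "\u0001",
    "column": [{"index": 0, "type": "long"}, {"index": 1, "type": "string"}],
}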
@@ -128,14 +143,22 @@ class HdfsWriter(WriterBase):
         return res


+
     def build_parameter(self, param: DataXJsonParam, is_show=True):
         parameter = dict()
-        parameter['defaultFS'] = param.hive_writer.writer_default_fs
+        # Hive datasources that require Kerberos authentication
+        if str(self.ds.id) in config.get('HADOOP_INNER', 'datasource').split(','):
+            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
+            hadoop, kerberos = _build_hadoop_configs()
+            parameter['hadoopConfig'] = hadoop
+            parameter.update(kerberos)
+        else:
+            parameter['defaultFS'] = param.hive_writer.writer_default_fs
         parameter['fileType'] = param.hive_writer.writer_file_type
-        parameter['path'] = param.hive_writer.writer_path
+        parameter['path'] = param.hive_writer.writer_path.strip()
         parameter['fileName'] = param.hive_writer.writer_filename
         parameter['writeMode'] = param.hive_writer.writer_mode
-        parameter['fieldDelimiter'] = param.hive_writer.writer_field_delimiter
+        parameter['fieldDelimiter'] = param.hive_writer.writer_field_delimiter.strip()
         parameter['column'] = self._build_column(param.writer_columns)
         return parameter
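One behavioural note on `_convert_type`: switching from equality to substring matching means variants such as `bigint`, `tinyint` or `int(11)` now map to `long`, `varchar(255)` maps to `string`, and any unrecognised type falls back to `string` instead of returning `None`. A standalone sketch of the same logic, with the trade-off that the broad match also catches names like `point`:

def convert_type(type_name: str) -> str:
    # Substring match so parameterised and sized variants are covered too.
    t = type_name.lower().strip()
    if 'int' in t:        # int, tinyint, bigint, int(11), ... (also 'point')
        return 'long'
    if 'varchar' in t:    # varchar, varchar(255), ...
        return 'string'
    return 'string'       # fallback instead of None for unknown types

assert convert_type('BIGINT') == 'long'
assert convert_type('varchar(255)') == 'string'
assert convert_type('datetime') == 'string'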