@@ -4,7 +4,8 @@ from app.models import JobJdbcDatasource
 from app.schemas.datax_json import DataXJsonParam
 from app.utils import *
-
+from configs.settings import config
+import json
 
 {
     "writer": {
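A note on the new imports: the code below queries `config` with a ConfigParser-style `get(section, option)` call, so `configs.settings` presumably wraps an INI-style file. A minimal sketch under that assumption (module path and file name are hypothetical, not taken from the patch):

from configparser import ConfigParser

# Hypothetical configs/settings.py: expose a module-level ConfigParser
# instance named `config`, matching the config.get('SECTION', 'option')
# calls used in the patch.
config = ConfigParser()
config.read('configs/app.ini')  # actual config path is an assumption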
@@ -61,6 +62,12 @@ from app.utils import *
         }
     }
 
+
+def _build_hadoop_configs():
+    hadoop = config.get('HADOOP_INNER', 'hadoop_config')
+    kerberos = config.get('HADOOP_INNER', 'kerberos_config')
+    return json.loads(hadoop), json.loads(kerberos)
+
 class HdfsReader(WriterBase):
     def __init__(self, ds: JobJdbcDatasource):
         WriterBase.__init__(self, ds)
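`_build_hadoop_configs` assumes the `HADOOP_INNER` section stores `hadoop_config` and `kerberos_config` as JSON strings and returns both parsed as dicts. A self-contained sketch of that behaviour with illustrative section contents (the keys and values shown are examples, not taken from the patch):

import json
from configparser import ConfigParser

# Illustrative HADOOP_INNER section; the real values live in the project config.
config = ConfigParser()
config.read_string("""
[HADOOP_INNER]
datasource = 3,7
default_fs = hdfs://nameservice1
hadoop_config = {"dfs.nameservices": "nameservice1"}
kerberos_config = {"haveKerberos": "true", "kerberosPrincipal": "hive@EXAMPLE.COM"}
""")

def _build_hadoop_configs():
    hadoop = config.get('HADOOP_INNER', 'hadoop_config')
    kerberos = config.get('HADOOP_INNER', 'kerberos_config')
    return json.loads(hadoop), json.loads(kerberos)

hadoop, kerberos = _build_hadoop_configs()
print(hadoop)    # {'dfs.nameservices': 'nameservice1'}
print(kerberos)  # {'haveKerberos': 'true', 'kerberosPrincipal': 'hive@EXAMPLE.COM'}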
@@ -89,8 +96,15 @@ class HdfsReader(WriterBase):
 
     def build_parameter(self, param: DataXJsonParam, is_show=True):
         parameter = dict()
+        # Hive datasources that require Kerberos authentication
+        if str(self.ds.id) in config.get('HADOOP_INNER', 'datasource').split(','):
+            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
+            hadoop, kerberos = _build_hadoop_configs()
+            parameter['hadoopConfig'] = hadoop
+            parameter.update(kerberos)
+        else:
+            parameter['defaultFS'] = param.hive_reader.reader_default_fs
         parameter['path'] = param.hive_reader.reader_path
-        parameter['defaultFS'] = param.hive_reader.reader_default_fs
         parameter['fileType'] = param.hive_reader.reader_file_type
         parameter['fieldDelimiter'] = param.hive_reader.reader_field_delimiter
         parameter['column'] = self._build_column(param.reader_columns)
@@ -128,9 +142,17 @@ class HdfsWriter(WriterBase):
         return res
 
+
     def build_parameter(self, param: DataXJsonParam, is_show=True):
         parameter = dict()
-        parameter['defaultFS'] = param.hive_writer.writer_default_fs
+        # Hive datasources that require Kerberos authentication
+        if str(self.ds.id) in config.get('HADOOP_INNER', 'datasource').split(','):
+            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
+            hadoop, kerberos = _build_hadoop_configs()
+            parameter['hadoopConfig'] = hadoop
+            parameter.update(kerberos)
+        else:
+            parameter['defaultFS'] = param.hive_writer.writer_default_fs
         parameter['fileType'] = param.hive_writer.writer_file_type
         parameter['path'] = param.hive_writer.writer_path
         parameter['fileName'] = param.hive_writer.writer_filename
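For a datasource id listed in `HADOOP_INNER.datasource`, the writer branch thus emits the cluster-wide `defaultFS` plus `hadoopConfig` and the merged Kerberos fields instead of the per-datasource `writer_default_fs`. Assuming the `kerberos_config` JSON carries DataX's Kerberos options (`haveKerberos`, `kerberosKeytabFilePath`, `kerberosPrincipal`), the resulting parameter block would look roughly like this (all values illustrative):

# Illustrative HdfsWriter.build_parameter output for a Kerberos-protected datasource.
parameter = {
    'defaultFS': 'hdfs://nameservice1',                     # from HADOOP_INNER.default_fs
    'hadoopConfig': {'dfs.nameservices': 'nameservice1'},   # parsed hadoop_config JSON
    'haveKerberos': 'true',                                 # merged via parameter.update(kerberos)
    'kerberosPrincipal': 'hive@EXAMPLE.COM',
    'kerberosKeytabFilePath': '/etc/security/keytabs/hive.keytab',
    'fileType': 'orc',
    'path': '/user/hive/warehouse/demo.db/t_demo',
    'fileName': 't_demo',
}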