from typing import List

from app.core.datax.base import ReaderBase, WriterBase
from app.schemas.datax_json import DataXJsonParam, RWDataSource
from app.utils import *
from configs.settings import config
import json

# Example DataX parameter blocks produced by the builders in this module:
"""
{
    "writer": {
        "name": "hdfswriter",
        "parameter": {
            "defaultFS": "hdfs://192.168.199.107:9000",
            "fileType": "text",
            "path": "/usr/hive/warehouse/test_1",
            "fileName": "test_1",
            "writeMode": "append",
            "fieldDelimiter": "|",
            "column": [
                {"name": "id", "type": "int"},
                {"name": "ssn", "type": "varchar"},
                {"name": "test2", "type": "int"}
            ]
        }
    }
}

{
    "reader": {
        "name": "hdfsreader",
        "parameter": {
            "path": "/usr/hive/warehouse/grades/*",
            "defaultFS": "hdfs://192.168.199.107:9000",
            "fileType": "csv",
            "fieldDelimiter": ",",
            "column": [
                {"index": 0, "type": "long"},
                {"index": 3, "type": "string"},
                {"index": 5, "type": "long"}
            ]
        }
    }
}
"""


def _build_hadoop_configs():
    # Parse the Hadoop and Kerberos JSON snippets stored in the settings file.
    hadoop = config.get('HADOOP_INNER', 'hadoop_config')
    kerberos = config.get('HADOOP_INNER', 'kerberos_config')
    return json.loads(hadoop), json.loads(kerberos)


class HdfsReader(ReaderBase):
    def __init__(self, ds: RWDataSource):
        ReaderBase.__init__(self, ds)
        if ds.datasource == 'hive':
            self.name = 'hdfsreader'
        else:
            raise Exception('Unimplemented HdfsReader')

    def _build_column(self, columns: List[str]):
        # Each column spec is an "index:name:type" string.
        res = []
        for column in columns:
            tmp = dict()
            index, name, col_type = column.split(':')
            tmp['index'] = index
            tmp['type'] = self._convert_type(col_type)
            res.append(tmp)
        if not res:
            raise Exception('No column found')
        return res

    def _convert_type(self, col_type):
        # Map a Hive column type to the corresponding DataX hdfsreader type.
        col_type = col_type.lower().strip()
        if 'int' in col_type:
            return 'long'
        elif 'varchar' in col_type:
            return 'string'
        return 'string'

    def build_parameter(self, param: DataXJsonParam, is_show=True):
        parameter = dict()
        # Hive that requires Kerberos authentication
        if self.ds.rw_type == 'ailab':
            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            if len(hadoop) > 0:
                parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        elif self.ds.rw_type == 'datalake':
            # Read the data-lake configuration
            parameter['defaultFS'] = config.get('LAKE_HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            if len(hadoop) > 0:
                parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        else:
            parameter['defaultFS'] = param.hive_reader.reader_default_fs
        parameter['path'] = param.hive_reader.reader_path.strip()
        parameter['fileType'] = param.hive_reader.reader_file_type
        parameter['fieldDelimiter'] = param.hive_reader.reader_field_delimiter.strip()
        parameter['column'] = self._build_column(param.reader_columns)
        return parameter

    def build(self, param: DataXJsonParam, is_show=True):
        reader = dict()
        parameter = self.build_parameter(param, is_show)
        reader['name'] = self.name
        reader['parameter'] = parameter
        return reader
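# A minimal illustration of the column-spec convention used by both builders,
# inferred from the split(':') calls: each entry in param.reader_columns /
# param.writer_columns is assumed to be an "index:name:type" string. The sample
# values below mirror the example blocks at the top of this module and are
# illustrative only.
#
#   ['0:id:int', '3:ssn:varchar', '5:test2:int']
#
# HdfsReader._build_column would turn this into
#   [{'index': '0', 'type': 'long'}, {'index': '3', 'type': 'string'}, {'index': '5', 'type': 'long'}]
# while HdfsWriter._build_column keeps the Hive names and types:
#   [{'name': 'id', 'type': 'int'}, {'name': 'ssn', 'type': 'varchar'}, {'name': 'test2', 'type': 'int'}]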
class HdfsWriter(WriterBase):
    def __init__(self, ds: RWDataSource):
        WriterBase.__init__(self, ds)
        if ds.datasource == 'hive':
            self.name = 'hdfswriter'
        else:
            raise Exception('Unimplemented HdfsWriter')

    def _build_column(self, columns: List[str]):
        # Each column spec is an "index:name:type" string; the writer keeps the Hive name and type.
        res = []
        for column in columns:
            tmp = dict()
            _, name, col_type = column.split(':')
            tmp['name'] = name
            tmp['type'] = col_type
            res.append(tmp)
        if not res:
            raise Exception('No column found')
        return res

    def build_parameter(self, param: DataXJsonParam, is_show=True):
        parameter = dict()
        # Hive that requires Kerberos authentication
        if self.ds.rw_type == 'ailab':
            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            if len(hadoop) > 0:
                parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        elif self.ds.rw_type == 'datalake':
            # Read the data-lake configuration
            parameter['defaultFS'] = config.get('LAKE_HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            if len(hadoop) > 0:
                parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        else:
            parameter['defaultFS'] = param.hive_writer.writer_default_fs
        parameter['fileType'] = param.hive_writer.writer_file_type
        parameter['path'] = param.hive_writer.writer_path.strip()
        parameter['fileName'] = param.hive_writer.writer_filename
        parameter['writeMode'] = param.hive_writer.writer_mode
        parameter['fieldDelimiter'] = param.hive_writer.writer_field_delimiter.strip()
        parameter['column'] = self._build_column(param.writer_columns)
        return parameter

    def build(self, param: DataXJsonParam, is_show=True):
        writer = dict()
        parameter = self.build_parameter(param, is_show)
        writer['name'] = self.name
        writer['parameter'] = parameter
        return writer
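# A minimal usage sketch showing how the two builders combine into a DataX job body.
# The keyword arguments to RWDataSource and the contents of DataXJsonParam below are
# hypothetical; their real schemas live in app.schemas.datax_json and are not defined here.
#
#   ds = RWDataSource(datasource='hive', rw_type='hive')
#   param = DataXJsonParam(...)  # reader/writer paths, delimiters and column specs
#   job = {
#       'reader': HdfsReader(ds).build(param),
#       'writer': HdfsWriter(ds).build(param),
#   }
#   print(json.dumps(job, indent=4))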