import json
from typing import List

from app.core.datax.base import ReaderBase, WriterBase
from app.schemas.datax_json import DataXJsonParam, RWDataSource
from app.utils import *
from configs.settings import config

# Example hdfswriter block in a DataX job JSON:
#
# {
#     "writer": {
#         "name": "hdfswriter",
#         "parameter": {
#             "defaultFS": "hdfs://192.168.199.107:9000",
#             "fileType": "text",
#             "path": "/usr/hive/warehouse/test_1",
#             "fileName": "test_1",
#             "writeMode": "append",
#             "fieldDelimiter": "|",
#             "column": [
#                 {"name": "id", "type": "int"},
#                 {"name": "ssn", "type": "varchar"},
#                 {"name": "test2", "type": "int"}
#             ]
#         }
#     }
# }
#
# Example hdfsreader block in a DataX job JSON:
#
# {
#     "reader": {
#         "name": "hdfsreader",
#         "parameter": {
#             "path": "/usr/hive/warehouse/grades/*",
#             "defaultFS": "hdfs://192.168.199.107:9000",
#             "fileType": "csv",
#             "fieldDelimiter": ",",
#             "column": [
#                 {"index": 0, "type": "long"},
#                 {"index": 3, "type": "string"},
#                 {"index": 5, "type": "long"}
#             ]
#         }
#     }
# }


def _build_hadoop_configs():
    """Load the hadoopConfig dict and the Kerberos settings from the HADOOP_INNER section."""
    hadoop = config.get('HADOOP_INNER', 'hadoop_config')
    kerberos = config.get('HADOOP_INNER', 'kerberos_config')
    return json.loads(hadoop), json.loads(kerberos)
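
# A hedged sketch of what the settings sections read above and below might look
# like.  The section and key names mirror the config.get() lookups in this
# module; the values are illustrative placeholders, not the real deployment
# configuration (kerberos_config is merged verbatim into the DataX parameter
# dict, so its keys would be DataX hdfsreader/hdfswriter options such as
# haveKerberos, kerberosKeytabFilePath and kerberosPrincipal):
#
# [HADOOP_INNER]
# default_fs      = hdfs://namenode:9000
# hadoop_config   = {"dfs.nameservices": "ns1"}
# kerberos_config = {"haveKerberos": "true",
#                    "kerberosKeytabFilePath": "/etc/security/user.keytab",
#                    "kerberosPrincipal": "user@EXAMPLE.COM"}
#
# [LAKE_HADOOP_INNER]
# default_fs      = hdfs://lake-namenode:9000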


class HdfsReader(ReaderBase):
    def __init__(self, ds: RWDataSource):
        super().__init__(ds)
        if ds.datasource == 'hive':
            self.name = 'hdfsreader'
        else:
            raise Exception('Unimplemented HdfsReader')

    def _build_column(self, columns: List[str]):
        """Turn 'index:name:type' strings into DataX hdfsreader column dicts."""
        res = []
        for column in columns:
            index, name, col_type = column.split(':')
            # DataX expects a numeric column index
            res.append({'index': int(index), 'type': self._convert_type(col_type)})
        if not res:
            raise Exception('No column found')
        return res

    def _convert_type(self, col_type):
        """Map Hive column types onto the types DataX hdfsreader understands."""
        col_type = col_type.lower().strip()
        if 'int' in col_type:
            return 'long'
        # varchar and any other unrecognized type are read as string
        return 'string'

    def build_parameter(self, param: DataXJsonParam, is_show=True):
        parameter = dict()
        # 'ailab' marks Hive that requires Kerberos authentication
        if self.ds.rw_type == 'ailab':
            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            if len(hadoop) > 0:
                parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        elif self.ds.rw_type == 'datalake':
            # Read the data lake configuration
            parameter['defaultFS'] = config.get('LAKE_HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            if len(hadoop) > 0:
                parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        else:
            parameter['defaultFS'] = param.hive_reader.reader_default_fs
        parameter['path'] = param.hive_reader.reader_path.strip()
        parameter['fileType'] = param.hive_reader.reader_file_type
        parameter['fieldDelimiter'] = param.hive_reader.reader_field_delimiter.strip()
        parameter['column'] = self._build_column(param.reader_columns)
        return parameter

    def build(self, param: DataXJsonParam, is_show=True):
        reader = dict()
        parameter = self.build_parameter(param, is_show)
        reader['name'] = self.name
        reader['parameter'] = parameter
        return reader


class HdfsWriter(WriterBase):
    def __init__(self, ds: RWDataSource):
        super().__init__(ds)
        if ds.datasource == 'hive':
            self.name = 'hdfswriter'
        else:
            raise Exception('Unimplemented HdfsWriter')

    def _build_column(self, columns: List[str]):
        """Turn 'index:name:type' strings into DataX hdfswriter column dicts."""
        res = []
        for column in columns:
            _, name, col_type = column.split(':')
            res.append({'name': name, 'type': col_type})
        if not res:
            raise Exception('No column found')
        return res

    def build_parameter(self, param: DataXJsonParam, is_show=True):
        parameter = dict()
        # 'ailab' marks Hive that requires Kerberos authentication
        if self.ds.rw_type == 'ailab':
            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            if len(hadoop) > 0:
                parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        elif self.ds.rw_type == 'datalake':
            # Read the data lake configuration
            parameter['defaultFS'] = config.get('LAKE_HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            if len(hadoop) > 0:
                parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        else:
            parameter['defaultFS'] = param.hive_writer.writer_default_fs
        parameter['fileType'] = param.hive_writer.writer_file_type
        parameter['path'] = param.hive_writer.writer_path.strip()
        parameter['fileName'] = param.hive_writer.writer_filename
        parameter['writeMode'] = param.hive_writer.writer_mode
        parameter['fieldDelimiter'] = param.hive_writer.writer_field_delimiter.strip()
        parameter['column'] = self._build_column(param.writer_columns)
        return parameter

    def build(self, param: DataXJsonParam, is_show=True):
        writer = dict()
        parameter = self.build_parameter(param, is_show)
        writer['name'] = self.name
        writer['parameter'] = parameter
        return writer
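

# Usage sketch (illustrative only): how the builders above could be combined
# into a partial DataX job body.  The SimpleNamespace objects stand in for real
# RWDataSource / DataXJsonParam instances, whose actual schemas live in
# app.schemas.datax_json; the rw_type value, column names and paths below are
# invented for the example, and ReaderBase/WriterBase may impose requirements
# on the data source object that this sketch does not cover.
if __name__ == '__main__':
    from types import SimpleNamespace

    ds = SimpleNamespace(datasource='hive', rw_type='hive')  # hypothetical rw_type
    param = SimpleNamespace(
        hive_reader=SimpleNamespace(
            reader_default_fs='hdfs://192.168.199.107:9000',
            reader_path='/usr/hive/warehouse/grades/*',
            reader_file_type='csv',
            reader_field_delimiter=','),
        reader_columns=['0:id:int', '3:name:varchar', '5:score:int'])

    reader_block = HdfsReader(ds).build(param)
    job = {'job': {'content': [{'reader': reader_block}]}}
    print(json.dumps(job, indent=4))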