# hdfs.py

import json
from typing import List

from app.core.datax.base import ReaderBase, WriterBase
from app.schemas.datax_json import DataXJsonParam, RWDataSource
from app.utils import *
from configs.settings import config
"""
Example hdfswriter configuration produced by this module:

{
    "writer": {
        "name": "hdfswriter",
        "parameter": {
            "defaultFS": "hdfs://192.168.199.107:9000",
            "fileType": "text",
            "path": "/usr/hive/warehouse/test_1",
            "fileName": "test_1",
            "writeMode": "append",
            "fieldDelimiter": "|",
            "column": [
                {"name": "id", "type": "int"},
                {"name": "ssn", "type": "varchar"},
                {"name": "test2", "type": "int"}
            ]
        }
    }
}

Example hdfsreader configuration produced by this module:

{
    "reader": {
        "name": "hdfsreader",
        "parameter": {
            "path": "/usr/hive/warehouse/grades/*",
            "defaultFS": "hdfs://192.168.199.107:9000",
            "fileType": "csv",
            "fieldDelimiter": ",",
            "column": [
                {"index": 0, "type": "long"},
                {"index": 3, "type": "string"},
                {"index": 5, "type": "long"}
            ]
        }
    }
}
"""
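
# Illustrative only: _build_column below consumes ":"-separated "index:name:type"
# specs, but the concrete values here are assumptions, not taken from the schema.
# Under that assumption, the example configs above would come from specs such as:
#
#   reader_columns = ['0:id:int', '3:ssn:varchar', '5:test2:int']
#   writer_columns = ['0:id:int', '1:ssn:varchar', '2:test2:int']
#
# The reader keeps the index and maps the Hive type to a DataX type
# (int -> long, everything else -> string); the writer ignores the index and
# keeps the Hive name/type pair as-is.
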
def _build_hadoop_configs():
    """Load the hadoopConfig and Kerberos settings (stored as JSON strings) from the app config."""
    hadoop = config.get('HADOOP_INNER', 'hadoop_config')
    kerberos = config.get('HADOOP_INNER', 'kerberos_config')
    return json.loads(hadoop), json.loads(kerberos)
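
# Illustrative layout only (this project's actual settings are not shown here):
# both config values are expected to be JSON strings, e.g.
#
#   hadoop_config   = '{"dfs.nameservices": "ns1", ...}'
#   kerberos_config = '{"haveKerberos": true,
#                       "kerberosKeytabFilePath": "/path/to/user.keytab",
#                       "kerberosPrincipal": "user@EXAMPLE.COM"}'
#
# haveKerberos / kerberosKeytabFilePath / kerberosPrincipal are the standard
# Kerberos parameters of the DataX hdfsreader/hdfswriter plugins; the parsed
# hadoop dict becomes "hadoopConfig", while the kerberos dict is merged directly
# into the plugin parameters via parameter.update(kerberos) below.
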
class HdfsReader(ReaderBase):
    def __init__(self, ds: RWDataSource):
        super().__init__(ds)
        if ds.datasource == 'hive':
            self.name = 'hdfsreader'
        else:
            raise Exception('Unimplemented HdfsReader')

    def _build_column(self, columns: List[str]):
        """Turn 'index:name:type' specs into DataX reader column entries."""
        res = []
        for column in columns:
            index, _name, col_type = column.split(':')
            res.append({
                # DataX column indexes are numeric (see the example config above)
                'index': int(index),
                'type': self._convert_type(col_type),
            })
        if not res:
            raise Exception('No column found')
        return res

    def _convert_type(self, col_type: str) -> str:
        """Map a Hive column type to a DataX reader type: int -> long, everything else -> string."""
        col_type = col_type.lower().strip()
        if 'int' in col_type:
            return 'long'
        return 'string'

    def build_parameter(self, param: DataXJsonParam, is_show=True):
        parameter = dict()
        if self.ds.rw_type == 'ailab':
            # Hive that requires Kerberos authentication
            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            if len(hadoop) > 0:
                parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        elif self.ds.rw_type == 'datalake':
            # Load the data lake configuration
            parameter['defaultFS'] = config.get('LAKE_HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            if len(hadoop) > 0:
                parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        else:
            parameter['defaultFS'] = param.hive_reader.reader_default_fs
        parameter['path'] = param.hive_reader.reader_path.strip()
        parameter['fileType'] = param.hive_reader.reader_file_type
        parameter['fieldDelimiter'] = param.hive_reader.reader_field_delimiter.strip()
        parameter['column'] = self._build_column(param.reader_columns)
        return parameter

    def build(self, param: DataXJsonParam, is_show=True):
        parameter = self.build_parameter(param, is_show)
        return {'name': self.name, 'parameter': parameter}


class HdfsWriter(WriterBase):
    def __init__(self, ds: RWDataSource):
        super().__init__(ds)
        if ds.datasource == 'hive':
            self.name = 'hdfswriter'
        else:
            raise Exception('Unimplemented HdfsWriter')

    def _build_column(self, columns: List[str]):
        """Turn 'index:name:type' specs into DataX writer column entries (the index is ignored)."""
        res = []
        for column in columns:
            _, name, col_type = column.split(':')
            res.append({'name': name, 'type': col_type})
        if not res:
            raise Exception('No column found')
        return res

    def build_parameter(self, param: DataXJsonParam, is_show=True):
        parameter = dict()
        if self.ds.rw_type == 'ailab':
            # Hive that requires Kerberos authentication
            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            if len(hadoop) > 0:
                parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        elif self.ds.rw_type == 'datalake':
            # Load the data lake configuration
            parameter['defaultFS'] = config.get('LAKE_HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            if len(hadoop) > 0:
                parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        else:
            parameter['defaultFS'] = param.hive_writer.writer_default_fs
        parameter['fileType'] = param.hive_writer.writer_file_type
        parameter['path'] = param.hive_writer.writer_path.strip()
        parameter['fileName'] = param.hive_writer.writer_filename
        parameter['writeMode'] = param.hive_writer.writer_mode
        parameter['fieldDelimiter'] = param.hive_writer.writer_field_delimiter.strip()
        parameter['column'] = self._build_column(param.writer_columns)
        return parameter

    def build(self, param: DataXJsonParam, is_show=True):
        parameter = self.build_parameter(param, is_show)
        return {'name': self.name, 'parameter': parameter}
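

# Minimal usage sketch (illustrative; assumes RWDataSource / DataXJsonParam
# instances arrive pre-populated from the API layer with the fields used above):
#
#   ds = ...                      # RWDataSource with datasource='hive' and an rw_type
#   param = ...                   # DataXJsonParam with hive_reader / hive_writer blocks
#   reader_json = HdfsReader(ds).build(param)
#   writer_json = HdfsWriter(ds).build(param)
#   content = {'reader': reader_json, 'writer': writer_json}   # one entry of a DataX job's "content" list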