hdfs.py

from typing import List

from app.core.datax.base import ReaderBase, WriterBase
from app.models import JobJdbcDatasource
from app.schemas.datax_json import DataXJsonParam
from app.utils import *
from configs.settings import config
import json
"""
Example DataX parameter blocks built by this module.

hdfswriter:
{
    "writer": {
        "name": "hdfswriter",
        "parameter": {
            "defaultFS": "hdfs://192.168.199.107:9000",
            "fileType": "text",
            "path": "/usr/hive/warehouse/test_1",
            "fileName": "test_1",
            "writeMode": "append",
            "fieldDelimiter": "|",
            "column": [
                {"name": "id", "type": "int"},
                {"name": "ssn", "type": "varchar"},
                {"name": "test2", "type": "int"}
            ]
        }
    }
}

hdfsreader:
{
    "reader": {
        "name": "hdfsreader",
        "parameter": {
            "path": "/usr/hive/warehouse/grades/*",
            "defaultFS": "hdfs://192.168.199.107:9000",
            "fileType": "csv",
            "fieldDelimiter": ",",
            "column": [
                {"index": 0, "type": "long"},
                {"index": 3, "type": "string"},
                {"index": 5, "type": "long"}
            ]
        }
    }
}
"""

def _build_hadoop_configs():
    # Load the hadoopConfig and kerberos settings (stored as JSON strings)
    # from the HADOOP_INNER section of the application config.
    hadoop = config.get('HADOOP_INNER', 'hadoop_config')
    kerberos = config.get('HADOOP_INNER', 'kerberos_config')
    return json.loads(hadoop), json.loads(kerberos)
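
# For reference, a hypothetical HADOOP_INNER section that the lookups above and
# in build_parameter() would read. The key names come from this module; the
# values (and the INI layout) are purely illustrative assumptions:
#
#   [HADOOP_INNER]
#   datasource      = 3,7
#   default_fs      = hdfs://nameservice1
#   hadoop_config   = {"dfs.nameservices": "nameservice1", ...}
#   kerberos_config = {"haveKerberos": "true", "kerberosKeytabFilePath": "...", "kerberosPrincipal": "..."}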

class HdfsReader(ReaderBase):
    def __init__(self, ds: JobJdbcDatasource):
        super().__init__(ds)
        if ds.datasource == 'hive':
            self.name = 'hdfsreader'
        else:
            raise Exception('Unimplemented HdfsReader')

    def _build_column(self, columns: List[str]):
        # Each column descriptor is an "index:name:type" string; hdfsreader
        # only needs the index and the (converted) type.
        res = []
        for column in columns:
            tmp = dict()
            index, _, col_type = column.split(':')
            tmp['index'] = index
            tmp['type'] = self._convert_type(col_type)
            res.append(tmp)
        if not res:
            raise Exception('No column found')
        return res

    def _convert_type(self, col_type):
        # Map relational column types to the types hdfsreader understands.
        if col_type.lower() == 'int':
            return 'long'
        elif col_type.lower() == 'varchar':
            return 'string'
        # Pass through anything already in hdfsreader's type vocabulary.
        return col_type.lower()

    def build_parameter(self, param: DataXJsonParam, is_show=True):
        parameter = dict()
        # Hive datasources that require Kerberos authentication
        if str(self.ds.id) in config.get('HADOOP_INNER', 'datasource').split(','):
            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        else:
            parameter['defaultFS'] = param.hive_reader.reader_default_fs
        parameter['path'] = param.hive_reader.reader_path
        parameter['fileType'] = param.hive_reader.reader_file_type
        parameter['fieldDelimiter'] = param.hive_reader.reader_field_delimiter
        parameter['column'] = self._build_column(param.reader_columns)
        return parameter

    def build(self, param: DataXJsonParam, is_show=True):
        reader = dict()
        parameter = self.build_parameter(param, is_show)
        reader['name'] = self.name
        reader['parameter'] = parameter
        return reader
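
# Illustration (values are hypothetical, not from the original source): given
# reader_columns such as ["0:id:int", "3:ssn:varchar"], _build_column returns
# [{"index": "0", "type": "long"}, {"index": "3", "type": "string"}], matching
# the hdfsreader "column" block in the module docstring above.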

class HdfsWriter(WriterBase):
    def __init__(self, ds: JobJdbcDatasource):
        super().__init__(ds)
        if ds.datasource == 'hive':
            self.name = 'hdfswriter'
        else:
            raise Exception('Unimplemented HdfsWriter')

    def _build_column(self, columns: List[str]):
        # Each column descriptor is an "index:name:type" string; hdfswriter
        # only needs the name and type.
        res = []
        for column in columns:
            tmp = dict()
            _, name, col_type = column.split(':')
            tmp['name'] = name
            tmp['type'] = col_type
            res.append(tmp)
        if not res:
            raise Exception('No column found')
        return res

    def build_parameter(self, param: DataXJsonParam, is_show=True):
        parameter = dict()
        # Hive datasources that require Kerberos authentication
        if str(self.ds.id) in config.get('HADOOP_INNER', 'datasource').split(','):
            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        else:
            parameter['defaultFS'] = param.hive_writer.writer_default_fs
        parameter['fileType'] = param.hive_writer.writer_file_type
        parameter['path'] = param.hive_writer.writer_path
        parameter['fileName'] = param.hive_writer.writer_filename
        parameter['writeMode'] = param.hive_writer.writer_mode
        parameter['fieldDelimiter'] = param.hive_writer.writer_field_delimiter
        parameter['column'] = self._build_column(param.writer_columns)
        return parameter

    def build(self, param: DataXJsonParam, is_show=True):
        writer = dict()
        parameter = self.build_parameter(param, is_show)
        writer['name'] = self.name
        writer['parameter'] = parameter
        return writer
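
# Minimal usage sketch (assumptions: `ds` is a JobJdbcDatasource row whose
# datasource field is 'hive', and `param` is a DataXJsonParam populated by the
# caller; both names are placeholders, not part of this module):
#
#   reader_block = HdfsReader(ds).build(param)
#   writer_block = HdfsWriter(ds).build(param)
#
# Each call returns a {"name": ..., "parameter": {...}} dict shaped like the
# hdfsreader / hdfswriter examples in the module docstring above.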