hdfs.py 5.2 KB

import json
from typing import List

from app.core.datax.base import ReaderBase, WriterBase
from app.models import JobJdbcDatasource
from app.schemas.datax_json import DataXJsonParam
from app.utils import *
from configs.settings import config

"""Example DataX hdfswriter / hdfsreader configuration blocks (reference for the builders below):

{
    "writer": {
        "name": "hdfswriter",
        "parameter": {
            "defaultFS": "hdfs://192.168.199.107:9000",
            "fileType": "text",
            "path": "/usr/hive/warehouse/test_1",
            "fileName": "test_1",
            "writeMode": "append",
            "fieldDelimiter": "|",
            "column": [
                {"name": "id", "type": "int"},
                {"name": "ssn", "type": "varchar"},
                {"name": "test2", "type": "int"}
            ]
        }
    }
}

{
    "reader": {
        "name": "hdfsreader",
        "parameter": {
            "path": "/usr/hive/warehouse/grades/*",
            "defaultFS": "hdfs://192.168.199.107:9000",
            "fileType": "csv",
            "fieldDelimiter": ",",
            "column": [
                {"index": 0, "type": "long"},
                {"index": 3, "type": "string"},
                {"index": 5, "type": "long"}
            ]
        }
    }
}
"""


def _build_hadoop_configs():
    """Load the shared Hadoop and Kerberos settings from the HADOOP_INNER config section."""
    hadoop = config.get('HADOOP_INNER', 'hadoop_config')
    kerberos = config.get('HADOOP_INNER', 'kerberos_config')
    return json.loads(hadoop), json.loads(kerberos)
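
# Note: the HADOOP_INNER section lives in configs/settings and is project specific.
# A hypothetical sketch of the two JSON-encoded entries (the values below are
# assumptions for illustration, not taken from this repository):
#   hadoop_config   -> {"dfs.nameservices": "ns1", "dfs.ha.namenodes.ns1": "nn1,nn2"}
#   kerberos_config -> {"haveKerberos": true,
#                       "kerberosKeytabFilePath": "/etc/security/keytabs/hive.keytab",
#                       "kerberosPrincipal": "hive@EXAMPLE.COM"}
# The kerberos dict is merged directly into the reader/writer "parameter" block
# below, so its keys should be the Kerberos options DataX's hdfs plugins accept.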


class HdfsReader(ReaderBase):
    def __init__(self, ds: JobJdbcDatasource):
        super().__init__(ds)
        if ds.datasource == 'hive':
            self.name = 'hdfsreader'
        else:
            raise Exception('Unimplemented HdfsReader')

    def _build_column(self, columns: List[str]):
        """Convert 'index:name:type' column specs into hdfsreader column entries."""
        res = []
        for column in columns:
            index, _, col_type = column.split(':')
            res.append({'index': index, 'type': self._convert_type(col_type)})
        if not res:
            raise Exception('No column found')
        return res

    def _convert_type(self, col_type):
        """Map a source column type to the type set DataX's hdfsreader understands."""
        col_type = col_type.lower().strip()
        if 'int' in col_type:
            return 'long'
        # varchar and anything else is read as a string
        return 'string'

    def build_parameter(self, param: DataXJsonParam, is_show=True):
        parameter = dict()
        # Hive datasources that require Kerberos authentication
        if str(self.ds.id) in config.get('HADOOP_INNER', 'datasource').split(','):
            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        else:
            parameter['defaultFS'] = param.hive_reader.reader_default_fs
        parameter['path'] = param.hive_reader.reader_path.strip()
        parameter['fileType'] = param.hive_reader.reader_file_type
        parameter['fieldDelimiter'] = param.hive_reader.reader_field_delimiter.strip()
        parameter['column'] = self._build_column(param.reader_columns)
        return parameter

    def build(self, param: DataXJsonParam, is_show=True):
        reader = dict()
        parameter = self.build_parameter(param, is_show)
        reader['name'] = self.name
        reader['parameter'] = parameter
        return reader
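
# Illustrative example (derived from the code above, column values are hypothetical):
# with reader_columns == ["0:id:int", "3:ssn:varchar"], HdfsReader.build() returns
#   {"name": "hdfsreader",
#    "parameter": {..., "column": [{"index": "0", "type": "long"},
#                                  {"index": "3", "type": "string"}]}}
# which matches the shape of the hdfsreader sample at the top of this module.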


class HdfsWriter(WriterBase):
    def __init__(self, ds: JobJdbcDatasource):
        super().__init__(ds)
        if ds.datasource == 'hive':
            self.name = 'hdfswriter'
        else:
            raise Exception('Unimplemented HdfsWriter')

    def _build_column(self, columns: List[str]):
        """Convert 'index:name:type' column specs into hdfswriter column entries."""
        res = []
        for column in columns:
            _, name, col_type = column.split(':')
            res.append({'name': name, 'type': col_type})
        if not res:
            raise Exception('No column found')
        return res

    def build_parameter(self, param: DataXJsonParam, is_show=True):
        parameter = dict()
        # Hive datasources that require Kerberos authentication
        if str(self.ds.id) in config.get('HADOOP_INNER', 'datasource').split(','):
            parameter['defaultFS'] = config.get('HADOOP_INNER', 'default_fs')
            hadoop, kerberos = _build_hadoop_configs()
            parameter['hadoopConfig'] = hadoop
            parameter.update(kerberos)
        else:
            parameter['defaultFS'] = param.hive_writer.writer_default_fs
        parameter['fileType'] = param.hive_writer.writer_file_type
        parameter['path'] = param.hive_writer.writer_path.strip()
        parameter['fileName'] = param.hive_writer.writer_filename
        parameter['writeMode'] = param.hive_writer.writer_mode
        parameter['fieldDelimiter'] = param.hive_writer.writer_field_delimiter.strip()
        parameter['column'] = self._build_column(param.writer_columns)
        return parameter

    def build(self, param: DataXJsonParam, is_show=True):
        writer = dict()
        parameter = self.build_parameter(param, is_show)
        writer['name'] = self.name
        writer['parameter'] = parameter
        return writer
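

# Usage sketch (illustrative; how JobJdbcDatasource / DataXJsonParam instances are
# obtained is outside this module, so the lines below are an assumption):
#
#     ds = ...        # a JobJdbcDatasource row whose datasource == 'hive'
#     param = ...     # a DataXJsonParam carrying hive_reader/hive_writer and the column lists
#     reader_conf = HdfsReader(ds).build(param)
#     writer_conf = HdfsWriter(ds).build(param)
#
# reader_columns / writer_columns are expected as "index:name:type" strings,
# e.g. ["0:id:int", "1:ssn:varchar", "2:test2:int"], and the resulting dicts are
# placed in the reader/writer slots of a DataX job's "content" section.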