hdfs.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. from typing import List
  2. from app.core.datax.base import ReaderBase, WriterBase
  3. from app.models import JobJdbcDatasource
  4. from app.schemas.datax_json import DataXJsonParam
  5. from app.utils import *
  # Sample DataX "writer" section for the hdfswriter plugin.
  # This is a bare dict literal used as an expression statement: it is
  # evaluated when the module is imported and then discarded. It is kept here
  # purely as an in-file illustration of the target JSON layout.
  {
      "writer": {
          "name": "hdfswriter",
          "parameter": {
              "defaultFS": "hdfs://192.168.199.107:9000",
              "fileType": "text",
              "path": "/usr/hive/warehouse/test_1",
              "fileName": "test_1",
              "writeMode": "append",
              "fieldDelimiter": "|",
              # Writer columns are described by name and type.
              "column": [
                  {"name": "id", "type": "int"},
                  {"name": "ssn", "type": "varchar"},
                  {"name": "test2", "type": "int"},
              ],
          },
      },
  }
  # Sample DataX "reader" section for the hdfsreader plugin.
  # Like any unassigned literal, this dict is built at import time and then
  # dropped; it only documents the JSON layout the reader section should have.
  {
      "reader": {
          "name": "hdfsreader",
          "parameter": {
              "path": "/usr/hive/warehouse/grades/*",
              "defaultFS": "hdfs://192.168.199.107:9000",
              "fileType": "csv",
              "fieldDelimiter": ",",
              # Reader columns are described by positional index and type.
              "column": [
                  {"index": 0, "type": "long"},
                  {"index": 3, "type": "string"},
                  {"index": 5, "type": "long"},
              ],
          },
      },
  }
  class HdfsReader(WriterBase):
      """Build the ``hdfsreader`` section of a DataX job for a Hive datasource.

      ``build`` returns ``{'name': 'hdfsreader', 'parameter': {...}}`` where the
      parameter dict carries ``path``, ``defaultFS``, ``fileType``,
      ``fieldDelimiter`` and ``column``.

      NOTE(review): this reader derives from ``WriterBase`` even though the
      module imports ``ReaderBase`` and never uses it. That looks like a
      copy-paste slip, but ``ReaderBase`` is not visible from this file, so the
      base class is deliberately left unchanged here -- confirm against
      ``app.core.datax.base`` before switching.
      """

      # Hive column type -> hdfsreader column type. Anything not listed falls
      # back to _FALLBACK_TYPE instead of producing ``"type": null``.
      _HIVE_TO_DATAX_TYPE = {
          'tinyint': 'long',
          'smallint': 'long',
          'int': 'long',
          'bigint': 'long',
          'float': 'double',
          'double': 'double',
          'string': 'string',
          'char': 'string',
          'varchar': 'string',
          'binary': 'string',
          'boolean': 'boolean',
          'date': 'date',
          'timestamp': 'date',
      }
      _FALLBACK_TYPE = 'string'

      def __init__(self, ds: JobJdbcDatasource):
          """Initialise the base class and set the plugin name.

          Raises:
              Exception: if ``ds.datasource`` is anything other than ``'hive'``.
          """
          WriterBase.__init__(self, ds)
          if ds.datasource != 'hive':
              raise Exception('Unimplemented HdfsReader')
          self.name = 'hdfsreader'

      def _build_column(self, columns: List[str]):
          """Turn ``'index:name:type'`` specs into hdfsreader column dicts.

          Each result is ``{'index': <index text>, 'type': <converted type>}``;
          the name part of the spec is not used by the reader.

          Raises:
              ValueError: if a spec does not have the three ``:``-separated parts.
              Exception: if ``columns`` is empty or ``None``.
          """
          res = []
          for column in columns or ():
              # maxsplit=2 keeps colons inside the type part intact
              # (e.g. ``struct<a:int>``) instead of failing the unpack.
              parts = column.split(':', 2)
              if len(parts) != 3:
                  raise ValueError(
                      "Invalid reader column {!r}; expected 'index:name:type'".format(column)
                  )
              index, _name, hive_type = parts
              res.append({'index': index, 'type': self._convert_type(hive_type)})
          if not res:
              raise Exception('No column found')
          return res

      def _convert_type(self, type):
          """Map a Hive column type name to an hdfsreader column type.

          The lookup is case-insensitive and ignores a trailing parameter list
          such as ``(20)`` in ``varchar(20)``. Types missing from the table are
          returned as ``'string'``.
          """
          # The parameter keeps its original name ``type`` for compatibility
          # with existing callers, even though it shadows the builtin.
          base_name = type.strip().lower().split('(', 1)[0].strip()
          return self._HIVE_TO_DATAX_TYPE.get(base_name, self._FALLBACK_TYPE)

      def build_parameter(self, param: DataXJsonParam, is_show=True):
          """Assemble the ``parameter`` dict from ``param.hive_reader``.

          ``is_show`` is accepted for signature compatibility and is not used.
          """
          hive = param.hive_reader
          return {
              'path': hive.reader_path,
              'defaultFS': hive.reader_default_fs,
              'fileType': hive.reader_file_type,
              'fieldDelimiter': hive.reader_field_delimiter,
              'column': self._build_column(param.reader_columns),
          }

      def build(self, param: DataXJsonParam, is_show=True):
          """Return the reader section: plugin name plus its parameter dict."""
          parameter = self.build_parameter(param, is_show)
          return {'name': self.name, 'parameter': parameter}
  class HdfsWriter(WriterBase):
      """Build the ``hdfswriter`` section of a DataX job for a Hive datasource.

      ``build`` returns ``{'name': 'hdfswriter', 'parameter': {...}}`` where the
      parameter dict carries ``defaultFS``, ``fileType``, ``path``,
      ``fileName``, ``writeMode``, ``fieldDelimiter`` and ``column``.
      """

      def __init__(self, ds: JobJdbcDatasource):
          """Initialise the base class and set the plugin name.

          Raises:
              Exception: if ``ds.datasource`` is anything other than ``'hive'``.
          """
          WriterBase.__init__(self, ds)
          if ds.datasource != 'hive':
              raise Exception('Unimplemented HdfsWriter')
          self.name = 'hdfswriter'

      def _build_column(self, columns: List[str]):
          """Turn ``'index:name:type'`` specs into hdfswriter column dicts.

          Each result is ``{'name': <name>, 'type': <type>}``; the type text is
          passed through unchanged and the index part is ignored.

          Raises:
              ValueError: if a spec does not have the three ``:``-separated parts.
              Exception: if ``columns`` is empty or ``None``.
          """
          res = []
          for column in columns or ():
              # maxsplit=2 keeps colons inside the type part intact
              # (e.g. ``struct<a:int>``) instead of failing the unpack.
              parts = column.split(':', 2)
              if len(parts) != 3:
                  raise ValueError(
                      "Invalid writer column {!r}; expected 'index:name:type'".format(column)
                  )
              _index, name, column_type = parts
              res.append({'name': name, 'type': column_type})
          if not res:
              raise Exception('No column found')
          return res

      def build_parameter(self, param: DataXJsonParam, is_show=True):
          """Assemble the ``parameter`` dict from ``param.hive_writer``.

          ``is_show`` is accepted for signature compatibility and is not used.
          """
          hive = param.hive_writer
          return {
              'defaultFS': hive.writer_default_fs,
              'fileType': hive.writer_file_type,
              'path': hive.writer_path,
              'fileName': hive.writer_filename,
              'writeMode': hive.write_mode,
              'fieldDelimiter': hive.write_field_delimiter,
              'column': self._build_column(param.writer_columns),
          }

      def build(self, param: DataXJsonParam, is_show=True):
          """Return the writer section: plugin name plus its parameter dict."""
          parameter = self.build_parameter(param, is_show)
          return {'name': self.name, 'parameter': parameter}