rdbms.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. from app.core.datax.base import ReaderBase, WriterBase
  2. from app.schemas.datax_json import DataXJsonParam, RWDataSource
  3. from app.utils import *
  4. {
  5. "reader": {
  6. "name": "mysqlreader",
  7. "parameter": {
  8. "username": "aJKK2B8UuyIho5zzrGAgeA==",
  9. "password": "aJKK2B8UuyIho5zzrGAgeA==",
  10. "connection": [
  11. {
  12. "querySql": [
  13. "select id, app_name, title, address_type from job_group_copy1"
  14. ],
  15. "jdbcUrl": ["jdbc:mysql://192.168.199.107:10086/datax_web"]
  16. }
  17. ]
  18. }
  19. }
  20. }
  21. {
  22. "job": {
  23. "content": [
  24. {
  25. "reader": {
  26. "name": "mysqlreader",
  27. "parameter": {
  28. "column": [],
  29. "connection": [
  30. {
  31. "jdbcUrl": [],
  32. "table": []
  33. }
  34. ],
  35. "password": "",
  36. "username": "",
  37. "where": ""
  38. }
  39. },
  40. "writer": {
  41. "name": "mysqlwriter",
  42. "parameter": {
  43. "column": [],
  44. "connection": [
  45. {
  46. "jdbcUrl": "",
  47. "table": []
  48. }
  49. ],
  50. "password": "",
  51. "preSql": [],
  52. "session": [],
  53. "username": "",
  54. "writeMode": ""
  55. }
  56. }
  57. }
  58. ],
  59. "setting": {
  60. "speed": {
  61. "channel": ""
  62. }
  63. }
  64. }
  65. }
  66. class RdbmsReader(ReaderBase):
  67. def __init__(self, ds: RWDataSource):
  68. ReaderBase.__init__(self, ds)
  69. if ds.datasource == 'mysql':
  70. self.name = 'mysqlreader'
  71. else:
  72. raise Exception('Unimplemented RdbmsReader')
  73. def build_parameter(self, param: DataXJsonParam, is_show=True):
  74. parameter = dict()
  75. connection = list()
  76. querySql = list()
  77. jdbcUrl = list()
  78. jdbcUrl.append(self.ds.jdbc_url)
  79. # querySql 和 table 只能有一个
  80. if param.rdbms_reader.query_sql:
  81. querySql.append(param.rdbms_reader.query_sql)
  82. connection.append({'querySql': querySql, 'jdbcUrl': jdbcUrl})
  83. # 忽略table、column、where
  84. else:
  85. if not param.reader_tables or not param.reader_columns:
  86. raise Exception('表名和字段名不能为空')
  87. table = param.reader_tables
  88. connection.append({'jdbcUrl': jdbcUrl, 'table': table})
  89. parameter['column'] = list(map(lambda x: x.split(':')[1], param.reader_columns))
  90. parameter['where'] = param.rdbms_reader.where_param
  91. parameter['splitPk'] = param.rdbms_reader.reader_split_pk
  92. parameter['connection'] = connection
  93. # 前端显示用
  94. if is_show:
  95. parameter['username'] = self.ds.jdbc_username
  96. parameter['password'] = self.ds.jdbc_password
  97. else:
  98. username, password = decode_user(self.ds.jdbc_username, self.ds.jdbc_password)
  99. parameter['username'] = username
  100. parameter['password'] = password
  101. return parameter
  102. def build(self, param: DataXJsonParam, is_show=True):
  103. reader = dict()
  104. parameter = self.build_parameter(param, is_show)
  105. reader['name'] = self.name
  106. reader['parameter'] = parameter
  107. return reader
  108. class RdbmsWriter(WriterBase):
  109. def __init__(self, ds: RWDataSource):
  110. WriterBase.__init__(self, ds)
  111. if ds.datasource == 'mysql':
  112. self.name = 'mysqlwriter'
  113. else:
  114. raise Exception('Unimplemented RdbmsWriter')
  115. def build_parameter(self, param: DataXJsonParam, is_show=True):
  116. parameter = dict()
  117. connection = list()
  118. connection.append({'jdbcUrl': self.ds.jdbc_url, 'table': param.writer_tables})
  119. parameter['connection'] = connection
  120. parameter['column'] = list(map(lambda x: x.split(':')[1], param.writer_columns))
  121. if is_show:
  122. parameter['username'] = self.ds.jdbc_username
  123. parameter['password'] = self.ds.jdbc_password
  124. else:
  125. username, password = decode_user(self.ds.jdbc_username, self.ds.jdbc_password)
  126. parameter['username'] = username
  127. parameter['password'] = password
  128. parameter['preSql'] = [param.rdbms_writer.pre_sql]
  129. parameter['postSql'] = [param.rdbms_writer.post_sql]
  130. return parameter
  131. def build(self, param: DataXJsonParam, is_show=True):
  132. writer = dict()
  133. parameter = self.build_parameter(param, is_show)
  134. writer['name'] = self.name
  135. writer['parameter'] = parameter
  136. return writer