rdbms.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. from app.core.datax.base import ReaderBase, WriterBase
  2. from app.models import JobJdbcDatasource
  3. from app.schemas.datax_json import DataXJsonParam
  4. from app.utils import *
  5. {
  6. "reader": {
  7. "name": "mysqlreader",
  8. "parameter": {
  9. "username": "aJKK2B8UuyIho5zzrGAgeA==",
  10. "password": "aJKK2B8UuyIho5zzrGAgeA==",
  11. "connection": [
  12. {
  13. "querySql": [
  14. "select id, app_name, title, address_type from job_group_copy1"
  15. ],
  16. "jdbcUrl": ["jdbc:mysql://192.168.199.107:10086/datax_web"]
  17. }
  18. ]
  19. }
  20. }
  21. }
  22. {
  23. "job": {
  24. "content": [
  25. {
  26. "reader": {
  27. "name": "mysqlreader",
  28. "parameter": {
  29. "column": [],
  30. "connection": [
  31. {
  32. "jdbcUrl": [],
  33. "table": []
  34. }
  35. ],
  36. "password": "",
  37. "username": "",
  38. "where": ""
  39. }
  40. },
  41. "writer": {
  42. "name": "mysqlwriter",
  43. "parameter": {
  44. "column": [],
  45. "connection": [
  46. {
  47. "jdbcUrl": "",
  48. "table": []
  49. }
  50. ],
  51. "password": "",
  52. "preSql": [],
  53. "session": [],
  54. "username": "",
  55. "writeMode": ""
  56. }
  57. }
  58. }
  59. ],
  60. "setting": {
  61. "speed": {
  62. "channel": ""
  63. }
  64. }
  65. }
  66. }
  67. class RdbmsReader(ReaderBase):
  68. def __init__(self, ds: JobJdbcDatasource):
  69. ReaderBase.__init__(self, ds)
  70. if ds.datasource == 'mysql':
  71. self.name = 'mysqlreader'
  72. else:
  73. raise Exception('Unimplemented RdbmsReader')
  74. def build_parameter(self, param: DataXJsonParam, is_show=True):
  75. parameter = dict()
  76. connection = list()
  77. querySql = list()
  78. jdbcUrl = list()
  79. jdbcUrl.append(self.ds.jdbc_url)
  80. # querySql 和 table 只能有一个
  81. if param.rdbms_reader.query_sql:
  82. querySql.append(param.rdbms_reader.query_sql)
  83. connection.append({'querySql': querySql, 'jdbcUrl': jdbcUrl})
  84. # 忽略table、column、where
  85. else:
  86. if not param.reader_tables or not param.reader_columns:
  87. raise Exception('表名和字段名不能为空')
  88. table = param.reader_tables
  89. connection.append({'jdbcUrl': jdbcUrl, 'table': table})
  90. parameter['column'] = list(map(lambda x: x.split(':')[1], param.reader_columns))
  91. parameter['where'] = param.rdbms_reader.where_param
  92. parameter['splitPk'] = param.rdbms_reader.reader_split_pk
  93. parameter['connection'] = connection
  94. # 前端显示用
  95. if is_show:
  96. parameter['username'] = self.ds.jdbc_username
  97. parameter['password'] = self.ds.jdbc_password
  98. else:
  99. username, password = decode_user(self.ds.jdbc_username, self.ds.jdbc_password)
  100. parameter['username'] = username
  101. parameter['password'] = password
  102. return parameter
  103. def build(self, param: DataXJsonParam, is_show=True):
  104. reader = dict()
  105. parameter = self.build_parameter(param, is_show)
  106. reader['name'] = self.name
  107. reader['parameter'] = parameter
  108. return reader
  109. class RdbmsWriter(WriterBase):
  110. def __init__(self, ds: JobJdbcDatasource):
  111. WriterBase.__init__(self, ds)
  112. if ds.datasource == 'mysql':
  113. self.name = 'mysqlwriter'
  114. else:
  115. raise Exception('Unimplemented RdbmsWriter')
  116. def build_parameter(self, param: DataXJsonParam, is_show=True):
  117. parameter = dict()
  118. connection = list()
  119. connection.append({'jdbcUrl': self.ds.jdbc_url, 'table': param.writer_tables})
  120. parameter['connection'] = connection
  121. parameter['column'] = list(map(lambda x: x.split(':')[1], param.writer_columns))
  122. if is_show:
  123. parameter['username'] = self.ds.jdbc_username
  124. parameter['password'] = self.ds.jdbc_password
  125. else:
  126. username, password = decode_user(self.ds.jdbc_username, self.ds.jdbc_password)
  127. parameter['username'] = username
  128. parameter['password'] = password
  129. parameter['preSql'] = [param.rdbms_writer.pre_sql]
  130. parameter['postSql'] = [param.rdbms_writer.post_sql]
  131. return parameter
  132. def build(self, param: DataXJsonParam, is_show=True):
  133. writer = dict()
  134. parameter = self.build_parameter(param, is_show)
  135. writer['name'] = self.name
  136. writer['parameter'] = parameter
  137. return writer