from app.core.datax.base import ReaderBase, WriterBase from app.schemas.datax_json import DataXJsonParam, RWDataSource from app.utils import * { "reader": { "name": "mysqlreader", "parameter": { "username": "aJKK2B8UuyIho5zzrGAgeA==", "password": "aJKK2B8UuyIho5zzrGAgeA==", "connection": [ { "querySql": [ "select id, app_name, title, address_type from job_group_copy1" ], "jdbcUrl": ["jdbc:mysql://192.168.199.107:10086/datax_web"] } ] } } } { "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "column": [], "connection": [ { "jdbcUrl": [], "table": [] } ], "password": "", "username": "", "where": "" } }, "writer": { "name": "mysqlwriter", "parameter": { "column": [], "connection": [ { "jdbcUrl": "", "table": [] } ], "password": "", "preSql": [], "session": [], "username": "", "writeMode": "" } } } ], "setting": { "speed": { "channel": "" } } } } class RdbmsReader(ReaderBase): def __init__(self, ds: RWDataSource): ReaderBase.__init__(self, ds) if ds.datasource == 'mysql': self.name = 'mysqlreader' else: raise Exception('Unimplemented RdbmsReader') def build_parameter(self, param: DataXJsonParam, is_show=True): parameter = dict() connection = list() querySql = list() jdbcUrl = list() jdbcUrl.append(self.ds.jdbc_url) # querySql 和 table 只能有一个 if param.rdbms_reader.query_sql: querySql.append(param.rdbms_reader.query_sql) connection.append({'querySql': querySql, 'jdbcUrl': jdbcUrl}) # 忽略table、column、where else: if not param.reader_tables or not param.reader_columns: raise Exception('表名和字段名不能为空') table = param.reader_tables connection.append({'jdbcUrl': jdbcUrl, 'table': table}) parameter['column'] = list(map(lambda x: x.split(':')[1], param.reader_columns)) parameter['where'] = param.rdbms_reader.where_param parameter['splitPk'] = param.rdbms_reader.reader_split_pk parameter['connection'] = connection # 前端显示用 if is_show: parameter['username'] = self.ds.jdbc_username parameter['password'] = self.ds.jdbc_password else: username, password = decode_user(self.ds.jdbc_username, self.ds.jdbc_password) parameter['username'] = username parameter['password'] = password return parameter def build(self, param: DataXJsonParam, is_show=True): reader = dict() parameter = self.build_parameter(param, is_show) reader['name'] = self.name reader['parameter'] = parameter return reader class RdbmsWriter(WriterBase): def __init__(self, ds: RWDataSource): WriterBase.__init__(self, ds) if ds.datasource == 'mysql': self.name = 'mysqlwriter' else: raise Exception('Unimplemented RdbmsWriter') def build_parameter(self, param: DataXJsonParam, is_show=True): parameter = dict() connection = list() connection.append({'jdbcUrl': self.ds.jdbc_url, 'table': param.writer_tables}) parameter['connection'] = connection parameter['column'] = list(map(lambda x: x.split(':')[1], param.writer_columns)) if is_show: parameter['username'] = self.ds.jdbc_username parameter['password'] = self.ds.jdbc_password else: username, password = decode_user(self.ds.jdbc_username, self.ds.jdbc_password) parameter['username'] = username parameter['password'] = password parameter['preSql'] = [param.rdbms_writer.pre_sql] parameter['postSql'] = [param.rdbms_writer.post_sql] return parameter def build(self, param: DataXJsonParam, is_show=True): writer = dict() parameter = self.build_parameter(param, is_show) writer['name'] = self.name writer['parameter'] = parameter return writer