123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- from app.core.datax.base import ReaderBase, WriterBase
- from app.schemas.datax_json import DataXJsonParam, RWDataSource
- from app.utils import *
- {
- "reader": {
- "name": "mysqlreader",
- "parameter": {
- "username": "aJKK2B8UuyIho5zzrGAgeA==",
- "password": "aJKK2B8UuyIho5zzrGAgeA==",
- "connection": [
- {
- "querySql": [
- "select id, app_name, title, address_type from job_group_copy1"
- ],
- "jdbcUrl": ["jdbc:mysql://192.168.199.107:10086/datax_web"]
- }
- ]
- }
- }
- }
- {
- "job": {
- "content": [
- {
- "reader": {
- "name": "mysqlreader",
- "parameter": {
- "column": [],
- "connection": [
- {
- "jdbcUrl": [],
- "table": []
- }
- ],
- "password": "",
- "username": "",
- "where": ""
- }
- },
- "writer": {
- "name": "mysqlwriter",
- "parameter": {
- "column": [],
- "connection": [
- {
- "jdbcUrl": "",
- "table": []
- }
- ],
- "password": "",
- "preSql": [],
- "session": [],
- "username": "",
- "writeMode": ""
- }
- }
- }
- ],
- "setting": {
- "speed": {
- "channel": ""
- }
- }
- }
- }
class RdbmsReader(ReaderBase):
    """Build the ``reader`` section of a DataX job for a relational source.

    Only the ``mysql`` datasource is handled; any other value makes the
    constructor raise.
    """

    def __init__(self, ds: RWDataSource):
        """Initialise the base reader with *ds* and pick the plugin name.

        Raises:
            Exception: if ``ds.datasource`` is not ``'mysql'``.
        """
        ReaderBase.__init__(self, ds)
        # Guard clause: mysql is the only datasource mapped to a plugin here.
        if ds.datasource != 'mysql':
            raise Exception('Unimplemented RdbmsReader')
        self.name = 'mysqlreader'

    def build_parameter(self, param: DataXJsonParam, is_show=True):
        """Return the ``parameter`` dict of the reader.

        Args:
            param: job description; ``rdbms_reader``, ``reader_tables`` and
                ``reader_columns`` are read from it.
            is_show: when true the stored username/password are emitted
                unchanged (used for front-end display); otherwise they are
                passed through ``decode_user`` first.

        Raises:
            Exception: if no query SQL is given and the table list or the
                column list is empty.
        """
        # NOTE(review): self.ds is presumably set by ReaderBase.__init__ - confirm.
        jdbc_urls = [self.ds.jdbc_url]
        parameter = {}

        # querySql and table are mutually exclusive.
        query_sql = param.rdbms_reader.query_sql
        if query_sql:
            # With an explicit query, table / column / where are ignored.
            conn_entry = {'querySql': [query_sql], 'jdbcUrl': jdbc_urls}
        else:
            if not (param.reader_tables and param.reader_columns):
                raise Exception('表名和字段名不能为空')
            conn_entry = {'jdbcUrl': jdbc_urls, 'table': param.reader_tables}
            # Each column entry is colon-separated; the second field is kept.
            parameter['column'] = [col.split(':')[1] for col in param.reader_columns]
            parameter['where'] = param.rdbms_reader.where_param
            parameter['splitPk'] = param.rdbms_reader.reader_split_pk
        parameter['connection'] = [conn_entry]

        if is_show:
            # For front-end display: credentials are emitted as stored.
            username = self.ds.jdbc_username
            password = self.ds.jdbc_password
        else:
            username, password = decode_user(self.ds.jdbc_username, self.ds.jdbc_password)
        parameter['username'] = username
        parameter['password'] = password
        return parameter

    def build(self, param: DataXJsonParam, is_show=True):
        """Return the full reader dict: plugin ``name`` plus ``parameter``."""
        parameter = self.build_parameter(param, is_show)
        return {'name': self.name, 'parameter': parameter}
class RdbmsWriter(WriterBase):
    """Build the ``writer`` section of a DataX job for a relational target.

    Only the ``mysql`` datasource is handled; any other value makes the
    constructor raise.
    """

    def __init__(self, ds: RWDataSource):
        """Initialise the base writer with *ds* and pick the plugin name.

        Raises:
            Exception: if ``ds.datasource`` is not ``'mysql'``.
        """
        WriterBase.__init__(self, ds)
        # Guard clause: mysql is the only datasource mapped to a plugin here.
        if ds.datasource != 'mysql':
            raise Exception('Unimplemented RdbmsWriter')
        self.name = 'mysqlwriter'

    def build_parameter(self, param: DataXJsonParam, is_show=True):
        """Return the ``parameter`` dict of the writer.

        Args:
            param: job description; ``writer_tables``, ``writer_columns`` and
                ``rdbms_writer`` are read from it.
            is_show: when true the stored username/password are emitted
                unchanged; otherwise they are passed through ``decode_user``
                first.
        """
        # NOTE(review): self.ds is presumably set by WriterBase.__init__ - confirm.
        # Unlike the reader, jdbcUrl is emitted as a single value, not a list.
        conn_entry = {'jdbcUrl': self.ds.jdbc_url, 'table': param.writer_tables}
        parameter = {'connection': [conn_entry]}
        # Each column entry is colon-separated; the second field is kept.
        parameter['column'] = [col.split(':')[1] for col in param.writer_columns]

        if is_show:
            username = self.ds.jdbc_username
            password = self.ds.jdbc_password
        else:
            username, password = decode_user(self.ds.jdbc_username, self.ds.jdbc_password)
        parameter['username'] = username
        parameter['password'] = password

        # Pre/post SQL are each wrapped in a one-element list.
        parameter['preSql'] = [param.rdbms_writer.pre_sql]
        parameter['postSql'] = [param.rdbms_writer.post_sql]
        return parameter

    def build(self, param: DataXJsonParam, is_show=True):
        """Return the full writer dict: plugin ``name`` plus ``parameter``."""
        parameter = self.build_parameter(param, is_show)
        return {'name': self.name, 'parameter': parameter}
|