from typing import Any, List from app import schemas from app.core.datax.rdbms import RdbmsReader, RdbmsWriter from app.core.datax.hdfs import * class ReaderFactory: @staticmethod def get_reader(ds: schemas.RWDataSource): if ds.datasource == 'mysql': return RdbmsReader(ds) elif ds.datasource == 'hive': return HdfsReader(ds) else: raise Exception('Unimplemented Reader') class WriterFactory: @staticmethod def get_writer(ds: schemas.RWDataSource): if ds.datasource == 'mysql': return RdbmsWriter(ds) elif ds.datasource == 'hive': return HdfsWriter(ds) else: raise Exception('Unimplemented Writer') class DataXEngine: def build_job(self, ds_reader: schemas.RWDataSource, ds_writer: schemas.RWDataSource, param: schemas.DataXJsonParam, is_show=True) -> dict: res = dict() content = self.build_content(ds_reader, ds_writer, param, is_show) setting = self.build_setting() res['job'] = { 'content': content, 'setting': setting } return res def build_content(self, ds_reader: schemas.RWDataSource, ds_writer: schemas.RWDataSource, param: schemas.DataXJsonParam, is_show) -> List[Any]: reader = ReaderFactory.get_reader(ds_reader) writer = WriterFactory.get_writer(ds_writer) res = dict() res['reader'] = reader.build(param, is_show) print("===",param) res['writer'] = writer.build(param, is_show) return [res] def build_setting(self): return { "speed": { "channel": "5" } # }, # "errorLimit": { # "record": 0 # } }