12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- from typing import Any, List
- from app import schemas
- from app.core.datax.rdbms import RdbmsReader, RdbmsWriter
- from app.core.datax.hdfs import *
- class ReaderFactory:
- @staticmethod
- def get_reader(ds: schemas.RWDataSource):
- if ds.datasource == 'mysql':
- return RdbmsReader(ds)
- elif ds.datasource == 'hive':
- return HdfsReader(ds)
- else:
- raise Exception('Unimplemented Reader')
- class WriterFactory:
- @staticmethod
- def get_writer(ds: schemas.RWDataSource):
- if ds.datasource == 'mysql':
- return RdbmsWriter(ds)
- elif ds.datasource == 'hive':
- return HdfsWriter(ds)
- else:
- raise Exception('Unimplemented Writer')
- class DataXEngine:
- def build_job(self, ds_reader: schemas.RWDataSource,
- ds_writer: schemas.RWDataSource,
- param: schemas.DataXJsonParam, is_show=True) -> dict:
- res = dict()
- content = self.build_content(ds_reader, ds_writer, param, is_show)
- setting = self.build_setting()
- res['job'] = {
- 'content': content,
- 'setting': setting
- }
- return res
- def build_content(self, ds_reader: schemas.RWDataSource,
- ds_writer: schemas.RWDataSource,
- param: schemas.DataXJsonParam, is_show) -> List[Any]:
- reader = ReaderFactory.get_reader(ds_reader)
- writer = WriterFactory.get_writer(ds_writer)
- res = dict()
- res['reader'] = reader.build(param, is_show)
- print("===",param)
- res['writer'] = writer.build(param, is_show)
- return [res]
- def build_setting(self):
- return {
- "speed": {
- "channel": "5"
- }
- # },
- # "errorLimit": {
- # "record": 0
- # }
- }
|