1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- from typing import Any, List
- from app import models
- from app import schemas
- from app.core.datax.rdbms import RdbmsReader, RdbmsWriter
- from app.core.datax.hdfs import *
- from app.models import database
class ReaderFactory:
    """Picks the reader implementation for a datasource record."""

    @staticmethod
    def get_reader(ds: models.JobJdbcDatasource):
        """Return a reader wrapping ``ds``, chosen by ``ds.datasource``.

        Args:
            ds: Datasource record; its ``datasource`` attribute names the
                backend type.

        Returns:
            ``RdbmsReader(ds)`` for ``'mysql'``, ``HdfsReader(ds)`` for
            ``'hive'``.

        Raises:
            NotImplementedError: If ``ds.datasource`` is any other value.
                This is a subclass of ``Exception``, so existing broad
                handlers keep working.
        """
        source_type = ds.datasource
        if source_type == 'mysql':
            return RdbmsReader(ds)
        if source_type == 'hive':
            return HdfsReader(ds)
        # Name the rejected type so a misconfigured datasource is easy to spot.
        raise NotImplementedError(f'Unimplemented Reader: {source_type!r}')
class WriterFactory:
    """Picks the writer implementation for a datasource record."""

    @staticmethod
    def get_writer(ds: models.JobJdbcDatasource):
        """Return a writer wrapping ``ds``, chosen by ``ds.datasource``.

        Args:
            ds: Datasource record; its ``datasource`` attribute names the
                backend type.

        Returns:
            ``RdbmsWriter(ds)`` for ``'mysql'``, ``HdfsWriter(ds)`` for
            ``'hive'``.

        Raises:
            NotImplementedError: If ``ds.datasource`` is any other value.
                This is a subclass of ``Exception``, so existing broad
                handlers keep working.
        """
        source_type = ds.datasource
        if source_type == 'mysql':
            return RdbmsWriter(ds)
        if source_type == 'hive':
            return HdfsWriter(ds)
        # Name the rejected type so a misconfigured datasource is easy to spot.
        raise NotImplementedError(f'Unimplemented Writer: {source_type!r}')
class DataXEngine:
    """Assembles a job description dict from a reader/writer datasource pair."""

    def build_job(self, ds_reader: models.JobJdbcDatasource,
                  ds_writer: models.JobJdbcDatasource,
                  param: schemas.DataXJsonParam, is_show=True) -> dict:
        """Build the complete job dict.

        Args:
            ds_reader: Datasource to read from.
            ds_writer: Datasource to write to.
            param: Job parameters, forwarded to the reader and writer builders.
            is_show: Flag forwarded unchanged to the reader and writer
                builders (its meaning is defined by those builders).

        Returns:
            ``{'job': {'content': ..., 'setting': ...}}``.
        """
        # Content is built before setting, matching the key order below.
        return {
            'job': {
                'content': self.build_content(ds_reader, ds_writer, param, is_show),
                'setting': self.build_setting(),
            }
        }

    def build_content(self, ds_reader: models.JobJdbcDatasource,
                      ds_writer: models.JobJdbcDatasource,
                      param: schemas.DataXJsonParam, is_show) -> List[Any]:
        """Build the ``content`` list: one dict holding reader and writer parts.

        Both the reader and the writer objects are created first (so an
        unsupported datasource on either side fails before anything is
        built), then each is asked to ``build(param, is_show)``.

        Returns:
            A single-element list ``[{'reader': ..., 'writer': ...}]``.
        """
        source = ReaderFactory.get_reader(ds_reader)
        sink = WriterFactory.get_writer(ds_writer)
        entry = {
            'reader': source.build(param, is_show),
            'writer': sink.build(param, is_show),
        }
        return [entry]

    def build_setting(self):
        """Return the fixed ``setting`` section: speed channel ``'1'`` (a string)."""
        speed = {'channel': '1'}
        return {'speed': speed}
|