engine.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. from typing import Any, List
  2. from app import models
  3. from app import schemas
  4. from app.core.datax.rdbms import RdbmsReader, RdbmsWriter
  5. from app.core.datax.hdfs import *
  6. from app.models import database
  7. class ReaderFactory:
  8. @staticmethod
  9. def get_reader(ds: models.JobJdbcDatasource):
  10. if ds.datasource == 'mysql':
  11. return RdbmsReader(ds)
  12. elif ds.datasource == 'hive':
  13. return HdfsReader(ds)
  14. else:
  15. raise Exception('Unimplemented Reader')
  16. class WriterFactory:
  17. @staticmethod
  18. def get_writer(ds: models.JobJdbcDatasource):
  19. if ds.datasource == 'mysql':
  20. return RdbmsWriter(ds)
  21. elif ds.datasource == 'hive':
  22. return HdfsWriter(ds)
  23. else:
  24. raise Exception('Unimplemented Writer')
  25. class DataXEngine:
  26. def build_job(self, ds_reader: models.JobJdbcDatasource,
  27. ds_writer: models.JobJdbcDatasource,
  28. param: schemas.DataXJsonParam, is_show=True) -> dict:
  29. res = dict()
  30. content = self.build_content(ds_reader, ds_writer, param, is_show)
  31. setting = self.build_setting()
  32. res['job'] = {
  33. 'content': content,
  34. 'setting': setting
  35. }
  36. return res
  37. def build_content(self, ds_reader: models.JobJdbcDatasource,
  38. ds_writer: models.JobJdbcDatasource,
  39. param: schemas.DataXJsonParam, is_show) -> List[Any]:
  40. reader = ReaderFactory.get_reader(ds_reader)
  41. writer = WriterFactory.get_writer(ds_writer)
  42. res = dict()
  43. res['reader'] = reader.build(param, is_show)
  44. res['writer'] = writer.build(param, is_show)
  45. return [res]
  46. def build_setting(self):
  47. return {
  48. 'speed': {
  49. 'channel': '1'
  50. }
  51. }