engine.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. from typing import Any, List
  2. from app import models
  3. from app import schemas
  4. from app.core.datax.rdbms import RdbmsReader, RdbmsWriter
  5. from app.core.datax.hdfs import *
  class ReaderFactory:
      """Pick the DataX reader implementation that matches a datasource."""

      @staticmethod
      def get_reader(ds: models.JobJdbcDatasource):
          """Return a reader object constructed from ``ds``.

          Args:
              ds: Datasource record. Its ``datasource`` attribute selects the
                  implementation: ``'mysql'`` -> ``RdbmsReader``,
                  ``'hive'`` -> ``HdfsReader``.

          Returns:
              The reader instance created as ``Implementation(ds)``.

          Raises:
              NotImplementedError: If ``ds.datasource`` is neither ``'mysql'``
                  nor ``'hive'``. This is a subclass of ``Exception``, so
                  callers that catch ``Exception`` are unaffected.
          """
          kind = ds.datasource
          if kind == 'mysql':
              return RdbmsReader(ds)
          if kind == 'hive':
              # NOTE(review): HdfsReader is not imported by name; presumably it
              # comes from the wildcard import of app.core.datax.hdfs - confirm.
              return HdfsReader(ds)
          # Name the rejected type so a misconfigured datasource can be
          # diagnosed from the error alone.
          raise NotImplementedError('Unimplemented Reader: {!r}'.format(kind))
  class WriterFactory:
      """Pick the DataX writer implementation that matches a datasource."""

      @staticmethod
      def get_writer(ds: models.JobJdbcDatasource):
          """Return a writer object constructed from ``ds``.

          Args:
              ds: Datasource record. Its ``datasource`` attribute selects the
                  implementation: ``'mysql'`` -> ``RdbmsWriter``,
                  ``'hive'`` -> ``HdfsWriter``.

          Returns:
              The writer instance created as ``Implementation(ds)``.

          Raises:
              NotImplementedError: If ``ds.datasource`` is neither ``'mysql'``
                  nor ``'hive'``. This is a subclass of ``Exception``, so
                  callers that catch ``Exception`` are unaffected.
          """
          kind = ds.datasource
          if kind == 'mysql':
              return RdbmsWriter(ds)
          if kind == 'hive':
              # NOTE(review): HdfsWriter is not imported by name; presumably it
              # comes from the wildcard import of app.core.datax.hdfs - confirm.
              return HdfsWriter(ds)
          # Name the rejected type so a misconfigured datasource can be
          # diagnosed from the error alone.
          raise NotImplementedError('Unimplemented Writer: {!r}'.format(kind))
  class DataXEngine:
      """Assemble the nested dictionary that describes a DataX job."""

      def build_job(self, ds_reader: models.JobJdbcDatasource,
                    ds_writer: models.JobJdbcDatasource,
                    param: schemas.DataXJsonParam, is_show=True) -> dict:
          """Build the complete job description.

          Args:
              ds_reader: Datasource the reader is created from.
              ds_writer: Datasource the writer is created from.
              param: Job parameters, passed through to the reader and writer
                  ``build`` calls.
              is_show: Flag passed through unchanged to the reader and writer
                  ``build`` calls.

          Returns:
              ``{'job': {'content': [...], 'setting': {...}}}``.

          Raises:
              NotImplementedError: Propagated from the factories when a
                  datasource type has no reader/writer implementation.
          """
          # Content is built before setting, as in the original ordering.
          content = self.build_content(ds_reader, ds_writer, param, is_show)
          setting = self.build_setting()
          return {
              'job': {
                  'content': content,
                  'setting': setting,
              }
          }

      def build_content(self, ds_reader: models.JobJdbcDatasource,
                        ds_writer: models.JobJdbcDatasource,
                        param: schemas.DataXJsonParam,
                        is_show=True) -> List[Any]:
          """Build the ``content`` list holding one reader/writer pair.

          Args:
              ds_reader: Datasource handed to ``ReaderFactory.get_reader``.
              ds_writer: Datasource handed to ``WriterFactory.get_writer``.
              param: Job parameters given to both ``build`` calls.
              is_show: Flag given to both ``build`` calls. Defaults to
                  ``True`` to match ``build_job``.

          Returns:
              A one-element list: ``[{'reader': ..., 'writer': ...}]``.
          """
          # Function-scope import keeps this block self-contained.
          import logging

          # Both factories run before any build() call so an unsupported
          # writer type fails before the reader section is generated.
          reader = ReaderFactory.get_reader(ds_reader)
          writer = WriterFactory.get_writer(ds_writer)

          entry = {'reader': reader.build(param, is_show)}
          # Replaces a leftover debug print; emitted at DEBUG level only.
          logging.getLogger(__name__).debug("DataX job param: %s", param)
          entry['writer'] = writer.build(param, is_show)
          return [entry]

      def build_setting(self):
          """Return the job-level ``setting`` section.

          Only ``speed.channel`` is set (the string ``"5"``); no
          ``errorLimit`` section is emitted.
          """
          return {
              "speed": {
                  "channel": "5",
              },
          }