engine.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. from typing import Any, List
  2. from app import schemas
  3. from app.core.datax.rdbms import RdbmsReader, RdbmsWriter
  4. from app.core.datax.hdfs import *
  5. class ReaderFactory:
  6. @staticmethod
  7. def get_reader(ds: schemas.RWDataSource):
  8. if ds.datasource == 'mysql':
  9. return RdbmsReader(ds)
  10. elif ds.datasource == 'hive':
  11. return HdfsReader(ds)
  12. else:
  13. raise Exception('Unimplemented Reader')
  14. class WriterFactory:
  15. @staticmethod
  16. def get_writer(ds: schemas.RWDataSource):
  17. if ds.datasource == 'mysql':
  18. return RdbmsWriter(ds)
  19. elif ds.datasource == 'hive':
  20. return HdfsWriter(ds)
  21. else:
  22. raise Exception('Unimplemented Writer')
  23. class DataXEngine:
  24. def build_job(self, ds_reader: schemas.RWDataSource,
  25. ds_writer: schemas.RWDataSource,
  26. param: schemas.DataXJsonParam, is_show=True) -> dict:
  27. res = dict()
  28. content = self.build_content(ds_reader, ds_writer, param, is_show)
  29. setting = self.build_setting()
  30. res['job'] = {
  31. 'content': content,
  32. 'setting': setting
  33. }
  34. return res
  35. def build_content(self, ds_reader: schemas.RWDataSource,
  36. ds_writer: schemas.RWDataSource,
  37. param: schemas.DataXJsonParam, is_show) -> List[Any]:
  38. reader = ReaderFactory.get_reader(ds_reader)
  39. writer = WriterFactory.get_writer(ds_writer)
  40. res = dict()
  41. res['reader'] = reader.build(param, is_show)
  42. print("===",param)
  43. res['writer'] = writer.build(param, is_show)
  44. return [res]
  45. def build_setting(self):
  46. return {
  47. "speed": {
  48. "channel": "5"
  49. }
  50. # },
  51. # "errorLimit": {
  52. # "record": 0
  53. # }
  54. }