datax_json.py

from app import models, schemas
from app.core.datax import DataXEngine
from configs.logging import logger
from sqlalchemy.orm import Session


def generate_datax_json(db: Session, param: schemas.DataXJsonParam):
    """Build a DataX job JSON from the reader/writer settings in `param`."""
    # Resolve the reader from the DB only when it is a registered JDBC
    # datasource; other reader types have no backing row and fall through
    # with reader_ds left as None.
    reader_ds = None
    if param.reader_type == 'datasource':
        reader_ds: models.JobJdbcDatasource = (
            db.query(models.JobJdbcDatasource)
            .filter(models.JobJdbcDatasource.id == param.reader_datasource_id)
            .first()
        )
        if not reader_ds:
            raise Exception('Reader datasource not found')
    r_ds = schemas.RWDataSource(**{
        'id': reader_ds.id if reader_ds else None,
        'datasource': reader_ds.datasource if reader_ds else 'hive',
        'rw_type': param.reader_type,
        'jdbc_url': reader_ds.jdbc_url if reader_ds else None,
        'jdbc_username': reader_ds.jdbc_username if reader_ds else None,
        'jdbc_password': reader_ds.jdbc_password if reader_ds else None,
    })

    # Same resolution for the writer side.
    writer_ds = None
    if param.writer_type == 'datasource':
        writer_ds: models.JobJdbcDatasource = (
            db.query(models.JobJdbcDatasource)
            .filter(models.JobJdbcDatasource.id == param.writer_datasource_id)
            .first()
        )
        if not writer_ds:
            raise Exception('Writer datasource not found')
    w_ds = schemas.RWDataSource(**{
        'id': writer_ds.id if writer_ds else None,
        'datasource': writer_ds.datasource if writer_ds else 'hive',
        'rw_type': param.writer_type,
        'jdbc_url': writer_ds.jdbc_url if writer_ds else None,
        'jdbc_username': writer_ds.jdbc_username if writer_ds else None,
        'jdbc_password': writer_ds.jdbc_password if writer_ds else None,
    })

    # Assemble the DataX job definition and return it as the JSON payload.
    engine = DataXEngine()
    job = engine.build_job(r_ds, w_ds, param, is_show=False)
    logger.info(job)
    return {'json': job}
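
A minimal usage sketch follows. It assumes a SessionLocal session factory in app.database (hypothetical; this file does not define one) and sets only the DataXJsonParam fields actually referenced above; the real schema likely requires additional fields (tables, columns, etc.) not shown here.

# Usage sketch, under the assumptions stated above.
from app.database import SessionLocal  # hypothetical session factory

db = SessionLocal()
try:
    param = schemas.DataXJsonParam(
        # Field names taken from the references in generate_datax_json;
        # the id values are placeholders for rows in job_jdbc_datasource.
        reader_type='datasource',
        reader_datasource_id=1,
        writer_type='datasource',
        writer_datasource_id=2,
    )
    result = generate_datax_json(db, param)
    print(result['json'])  # the DataX job produced by DataXEngine.build_job
finally:
    db.close()

Note that when reader_type or writer_type is not 'datasource', no database lookup happens and the corresponding RWDataSource is built with datasource='hive' and null JDBC credentials, so the fallback path needs no JDBC row at all.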