from asyncio.log import logger import time from typing import List from app import models, schemas from sqlalchemy.orm import Session from app.core.datax import * from configs.logging import logger def generate_datax_json(db: Session, param: schemas.DataXJsonParam): reader_ds=None if param.reader_type == 'datasource': reader_ds: models.JobJdbcDatasource = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == param.reader_datasource_id).first() if not reader_ds: raise Exception('Reader datasource not found') r_ds = schemas.RWDataSource(**{ 'id': reader_ds.id if reader_ds else None, 'datasource': reader_ds.datasource if reader_ds else 'hive', 'rw_type': param.reader_type, 'jdbc_url': reader_ds.jdbc_url if reader_ds else None, 'jdbc_username': reader_ds.jdbc_username if reader_ds else None, 'jdbc_password': reader_ds.jdbc_password if reader_ds else None, }) writer_ds = None if param.writer_type == 'datasource': writer_ds: models.JobJdbcDatasource = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == param.writer_datasource_id).first() if not writer_ds: raise Exception('Writer datasource not found') w_ds = schemas.RWDataSource(**{ 'id': writer_ds.id if writer_ds else None, 'datasource': writer_ds.datasource if writer_ds else 'hive', 'rw_type': param.writer_type, 'jdbc_url': writer_ds.jdbc_url if writer_ds else None, 'jdbc_username': writer_ds.jdbc_username if writer_ds else None, 'jdbc_password': writer_ds.jdbc_password if writer_ds else None, }) engine = DataXEngine() job = engine.build_job(r_ds, w_ds, param, is_show=False) logger.info(job) return {'json': job}