1234567891011121314151617181920212223242526272829303132333435363738394041 |
- from asyncio.log import logger
- import time
- from typing import List
- from app import models, schemas
- from sqlalchemy.orm import Session
- from app.core.datax import *
- from configs.logging import logger
- def generate_datax_json(db: Session, param: schemas.DataXJsonParam):
- reader_ds=None
- if param.reader_type == 'datasource':
- reader_ds: models.JobJdbcDatasource = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == param.reader_datasource_id).first()
- if not reader_ds: raise Exception('Reader datasource not found')
- r_ds = schemas.RWDataSource(**{
- 'id': reader_ds.id if reader_ds else None,
- 'datasource': reader_ds.datasource if reader_ds else 'hive',
- 'rw_type': param.reader_type,
- 'jdbc_url': reader_ds.jdbc_url if reader_ds else None,
- 'jdbc_username': reader_ds.jdbc_username if reader_ds else None,
- 'jdbc_password': reader_ds.jdbc_password if reader_ds else None,
- })
- writer_ds = None
- if param.writer_type == 'datasource':
- writer_ds: models.JobJdbcDatasource = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == param.writer_datasource_id).first()
- if not writer_ds: raise Exception('Writer datasource not found')
- w_ds = schemas.RWDataSource(**{
- 'id': writer_ds.id if writer_ds else None,
- 'datasource': writer_ds.datasource if writer_ds else 'hive',
- 'rw_type': param.writer_type,
- 'jdbc_url': writer_ds.jdbc_url if writer_ds else None,
- 'jdbc_username': writer_ds.jdbc_username if writer_ds else None,
- 'jdbc_password': writer_ds.jdbc_password if writer_ds else None,
- })
- engine = DataXEngine()
- job = engine.build_job(r_ds, w_ds, param, is_show=False)
- logger.info(job)
- return {'json': job}
|