job_jdbc_datasource.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import base64
  2. import time
  3. from typing import List
  4. from sqlalchemy.orm import Session
  5. from app.core.ds.datasource import DataSrouceFactory
  6. import app.schemas as schemas
  7. import app.models as models
  8. from app.utils import decode_user
  9. from configs.logging import logger
  10. def _decode(url, datasource, database_name):
  11. return url.replace('jdbc:', '').replace('hive2://', '').replace(f'{datasource}://', '').replace(f'/{database_name}', '')
  12. def _format_datasource(db: Session, item: schemas.JobJdbcDatasourceBase, ds_id: int=0):
  13. if ds_id != 0:
  14. item = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == ds_id).first()
  15. if not item:
  16. raise Exception('JobJdbcDatasource not found')
  17. item.jdbc_url = _decode(item.jdbc_url, item.datasource, item.database_name)
  18. item.jdbc_username, item.jdbc_password = decode_user(item.jdbc_username, item.jdbc_password)
  19. host, port = item.jdbc_url.split(':')
  20. if not host or not port:
  21. raise Exception('jdbc_url is invalid')
  22. ds = DataSrouceFactory.create(item.datasource, {'port': port, 'host': host, 'username': item.jdbc_username, 'password': item.jdbc_password, 'database_name': item.database_name})
  23. item.jdbc_url = ds.jdbc_url
  24. item.jdbc_username = ds.jdbc_username
  25. item.jdbc_password = ds.jdbc_password
  26. return ds, item
  27. def test_datasource_connection(db: Session, item: schemas.JobJdbcDatasourceCreate):
  28. ds, item = _format_datasource(db, item)
  29. return ds.is_connect()
  30. def get_table_schema(db: Session, ds_id: int, table_name: str):
  31. ds, item = _format_datasource(db, None, ds_id)
  32. return ds.get_table_schema(table_name)
  33. def get_preview_data(db: Session, ds_id: int, table_name: str, limit: int = 100):
  34. ds, item = _format_datasource(db, None, ds_id)
  35. return ds.get_preview_data(table_name, limit)
  36. def get_table_names(db: Session, ds_id: int):
  37. ds, item = _format_datasource(db, None, ds_id)
  38. return ds.list_tables()
  39. def create_job_jdbc_datasource(db: Session, item: schemas.JobJdbcDatasourceCreate):
  40. ds, item = _format_datasource(db, item)
  41. #
  42. create_time: int = int(time.time())
  43. db_item = models.JobJdbcDatasource(**item.dict(), **{
  44. 'status': 1,
  45. 'create_time': create_time,
  46. 'create_by': 'admin',
  47. 'update_time': create_time,
  48. 'update_by': 'admin',
  49. 'jdbc_driver_class': ds.jdbc_driver_class
  50. })
  51. db.add(db_item)
  52. db.commit()
  53. db.refresh(db_item)
  54. return db_item
  55. def get_job_jdbc_datasources(db: Session, skip: int = 0, limit: int = 20):
  56. res: List[models.JobJdbcDatasource] = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.status==1).all()
  57. for item in res:
  58. item.jdbc_url = _decode(item.jdbc_url, item.datasource, item.database_name)
  59. return res
  60. # return db.query(models.JobJdbcDatasource).offset(skip).limit(limit).all()
  61. def update_job_jdbc_datasources(db: Session, ds_id: int, update_item: schemas.JobJdbcDatasourceUpdate):
  62. ds, update_item = _format_datasource(db, update_item)
  63. db_item = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == ds_id).first()
  64. if not db_item:
  65. raise Exception('JobJdbcDatasource not found')
  66. update_dict = update_item.dict(exclude_unset=True)
  67. for k, v in update_dict.items():
  68. setattr(db_item, k, v)
  69. db_item.jdbc_driver_class = ds.jdbc_driver_class
  70. db_item.update_time = int(time.time())
  71. db_item.update_by = 'admin1' # TODO
  72. db.commit()
  73. db.flush()
  74. db.refresh(db_item)
  75. return db_item
  76. def delete_job_jdbc_datasource(db: Session, ds_id: int):
  77. db_item = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == ds_id).first()
  78. if not db_item:
  79. raise Exception('JobJdbcDatasource not found')
  80. db_item.status = 0
  81. db.commit()
  82. db.flush()
  83. db.refresh(db_item)
  84. return db_item