job_jdbc_datasource.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import time
  2. from typing import List
  3. from sqlalchemy.orm import Session
  4. from app.core.datasource.datasource import DataSrouceFactory
  5. import app.schemas as schemas
  6. import app.models as models
  7. from app.utils import decode_user
  8. def _decode(url, datasource, database_name):
  9. return url.replace('jdbc:', '').replace('hive2://', '').replace(f'{datasource}://', '').replace(f'/{database_name}',
  10. '')
  11. def _format_datasource(db: Session, item: schemas.JobJdbcDatasourceBase, ds_id: int = 0):
  12. if ds_id != 0:
  13. item = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == ds_id).first()
  14. if not item:
  15. raise Exception('未找到该数据源')
  16. item.jdbc_url = _decode(item.jdbc_url, item.datasource, item.database_name)
  17. item.jdbc_username, item.jdbc_password = decode_user(item.jdbc_username, item.jdbc_password)
  18. host, port = item.jdbc_url.split(':')
  19. if not host or not port:
  20. raise Exception('jdbc_url无效')
  21. ds = DataSrouceFactory.create(item.datasource, {'port': port, 'host': host, 'username': item.jdbc_username,
  22. 'password': item.jdbc_password,
  23. 'database_name': item.database_name,
  24. 'kerberos': item.kerberos,
  25. 'keytab': item.keytab,
  26. 'krb5config': item.krb5config,
  27. 'kerberos_service_name': item.kerberos_service_name,
  28. 'principal':item.principal})
  29. item.jdbc_url = ds.jdbc_url
  30. item.jdbc_username = ds.jdbc_username if item.kerberos == 0 else None
  31. item.jdbc_password = ds.jdbc_password if item.kerberos == 0 else None
  32. return ds, item
  33. def test_datasource_connection(db: Session, item: schemas.JobJdbcDatasourceCreate):
  34. ds, item = _format_datasource(db, item)
  35. return ds.is_connect()
  36. def get_table_schema(db: Session, ds_id: int, table_name: str):
  37. ds, item = _format_datasource(db, None, ds_id)
  38. return ds.get_table_schema(table_name)
  39. def get_preview_data(db: Session, ds_id: int, table_name: str, limit: int = 100):
  40. ds, item = _format_datasource(db, None, ds_id)
  41. return ds.get_preview_data(table_name, limit)
  42. def get_table_names(db: Session, ds_id: int):
  43. ds, item = _format_datasource(db, None, ds_id)
  44. return ds.list_tables()
  45. def create_job_jdbc_datasource(db: Session, item: schemas.JobJdbcDatasourceCreate):
  46. ds, item = _format_datasource(db, item)
  47. #
  48. create_time: int = int(time.time())
  49. db_item = models.JobJdbcDatasource(**item.dict(), **{
  50. 'status': 1,
  51. 'create_time': create_time,
  52. 'create_by': 'admin',
  53. 'update_time': create_time,
  54. 'update_by': 'admin',
  55. 'jdbc_driver_class': ds.jdbc_driver_class
  56. })
  57. db.add(db_item)
  58. db.commit()
  59. db.refresh(db_item)
  60. return db_item
  61. def get_job_jdbc_datasources(db: Session, datasource_type: str = None, skip: int = 0, limit: int = 20):
  62. res: List[models.JobJdbcDatasource] = []
  63. if datasource_type is not None and datasource_type != '':
  64. res = db.query(models.JobJdbcDatasource)\
  65. .filter(models.JobJdbcDatasource.datasource == datasource_type)\
  66. .filter(models.JobJdbcDatasource.status == 1).all()
  67. else:
  68. res = db.query(models.JobJdbcDatasource)\
  69. .filter(models.JobJdbcDatasource.status == 1).all()
  70. for item in res:
  71. item.jdbc_url = _decode(item.jdbc_url, item.datasource, item.database_name)
  72. return res
  73. def update_job_jdbc_datasources(db: Session, ds_id: int, update_item: schemas.JobJdbcDatasourceUpdate):
  74. ds, update_item = _format_datasource(db, update_item)
  75. db_item = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == ds_id).first()
  76. if not db_item:
  77. raise Exception('未找到该数据源')
  78. update_dict = update_item.dict(exclude_unset=True)
  79. for k, v in update_dict.items():
  80. setattr(db_item, k, v)
  81. db_item.jdbc_driver_class = ds.jdbc_driver_class
  82. db_item.update_time = int(time.time())
  83. db_item.update_by = 'admin1' # TODO
  84. db.commit()
  85. db.flush()
  86. db.refresh(db_item)
  87. return db_item
  88. def delete_job_jdbc_datasource(db: Session, ds_id: int):
  89. db_item = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == ds_id).first()
  90. if not db_item:
  91. raise Exception('未找到该数据源')
  92. db_item.status = 0
  93. db.commit()
  94. db.flush()
  95. db.refresh(db_item)
  96. return db_item