job_jdbc_datasource.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. import time
  2. from typing import List
  3. from sqlalchemy.orm import Session
  4. from sqlalchemy import func
  5. from app.core.datasource.datasource import DataSrouceFactory
  6. import app.schemas as schemas
  7. import app.models as models
  8. from app.utils import decode_user
  9. from app.utils.utils import decode_base64
  10. def _decode(url, datasource, database_name):
  11. url = url.replace('jdbc:', '').replace('hive2://', '').replace(f'{datasource}://', '').replace(f'/{database_name}','')
  12. return url.split('?')[0]
  13. def _format_datasource(db: Session, item: schemas.JobJdbcDatasourceBase, ds_id: int = 0):
  14. if ds_id != 0:
  15. item = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == ds_id).first()
  16. if not item:
  17. raise Exception('未找到该数据源')
  18. item.jdbc_url = _decode(item.jdbc_url, item.datasource, item.database_name)
  19. item.jdbc_username, item.jdbc_password = decode_user(item.jdbc_username, item.jdbc_password)
  20. try:
  21. host, port = item.jdbc_url.split(':')
  22. except:
  23. raise Exception('数据库地址填写错误')
  24. if not host or not port:
  25. raise Exception('jdbc_url无效')
  26. ds = None
  27. if item.datasource == 'hive':
  28. ds = DataSrouceFactory.create(item.datasource, {'port': port, 'host': host, 'username': item.jdbc_username,
  29. 'password': item.jdbc_password,
  30. 'database_name': item.database_name,
  31. 'kerberos': item.kerberos,
  32. 'keytab': item.keytab,
  33. 'krb5config': item.krb5config,
  34. 'kerberos_service_name': item.kerberos_service_name,
  35. 'principal':item.principal})
  36. else:
  37. ds = DataSrouceFactory.create(item.datasource, {'port': port, 'host': host, 'username': item.jdbc_username,
  38. 'password': item.jdbc_password,
  39. 'database_name': item.database_name,
  40. 'use_ssl': item.use_ssl
  41. })
  42. item.jdbc_url = ds.jdbc_url
  43. item.jdbc_username = ds.jdbc_username if item.kerberos == 0 else None
  44. item.jdbc_password = ds.jdbc_password if item.kerberos == 0 else None
  45. return ds, item
  46. def test_datasource_connection(db: Session, item: schemas.JobJdbcDatasourceCreate):
  47. if item.jdbc_password and item.jdbc_password != '':
  48. item.jdbc_password = decode_base64(item.jdbc_password)
  49. ds, item = _format_datasource(db, item)
  50. return ds.is_connect()
  51. def get_table_schema(db: Session, ds_id: int, table_name: str):
  52. ds, item = _format_datasource(db, None, ds_id)
  53. return ds.get_table_schema(table_name)
  54. def get_preview_data(db: Session, ds_id: int, table_name: str, limit: int = 100):
  55. ds, item = _format_datasource(db, None, ds_id)
  56. return ds.get_preview_data(table_name, limit)
  57. def get_table_names(db: Session, ds_id: int):
  58. ds, item = _format_datasource(db, None, ds_id)
  59. return ds.list_tables()
  60. def create_job_jdbc_datasource(db: Session, item: schemas.JobJdbcDatasourceCreate):
  61. if item.jdbc_password and item.jdbc_password != '':
  62. item.jdbc_password = decode_base64(item.jdbc_password)
  63. ds, item = _format_datasource(db, item)
  64. con_result = ds.is_connect()
  65. if not con_result:
  66. raise Exception('连接失败,不允许添加')
  67. create_time: int = int(time.time())
  68. name_item = db.query(models.JobJdbcDatasource)\
  69. .filter(models.JobJdbcDatasource.datasource_name == func.binary(item.datasource_name))\
  70. .filter(models.JobJdbcDatasource.status == 1).first()
  71. if name_item:
  72. raise Exception('数据源名称重复')
  73. db_item = models.JobJdbcDatasource(**item.dict(), **{
  74. 'status': 1,
  75. 'create_time': create_time,
  76. 'create_by': 'admin',
  77. 'update_time': create_time,
  78. 'update_by': 'admin',
  79. 'jdbc_driver_class': ds.jdbc_driver_class
  80. })
  81. db.add(db_item)
  82. db.commit()
  83. db.refresh(db_item)
  84. return db_item
  85. def get_job_jdbc_datasources(db: Session, datasource_type: str = None, skip: int = 0, limit: int = 20):
  86. res: List[models.JobJdbcDatasource] = []
  87. if datasource_type is not None and datasource_type != '':
  88. res = db.query(models.JobJdbcDatasource)\
  89. .filter(models.JobJdbcDatasource.datasource == datasource_type)\
  90. .filter(models.JobJdbcDatasource.status == 1)\
  91. .order_by(models.JobJdbcDatasource.create_time.desc()).all()
  92. else:
  93. res = db.query(models.JobJdbcDatasource)\
  94. .filter(models.JobJdbcDatasource.status == 1)\
  95. .order_by(models.JobJdbcDatasource.create_time.desc()).all()
  96. for item in res:
  97. item.jdbc_url = _decode(item.jdbc_url, item.datasource, item.database_name)
  98. return res
  99. def get_job_jdbc_datasources_info(db: Session, ds_id: int):
  100. db_item: models.JobJdbcDatasource = db.query(models.JobJdbcDatasource)\
  101. .filter(models.JobJdbcDatasource.id == ds_id).first()
  102. db_item.jdbc_url = _decode(db_item.jdbc_url, db_item.datasource, db_item.database_name)
  103. if db_item.jdbc_username and db_item.jdbc_username != '':
  104. db_item.jdbc_username = decode_base64(db_item.jdbc_username)
  105. return db_item
  106. def update_job_jdbc_datasources(db: Session, ds_id: int, update_item: schemas.JobJdbcDatasourceUpdate):
  107. if update_item.jdbc_password and update_item.jdbc_password != '':
  108. update_item.jdbc_password = decode_base64(update_item.jdbc_password)
  109. ds, update_item = _format_datasource(db, update_item)
  110. con_result = ds.is_connect()
  111. if not con_result:
  112. raise Exception('连接失败,不允许添加')
  113. db_item = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == ds_id).first()
  114. if not db_item:
  115. raise Exception('未找到该数据源')
  116. name_item = db.query(models.JobJdbcDatasource)\
  117. .filter(models.JobJdbcDatasource.datasource_name == func.binary(update_item.datasource_name))\
  118. .filter(models.JobJdbcDatasource.status == 1)\
  119. .filter(models.JobJdbcDatasource.id != ds_id).first()
  120. if name_item:
  121. raise Exception('数据源名称重复')
  122. update_dict = update_item.dict(exclude_unset=True)
  123. for k, v in update_dict.items():
  124. setattr(db_item, k, v)
  125. db_item.jdbc_driver_class = ds.jdbc_driver_class
  126. db_item.update_time = int(time.time())
  127. db_item.update_by = 'admin1' # TODO
  128. db.commit()
  129. db.flush()
  130. db.refresh(db_item)
  131. return db_item
  132. def delete_job_jdbc_datasource(db: Session, ds_id: int):
  133. db_item = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == ds_id).first()
  134. if not db_item:
  135. raise Exception('未找到该数据源')
  136. db_item.status = 0
  137. db.commit()
  138. db.flush()
  139. db.refresh(db_item)
  140. return db_item
  141. def get_job_jdbc_datasource(db: Session, ds_id: int):
  142. db_item: models.JobJdbcDatasource = db.query(models.JobJdbcDatasource).filter(models.JobJdbcDatasource.id == ds_id).first()
  143. if not db_item:
  144. raise Exception('未找到该数据源')
  145. return db_item
  146. def get_job_jdbc_datasource_table_location(db: Session, db_name: str, table_name: str, ds_id: int):
  147. ds, item = _format_datasource(db, None, ds_id)
  148. return ds.get_table_info(table_name,db_name)