# mysql.py
from dataclasses import dataclass
from urllib.parse import quote

import pandas as pd
from mysql import connector
from mysql.connector import Error
from sqlalchemy import create_engine

from app.core.datasource.datasource import DataSourceBase
from configs import logger
from utils import flat_map


  9. class MysqlDS(DataSourceBase):
  10. def __init__(self, host: str, port: int, username: str, password: str, database_name: str, use_ssl: int, type: str='mysql'):
  11. DataSourceBase.__init__(self, type, host, port, username, password, database_name, )
  12. self.use_ssl = use_ssl
  13. @property
  14. def jdbc_url(self):
  15. jdbc = f'jdbc:mysql://{self.host}:{self.port}/{self.database_name}'
  16. if self.use_ssl == 0:
  17. jdbc = f'{jdbc}?useSSL=false'
  18. return jdbc
  19. @property
  20. def jdbc_driver_class(self):
  21. return 'com.mysql.jdbc.Driver'
  22. @property
  23. def connection_str(self):
  24. return f'mysql+pymysql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database_name}'
  25. def is_connect(self):
  26. # 判断mysql是否连接成功
  27. conn = None
  28. try:
  29. use_ssl = False if self.use_ssl == 0 else True
  30. conn = connector.connect(host=self.host,
  31. port=self.port,
  32. database=self.database_name,
  33. user=self.username,
  34. password=self.password,
  35. ssl_disabled=not use_ssl,
  36. connection_timeout=5)
  37. if conn.is_connected():
  38. logger.info('Connected to MySQL database')
  39. except Error as e:
  40. logger.error(e)
  41. finally:
  42. if conn is not None and conn.is_connected():
  43. conn.close()
  44. return True
  45. else:
  46. return False
  47. def _execute_sql(self, sqls):
  48. conn = None
  49. res = []
  50. try:
  51. use_ssl = False if self.use_ssl == 0 else True
  52. conn = connector.connect(host=self.host,
  53. port=self.port,
  54. database=self.database_name,
  55. user=self.username,
  56. password=self.password,
  57. ssl_disabled=not use_ssl,
  58. connection_timeout=5)
  59. cursor = conn.cursor()
  60. for sql in sqls:
  61. cursor.execute(sql)
  62. res.append(cursor.fetchall())
  63. logger.info(res)
  64. except Error as e:
  65. logger.error(e)
  66. raise Exception('mysql 连接失败')
  67. finally:
  68. if conn is not None and conn.is_connected():
  69. conn.close()
  70. return res
  71. def get_preview_data(self, table_name, limit=100):
  72. # sql1 = f'SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA="{self.database_name}" AND TABLE_NAME="{table_name}"'
  73. table_schema = self.get_table_schema(table_name)
  74. sql2 = f"SELECT * FROM {table_name} LIMIT {limit}"
  75. res = self._execute_sql([sql2])
  76. logger.info(res)
  77. return {
  78. # 'header': flat_map(lambda x: x, res[0]),
  79. 'header': [str(column).split(':')[1] for column in table_schema],
  80. 'content': res[0]
  81. }
  82. # db_connection = create_engine(self.connection_str)
  83. # df = pd.read_sql(sql, con=db_connection)
  84. # # print(df)
  85. # logger.info(df.head())
  86. # return df.to_numpy()
  87. def list_tables(self):
  88. # table_type = "base table" AND
  89. sql = f'SELECT table_name FROM information_schema.tables WHERE table_schema="{self.database_name}"'
  90. res = self._execute_sql([sql])
  91. return flat_map(lambda x: x, res[0])
  92. def get_table_schema(self, table_name):
  93. def handle_col(x):
  94. line = list(map(lambda s: s.decode('utf-8') if type(s) == type(b'bytes') else str(s), x))
  95. return [':'.join(line[:3])]
  96. sql = f'describe `{self.database_name}`.{table_name}'
  97. logger.info(sql)
  98. res = self._execute_sql([sql])
  99. if res:
  100. res = [[str(i) , *x]for i, x in enumerate(res[0])]
  101. logger.info(res)
  102. return flat_map(lambda x: handle_col(x), res)
  103. else:
  104. raise Exception('table not found')