mysql.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. from dataclasses import dataclass
  2. from app.core.datasource.datasource import DataSourceBase
  3. from mysql import connector
  4. from mysql.connector import Error
  5. from sqlalchemy import create_engine
  6. import pandas as pd
  7. from configs import logger
  8. from utils import flat_map
  9. class MysqlDS(DataSourceBase):
  10. def __init__(self, host: str, port: int, username: str, password: str, database_name: str, use_ssl: int, type: str='mysql'):
  11. DataSourceBase.__init__(self, type, host, port, username, password, database_name, )
  12. self.use_ssl = use_ssl
  13. @property
  14. def jdbc_url(self):
  15. jdbc = f'jdbc:mysql://{self.host}:{self.port}/{self.database_name}'
  16. if self.use_ssl == 0:
  17. jdbc = f'{jdbc}?useSSL=false'
  18. return jdbc
  19. @property
  20. def jdbc_driver_class(self):
  21. return 'com.mysql.jdbc.Driver'
  22. @property
  23. def connection_str(self):
  24. return f'mysql+pymysql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database_name}'
  25. def is_connect(self):
  26. # 判断mysql是否连接成功
  27. conn = None
  28. try:
  29. use_ssl = False if self.use_ssl == 0 else True
  30. conn = connector.connect(host=self.host,
  31. port=self.port,
  32. database=self.database_name,
  33. user=self.username,
  34. password=self.password,
  35. ssl_disabled=not use_ssl,
  36. connection_timeout=5)
  37. if conn.is_connected():
  38. logger.info('Connected to MySQL database')
  39. except Error as e:
  40. logger.error(e)
  41. finally:
  42. if conn is not None and conn.is_connected():
  43. conn.close()
  44. return True
  45. else:
  46. return False
  47. def _execute_sql(self, sqls):
  48. conn = None
  49. res = []
  50. try:
  51. use_ssl = False if self.use_ssl == 0 else True
  52. conn = connector.connect(host=self.host,
  53. port=self.port,
  54. database=self.database_name,
  55. user=self.username,
  56. password=self.password,
  57. ssl_disabled=not use_ssl,
  58. connection_timeout=5)
  59. cursor = conn.cursor()
  60. for sql in sqls:
  61. cursor.execute(sql)
  62. res.append(cursor.fetchall())
  63. logger.info(res)
  64. except Error as e:
  65. logger.error(e)
  66. finally:
  67. if conn is not None and conn.is_connected():
  68. conn.close()
  69. return res
  70. def get_preview_data(self, table_name, limit=100):
  71. sql1 = f'SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA="{self.database_name}" AND TABLE_NAME="{table_name}"'
  72. sql2 = f"SELECT * FROM {table_name} LIMIT {limit}"
  73. res = self._execute_sql([sql1, sql2])
  74. logger.info(res)
  75. return {
  76. 'header': flat_map(lambda x: x, res[0]),
  77. 'content': res[1]
  78. }
  79. # db_connection = create_engine(self.connection_str)
  80. # df = pd.read_sql(sql, con=db_connection)
  81. # # print(df)
  82. # logger.info(df.head())
  83. # return df.to_numpy()
  84. def list_tables(self):
  85. # table_type = "base table" AND
  86. sql = f'SELECT table_name FROM information_schema.tables WHERE table_schema="{self.database_name}"'
  87. res = self._execute_sql([sql])
  88. return flat_map(lambda x: x, res[0])
  89. def get_table_schema(self, table_name):
  90. def handle_col(x):
  91. line = list(map(lambda s: s.decode('utf-8') if type(s) == type(b'bytes') else str(s), x))
  92. return [':'.join(line[:3])]
  93. sql = f'describe `{self.database_name}`.{table_name}'
  94. logger.info(sql)
  95. res = self._execute_sql([sql])
  96. if res:
  97. res = [[str(i) , *x]for i, x in enumerate(res[0])]
  98. logger.info(res)
  99. return flat_map(lambda x: handle_col(x), res)
  100. else:
  101. raise Exception('table not found')