database.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. import time # database.py
  2. from typing import Optional
  3. from sqlalchemy.orm import Session
  4. from sqlalchemy import create_engine
  5. from sqlalchemy.ext.declarative import declarative_base
  6. from sqlalchemy.orm import sessionmaker
  7. from sqlalchemy.orm.attributes import flag_modified
  8. from configs.logging import logger
  9. from configs.settings import config
  10. USER = config.get('DATABASE', 'USER')
  11. PWD = config.get('DATABASE', 'pwd')
  12. DB_NAME = config.get('DATABASE', 'DB_NAME')
  13. HOST = config.get('DATABASE', 'HOST')
  14. PORT = config.get('DATABASE', 'PORT')
  15. SSL_DISABLED = config.get('DATABASE', 'SSL_DISABLED')
  16. SQLALCHEMY_DATABASE_URL = f'mysql+mysqlconnector://{USER}:{PWD}@{HOST}:{PORT}/{DB_NAME}?ssl_disabled={SSL_DISABLED}&charset=utf8&auth_plugin=mysql_native_password'
  17. engine = create_engine(
  18. SQLALCHEMY_DATABASE_URL, pool_pre_ping=True
  19. )
  20. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  21. logger.info(f" connect to mysql success: {SQLALCHEMY_DATABASE_URL}")
  22. Base = declarative_base()
  23. class BaseModel(Base):
  24. __abstract__ = True
  25. def to_dict(self):
  26. return {c.name: getattr(self, c.name) for c in self.__table__.columns}
  27. def update(self, db: Session, enforce_update: Optional[dict] = None):
  28. for k in enforce_update or {}:
  29. flag_modified(self, k)
  30. if hasattr(self, "update_time"):
  31. self.update_time = int(time.time())
  32. # db.add(self)
  33. db.commit()
  34. db.flush()
  35. db.refresh(self)