database.py 1.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. import time # database.py
  2. from typing import Optional
  3. from sqlalchemy.orm import Session
  4. from sqlalchemy import create_engine
  5. from sqlalchemy.ext.declarative import declarative_base
  6. from sqlalchemy.orm import sessionmaker
  7. from sqlalchemy.orm.attributes import flag_modified
  8. from configs.logging import logger
  9. from configs.settings import config
  10. USER = config.get('DATABASE', 'USER')
  11. PWD = config.get('DATABASE', 'pwd')
  12. DB_NAME = config.get('DATABASE', 'DB_NAME')
  13. HOST = config.get('DATABASE', 'HOST')
  14. PORT = config.get('DATABASE', 'PORT')
  15. SQLALCHEMY_DATABASE_URL = f'mysql+mysqlconnector://{USER}:{PWD}@{HOST}:{PORT}/{DB_NAME}?charset=utf8&auth_plugin=mysql_native_password'
  16. engine = create_engine(
  17. SQLALCHEMY_DATABASE_URL, pool_pre_ping=True
  18. )
  19. SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
  20. logger.info(f" connect to mysql success: {SQLALCHEMY_DATABASE_URL}")
  21. Base = declarative_base()
  22. class BaseModel(Base):
  23. __abstract__ = True
  24. def to_dict(self):
  25. return {c.name: getattr(self, c.name) for c in self.__table__.columns}
  26. def update(self, db: Session, enforce_update: Optional[dict] = None):
  27. for k in enforce_update or {}:
  28. flag_modified(self, k)
  29. if hasattr(self, "update_time"):
  30. self.update_time = int(time.time())
  31. # db.add(self)
  32. db.commit()
  33. db.flush()
  34. db.refresh(self)