"""User account, authentication-token and project-membership operations."""
import time
import uuid
from typing import List
from app import models, schemas
from sqlalchemy.orm import Session
from werkzeug.security import check_password_hash, generate_password_hash
from configs.globals import g
from itsdangerous import BadSignature, SignatureExpired
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from app.utils.utils import decode_base64
from constants.constants import SECRET_KEY

# Role ids are selected by whether ``project.type == 0``. What the two project
# types mean is defined outside this module.
_TYPE0_ADMIN_ROLE_ID = 2
_OTHER_ADMIN_ROLE_ID = 4
_TYPE0_MEMBER_ROLE_ID = 3
_OTHER_MEMBER_ROLE_ID = 5
_ADMIN_ROLE_IDS = (_TYPE0_ADMIN_ROLE_ID, _OTHER_ADMIN_ROLE_ID)

# User id that is never listed as a project member.
# NOTE(review): presumably the built-in super administrator - confirm.
_HIDDEN_USER_ID = 1


def _admin_role_id(project) -> int:
    """Return the project-administrator role id matching ``project.type``."""
    if project.type == 0:
        return _TYPE0_ADMIN_ROLE_ID
    return _OTHER_ADMIN_ROLE_ID


def _member_role_id(project) -> int:
    """Return the non-administrator role id matching ``project.type``."""
    if project.type == 0:
        return _TYPE0_MEMBER_ROLE_ID
    return _OTHER_MEMBER_ROLE_ID


def _require_project(db: Session, project_id: int):
    """Return the project with *project_id* or raise if it does not exist."""
    project = db.query(models.Project).filter(models.Project.id == project_id).first()
    if not project:
        raise Exception('项目不存在')
    return project


def _require_user(db: Session, user_id: int):
    """Return the user with *user_id* or raise if it does not exist."""
    user = db.query(models.Users).filter(models.Users.id == user_id).first()
    if not user:
        raise Exception('用户不存在')
    return user


def _find_relation(db: Session, project_id: int, user_id: int):
    """Return the first project/user relation row, or ``None`` if there is none."""
    return (
        db.query(models.ProjectUserRelation)
        .filter(models.ProjectUserRelation.project_id == project_id)
        .filter(models.ProjectUserRelation.user_id == user_id)
        .first()
    )


def verify_password(password_hash, password):
    """Return True when *password* matches the stored *password_hash*."""
    return check_password_hash(password_hash, password)


# Generate a new user_token
def generate_user_token(user: models.Users, expiration=3600 * 5):
    """Return a signed, time-limited token string for *user*.

    The payload carries the user's id and stored password value;
    ``verify_user_token`` compares that value so tokens stop working once the
    password changes.

    Args:
        user: The user the token is issued for.
        expiration: Token lifetime in seconds (default: five hours).
    """
    # NOTE(review): the payload is signed, not encrypted, so the stored
    # password hash is readable by whoever holds the token. Replacing it with
    # a derived value would invalidate every token already issued, so it is
    # left unchanged here.
    # NOTE(review): TimedJSONWebSignatureSerializer was removed in
    # itsdangerous 2.1; this module requires an older release.
    serializer = Serializer(SECRET_KEY, expires_in=expiration)
    payload = {'user_id': user.id, 'password': user.password}
    return serializer.dumps(payload).decode('utf-8')


# Token verification
def verify_user_token(db: Session, token: str):
    """Return the user identified by *token*.

    Raises:
        Exception: If the token is expired or its signature is invalid, the
            user no longer exists, or the stored password value differs from
            the one recorded in the token.
    """
    serializer = Serializer(SECRET_KEY)
    try:
        data = serializer.loads(token)
    except (SignatureExpired, BadSignature) as err:
        # Both failures are reported identically to the caller.
        raise Exception("user_token验证失败") from err

    user: models.Users = (
        db.query(models.Users).filter(models.Users.id == data['user_id']).first()
    )
    if not user:
        raise Exception("不存在此用户")
    if user.password != data['password']:
        raise Exception("密码已被修改,请重新登陆")
    return user


def create_users(db: Session, item: schemas.UsersCreate):
    """Create and return a new user.

    ``item.password`` is passed through ``decode_base64`` and stored as a
    werkzeug password hash. ``item`` itself is not modified.

    Raises:
        Exception: If a user with the same ``name`` or ``username`` exists.
    """
    if db.query(models.Users).filter(models.Users.name == item.name).first():
        raise Exception('该用户已存在')
    if db.query(models.Users).filter(models.Users.username == item.username).first():
        raise Exception('该账号已存在')

    password = decode_base64(item.password)
    db_item = models.Users(
        name=item.name,
        code=uuid.uuid1().hex,
        username=item.username,
        # Hash the str directly: werkzeug documents a str password, and this
        # matches how ``login`` verifies it.
        password=generate_password_hash(password),
        create_time=int(time.time()),
    )
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item


def login(db: Session, item: schemas.LoginBase):
    """Authenticate by username/password and issue a fresh auth token.

    The new token is stored on the user row and also returned.

    Returns:
        ``user.to_dict()`` with an extra ``'auth_token'`` entry.

    Raises:
        Exception: If the username is unknown or the password does not match.
    """
    user: models.Users = (
        db.query(models.Users).filter(models.Users.username == item.username).first()
    )
    if not user:
        raise Exception('不存在此账号')

    password = decode_base64(item.password)
    if not verify_password(user.password, password):
        raise Exception('密码错误')

    auth_token = generate_user_token(user)
    user.user_token = auth_token
    db.commit()  # commit() flushes pending changes itself
    db.refresh(user)

    res = user.to_dict()
    res.update({'auth_token': auth_token})
    return res


def get_users_by_project(db: Session, project_id: int):
    """Return the members of a project as dicts with a ``'role_name'`` entry.

    The user whose id is ``_HIDDEN_USER_ID`` is never included. ``role_name``
    is ``None`` when a membership row references a role id that is not present
    in the roles table.
    """
    relations: List[models.ProjectUserRelation] = (
        db.query(models.ProjectUserRelation)
        .filter(models.ProjectUserRelation.project_id == project_id)
        .all()
    )
    role_id_by_user = {rel.user_id: rel.role_id for rel in relations}
    if not role_id_by_user:
        # No members: skip the remaining queries (and an empty IN predicate).
        return []

    roles: List[models.Roles] = db.query(models.Roles).all()
    role_by_id = {role.id: role for role in roles}
    users: List[models.Users] = (
        db.query(models.Users)
        .filter(models.Users.id.in_(list(role_id_by_user)))
        .all()
    )

    res = []
    for user in users:
        if user.id == _HIDDEN_USER_ID:
            continue
        role = role_by_id.get(role_id_by_user[user.id])
        item = user.to_dict()
        item.update({'role_name': role.name if role else None})
        res.append(item)
    return res


def retrieve_users_by_project(db: Session, project_id: int):
    """Return every user that is NOT yet a member of the given project."""
    relations: List[models.ProjectUserRelation] = (
        db.query(models.ProjectUserRelation)
        .filter(models.ProjectUserRelation.project_id == project_id)
        .all()
    )
    member_ids = [rel.user_id for rel in relations]
    query = db.query(models.Users)
    if member_ids:
        # Only filter when there is something to exclude; an empty NOT IN
        # predicate would match every row anyway.
        query = query.filter(models.Users.id.notin_(member_ids))
    users: List[models.Users] = query.all()
    return users


def add_users_to_project(db: Session, user_ids: List[int], project_id: int):
    """Add the given users to a project with the project's non-admin role.

    Every id is validated before anything is written, and all new membership
    rows are committed together. Repeated ids in *user_ids* are added once.
    An empty *user_ids* is a no-op.

    Raises:
        Exception: If the project or any user does not exist, or any user is
            already a member of the project.
    """
    project: models.Project = _require_project(db, project_id)
    role_id = _member_role_id(project)

    new_relations = []
    # dict.fromkeys de-duplicates while keeping the caller's order.
    for user_id in dict.fromkeys(user_ids or []):
        _require_user(db, user_id)
        if _find_relation(db, project_id, user_id):
            raise Exception('用户已加入项目,不可二次加入')
        new_relations.append(
            models.ProjectUserRelation(
                user_id=user_id,
                project_id=project_id,
                role_id=role_id,
            )
        )

    if not new_relations:
        return
    for relation in new_relations:
        db.add(relation)
    db.commit()


def remove_project_users(db: Session, user_id: int, project_id: int):
    """Remove a user from a project.

    Raises:
        Exception: If the project or user does not exist, the user is not a
            member, the user is the current user (``g.user_id``), or the user
            holds a project-administrator role.
    """
    _require_project(db, project_id)
    _require_user(db, user_id)

    relation: models.ProjectUserRelation = _find_relation(db, project_id, user_id)
    if not relation:
        raise Exception('用户未加入此项目,不可移除')
    if relation.user_id == g.user_id:
        raise Exception('用户不能移除自己')
    if relation.role_id in _ADMIN_ROLE_IDS:
        raise Exception('该用户为项目管理员,不可直接移除')

    # Query-level delete removes every matching row, not just the one loaded.
    (
        db.query(models.ProjectUserRelation)
        .filter(models.ProjectUserRelation.project_id == project_id)
        .filter(models.ProjectUserRelation.user_id == user_id)
        .delete()
    )
    db.commit()


def set_user_to_admin(db: Session, user_id: int, project_id: int):
    """Promote a project member to project administrator.

    Returns:
        The updated membership row.

    Raises:
        Exception: If the project or user does not exist, the user is not a
            member, or the project already has an administrator.
    """
    project: models.Project = _require_project(db, project_id)
    _require_user(db, user_id)

    relation: models.ProjectUserRelation = _find_relation(db, project_id, user_id)
    if not relation:
        raise Exception('用户未加入项目,不可设为管理员')

    admin_role_id = _admin_role_id(project)
    existing_admin = (
        db.query(models.ProjectUserRelation)
        .filter(models.ProjectUserRelation.project_id == project_id)
        .filter(models.ProjectUserRelation.role_id == admin_role_id)
        .first()
    )
    if existing_admin:
        raise Exception('项目已存在管理员,请先移除原先管理员在设置新的管理员')

    relation.role_id = admin_role_id
    db.commit()
    db.refresh(relation)
    return relation


def remove_user_to_admin(db: Session, user_id: int, project_id: int):
    """Reset a project member to the project's non-administrator role.

    Returns:
        The updated membership row.

    Raises:
        Exception: If the project or user does not exist, or the user is not
            a member of the project.
    """
    project: models.Project = _require_project(db, project_id)
    _require_user(db, user_id)

    relation: models.ProjectUserRelation = _find_relation(db, project_id, user_id)
    if not relation:
        raise Exception('用户未加入项目')

    relation.role_id = _member_role_id(project)
    db.commit()
    db.refresh(relation)
    return relation