123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- import time
- import uuid
- from typing import List
- from app import models, schemas
- from sqlalchemy.orm import Session
- from werkzeug.security import check_password_hash, generate_password_hash
- from configs.globals import g
- from itsdangerous import BadSignature, SignatureExpired
- from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
- from app.utils.utils import decode_base64
- from constants.constants import SECRET_KEY
- def verify_password(password_hash, password):
- return check_password_hash(password_hash, password)
- # 生成新的user_token
- def generate_user_token(user: models.Users, expiration=3600 * 5):
- s = Serializer(SECRET_KEY, expires_in=expiration)
- return s.dumps({'user_id': user.id,'password':user.password}).decode('utf-8')
- # 验证Token方法
- def verify_user_token(db: Session, token: str):
- s = Serializer(SECRET_KEY)
- try:
- data = s.loads(token)
- except SignatureExpired:
- raise Exception("user_token验证失败")
- except BadSignature:
- raise Exception("user_token验证失败")
- user: models.Users = db.query(models.Users).filter(models.Users.id == data['user_id']).first()
- if not user:
- raise Exception("不存在此用户")
- if not user.password == data['password']:
- raise Exception("密码已被修改,请重新登陆")
- return user
- def create_users(db: Session, item: schemas.UsersCreate):
- check_1 = db.query(models.Users).filter(models.Users.name == item.name).first()
- if check_1:
- raise Exception('该用户已存在')
- check_2 = db.query(models.Users).filter(models.Users.username == item.username).first()
- if check_2:
- raise Exception('该账号已存在')
- item.password = decode_base64(item.password)
- user_code = str(uuid.uuid1()).replace('-','')
- db_item = models.Users(**{
- 'name': item.name,
- 'code': user_code,
- 'username': item.username,
- 'password': generate_password_hash(bytes(item.password.encode('utf-8'))),
- 'create_time': int(time.time()),
- })
- db.add(db_item)
- db.commit()
- db.refresh(db_item)
- return db_item
- def login(db: Session, item: schemas.LoginBase):
- user: models.Users = db.query(models.Users).filter(models.Users.username == item.username).first()
- if not user:
- raise Exception('不存在此账号')
- item.password = decode_base64(item.password)
- if not verify_password(user.password,item.password):
- raise Exception('密码错误')
- auth_token = generate_user_token(user)
- user.user_token = auth_token
- db.commit()
- db.flush()
- db.refresh(user)
- res = user.to_dict()
- res.update({'auth_token':auth_token})
- return res
- def get_users_by_project(db: Session, project_id: int):
- relation: List[models.ProjectUserRelation] = db.query(models.ProjectUserRelation)\
- .filter(models.ProjectUserRelation.project_id == project_id).all()
- user_role = { r.user_id:r.role_id for r in relation}
- user_ids = list(user_role.keys())
- roles: List[models.Roles] = db.query(models.Roles).all()
- id_role = { r.id:r for r in roles}
- users: List[models.Users] = db.query(models.Users)\
- .filter(models.Users.id.in_(user_ids)).all()
- res = []
- for user in users:
- if user.id == 1: continue
- item = user.to_dict()
- item.update({'role_name':id_role[user_role[user.id]].name})
- res.append(item)
- return res
- def retrieve_users_by_project(db: Session, project_id: int):
- relation: List[models.ProjectUserRelation] = db.query(models.ProjectUserRelation)\
- .filter(models.ProjectUserRelation.project_id == project_id).all()
- now_user_ids = [ r.user_id for r in relation ]
- users: List[models.Users] = db.query(models.Users)\
- .filter(models.Users.id.notin_(now_user_ids)).all()
- return users
- def add_users_to_project(db: Session, user_ids: List[int], project_id: int):
- project: models.Project = db.query(models.Project).filter(models.Project.id == project_id).first()
- if not project:
- raise Exception('项目不存在')
- exist_users = []
- for user_id in user_ids:
- check_user = db.query(models.Users).filter(models.Users.id == user_id).first()
- if not check_user:
- raise Exception('用户不存在')
- realtion = db.query(models.ProjectUserRelation)\
- .filter(models.ProjectUserRelation.project_id == project_id)\
- .filter(models.ProjectUserRelation.user_id == user_id).first()
- if realtion:
- raise Exception('用户已加入项目,不可二次加入')
- exist_users.append(user_id)
- for user_id in exist_users:
- db_item = models.ProjectUserRelation(**{
- 'user_id': user_id,
- 'project_id': project_id,
- 'role_id': 3 if project.type == 0 else 5
- })
- db.add(db_item)
- db.commit()
- db.refresh(db_item)
- def remove_project_users(db: Session, user_id: int, project_id: int):
- project: models.Project = db.query(models.Project).filter(models.Project.id == project_id).first()
- if not project:
- raise Exception('项目不存在')
- check_user = db.query(models.Users).filter(models.Users.id == user_id).first()
- if not check_user:
- raise Exception('用户不存在')
- realtion: models.ProjectUserRelation = db.query(models.ProjectUserRelation)\
- .filter(models.ProjectUserRelation.project_id == project_id)\
- .filter(models.ProjectUserRelation.user_id == user_id).first()
- if not realtion:
- raise Exception('用户未加入此项目,不可移除')
- if realtion.user_id == g.user_id:
- raise Exception('用户不能移除自己')
- if realtion.role_id in [2,4]:
- raise Exception('该用户为项目管理员,不可直接移除')
- db.query(models.ProjectUserRelation)\
- .filter(models.ProjectUserRelation.project_id == project_id)\
- .filter(models.ProjectUserRelation.user_id == user_id).delete()
- db.commit()
- db.flush()
- def set_user_to_admin(db: Session,user_id: int,project_id: int):
- project: models.Project = db.query(models.Project).filter(models.Project.id == project_id).first()
- if not project:
- raise Exception('项目不存在')
- check_user = db.query(models.Users).filter(models.Users.id == user_id).first()
- if not check_user:
- raise Exception('用户不存在')
- realtion: models.ProjectUserRelation = db.query(models.ProjectUserRelation)\
- .filter(models.ProjectUserRelation.project_id == project_id)\
- .filter(models.ProjectUserRelation.user_id == user_id).first()
- if not realtion:
- raise Exception('用户未加入项目,不可设为管理员')
- admin_role_id = 2 if project.type == 0 else 4
- admin_realtion: models.ProjectUserRelation = db.query(models.ProjectUserRelation)\
- .filter(models.ProjectUserRelation.project_id == project_id)\
- .filter(models.ProjectUserRelation.role_id == admin_role_id).first()
- if admin_realtion:
- raise Exception('项目已存在管理员,请先移除原先管理员在设置新的管理员')
- realtion.role_id = admin_role_id
- db.commit()
- db.flush()
- db.refresh(realtion)
- return realtion
- def remove_user_to_admin(db: Session,user_id: int,project_id: int):
- project: models.Project = db.query(models.Project).filter(models.Project.id == project_id).first()
- if not project:
- raise Exception('项目不存在')
- check_user = db.query(models.Users).filter(models.Users.id == user_id).first()
- if not check_user:
- raise Exception('用户不存在')
- realtion: models.ProjectUserRelation = db.query(models.ProjectUserRelation)\
- .filter(models.ProjectUserRelation.project_id == project_id)\
- .filter(models.ProjectUserRelation.user_id == user_id).first()
- if not realtion:
- raise Exception('用户未加入项目')
- admin_role_id = 3 if project.type == 0 else 5
- realtion.role_id = admin_role_id
- db.commit()
- db.flush()
- db.refresh(realtion)
- return realtion
|