|
@@ -0,0 +1,175 @@
|
|
|
|
+import time
|
|
|
|
+import uuid
|
|
|
|
+from typing import List
|
|
|
|
+from app import models, schemas
|
|
|
|
+from sqlalchemy.orm import Session
|
|
|
|
+from werkzeug.security import check_password_hash, generate_password_hash
|
|
|
|
+from itsdangerous import BadSignature, SignatureExpired
|
|
|
|
+from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
|
|
|
|
+from constants.constants import SECRET_KEY
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+def verify_password(password_hash, password):
|
|
|
|
+ return check_password_hash(password_hash, password)
|
|
|
|
+
|
|
|
|
+# 生成新的user_token
|
|
|
|
+def generate_user_token(user: models.Users, expiration=3600 * 5):
|
|
|
|
+ s = Serializer(SECRET_KEY, expires_in=expiration)
|
|
|
|
+ return s.dumps({'user_id': user.id,'password':user.password}).decode('utf-8')
|
|
|
|
+
|
|
|
|
+# 验证Token方法
|
|
|
|
+def verify_user_token(db: Session, token: str):
|
|
|
|
+ s = Serializer(SECRET_KEY)
|
|
|
|
+ try:
|
|
|
|
+ data = s.loads(token)
|
|
|
|
+ except SignatureExpired:
|
|
|
|
+ raise Exception("user_token验证失败")
|
|
|
|
+ except BadSignature:
|
|
|
|
+ raise Exception("user_token验证失败")
|
|
|
|
+ user: models.Users = db.query(models.Users).filter(models.Users.id == data['user_id']).first()
|
|
|
|
+ if not user:
|
|
|
|
+ raise Exception("不存在此用户")
|
|
|
|
+ if not user.password == data['password']:
|
|
|
|
+ raise Exception("密码已被修改,请重新登陆")
|
|
|
|
+ return user
|
|
|
|
+
|
|
|
|
+def create_users(db: Session, item: schemas.UsersCreate):
|
|
|
|
+ check_1 = db.query(models.Users).filter(models.Users.name == item.name).first()
|
|
|
|
+ if check_1:
|
|
|
|
+ raise Exception('该用户已存在')
|
|
|
|
+ check_2 = db.query(models.Users).filter(models.Users.username == item.username).first()
|
|
|
|
+ if check_2:
|
|
|
|
+ raise Exception('该账号已存在')
|
|
|
|
+
|
|
|
|
+ user_code = str(uuid.uuid1()).replace('-','')
|
|
|
|
+ db_item = models.Users(**{
|
|
|
|
+ 'name': item.name,
|
|
|
|
+ 'code': user_code,
|
|
|
|
+ 'username': item.username,
|
|
|
|
+ 'password': generate_password_hash(bytes(item.password.encode('utf-8'))),
|
|
|
|
+ 'create_time': int(time.time())
|
|
|
|
+ })
|
|
|
|
+ db.add(db_item)
|
|
|
|
+ db.commit()
|
|
|
|
+ db.refresh(db_item)
|
|
|
|
+ return db_item
|
|
|
|
+
|
|
|
|
+def login(db, item: schemas.LoginBase):
|
|
|
|
+ user: models.Users = db.query(models.Users).filter(models.Users.username == item.username).first()
|
|
|
|
+ if not user:
|
|
|
|
+ raise Exception('不存在此账号')
|
|
|
|
+ if not verify_password(user.password,item.password):
|
|
|
|
+ raise Exception('密码错误')
|
|
|
|
+ auth_token = generate_user_token(user)
|
|
|
|
+ res = user.to_dict()
|
|
|
|
+ res.update({'auth_token':auth_token})
|
|
|
|
+ return res
|
|
|
|
+
|
|
|
|
+def get_users_by_project(db: Session, project_id: int):
|
|
|
|
+ relation: List[models.ProjectUserRelation] = db.query(models.ProjectUserRelation)\
|
|
|
|
+ .filter(models.ProjectUserRelation.project_id == project_id).all()
|
|
|
|
+ user_role = { r.user_id:r.role_id for r in relation}
|
|
|
|
+ user_ids = list(user_role.keys())
|
|
|
|
+ roles: List[models.Roles] = db.query(models.Roles).all()
|
|
|
|
+ id_role = { r.id:r for r in roles}
|
|
|
|
+ users: List[models.Users] = db.query(models.Users)\
|
|
|
|
+ .filter(models.Users.id.in_(user_ids)).all()
|
|
|
|
+ res = []
|
|
|
|
+ for user in users:
|
|
|
|
+ if user.id == 1 and project_id != 1: continue
|
|
|
|
+ item = user.to_dict()
|
|
|
|
+ item.update({'role_name':id_role[user_role[user.id]].name})
|
|
|
|
+ res.append(item)
|
|
|
|
+ return res
|
|
|
|
+
|
|
|
|
+def retrieve_users_by_project(db: Session, project_id: int):
|
|
|
|
+ relation: List[models.ProjectUserRelation] = db.query(models.ProjectUserRelation)\
|
|
|
|
+ .filter(models.ProjectUserRelation.project_id == project_id).all()
|
|
|
|
+ now_user_ids = [ r.user_id for r in relation ]
|
|
|
|
+ users: List[models.Users] = db.query(models.Users)\
|
|
|
|
+ .filter(models.Users.id.notin_(now_user_ids)).all()
|
|
|
|
+ return users
|
|
|
|
+
|
|
|
|
+def add_users_to_project(db: Session, user_ids: List[int], project_id: int):
|
|
|
|
+ project: models.Project = db.query(models.Project).filter(models.Project.id == project_id).first()
|
|
|
|
+ if not project:
|
|
|
|
+ raise Exception('项目不存在')
|
|
|
|
+ exist_users = []
|
|
|
|
+ for user_id in user_ids:
|
|
|
|
+ check_user = db.query(models.Users).filter(models.Users.id == user_id).first()
|
|
|
|
+ if not check_user:
|
|
|
|
+ raise Exception('用户不存在')
|
|
|
|
+ realtion = db.query(models.ProjectUserRelation)\
|
|
|
|
+ .filter(models.ProjectUserRelation.project_id == project_id)\
|
|
|
|
+ .filter(models.ProjectUserRelation.user_id == user_id).first()
|
|
|
|
+ if realtion:
|
|
|
|
+ raise Exception('用户已加入项目,不可二次加入')
|
|
|
|
+ exist_users.append(user_id)
|
|
|
|
+ for user_id in exist_users:
|
|
|
|
+ db_item = models.ProjectUserRelation(**{
|
|
|
|
+ 'user_id': user_id,
|
|
|
|
+ 'project_id': project_id,
|
|
|
|
+ 'role_id': 3 if project.type == 0 else 5
|
|
|
|
+ })
|
|
|
|
+ db.add(db_item)
|
|
|
|
+ db.commit()
|
|
|
|
+ db.refresh(db_item)
|
|
|
|
+
|
|
|
|
+def remove_project_users(db: Session, user_id: int, project_id: int):
|
|
|
|
+ project: models.Project = db.query(models.Project).filter(models.Project.id == project_id).first()
|
|
|
|
+ if not project:
|
|
|
|
+ raise Exception('项目不存在')
|
|
|
|
+ check_user = db.query(models.Users).filter(models.Users.id == user_id).first()
|
|
|
|
+ if not check_user:
|
|
|
|
+ raise Exception('用户不存在')
|
|
|
|
+ realtion = db.query(models.ProjectUserRelation)\
|
|
|
|
+ .filter(models.ProjectUserRelation.project_id == project_id)\
|
|
|
|
+ .filter(models.ProjectUserRelation.user_id == user_id).first()
|
|
|
|
+ if not realtion:
|
|
|
|
+ raise Exception('用户未加入此项目,不可移除')
|
|
|
|
+ db.query(models.ProjectUserRelation)\
|
|
|
|
+ .filter(models.ProjectUserRelation.project_id == project_id)\
|
|
|
|
+ .filter(models.ProjectUserRelation.user_id == user_id).delete()
|
|
|
|
+
|
|
|
|
+def set_user_to_admin(db: Session,user_id: int,project_id: int):
|
|
|
|
+ project: models.Project = db.query(models.Project).filter(models.Project.id == project_id).first()
|
|
|
|
+ if not project:
|
|
|
|
+ raise Exception('项目不存在')
|
|
|
|
+ check_user = db.query(models.Users).filter(models.Users.id == user_id).first()
|
|
|
|
+ if not check_user:
|
|
|
|
+ raise Exception('用户不存在')
|
|
|
|
+ realtion: models.ProjectUserRelation = db.query(models.ProjectUserRelation)\
|
|
|
|
+ .filter(models.ProjectUserRelation.project_id == project_id)\
|
|
|
|
+ .filter(models.ProjectUserRelation.user_id == user_id).first()
|
|
|
|
+ if not realtion:
|
|
|
|
+ raise Exception('用户未加入项目,不可设为管理员')
|
|
|
|
+ admin_role_id = 2 if project.type == 0 else 4
|
|
|
|
+ admin_realtion: models.ProjectUserRelation = db.query(models.ProjectUserRelation)\
|
|
|
|
+ .filter(models.ProjectUserRelation.project_id == project_id)\
|
|
|
|
+ .filter(models.ProjectUserRelation.role_id == admin_role_id).first()
|
|
|
|
+ if admin_realtion:
|
|
|
|
+ raise Exception('项目已存在管理员,请先移除原先管理员在设置新的管理员')
|
|
|
|
+ realtion.role_id = admin_role_id
|
|
|
|
+ db.commit()
|
|
|
|
+ db.flush()
|
|
|
|
+ db.refresh(realtion)
|
|
|
|
+ return realtion
|
|
|
|
+
|
|
|
|
+def remove_user_to_admin(db: Session,user_id: int,project_id: int):
|
|
|
|
+ project: models.Project = db.query(models.Project).filter(models.Project.id == project_id).first()
|
|
|
|
+ if not project:
|
|
|
|
+ raise Exception('项目不存在')
|
|
|
|
+ check_user = db.query(models.Users).filter(models.Users.id == user_id).first()
|
|
|
|
+ if not check_user:
|
|
|
|
+ raise Exception('用户不存在')
|
|
|
|
+ realtion: models.ProjectUserRelation = db.query(models.ProjectUserRelation)\
|
|
|
|
+ .filter(models.ProjectUserRelation.project_id == project_id)\
|
|
|
|
+ .filter(models.ProjectUserRelation.user_id == user_id).first()
|
|
|
|
+ if not realtion:
|
|
|
|
+ raise Exception('用户未加入项目')
|
|
|
|
+ admin_role_id = 3 if project.type == 0 else 5
|
|
|
|
+ realtion.role_id = admin_role_id
|
|
|
|
+ db.commit()
|
|
|
|
+ db.flush()
|
|
|
|
+ db.refresh(realtion)
|
|
|
|
+ return realtion
|