users.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. import time
  2. import uuid
  3. from typing import List
  4. from app import models, schemas
  5. from sqlalchemy.orm import Session
  6. from werkzeug.security import check_password_hash, generate_password_hash
  7. from itsdangerous import BadSignature, SignatureExpired
  8. from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
  9. from app.utils.utils import decode_base64
  10. from constants.constants import SECRET_KEY
  11. def verify_password(password_hash, password):
  12. return check_password_hash(password_hash, password)
  13. # 生成新的user_token
  14. def generate_user_token(user: models.Users, expiration=3600 * 5):
  15. s = Serializer(SECRET_KEY, expires_in=expiration)
  16. return s.dumps({'user_id': user.id,'password':user.password}).decode('utf-8')
  17. # 验证Token方法
  18. def verify_user_token(db: Session, token: str):
  19. s = Serializer(SECRET_KEY)
  20. try:
  21. data = s.loads(token)
  22. except SignatureExpired:
  23. raise Exception("user_token验证失败")
  24. except BadSignature:
  25. raise Exception("user_token验证失败")
  26. user: models.Users = db.query(models.Users).filter(models.Users.id == data['user_id']).first()
  27. if not user:
  28. raise Exception("不存在此用户")
  29. if not user.password == data['password']:
  30. raise Exception("密码已被修改,请重新登陆")
  31. return user
  32. def create_users(db: Session, item: schemas.UsersCreate):
  33. check_1 = db.query(models.Users).filter(models.Users.name == item.name).first()
  34. if check_1:
  35. raise Exception('该用户已存在')
  36. check_2 = db.query(models.Users).filter(models.Users.username == item.username).first()
  37. if check_2:
  38. raise Exception('该账号已存在')
  39. item.password = decode_base64(item.password)
  40. user_code = str(uuid.uuid1()).replace('-','')
  41. db_item = models.Users(**{
  42. 'name': item.name,
  43. 'code': user_code,
  44. 'username': item.username,
  45. 'password': generate_password_hash(bytes(item.password.encode('utf-8'))),
  46. 'create_time': int(time.time())
  47. })
  48. db.add(db_item)
  49. db.commit()
  50. db.refresh(db_item)
  51. return db_item
  52. def login(db: Session, item: schemas.LoginBase):
  53. user: models.Users = db.query(models.Users).filter(models.Users.username == item.username).first()
  54. if not user:
  55. raise Exception('不存在此账号')
  56. item.password = decode_base64(item.password)
  57. if not verify_password(user.password,item.password):
  58. raise Exception('密码错误')
  59. auth_token = generate_user_token(user)
  60. res = user.to_dict()
  61. res.update({'auth_token':auth_token})
  62. return res
  63. def get_users_by_project(db: Session, project_id: int):
  64. relation: List[models.ProjectUserRelation] = db.query(models.ProjectUserRelation)\
  65. .filter(models.ProjectUserRelation.project_id == project_id).all()
  66. user_role = { r.user_id:r.role_id for r in relation}
  67. user_ids = list(user_role.keys())
  68. roles: List[models.Roles] = db.query(models.Roles).all()
  69. id_role = { r.id:r for r in roles}
  70. users: List[models.Users] = db.query(models.Users)\
  71. .filter(models.Users.id.in_(user_ids)).all()
  72. res = []
  73. for user in users:
  74. if user.id == 1: continue
  75. item = user.to_dict()
  76. item.update({'role_name':id_role[user_role[user.id]].name})
  77. res.append(item)
  78. return res
  79. def retrieve_users_by_project(db: Session, project_id: int):
  80. relation: List[models.ProjectUserRelation] = db.query(models.ProjectUserRelation)\
  81. .filter(models.ProjectUserRelation.project_id == project_id).all()
  82. now_user_ids = [ r.user_id for r in relation ]
  83. users: List[models.Users] = db.query(models.Users)\
  84. .filter(models.Users.id.notin_(now_user_ids)).all()
  85. return users
  86. def add_users_to_project(db: Session, user_ids: List[int], project_id: int):
  87. project: models.Project = db.query(models.Project).filter(models.Project.id == project_id).first()
  88. if not project:
  89. raise Exception('项目不存在')
  90. exist_users = []
  91. for user_id in user_ids:
  92. check_user = db.query(models.Users).filter(models.Users.id == user_id).first()
  93. if not check_user:
  94. raise Exception('用户不存在')
  95. realtion = db.query(models.ProjectUserRelation)\
  96. .filter(models.ProjectUserRelation.project_id == project_id)\
  97. .filter(models.ProjectUserRelation.user_id == user_id).first()
  98. if realtion:
  99. raise Exception('用户已加入项目,不可二次加入')
  100. exist_users.append(user_id)
  101. for user_id in exist_users:
  102. db_item = models.ProjectUserRelation(**{
  103. 'user_id': user_id,
  104. 'project_id': project_id,
  105. 'role_id': 3 if project.type == 0 else 5
  106. })
  107. db.add(db_item)
  108. db.commit()
  109. db.refresh(db_item)
  110. def remove_project_users(db: Session, user_id: int, project_id: int):
  111. project: models.Project = db.query(models.Project).filter(models.Project.id == project_id).first()
  112. if not project:
  113. raise Exception('项目不存在')
  114. check_user = db.query(models.Users).filter(models.Users.id == user_id).first()
  115. if not check_user:
  116. raise Exception('用户不存在')
  117. realtion = db.query(models.ProjectUserRelation)\
  118. .filter(models.ProjectUserRelation.project_id == project_id)\
  119. .filter(models.ProjectUserRelation.user_id == user_id).first()
  120. if not realtion:
  121. raise Exception('用户未加入此项目,不可移除')
  122. db.query(models.ProjectUserRelation)\
  123. .filter(models.ProjectUserRelation.project_id == project_id)\
  124. .filter(models.ProjectUserRelation.user_id == user_id).delete()
  125. def set_user_to_admin(db: Session,user_id: int,project_id: int):
  126. project: models.Project = db.query(models.Project).filter(models.Project.id == project_id).first()
  127. if not project:
  128. raise Exception('项目不存在')
  129. check_user = db.query(models.Users).filter(models.Users.id == user_id).first()
  130. if not check_user:
  131. raise Exception('用户不存在')
  132. realtion: models.ProjectUserRelation = db.query(models.ProjectUserRelation)\
  133. .filter(models.ProjectUserRelation.project_id == project_id)\
  134. .filter(models.ProjectUserRelation.user_id == user_id).first()
  135. if not realtion:
  136. raise Exception('用户未加入项目,不可设为管理员')
  137. admin_role_id = 2 if project.type == 0 else 4
  138. admin_realtion: models.ProjectUserRelation = db.query(models.ProjectUserRelation)\
  139. .filter(models.ProjectUserRelation.project_id == project_id)\
  140. .filter(models.ProjectUserRelation.role_id == admin_role_id).first()
  141. if admin_realtion:
  142. raise Exception('项目已存在管理员,请先移除原先管理员在设置新的管理员')
  143. realtion.role_id = admin_role_id
  144. db.commit()
  145. db.flush()
  146. db.refresh(realtion)
  147. return realtion
  148. def remove_user_to_admin(db: Session,user_id: int,project_id: int):
  149. project: models.Project = db.query(models.Project).filter(models.Project.id == project_id).first()
  150. if not project:
  151. raise Exception('项目不存在')
  152. check_user = db.query(models.Users).filter(models.Users.id == user_id).first()
  153. if not check_user:
  154. raise Exception('用户不存在')
  155. realtion: models.ProjectUserRelation = db.query(models.ProjectUserRelation)\
  156. .filter(models.ProjectUserRelation.project_id == project_id)\
  157. .filter(models.ProjectUserRelation.user_id == user_id).first()
  158. if not realtion:
  159. raise Exception('用户未加入项目')
  160. admin_role_id = 3 if project.type == 0 else 5
  161. realtion.role_id = admin_role_id
  162. db.commit()
  163. db.flush()
  164. db.refresh(realtion)
  165. return realtion