import time
import uuid
from typing import List, Optional

from itsdangerous import BadSignature, SignatureExpired
from itsdangerous import JSONWebSignatureSerializer as jws
from sqlalchemy.orm import Session

from app import models, schemas
from constants.constants import SECRET_KEY
from configs.globals import g

# NOTE(review): JSONWebSignatureSerializer was deprecated in itsdangerous 2.0
# and removed in 2.1, so this module presumably relies on a pinned older
# release -- confirm before upgrading the dependency.


def generate_item_token(project: models.Project) -> str:
    """Generate a new project token.

    The token is a signature-protected payload (signed with ``SECRET_KEY``)
    holding the project's ``id`` and ``type``.

    Args:
        project: Project whose ``id`` and ``type`` are embedded in the token.

    Returns:
        The serialized token decoded to a UTF-8 string.
    """
    serializer = jws(SECRET_KEY)
    payload = {'project_id': project.id, 'type': project.type}
    return serializer.dumps(payload).decode('utf-8')


def verify_item_token(db: Session, token: str) -> Optional[models.Project]:
    """Verify a project token and load the project it refers to.

    Args:
        db: Active database session.
        token: Token string as produced by ``generate_item_token``.

    Returns:
        The project whose id is stored in the token, or ``None`` when no
        project with that id exists.

    Raises:
        Exception: If the signature check fails or the payload carries no
            ``project_id`` entry.
    """
    serializer = jws(SECRET_KEY)
    try:
        data = serializer.loads(token)
    except (SignatureExpired, BadSignature) as err:
        raise Exception("item_token验证失败") from err

    # A correctly signed payload of an unexpected shape is still an invalid
    # token; report it the same way instead of leaking KeyError/TypeError.
    try:
        project_id = data['project_id']
    except (KeyError, TypeError) as err:
        raise Exception("item_token验证失败") from err

    return db.query(models.Project)\
        .filter(models.Project.id == project_id).first()


def create_project(db: Session, item: schemas.ProjectCreate):
    """Create a project and link the current user (``g.user_id``) to it.

    The project row, its token and the project/user relation are written in
    one transaction, so a failure at any step leaves nothing behind.

    Args:
        db: Active database session.
        item: Creation payload; only ``item.name`` is read.

    Returns:
        ``to_dict()`` of the newly created project.

    Raises:
        Exception: If a project with the same name already exists.
    """
    duplicate = db.query(models.Project)\
        .filter(models.Project.name == item.name).first()
    if duplicate:
        raise Exception('项目名重复')

    try:
        # Create the project.
        db_item = models.Project(
            name=item.name,
            code=uuid.uuid1().hex,
            type=1,
            create_time=int(time.time()),
        )
        db.add(db_item)
        # Flush (not commit) so the primary key is assigned; the token
        # payload and the relation row both need the project id.
        db.flush()
        db_item.project_token = generate_item_token(db_item)

        # Create the relation between the super administrator and the project.
        relation = models.ProjectUserRelation(
            user_id=g.user_id,
            project_id=db_item.id,
            role_id=1,
        )
        db.add(relation)
        db.commit()
    except Exception:
        # Leave the session usable and make sure no half-created project
        # (one without an owner relation) is persisted.
        db.rollback()
        raise

    db.refresh(db_item)
    return db_item.to_dict()


def update_project(db: Session, item: schemas.ProjectUpdate):
    """Rename the current project (the one identified by ``g.project_id``).

    Args:
        db: Active database session.
        item: Update payload; only ``item.name`` is applied.

    Returns:
        The refreshed project instance.

    Raises:
        Exception: If the current project does not exist.
    """
    db_item: models.Project = db.query(models.Project)\
        .filter(models.Project.id == g.project_id).first()
    if not db_item:
        raise Exception('项目不存在')

    db_item.name = item.name
    db.commit()
    db.refresh(db_item)
    return db_item


def get_project_info(db: Session, id: int):
    """Fetch a single project by primary key.

    Args:
        db: Active database session.
        id: Project id. (The name shadows the builtin but is kept so that
            keyword callers continue to work.)

    Returns:
        The matching project.

    Raises:
        Exception: If no project has this id.
    """
    project: models.Project = db.query(models.Project)\
        .filter(models.Project.id == id).first()
    if not project:
        raise Exception('该项目不存在')
    return project


def switch_project_by_user(db: Session, project_id: int, user_id: int):
    """Switch a user to a project and return the project with the user's role.

    A fresh project token is generated and stored on the project.

    Args:
        db: Active database session.
        project_id: Id of the project to switch to.
        user_id: Id of the user performing the switch.

    Returns:
        ``to_dict()`` of the project, extended with ``role_id``,
        ``role_name`` and ``role_code`` of the user's role in it.

    Raises:
        Exception: If the project does not exist, the user has no relation
            to the project, or the relation points at a missing role.
    """
    project: models.Project = db.query(models.Project)\
        .filter(models.Project.id == project_id).first()
    if not project:
        raise Exception('项目不存在')

    relation: models.ProjectUserRelation = db.query(models.ProjectUserRelation)\
        .filter(models.ProjectUserRelation.project_id == project_id)\
        .filter(models.ProjectUserRelation.user_id == user_id).first()
    # Validate membership before writing anything: a non-member must get a
    # clear error rather than an AttributeError on ``None``.
    if not relation:
        raise Exception('用户不属于该项目')

    role: models.Roles = db.query(models.Roles)\
        .filter(models.Roles.id == relation.role_id).first()
    if not role:
        raise Exception('角色不存在')

    project.project_token = generate_item_token(project)
    db.commit()
    db.refresh(project)

    item = project.to_dict()
    item.update({
        'role_id': role.id,
        'role_name': role.name,
        'role_code': role.code,
    })
    return item


def get_project_list(db: Session, user_id: int):
    """List every project the user is related to, with the user's role.

    Args:
        db: Active database session.
        user_id: Id of the user whose projects are listed.

    Returns:
        A list of project dicts (``to_dict()``), each extended with
        ``role_id``, ``role_name`` and ``role_code``. Empty when the user
        has no project relations.
    """
    relations = get_relations_by_user(db, user_id)
    if not relations:
        # Nothing to look up; also avoids issuing an empty ``IN ()`` query.
        return []

    id_to_relation = {rel.project_id: rel for rel in relations}
    roles: List[models.Roles] = db.query(models.Roles).all()
    id_to_role = {role.id: role for role in roles}

    # Pass a real list: a dict-keys view is not a sequence.
    projects: List[models.Project] = db.query(models.Project)\
        .filter(models.Project.id.in_(list(id_to_relation))).all()

    res = []
    for project in projects:
        role = id_to_role[id_to_relation[project.id].role_id]
        item = project.to_dict()
        item.update({
            'role_id': role.id,
            'role_name': role.name,
            'role_code': role.code,
        })
        res.append(item)
    return res


def get_share_projects(db: Session) -> List[models.Project]:
    """Return all projects whose ``type`` equals 1."""
    return db.query(models.Project)\
        .filter(models.Project.type == 1).all()


def get_relations_by_user(db: Session, user_id: int) -> List[models.ProjectUserRelation]:
    """Return all project/user relations belonging to the given user."""
    return db.query(models.ProjectUserRelation)\
        .filter(models.ProjectUserRelation.user_id == user_id).all()


def get_relations_by_project(db: Session, project_id: int) -> List[models.ProjectUserRelation]:
    """Return all project/user relations belonging to the given project."""
    return db.query(models.ProjectUserRelation)\
        .filter(models.ProjectUserRelation.project_id == project_id).all()