123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- import time
- import uuid
- from typing import List
- from app import models, schemas
- from sqlalchemy.orm import Session
- from itsdangerous import BadSignature, SignatureExpired
- from itsdangerous import JSONWebSignatureSerializer as jws
- from constants.constants import SECRET_KEY
- from configs.globals import g
# Generate a new project_token.
def generate_item_token(project: models.Project):
    """Build a signed token string identifying ``project``.

    The payload holds the project's ``id`` (under ``project_id``) and its
    ``type``, and is signed with ``SECRET_KEY``.

    Args:
        project: Project whose ``id`` and ``type`` are embedded in the token.

    Returns:
        The serializer output decoded from UTF-8 into a ``str``.
    """
    signer = jws(SECRET_KEY)
    payload = {'project_id': project.id, 'type': project.type}
    raw_token = signer.dumps(payload)
    return raw_token.decode('utf-8')
# Token verification helper.
def verify_item_token(db: Session, token: str):
    """Validate a project token and load the project it refers to.

    Args:
        db: Database session used to look up the project.
        token: Token string of the kind produced by ``generate_item_token``.

    Returns:
        The ``models.Project`` whose id is stored in the token, or ``None``
        when no such project row exists.

    Raises:
        Exception: If the signature is invalid or expired, or if the decoded
            payload does not carry a ``project_id``.
    """
    serializer = jws(SECRET_KEY)
    try:
        data = serializer.loads(token)
    except (SignatureExpired, BadSignature) as err:
        # Both failures are reported with the same message; chain the
        # original error so the real reason stays visible in tracebacks.
        raise Exception("item_token验证失败") from err

    # A correctly signed payload of a different shape must fail validation
    # as well, rather than surfacing as a KeyError/TypeError on the lookup.
    if not isinstance(data, dict) or 'project_id' not in data:
        raise Exception("item_token验证失败")

    project: models.Project = (
        db.query(models.Project)
        .filter(models.Project.id == data['project_id'])
        .first()
    )
    return project
def create_project(db: Session, item: schemas.ProjectCreate):
    """Create a project and attach the current user to it with role 1.

    The project row, its token and the project-user relation are written in
    a single transaction, so a failure part-way through does not leave a
    project without a token or without an owner.

    Args:
        db: Database session.
        item: Creation payload; only ``item.name`` is read.

    Returns:
        ``to_dict()`` of the newly created project, taken after the row has
        been committed and refreshed.

    Raises:
        Exception: If a project with the same name already exists. Any error
            raised while writing is re-raised after the session is rolled
            back.
    """
    duplicate = (
        db.query(models.Project)
        .filter(models.Project.name == item.name)
        .first()
    )
    if duplicate:
        raise Exception('项目名重复')

    try:
        # Create the project.
        db_item = models.Project(
            name=item.name,
            code=uuid.uuid1().hex,  # 32 hex chars, i.e. the UUID without dashes
            type=1,
            create_time=int(time.time()),
        )
        db.add(db_item)
        # Flush instead of committing: the token embeds the project id, which
        # is only needed inside the still-open transaction.
        # NOTE(review): assumes the primary key is assigned on INSERT.
        db.flush()
        db_item.project_token = generate_item_token(db_item)

        # Create the relation between the super administrator and the
        # project: the user id comes from the request globals, role_id is 1.
        relation = models.ProjectUserRelation(
            user_id=g.user_id,
            project_id=db_item.id,
            role_id=1,
        )
        db.add(relation)
        db.commit()
    except Exception:
        # Leave the session clean so a later commit cannot persist a
        # half-built project.
        db.rollback()
        raise

    db.refresh(db_item)
    return db_item.to_dict()
def update_project(db: Session, item: schemas.ProjectUpdate):
    """Rename the project identified by ``g.project_id``.

    Args:
        db: Database session.
        item: Update payload; only ``item.name`` is read.

    Returns:
        The updated ``models.Project`` instance, refreshed after the commit.

    Raises:
        Exception: If no project with id ``g.project_id`` exists.
    """
    current_project_query = db.query(models.Project).filter(
        models.Project.id == g.project_id
    )
    target: models.Project = current_project_query.first()
    if not target:
        raise Exception('项目不存在')

    target.name = item.name
    db.commit()
    # Kept from the original call sequence: flush, then reload the row.
    db.flush()
    db.refresh(target)
    return target
def get_project_info(db: Session, id: int):
    """Fetch a single project by primary key.

    Args:
        db: Database session.
        id: Id of the project to load.

    Returns:
        The matching ``models.Project``.

    Raises:
        Exception: If no project with that id exists.
    """
    lookup = db.query(models.Project).filter(models.Project.id == id)
    found: models.Project = lookup.first()
    if found:
        return found
    raise Exception('该项目不存在')
def switch_project_by_user(db: Session, project_id: int, user_id: int):
    """Switch a user to a project and return the project with role details.

    A fresh project token is generated, stored on the project and committed.
    All lookups are validated before anything is modified.

    Args:
        db: Database session.
        project_id: Id of the project to switch to.
        user_id: Id of the user performing the switch.

    Returns:
        ``project.to_dict()`` extended with ``role_id``, ``role_name`` and
        ``role_code`` of the user's role in that project.

    Raises:
        Exception: If the project does not exist, the user has no relation
            to the project, or the relation points at a missing role.
    """
    project: models.Project = (
        db.query(models.Project)
        .filter(models.Project.id == project_id)
        .first()
    )
    if not project:
        raise Exception('项目不存在')

    relation: models.ProjectUserRelation = (
        db.query(models.ProjectUserRelation)
        .filter(models.ProjectUserRelation.project_id == project_id)
        .filter(models.ProjectUserRelation.user_id == user_id)
        .first()
    )
    if not relation:
        # "User does not belong to this project" - previously this case
        # crashed with AttributeError on ``relation.role_id``.
        raise Exception('用户不属于该项目')

    role: models.Roles = (
        db.query(models.Roles)
        .filter(models.Roles.id == relation.role_id)
        .first()
    )
    if not role:
        # "Role does not exist" - the relation references a role row that is
        # no longer there.
        raise Exception('角色不存在')

    project.project_token = generate_item_token(project)
    db.commit()
    db.refresh(project)

    item = project.to_dict()
    item.update({
        'role_id': role.id,
        'role_name': role.name,
        'role_code': role.code,
    })
    return item
def get_project_list(db: Session, user_id: int):
    """List the projects a user belongs to, each annotated with the user's role.

    Args:
        db: Database session.
        user_id: Id of the user whose projects are listed.

    Returns:
        A list of dicts, one per project: ``project.to_dict()`` extended with
        ``role_id``, ``role_name`` and ``role_code``. Empty when the user has
        no project relations.

    Raises:
        KeyError: If a relation references a role id that has no row in the
            roles table.
    """
    relations = get_relations_by_user(db, user_id)
    if not relations:
        # No memberships: skip the roles query and the empty IN () query.
        return []

    id_to_relation = {rel.project_id: rel for rel in relations}
    roles: List[models.Roles] = db.query(models.Roles).all()
    id_to_role = {role.id: role for role in roles}

    projects: List[models.Project] = (
        db.query(models.Project)
        .filter(models.Project.id.in_(list(id_to_relation)))
        .all()
    )

    res = []
    for project in projects:
        item = project.to_dict()
        role = id_to_role[id_to_relation[project.id].role_id]
        item.update({
            'role_id': role.id,
            'role_name': role.name,
            'role_code': role.code,
        })
        res.append(item)
    return res
def get_share_projects(db: Session):
    """Return every project whose ``type`` equals 1.

    Args:
        db: Database session.

    Returns:
        A list of ``models.Project`` rows.
    """
    base_query = db.query(models.Project)
    shared = base_query.filter(models.Project.type == 1)
    return shared.all()
def get_relations_by_user(db: Session, user_id: int):
    """Return all project-user relation rows for one user.

    Args:
        db: Database session.
        user_id: Id of the user to filter by.

    Returns:
        A list of ``models.ProjectUserRelation`` rows.
    """
    base_query = db.query(models.ProjectUserRelation)
    for_user = base_query.filter(models.ProjectUserRelation.user_id == user_id)
    return for_user.all()
def get_relations_by_project(db: Session, project_id: int):
    """Return all project-user relation rows for one project.

    Args:
        db: Database session.
        project_id: Id of the project to filter by.

    Returns:
        A list of ``models.ProjectUserRelation`` rows.
    """
    base_query = db.query(models.ProjectUserRelation)
    for_project = base_query.filter(
        models.ProjectUserRelation.project_id == project_id
    )
    return for_project.all()
|