project.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. import time
  2. import uuid
  3. from typing import List
  4. from app import models, schemas
  5. from sqlalchemy.orm import Session
  6. from itsdangerous import BadSignature, SignatureExpired
  7. from itsdangerous import JSONWebSignatureSerializer as jws
  8. from constants.constants import SECRET_KEY
  9. from configs.globals import g
  # Generate a fresh project_token.
  def generate_item_token(project: models.Project):
      """Return a signed token string carrying the project's id and type.

      The payload ``{'project_id': ..., 'type': ...}`` is signed with a
      ``JSONWebSignatureSerializer`` keyed by ``SECRET_KEY``; the serializer
      output is decoded as UTF-8 before being returned.
      """
      serializer = jws(SECRET_KEY)
      payload = {'project_id': project.id, 'type': project.type}
      signed = serializer.dumps(payload)
      return signed.decode('utf-8')
  # Verify a project token.
  def verify_item_token(db: Session, token: str):
      """Validate a signed project token and load the project it refers to.

      Args:
          db: SQLAlchemy session used for the project lookup.
          token: Token string to verify against ``SECRET_KEY``.

      Returns:
          The ``models.Project`` whose id is stored in the token payload, or
          ``None`` when no row with that id exists.

      Raises:
          Exception: If the signature check fails or the decoded payload does
              not contain a ``'project_id'`` entry.
      """
      serializer = jws(SECRET_KEY)
      try:
          data = serializer.loads(token)
      except (SignatureExpired, BadSignature) as err:
          # Both failure modes are reported to callers with the same message.
          raise Exception("item_token验证失败") from err

      # A payload can carry a valid signature and still not be a project
      # token; report that as a verification failure rather than leaking a
      # bare KeyError/TypeError.
      try:
          project_id = data['project_id']
      except (KeyError, TypeError) as err:
          raise Exception("item_token验证失败") from err

      return (
          db.query(models.Project)
          .filter(models.Project.id == project_id)
          .first()
      )
  def create_project(db: Session, item: schemas.ProjectCreate):
      """Create a project and bind the current user to it with role id 1.

      The project row, its ``project_token`` and the project/user relation
      are written in a single transaction, so a failure part-way through
      cannot leave a project without a token or without its owner relation.

      Args:
          db: SQLAlchemy session.
          item: Creation payload; only ``item.name`` is read.

      Returns:
          ``to_dict()`` of the newly created project, taken after the commit.

      Raises:
          Exception: If a project with the same name already exists. Any
              error raised while writing is re-raised after a rollback.
      """
      duplicate = (
          db.query(models.Project)
          .filter(models.Project.name == item.name)
          .first()
      )
      if duplicate:
          raise Exception('项目名重复')

      db_item = models.Project(
          name=item.name,
          # Same value as str(uuid.uuid1()).replace('-', '').
          code=uuid.uuid1().hex,
          type=1,
          user_id=g.user_id,
          create_time=int(time.time()),
      )
      try:
          db.add(db_item)
          # Flush (without committing) so the database assigns db_item.id,
          # which both the token payload and the relation row need.
          db.flush()
          db_item.project_token = generate_item_token(db_item)

          # Relation between the creating user (super administrator role)
          # and the new project.
          relation = models.ProjectUserRelation(
              user_id=g.user_id,
              project_id=db_item.id,
              role_id=1,
          )
          db.add(relation)
          db.commit()
      except Exception:
          db.rollback()
          raise

      # Reload the committed state before serializing it.
      db.refresh(db_item)
      return db_item.to_dict()
  def update_project(db: Session, item: schemas.ProjectUpdate):
      """Rename the project whose id is ``g.project_id``.

      Looks the project up, assigns ``item.name`` to its ``name`` attribute,
      commits, and returns the refreshed model instance.

      Raises:
          Exception: If no project with id ``g.project_id`` exists.
      """
      lookup = db.query(models.Project).filter(
          models.Project.id == g.project_id
      )
      target: models.Project = lookup.first()
      if not target:
          raise Exception('项目不存在')

      target.name = item.name
      db.commit()
      db.flush()
      db.refresh(target)
      return target
  def get_project_info(db: Session, id: int):
      """Fetch a single project by primary key.

      Returns:
          The matching ``models.Project`` instance.

      Raises:
          Exception: If no project has the given id.
      """
      by_id = db.query(models.Project).filter(models.Project.id == id)
      found: models.Project = by_id.first()
      if found:
          return found
      raise Exception('该项目不存在')
  def switch_project_by_user(db: Session, project_id: int, user_id: int):
      """Switch a user to a project and issue a fresh project token.

      Verifies that the project exists, that the user has a relation row for
      it and that the relation's role exists. It then regenerates and stores
      the project's ``project_token``.

      Args:
          db: SQLAlchemy session.
          project_id: Id of the project to switch to.
          user_id: Id of the user performing the switch.

      Returns:
          ``project.to_dict()`` extended with ``role_id``, ``role_name`` and
          ``role_code`` of the user's role in that project.

      Raises:
          Exception: If the project does not exist, the user has no relation
              to the project, or the relation's role cannot be found.
      """
      project: models.Project = (
          db.query(models.Project)
          .filter(models.Project.id == project_id)
          .first()
      )
      if not project:
          raise Exception('项目不存在')

      relation: models.ProjectUserRelation = (
          db.query(models.ProjectUserRelation)
          .filter(models.ProjectUserRelation.project_id == project_id)
          .filter(models.ProjectUserRelation.user_id == user_id)
          .first()
      )
      # Without this check a non-member would crash on relation.role_id with
      # an AttributeError instead of getting a meaningful error.
      if not relation:
          raise Exception('用户不在该项目中')

      role: models.Roles = (
          db.query(models.Roles)
          .filter(models.Roles.id == relation.role_id)
          .first()
      )
      if not role:
          raise Exception('角色不存在')

      # All checks passed: only now touch the stored token.
      project.project_token = generate_item_token(project)
      db.commit()
      db.refresh(project)

      item = project.to_dict()
      item.update({
          'role_id': role.id,
          'role_name': role.name,
          'role_code': role.code,
      })
      return item
  def get_project_list(db: Session, user_id: int):
      """List the projects a user is related to, each with its role.

      Args:
          db: SQLAlchemy session.
          user_id: Id of the user whose projects are listed.

      Returns:
          A list of dicts, one per project: ``project.to_dict()`` extended
          with ``role_id``, ``role_name`` and ``role_code`` taken from the
          user's relation to that project. Empty when the user has no
          relations.

      Raises:
          KeyError: If a relation references a role id that is not present
              in the roles table.
      """
      relations = get_relations_by_user(db, user_id)
      if not relations:
          # Nothing to look up; skip the roles query and an empty IN () query.
          return []

      id_to_relation = {rel.project_id: rel for rel in relations}
      roles: List[models.Roles] = db.query(models.Roles).all()
      id_to_role = {role.id: role for role in roles}

      projects: List[models.Project] = (
          db.query(models.Project)
          # Pass a concrete list rather than a dict view to in_().
          .filter(models.Project.id.in_(list(id_to_relation)))
          .all()
      )

      res = []
      for project in projects:
          item = project.to_dict()
          role = id_to_role[id_to_relation[project.id].role_id]
          item.update({
              'role_id': role.id,
              'role_name': role.name,
              'role_code': role.code,
          })
          res.append(item)
      return res
  def get_share_projects(db: Session):
      """Return every project whose ``type`` column equals 1.

      NOTE(review): type 1 presumably marks a shared project, judging by the
      function name -- confirm against the model definition.
      """
      type_one_query = db.query(models.Project).filter(models.Project.type == 1)
      matches: List[models.Project] = type_one_query.all()
      return matches
  def get_relations_by_user(db: Session, user_id: int):
      """Return all project/user relation rows belonging to ``user_id``."""
      relation_model = models.ProjectUserRelation
      user_rows: List[models.ProjectUserRelation] = (
          db.query(relation_model)
          .filter(relation_model.user_id == user_id)
          .all()
      )
      return user_rows
  def get_relations_by_project(db: Session, project_id: int):
      """Return all project/user relation rows belonging to ``project_id``."""
      relation_model = models.ProjectUserRelation
      project_rows: List[models.ProjectUserRelation] = (
          db.query(relation_model)
          .filter(relation_model.project_id == project_id)
          .all()
      )
      return project_rows