api-admim/app/rbac/classes_customizadas.py

104 lines
3.8 KiB
Python

# import jwt
# from fastapi_users.jwt import decode_jwt, generate_jwt, SecretType
# from fastapi_users.manager import BaseUserManager
# from fastapi_users.authentication.strategy import JWTStrategy
# from fastapi_users import models, exceptions
# from typing import Optional, List
#
#
# class CustomJWTStrategy(JWTStrategy[models.UP, models.ID]):
# def __init__(
# self,
# secret: SecretType,
# lifetime_seconds: Optional[int],
# token_audience: List[str] = ["fastapi-users:auth"],
# algorithm: str = "HS256",
# public_key: Optional[SecretType] = None
# ):
# super().__init__(secret, lifetime_seconds, token_audience, algorithm, public_key)
#
# async def write_token(self, user: models.UP) -> str:
# todas_as_permissoes = set()
# if hasattr(user, 'papeis'):
# for papel in user.papeis:
# if hasattr(papel, 'permissoes'):
# todas_as_permissoes.update(permissao.id for permissao in papel.permissoes)
#
# data = {
# "sub": str(user.id),
# "permissions": list(todas_as_permissoes),
# "aud": self.token_audience,
# }
# return generate_jwt(data, self.encode_key, self.lifetime_seconds, algorithm=self.algorithm)
#
# async def read_token(
# self, token: Optional[str], user_manager: BaseUserManager[models.UP, models.ID]
# ) -> Optional[models.UP]:
# if token is None:
# return None
#
# try:
# data = decode_jwt(
# token, self.decode_key, self.token_audience, algorithms=[self.algorithm]
# )
# user_id = data.get("sub")
# if user_id is None:
# return None
#
# permissions = data.get("permissions", [])
#
# except jwt.PyJWTError:
# return None
#
# try:
# parsed_id = user_manager.parse_id(user_id)
# user = await user_manager.get(parsed_id)
# if user:
# user.permissions = permissions
# return user
# except (exceptions.UserNotExists, exceptions.InvalidID):
# return None
import jwt
from fastapi_users.authentication import JWTStrategy
from fastapi_users.jwt import generate_jwt, decode_jwt
from fastapi_users import exceptions
from app.config import SECRET
class CustomJWTStrategy(JWTStrategy):
async def write_token(self, user) -> str:
# Coletar todas as permissões do usuário a partir de seus papéis
todas_as_permissoes = set() # Usamos um set para evitar duplicatas
if hasattr(user, 'papeis'):
for papel in user.papeis:
if hasattr(papel, 'permissoes'):
for permissao in papel.permissoes:
todas_as_permissoes.add(permissao.id) # Usar o ID da permissão
# Transformar o set em uma lista para o payload do token
lista_de_permissoes = list(todas_as_permissoes)
print("user id")
print(user.id)
# Aqui, adicionamos as claims personalizadas ao payload
data = {
"sub": str(user.id),
"permissions": lista_de_permissoes, # Acessa diretamente a lista de IDs de permissões coletadas
"aud": self.token_audience, # Audiência, conforme o padrão
}
token = generate_jwt(
data, self.encode_key, self.lifetime_seconds, algorithm=self.algorithm
)
return token
async def read_token(self, token: str, user_manager):
# Decodifica o token JWT usando a função padrão decode_jwt do fastapi_users
try:
payload = decode_jwt(token, SECRET, audience=self.token_audience)
return payload
except Exception as e:
raise ValueError(f"Token inválido: {str(e)}")