api-admim/app/multi_tenant/onboarding/novo_inquilino_service.py

469 lines
18 KiB
Python

# from __future__ import annotations
#
# import os
# import asyncio
# import subprocess
# from dataclasses import dataclass
# from typing import Optional
#
# from app.multi_tenant.criar_tenant import tenant_create # sua função já existente
#
#
# class NovoInquilinoError(Exception):
# """Erro de alto nível no fluxo de criação de inquilino."""
#
#
# @dataclass
# class NovoInquilinoResult:
# tenant: str
# status: str
# alembic_stdout: Optional[str] = None
#
#
# async def _run_alembic_for_tenant(tenant_id: str, alembic_ini_path: Optional[str] = None) -> str:
# """
# Executa `alembic -x tenant=<id> upgrade head` para aplicar migrações
# (histórico centralizado na API do sistema).
#
# Implementação compatível com Windows/Uvicorn:
# - usa subprocess.run síncrono dentro de asyncio.to_thread
# """
# cmd = ["alembic"]
# if alembic_ini_path:
# cmd += ["-c", alembic_ini_path]
# cmd += ["-x", f"tenant={tenant_id}", "upgrade", "head"]
#
# def _run_sync():
# # Se precisar garantir diretório, use cwd="..." aqui.
# return subprocess.run(cmd, capture_output=True, text=True, check=True)
#
# try:
# result = await asyncio.to_thread(_run_sync)
# except subprocess.CalledProcessError as e:
# raise NovoInquilinoError(
# f"Erro Alembic (rc={e.returncode})\nSTDOUT:\n{e.stdout}\nSTDERR:\n{e.stderr}"
# ) from e
# except FileNotFoundError as e:
# # 'alembic' não encontrado no PATH/venv
# raise NovoInquilinoError(
# "Comando 'alembic' não encontrado. Verifique se o virtualenv está ativo e se o Alembic está instalado."
# ) from e
# except Exception as e:
# raise NovoInquilinoError(f"Falha inesperada ao executar Alembic: {e}") from e
#
# return result.stdout
#
#
# async def criar_novo_inquilino_service(
# *,
# nome: str,
# email: str,
# password: str,
# cpf_cnpj: str,
# alembic_ini_path_env: str = "ALEMBIC_INI_PATH",
# ) -> NovoInquilinoResult:
# """
# Caso de uso: cria um novo inquilino e roda migrações do schema dele.
# Mantém paridade com o seu script original.
# """
# print("Criar Novo Inquilino")
# try:
# tenant_identifier = await tenant_create(
# nome=nome, email=email, password=password, cpf_cnpj=cpf_cnpj
# )
# except Exception as e:
# # Falha ao criar registro/base do inquilino
# raise NovoInquilinoError(f"Erro ao criar o tenant: {e}") from e
#
# # Executa migrações a partir do histórico central (API do sistema)
# alembic_ini_path = os.getenv(alembic_ini_path_env) # opcional: apontar para outro repo
# stdout = await _run_alembic_for_tenant(str(tenant_identifier), alembic_ini_path)
#
# return NovoInquilinoResult(
# tenant=str(tenant_identifier),
# status="active",
# alembic_stdout=stdout,
# )
############################################################################################################
# app/multi_tenant/onboarding/novo_inquilino_service.py
from __future__ import annotations
import os
import asyncio
import subprocess
from dataclasses import dataclass
from typing import Optional, Dict, Any, List
from contextlib import asynccontextmanager
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.multi_tenant.criar_tenant import tenant_create # sua função já existente
from app.database.session import sessionmanager # precisa já estar inicializado no app
# ------------------------------------------------------------------------------
# Debug helpers (compartilhados)
# ------------------------------------------------------------------------------
from app.common.debug import make_dbg, make_dbg_lazy
_DBG_NS = "NovoInquilino" # rótulo curto para logar a origem deste módulo
_dbg = make_dbg(_DBG_NS)
_dbg_lazy = make_dbg_lazy(_DBG_NS)
# ------------------------------------------------------------------------------
# Exceção e resultado de caso de uso
# ------------------------------------------------------------------------------
class NovoInquilinoError(Exception):
"""Erro de alto nível no fluxo de criação de inquilino."""
@dataclass
class NovoInquilinoResult:
tenant: str
status: str
alembic_stdout: Optional[str] = None
# ------------------------------------------------------------------------------
# Helpers de sessão
# ------------------------------------------------------------------------------
@asynccontextmanager
async def _new_session() -> AsyncSession:
"""
Abre uma AsyncSession a partir do sessionmanager (compatível com sua infra).
"""
_dbg("_new_session(): abertura solicitada")
# Muitos session managers expõem um sessionmaker; aqui usamos esse caminho.
sm = sessionmanager.get_sessionmaker()
_dbg("_new_session(): usando get_sessionmaker()")
session: AsyncSession = sm()
_dbg("_new_session(): AsyncSession obtida")
try:
yield session
finally:
await session.close()
_dbg("_new_session(): sessão fechada")
# ------------------------------------------------------------------------------
# Helpers de schema / infra
# ------------------------------------------------------------------------------
def _derive_schema_candidates(tenant_uuid: str, prefix: str = "t_") -> List[str]:
"""
Retorna *todas* as possibilidades de nome de schema que queremos tentar dropar,
na ordem de preferência.
1) t_<uuid_sem_hifen> (ex.: t_01996bc991217d51ab00aacb559f8480)
2) <uuid_com_hifen> (ex.: 01996bcb-9184-7f68-9cf1-b5fe7af7c348) ← usado pelo seu Alembic
"""
uuid_no_dash = tenant_uuid.replace("-", "")
candidates = [
f"{prefix}{uuid_no_dash}",
tenant_uuid,
]
_dbg("_derive_schema_candidates(): tenant_uuid={0} -> {1}", tenant_uuid, candidates)
return candidates
async def _drop_schema(schema_name: str) -> bool:
"""
DROP SCHEMA IF EXISTS "<schema_name>" CASCADE
Retorna True se executou o comando sem erro (mesmo se o schema não existisse).
"""
_dbg('_drop_schema(): início. schema_name={0}', schema_name)
async with _new_session() as s:
await s.execute(text(f'DROP SCHEMA IF EXISTS "{schema_name}" CASCADE'))
await s.commit()
_dbg('_drop_schema(): OK. schema_name={0}', schema_name)
return True
async def _drop_tenant_schemas_all(tenant_uuid: str, schema_prefix: str) -> Dict[str, Any]:
"""
Tenta dropar *todas* as variações de schema possíveis do tenant.
Retorna um relatório com sucesso/erro por candidato.
"""
report: Dict[str, Any] = {"tenant_uuid": tenant_uuid, "schemas": []}
for schema in _derive_schema_candidates(tenant_uuid, schema_prefix):
try:
await _drop_schema(schema)
report["schemas"].append({"name": schema, "dropped": True})
except Exception as e:
_dbg("_drop_tenant_schemas_all(): erro ao dropar {0}: {1}", schema, e)
report["schemas"].append({"name": schema, "dropped": False, "error": str(e)})
_dbg("_drop_tenant_schemas_all(): report={0}", report)
return report
# ------------------------------------------------------------------------------
# Helpers de limpeza no shared (usuários, vínculos, inquilino)
# ------------------------------------------------------------------------------
async def _delete_users_and_links_all(tenant_uuid: str) -> Dict[str, int]:
"""
Remove vínculos de papéis e usuários para um inquilino no schema 'shared'.
Usa DELETE ... RETURNING 1 para contar afetados de forma portátil.
"""
_dbg("_delete_users_and_links_all(): início. tenant_uuid={0}", tenant_uuid)
stats = {"papel_vinculos": 0, "usuarios": 0}
async with _new_session() as s:
_dbg("_delete_users_and_links_all(): deletando vínculos em shared.rbac_papeis_usuario ...")
res_links = await s.execute(
text(
"""
DELETE FROM shared.rbac_papeis_usuario
WHERE user_uuid IN (
SELECT id FROM shared.rbac_usuarios
WHERE fk_inquilino_uuid = :tenant_uuid
)
RETURNING 1
"""
),
{"tenant_uuid": tenant_uuid},
)
# Contagem via RETURNING
links_rows = res_links.fetchall()
stats["papel_vinculos"] = len(links_rows)
_dbg("_delete_users_and_links_all(): vínculos deletados={0}", stats["papel_vinculos"])
_dbg("_delete_users_and_links_all(): deletando usuários em shared.rbac_usuarios ...")
res_users = await s.execute(
text(
"""
DELETE FROM shared.rbac_usuarios
WHERE fk_inquilino_uuid = :tenant_uuid
RETURNING 1
"""
),
{"tenant_uuid": tenant_uuid},
)
users_rows = res_users.fetchall()
stats["usuarios"] = len(users_rows)
_dbg("_delete_users_and_links_all(): usuários deletados={0}", stats["usuarios"])
await s.commit()
_dbg("_delete_users_and_links_all(): OK. stats={0}", stats)
return stats
async def _delete_tenant_record(tenant_uuid: str) -> int:
"""
Remove o próprio registro do inquilino no shared.inquilinos.
Retorna qtd de linhas afetadas.
"""
_dbg("_delete_tenant_record(): início. tenant_uuid={0}", tenant_uuid)
async with _new_session() as s:
res = await s.execute(
text(
"""
DELETE FROM shared.inquilinos
WHERE uuid = :tenant_uuid
RETURNING 1
"""
),
{"tenant_uuid": tenant_uuid},
)
rows = res.fetchall()
await s.commit()
count = len(rows)
_dbg("_delete_tenant_record(): OK. linhas_deletadas={0}", count)
return count
async def _find_tenant_uuid_by_doc(cpf_cnpj: str) -> Optional[str]:
"""
Busca o uuid do inquilino via cpf_cnpj (schema 'shared').
"""
_dbg("_find_tenant_uuid_by_doc(): início. cpf_cnpj={0}", cpf_cnpj)
async with _new_session() as s:
res = await s.execute(
text("SELECT uuid FROM shared.inquilinos WHERE cpf_cnpj = :doc"),
{"doc": cpf_cnpj},
)
row = res.fetchone()
tenant_uuid = row[0] if row else None
_dbg("_find_tenant_uuid_by_doc(): resultado={0}", tenant_uuid)
return tenant_uuid
# ------------------------------------------------------------------------------
# Limpezas compostas
# ------------------------------------------------------------------------------
async def _cleanup_full_by_tenant_uuid(tenant_uuid: str, schema_prefix: str) -> Dict[str, Any]:
"""
Limpa toda a sujeira que pode ter ficado para um tenant específico:
- DROP de todos os schemas candidatos
- DELETE vínculos + usuários
- DELETE registro do inquilino
"""
_dbg("_cleanup_full_by_tenant_uuid(): início. tenant_uuid={0}", tenant_uuid)
report: Dict[str, Any] = {"tenant_uuid": tenant_uuid}
# 1) Drop dos schemas candidatos
drop_report = await _drop_tenant_schemas_all(tenant_uuid, schema_prefix)
report["schema_names"] = [s["name"] for s in drop_report["schemas"]]
report["dropped_schema"] = any(s["dropped"] for s in drop_report["schemas"])
_dbg("_cleanup_full_by_tenant_uuid(): drop schema OK ({0})", report["schema_names"])
# 2) Usuários + vínculos
users_stats = await _delete_users_and_links_all(tenant_uuid)
report["users_cleanup"] = users_stats
_dbg("_cleanup_full_by_tenant_uuid(): users/links cleanup OK: {0}", users_stats)
# 3) Registro do inquilino
deleted = await _delete_tenant_record(tenant_uuid)
report["tenants_deleted"] = deleted
_dbg("_cleanup_full_by_tenant_uuid(): tenant record delete OK. linhas={0}", deleted)
_dbg("_cleanup_full_by_tenant_uuid(): fim. report={0}", report)
return report
async def _cleanup_full_by_doc(cpf_cnpj: str, schema_prefix: str) -> Dict[str, Any]:
"""
Limpeza quando falhou *antes* de sabermos seguramente o tenant_uuid (p. ex., erro criando usuário).
- Descobre tenant_uuid por cpf_cnpj;
- Se achou, faz a limpeza completa por uuid.
"""
_dbg("_cleanup_full_by_doc(): início. cpf_cnpj={0}", cpf_cnpj)
tenant_uuid = await _find_tenant_uuid_by_doc(cpf_cnpj)
if not tenant_uuid:
report = {"cpf_cnpj": cpf_cnpj, "tenant_uuid": None, "note": "nenhum inquilino encontrado"}
_dbg("_cleanup_full_by_doc(): nenhum inquilino para {0}. report={1}", cpf_cnpj, report)
return report
inner = await _cleanup_full_by_tenant_uuid(tenant_uuid, schema_prefix)
report = {
"cpf_cnpj": cpf_cnpj,
"tenant_uuid": tenant_uuid,
"schema_names": inner.get("schema_names"),
"dropped_schema": inner.get("dropped_schema"),
"users_cleanup": inner.get("users_cleanup"),
"tenants_deleted": inner.get("tenants_deleted"),
}
_dbg("_cleanup_full_by_doc(): fim. report={0}", report)
return report
# ------------------------------------------------------------------------------
# Alembic runner
# ------------------------------------------------------------------------------
async def _run_alembic_for_tenant(tenant_id: str, alembic_ini_path: Optional[str] = None) -> str:
"""
Executa `alembic -x tenant=<id> upgrade head` para aplicar migrações
(histórico centralizado na API do sistema).
Observações:
- Captura STDOUT/STDERR para diagnóstico.
- Em Windows/Uvicorn usamos subprocess.run síncrono dentro de asyncio.to_thread.
"""
cmd = ["alembic"]
if alembic_ini_path:
cmd += ["-c", alembic_ini_path]
cmd += ["-x", f"tenant={tenant_id}", "upgrade", "head"]
_dbg("_run_alembic_for_tenant(): cmd={0}", " ".join(cmd))
def _run_sync():
return subprocess.run(cmd, capture_output=True, text=True, check=True)
try:
result = await asyncio.to_thread(_run_sync)
except subprocess.CalledProcessError as e:
_dbg("_run_alembic_for_tenant(): CalledProcessError rc={0}", e.returncode)
_dbg("_run_alembic_for_tenant(): STDOUT\n{0}\n", e.stdout or "")
_dbg("_run_alembic_for_tenant(): STDERR\n{0}\n", e.stderr or "")
raise NovoInquilinoError(
f"Erro Alembic (rc={e.returncode})\nSTDOUT:\n{e.stdout}\n\nSTDERR:\n{e.stderr}"
) from e
except FileNotFoundError as e:
_dbg("_run_alembic_for_tenant(): 'alembic' não encontrado")
raise NovoInquilinoError(
"Comando 'alembic' não encontrado. Verifique se o virtualenv está ativo e se o Alembic está instalado."
) from e
except Exception as e:
_dbg("_run_alembic_for_tenant(): falha inesperada: {0}", e)
raise NovoInquilinoError(f"Falha inesperada ao executar Alembic: {e}") from e
_dbg("_run_alembic_for_tenant(): OK. returncode={0}", result.returncode)
if result.stdout:
_dbg("_run_alembic_for_tenant(): STDOUT (início)\n{0}\n_run_alembic_for_tenant(): STDOUT (fim)", result.stdout)
if result.stderr:
# Alembic/SQLAlchemy costuma logar no STDERR mesmo em sucesso
_dbg("_run_alembic_for_tenant(): STDERR (início)\n{0}\n_run_alembic_for_tenant(): STDERR (fim)", result.stderr)
# Normalmente não precisamos do stdout; manter só para compat.
return result.stdout or ""
# ------------------------------------------------------------------------------
# Caso de uso principal
# ------------------------------------------------------------------------------
async def criar_novo_inquilino_service(
*,
nome: str,
email: str,
password: str,
cpf_cnpj: str,
alembic_ini_path_env: str = "ALEMBIC_INI_PATH",
schema_prefix: str = "t_",
) -> NovoInquilinoResult:
"""
Fluxo:
1) Cria registro do inquilino + usuário inicial (tenant_create)
2) Executa migrações do schema do tenant (alembic -x tenant=<uuid> upgrade head)
Em qualquer falha, tenta limpar tudo para evitar dados órfãos.
"""
_dbg("criar_novo_inquilino_service(): início")
_dbg("payload: nome={0!r}, email={1!r}, cpf_cnpj={2!r}", nome, email, cpf_cnpj)
_dbg("criar_novo_inquilino_service(): chamando tenant_create(...)")
tenant_uuid: Optional[str] = None
# 1) Criar base do inquilino (inclui usuário inicial via seu fluxo)
try:
tenant_uuid = str(
await tenant_create(nome=nome, email=email, password=password, cpf_cnpj=cpf_cnpj)
)
_dbg("criar_novo_inquilino_service(): tenant_create OK. tenant_uuid={0}", tenant_uuid)
except Exception as e:
_dbg("criar_novo_inquilino_service(): ERRO em tenant_create: {0}", e)
# Limpeza pelo documento, pois talvez não tenhamos tenant_uuid sólido
_dbg("criar_novo_inquilino_service(): iniciando limpeza por cpf_cnpj (falha no tenant_create)")
try:
cleanup = await _cleanup_full_by_doc(cpf_cnpj, schema_prefix)
_dbg("criar_novo_inquilino_service(): limpeza concluída. cleanup={0}", cleanup)
except Exception as cleanup_err:
_dbg("criar_novo_inquilino_service(): limpeza por cpf_cnpj FALHOU: {0}", cleanup_err)
# Re-lança um erro de alto nível
raise NovoInquilinoError(str(e)) from e
# 2) Rodar migrações do tenant
alembic_ini_path = os.getenv(alembic_ini_path_env) # opcional: apontar para outro repo/folder
_dbg("criar_novo_inquilino_service(): alembic_ini_path={0}", alembic_ini_path)
_dbg("criar_novo_inquilino_service(): rodando Alembic para o tenant")
try:
stdout = await _run_alembic_for_tenant(tenant_uuid, alembic_ini_path)
_dbg("criar_novo_inquilino_service(): Alembic OK")
result = NovoInquilinoResult(tenant=tenant_uuid, status="active", alembic_stdout=stdout or "")
_dbg("criar_novo_inquilino_service(): fim com sucesso. result={0}", result)
return result
except Exception as mig_err:
_dbg("criar_novo_inquilino_service(): ERRO na migração: {0}", mig_err)
_dbg("criar_novo_inquilino_service(): iniciando limpeza por tenant_uuid (falha na migração)")
try:
cleanup = await _cleanup_full_by_tenant_uuid(tenant_uuid, schema_prefix)
_dbg("criar_novo_inquilino_service(): limpeza concluída. cleanup={0}", cleanup)
except Exception as cleanup_err:
_dbg("criar_novo_inquilino_service(): limpeza por tenant_uuid FALHOU: {0}", cleanup_err)
# Após limpar, propaga erro
raise