469 lines
18 KiB
Python
469 lines
18 KiB
Python
# from __future__ import annotations
|
|
#
|
|
# import os
|
|
# import asyncio
|
|
# import subprocess
|
|
# from dataclasses import dataclass
|
|
# from typing import Optional
|
|
#
|
|
# from app.multi_tenant.criar_tenant import tenant_create # sua função já existente
|
|
#
|
|
#
|
|
# class NovoInquilinoError(Exception):
|
|
# """Erro de alto nível no fluxo de criação de inquilino."""
|
|
#
|
|
#
|
|
# @dataclass
|
|
# class NovoInquilinoResult:
|
|
# tenant: str
|
|
# status: str
|
|
# alembic_stdout: Optional[str] = None
|
|
#
|
|
#
|
|
# async def _run_alembic_for_tenant(tenant_id: str, alembic_ini_path: Optional[str] = None) -> str:
|
|
# """
|
|
# Executa `alembic -x tenant=<id> upgrade head` para aplicar migrações
|
|
# (histórico centralizado na API do sistema).
|
|
#
|
|
# Implementação compatível com Windows/Uvicorn:
|
|
# - usa subprocess.run síncrono dentro de asyncio.to_thread
|
|
# """
|
|
# cmd = ["alembic"]
|
|
# if alembic_ini_path:
|
|
# cmd += ["-c", alembic_ini_path]
|
|
# cmd += ["-x", f"tenant={tenant_id}", "upgrade", "head"]
|
|
#
|
|
# def _run_sync():
|
|
# # Se precisar garantir diretório, use cwd="..." aqui.
|
|
# return subprocess.run(cmd, capture_output=True, text=True, check=True)
|
|
#
|
|
# try:
|
|
# result = await asyncio.to_thread(_run_sync)
|
|
# except subprocess.CalledProcessError as e:
|
|
# raise NovoInquilinoError(
|
|
# f"Erro Alembic (rc={e.returncode})\nSTDOUT:\n{e.stdout}\nSTDERR:\n{e.stderr}"
|
|
# ) from e
|
|
# except FileNotFoundError as e:
|
|
# # 'alembic' não encontrado no PATH/venv
|
|
# raise NovoInquilinoError(
|
|
# "Comando 'alembic' não encontrado. Verifique se o virtualenv está ativo e se o Alembic está instalado."
|
|
# ) from e
|
|
# except Exception as e:
|
|
# raise NovoInquilinoError(f"Falha inesperada ao executar Alembic: {e}") from e
|
|
#
|
|
# return result.stdout
|
|
#
|
|
#
|
|
# async def criar_novo_inquilino_service(
|
|
# *,
|
|
# nome: str,
|
|
# email: str,
|
|
# password: str,
|
|
# cpf_cnpj: str,
|
|
# alembic_ini_path_env: str = "ALEMBIC_INI_PATH",
|
|
# ) -> NovoInquilinoResult:
|
|
# """
|
|
# Caso de uso: cria um novo inquilino e roda migrações do schema dele.
|
|
# Mantém paridade com o seu script original.
|
|
# """
|
|
# print("Criar Novo Inquilino")
|
|
# try:
|
|
# tenant_identifier = await tenant_create(
|
|
# nome=nome, email=email, password=password, cpf_cnpj=cpf_cnpj
|
|
# )
|
|
# except Exception as e:
|
|
# # Falha ao criar registro/base do inquilino
|
|
# raise NovoInquilinoError(f"Erro ao criar o tenant: {e}") from e
|
|
#
|
|
# # Executa migrações a partir do histórico central (API do sistema)
|
|
# alembic_ini_path = os.getenv(alembic_ini_path_env) # opcional: apontar para outro repo
|
|
# stdout = await _run_alembic_for_tenant(str(tenant_identifier), alembic_ini_path)
|
|
#
|
|
# return NovoInquilinoResult(
|
|
# tenant=str(tenant_identifier),
|
|
# status="active",
|
|
# alembic_stdout=stdout,
|
|
# )
|
|
|
|
############################################################################################################
|
|
|
|
# app/multi_tenant/onboarding/novo_inquilino_service.py
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import asyncio
|
|
import subprocess
|
|
from dataclasses import dataclass
|
|
from typing import Optional, Dict, Any, List
|
|
from contextlib import asynccontextmanager
|
|
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
|
|
from app.multi_tenant.criar_tenant import tenant_create # sua função já existente
|
|
from app.database.session import sessionmanager # precisa já estar inicializado no app
|
|
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Debug helpers (compartilhados)
|
|
# ------------------------------------------------------------------------------
|
|
from app.common.debug import make_dbg, make_dbg_lazy
|
|
_DBG_NS = "NovoInquilino" # rótulo curto para logar a origem deste módulo
|
|
_dbg = make_dbg(_DBG_NS)
|
|
_dbg_lazy = make_dbg_lazy(_DBG_NS)
|
|
|
|
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Exceção e resultado de caso de uso
|
|
# ------------------------------------------------------------------------------
|
|
class NovoInquilinoError(Exception):
|
|
"""Erro de alto nível no fluxo de criação de inquilino."""
|
|
|
|
|
|
@dataclass
|
|
class NovoInquilinoResult:
|
|
tenant: str
|
|
status: str
|
|
alembic_stdout: Optional[str] = None
|
|
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Helpers de sessão
|
|
# ------------------------------------------------------------------------------
|
|
@asynccontextmanager
|
|
async def _new_session() -> AsyncSession:
|
|
"""
|
|
Abre uma AsyncSession a partir do sessionmanager (compatível com sua infra).
|
|
"""
|
|
_dbg("_new_session(): abertura solicitada")
|
|
# Muitos session managers expõem um sessionmaker; aqui usamos esse caminho.
|
|
sm = sessionmanager.get_sessionmaker()
|
|
_dbg("_new_session(): usando get_sessionmaker()")
|
|
session: AsyncSession = sm()
|
|
_dbg("_new_session(): AsyncSession obtida")
|
|
try:
|
|
yield session
|
|
finally:
|
|
await session.close()
|
|
_dbg("_new_session(): sessão fechada")
|
|
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Helpers de schema / infra
|
|
# ------------------------------------------------------------------------------
|
|
def _derive_schema_candidates(tenant_uuid: str, prefix: str = "t_") -> List[str]:
|
|
"""
|
|
Retorna *todas* as possibilidades de nome de schema que queremos tentar dropar,
|
|
na ordem de preferência.
|
|
|
|
1) t_<uuid_sem_hifen> (ex.: t_01996bc991217d51ab00aacb559f8480)
|
|
2) <uuid_com_hifen> (ex.: 01996bcb-9184-7f68-9cf1-b5fe7af7c348) ← usado pelo seu Alembic
|
|
"""
|
|
uuid_no_dash = tenant_uuid.replace("-", "")
|
|
candidates = [
|
|
f"{prefix}{uuid_no_dash}",
|
|
tenant_uuid,
|
|
]
|
|
_dbg("_derive_schema_candidates(): tenant_uuid={0} -> {1}", tenant_uuid, candidates)
|
|
return candidates
|
|
|
|
|
|
async def _drop_schema(schema_name: str) -> bool:
|
|
"""
|
|
DROP SCHEMA IF EXISTS "<schema_name>" CASCADE
|
|
Retorna True se executou o comando sem erro (mesmo se o schema não existisse).
|
|
"""
|
|
_dbg('_drop_schema(): início. schema_name={0}', schema_name)
|
|
async with _new_session() as s:
|
|
await s.execute(text(f'DROP SCHEMA IF EXISTS "{schema_name}" CASCADE'))
|
|
await s.commit()
|
|
_dbg('_drop_schema(): OK. schema_name={0}', schema_name)
|
|
return True
|
|
|
|
|
|
async def _drop_tenant_schemas_all(tenant_uuid: str, schema_prefix: str) -> Dict[str, Any]:
|
|
"""
|
|
Tenta dropar *todas* as variações de schema possíveis do tenant.
|
|
Retorna um relatório com sucesso/erro por candidato.
|
|
"""
|
|
report: Dict[str, Any] = {"tenant_uuid": tenant_uuid, "schemas": []}
|
|
for schema in _derive_schema_candidates(tenant_uuid, schema_prefix):
|
|
try:
|
|
await _drop_schema(schema)
|
|
report["schemas"].append({"name": schema, "dropped": True})
|
|
except Exception as e:
|
|
_dbg("_drop_tenant_schemas_all(): erro ao dropar {0}: {1}", schema, e)
|
|
report["schemas"].append({"name": schema, "dropped": False, "error": str(e)})
|
|
_dbg("_drop_tenant_schemas_all(): report={0}", report)
|
|
return report
|
|
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Helpers de limpeza no shared (usuários, vínculos, inquilino)
|
|
# ------------------------------------------------------------------------------
|
|
async def _delete_users_and_links_all(tenant_uuid: str) -> Dict[str, int]:
|
|
"""
|
|
Remove vínculos de papéis e usuários para um inquilino no schema 'shared'.
|
|
Usa DELETE ... RETURNING 1 para contar afetados de forma portátil.
|
|
"""
|
|
_dbg("_delete_users_and_links_all(): início. tenant_uuid={0}", tenant_uuid)
|
|
stats = {"papel_vinculos": 0, "usuarios": 0}
|
|
|
|
async with _new_session() as s:
|
|
_dbg("_delete_users_and_links_all(): deletando vínculos em shared.rbac_papeis_usuario ...")
|
|
res_links = await s.execute(
|
|
text(
|
|
"""
|
|
DELETE FROM shared.rbac_papeis_usuario
|
|
WHERE user_uuid IN (
|
|
SELECT id FROM shared.rbac_usuarios
|
|
WHERE fk_inquilino_uuid = :tenant_uuid
|
|
)
|
|
RETURNING 1
|
|
"""
|
|
),
|
|
{"tenant_uuid": tenant_uuid},
|
|
)
|
|
# Contagem via RETURNING
|
|
links_rows = res_links.fetchall()
|
|
stats["papel_vinculos"] = len(links_rows)
|
|
_dbg("_delete_users_and_links_all(): vínculos deletados={0}", stats["papel_vinculos"])
|
|
|
|
_dbg("_delete_users_and_links_all(): deletando usuários em shared.rbac_usuarios ...")
|
|
res_users = await s.execute(
|
|
text(
|
|
"""
|
|
DELETE FROM shared.rbac_usuarios
|
|
WHERE fk_inquilino_uuid = :tenant_uuid
|
|
RETURNING 1
|
|
"""
|
|
),
|
|
{"tenant_uuid": tenant_uuid},
|
|
)
|
|
users_rows = res_users.fetchall()
|
|
stats["usuarios"] = len(users_rows)
|
|
_dbg("_delete_users_and_links_all(): usuários deletados={0}", stats["usuarios"])
|
|
|
|
await s.commit()
|
|
_dbg("_delete_users_and_links_all(): OK. stats={0}", stats)
|
|
|
|
return stats
|
|
|
|
|
|
async def _delete_tenant_record(tenant_uuid: str) -> int:
|
|
"""
|
|
Remove o próprio registro do inquilino no shared.inquilinos.
|
|
Retorna qtd de linhas afetadas.
|
|
"""
|
|
_dbg("_delete_tenant_record(): início. tenant_uuid={0}", tenant_uuid)
|
|
async with _new_session() as s:
|
|
res = await s.execute(
|
|
text(
|
|
"""
|
|
DELETE FROM shared.inquilinos
|
|
WHERE uuid = :tenant_uuid
|
|
RETURNING 1
|
|
"""
|
|
),
|
|
{"tenant_uuid": tenant_uuid},
|
|
)
|
|
rows = res.fetchall()
|
|
await s.commit()
|
|
count = len(rows)
|
|
_dbg("_delete_tenant_record(): OK. linhas_deletadas={0}", count)
|
|
return count
|
|
|
|
|
|
async def _find_tenant_uuid_by_doc(cpf_cnpj: str) -> Optional[str]:
|
|
"""
|
|
Busca o uuid do inquilino via cpf_cnpj (schema 'shared').
|
|
"""
|
|
_dbg("_find_tenant_uuid_by_doc(): início. cpf_cnpj={0}", cpf_cnpj)
|
|
async with _new_session() as s:
|
|
res = await s.execute(
|
|
text("SELECT uuid FROM shared.inquilinos WHERE cpf_cnpj = :doc"),
|
|
{"doc": cpf_cnpj},
|
|
)
|
|
row = res.fetchone()
|
|
tenant_uuid = row[0] if row else None
|
|
_dbg("_find_tenant_uuid_by_doc(): resultado={0}", tenant_uuid)
|
|
return tenant_uuid
|
|
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Limpezas compostas
|
|
# ------------------------------------------------------------------------------
|
|
async def _cleanup_full_by_tenant_uuid(tenant_uuid: str, schema_prefix: str) -> Dict[str, Any]:
|
|
"""
|
|
Limpa toda a sujeira que pode ter ficado para um tenant específico:
|
|
- DROP de todos os schemas candidatos
|
|
- DELETE vínculos + usuários
|
|
- DELETE registro do inquilino
|
|
"""
|
|
_dbg("_cleanup_full_by_tenant_uuid(): início. tenant_uuid={0}", tenant_uuid)
|
|
report: Dict[str, Any] = {"tenant_uuid": tenant_uuid}
|
|
|
|
# 1) Drop dos schemas candidatos
|
|
drop_report = await _drop_tenant_schemas_all(tenant_uuid, schema_prefix)
|
|
report["schema_names"] = [s["name"] for s in drop_report["schemas"]]
|
|
report["dropped_schema"] = any(s["dropped"] for s in drop_report["schemas"])
|
|
_dbg("_cleanup_full_by_tenant_uuid(): drop schema OK ({0})", report["schema_names"])
|
|
|
|
# 2) Usuários + vínculos
|
|
users_stats = await _delete_users_and_links_all(tenant_uuid)
|
|
report["users_cleanup"] = users_stats
|
|
_dbg("_cleanup_full_by_tenant_uuid(): users/links cleanup OK: {0}", users_stats)
|
|
|
|
# 3) Registro do inquilino
|
|
deleted = await _delete_tenant_record(tenant_uuid)
|
|
report["tenants_deleted"] = deleted
|
|
_dbg("_cleanup_full_by_tenant_uuid(): tenant record delete OK. linhas={0}", deleted)
|
|
|
|
_dbg("_cleanup_full_by_tenant_uuid(): fim. report={0}", report)
|
|
return report
|
|
|
|
|
|
async def _cleanup_full_by_doc(cpf_cnpj: str, schema_prefix: str) -> Dict[str, Any]:
|
|
"""
|
|
Limpeza quando falhou *antes* de sabermos seguramente o tenant_uuid (p. ex., erro criando usuário).
|
|
- Descobre tenant_uuid por cpf_cnpj;
|
|
- Se achou, faz a limpeza completa por uuid.
|
|
"""
|
|
_dbg("_cleanup_full_by_doc(): início. cpf_cnpj={0}", cpf_cnpj)
|
|
tenant_uuid = await _find_tenant_uuid_by_doc(cpf_cnpj)
|
|
if not tenant_uuid:
|
|
report = {"cpf_cnpj": cpf_cnpj, "tenant_uuid": None, "note": "nenhum inquilino encontrado"}
|
|
_dbg("_cleanup_full_by_doc(): nenhum inquilino para {0}. report={1}", cpf_cnpj, report)
|
|
return report
|
|
|
|
inner = await _cleanup_full_by_tenant_uuid(tenant_uuid, schema_prefix)
|
|
report = {
|
|
"cpf_cnpj": cpf_cnpj,
|
|
"tenant_uuid": tenant_uuid,
|
|
"schema_names": inner.get("schema_names"),
|
|
"dropped_schema": inner.get("dropped_schema"),
|
|
"users_cleanup": inner.get("users_cleanup"),
|
|
"tenants_deleted": inner.get("tenants_deleted"),
|
|
}
|
|
_dbg("_cleanup_full_by_doc(): fim. report={0}", report)
|
|
return report
|
|
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Alembic runner
|
|
# ------------------------------------------------------------------------------
|
|
async def _run_alembic_for_tenant(tenant_id: str, alembic_ini_path: Optional[str] = None) -> str:
|
|
"""
|
|
Executa `alembic -x tenant=<id> upgrade head` para aplicar migrações
|
|
(histórico centralizado na API do sistema).
|
|
|
|
Observações:
|
|
- Captura STDOUT/STDERR para diagnóstico.
|
|
- Em Windows/Uvicorn usamos subprocess.run síncrono dentro de asyncio.to_thread.
|
|
"""
|
|
cmd = ["alembic"]
|
|
if alembic_ini_path:
|
|
cmd += ["-c", alembic_ini_path]
|
|
cmd += ["-x", f"tenant={tenant_id}", "upgrade", "head"]
|
|
|
|
_dbg("_run_alembic_for_tenant(): cmd={0}", " ".join(cmd))
|
|
|
|
def _run_sync():
|
|
return subprocess.run(cmd, capture_output=True, text=True, check=True)
|
|
|
|
try:
|
|
result = await asyncio.to_thread(_run_sync)
|
|
except subprocess.CalledProcessError as e:
|
|
_dbg("_run_alembic_for_tenant(): CalledProcessError rc={0}", e.returncode)
|
|
_dbg("_run_alembic_for_tenant(): STDOUT\n{0}\n", e.stdout or "")
|
|
_dbg("_run_alembic_for_tenant(): STDERR\n{0}\n", e.stderr or "")
|
|
raise NovoInquilinoError(
|
|
f"Erro Alembic (rc={e.returncode})\nSTDOUT:\n{e.stdout}\n\nSTDERR:\n{e.stderr}"
|
|
) from e
|
|
except FileNotFoundError as e:
|
|
_dbg("_run_alembic_for_tenant(): 'alembic' não encontrado")
|
|
raise NovoInquilinoError(
|
|
"Comando 'alembic' não encontrado. Verifique se o virtualenv está ativo e se o Alembic está instalado."
|
|
) from e
|
|
except Exception as e:
|
|
_dbg("_run_alembic_for_tenant(): falha inesperada: {0}", e)
|
|
raise NovoInquilinoError(f"Falha inesperada ao executar Alembic: {e}") from e
|
|
|
|
_dbg("_run_alembic_for_tenant(): OK. returncode={0}", result.returncode)
|
|
if result.stdout:
|
|
_dbg("_run_alembic_for_tenant(): STDOUT (início)\n{0}\n_run_alembic_for_tenant(): STDOUT (fim)", result.stdout)
|
|
if result.stderr:
|
|
# Alembic/SQLAlchemy costuma logar no STDERR mesmo em sucesso
|
|
_dbg("_run_alembic_for_tenant(): STDERR (início)\n{0}\n_run_alembic_for_tenant(): STDERR (fim)", result.stderr)
|
|
|
|
# Normalmente não precisamos do stdout; manter só para compat.
|
|
return result.stdout or ""
|
|
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Caso de uso principal
|
|
# ------------------------------------------------------------------------------
|
|
async def criar_novo_inquilino_service(
|
|
*,
|
|
nome: str,
|
|
email: str,
|
|
password: str,
|
|
cpf_cnpj: str,
|
|
alembic_ini_path_env: str = "ALEMBIC_INI_PATH",
|
|
schema_prefix: str = "t_",
|
|
) -> NovoInquilinoResult:
|
|
"""
|
|
Fluxo:
|
|
1) Cria registro do inquilino + usuário inicial (tenant_create)
|
|
2) Executa migrações do schema do tenant (alembic -x tenant=<uuid> upgrade head)
|
|
Em qualquer falha, tenta limpar tudo para evitar dados órfãos.
|
|
"""
|
|
_dbg("criar_novo_inquilino_service(): início")
|
|
_dbg("payload: nome={0!r}, email={1!r}, cpf_cnpj={2!r}", nome, email, cpf_cnpj)
|
|
_dbg("criar_novo_inquilino_service(): chamando tenant_create(...)")
|
|
|
|
tenant_uuid: Optional[str] = None
|
|
|
|
# 1) Criar base do inquilino (inclui usuário inicial via seu fluxo)
|
|
try:
|
|
tenant_uuid = str(
|
|
await tenant_create(nome=nome, email=email, password=password, cpf_cnpj=cpf_cnpj)
|
|
)
|
|
_dbg("criar_novo_inquilino_service(): tenant_create OK. tenant_uuid={0}", tenant_uuid)
|
|
except Exception as e:
|
|
_dbg("criar_novo_inquilino_service(): ERRO em tenant_create: {0}", e)
|
|
# Limpeza pelo documento, pois talvez não tenhamos tenant_uuid sólido
|
|
_dbg("criar_novo_inquilino_service(): iniciando limpeza por cpf_cnpj (falha no tenant_create)")
|
|
try:
|
|
cleanup = await _cleanup_full_by_doc(cpf_cnpj, schema_prefix)
|
|
_dbg("criar_novo_inquilino_service(): limpeza concluída. cleanup={0}", cleanup)
|
|
except Exception as cleanup_err:
|
|
_dbg("criar_novo_inquilino_service(): limpeza por cpf_cnpj FALHOU: {0}", cleanup_err)
|
|
# Re-lança um erro de alto nível
|
|
raise NovoInquilinoError(str(e)) from e
|
|
|
|
# 2) Rodar migrações do tenant
|
|
alembic_ini_path = os.getenv(alembic_ini_path_env) # opcional: apontar para outro repo/folder
|
|
_dbg("criar_novo_inquilino_service(): alembic_ini_path={0}", alembic_ini_path)
|
|
_dbg("criar_novo_inquilino_service(): rodando Alembic para o tenant")
|
|
|
|
try:
|
|
stdout = await _run_alembic_for_tenant(tenant_uuid, alembic_ini_path)
|
|
_dbg("criar_novo_inquilino_service(): Alembic OK")
|
|
result = NovoInquilinoResult(tenant=tenant_uuid, status="active", alembic_stdout=stdout or "")
|
|
_dbg("criar_novo_inquilino_service(): fim com sucesso. result={0}", result)
|
|
return result
|
|
except Exception as mig_err:
|
|
_dbg("criar_novo_inquilino_service(): ERRO na migração: {0}", mig_err)
|
|
_dbg("criar_novo_inquilino_service(): iniciando limpeza por tenant_uuid (falha na migração)")
|
|
try:
|
|
cleanup = await _cleanup_full_by_tenant_uuid(tenant_uuid, schema_prefix)
|
|
_dbg("criar_novo_inquilino_service(): limpeza concluída. cleanup={0}", cleanup)
|
|
except Exception as cleanup_err:
|
|
_dbg("criar_novo_inquilino_service(): limpeza por tenant_uuid FALHOU: {0}", cleanup_err)
|
|
|
|
# Após limpar, propaga erro
|
|
raise
|