# import asyncio # import logging # from logging.config import fileConfig # from alembic import context # from sqlalchemy import text, pool # from sqlalchemy.ext.asyncio import async_engine_from_config # from sqlalchemy.ext.asyncio import AsyncEngine # from app.database.models import Base # from app.config import URL_BD # # config = context.config # config.set_main_option("sqlalchemy.url", URL_BD) # # if config.config_file_name is not None: # fileConfig(config.config_file_name) # # target_metadata = Base.metadata # # logging.basicConfig() # # logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO) # # # logging.getLogger("sqlalchemy.engine").setLevel(logging.DEBUG) # # # def run_migrations_offline() -> None: # raise NotImplementedError("Modo offline não implementado.") # # # async def run_async_migrations() -> None: # connectable: AsyncEngine = async_engine_from_config( # config.get_section(config.config_ini_section, {}), # prefix="sqlalchemy.", # poolclass=pool.NullPool, # ) # # args = context.get_x_argument(as_dictionary=True) # special_schema = args.get("special_schema") # current_tenant = args.get("tenant") # # if current_tenant is None and special_schema is None: # raise Exception("Você deve fornecer 'tenant' ou 'special_schema' como argumento.") # elif current_tenant is not None and special_schema is not None: # raise Exception("'tenant' e 'special_schema' não podem ser usados simultaneamente.") # # async with connectable.connect() as async_connection: # await async_connection.run_sync( # lambda connection: run_migrations_online_internal(connection, current_tenant, special_schema) # ) # # await connectable.dispose() # # # def run_migrations_online_internal(connection, current_tenant: str, special_schema: str) -> None: # if special_schema == "shared": # script_location = "alembic/versions/shared" # schema_name = "shared" # """ # Esquema da Tabela definido como o mesmo valor do esquema a serem criadas as Tabelas # Como a tabelas comuns já tem o esquema definido nelas essa configuração garante que a tabela versão será # criada no mesmo esquma das tabelas compartilhadas # """ # schema_table = special_schema # # else: # script_location = "alembic/versions/tenants" # schema_name = current_tenant # """ # Esquema da Tabela definido como None # Como a tabelas dos inquilinos serão cadastradas cada um em um esquema diferente elas saõ configuradas como None # Se nesse ponto já definirmos o esquema como o do inquilino a migração junto como o search_path exclui a tablea # de versão no script de migração por isso é necessáiro configurar o esquema como None no Upgrade junto com # o search_path a tabela de versão vai ser criada no mesmo esquema do inquilino # """ # schema_table = None # # context.script.version_locations = [script_location] # # create_schema_if_not_exists(connection, schema_name) # # connection.execute(text('set search_path to "%s"' % schema_name)) # # connection.commit() # # connection.dialect.default_schema_name = schema_name # # print("Default schema set to:", connection.dialect.default_schema_name) # # # if current_tenant: # connection.execute(text(f'SET search_path TO "{current_tenant}"')) # # connection.execute(text('set search_path to "%s"' % current_tenant)) # connection.commit() # else: # print("set schema") # connection.execute(text(f'SET search_path TO "shared"')) # # connection.execute(text('set search_path to "%s"' % current_tenant)) # connection.commit() # # # Verificar o search_path configurado # result = connection.execute(text("SHOW search_path")) # current_search_path = result.scalar() # print(f"Current search_path: {current_search_path}") # # def include_object(object, name, type_, reflected, compare_to): # # schema = getattr(object, "schema", None) # # if special_schema == "shared": # # Sobrescreve o schema para None apenas se for 'shared' # # object.schema = None # # Inclusçaõ na Migração apenas as tabelas compartilhadas # if schema == "shared": # return True # else: # return False # # if special_schema is None: # # Inclusão na Migração apenas as tabelas dos inquilinos # if schema is None: # return True # else: # return False # # # Exclui por padrão se não atender aos critérios # return False # # # def include_name(name, type_, parent_names): # # if type_ == "table": # # return name in target_metadata.tables # # else: # # return True # # context.configure( # connection=connection, # target_metadata=target_metadata, # include_object=include_object, # version_table_schema=schema_table, # include_schemas=True, # # dialect_opts={"paramstyle": "named"}, # # include_name=include_name, # # # # ) # # with context.begin_transaction(): # context.run_migrations() # # # def create_schema_if_not_exists(connection, schema_name: str): # query = text(f'CREATE SCHEMA IF NOT EXISTS "{schema_name}"') # connection.execute(query) # connection.commit() # # # def run_migrations_online() -> None: # asyncio.run(run_async_migrations()) # # # if context.is_offline_mode(): # run_migrations_offline() # else: # run_migrations_online() import asyncio import logging from logging.config import fileConfig from alembic import context from sqlalchemy import text, pool from sqlalchemy.ext.asyncio import async_engine_from_config from sqlalchemy.ext.asyncio import AsyncEngine from app.database.models import Base from app.config import URL_BD from sqlalchemy.orm import clear_mappers clear_mappers() config = context.config config.set_main_option("sqlalchemy.url", URL_BD) if config.config_file_name is not None: fileConfig(config.config_file_name) target_metadata = Base.metadata logging.basicConfig() logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO) def run_migrations_offline() -> None: raise NotImplementedError("Modo offline não implementado.") async def run_async_migrations() -> None: connectable: AsyncEngine = async_engine_from_config( config.get_section(config.config_ini_section, {}), prefix="sqlalchemy.", poolclass=pool.NullPool, ) args = context.get_x_argument(as_dictionary=True) special_schema = args.get("special_schema") current_tenant = args.get("tenant") if current_tenant is None and special_schema is None: raise Exception("Você deve fornecer 'tenant' ou 'special_schema' como argumento.") elif current_tenant is not None and special_schema is not None: raise Exception("'tenant' e 'special_schema' não podem ser usados simultaneamente.") async with connectable.connect() as async_connection: await async_connection.run_sync( lambda connection: run_migrations_online_internal(connection, current_tenant, special_schema) ) await connectable.dispose() def run_migrations_online_internal(connection, current_tenant: str, special_schema: str) -> None: if special_schema == "shared": script_location = "alembic/versions/shared" schema_name = "shared" """ Esquema da Tabela definido como o mesmo valor do esquema a serem criadas as Tabelas Como a tabelas comuns já tem o esquema definido nelas essa configuração garante que a tabela versão será criada no mesmo esquma das tabelas compartilhadas """ schema_table = "shared" else: script_location = "alembic/versions/tenants" schema_name = current_tenant """ Esquema da Tabela definido como None Como a tabelas dos inquilinos serão cadastradas cada um em um esquema diferente elas saõ configuradas como None Se nesse ponto já definirmos o esquema como o do inquilino a migração junto como o search_path exclui a tablea de versão no script de migração por isso é necessáiro configurar o esquema como None no Upgrade junto com o search_path a tabela de versão vai ser criada no mesmo esquema do inquilino """ schema_table = None context.script.version_locations = [script_location] create_schema_if_not_exists(connection, schema_name) if current_tenant: # print("print dentro current_tenant ") # connection.execute(text(f'SET search_path TO "{current_tenant}"')) connection.execute(text('set search_path to "%s"' % current_tenant)) connection.commit() connection.dialect.default_schema_name = current_tenant if special_schema: # print("print dentro special_schema ") # connection.execute(text(f'SET search_path TO "{special_schema}"')) connection.execute(text('set search_path to "%s"' % special_schema)) connection.commit() connection.dialect.default_schema_name = special_schema def include_object(object, name, type_, reflected, compare_to): if special_schema == "shared": # Inclusçaõ na Migração apenas as tabelas compartilhadas if (type_ == "table" and (name.startswith("rbac_") or name == "inquilinos")) or type_ == "column" \ or type_ == "foreign_key_constraint": print(f"Table included: {name}") return True else: return False if special_schema is None: # Inclusão na Migração apenas as tabelas dos inquilinos if (type_ == "table" and not (name.startswith("rbac_") or name == "inquilinos")) or type_ == "column"\ or type_ == "foreign_key_constraint": return True else: return False # Exclui por padrão se não atender aos critérios return False context.configure( connection=connection, target_metadata=target_metadata, include_object=include_object, version_table_schema=schema_table, # include_schemas=True, ) with context.begin_transaction(): context.run_migrations() def create_schema_if_not_exists(connection, schema_name: str): query = text(f'CREATE SCHEMA IF NOT EXISTS "{schema_name}"') connection.execute(query) connection.commit() def run_migrations_online() -> None: asyncio.run(run_async_migrations()) if context.is_offline_mode(): run_migrations_offline() else: run_migrations_online()