Data Strategy
Principles
- Database-per-Domain: Each business domain has its own database
- CQRS: Writes with EF Core (Primary), reads with Dapper (Read Replica)
- RLS: Row Level Security on every table that holds business data
- Migrations with DbUp: Versioned SQL scripts, not EF Migrations
- Soft Delete: Never physically DELETE business data (set is_active = false instead)
CQRS - Command Query Responsibility Segregation
Why separate reads and writes
| Aspect | EF Core (write) | Dapper (read) |
|---|---|---|
| Purpose | Persist changes with change tracking | Fast, flexible queries |
| Performance | Change-tracking overhead | Direct SQL, minimal overhead |
| Flexibility | Mostly limited to what LINQ can express | Full SQL, complex JOINs |
| DB instance | Primary (consistency) | Read Replica (scalability) |
| Usage | INSERT, UPDATE, DELETE | SELECT (listings, reports, filters) |
Alignment with advanced API patterns
Using Dapper for reads lines up directly with the API patterns we adopt:
Sparse Fieldsets (?fields=id,name,status):
- With Dapper we write the exact SQL, selecting only the requested fields
- There is no overhead from loading full entities, as there would be with EF Core
- The query is built dynamically from the requested fields
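Because the requested field names end up interpolated into the SQL text, each one has to be validated before the query is built. The following is a minimal sketch of such a check, assuming a fixed whitelist; the class name `SparseFieldSql`, the `BuildColumnList` helper and the exact column set are illustrative and not taken from the codebase.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SparseFieldSql
{
    // Hypothetical whitelist: API field name -> physical column name.
    private static readonly IReadOnlyDictionary<string, string> AllowedColumns =
        new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase)
        {
            ["id"] = "id",
            ["code"] = "code",
            ["name"] = "name",
            ["status"] = "status",
            ["interest_rate"] = "interest_rate",
            ["created_at"] = "created_at",
        };

    // Returns the trusted column name, or throws if the field is not whitelisted.
    public static string SanitizeColumn(string field)
    {
        if (field is not null && AllowedColumns.TryGetValue(field.Trim(), out var column))
            return column;
        throw new ArgumentException($"Unknown field '{field}'", nameof(field));
    }

    // Builds the SELECT list; falls back to every whitelisted column when none are requested.
    public static string BuildColumnList(IEnumerable<string> fields)
    {
        var requested = fields.Select(SanitizeColumn).Distinct().ToArray();
        return string.Join(", ", requested.Length > 0 ? requested : AllowedColumns.Values);
    }
}
```

Only values taken from the dictionary ever reach the SQL string, so user input is never concatenated directly. Rejecting unknown fields (rather than silently dropping them) is a design choice here; it lets the API answer with a 400 instead of returning fewer columns than the client asked for.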
csharp
public async Task<IEnumerable<dynamic>> GetProductsAsync(
Guid tenantId, string[] fields, int page, int pageSize)
{
await SetTenantContextAsync();
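// Select only the requested fields. SanitizeColumn (not shown) must map each
// name to a whitelisted column, because the result is interpolated into the SQL.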
var columns = fields.Length > 0
? string.Join(", ", fields.Select(f => SanitizeColumn(f)))
: "id, code, name, status, interest_rate, created_at";
var sql = $@"
SELECT {columns}
FROM credit_products
WHERE is_active = true
ORDER BY created_at DESC
LIMIT @Limit OFFSET @Offset";
return await _readDb.QueryAsync<dynamic>(sql, new
{
Limit = pageSize,
Offset = (page - 1) * pageSize
});
}
ETags / Optimistic Concurrency:
- EF Core handles the row_version check automatically on writes
- Dapper includes row_version in read responses so the client can use it as an ETag
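The round trip between row_version and the ETag header can be isolated in a small helper. This is a sketch under the assumption that the ETag is simply the quoted integer version; `VersionETag` is a hypothetical name, and weak validators, the `*` wildcard and comma-separated lists are deliberately rejected rather than supported.

```csharp
using System;
using System.Globalization;

public static class VersionETag
{
    // Formats a row_version as a strong ETag, e.g. 7 becomes "7" including the quotes.
    public static string Format(int rowVersion) =>
        "\"" + rowVersion.ToString(CultureInfo.InvariantCulture) + "\"";

    // Parses an If-Match value produced by Format. Returns false for
    // missing, weak (W/"..."), wildcard (*) or non-numeric values.
    public static bool TryParse(string? headerValue, out int rowVersion)
    {
        rowVersion = 0;
        if (string.IsNullOrWhiteSpace(headerValue)) return false;

        var value = headerValue.Trim();
        if (value.Length < 3 || value[0] != '"' || value[^1] != '"') return false;

        // NumberStyles.None accepts plain digits only (no sign, no whitespace).
        return int.TryParse(value[1..^1], NumberStyles.None,
            CultureInfo.InvariantCulture, out rowVersion) && rowVersion > 0;
    }
}
```

A controller can then return 400 when `TryParse` fails and pass the parsed version to the command otherwise, which avoids an unhandled exception on malformed headers.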
csharp
// Entity with a version column for optimistic concurrency
public class CreditProduct : TenantEntity
{
public string Code { get; private set; }
public string Name { get; private set; }
public decimal InterestRate { get; private set; }
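// Optimistic concurrency: incremented on every UPDATE (by the database trigger)
// Assumption: the property is also mapped as store-generated
// (e.g. ValueGeneratedOnAddOrUpdate) so EF Core reloads the new value after saving
[ConcurrencyCheck]
public int RowVersion { get; private set; }
}
// EF Core detects conflicts automatically:
// if RowVersion does not match, it throws DbUpdateConcurrencyException
csharp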
// In the controller: ETag derived from row_version
[HttpGet("{id}")]
public async Task<IActionResult> GetById(Guid id)
{
var product = await _mediator.Send(new GetProductQuery(id));
if (product is null) return NotFound();
// ETag is the quoted row_version value (no hashing is applied here)
var etag = $"\"{product.RowVersion}\"";
Response.Headers.ETag = etag;
return Ok(product.ToApiResponse());
}
[HttpPatch("{id}")]
public async Task<IActionResult> Update(
Guid id,
[FromBody] PatchProductDto dto,
[FromHeader(Name = "If-Match")] string ifMatch)
{
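if (string.IsNullOrEmpty(ifMatch))
return BadRequest("If-Match header is required");
// Reject malformed ETags with 400 instead of letting int.Parse throw
if (!int.TryParse(ifMatch.Trim('"'), out var expectedVersion))
return BadRequest("If-Match header must contain a quoted integer version");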
var result = await _mediator.Send(new UpdateProductCommand(
id, dto, expectedVersion));
return result.Match(
success => Ok(success),
conflict => StatusCode(412, new { error = "PRECONDITION_FAILED",
message = "Resource was modified by another request" }));
}
Idempotency Keys:
- Keys are stored in a dedicated table per service
- Dapper quickly checks whether the key has already been processed
- EF Core persists the result together with the key
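The three bullets describe a check, execute, store sequence. The sketch below shows that sequence in isolation, using an in-memory dictionary as a stand-in for the idempotency table; `IdempotentExecutor` and `StoredResponse` are hypothetical names, and real code would read and write through the database instead.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public sealed record StoredResponse(int Status, string Body, DateTimeOffset ExpiresAt);

public sealed class IdempotentExecutor
{
    // In-memory stand-in for the idempotency_keys table, keyed by (tenant, key).
    private readonly ConcurrentDictionary<(Guid TenantId, string Key), StoredResponse> _store = new();
    private readonly TimeSpan _ttl;

    public IdempotentExecutor(TimeSpan ttl) => _ttl = ttl;

    public async Task<StoredResponse> ExecuteAsync(
        Guid tenantId, string key, Func<Task<(int Status, string Body)>> handler)
    {
        var now = DateTimeOffset.UtcNow;

        // 1. Fast lookup: replay the stored response if the key is still valid.
        if (_store.TryGetValue((tenantId, key), out var cached) && cached.ExpiresAt > now)
            return cached;

        // 2. First time (or expired): run the real operation.
        var (status, body) = await handler();

        // 3. Persist the outcome together with the key.
        var stored = new StoredResponse(status, body, now.Add(_ttl));
        _store[(tenantId, key)] = stored;
        return stored;
    }
}
```

This sketch does not protect against two first requests with the same key arriving concurrently; with a real table, a unique constraint on the key would be the final guard against a duplicate insert.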
csharp
// Idempotency table (see V006__create_idempotency_keys_table.sql)
// CREATE TABLE idempotency_keys (
// key VARCHAR(255) NOT NULL,
// tenant_id UUID NOT NULL,
// response_status INT NOT NULL,
// response_body JSONB NOT NULL,
// created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
// expires_at TIMESTAMPTZ NOT NULL,
// PRIMARY KEY (key, tenant_id)
// );
public class IdempotencyService
{
private readonly IDbConnection _readDb; // Dapper: fast reads
private readonly IUnitOfWork _unitOfWork; // EF Core: writes
public async Task<IdempotencyResult?> GetCachedResponseAsync(string key, Guid tenantId)
{
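// Fast read with Dapper. Note: if _readDb points at the Read Replica, a retry that
// arrives within the replication lag can miss the key; the primary key is the final guard.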
return await _readDb.QueryFirstOrDefaultAsync<IdempotencyResult>(
"SELECT response_status, response_body FROM idempotency_keys WHERE key = @Key AND tenant_id = @TenantId AND expires_at > NOW()",
new { Key = key, TenantId = tenantId });
}
public async Task SaveResponseAsync(string key, Guid tenantId, int status, object body)
{
// Write with EF Core
var entry = new IdempotencyKey(key, tenantId, status,
JsonSerializer.Serialize(body),
DateTime.UtcNow.AddHours(24));
await _unitOfWork.IdempotencyKeys.AddAsync(entry);
await _unitOfWork.SaveChangesAsync();
}
}
Database per Domain
| Database | Domain | Owner | Main tables |
|---|---|---|---|
| imagy_identity | Identity | Team Identity | tenants, services, tenant_services, themes, user_tenants, audit_logs |
| imagy_flow | ImagFlow | Team Flow | flows, flow_versions, flow_steps, flow_rules, provider_configs, requests, executions |
| imagy_lending | ImagLend | Team Lending | credit_products, applications, credits, disbursements, payments, collections |
| imagy_sign | ImagSign | Team Flow | signature_requests, documents, certificates, signature_audit |
| imagy_subject | ImagID | Team Platform | subject_profiles, subject_tenant_views, subject_events, subject_devices, subject_lists |
Rules
- Each service accesses only ITS OWN database
- Never run direct cross-database queries
- If you need data from another domain: use a synchronous API call or an asynchronous event
- One shared Aurora cluster (cost-efficient) with separate databases (logical isolation)
- Each database has its own application user with limited permissions
DB Users per Service
sql
-- Each service has its own role
CREATE ROLE imagy_flow_app LOGIN PASSWORD '...' NOBYPASSRLS;
CREATE ROLE imagy_lending_app LOGIN PASSWORD '...' NOBYPASSRLS;
CREATE ROLE imagy_sign_app LOGIN PASSWORD '...' NOBYPASSRLS;
CREATE ROLE imagy_subject_app LOGIN PASSWORD '...' NOBYPASSRLS;
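-- Each role can only connect to its own database
-- (PostgreSQL grants CONNECT to PUBLIC by default, so revoke it first)
REVOKE CONNECT ON DATABASE imagy_flow FROM PUBLIC;
GRANT CONNECT ON DATABASE imagy_flow TO imagy_flow_app;
GRANT USAGE ON SCHEMA public TO imagy_flow_app;
-- Note: ON ALL TABLES covers only tables that already exist; tables created
-- later need ALTER DEFAULT PRIVILEGES or a new GRANT
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO imagy_flow_app;
-- Read-only role for reporting
CREATE ROLE imagy_readonly LOGIN PASSWORD '...' NOBYPASSRLS;
GRANT CONNECT ON DATABASE imagy_flow TO imagy_readonly;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO imagy_readonly;
Migrations with DbUp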
Each service has an Imagy.{Domain}.Migrations project with versioned SQL scripts:
Imagy.Flow.Migrations/
├── Scripts/
│ ├── V001__create_flows_table.sql
│ ├── V002__create_flow_versions_table.sql
│ ├── V003__create_flow_steps_table.sql
│ ├── V004__create_provider_configs_table.sql
│ ├── V005__enable_rls.sql
│ ├── V006__create_idempotency_keys_table.sql
│ └── V007__add_row_version_columns.sql
├── Seed/
│ ├── S001__seed_default_providers.sql
│ └── S002__seed_test_data.sql
└── Program.cs
Script Conventions
| Prefix | Purpose | Example |
|---|---|---|
| V{NNN}__ | Schema migration (DDL) | V001__create_flows_table.sql |
| S{NNN}__ | Seed data | S001__seed_default_providers.sql |
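The naming convention is easy to enforce mechanically, for example in a unit test or at the start of the migration run. The sketch below is one way to do that, assuming `{NNN}` means exactly three digits and a snake_case description; running schema scripts before seed scripts is also an assumption of this example, and `ScriptNaming` is a hypothetical helper rather than part of DbUp.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public enum ScriptKind { Schema, Seed }

public sealed record MigrationScript(ScriptKind Kind, int Number, string Description, string FileName);

public static class ScriptNaming
{
    // V or S, exactly three digits, double underscore, snake_case description, .sql
    private static readonly Regex Pattern =
        new(@"^(?<kind>[VS])(?<num>[0-9]{3})__(?<desc>[a-z0-9]+(?:_[a-z0-9]+)*)\.sql$",
            RegexOptions.CultureInvariant);

    public static bool TryParse(string fileName, out MigrationScript? script)
    {
        script = null;
        var m = Pattern.Match(fileName);
        if (!m.Success) return false;

        var kind = m.Groups["kind"].Value == "V" ? ScriptKind.Schema : ScriptKind.Seed;
        script = new MigrationScript(kind, int.Parse(m.Groups["num"].Value),
            m.Groups["desc"].Value, fileName);
        return true;
    }

    // Orders schema scripts before seed scripts, then by number;
    // rejects names that break the convention and duplicate numbers.
    public static IReadOnlyList<MigrationScript> Order(IEnumerable<string> fileNames)
    {
        var parsed = new List<MigrationScript>();
        foreach (var name in fileNames)
        {
            if (!TryParse(name, out var script))
                throw new FormatException($"Script name does not follow the convention: {name}");
            if (parsed.Any(p => p.Kind == script!.Kind && p.Number == script.Number))
                throw new InvalidOperationException($"Duplicate script number: {name}");
            parsed.Add(script!);
        }
        return parsed.OrderBy(s => s.Kind).ThenBy(s => s.Number).ToList();
    }
}
```

Failing fast on a badly named or duplicated script is cheaper than discovering the problem after part of a migration has been applied.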
Migration Rules
- Scripts are immutable once applied (never edit an existing script)
- Each script is idempotent where possible (IF NOT EXISTS, ON CONFLICT DO NOTHING)
- Include the rollback as a comment at the end of the script
- Always include row_version in tables that support optimistic concurrency
- Always include created_at, updated_at in every table
- Always enable RLS on tables that have tenant_id
Migration Example
sql
-- V006__create_idempotency_keys_table.sql
-- Table supporting the Idempotency-Key header in APIs
CREATE TABLE IF NOT EXISTS idempotency_keys (
key VARCHAR(255) NOT NULL,
tenant_id UUID NOT NULL,
response_status INT NOT NULL,
response_body JSONB NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
PRIMARY KEY (key, tenant_id)
);
CREATE INDEX IF NOT EXISTS idx_idempotency_keys_expires
ON idempotency_keys(expires_at);
-- Automatic cleanup of expired keys (run via pg_cron or a scheduled job)
-- DELETE FROM idempotency_keys WHERE expires_at < NOW();
-- Rollback:
-- DROP TABLE IF EXISTS idempotency_keys;
sql
-- V007__add_row_version_columns.sql
-- Support for ETags / optimistic concurrency
ALTER TABLE credit_products
ADD COLUMN IF NOT EXISTS row_version INT NOT NULL DEFAULT 1;
ALTER TABLE flows
ADD COLUMN IF NOT EXISTS row_version INT NOT NULL DEFAULT 1;
-- Trigger that auto-increments row_version on UPDATE
CREATE OR REPLACE FUNCTION increment_row_version()
RETURNS TRIGGER AS $$
BEGIN
NEW.row_version = OLD.row_version + 1;
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- DROP before CREATE keeps the script re-runnable
DROP TRIGGER IF EXISTS trg_credit_products_version ON credit_products;
CREATE TRIGGER trg_credit_products_version
BEFORE UPDATE ON credit_products
FOR EACH ROW EXECUTE FUNCTION increment_row_version();
DROP TRIGGER IF EXISTS trg_flows_version ON flows;
CREATE TRIGGER trg_flows_version
BEFORE UPDATE ON flows
FOR EACH ROW EXECUTE FUNCTION increment_row_version();
-- Rollback:
-- DROP TRIGGER IF EXISTS trg_credit_products_version ON credit_products;
-- DROP TRIGGER IF EXISTS trg_flows_version ON flows;
-- DROP FUNCTION IF EXISTS increment_row_version();
-- ALTER TABLE credit_products DROP COLUMN IF EXISTS row_version;
-- ALTER TABLE flows DROP COLUMN IF EXISTS row_version;
Read Pattern with Dapper
Basic query with RLS
csharp
public class CreditProductReadRepository : ICreditProductReadRepository
{
private readonly IDbConnection _readDb;
private readonly IIdentityContext _identity;
public async Task<PagedResult<CreditProductDto>> GetAllAsync(
CreditProductFilters filters, int page, int pageSize)
{
await SetTenantContextAsync();
var sql = @"
SELECT id, code, name, interest_rate, min_amount, max_amount,
min_term_days, max_term_days, is_active, row_version, created_at
FROM credit_products
WHERE is_active = @IsActive
ORDER BY created_at DESC
LIMIT @Limit OFFSET @Offset";
var countSql = @"
SELECT COUNT(*) FROM credit_products WHERE is_active = @IsActive";
var items = await _readDb.QueryAsync<CreditProductDto>(sql, new
{
filters.IsActive,
Limit = pageSize,
Offset = (page - 1) * pageSize
});
var total = await _readDb.ExecuteScalarAsync<int>(countSql, new
{
filters.IsActive
});
return new PagedResult<CreditProductDto>(items, total, page, pageSize);
}
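private async Task SetTenantContextAsync()
{
if (_readDb.State != ConnectionState.Open)
await ((DbConnection)_readDb).OpenAsync();
// SET does not accept bind parameters in PostgreSQL, so use set_config().
// is_local = false makes the setting session-wide: set it again on every
// use of a pooled connection so a previous tenant's value never leaks.
await _readDb.ExecuteAsync(
"SELECT set_config('app.current_tenant_id', @TenantId, false)",
new { TenantId = _identity.TenantId.ToString() });
}
}
Query with Sparse Fieldsets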
csharp
public async Task<IEnumerable<dynamic>> GetWithFieldsAsync(
string[] fields, CreditProductFilters filters, int page, int pageSize)
{
await SetTenantContextAsync();
// Whitelist of allowed fields (prevents SQL injection)
var allowedFields = new HashSet<string>
{
"id", "code", "name", "interest_rate", "min_amount",
"max_amount", "min_term_days", "max_term_days",
"is_active", "row_version", "created_at"
};
var requestedFields = fields.Length > 0
? fields.Where(f => allowedFields.Contains(f)).ToArray()
: allowedFields.ToArray();
if (requestedFields.Length == 0)
requestedFields = new[] { "id", "code", "name" };
// Always include row_version for ETags
if (!requestedFields.Contains("row_version"))
requestedFields = requestedFields.Append("row_version").ToArray();
var columns = string.Join(", ", requestedFields);
var sql = $@"
SELECT {columns}
FROM credit_products
WHERE is_active = @IsActive
ORDER BY created_at DESC
LIMIT @Limit OFFSET @Offset";
return await _readDb.QueryAsync<dynamic>(sql, new
{
filters.IsActive,
Limit = pageSize,
Offset = (page - 1) * pageSize
});
}
Write Pattern with EF Core
Command with Optimistic Concurrency
csharp
public class UpdateCreditProductHandler
: IRequestHandler<UpdateCreditProductCommand, Result<CreditProductUpdatedResponse>>
{
private readonly ICreditProductRepository _repository;
private readonly IUnitOfWork _unitOfWork;
private readonly IPublishEndpoint _publishEndpoint;
public async Task<Result<CreditProductUpdatedResponse>> Handle(
UpdateCreditProductCommand cmd, CancellationToken ct)
{
var product = await _repository.GetByIdAsync(cmd.ProductId, ct);
if (product is null)
return Result.Failure("NOT_FOUND", "Product not found");
// Check optimistic concurrency (ETag)
if (product.RowVersion != cmd.ExpectedVersion)
return Result.Failure("PRECONDITION_FAILED",
"Resource was modified by another request");
// Apply partial changes (merge-patch)
product.ApplyPatch(cmd.Patch);
try
{
await _unitOfWork.SaveChangesAsync(ct);
}
catch (DbUpdateConcurrencyException)
{
// Another process modified the row between the read and the save
return Result.Failure("PRECONDITION_FAILED",
"Resource was modified by another request");
}
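// Publish the event
await _publishEndpoint.Publish(new CreditProductUpdated { ... }, ct);
// Assumes RowVersion is mapped as store-generated, so after saving it holds
// the value written by the trigger; otherwise this is the pre-update version
return Result.Success(new CreditProductUpdatedResponse(
product.Id, product.RowVersion));
}
}
Merge-Patch on the Entity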
csharp
public class CreditProduct : TenantEntity
{
// ... properties ...
public void ApplyPatch(CreditProductPatch patch)
{
// Only update fields present in the patch (non-null)
if (patch.Name is not null) Name = patch.Name;
if (patch.InterestRate is not null) InterestRate = patch.InterestRate.Value;
if (patch.MinAmount is not null) MinAmount = patch.MinAmount.Value;
if (patch.MaxAmount is not null) MaxAmount = patch.MaxAmount.Value;
if (patch.MinTermDays is not null) MinTermDays = patch.MinTermDays.Value;
if (patch.MaxTermDays is not null) MaxTermDays = patch.MaxTermDays.Value;
if (patch.IsActive is not null) IsActive = patch.IsActive.Value;
// row_version is incremented automatically by the trigger
}
}
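// The patch DTO uses nullable members: null means "field not sent".
// It cannot distinguish an omitted field from one sent explicitly as null,
// so fields cannot be cleared through this patch.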
public record CreditProductPatch(
string? Name = null,
decimal? InterestRate = null,
decimal? MinAmount = null,
decimal? MaxAmount = null,
int? MinTermDays = null,
int? MaxTermDays = null,
bool? IsActive = null
);
Read Replica - Considerations
| Aspect | Detail |
|---|---|
| Replication lag | Typically < 100 ms in Aurora Multi-AZ |
| Read-your-writes | If the user has just written and needs to read immediately, use the Primary |
| RLS on the Replica | RLS policies apply identically. Set the tenant context before every query. |
| Connection pooling | Separate pools for Primary and Replica |
| Failover | If the Primary fails, Aurora automatically promotes a Replica |
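The read-your-writes row can be implemented by pinning a user's reads to the Primary for a short window after that user's last write. The following is a minimal sketch of that idea; `ReadYourWritesRouter`, `DbTarget` and the window length are hypothetical, and the in-memory map only works within a single process (a multi-instance deployment would need shared state or a client-supplied hint).

```csharp
using System;
using System.Collections.Concurrent;

public enum DbTarget { Primary, ReadReplica }

public sealed class ReadYourWritesRouter
{
    private readonly ConcurrentDictionary<Guid, DateTimeOffset> _lastWrite = new();
    private readonly TimeSpan _stickiness;
    private readonly Func<DateTimeOffset> _clock;

    // The clock is injectable so the routing window can be tested deterministically.
    public ReadYourWritesRouter(TimeSpan stickiness, Func<DateTimeOffset>? clock = null)
    {
        _stickiness = stickiness;
        _clock = clock ?? (() => DateTimeOffset.UtcNow);
    }

    // Call after a successful command for this user.
    public void RecordWrite(Guid userId) => _lastWrite[userId] = _clock();

    // Reads go to the Primary while the user's last write is younger than the window.
    public DbTarget TargetForRead(Guid userId) =>
        _lastWrite.TryGetValue(userId, out var at) && _clock() - at < _stickiness
            ? DbTarget.Primary
            : DbTarget.ReadReplica;
}
```

The window should comfortably exceed the expected replication lag; a value of a second or two is a plausible starting point given the lag figure in the table, but it should be tuned against measured lag.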
Summary of Patterns by Layer
| Layer | Pattern | Tool | Instance |
|---|---|---|---|
| API Controller | Receives the request, validates ETag/Idempotency-Key | ASP.NET Core | - |
| Application Command | Business logic, persists changes | MediatR + EF Core | Primary |
| Application Query | Optimized reads, sparse fieldsets | MediatR + Dapper | Read Replica |
| Infrastructure Write | DbContext with RLS interceptor | EF Core | Primary |
| Infrastructure Read | Direct SQL with RLS | Dapper | Read Replica |
| Idempotency | Check the key before processing | Dapper (read) + EF Core (write) | Both |
| Concurrency | row_version + trigger + ETag | PostgreSQL trigger + EF Core | Primary |