Skip to content

Estrategia de Datos

Principios

  1. Database-per-Domain: Cada dominio de negocio tiene su propia base de datos
  2. CQRS: Escritura con EF Core (Primary), lectura con Dapper (Read Replica)
  3. RLS: Row Level Security en todas las tablas con datos de negocio
  4. Migraciones con DbUp: Scripts SQL versionados, no EF Migrations
  5. Soft Delete: Nunca DELETE fisico en datos de negocio (is_active = false)

CQRS - Command Query Responsibility Segregation

Por que separar lectura y escritura

AspectoEF Core (escritura)Dapper (lectura)
PropositoPersistir cambios con trackingQueries rapidas y flexibles
PerformanceOverhead de change trackingSQL directo, minimo overhead
FlexibilidadLimitado a LINQSQL completo, JOINs complejos
Instancia DBPrimary (consistencia)Read Replica (escalabilidad)
UsoINSERT, UPDATE, DELETESELECT (listados, reportes, filtros)

Alineacion con patrones avanzados de API

El uso de Dapper para lectura se alinea directamente con los patrones de API que adoptamos:

Sparse Fieldsets (?fields=id,name,status):

  • Con Dapper escribimos el SQL exacto con solo los campos solicitados
  • No hay overhead de cargar entidades completas como con EF Core
  • El query se construye dinamicamente segun los fields pedidos
csharp
public async Task<IEnumerable<dynamic>> GetProductsAsync(
    Guid tenantId, string[] fields, int page, int pageSize)
{
    await SetTenantContextAsync();

    // Solo seleccionar los campos solicitados
    var columns = fields.Length > 0
        ? string.Join(", ", fields.Select(f => SanitizeColumn(f)))
        : "id, code, name, status, interest_rate, created_at";

    var sql = $@"
        SELECT {columns}
        FROM credit_products
        WHERE is_active = true
        ORDER BY created_at DESC
        LIMIT @Limit OFFSET @Offset";

    return await _readDb.QueryAsync<dynamic>(sql, new
    {
        Limit = pageSize,
        Offset = (page - 1) * pageSize
    });
}

ETags / Concurrencia Optimista:

  • EF Core maneja el row_version automaticamente en escritura
  • Dapper incluye el row_version en las respuestas de lectura para que el cliente lo use como ETag
csharp
// Entidad con version para concurrencia optimista
public class CreditProduct : TenantEntity
{
    public string Code { get; private set; }
    public string Name { get; private set; }
    public decimal InterestRate { get; private set; }

    // Concurrencia optimista - se incrementa en cada UPDATE
    [ConcurrencyCheck]
    public int RowVersion { get; private set; }
}

// EF Core detecta conflictos automaticamente
// Si RowVersion no coincide, lanza DbUpdateConcurrencyException
csharp
// En el controller - ETag desde row_version
[HttpGet("{id}")]
public async Task<IActionResult> GetById(Guid id)
{
    var product = await _mediator.Send(new GetProductQuery(id));
    if (product is null) return NotFound();

    // ETag = hash del row_version
    var etag = $"\"{product.RowVersion}\"";
    Response.Headers.ETag = etag;

    return Ok(product.ToApiResponse());
}

[HttpPatch("{id}")]
public async Task<IActionResult> Update(
    Guid id,
    [FromBody] PatchProductDto dto,
    [FromHeader(Name = "If-Match")] string ifMatch)
{
    if (string.IsNullOrEmpty(ifMatch))
        return BadRequest("If-Match header is required");

    var expectedVersion = int.Parse(ifMatch.Trim('"'));

    var result = await _mediator.Send(new UpdateProductCommand(
        id, dto, expectedVersion));

    return result.Match(
        success => Ok(success),
        conflict => StatusCode(412, new { error = "PRECONDITION_FAILED",
            message = "Resource was modified by another request" }));
}

Idempotency Keys:

  • Se almacenan en una tabla dedicada por servicio
  • Dapper consulta rapidamente si la key ya fue procesada
  • EF Core persiste el resultado junto con la key
csharp
// Tabla de idempotencia
// CREATE TABLE idempotency_keys (
//     key VARCHAR(255) PRIMARY KEY,
//     tenant_id UUID NOT NULL,
//     response_status INT NOT NULL,
//     response_body JSONB NOT NULL,
//     created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
//     expires_at TIMESTAMPTZ NOT NULL
// );

public class IdempotencyService
{
    private readonly IDbConnection _readDb;  // Dapper - lectura rapida
    private readonly IUnitOfWork _unitOfWork; // EF Core - escritura

    public async Task<IdempotencyResult?> GetCachedResponseAsync(string key, Guid tenantId)
    {
        // Lectura rapida con Dapper
        return await _readDb.QueryFirstOrDefaultAsync<IdempotencyResult>(
            "SELECT response_status, response_body FROM idempotency_keys WHERE key = @Key AND tenant_id = @TenantId AND expires_at > NOW()",
            new { Key = key, TenantId = tenantId });
    }

    public async Task SaveResponseAsync(string key, Guid tenantId, int status, object body)
    {
        // Escritura con EF Core
        var entry = new IdempotencyKey(key, tenantId, status,
            JsonSerializer.Serialize(body),
            DateTime.UtcNow.AddHours(24));
        await _unitOfWork.IdempotencyKeys.AddAsync(entry);
        await _unitOfWork.SaveChangesAsync();
    }
}

Base de Datos por Dominio

Base de DatosDominioOwnerTablas principales
imagy_identityIdentityTeam Identitytenants, services, tenant_services, themes, user_tenants, audit_logs
imagy_flowImagFlowTeam Flowflows, flow_versions, flow_steps, flow_rules, provider_configs, requests, executions
imagy_lendingImagLendTeam Lendingcredit_products, applications, credits, disbursements, payments, collections
imagy_signImagSignTeam Flowsignature_requests, documents, certificates, signature_audit
imagy_subjectImagIDTeam Platformsubject_profiles, subject_tenant_views, subject_events, subject_devices, subject_lists

Reglas

  • Cada servicio solo accede a SU base de datos
  • Nunca queries cross-database directas
  • Si necesitas datos de otro dominio: API sincrona o evento asincrono
  • Un cluster Aurora compartido (costo-eficiente) con databases separadas (aislamiento logico)
  • Cada database tiene su propio usuario de aplicacion con permisos limitados

Usuarios de BD por Servicio

sql
-- Cada servicio tiene su propio rol
CREATE ROLE imagy_flow_app LOGIN PASSWORD '...' NOBYPASSRLS;
CREATE ROLE imagy_lending_app LOGIN PASSWORD '...' NOBYPASSRLS;
CREATE ROLE imagy_sign_app LOGIN PASSWORD '...' NOBYPASSRLS;
CREATE ROLE imagy_subject_app LOGIN PASSWORD '...' NOBYPASSRLS;

-- Cada rol solo tiene acceso a su database
GRANT CONNECT ON DATABASE imagy_flow TO imagy_flow_app;
GRANT USAGE ON SCHEMA public TO imagy_flow_app;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO imagy_flow_app;

-- Rol de solo lectura para reportes
CREATE ROLE imagy_readonly LOGIN PASSWORD '...' NOBYPASSRLS;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO imagy_readonly;

Migraciones con DbUp

Cada servicio tiene un proyecto Imagy.{Domain}.Migrations con scripts SQL versionados:

Imagy.Flow.Migrations/
├── Scripts/
│   ├── V001__create_flows_table.sql
│   ├── V002__create_flow_versions_table.sql
│   ├── V003__create_flow_steps_table.sql
│   ├── V004__create_provider_configs_table.sql
│   ├── V005__enable_rls.sql
│   ├── V006__create_idempotency_keys_table.sql
│   └── V007__add_row_version_columns.sql
├── Seed/
│   ├── S001__seed_default_providers.sql
│   └── S002__seed_test_data.sql
└── Program.cs

Convenciones de Scripts

PrefijoPropositoEjemplo
V{NNN}__Migracion de esquema (DDL)V001__create_flows_table.sql
S{NNN}__Seed dataS001__seed_default_providers.sql

Reglas de Migraciones

  • Scripts son inmutables una vez aplicados (nunca editar un script existente)
  • Cada script es idempotente donde sea posible (IF NOT EXISTS, ON CONFLICT DO NOTHING)
  • Incluir rollback como comentario al final del script
  • Siempre incluir row_version en tablas que soporten concurrencia optimista
  • Siempre incluir created_at, updated_at en todas las tablas
  • Siempre habilitar RLS en tablas con tenant_id

Ejemplo de Migracion

sql
-- V006__create_idempotency_keys_table.sql
-- Tabla para soporte de Idempotency-Key header en APIs

CREATE TABLE IF NOT EXISTS idempotency_keys (
    key VARCHAR(255) NOT NULL,
    tenant_id UUID NOT NULL,
    response_status INT NOT NULL,
    response_body JSONB NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    expires_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (key, tenant_id)
);

CREATE INDEX idx_idempotency_keys_expires
    ON idempotency_keys(expires_at);

-- Limpieza automatica de keys expiradas (ejecutar via pg_cron o job)
-- DELETE FROM idempotency_keys WHERE expires_at < NOW();

-- Rollback:
-- DROP TABLE IF EXISTS idempotency_keys;
sql
-- V007__add_row_version_columns.sql
-- Soporte para ETags / concurrencia optimista

ALTER TABLE credit_products
    ADD COLUMN IF NOT EXISTS row_version INT NOT NULL DEFAULT 1;

ALTER TABLE flows
    ADD COLUMN IF NOT EXISTS row_version INT NOT NULL DEFAULT 1;

-- Trigger para auto-incrementar row_version en UPDATE
CREATE OR REPLACE FUNCTION increment_row_version()
RETURNS TRIGGER AS $$
BEGIN
    NEW.row_version = OLD.row_version + 1;
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_credit_products_version
    BEFORE UPDATE ON credit_products
    FOR EACH ROW EXECUTE FUNCTION increment_row_version();

CREATE TRIGGER trg_flows_version
    BEFORE UPDATE ON flows
    FOR EACH ROW EXECUTE FUNCTION increment_row_version();

-- Rollback:
-- DROP TRIGGER IF EXISTS trg_credit_products_version ON credit_products;
-- DROP TRIGGER IF EXISTS trg_flows_version ON flows;
-- ALTER TABLE credit_products DROP COLUMN IF EXISTS row_version;
-- ALTER TABLE flows DROP COLUMN IF EXISTS row_version;

Patron de Lectura con Dapper

Query basica con RLS

csharp
public class CreditProductReadRepository : ICreditProductReadRepository
{
    private readonly IDbConnection _readDb;
    private readonly IIdentityContext _identity;

    public async Task<PagedResult<CreditProductDto>> GetAllAsync(
        CreditProductFilters filters, int page, int pageSize)
    {
        await SetTenantContextAsync();

        var sql = @"
            SELECT id, code, name, interest_rate, min_amount, max_amount,
                   min_term_days, max_term_days, is_active, row_version, created_at
            FROM credit_products
            WHERE is_active = @IsActive
            ORDER BY created_at DESC
            LIMIT @Limit OFFSET @Offset";

        var countSql = @"
            SELECT COUNT(*) FROM credit_products WHERE is_active = @IsActive";

        var items = await _readDb.QueryAsync<CreditProductDto>(sql, new
        {
            filters.IsActive,
            Limit = pageSize,
            Offset = (page - 1) * pageSize
        });

        var total = await _readDb.ExecuteScalarAsync<int>(countSql, new
        {
            filters.IsActive
        });

        return new PagedResult<CreditProductDto>(items, total, page, pageSize);
    }

    private async Task SetTenantContextAsync()
    {
        if (_readDb.State != ConnectionState.Open)
            await ((DbConnection)_readDb).OpenAsync();

        await _readDb.ExecuteAsync(
            "SET app.current_tenant_id = @TenantId",
            new { _identity.TenantId });
    }
}

Query con Sparse Fieldsets

csharp
public async Task<IEnumerable<dynamic>> GetWithFieldsAsync(
    string[] fields, CreditProductFilters filters, int page, int pageSize)
{
    await SetTenantContextAsync();

    // Whitelist de campos permitidos (previene SQL injection)
    var allowedFields = new HashSet<string>
    {
        "id", "code", "name", "interest_rate", "min_amount",
        "max_amount", "min_term_days", "max_term_days",
        "is_active", "row_version", "created_at"
    };

    var requestedFields = fields.Length > 0
        ? fields.Where(f => allowedFields.Contains(f)).ToArray()
        : allowedFields.ToArray();

    if (requestedFields.Length == 0)
        requestedFields = new[] { "id", "code", "name" };

    // Siempre incluir row_version para ETags
    if (!requestedFields.Contains("row_version"))
        requestedFields = requestedFields.Append("row_version").ToArray();

    var columns = string.Join(", ", requestedFields);

    var sql = $@"
        SELECT {columns}
        FROM credit_products
        WHERE is_active = @IsActive
        ORDER BY created_at DESC
        LIMIT @Limit OFFSET @Offset";

    return await _readDb.QueryAsync<dynamic>(sql, new
    {
        filters.IsActive,
        Limit = pageSize,
        Offset = (page - 1) * pageSize
    });
}

Patron de Escritura con EF Core

Command con Concurrencia Optimista

csharp
public class UpdateCreditProductHandler
    : IRequestHandler<UpdateCreditProductCommand, Result<CreditProductUpdatedResponse>>
{
    private readonly ICreditProductRepository _repository;
    private readonly IUnitOfWork _unitOfWork;
    private readonly IPublishEndpoint _publishEndpoint;

    public async Task<Result<CreditProductUpdatedResponse>> Handle(
        UpdateCreditProductCommand cmd, CancellationToken ct)
    {
        var product = await _repository.GetByIdAsync(cmd.ProductId, ct);
        if (product is null)
            return Result.Failure("NOT_FOUND", "Product not found");

        // Verificar concurrencia optimista (ETag)
        if (product.RowVersion != cmd.ExpectedVersion)
            return Result.Failure("PRECONDITION_FAILED",
                "Resource was modified by another request");

        // Aplicar cambios parciales (merge-patch)
        product.ApplyPatch(cmd.Patch);

        try
        {
            await _unitOfWork.SaveChangesAsync(ct);
        }
        catch (DbUpdateConcurrencyException)
        {
            // Otro proceso modifico el registro entre el GET y el SAVE
            return Result.Failure("PRECONDITION_FAILED",
                "Resource was modified by another request");
        }

        // Publicar evento
        await _publishEndpoint.Publish(new CreditProductUpdated { ... }, ct);

        return Result.Success(new CreditProductUpdatedResponse(
            product.Id, product.RowVersion));
    }
}

Merge-Patch en la Entidad

csharp
public class CreditProduct : TenantEntity
{
    // ... properties ...

    public void ApplyPatch(CreditProductPatch patch)
    {
        // Solo actualizar campos que vienen en el patch (no null)
        if (patch.Name is not null) Name = patch.Name;
        if (patch.InterestRate is not null) InterestRate = patch.InterestRate.Value;
        if (patch.MinAmount is not null) MinAmount = patch.MinAmount.Value;
        if (patch.MaxAmount is not null) MaxAmount = patch.MaxAmount.Value;
        if (patch.MinTermDays is not null) MinTermDays = patch.MinTermDays.Value;
        if (patch.MaxTermDays is not null) MaxTermDays = patch.MaxTermDays.Value;
        if (patch.IsActive is not null) IsActive = patch.IsActive.Value;
        // row_version se incrementa automaticamente via trigger
    }
}

// El DTO de patch usa nullable para distinguir "no enviado" de "enviado como null"
public record CreditProductPatch(
    string? Name = null,
    decimal? InterestRate = null,
    decimal? MinAmount = null,
    decimal? MaxAmount = null,
    int? MinTermDays = null,
    int? MaxTermDays = null,
    bool? IsActive = null
);

Read Replica - Consideraciones

AspectoDetalle
Replication lagTipicamente < 100ms en Aurora Multi-AZ
Read-your-writesSi el usuario acaba de escribir y necesita leer inmediatamente, usar Primary
RLS en ReplicaLas policies RLS aplican igual. SET tenant context antes de cada query.
Connection poolingPools separados para Primary y Replica
FailoverSi Primary falla, Aurora promueve Replica automaticamente

Resumen de Patrones por Capa

CapaPatronHerramientaInstancia
API ControllerRecibe request, valida ETag/Idempotency-KeyASP.NET Core-
Application CommandLogica de negocio, persiste cambiosMediatR + EF CorePrimary
Application QueryLectura optimizada, sparse fieldsetsMediatR + DapperRead Replica
Infrastructure WriteDbContext con RLS interceptorEF CorePrimary
Infrastructure ReadSQL directo con RLSDapperRead Replica
IdempotencyVerificar key antes de procesarDapper (read) + EF Core (write)Ambas
Concurrenciarow_version + trigger + ETagPostgreSQL trigger + EF CorePrimary

Reimagine Tech LLC — Documentacion Interna