Skip to content

Contratos de Eventos — Plataforma Imagy

Estructura Base (obligatoria para todos los eventos)

json
{
  "event_id": "uuid-v4 (único por evento)",
  "event_type": "domain.entity.action",
  "version": "1.0",
  "timestamp": "2026-05-18T10:30:00Z (UTC, ISO 8601)",
  "correlation_id": "uuid (para trazar cadenas de eventos)",
  "causation_id": "uuid (evento que causó este evento, nullable)",
  "tenant_id": "uuid (OBLIGATORIO)",
  "organization_id": "uuid (nullable)",
  "source": "imagy-{service-name}",
  "actor_id": "uuid (user que originó la acción)",
  "actor_type": "user | system | scheduler | anonymous",
  "data": { }
}

Reglas

Publicación

  1. Siempre incluir tenant_id — un evento sin tenant es inválido
  2. Siempre incluir correlation_id — permite trazar flujos completos
  3. event_type sigue el patrón: {domain}.{entity}.{action}
  4. version es semver simplificado: "1.0", "2.0", etc.
  5. source identifica el servicio: "imagy-flow-engine", "imagy-lending"
  6. data contiene solo datos del evento — no el estado completo de la entidad
  7. Nunca incluir datos sensibles (passwords, tokens, PII innecesaria) en eventos

Consumo

  1. Idempotencia obligatoria — procesar el mismo evento 2 veces no debe causar problemas
  2. Trackear event_id en tabla processed_events para detectar duplicados
  3. Establecer tenant context desde tenant_id del evento antes de procesar
  4. Validar firma HMAC del mensaje antes de procesar
  5. Manejar versiones — si version es desconocida, loguear warning y skip (no fallar)
  6. Timeout de procesamiento: máximo 30 segundos por evento
  7. Retry policy: 3 intentos con backoff exponencial (1s, 5s, 25s)
  8. Dead Letter Queue: después de 3 fallos, enviar a DLQ para revisión manual

Evolución de Contratos

Cambio¿Breaking?Acción
Agregar campo a dataNoIncrementar minor version
Agregar nuevo event_typeNoDocumentar en catálogo
Eliminar campo de dataCrear nuevo event_type con nueva version
Renombrar campoCrear nuevo event_type
Cambiar tipo de un campoCrear nuevo event_type
Deprecar eventoNoMarcar como deprecated, mantener 6 meses

Seguridad

Firma de Mensajes

Cada mensaje publicado incluye un header X-Message-Signature:

X-Message-Signature: sha256={hmac_hex}
X-Message-Timestamp: {unix_timestamp}

Cálculo:

csharp
var payload = JsonSerializer.Serialize(message);
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString();
var signatureInput = $"{timestamp}.{payload}";
var hmac = HMACSHA256(signatureInput, sharedSecret);

Validación en consumer:

  1. Extraer timestamp del header
  2. Verificar que timestamp < 5 minutos de antigüedad (previene replay)
  3. Recalcular HMAC con el payload recibido
  4. Comparar con FixedTimeEquals (previene timing attacks)

Permisos por Servicio en RabbitMQ

ServicioPuede publicar enPuede consumir de
imagy-flow-engineimagy.events (routing: flow.*)subject.*, sign.*
imagy-lendingimagy.events (routing: lending.*)flow.*, subject.*, sign.*
imagy-signimagy.events (routing: sign.*)flow.*
imagy-subjectimagy.events (routing: subject.*)flow.*, lending.*, sign.*
imagy-identityimagy.events (routing: identity.*)

Credenciales

  • Cada servicio tiene su propio usuario/password en RabbitMQ
  • Credenciales en AWS Secrets Manager (rotación automática)
  • TLS obligatorio en conexiones a RabbitMQ
  • En local: credenciales en .env (nunca en código)

Configuración MassTransit

csharp
services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();
    
    // Registrar consumers del servicio
    x.AddConsumersFromNamespaceContaining<FlowCompletedConsumer>();
    
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(config.RabbitMqUrl, h =>
        {
            h.Username(config.RabbitMqUser);
            h.Password(config.RabbitMqPassword);
            h.UseSsl(s => s.Protocol = SslProtocols.Tls12);
        });
        
        // Exchange principal
        cfg.Message<IntegrationEvent>(e => e.SetEntityName("imagy.events"));
        
        // Retry policy
        cfg.UseMessageRetry(r => r.Exponential(3, 
            TimeSpan.FromSeconds(1), 
            TimeSpan.FromSeconds(25), 
            TimeSpan.FromSeconds(5)));
        
        // Dead Letter Queue
        cfg.ConfigureEndpoints(context);
    });
});

Tabla de Idempotencia

Cada servicio que consume eventos debe tener:

sql
CREATE TABLE processed_events (
    event_id UUID PRIMARY KEY,
    event_type VARCHAR(100) NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    source VARCHAR(100) NOT NULL
);

-- Limpieza automática (eventos > 30 días)
-- Job programado o pg_cron
DELETE FROM processed_events WHERE processed_at < NOW() - INTERVAL '30 days';

Patrón de Consumer Base

csharp
public abstract class BaseConsumer<T> : IConsumer<T> where T : class, IIntegrationEvent
{
    private readonly IIdentityContext _identity;
    private readonly IProcessedEventStore _processedEvents;
    private readonly IMessageSignatureValidator _signatureValidator;

    public async Task Consume(ConsumeContext<T> context)
    {
        var message = context.Message;
        
        // 1. Validar firma
        if (!_signatureValidator.Validate(context))
            throw new SecurityException("Invalid message signature");
        
        // 2. Verificar idempotencia
        if (await _processedEvents.ExistsAsync(message.EventId))
            return; // Ya procesado, skip
        
        // 3. Establecer tenant context
        _identity.SetContext(message.TenantId, message.ActorId, message.ActorType);
        
        // 4. Procesar
        await HandleAsync(message, context.CancellationToken);
        
        // 5. Marcar como procesado
        await _processedEvents.MarkProcessedAsync(message.EventId, message.EventType, message.Source);
    }

    protected abstract Task HandleAsync(T message, CancellationToken ct);
}

Reimagine Tech LLC — Documentacion Interna