Contratos de Eventos — Plataforma Imagy
Estructura Base (obligatoria para todos los eventos)
json
{
"event_id": "uuid-v4 (único por evento)",
"event_type": "domain.entity.action",
"version": "1.0",
"timestamp": "2026-05-18T10:30:00Z (UTC, ISO 8601)",
"correlation_id": "uuid (para trazar cadenas de eventos)",
"causation_id": "uuid (evento que causó este evento, nullable)",
"tenant_id": "uuid (OBLIGATORIO)",
"organization_id": "uuid (nullable)",
"source": "imagy-{service-name}",
"actor_id": "uuid (user que originó la acción)",
"actor_type": "user | system | scheduler | anonymous",
"data": { }
}Reglas
Publicación
- Siempre incluir
tenant_id— un evento sin tenant es inválido - Siempre incluir
correlation_id— permite trazar flujos completos event_typesigue el patrón:{domain}.{entity}.{action}versiones semver simplificado:"1.0","2.0", etc.sourceidentifica el servicio:"imagy-flow-engine","imagy-lending"datacontiene solo datos del evento — no el estado completo de la entidad- Nunca incluir datos sensibles (passwords, tokens, PII innecesaria) en eventos
Consumo
- Idempotencia obligatoria — procesar el mismo evento 2 veces no debe causar problemas
- Trackear
event_iden tablaprocessed_eventspara detectar duplicados - Establecer tenant context desde
tenant_iddel evento antes de procesar - Validar firma HMAC del mensaje antes de procesar
- Manejar versiones — si
versiones desconocida, loguear warning y skip (no fallar) - Timeout de procesamiento: máximo 30 segundos por evento
- Retry policy: 3 intentos con backoff exponencial (1s, 5s, 25s)
- Dead Letter Queue: después de 3 fallos, enviar a DLQ para revisión manual
Evolución de Contratos
| Cambio | ¿Breaking? | Acción |
|---|---|---|
Agregar campo a data | No | Incrementar minor version |
Agregar nuevo event_type | No | Documentar en catálogo |
Eliminar campo de data | Sí | Crear nuevo event_type con nueva version |
| Renombrar campo | Sí | Crear nuevo event_type |
| Cambiar tipo de un campo | Sí | Crear nuevo event_type |
| Deprecar evento | No | Marcar como deprecated, mantener 6 meses |
Seguridad
Firma de Mensajes
Cada mensaje publicado incluye un header X-Message-Signature:
X-Message-Signature: sha256={hmac_hex}
X-Message-Timestamp: {unix_timestamp}Cálculo:
csharp
var payload = JsonSerializer.Serialize(message);
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString();
var signatureInput = $"{timestamp}.{payload}";
var hmac = HMACSHA256(signatureInput, sharedSecret);Validación en consumer:
- Extraer timestamp del header
- Verificar que timestamp < 5 minutos de antigüedad (previene replay)
- Recalcular HMAC con el payload recibido
- Comparar con
FixedTimeEquals(previene timing attacks)
Permisos por Servicio en RabbitMQ
| Servicio | Puede publicar en | Puede consumir de |
|---|---|---|
| imagy-flow-engine | imagy.events (routing: flow.*) | subject.*, sign.* |
| imagy-lending | imagy.events (routing: lending.*) | flow.*, subject.*, sign.* |
| imagy-sign | imagy.events (routing: sign.*) | flow.* |
| imagy-subject | imagy.events (routing: subject.*) | flow.*, lending.*, sign.* |
| imagy-identity | imagy.events (routing: identity.*) | — |
Credenciales
- Cada servicio tiene su propio usuario/password en RabbitMQ
- Credenciales en AWS Secrets Manager (rotación automática)
- TLS obligatorio en conexiones a RabbitMQ
- En local: credenciales en
.env(nunca en código)
Configuración MassTransit
csharp
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
// Registrar consumers del servicio
x.AddConsumersFromNamespaceContaining<FlowCompletedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(config.RabbitMqUrl, h =>
{
h.Username(config.RabbitMqUser);
h.Password(config.RabbitMqPassword);
h.UseSsl(s => s.Protocol = SslProtocols.Tls12);
});
// Exchange principal
cfg.Message<IntegrationEvent>(e => e.SetEntityName("imagy.events"));
// Retry policy
cfg.UseMessageRetry(r => r.Exponential(3,
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(25),
TimeSpan.FromSeconds(5)));
// Dead Letter Queue
cfg.ConfigureEndpoints(context);
});
});Tabla de Idempotencia
Cada servicio que consume eventos debe tener:
sql
CREATE TABLE processed_events (
event_id UUID PRIMARY KEY,
event_type VARCHAR(100) NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
source VARCHAR(100) NOT NULL
);
-- Limpieza automática (eventos > 30 días)
-- Job programado o pg_cron
DELETE FROM processed_events WHERE processed_at < NOW() - INTERVAL '30 days';Patrón de Consumer Base
csharp
public abstract class BaseConsumer<T> : IConsumer<T> where T : class, IIntegrationEvent
{
private readonly IIdentityContext _identity;
private readonly IProcessedEventStore _processedEvents;
private readonly IMessageSignatureValidator _signatureValidator;
public async Task Consume(ConsumeContext<T> context)
{
var message = context.Message;
// 1. Validar firma
if (!_signatureValidator.Validate(context))
throw new SecurityException("Invalid message signature");
// 2. Verificar idempotencia
if (await _processedEvents.ExistsAsync(message.EventId))
return; // Ya procesado, skip
// 3. Establecer tenant context
_identity.SetContext(message.TenantId, message.ActorId, message.ActorType);
// 4. Procesar
await HandleAsync(message, context.CancellationToken);
// 5. Marcar como procesado
await _processedEvents.MarkProcessedAsync(message.EventId, message.EventType, message.Source);
}
protected abstract Task HandleAsync(T message, CancellationToken ct);
}