Guia: Agregar un Nuevo Evento a la Plataforma
Esta guia describe el proceso completo para definir, publicar y consumir un nuevo evento en la plataforma Imagy, desde la definicion del schema hasta el contract test.
Prerequisitos
- Servicio productor ya existe y esta configurado con MassTransit
- Servicio consumidor ya existe y esta configurado con MassTransit
- RabbitMQ corriendo localmente
- Conocimiento del envelope estandar de eventos
1. Definir el Schema del Evento
Antes de escribir codigo, documentar el evento en el catalogo de contratos:
markdown
<!-- docs/contracts/events/{domain}.{entity}.{action}.md -->
# {domain}.{entity}.{action}
| Campo | Valor |
|-------|-------|
| Routing key | `{domain}.{entity}.{action}` |
| Productor | imagy-{domain} |
| Consumidores | imagy-{consumer1}, imagy-{consumer2} |
| Version | 1.0 |
| Trigger | {descripcion de cuando se emite} |
## Payload
| Campo | Tipo | Requerido | Descripcion |
|-------|------|-----------|-------------|
| event_id | UUID | Si | Identificador unico del evento |
| event_type | string | Si | Routing key completa |
| version | string | Si | Version del schema |
| timestamp | ISO 8601 | Si | Momento del evento |
| correlation_id | UUID | Si | ID de correlacion |
| tenant_id | UUID | Si | Tenant propietario |
| data | object | Si | Payload especifico |
| data.{field} | {type} | {req} | {desc} |2. Crear el Record en C# (Productor)
En el proyecto Domain del servicio productor:
csharp
// src/Imagy.{Domain}.Domain/Events/{Entity}{Action}Event.cs
namespace Imagy.{Domain}.Domain.Events;
/// <summary>
/// Se publica cuando {descripcion del trigger}.
/// Routing key: {domain}.{entity}.{action}
/// Consumidores: {lista de consumidores}
/// </summary>
public record EntityActionEvent
{
public Guid EventId { get; init; } = Guid.NewGuid();
public string EventType => "{domain}.{entity}.{action}";
public string Version => "1.0";
public DateTime Timestamp { get; init; } = DateTime.UtcNow;
public Guid CorrelationId { get; init; }
public Guid TenantId { get; init; }
public Guid? OrganizationId { get; init; }
public string Source => "imagy-{domain}";
public string ActorId { get; init; } = string.Empty;
public string ActorType { get; init; } = string.Empty;
public required EntityActionData Data { get; init; }
}
public record EntityActionData
{
// Campos especificos del evento
public required Guid EntityId { get; init; }
public required string Status { get; init; }
// ... mas campos segun el schema
}3. Registrar en MassTransit (Productor)
Si el evento necesita routing key personalizada, configurar el message topology:
csharp
// En Program.cs o en extension method de configuracion
builder.Services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(builder.Configuration.GetConnectionString("RabbitMq"));
// Configurar routing key para el evento
cfg.Message<EntityActionEvent>(m =>
{
m.SetEntityName("imagy.events"); // Exchange
});
cfg.Publish<EntityActionEvent>(p =>
{
p.ExchangeType = ExchangeType.Topic;
});
cfg.ConfigureEndpoints(context);
});
});4. Publicar desde el Handler
En el servicio o handler que produce el evento:
csharp
// src/Imagy.{Domain}.Domain/Services/{Entity}Service.cs
public class EntityService : IEntityService
{
private readonly IPublishEndpoint _publishEndpoint;
private readonly ICorrelationContext _correlationContext;
private readonly ITenantContext _tenantContext;
public EntityService(
IPublishEndpoint publishEndpoint,
ICorrelationContext correlationContext,
ITenantContext tenantContext)
{
_publishEndpoint = publishEndpoint;
_correlationContext = correlationContext;
_tenantContext = tenantContext;
}
public async Task<EntityResponse> PerformActionAsync(ActionRequest request, CancellationToken ct)
{
// 1. Ejecutar logica de negocio
var entity = await ProcessAction(request, ct);
// 2. Publicar evento
await _publishEndpoint.Publish(new EntityActionEvent
{
CorrelationId = _correlationContext.CorrelationId,
TenantId = _tenantContext.TenantId,
OrganizationId = _tenantContext.OrganizationId,
ActorId = _correlationContext.ActorId,
ActorType = _correlationContext.ActorType,
Data = new EntityActionData
{
EntityId = entity.Id,
Status = entity.Status
}
}, ct);
// 3. Retornar resultado
return MapToResponse(entity);
}
}5. Crear Consumer en el Servicio Destino
En el proyecto Infrastructure del servicio consumidor:
csharp
// src/Imagy.{Consumer}.Infrastructure/Messaging/Consumers/EntityActionConsumer.cs
namespace Imagy.{Consumer}.Infrastructure.Messaging.Consumers;
public class EntityActionConsumer : IConsumer<EntityActionEvent>
{
private readonly ILogger<EntityActionConsumer> _logger;
private readonly IProcessedEventsRepository _processedEvents;
private readonly IReactionService _reactionService;
public EntityActionConsumer(
ILogger<EntityActionConsumer> logger,
IProcessedEventsRepository processedEvents,
IReactionService reactionService)
{
_logger = logger;
_processedEvents = processedEvents;
_reactionService = reactionService;
}
public async Task Consume(ConsumeContext<EntityActionEvent> context)
{
var message = context.Message;
// 1. Idempotencia: verificar si ya se proceso
if (await _processedEvents.ExistsAsync(message.EventId))
{
_logger.LogInformation("Event {EventId} already processed, skipping", message.EventId);
return;
}
// 2. Procesar el evento
await _reactionService.HandleEntityActionAsync(message.Data, context.CancellationToken);
// 3. Marcar como procesado
await _processedEvents.MarkProcessedAsync(message.EventId, message.EventType);
_logger.LogInformation(
"Processed event {EventType} with id {EventId}",
message.EventType, message.EventId);
}
}6. Configurar Queue y Binding (Consumidor)
csharp
// En Program.cs del servicio consumidor
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<EntityActionConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(builder.Configuration.GetConnectionString("RabbitMq"));
cfg.ReceiveEndpoint("imagy-{consumer}-{domain}-events", e =>
{
e.ConfigureConsumer<EntityActionConsumer>(context);
// Bind al exchange con routing key
e.Bind("imagy.events", s =>
{
s.RoutingKey = "{domain}.{entity}.{action}";
s.ExchangeType = ExchangeType.Topic;
});
// Retry policy
e.UseMessageRetry(r => r.Intervals(
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(2),
TimeSpan.FromSeconds(4)));
// Dead letter
e.ConfigureDeadLetterQueueErrorTransport();
e.PrefetchCount = 10;
});
cfg.ConfigureEndpoints(context);
});
});7. Agregar al Catalogo de Contratos
Actualizar el indice de eventos en la documentacion:
markdown
<!-- docs/contracts/events/index.md -->
| Routing Key | Productor | Consumidores | Version |
|-------------|-----------|--------------|---------|
| {domain}.{entity}.{action} | imagy-{domain} | imagy-{consumer} | 1.0 |Crear el archivo de contrato detallado con el schema completo (paso 1).
8. Escribir Contract Test
El contract test verifica que productor y consumidor estan de acuerdo en el schema:
csharp
// tests/Imagy.{Domain}.Tests/Contracts/EntityActionEventContractTest.cs
public class EntityActionEventContractTest
{
[Fact]
public void EntityActionEvent_Schema_IsBackwardCompatible()
{
// Arrange: schema esperado por el consumidor
var expectedFields = new[]
{
"EventId", "EventType", "Version", "Timestamp",
"CorrelationId", "TenantId", "Data"
};
var expectedDataFields = new[]
{
"EntityId", "Status"
};
// Act: verificar que el record tiene todos los campos
var eventType = typeof(EntityActionEvent);
var dataType = typeof(EntityActionData);
var eventProperties = eventType.GetProperties().Select(p => p.Name);
var dataProperties = dataType.GetProperties().Select(p => p.Name);
// Assert
eventProperties.Should().Contain(expectedFields);
dataProperties.Should().Contain(expectedDataFields);
}
[Fact]
public void EntityActionEvent_Serialization_RoundTrip()
{
// Arrange
var original = new EntityActionEvent
{
CorrelationId = Guid.NewGuid(),
TenantId = Guid.NewGuid(),
ActorId = "user-123",
ActorType = "operator",
Data = new EntityActionData
{
EntityId = Guid.NewGuid(),
Status = "completed"
}
};
// Act: serializar y deserializar
var json = JsonSerializer.Serialize(original);
var deserialized = JsonSerializer.Deserialize<EntityActionEvent>(json);
// Assert
deserialized.Should().BeEquivalentTo(original);
}
[Fact]
public void EntityActionEvent_DefaultValues_AreCorrect()
{
// Arrange & Act
var evt = new EntityActionEvent
{
CorrelationId = Guid.NewGuid(),
TenantId = Guid.NewGuid(),
Data = new EntityActionData
{
EntityId = Guid.NewGuid(),
Status = "active"
}
};
// Assert
evt.EventId.Should().NotBeEmpty();
evt.EventType.Should().Be("{domain}.{entity}.{action}");
evt.Version.Should().Be("1.0");
evt.Source.Should().Be("imagy-{domain}");
evt.Timestamp.Should().BeCloseTo(DateTime.UtcNow, TimeSpan.FromSeconds(5));
}
}9. Verificar Localmente
bash
# 1. Levantar infraestructura
docker compose -f docker-compose.infra.yml up -d
# 2. Verificar que el exchange existe en RabbitMQ
# Abrir http://localhost:15672 (guest/guest)
# Verificar exchange "imagy.events" tipo topic
# 3. Ejecutar tests
dotnet test --filter "EntityActionEventContractTest"
# 4. Levantar productor y consumidor
cd src/Imagy.{Domain}.Api && dotnet run &
cd src/Imagy.{Consumer}.Api && dotnet run &
# 5. Disparar el evento (via endpoint o test)
curl -X POST http://localhost:{port}/api/v1/{entity}/action \
-H "Authorization: Bearer {jwt}" \
-H "Content-Type: application/json" \
-d '{"field": "value"}'
# 6. Verificar en RabbitMQ que el mensaje fue consumido
# La queue debe mostrar 0 mensajes pendientesChecklist
- [ ] Schema documentado en catalogo de contratos
- [ ] Record C# creado con todos los campos del envelope
- [ ] Evento registrado en MassTransit (productor)
- [ ] Publicacion desde el handler con correlation_id y tenant_id
- [ ] Consumer creado con idempotencia (processed_events)
- [ ] Queue y binding configurados (consumidor)
- [ ] Retry policy y dead letter configurados
- [ ] Contract test escrito y pasando
- [ ] Verificacion local exitosa
- [ ] Indice de eventos actualizado en documentacion
Errores Comunes
| Error | Causa | Solucion |
|---|---|---|
| Mensaje no llega al consumer | Routing key incorrecta | Verificar binding en RabbitMQ Management UI |
| Mensaje se procesa multiples veces | Falta idempotencia | Implementar check en processed_events |
| Serializacion falla | Campos no coinciden | Verificar que productor y consumidor usan el mismo record |
| Dead letter queue crece | Consumer lanza excepcion | Revisar logs del consumidor, fix y re-procesar |
| Exchange no existe | MassTransit no lo creo | Verificar configuracion de topology |
Referencias
- Comunicacion entre Servicios — Patrones de mensajeria
- Catalogo de Eventos — Todos los eventos de la plataforma
- Guia: Nuevo Servicio — Si necesitas crear el servicio primero