Skip to content

Guia: Agregar un Nuevo Evento a la Plataforma

Esta guia describe el proceso completo para definir, publicar y consumir un nuevo evento en la plataforma Imagy, desde la definicion del schema hasta el contract test.

Prerequisitos

  • Servicio productor ya existe y esta configurado con MassTransit
  • Servicio consumidor ya existe y esta configurado con MassTransit
  • RabbitMQ corriendo localmente
  • Conocimiento del envelope estandar de eventos

1. Definir el Schema del Evento

Antes de escribir codigo, documentar el evento en el catalogo de contratos:

markdown
<!-- docs/contracts/events/{domain}.{entity}.{action}.md -->

# {domain}.{entity}.{action}

| Campo | Valor |
|-------|-------|
| Routing key | `{domain}.{entity}.{action}` |
| Productor | imagy-{domain} |
| Consumidores | imagy-{consumer1}, imagy-{consumer2} |
| Version | 1.0 |
| Trigger | {descripcion de cuando se emite} |

## Payload

| Campo | Tipo | Requerido | Descripcion |
|-------|------|-----------|-------------|
| event_id | UUID | Si | Identificador unico del evento |
| event_type | string | Si | Routing key completa |
| version | string | Si | Version del schema |
| timestamp | ISO 8601 | Si | Momento del evento |
| correlation_id | UUID | Si | ID de correlacion |
| tenant_id | UUID | Si | Tenant propietario |
| data | object | Si | Payload especifico |
| data.{field} | {type} | {req} | {desc} |

2. Crear el Record en C# (Productor)

En el proyecto Domain del servicio productor:

csharp
// src/Imagy.{Domain}.Domain/Events/{Entity}{Action}Event.cs

namespace Imagy.{Domain}.Domain.Events;

/// <summary>
/// Se publica cuando {descripcion del trigger}.
/// Routing key: {domain}.{entity}.{action}
/// Consumidores: {lista de consumidores}
/// </summary>
public record EntityActionEvent
{
    public Guid EventId { get; init; } = Guid.NewGuid();
    public string EventType => "{domain}.{entity}.{action}";
    public string Version => "1.0";
    public DateTime Timestamp { get; init; } = DateTime.UtcNow;
    public Guid CorrelationId { get; init; }
    public Guid TenantId { get; init; }
    public Guid? OrganizationId { get; init; }
    public string Source => "imagy-{domain}";
    public string ActorId { get; init; } = string.Empty;
    public string ActorType { get; init; } = string.Empty;
    public required EntityActionData Data { get; init; }
}

public record EntityActionData
{
    // Campos especificos del evento
    public required Guid EntityId { get; init; }
    public required string Status { get; init; }
    // ... mas campos segun el schema
}

3. Registrar en MassTransit (Productor)

Si el evento necesita routing key personalizada, configurar el message topology:

csharp
// En Program.cs o en extension method de configuracion
builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(builder.Configuration.GetConnectionString("RabbitMq"));

        // Configurar routing key para el evento
        cfg.Message<EntityActionEvent>(m =>
        {
            m.SetEntityName("imagy.events"); // Exchange
        });

        cfg.Publish<EntityActionEvent>(p =>
        {
            p.ExchangeType = ExchangeType.Topic;
        });

        cfg.ConfigureEndpoints(context);
    });
});

4. Publicar desde el Handler

En el servicio o handler que produce el evento:

csharp
// src/Imagy.{Domain}.Domain/Services/{Entity}Service.cs

public class EntityService : IEntityService
{
    private readonly IPublishEndpoint _publishEndpoint;
    private readonly ICorrelationContext _correlationContext;
    private readonly ITenantContext _tenantContext;

    public EntityService(
        IPublishEndpoint publishEndpoint,
        ICorrelationContext correlationContext,
        ITenantContext tenantContext)
    {
        _publishEndpoint = publishEndpoint;
        _correlationContext = correlationContext;
        _tenantContext = tenantContext;
    }

    public async Task<EntityResponse> PerformActionAsync(ActionRequest request, CancellationToken ct)
    {
        // 1. Ejecutar logica de negocio
        var entity = await ProcessAction(request, ct);

        // 2. Publicar evento
        await _publishEndpoint.Publish(new EntityActionEvent
        {
            CorrelationId = _correlationContext.CorrelationId,
            TenantId = _tenantContext.TenantId,
            OrganizationId = _tenantContext.OrganizationId,
            ActorId = _correlationContext.ActorId,
            ActorType = _correlationContext.ActorType,
            Data = new EntityActionData
            {
                EntityId = entity.Id,
                Status = entity.Status
            }
        }, ct);

        // 3. Retornar resultado
        return MapToResponse(entity);
    }
}

5. Crear Consumer en el Servicio Destino

En el proyecto Infrastructure del servicio consumidor:

csharp
// src/Imagy.{Consumer}.Infrastructure/Messaging/Consumers/EntityActionConsumer.cs

namespace Imagy.{Consumer}.Infrastructure.Messaging.Consumers;

public class EntityActionConsumer : IConsumer<EntityActionEvent>
{
    private readonly ILogger<EntityActionConsumer> _logger;
    private readonly IProcessedEventsRepository _processedEvents;
    private readonly IReactionService _reactionService;

    public EntityActionConsumer(
        ILogger<EntityActionConsumer> logger,
        IProcessedEventsRepository processedEvents,
        IReactionService reactionService)
    {
        _logger = logger;
        _processedEvents = processedEvents;
        _reactionService = reactionService;
    }

    public async Task Consume(ConsumeContext<EntityActionEvent> context)
    {
        var message = context.Message;

        // 1. Idempotencia: verificar si ya se proceso
        if (await _processedEvents.ExistsAsync(message.EventId))
        {
            _logger.LogInformation("Event {EventId} already processed, skipping", message.EventId);
            return;
        }

        // 2. Procesar el evento
        await _reactionService.HandleEntityActionAsync(message.Data, context.CancellationToken);

        // 3. Marcar como procesado
        await _processedEvents.MarkProcessedAsync(message.EventId, message.EventType);

        _logger.LogInformation(
            "Processed event {EventType} with id {EventId}",
            message.EventType, message.EventId);
    }
}

6. Configurar Queue y Binding (Consumidor)

csharp
// En Program.cs del servicio consumidor
builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<EntityActionConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(builder.Configuration.GetConnectionString("RabbitMq"));

        cfg.ReceiveEndpoint("imagy-{consumer}-{domain}-events", e =>
        {
            e.ConfigureConsumer<EntityActionConsumer>(context);

            // Bind al exchange con routing key
            e.Bind("imagy.events", s =>
            {
                s.RoutingKey = "{domain}.{entity}.{action}";
                s.ExchangeType = ExchangeType.Topic;
            });

            // Retry policy
            e.UseMessageRetry(r => r.Intervals(
                TimeSpan.FromSeconds(1),
                TimeSpan.FromSeconds(2),
                TimeSpan.FromSeconds(4)));

            // Dead letter
            e.ConfigureDeadLetterQueueErrorTransport();

            e.PrefetchCount = 10;
        });

        cfg.ConfigureEndpoints(context);
    });
});

7. Agregar al Catalogo de Contratos

Actualizar el indice de eventos en la documentacion:

markdown
<!-- docs/contracts/events/index.md -->

| Routing Key | Productor | Consumidores | Version |
|-------------|-----------|--------------|---------|
| {domain}.{entity}.{action} | imagy-{domain} | imagy-{consumer} | 1.0 |

Crear el archivo de contrato detallado con el schema completo (paso 1).

8. Escribir Contract Test

El contract test verifica que productor y consumidor estan de acuerdo en el schema:

csharp
// tests/Imagy.{Domain}.Tests/Contracts/EntityActionEventContractTest.cs

public class EntityActionEventContractTest
{
    [Fact]
    public void EntityActionEvent_Schema_IsBackwardCompatible()
    {
        // Arrange: schema esperado por el consumidor
        var expectedFields = new[]
        {
            "EventId", "EventType", "Version", "Timestamp",
            "CorrelationId", "TenantId", "Data"
        };

        var expectedDataFields = new[]
        {
            "EntityId", "Status"
        };

        // Act: verificar que el record tiene todos los campos
        var eventType = typeof(EntityActionEvent);
        var dataType = typeof(EntityActionData);

        var eventProperties = eventType.GetProperties().Select(p => p.Name);
        var dataProperties = dataType.GetProperties().Select(p => p.Name);

        // Assert
        eventProperties.Should().Contain(expectedFields);
        dataProperties.Should().Contain(expectedDataFields);
    }

    [Fact]
    public void EntityActionEvent_Serialization_RoundTrip()
    {
        // Arrange
        var original = new EntityActionEvent
        {
            CorrelationId = Guid.NewGuid(),
            TenantId = Guid.NewGuid(),
            ActorId = "user-123",
            ActorType = "operator",
            Data = new EntityActionData
            {
                EntityId = Guid.NewGuid(),
                Status = "completed"
            }
        };

        // Act: serializar y deserializar
        var json = JsonSerializer.Serialize(original);
        var deserialized = JsonSerializer.Deserialize<EntityActionEvent>(json);

        // Assert
        deserialized.Should().BeEquivalentTo(original);
    }

    [Fact]
    public void EntityActionEvent_DefaultValues_AreCorrect()
    {
        // Arrange & Act
        var evt = new EntityActionEvent
        {
            CorrelationId = Guid.NewGuid(),
            TenantId = Guid.NewGuid(),
            Data = new EntityActionData
            {
                EntityId = Guid.NewGuid(),
                Status = "active"
            }
        };

        // Assert
        evt.EventId.Should().NotBeEmpty();
        evt.EventType.Should().Be("{domain}.{entity}.{action}");
        evt.Version.Should().Be("1.0");
        evt.Source.Should().Be("imagy-{domain}");
        evt.Timestamp.Should().BeCloseTo(DateTime.UtcNow, TimeSpan.FromSeconds(5));
    }
}

9. Verificar Localmente

bash
# 1. Levantar infraestructura
docker compose -f docker-compose.infra.yml up -d

# 2. Verificar que el exchange existe en RabbitMQ
# Abrir http://localhost:15672 (guest/guest)
# Verificar exchange "imagy.events" tipo topic

# 3. Ejecutar tests
dotnet test --filter "EntityActionEventContractTest"

# 4. Levantar productor y consumidor
cd src/Imagy.{Domain}.Api && dotnet run &
cd src/Imagy.{Consumer}.Api && dotnet run &

# 5. Disparar el evento (via endpoint o test)
curl -X POST http://localhost:{port}/api/v1/{entity}/action \
  -H "Authorization: Bearer {jwt}" \
  -H "Content-Type: application/json" \
  -d '{"field": "value"}'

# 6. Verificar en RabbitMQ que el mensaje fue consumido
# La queue debe mostrar 0 mensajes pendientes

Checklist

  • [ ] Schema documentado en catalogo de contratos
  • [ ] Record C# creado con todos los campos del envelope
  • [ ] Evento registrado en MassTransit (productor)
  • [ ] Publicacion desde el handler con correlation_id y tenant_id
  • [ ] Consumer creado con idempotencia (processed_events)
  • [ ] Queue y binding configurados (consumidor)
  • [ ] Retry policy y dead letter configurados
  • [ ] Contract test escrito y pasando
  • [ ] Verificacion local exitosa
  • [ ] Indice de eventos actualizado en documentacion

Errores Comunes

ErrorCausaSolucion
Mensaje no llega al consumerRouting key incorrectaVerificar binding en RabbitMQ Management UI
Mensaje se procesa multiples vecesFalta idempotenciaImplementar check en processed_events
Serializacion fallaCampos no coincidenVerificar que productor y consumidor usan el mismo record
Dead letter queue creceConsumer lanza excepcionRevisar logs del consumidor, fix y re-procesar
Exchange no existeMassTransit no lo creoVerificar configuracion de topology

Referencias

Reimagine Tech LLC — Documentacion Interna