Skip to content

Comunicación entre Servicios

Principios

  1. Cada servicio es autónomo — tiene su propia BD y puede operar independientemente
  2. Preferir asíncrono — usar eventos cuando no se necesita respuesta inmediata
  3. JWT propagado — las llamadas síncronas llevan el JWT original del usuario
  4. Eventos son contratos — cambiar un evento es un breaking change que requiere coordinación
  5. Idempotencia obligatoria — todo consumer debe poder procesar el mismo evento múltiples veces sin efectos adversos

Tipos de Comunicación

TipoCuándo usarLatenciaAcoplamiento
HTTP síncronoNecesitas datos para continuar el procesoBaja (ms)Alto (temporal)
Evento asíncronoNotificar algo que pasó, sin esperar respuestaVariableBajo
HTTP + eventoAcción que requiere confirmación + notificación posteriorMixtaMedio

Comunicación Síncrona (HTTP)

Patrón: JWT Propagation

Cuando un servicio necesita llamar a otro sincrónicamente, propaga el JWT del request original:

Implementación: JwtPropagationHandler

csharp
public class JwtPropagationHandler : DelegatingHandler
{
    private readonly IHttpContextAccessor _httpContextAccessor;

    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken ct)
    {
        var authHeader = _httpContextAccessor.HttpContext?
            .Request.Headers.Authorization.FirstOrDefault();

        if (!string.IsNullOrEmpty(authHeader))
            request.Headers.TryAddWithoutValidation("Authorization", authHeader);

        return base.SendAsync(request, ct);
    }
}

// Registro en DI
services.AddHttpClient("ImagSign")
    .AddHttpMessageHandler<JwtPropagationHandler>()
    .AddPolicyHandler(GetRetryPolicy())
    .AddPolicyHandler(GetCircuitBreakerPolicy());

Resilience (Polly)

Todas las llamadas HTTP entre servicios usan Polly:

csharp
// Retry: 3 intentos con backoff exponencial
static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy() =>
    HttpPolicyExtensions
        .HandleTransientHttpError()
        .WaitAndRetryAsync(3, attempt => 
            TimeSpan.FromMilliseconds(200 * Math.Pow(2, attempt)));

// Circuit Breaker: 5 fallos → abierto 30s
static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy() =>
    HttpPolicyExtensions
        .HandleTransientHttpError()
        .CircuitBreakerAsync(5, TimeSpan.FromSeconds(30));

Timeouts

LlamadaTimeoutJustificación
Servicio → Servicio (queries)5sQueries deben ser rápidas
Servicio → Servicio (commands)15sCommands pueden involucrar escritura
Servicio → Provider Gateway30sProveedores externos son lentos
Provider Gateway → Proveedor externo30sConfigurable por proveedor

Catálogo de Llamadas Síncronas

OrigenDestinoEndpointMétodoPropósito
ImagLendImagFlow/api/v1/executions/triggerPOSTIniciar flujo de validación para solicitud de crédito
ImagFlowImagSign/api/v1/signatures/requestPOSTEjecutar firma como paso del flujo
ImagLendImagID/api/v1/subjects/{id}/listsGETVerificar si sujeto está en lista negra
ImagFlowImagID/api/v1/subjects/{id}/metricsGETObtener historial para evaluación de reglas
ImagLendImagID/api/v1/subjects/{id}/profileGETObtener perfil del sujeto

Comunicación Asíncrona (Eventos)

Infraestructura

ComponenteValor
BrokerRabbitMQ (Amazon MQ en producción)
LibreríaMassTransit 8.x
Exchangeimagy.events (tipo: topic)
Routing key{domain}.{entity}.{action}
SerializaciónJSON (System.Text.Json)
SeguridadTLS + firma HMAC-SHA256

Topología

Estructura Base de un Evento

json
{
  "event_id": "550e8400-e29b-41d4-a716-446655440000",
  "event_type": "lending.credit.disbursed",
  "version": "1.0",
  "timestamp": "2026-05-18T10:30:00Z",
  "correlation_id": "660e8400-e29b-41d4-a716-446655440001",
  "causation_id": "770e8400-e29b-41d4-a716-446655440002",
  "tenant_id": "880e8400-e29b-41d4-a716-446655440003",
  "organization_id": "990e8400-e29b-41d4-a716-446655440004",
  "source": "imagy-lending",
  "actor_id": "aa0e8400-e29b-41d4-a716-446655440005",
  "actor_type": "user",
  "data": {
    "credit_id": "bb0e8400-e29b-41d4-a716-446655440006",
    "subject_identifier": "1234567890",
    "amount": 500000,
    "currency": "COP",
    "product_code": "microcredito-30d"
  }
}

Campos Obligatorios

CampoTipoDescripción
event_idUUIDIdentificador único del evento (para idempotencia)
event_typestring{domain}.{entity}.{action}
versionstringVersión del schema ("1.0")
timestampISO 8601Momento en que ocurrió el evento (UTC)
correlation_idUUIDTraza la cadena completa de eventos relacionados
tenant_idUUIDTenant donde ocurrió (OBLIGATORIO)
sourcestringServicio que publicó ("imagy-lending")
actor_idstringUsuario que originó la acción
actor_typestring"user" / "system" / "scheduler" / "anonymous"
dataobjectPayload específico del evento

Campos Opcionales

CampoTipoDescripción
causation_idUUIDEvento que causó este evento
organization_idUUIDOrganización dentro del tenant

Publicación de Eventos

csharp
// REGLA: Publicar DESPUÉS de persistir exitosamente
public class DisburseCreditHandler : IRequestHandler<DisburseCreditCommand, Result<...>>
{
    public async Task<Result<...>> Handle(DisburseCreditCommand cmd, CancellationToken ct)
    {
        // 1. Lógica de negocio
        var credit = await _repository.GetByIdAsync(cmd.CreditId, ct);
        credit.Disburse(cmd.Amount, cmd.DisbursementDate);

        // 2. Persistir
        await _unitOfWork.SaveChangesAsync(ct);

        // 3. Publicar evento DESPUÉS de persistir
        await _publishEndpoint.Publish(new CreditDisbursed
        {
            EventId = Guid.NewGuid(),
            EventType = "lending.credit.disbursed",
            Version = "1.0",
            Timestamp = DateTime.UtcNow,
            CorrelationId = _identity.CorrelationId,
            TenantId = _identity.TenantId,
            Source = "imagy-lending",
            ActorId = _identity.UserId,
            ActorType = _identity.ActorType,
            Data = new
            {
                CreditId = credit.Id,
                SubjectIdentifier = credit.SubjectIdentifier,
                Amount = cmd.Amount,
                Currency = "COP",
                ProductCode = credit.ProductCode
            }
        }, ct);

        return Result.Success(...);
    }
}

Consumo de Eventos

csharp
public class CreditDisbursedConsumer : BaseConsumer<CreditDisbursed>
{
    private readonly ISubjectEventRepository _eventRepo;

    protected override async Task HandleAsync(CreditDisbursed message, CancellationToken ct)
    {
        // IIdentityContext ya establecido por BaseConsumer (tenant_id del evento)
        
        // Registrar evento en el historial del sujeto
        var subjectEvent = SubjectEvent.Create(
            subjectIdentifier: message.Data.SubjectIdentifier,
            eventType: "credit_disbursed",
            sourceService: message.Source,
            sourceId: message.Data.CreditId,
            data: JsonSerializer.SerializeToDocument(message.Data));

        await _eventRepo.AddAsync(subjectEvent, ct);
        await _unitOfWork.SaveChangesAsync(ct);
    }
}

Idempotencia

Cada consumer verifica si ya procesó el evento:

csharp
public abstract class BaseConsumer<T> : IConsumer<T> where T : class, IIntegrationEvent
{
    public async Task Consume(ConsumeContext<T> context)
    {
        var message = context.Message;

        // 1. Validar firma HMAC
        if (!_signatureValidator.Validate(context))
            throw new SecurityException("Invalid message signature");

        // 2. Idempotencia: ¿ya procesado?
        if (await _processedEvents.ExistsAsync(message.EventId))
            return;

        // 3. Establecer tenant context
        _identity.SetContext(message.TenantId, message.ActorId, message.ActorType);

        // 4. Procesar
        await HandleAsync(message, context.CancellationToken);

        // 5. Marcar como procesado
        await _processedEvents.MarkProcessedAsync(
            message.EventId, message.EventType, message.Source);
    }

    protected abstract Task HandleAsync(T message, CancellationToken ct);
}

Dead Letter Queue (DLQ)

Eventos que fallan después de 3 reintentos van a la DLQ:

Configuración MassTransit

csharp
services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();
    x.AddConsumersFromNamespaceContaining<CreditDisbursedConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(config.RabbitMqUrl, h =>
        {
            h.Username(config.RabbitMqUser);
            h.Password(config.RabbitMqPassword);
            h.UseSsl(s => s.Protocol = SslProtocols.Tls12);
        });

        // Retry policy
        cfg.UseMessageRetry(r => r.Exponential(
            retryCount: 3,
            minInterval: TimeSpan.FromSeconds(1),
            maxInterval: TimeSpan.FromSeconds(25),
            intervalDelta: TimeSpan.FromSeconds(5)));

        // Configurar endpoints automáticamente
        cfg.ConfigureEndpoints(context);
    });
});

Seguridad en la Comunicación

HTTP (servicio-a-servicio)

CapaMecanismo
TransporteTLS 1.2+ (obligatorio)
AutenticaciónJWT firmado por Keycloak (propagado)
AutorizaciónCada servicio valida roles del JWT
Tenant isolationIIdentityContext extrae tenant_id del JWT → RLS

Eventos (RabbitMQ)

CapaMecanismo
TransporteTLS 1.2+ (conexión a RabbitMQ)
AutenticaciónCredenciales por servicio (usuario/password)
IntegridadFirma HMAC-SHA256 por mensaje
Tenant isolationtenant_id en payload → consumer establece RLS
Anti-replayTimestamp < 5 minutos + idempotencia por event_id

Firma HMAC de Mensajes

csharp
// Productor: firma el mensaje
public class MessageSigningFilter<T> : IFilter<PublishContext<T>> where T : class
{
    public async Task Send(PublishContext<T> context, IPipe<PublishContext<T>> next)
    {
        var payload = JsonSerializer.Serialize(context.Message);
        var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString();
        var signatureInput = $"{timestamp}.{payload}";
        
        using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(_sharedSecret));
        var hash = hmac.ComputeHash(Encoding.UTF8.GetBytes(signatureInput));
        var signature = Convert.ToHexString(hash).ToLowerInvariant();

        context.Headers.Set("X-Message-Signature", $"sha256={signature}");
        context.Headers.Set("X-Message-Timestamp", timestamp);

        await next.Send(context);
    }
}

// Consumer: valida la firma
public class MessageSignatureValidator
{
    public bool Validate<T>(ConsumeContext<T> context) where T : class
    {
        var signature = context.Headers.Get<string>("X-Message-Signature");
        var timestamp = context.Headers.Get<string>("X-Message-Timestamp");

        if (string.IsNullOrEmpty(signature) || string.IsNullOrEmpty(timestamp))
            return false;

        // Anti-replay: timestamp < 5 minutos
        var messageTime = DateTimeOffset.FromUnixTimeSeconds(long.Parse(timestamp));
        if (DateTimeOffset.UtcNow - messageTime > TimeSpan.FromMinutes(5))
            return false;

        // Recalcular HMAC
        var payload = JsonSerializer.Serialize(context.Message);
        var signatureInput = $"{timestamp}.{payload}";
        
        using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(_sharedSecret));
        var hash = hmac.ComputeHash(Encoding.UTF8.GetBytes(signatureInput));
        var expected = $"sha256={Convert.ToHexString(hash).ToLowerInvariant()}";

        return CryptographicOperations.FixedTimeEquals(
            Encoding.UTF8.GetBytes(signature),
            Encoding.UTF8.GetBytes(expected));
    }
}

Correlation y Trazabilidad

Correlation ID

Cada request que entra al sistema genera (o propaga) un correlation_id. Este ID viaja por toda la cadena:

Implementación

csharp
// Middleware en cada servicio: propaga o genera correlation_id
public class CorrelationIdMiddleware
{
    public async Task InvokeAsync(HttpContext context)
    {
        var correlationId = context.Request.Headers["X-Correlation-Id"].FirstOrDefault()
            ?? Guid.NewGuid().ToString();

        context.Items["CorrelationId"] = correlationId;
        context.Response.Headers.Append("X-Correlation-Id", correlationId);

        using (LogContext.PushProperty("CorrelationId", correlationId))
        {
            await _next(context);
        }
    }
}

Patrones de Comunicación por Caso de Uso

Caso 1: Solicitud de crédito con validación

Caso 2: Sujeto en lista negra bloquea crédito

Caso 3: Firma standalone (sin flujo)


Reglas de Comunicación

HACER

  • ✅ Propagar JWT en llamadas HTTP entre servicios
  • ✅ Incluir tenant_id en todos los eventos
  • ✅ Incluir correlation_id en todos los eventos y llamadas HTTP
  • ✅ Publicar eventos DESPUÉS de persistir exitosamente
  • ✅ Implementar idempotencia en todos los consumers
  • ✅ Usar Polly (retry + circuit breaker) en llamadas HTTP
  • ✅ Firmar mensajes con HMAC-SHA256
  • ✅ Validar firma antes de procesar un evento
  • ✅ Usar TLS en todas las conexiones

NO HACER

  • ❌ Acceder a la BD de otro servicio directamente
  • ❌ Confiar en headers custom (X-Tenant-Id) — siempre JWT
  • ❌ Publicar eventos ANTES de persistir (riesgo de inconsistencia)
  • ❌ Asumir orden de llegada de eventos
  • ❌ Hacer llamadas síncronas dentro de un consumer de eventos
  • ❌ Ignorar errores de publicación de eventos
  • ❌ Crear dependencias circulares síncronas (A→B→A)

Reimagine Tech LLC — Documentacion Interna