Comunicación entre Servicios
Principios
- Cada servicio es autónomo — tiene su propia BD y puede operar independientemente
- Preferir asíncrono — usar eventos cuando no se necesita respuesta inmediata
- JWT propagado — las llamadas síncronas llevan el JWT original del usuario
- Eventos son contratos — cambiar un evento es un breaking change que requiere coordinación
- Idempotencia obligatoria — todo consumer debe poder procesar el mismo evento múltiples veces sin efectos adversos
Tipos de Comunicación
| Tipo | Cuándo usar | Latencia | Acoplamiento |
|---|---|---|---|
| HTTP síncrono | Necesitas datos para continuar el proceso | Baja (ms) | Alto (temporal) |
| Evento asíncrono | Notificar algo que pasó, sin esperar respuesta | Variable | Bajo |
| HTTP + evento | Acción que requiere confirmación + notificación posterior | Mixta | Medio |
Comunicación Síncrona (HTTP)
Patrón: JWT Propagation
Cuando un servicio necesita llamar a otro sincrónicamente, propaga el JWT del request original:
Implementación: JwtPropagationHandler
csharp
public class JwtPropagationHandler : DelegatingHandler
{
private readonly IHttpContextAccessor _httpContextAccessor;
protected override Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request, CancellationToken ct)
{
var authHeader = _httpContextAccessor.HttpContext?
.Request.Headers.Authorization.FirstOrDefault();
if (!string.IsNullOrEmpty(authHeader))
request.Headers.TryAddWithoutValidation("Authorization", authHeader);
return base.SendAsync(request, ct);
}
}
// Registro en DI
services.AddHttpClient("ImagSign")
.AddHttpMessageHandler<JwtPropagationHandler>()
.AddPolicyHandler(GetRetryPolicy())
.AddPolicyHandler(GetCircuitBreakerPolicy());Resilience (Polly)
Todas las llamadas HTTP entre servicios usan Polly:
csharp
// Retry: 3 intentos con backoff exponencial
static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy() =>
HttpPolicyExtensions
.HandleTransientHttpError()
.WaitAndRetryAsync(3, attempt =>
TimeSpan.FromMilliseconds(200 * Math.Pow(2, attempt)));
// Circuit Breaker: 5 fallos → abierto 30s
static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy() =>
HttpPolicyExtensions
.HandleTransientHttpError()
.CircuitBreakerAsync(5, TimeSpan.FromSeconds(30));Timeouts
| Llamada | Timeout | Justificación |
|---|---|---|
| Servicio → Servicio (queries) | 5s | Queries deben ser rápidas |
| Servicio → Servicio (commands) | 15s | Commands pueden involucrar escritura |
| Servicio → Provider Gateway | 30s | Proveedores externos son lentos |
| Provider Gateway → Proveedor externo | 30s | Configurable por proveedor |
Catálogo de Llamadas Síncronas
| Origen | Destino | Endpoint | Método | Propósito |
|---|---|---|---|---|
| ImagLend | ImagFlow | /api/v1/executions/trigger | POST | Iniciar flujo de validación para solicitud de crédito |
| ImagFlow | ImagSign | /api/v1/signatures/request | POST | Ejecutar firma como paso del flujo |
| ImagLend | ImagID | /api/v1/subjects/{id}/lists | GET | Verificar si sujeto está en lista negra |
| ImagFlow | ImagID | /api/v1/subjects/{id}/metrics | GET | Obtener historial para evaluación de reglas |
| ImagLend | ImagID | /api/v1/subjects/{id}/profile | GET | Obtener perfil del sujeto |
Comunicación Asíncrona (Eventos)
Infraestructura
| Componente | Valor |
|---|---|
| Broker | RabbitMQ (Amazon MQ en producción) |
| Librería | MassTransit 8.x |
| Exchange | imagy.events (tipo: topic) |
| Routing key | {domain}.{entity}.{action} |
| Serialización | JSON (System.Text.Json) |
| Seguridad | TLS + firma HMAC-SHA256 |
Topología
Estructura Base de un Evento
json
{
"event_id": "550e8400-e29b-41d4-a716-446655440000",
"event_type": "lending.credit.disbursed",
"version": "1.0",
"timestamp": "2026-05-18T10:30:00Z",
"correlation_id": "660e8400-e29b-41d4-a716-446655440001",
"causation_id": "770e8400-e29b-41d4-a716-446655440002",
"tenant_id": "880e8400-e29b-41d4-a716-446655440003",
"organization_id": "990e8400-e29b-41d4-a716-446655440004",
"source": "imagy-lending",
"actor_id": "aa0e8400-e29b-41d4-a716-446655440005",
"actor_type": "user",
"data": {
"credit_id": "bb0e8400-e29b-41d4-a716-446655440006",
"subject_identifier": "1234567890",
"amount": 500000,
"currency": "COP",
"product_code": "microcredito-30d"
}
}Campos Obligatorios
| Campo | Tipo | Descripción |
|---|---|---|
event_id | UUID | Identificador único del evento (para idempotencia) |
event_type | string | {domain}.{entity}.{action} |
version | string | Versión del schema ("1.0") |
timestamp | ISO 8601 | Momento en que ocurrió el evento (UTC) |
correlation_id | UUID | Traza la cadena completa de eventos relacionados |
tenant_id | UUID | Tenant donde ocurrió (OBLIGATORIO) |
source | string | Servicio que publicó ("imagy-lending") |
actor_id | string | Usuario que originó la acción |
actor_type | string | "user" / "system" / "scheduler" / "anonymous" |
data | object | Payload específico del evento |
Campos Opcionales
| Campo | Tipo | Descripción |
|---|---|---|
causation_id | UUID | Evento que causó este evento |
organization_id | UUID | Organización dentro del tenant |
Publicación de Eventos
csharp
// REGLA: Publicar DESPUÉS de persistir exitosamente
public class DisburseCreditHandler : IRequestHandler<DisburseCreditCommand, Result<...>>
{
public async Task<Result<...>> Handle(DisburseCreditCommand cmd, CancellationToken ct)
{
// 1. Lógica de negocio
var credit = await _repository.GetByIdAsync(cmd.CreditId, ct);
credit.Disburse(cmd.Amount, cmd.DisbursementDate);
// 2. Persistir
await _unitOfWork.SaveChangesAsync(ct);
// 3. Publicar evento DESPUÉS de persistir
await _publishEndpoint.Publish(new CreditDisbursed
{
EventId = Guid.NewGuid(),
EventType = "lending.credit.disbursed",
Version = "1.0",
Timestamp = DateTime.UtcNow,
CorrelationId = _identity.CorrelationId,
TenantId = _identity.TenantId,
Source = "imagy-lending",
ActorId = _identity.UserId,
ActorType = _identity.ActorType,
Data = new
{
CreditId = credit.Id,
SubjectIdentifier = credit.SubjectIdentifier,
Amount = cmd.Amount,
Currency = "COP",
ProductCode = credit.ProductCode
}
}, ct);
return Result.Success(...);
}
}Consumo de Eventos
csharp
public class CreditDisbursedConsumer : BaseConsumer<CreditDisbursed>
{
private readonly ISubjectEventRepository _eventRepo;
protected override async Task HandleAsync(CreditDisbursed message, CancellationToken ct)
{
// IIdentityContext ya establecido por BaseConsumer (tenant_id del evento)
// Registrar evento en el historial del sujeto
var subjectEvent = SubjectEvent.Create(
subjectIdentifier: message.Data.SubjectIdentifier,
eventType: "credit_disbursed",
sourceService: message.Source,
sourceId: message.Data.CreditId,
data: JsonSerializer.SerializeToDocument(message.Data));
await _eventRepo.AddAsync(subjectEvent, ct);
await _unitOfWork.SaveChangesAsync(ct);
}
}Idempotencia
Cada consumer verifica si ya procesó el evento:
csharp
public abstract class BaseConsumer<T> : IConsumer<T> where T : class, IIntegrationEvent
{
public async Task Consume(ConsumeContext<T> context)
{
var message = context.Message;
// 1. Validar firma HMAC
if (!_signatureValidator.Validate(context))
throw new SecurityException("Invalid message signature");
// 2. Idempotencia: ¿ya procesado?
if (await _processedEvents.ExistsAsync(message.EventId))
return;
// 3. Establecer tenant context
_identity.SetContext(message.TenantId, message.ActorId, message.ActorType);
// 4. Procesar
await HandleAsync(message, context.CancellationToken);
// 5. Marcar como procesado
await _processedEvents.MarkProcessedAsync(
message.EventId, message.EventType, message.Source);
}
protected abstract Task HandleAsync(T message, CancellationToken ct);
}Dead Letter Queue (DLQ)
Eventos que fallan después de 3 reintentos van a la DLQ:
Configuración MassTransit
csharp
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.AddConsumersFromNamespaceContaining<CreditDisbursedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(config.RabbitMqUrl, h =>
{
h.Username(config.RabbitMqUser);
h.Password(config.RabbitMqPassword);
h.UseSsl(s => s.Protocol = SslProtocols.Tls12);
});
// Retry policy
cfg.UseMessageRetry(r => r.Exponential(
retryCount: 3,
minInterval: TimeSpan.FromSeconds(1),
maxInterval: TimeSpan.FromSeconds(25),
intervalDelta: TimeSpan.FromSeconds(5)));
// Configurar endpoints automáticamente
cfg.ConfigureEndpoints(context);
});
});Seguridad en la Comunicación
HTTP (servicio-a-servicio)
| Capa | Mecanismo |
|---|---|
| Transporte | TLS 1.2+ (obligatorio) |
| Autenticación | JWT firmado por Keycloak (propagado) |
| Autorización | Cada servicio valida roles del JWT |
| Tenant isolation | IIdentityContext extrae tenant_id del JWT → RLS |
Eventos (RabbitMQ)
| Capa | Mecanismo |
|---|---|
| Transporte | TLS 1.2+ (conexión a RabbitMQ) |
| Autenticación | Credenciales por servicio (usuario/password) |
| Integridad | Firma HMAC-SHA256 por mensaje |
| Tenant isolation | tenant_id en payload → consumer establece RLS |
| Anti-replay | Timestamp < 5 minutos + idempotencia por event_id |
Firma HMAC de Mensajes
csharp
// Productor: firma el mensaje
public class MessageSigningFilter<T> : IFilter<PublishContext<T>> where T : class
{
public async Task Send(PublishContext<T> context, IPipe<PublishContext<T>> next)
{
var payload = JsonSerializer.Serialize(context.Message);
var timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds().ToString();
var signatureInput = $"{timestamp}.{payload}";
using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(_sharedSecret));
var hash = hmac.ComputeHash(Encoding.UTF8.GetBytes(signatureInput));
var signature = Convert.ToHexString(hash).ToLowerInvariant();
context.Headers.Set("X-Message-Signature", $"sha256={signature}");
context.Headers.Set("X-Message-Timestamp", timestamp);
await next.Send(context);
}
}
// Consumer: valida la firma
public class MessageSignatureValidator
{
public bool Validate<T>(ConsumeContext<T> context) where T : class
{
var signature = context.Headers.Get<string>("X-Message-Signature");
var timestamp = context.Headers.Get<string>("X-Message-Timestamp");
if (string.IsNullOrEmpty(signature) || string.IsNullOrEmpty(timestamp))
return false;
// Anti-replay: timestamp < 5 minutos
var messageTime = DateTimeOffset.FromUnixTimeSeconds(long.Parse(timestamp));
if (DateTimeOffset.UtcNow - messageTime > TimeSpan.FromMinutes(5))
return false;
// Recalcular HMAC
var payload = JsonSerializer.Serialize(context.Message);
var signatureInput = $"{timestamp}.{payload}";
using var hmac = new HMACSHA256(Encoding.UTF8.GetBytes(_sharedSecret));
var hash = hmac.ComputeHash(Encoding.UTF8.GetBytes(signatureInput));
var expected = $"sha256={Convert.ToHexString(hash).ToLowerInvariant()}";
return CryptographicOperations.FixedTimeEquals(
Encoding.UTF8.GetBytes(signature),
Encoding.UTF8.GetBytes(expected));
}
}Correlation y Trazabilidad
Correlation ID
Cada request que entra al sistema genera (o propaga) un correlation_id. Este ID viaja por toda la cadena:
Implementación
csharp
// Middleware en cada servicio: propaga o genera correlation_id
public class CorrelationIdMiddleware
{
public async Task InvokeAsync(HttpContext context)
{
var correlationId = context.Request.Headers["X-Correlation-Id"].FirstOrDefault()
?? Guid.NewGuid().ToString();
context.Items["CorrelationId"] = correlationId;
context.Response.Headers.Append("X-Correlation-Id", correlationId);
using (LogContext.PushProperty("CorrelationId", correlationId))
{
await _next(context);
}
}
}Patrones de Comunicación por Caso de Uso
Caso 1: Solicitud de crédito con validación
Caso 2: Sujeto en lista negra bloquea crédito
Caso 3: Firma standalone (sin flujo)
Reglas de Comunicación
HACER
- ✅ Propagar JWT en llamadas HTTP entre servicios
- ✅ Incluir
tenant_iden todos los eventos - ✅ Incluir
correlation_iden todos los eventos y llamadas HTTP - ✅ Publicar eventos DESPUÉS de persistir exitosamente
- ✅ Implementar idempotencia en todos los consumers
- ✅ Usar Polly (retry + circuit breaker) en llamadas HTTP
- ✅ Firmar mensajes con HMAC-SHA256
- ✅ Validar firma antes de procesar un evento
- ✅ Usar TLS en todas las conexiones
NO HACER
- ❌ Acceder a la BD de otro servicio directamente
- ❌ Confiar en headers custom (X-Tenant-Id) — siempre JWT
- ❌ Publicar eventos ANTES de persistir (riesgo de inconsistencia)
- ❌ Asumir orden de llegada de eventos
- ❌ Hacer llamadas síncronas dentro de un consumer de eventos
- ❌ Ignorar errores de publicación de eventos
- ❌ Crear dependencias circulares síncronas (A→B→A)