Message Queue Patterns
Design reliable systems with message queues and async processing.
A Simple Analogy
Message queues are like a postal system. You drop your message in a queue, and it's reliably delivered to the recipient, even if they're not home.
Why Message Queues?
- Decoupling: Systems don't depend directly
- Scalability: Process at own pace
- Reliability: Messages persist
- Async processing: Don't wait for slow operations
- Load balancing: Distribute work
Publisher-Subscriber
// Publisher
public class OrderService
{
private readonly IMessagePublisher _publisher;
public async Task<Order> CreateOrderAsync(CreateOrderRequest request)
{
var order = new Order { /* ... */ };
await _context.Orders.AddAsync(order);
await _context.SaveChangesAsync();
// Publish event
await _publisher.PublishAsync(new OrderCreatedEvent
{
OrderId = order.Id,
CustomerId = order.CustomerId,
Total = order.Total
});
return order;
}
}
// Subscriber
public class EmailNotificationHandler : IMessageHandler<OrderCreatedEvent>
{
private readonly IEmailService _emailService;
public async Task HandleAsync(OrderCreatedEvent @event)
{
var customer = await _context.Customers.FindAsync(@event.CustomerId);
await _emailService.SendAsync(new EmailMessage
{
To = customer.Email,
Subject = "Order Confirmation",
Body = $"Order #{@event.OrderId} for ${@event.Total}"
});
}
}
Work Queue Pattern
// Producer
public class ImageProcessingService
{
private readonly IQueuePublisher _queue;
public async Task QueueImageProcessingAsync(string imageId)
{
await _queue.EnqueueAsync(new ImageProcessingJob
{
ImageId = imageId,
CreatedAt = DateTime.UtcNow
}, "image-processing-queue");
}
}
// Consumer
public class ImageProcessingWorker : BackgroundService
{
private readonly IQueueConsumer _queue;
private readonly IImageProcessor _processor;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _queue.ConsumeAsync<ImageProcessingJob>(
"image-processing-queue",
async job =>
{
try
{
await _processor.ProcessAsync(job.ImageId);
}
catch (Exception ex)
{
// Move to dead letter queue
await _queue.EnqueueAsync(job, "image-processing-dlq");
}
},
stoppingToken);
}
}
Request-Reply Pattern
// Request
public class PricingService
{
private readonly IMessageClient _client;
public async Task<decimal> GetPriceAsync(string productId)
{
var response = await _client.RequestAsync<GetPriceRequest, GetPriceResponse>(
new GetPriceRequest { ProductId = productId },
timeout: TimeSpan.FromSeconds(5));
return response.Price;
}
}
// Reply
public class ProductService : IMessageHandler<GetPriceRequest>
{
public async Task<GetPriceResponse> HandleAsync(GetPriceRequest request)
{
var product = await _context.Products.FindAsync(request.ProductId);
return new GetPriceResponse { Price = product.Price };
}
}
Dead Letter Queue
public class QueueConsumer
{
private readonly IQueuePublisher _queue;
private const int MaxRetries = 3;
public async Task ConsumeWithRetryAsync<T>(string queueName, Func<T, Task> handler)
{
var message = await _queue.DequeueAsync<T>(queueName);
var retryCount = message.Headers.Get("retry-count", 0);
try
{
await handler(message.Body);
}
catch (Exception ex)
{
if (retryCount < MaxRetries)
{
// Retry with exponential backoff
var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
message.Headers.Set("retry-count", retryCount + 1);
await Task.Delay(delay);
await _queue.EnqueueAsync(message, queueName);
}
else
{
// Move to dead letter queue
await _queue.EnqueueAsync(message, $"{queueName}-dlq");
}
}
}
}
Best Practices
- Idempotent handlers: Safe to process twice
- Error handling: Implement retry and DLQ
- Monitoring: Track queue depth
- Ordering: Maintain message order when needed
- TTL: Set message expiration
Related Concepts
- Event sourcing
- CQRS pattern
- Saga pattern
- Distributed transactions
Summary
Message queues enable scalable, decoupled systems. Implement reliable patterns with proper error handling, retries, and dead letter queues.
Related Articles
Message Queues and RabbitMQ
Decouple services with message queues for reliable, asynchronous communication.
Read More messagingRabbitMQ Message Patterns
Implement reliable messaging with RabbitMQ patterns.
Read More architectureCQRS and Event Sourcing
Separate reads and writes with CQRS and Event Sourcing patterns.
Read More