Message Queue Patterns

Design reliable systems with message queues and async processing.

By Emem Isaac | December 14, 2023 | 3 min read
#message queues #async #rabbitmq #patterns #reliability
A Simple Analogy

A message queue works like a postal system. You drop a message in the queue, and it is held there until the recipient is ready to collect it, even if they are not home when it arrives.


Why Message Queues?

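  • Decoupling: Producers and consumers don't call each other directly
  • Scalability: Consumers process messages at their own pace
  • Reliability: Messages are held in the queue until a consumer processes them
  • Async processing: Callers don't wait for slow operations
  • Load balancing: Work is distributed across multiple consumers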
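
To make the decoupling and "own pace" points concrete, here is a minimal in-process sketch using System.Threading.Channels. It is only an illustration: a channel lives in memory and does not persist messages the way a broker does, and the class and method names (InMemoryQueueDemo, RunAsync) are made up for this example.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class InMemoryQueueDemo
{
    // The producer writes 'count' messages and then signals completion.
    // The consumer drains them at its own pace and returns what it saw.
    public static async Task<List<string>> RunAsync(int count)
    {
        // Bounded capacity gives back-pressure: WriteAsync waits while the buffer is full.
        var channel = Channel.CreateBounded<string>(capacity: 2);
        var processed = new List<string>();

        var consumer = Task.Run(async () =>
        {
            await foreach (var message in channel.Reader.ReadAllAsync())
            {
                await Task.Delay(10); // simulate slow work
                processed.Add(message);
            }
        });

        for (var i = 1; i <= count; i++)
        {
            await channel.Writer.WriteAsync($"message-{i}");
        }
        channel.Writer.Complete(); // tell the consumer no more messages are coming

        await consumer;
        return processed;
    }
}
```

The producer never calls the consumer directly. It only knows about the channel, which is the same relationship a service has with a broker queue.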

Publisher-Subscriber Pattern

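// Publisher
public class OrderService
{
    // Assumed EF Core DbContext; the type name AppDbContext is illustrative
    private readonly AppDbContext _context;
    private readonly IMessagePublisher _publisher;

    public OrderService(AppDbContext context, IMessagePublisher publisher)
    {
        _context = context;
        _publisher = publisher;
    }

    public async Task<Order> CreateOrderAsync(CreateOrderRequest request)
    {
        var order = new Order { /* ... */ };
        await _context.Orders.AddAsync(order);
        await _context.SaveChangesAsync();

        // Publish only after the order is saved, so subscribers never see an
        // event for an order that does not exist. If publishing fails here,
        // the order is saved but no event is sent.
        await _publisher.PublishAsync(new OrderCreatedEvent
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            Total = order.Total
        });

        return order;
    }
}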

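// Subscriber
public class EmailNotificationHandler : IMessageHandler<OrderCreatedEvent>
{
    // Assumed EF Core DbContext; the type name AppDbContext is illustrative
    private readonly AppDbContext _context;
    private readonly IEmailService _emailService;

    public EmailNotificationHandler(AppDbContext context, IEmailService emailService)
    {
        _context = context;
        _emailService = emailService;
    }

    public async Task HandleAsync(OrderCreatedEvent @event)
    {
        var customer = await _context.Customers.FindAsync(@event.CustomerId);
        if (customer is null)
            return; // nothing to notify; the customer may have been deleted

        await _emailService.SendAsync(new EmailMessage
        {
            To = customer.Email,
            Subject = "Order Confirmation",
            Body = $"Order #{@event.OrderId} for ${@event.Total}"
        });
    }
}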
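
What separates publish-subscribe from a plain queue is fan-out: every subscriber gets its own copy of each event. The sketch below shows that behavior with a hypothetical in-memory bus. InMemoryEventBus and OrderCreated are invented for the example and are not the IMessagePublisher implementation used above. The bus is not thread-safe and calls handlers one after another.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-memory bus: each subscriber registered for TEvent
// receives every event of that type that is published.
public class InMemoryEventBus
{
    private readonly Dictionary<Type, List<Func<object, Task>>> _handlers = new();

    public void Subscribe<TEvent>(Func<TEvent, Task> handler)
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
        {
            list = new List<Func<object, Task>>();
            _handlers[typeof(TEvent)] = list;
        }
        // Wrap the typed handler so handlers for all event types share one list type.
        list.Add(e => handler((TEvent)e));
    }

    public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : notnull
    {
        if (!_handlers.TryGetValue(typeof(TEvent), out var list))
            return; // no subscribers: the event is simply dropped

        foreach (var handler in list)
        {
            await handler(@event);
        }
    }
}

// Illustrative event type for the example.
public record OrderCreated(int OrderId, decimal Total);
```

If two handlers subscribe to OrderCreated and one event is published, both handlers run, in subscription order. A real broker would also give each subscriber its own queue, so a slow or failing subscriber does not hold up the others.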

Work Queue Pattern

// Producer
public class ImageProcessingService
{
    private readonly IQueuePublisher _queue;
    
    public async Task QueueImageProcessingAsync(string imageId)
    {
        await _queue.EnqueueAsync(new ImageProcessingJob
        {
            ImageId = imageId,
            CreatedAt = DateTime.UtcNow
        }, "image-processing-queue");
    }
}

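// Consumer
public class ImageProcessingWorker : BackgroundService
{
    private readonly IQueueConsumer _queue;
    private readonly IQueuePublisher _publisher; // forwards failed jobs to the DLQ
    private readonly IImageProcessor _processor;

    public ImageProcessingWorker(
        IQueueConsumer queue, IQueuePublisher publisher, IImageProcessor processor)
    {
        _queue = queue;
        _publisher = publisher;
        _processor = processor;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await _queue.ConsumeAsync<ImageProcessingJob>(
            "image-processing-queue",
            async job =>
            {
                try
                {
                    await _processor.ProcessAsync(job.ImageId);
                }
                catch (Exception)
                {
                    // Move to dead letter queue so one bad job does not block the rest
                    await _publisher.EnqueueAsync(job, "image-processing-dlq");
                }
            },
            stoppingToken);
    }
}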
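
The defining property of a work queue is that several workers compete for jobs and each job goes to exactly one of them. The following sketch demonstrates that in process with a channel. CompetingConsumersDemo is a made-up name, and a channel stands in for the broker queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompetingConsumersDemo
{
    // Returns a map of job id -> id of the worker that processed it.
    public static async Task<ConcurrentDictionary<int, int>> RunAsync(int jobCount, int workerCount)
    {
        var queue = Channel.CreateUnbounded<int>();
        var processedBy = new ConcurrentDictionary<int, int>();

        // Start several workers that all read from the same queue.
        var workers = Enumerable.Range(1, workerCount).Select(workerId => Task.Run(async () =>
        {
            await foreach (var jobId in queue.Reader.ReadAllAsync())
            {
                await Task.Delay(5); // simulate work
                if (!processedBy.TryAdd(jobId, workerId))
                    throw new InvalidOperationException($"Job {jobId} was delivered twice.");
            }
        })).ToArray();

        for (var jobId = 1; jobId <= jobCount; jobId++)
        {
            await queue.Writer.WriteAsync(jobId);
        }
        queue.Writer.Complete(); // lets the workers finish once the queue is empty

        await Task.WhenAll(workers);
        return processedBy;
    }
}
```

Adding workers raises throughput without any change to the producer, which is the load-balancing benefit listed earlier. Which worker picks up which job is not deterministic.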

Request-Reply Pattern

// Request
public class PricingService
{
    private readonly IMessageClient _client;
    
    public async Task<decimal> GetPriceAsync(string productId)
    {
        var response = await _client.RequestAsync<GetPriceRequest, GetPriceResponse>(
            new GetPriceRequest { ProductId = productId },
            timeout: TimeSpan.FromSeconds(5));
        
        return response.Price;
    }
}

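// Reply
// IRequestHandler<TRequest, TResponse> is an assumed interface name; the
// one-argument IMessageHandler<T> used earlier returns no response.
public class ProductService : IRequestHandler<GetPriceRequest, GetPriceResponse>
{
    private readonly AppDbContext _context; // assumed EF Core DbContext

    public ProductService(AppDbContext context) => _context = context;

    public async Task<GetPriceResponse> HandleAsync(GetPriceRequest request)
    {
        var product = await _context.Products.FindAsync(request.ProductId);
        if (product is null)
            throw new KeyNotFoundException($"Product {request.ProductId} not found.");
        return new GetPriceResponse { Price = product.Price };
    }
}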
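
Request-reply over a queue usually relies on a correlation ID: the requester tags each request, and the reply carries the same tag so it can be matched to the waiting caller. The sketch below shows one way a client such as the one behind RequestAsync might do this. CorrelatingClient and its members are hypothetical, and the send delegate stands in for publishing to a request queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical client that matches replies to pending requests by correlation ID.
public class CorrelatingClient<TRequest, TResponse>
{
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<TResponse>> _pending = new();
    private readonly Action<Guid, TRequest> _send; // stands in for "publish to the request queue"

    public CorrelatingClient(Action<Guid, TRequest> send) => _send = send;

    public async Task<TResponse> RequestAsync(TRequest request, TimeSpan timeout)
    {
        var correlationId = Guid.NewGuid();
        var tcs = new TaskCompletionSource<TResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[correlationId] = tcs; // register before sending so a fast reply is not lost
        try
        {
            _send(correlationId, request);
            var finished = await Task.WhenAny(tcs.Task, Task.Delay(timeout));
            if (finished != tcs.Task)
                throw new TimeoutException($"No reply within {timeout}.");
            return await tcs.Task;
        }
        finally
        {
            _pending.TryRemove(correlationId, out _); // clean up on success and on timeout
        }
    }

    // Called by whatever consumes the reply queue. Returns false for unknown or expired IDs.
    public bool OnReply(Guid correlationId, TResponse response) =>
        _pending.TryGetValue(correlationId, out var tcs) && tcs.TrySetResult(response);
}
```

The timeout matters because the replying service may be down. Without it, the caller would wait forever for a reply that never arrives.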

Dead Letter Queue

public class QueueConsumer
{
    // Assumed abstraction that can both dequeue and enqueue;
    // the interface name IMessageQueue is illustrative
    private readonly IMessageQueue _queue;
    private const int MaxRetries = 3;

    public QueueConsumer(IMessageQueue queue) => _queue = queue;

    public async Task ConsumeWithRetryAsync<T>(string queueName, Func<T, Task> handler)
    {
        var message = await _queue.DequeueAsync<T>(queueName);
        var retryCount = message.Headers.Get("retry-count", 0);

        try
        {
            await handler(message.Body);
        }
        catch (Exception)
        {
            if (retryCount < MaxRetries)
            {
                // Retry with exponential backoff: 1s, 2s, then 4s
                var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
                message.Headers.Set("retry-count", retryCount + 1);

                await Task.Delay(delay);
                await _queue.EnqueueAsync(message, queueName);
            }
            else
            {
                // Retries exhausted: move to dead letter queue for inspection
                await _queue.EnqueueAsync(message, $"{queueName}-dlq");
            }
        }
    }
}
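
With MaxRetries = 3, the handler above runs at most four times: the first attempt plus three retries, with waits of 1, 2, and 4 seconds between them. The delay calculation can be pulled into a small helper and capped so that a larger retry limit does not produce very long waits. This is a sketch. Backoff, DelayFor, and the cap parameter are additions for illustration and not part of the code above.

```csharp
using System;

public static class Backoff
{
    // Delay before the next attempt: baseDelay * 2^retryCount, capped at maxDelay.
    public static TimeSpan DelayFor(int retryCount, TimeSpan baseDelay, TimeSpan maxDelay)
    {
        if (retryCount < 0)
            throw new ArgumentOutOfRangeException(nameof(retryCount));

        var seconds = baseDelay.TotalSeconds * Math.Pow(2, retryCount);
        return TimeSpan.FromSeconds(Math.Min(seconds, maxDelay.TotalSeconds));
    }
}
```

With a base of one second and a cap of thirty seconds, retry counts 0, 1, and 2 give 1, 2, and 4 seconds, and a retry count of 10 gives the 30-second cap instead of 1024 seconds.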

Best Practices

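  1. Idempotent handlers: A message may be delivered more than once, so handlers must be safe to run twice on the same message
  2. Error handling: Retry transient failures and move messages that keep failing to a DLQ
  3. Monitoring: Track queue depth; a steadily growing queue means consumers are falling behind
  4. Ordering: Preserve message order only where the business logic requires it
  5. TTL: Set a message expiration so stale messages are dropped instead of processed late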
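
Idempotency (item 1) is often implemented by recording the IDs of messages that have already been handled and skipping repeats. Below is a minimal sketch that assumes each message carries a unique ID. IdempotentHandler is a made-up wrapper, and the in-memory set would be a durable store in a real system.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical wrapper that runs the inner handler at most once per message ID.
public class IdempotentHandler<T>
{
    private readonly ConcurrentDictionary<string, bool> _processed = new();
    private readonly Func<T, Task> _inner;

    public IdempotentHandler(Func<T, Task> inner) => _inner = inner;

    // Returns true if the message was handled, false if it was a duplicate.
    public async Task<bool> HandleAsync(string messageId, T message)
    {
        if (!_processed.TryAdd(messageId, true))
            return false; // already seen: skip

        try
        {
            await _inner(message);
            return true;
        }
        catch
        {
            // Forget the ID so a later redelivery can try again.
            _processed.TryRemove(messageId, out _);
            throw;
        }
    }
}
```

Delivering the same ID twice runs the inner handler once. The second call returns false without side effects.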

Related Concepts

  • Event sourcing
  • CQRS pattern
  • Saga pattern
  • Distributed transactions

Summary

Message queues enable scalable, decoupled systems. Implement reliable patterns with proper error handling, retries, and dead letter queues.
