Isaac.

messaging

RabbitMQ Message Patterns

Implement reliable messaging with RabbitMQ patterns.

By Emem IsaacJune 1, 20244 min read
#rabbitmq#messaging#patterns#reliability#event-driven
Share:

A Simple Analogy

RabbitMQ is like a telephone switchboard for your services. Messages (calls) are routed correctly, queued if needed, and delivered reliably.


Why RabbitMQ?

  • Reliability: Messages persist until consumed
  • Routing: Flexible message routing
  • Scaling: Handle thousands of messages/sec
  • Decoupling: Services don't need to know each other
  • Patterns: Publish-subscribe, work queues, RPC

Basic Setup

// Connection
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Declare queue
channel.QueueDeclare(queue: "orders", durable: true, exclusive: false);

// Publish message
var message = "Order123";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "orders", body: body);

Console.WriteLine(" [x] Sent {0}", message);

Publisher/Subscriber

// Publisher
channel.ExchangeDeclare(exchange: "orders-exchange", type: "fanout");

var message = "Order created: 123";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "orders-exchange", routingKey: "", body: body);

// Subscriber 1 (Email service)
channel.ExchangeDeclare(exchange: "orders-exchange", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "orders-exchange", routingKey: "");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine(" [x] {0}", message);
    // Send email
    channel.BasicAck(ea.DeliveryTag, false);
};

channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

// Subscriber 2 (Notification service)
// Receives same message independently

Routing Pattern

// Publish with routing key
channel.ExchangeDeclare(exchange: "orders", type: "direct");

channel.BasicPublish(
    exchange: "orders",
    routingKey: "order.created",  // Routing key
    body: body);

// Subscriber interested only in created orders
channel.QueueBind(queue: queueName, exchange: "orders", routingKey: "order.created");

// Subscriber interested only in cancelled orders
channel.QueueBind(queue: queueName2, exchange: "orders", routingKey: "order.cancelled");

// Topic pattern: can use wildcards
channel.ExchangeDeclare(exchange: "orders", type: "topic");
channel.BasicPublish(exchange: "orders", routingKey: "order.us.created", body: body);
channel.BasicPublish(exchange: "orders", routingKey: "order.eu.cancelled", body: body);

// Subscribe to pattern
channel.QueueBind(queue: queueName, exchange: "orders", routingKey: "order.us.*");
channel.QueueBind(queue: queueName2, exchange: "orders", routingKey: "order.*");

Work Queue with Multiple Consumers

// Publisher
channel.QueueDeclare(queue: "tasks", durable: true);
channel.BasicQos(0, 1, false);  // Fair dispatch

var message = JsonSerializer.Serialize(new Task { Id = 123, Work = "process" });
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "tasks", body: body);

// Consumer 1 (worker)
channel.BasicQos(0, 1, false);  // Process one at a time
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    var task = JsonSerializer.Deserialize<Task>(message);
    
    // Process task
    ProcessTask(task);
    
    // Acknowledge when done
    channel.BasicAck(ea.DeliveryTag, false);
};

channel.BasicConsume(queue: "tasks", autoAck: false, consumer: consumer);

// Consumer 2 (another worker)
// Automatically handles different task

Dead Letter Queue

// Main queue
channel.QueueDeclare(
    queue: "main-queue",
    durable: true,
    arguments: new Dictionary<string, object>
    {
        { "x-dead-letter-exchange", "dlx-exchange" },
        { "x-dead-letter-routing-key", "dead-letter-queue" },
        { "x-message-ttl", 60000 }  // 60s before dead letter
    });

// Dead letter queue
channel.QueueDeclare(queue: "dead-letter-queue", durable: true);
channel.QueueBind(queue: "dead-letter-queue", exchange: "dlx-exchange", routingKey: "dead-letter-queue");

// Messages that fail/expire go to DLQ

RPC Pattern

// Client
var replyQueueName = channel.QueueDeclare().QueueName;
var corrId = Guid.NewGuid().ToString();

var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;

var message = "5";
channel.BasicPublish(exchange: "", routingKey: "rpc_queue", body: body, basicProperties: props);

// Receive response
var consumer = new EventingBasicConsumer(channel);
var response = null;

consumer.Received += (model, ea) =>
{
    if (ea.BasicProperties.CorrelationId == corrId)
    {
        response = Encoding.UTF8.GetString(ea.Body.ToArray());
    }
};

channel.BasicConsume(queue: replyQueueName, autoAck: true, consumer: consumer);

// Server
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    var n = int.Parse(message);
    var response = Fibonacci(n).ToString();
    
    var props = channel.CreateBasicProperties();
    props.CorrelationId = ea.BasicProperties.CorrelationId;
    
    channel.BasicPublish(
        exchange: "",
        routingKey: ea.BasicProperties.ReplyTo,
        basicProperties: props,
        body: Encoding.UTF8.GetBytes(response));
};

channel.BasicConsume(queue: "rpc_queue", autoAck: true, consumer: consumer);

Best Practices

  1. Make messages idempotent: Handle duplicates
  2. Acknowledge properly: Only after processing
  3. Use dead letter queues: Catch failed messages
  4. Monitor queue depth: Track backlog
  5. Set message TTL: Clean up old messages

Related Concepts

  • Apache Kafka for streaming
  • Azure Service Bus
  • AWS SQS/SNS
  • Event-driven architectures

Summary

RabbitMQ provides reliable messaging with flexible routing patterns. Use publish-subscribe, work queues, and RPC patterns to decouple services and build event-driven systems.

Share:

Written by Emem Isaac

Expert Software Engineer with 15+ years of experience building scalable enterprise applications. Specialized in ASP.NET Core, Azure, Docker, and modern web development. Passionate about sharing knowledge and helping developers grow.

Ready to Build Something Amazing?

Let's discuss your project and explore how my expertise can help you achieve your goals. Free consultation available.

💼 Trusted by 50+ companies worldwide | ⚡ Average response time: 24 hours