RabbitMQ Message Patterns
Implement reliable messaging with RabbitMQ patterns.
A Simple Analogy
RabbitMQ is like a telephone switchboard for your services: messages (the calls) are routed to the right recipient, held in a queue when the recipient is busy, and delivered reliably.
Why RabbitMQ?
- Reliability: Messages persist until they are consumed and acknowledged
- Routing: Flexible routing via exchanges and binding keys
- Scaling: Handles thousands of messages per second
- Decoupling: Services don't need to know about each other
- Patterns: Publish-subscribe, work queues, routing, RPC
Basic Setup
using System.Text;
using RabbitMQ.Client;

// Connection
var factory = new ConnectionFactory() { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

// Declare a durable queue (survives broker restarts)
channel.QueueDeclare(queue: "orders", durable: true, exclusive: false, autoDelete: false);

// Publish a message to the default exchange, routed by queue name
var message = "Order123";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "orders", body: body);
Console.WriteLine(" [x] Sent {0}", message);
Publisher/Subscriber
// Publisher
channel.ExchangeDeclare(exchange: "orders-exchange", type: "fanout");
var message = "Order created: 123";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "orders-exchange", routingKey: "", body: body);

// Subscriber 1 (Email service) -- requires using RabbitMQ.Client.Events;
channel.ExchangeDeclare(exchange: "orders-exchange", type: "fanout");
var queueName = channel.QueueDeclare().QueueName; // server-named, exclusive queue
channel.QueueBind(queue: queueName, exchange: "orders-exchange", routingKey: "");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine(" [x] {0}", message);
    // Send email
    channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
// Subscriber 2 (Notification service)
// Receives same message independently
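Fleshed out, the second subscriber is just another queue bound to the same fanout exchange, so it gets its own copy of every message. A minimal sketch (the notification handler is hypothetical):

var notifyQueue = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: notifyQueue, exchange: "orders-exchange", routingKey: "");

var notifyConsumer = new EventingBasicConsumer(channel);
notifyConsumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    SendPushNotification(message); // hypothetical handler
    channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: notifyQueue, autoAck: false, consumer: notifyConsumer);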
Routing Pattern
// Direct exchange: publish with an exact routing key
channel.ExchangeDeclare(exchange: "orders-direct", type: "direct");
channel.BasicPublish(
    exchange: "orders-direct",
    routingKey: "order.created", // Routing key
    body: body);

// Subscriber interested only in created orders
channel.QueueBind(queue: queueName, exchange: "orders-direct", routingKey: "order.created");
// Subscriber interested only in cancelled orders
channel.QueueBind(queue: queueName2, exchange: "orders-direct", routingKey: "order.cancelled");

// Topic exchange: binding keys may use wildcards
// (* matches exactly one word, # matches zero or more words)
channel.ExchangeDeclare(exchange: "orders-topic", type: "topic");
channel.BasicPublish(exchange: "orders-topic", routingKey: "order.us.created", body: body);
channel.BasicPublish(exchange: "orders-topic", routingKey: "order.eu.cancelled", body: body);

// Subscribe to patterns
channel.QueueBind(queue: queueName, exchange: "orders-topic", routingKey: "order.us.*"); // US orders only
channel.QueueBind(queue: queueName2, exchange: "orders-topic", routingKey: "order.#");   // all orders
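A consumer bound with these keys can also see which routing key each delivery arrived with, which is handy when one queue is bound to several patterns. A minimal sketch:

var routedConsumer = new EventingBasicConsumer(channel);
routedConsumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine(" [x] {0} (routing key: {1})", message, ea.RoutingKey); // e.g. order.us.created
    channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: queueName, autoAck: false, consumer: routedConsumer);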
Work Queue with Multiple Consumers
// Publisher (JSON serialization via System.Text.Json)
channel.QueueDeclare(queue: "tasks", durable: true, exclusive: false, autoDelete: false);
// TaskMessage is a simple DTO (Id, Work), named to avoid clashing with System.Threading.Tasks.Task
var message = JsonSerializer.Serialize(new TaskMessage { Id = 123, Work = "process" });
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "tasks", body: body);

// Consumer 1 (worker)
channel.BasicQos(0, 1, false); // Fair dispatch: at most one unacknowledged message per worker
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    var task = JsonSerializer.Deserialize<TaskMessage>(message);
    // Process task
    ProcessTask(task);
    // Acknowledge only when done, so unfinished work is redelivered if the worker dies
    channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: "tasks", autoAck: false, consumer: consumer);

// Consumer 2 (another worker running the same code)
// The broker round-robins tasks between workers; prefetch = 1 keeps a busy worker from hoarding messages
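If processing throws, the worker should reject the delivery rather than acknowledge it. With requeue: false the broker drops the message or, if the queue is configured with a dead-letter exchange (next section), routes it there. A minimal variation of the handler above:

var safeConsumer = new EventingBasicConsumer(channel);
safeConsumer.Received += (model, ea) =>
{
    try
    {
        var task = JsonSerializer.Deserialize<TaskMessage>(Encoding.UTF8.GetString(ea.Body.ToArray()));
        ProcessTask(task);
        channel.BasicAck(ea.DeliveryTag, multiple: false);
    }
    catch (Exception)
    {
        // Reject without requeue: the message is dropped or dead-lettered
        channel.BasicNack(ea.DeliveryTag, multiple: false, requeue: false);
    }
};
channel.BasicConsume(queue: "tasks", autoAck: false, consumer: safeConsumer);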
Dead Letter Queue
// Dead letter exchange and queue
channel.ExchangeDeclare(exchange: "dlx-exchange", type: "direct");
channel.QueueDeclare(queue: "dead-letter-queue", durable: true, exclusive: false, autoDelete: false);
channel.QueueBind(queue: "dead-letter-queue", exchange: "dlx-exchange", routingKey: "dead-letter-queue");

// Main queue: rejected or expired messages are re-routed to the DLX
channel.QueueDeclare(
    queue: "main-queue",
    durable: true,
    exclusive: false,
    autoDelete: false,
    arguments: new Dictionary<string, object>
    {
        { "x-dead-letter-exchange", "dlx-exchange" },
        { "x-dead-letter-routing-key", "dead-letter-queue" },
        { "x-message-ttl", 60000 } // unconsumed messages expire after 60s and are dead-lettered
    });

// Messages that are rejected (requeue: false) or expire end up in the DLQ
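Something still has to drain the dead-letter queue, even if it only logs and alerts; the broker also attaches an x-death header describing why each message was dead-lettered. A minimal consumer sketch:

var dlqConsumer = new EventingBasicConsumer(channel);
dlqConsumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine(" [dlq] {0}", message); // log, alert, or park for manual review
    channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: "dead-letter-queue", autoAck: false, consumer: dlqConsumer);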
RPC Pattern
// Client
var replyQueueName = channel.QueueDeclare().QueueName; // exclusive, server-named reply queue
var corrId = Guid.NewGuid().ToString();

var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;

var message = "5";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: props, body: body);

// Receive the response, matching on the correlation id
var consumer = new EventingBasicConsumer(channel);
string response = null;
consumer.Received += (model, ea) =>
{
    if (ea.BasicProperties.CorrelationId == corrId)
    {
        response = Encoding.UTF8.GetString(ea.Body.ToArray());
    }
};
channel.BasicConsume(queue: replyQueueName, autoAck: true, consumer: consumer);
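As written, the handler above stores the response but nothing waits for it. A common approach (used in the official RabbitMQ RPC tutorial) is to hand the reply to the calling thread through a BlockingCollection or TaskCompletionSource; a minimal sketch reusing corrId and replyQueueName:

var replies = new BlockingCollection<string>(); // requires using System.Collections.Concurrent;

var replyConsumer = new EventingBasicConsumer(channel);
replyConsumer.Received += (model, ea) =>
{
    if (ea.BasicProperties.CorrelationId == corrId)
        replies.Add(Encoding.UTF8.GetString(ea.Body.ToArray()));
};
channel.BasicConsume(queue: replyQueueName, autoAck: true, consumer: replyConsumer);

var rpcResult = replies.Take(); // blocks until the server replies
Console.WriteLine(" [.] Got '{0}'", rpcResult);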
// Server
channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    var n = int.Parse(message);
    var response = Fibonacci(n).ToString();

    // Echo the correlation id so the client can match the reply to its request
    var replyProps = channel.CreateBasicProperties();
    replyProps.CorrelationId = ea.BasicProperties.CorrelationId;

    channel.BasicPublish(
        exchange: "",
        routingKey: ea.BasicProperties.ReplyTo,
        basicProperties: replyProps,
        body: Encoding.UTF8.GetBytes(response));
};
channel.BasicConsume(queue: "rpc_queue", autoAck: true, consumer: consumer);
Best Practices
- Make consumers idempotent: Handle duplicate deliveries (see the sketch after this list)
- Acknowledge properly: Only after processing
- Use dead letter queues: Catch failed messages
- Monitor queue depth: Track backlog
- Set message TTL: Clean up old messages
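Idempotency can be as simple as skipping message ids you have already processed. A minimal in-memory sketch, assuming the publisher sets BasicProperties.MessageId (in production you would track ids in a durable store such as a database or Redis):

var processedIds = new HashSet<string>(); // in-memory only; use a durable store in production

var idempotentConsumer = new EventingBasicConsumer(channel);
idempotentConsumer.Received += (model, ea) =>
{
    var messageId = ea.BasicProperties.MessageId;
    if (messageId != null && !processedIds.Add(messageId))
    {
        channel.BasicAck(ea.DeliveryTag, false); // duplicate delivery: ack and skip
        return;
    }
    ProcessTask(JsonSerializer.Deserialize<TaskMessage>(Encoding.UTF8.GetString(ea.Body.ToArray())));
    channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queue: "tasks", autoAck: false, consumer: idempotentConsumer);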
Related Concepts
- Apache Kafka for streaming
- Azure Service Bus
- AWS SQS/SNS
- Event-driven architectures
Summary
RabbitMQ provides reliable messaging with flexible routing patterns. Use publish-subscribe, work queues, and RPC patterns to decouple services and build event-driven systems.