MassTransit Advanced
Master complex messaging patterns with MassTransit.
A Simple Analogy
MassTransit is like a post office for your application. Messages are packages, the transport (RabbitMQ in the examples below) is the postal system, and MassTransit handles routing, delivery, and retries automatically.
Why MassTransit?
- Abstraction: Swap transports (RabbitMQ, Azure Service Bus, etc.)
- Sagas: Long-running processes with state
- Retry logic: Automatic resilience
- Scheduling: Delayed message delivery
- Monitoring: Built-in observability
Basic Setup
// Program.cs
// Register both consumers and run the bus on RabbitMQ.
builder.Services.AddMassTransit(registration =>
{
    registration.AddConsumer<OrderCreatedConsumer>();
    registration.AddConsumer<OrderShippedConsumer>();

    registration.UsingRabbitMq((busContext, rabbit) =>
    {
        // Broker host used by this sample.
        rabbit.Host("localhost");

        // Let MassTransit set up the receive endpoints from the registrations above.
        rabbit.ConfigureEndpoints(busContext);
    });
});
// Consumer
/// <summary>
/// Consumes <see cref="OrderCreated"/> events and sends an order confirmation
/// through <see cref="IEmailService"/>.
/// </summary>
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    private readonly IEmailService _emailService;

    /// <summary>
    /// Creates the consumer with the e-mail service it depends on.
    /// </summary>
    /// <param name="emailService">Service used to send the confirmation; expected to be supplied by the DI container.</param>
    /// <exception cref="ArgumentNullException"><paramref name="emailService"/> is <c>null</c>.</exception>
    public OrderCreatedConsumer(IEmailService emailService)
    {
        // Without this assignment the readonly field stays null and Consume would
        // fail with a NullReferenceException on every message.
        _emailService = emailService ?? throw new ArgumentNullException(nameof(emailService));
    }

    /// <summary>
    /// Sends an order confirmation for the customer and order carried by the message.
    /// </summary>
    /// <param name="context">Consume context wrapping the received <see cref="OrderCreated"/> message.</param>
    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        var message = context.Message;
        await _emailService.SendOrderConfirmation(message.CustomerId, message.OrderId);
    }
}
// Publish event
/// <summary>
/// Persists new orders and announces them by publishing an <see cref="OrderCreated"/> event.
/// </summary>
public class OrderService
{
    private readonly IPublishEndpoint _publishEndpoint;

    /// <summary>
    /// Creates the service with the endpoint used to publish events.
    /// </summary>
    /// <param name="publishEndpoint">Publish endpoint; expected to be supplied by the DI container.</param>
    /// <exception cref="ArgumentNullException"><paramref name="publishEndpoint"/> is <c>null</c>.</exception>
    public OrderService(IPublishEndpoint publishEndpoint)
    {
        // Without this assignment the readonly field stays null and every publish would throw.
        _publishEndpoint = publishEndpoint ?? throw new ArgumentNullException(nameof(publishEndpoint));
    }

    /// <summary>
    /// Saves the order, then publishes an <see cref="OrderCreated"/> event carrying its id,
    /// customer id and total.
    /// </summary>
    /// <param name="order">The order to save and announce.</param>
    /// <exception cref="ArgumentNullException"><paramref name="order"/> is <c>null</c>.</exception>
    public async Task CreateOrderAsync(Order order)
    {
        if (order is null)
        {
            throw new ArgumentNullException(nameof(order));
        }

        // Save order
        // NOTE(review): SaveAsync is not defined in this snippet; it is assumed to be
        // provided elsewhere (e.g. another part of this class) - confirm.
        await SaveAsync(order);

        // Publish event
        // NOTE(review): the save and the publish are two separate awaits, so a failure
        // between them leaves a saved order with no event. Consider a transactional
        // outbox if that matters.
        await _publishEndpoint.Publish(new OrderCreated
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            Total = order.Total
        });
    }
}
Sagas for Long-Running Processes
// Saga state
/// <summary>
/// State held for one order-fulfillment saga instance; this is the instance type
/// used by OrderFulfillmentStateMachine.
/// </summary>
public class OrderFulfillmentState : SagaStateMachineInstance
{
/// <summary>Identifier used to correlate incoming events with this saga instance.</summary>
public Guid CorrelationId { get; set; }
/// <summary>Current state of the saga; the state machine binds it with <c>InstanceState(x => x.State)</c>.</summary>
public string State { get; set; }
/// <summary>Order id taken from the OrderSubmitted event when the saga starts.</summary>
public string OrderId { get; set; }
/// <summary>Customer id taken from the OrderSubmitted event when the saga starts.</summary>
public string CustomerId { get; set; }
/// <summary>UTC timestamp the state machine sets when it handles the Submit event.</summary>
public DateTime CreatedAt { get; set; }
}
// Saga state machine
/// <summary>
/// Saga state machine for order fulfillment:
/// Initial --Submit--> PaymentProcessing --PaymentApproved--> Shipped --ShipmentReady--> Completed.
/// Each transition publishes the message that starts the next step.
/// </summary>
/// <remarks>
/// Derives from <c>MassTransitStateMachine&lt;T&gt;</c>, the base class that supplies
/// <c>InstanceState</c>, <c>Event</c>, <c>Initially</c>, <c>During</c> and <c>When</c>.
/// <c>StateMachine&lt;T&gt;</c> is only the interface and does not provide them.
/// </remarks>
public class OrderFulfillmentStateMachine : MassTransitStateMachine<OrderFulfillmentState>
{
    // NOTE(review): Submitted is declared but no transition targets it - confirm whether it is needed.
    public State Submitted { get; private set; }
    public State PaymentProcessing { get; private set; }
    public State Shipped { get; private set; }
    public State Completed { get; private set; }

    public Event<OrderSubmitted> Submit { get; private set; }
    public Event<PaymentApproved> PaymentApproved { get; private set; }
    public Event<ShipmentReady> ShipmentReady { get; private set; }

    /// <summary>
    /// Declares the state property, the event correlations and the transitions.
    /// </summary>
    public OrderFulfillmentStateMachine()
    {
        InstanceState(x => x.State);

        // Every event must be correlated to a saga instance. Submit already used the
        // order id; the follow-up events now use it too, so they reach the instance
        // that Submit created.
        // NOTE(review): assumes PaymentApproved and ShipmentReady expose a Guid OrderId
        // like OrderSubmitted does - confirm against the message contracts.
        Event(() => Submit, x => x.CorrelateById(context => context.Message.OrderId));
        Event(() => PaymentApproved, x => x.CorrelateById(context => context.Message.OrderId));
        Event(() => ShipmentReady, x => x.CorrelateById(context => context.Message.OrderId));

        // NOTE: context.Instance / context.Data are named context.Saga / context.Message
        // in MassTransit v8; the older names are kept here so the sample builds on both.
        Initially(
            When(Submit)
                .Then(context =>
                {
                    // CorrelateById requires a Guid, while the saga stores the order id
                    // as a string, so convert explicitly.
                    context.Instance.OrderId = context.Data.OrderId.ToString();
                    context.Instance.CustomerId = context.Data.CustomerId;
                    context.Instance.CreatedAt = DateTime.UtcNow;
                })
                .TransitionTo(PaymentProcessing)
                .Publish(context => new ProcessPayment { OrderId = context.Data.OrderId })
        );

        During(PaymentProcessing,
            When(PaymentApproved)
                .TransitionTo(Shipped)
                .Publish(context => new ShipOrder { OrderId = context.Instance.OrderId })
        );

        // NOTE(review): Completed is an ordinary state, so finished instances are kept.
        // If they should be removed, finalize here and call SetCompletedWhenFinalized().
        During(Shipped,
            When(ShipmentReady)
                .TransitionTo(Completed)
                .Publish(context => new SendDeliveryNotification { OrderId = context.Instance.OrderId })
        );
    }
}
Message Configuration
// Configure the "order-service" receive endpoint by hand: retry, circuit breaker,
// prefetch, then the consumer registered above.
builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<OrderCreatedConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq-host");

        cfg.ReceiveEndpoint("order-service", e =>
        {
            // Automatic retry: 3 attempts, 5 seconds apart.
            e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));

            // Circuit breaker. MassTransit has no FailureThreshold setting; the breaker
            // trips on a failure percentage (TripThreshold) once at least
            // ActiveThreshold messages have been seen in the tracking period.
            e.UseCircuitBreaker(cb =>
            {
                cb.TrackingPeriod = TimeSpan.FromMinutes(1);
                // NOTE(review): 15% follows the documentation example - tune for your workload.
                cb.TripThreshold = 15;
                // Carries over the "5" from the original FailureThreshold setting.
                cb.ActiveThreshold = 5;
                cb.ResetInterval = TimeSpan.FromSeconds(30);
            });

            // Prefetch limit
            e.PrefetchCount = 16;

            // Attach the consumer registered with AddConsumer, so its registration is
            // applied to this endpoint.
            e.ConfigureConsumer<OrderCreatedConsumer>(context);
        });
    });
});
Request/Reply Pattern
// Request
// Build the request, await the typed reply, then print the total it carries.
var request = new GetOrder { OrderId = orderId };
var response = await _requestClient.GetResponse<OrderResponse>(request);
Console.WriteLine($"Order total: {response.Message.Total}");
// Handler
/// <summary>
/// Handles <see cref="GetOrder"/> requests by loading the order and replying with an
/// <see cref="OrderResponse"/>.
/// </summary>
public class GetOrderConsumer : IConsumer<GetOrder>
{
    private readonly IOrderRepository _repository;

    /// <summary>
    /// Creates the consumer with the repository it reads orders from.
    /// </summary>
    /// <param name="repository">Order repository; expected to be supplied by the DI container.</param>
    /// <exception cref="ArgumentNullException"><paramref name="repository"/> is <c>null</c>.</exception>
    public GetOrderConsumer(IOrderRepository repository)
    {
        // Without this assignment the readonly field stays null and Consume would always throw.
        _repository = repository ?? throw new ArgumentNullException(nameof(repository));
    }

    /// <summary>
    /// Looks up the requested order and responds with its id and total.
    /// </summary>
    /// <param name="context">Consume context wrapping the <see cref="GetOrder"/> request.</param>
    /// <exception cref="InvalidOperationException">The repository returned no order for the requested id.</exception>
    public async Task Consume(ConsumeContext<GetOrder> context)
    {
        var order = await _repository.GetAsync(context.Message.OrderId);

        // NOTE(review): assumes GetAsync returns null for an unknown id - confirm. Fail
        // with a descriptive error rather than a NullReferenceException.
        if (order is null)
        {
            throw new InvalidOperationException($"Order '{context.Message.OrderId}' was not found.");
        }

        await context.RespondAsync(new OrderResponse
        {
            OrderId = order.Id,
            Total = order.Total
        });
    }
}
Scheduled Messages
// Send message after delay
// Both messages below are scheduled for 30 minutes from now (UTC).
var dueAt = DateTime.UtcNow.AddMinutes(30);

// Schedule an event to be published at that time.
var reminder = new OrderReminder { OrderId = orderId };
await _scheduledBus.SchedulePublish<OrderReminder>(dueAt, reminder);

// Or schedule a command
var command = new ProcessOrder { OrderId = orderId };
await _scheduledBus.ScheduleSend<ProcessOrder>(dueAt, command);
Best Practices
- Handle failures: Implement retry and dead-letter queues
- Correlate messages: Track related messages
- Design idempotently: Handle duplicate messages
- Monitor: Track message flow and errors
- Version messages: Support multiple versions
Related Concepts
- Distributed transactions
- Event sourcing
- CQRS pattern
- Service bus alternatives
Summary
MassTransit provides robust messaging with sagas, retries, and scheduling. Build resilient, event-driven systems that handle failures gracefully.
Related Articles
RabbitMQ Message Patterns
Implement reliable messaging with RabbitMQ patterns.
Read More
architecture
Message Queue Patterns
Design reliable systems with message queues and async processing.
Read More
infrastructure
Apache Kafka Streaming
Learn about Apache Kafka and event streaming architecture.
Read More