MassTransit Advanced

Master complex messaging patterns with MassTransit.

By Emem Isaac | November 30, 2023 | 3 min read
#messaging #masstransit #rabbitmq #distributed #patterns

A Simple Analogy

MassTransit is like a post office for your application. Messages are packages, RabbitMQ (or another transport) is the postal network that carries them, and MassTransit handles addressing, routing, delivery, and, once configured, redelivery when a handler fails.


Why MassTransit?

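  • Abstraction: Swap transports (RabbitMQ, Azure Service Bus, etc.) without changing consumer code
  • Sagas: State machines that coordinate long-running, multi-step processes
  • Retry logic: Configurable retry policies and a circuit breaker for transient failures
  • Scheduling: Delayed and scheduled message delivery
  • Monitoring: Diagnostics hooks for tracing, metrics, and health checks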

Basic Setup

// Program.cs
builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<OrderCreatedConsumer>();
    x.AddConsumer<OrderShippedConsumer>();
    
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost");
        cfg.ConfigureEndpoints(context);
    });
});

// Consumer
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    private readonly IEmailService _emailService;

    // IEmailService is resolved from the DI container
    public OrderCreatedConsumer(IEmailService emailService)
    {
        _emailService = emailService;
    }

    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        var message = context.Message;
        await _emailService.SendOrderConfirmation(message.CustomerId, message.OrderId);
    }
}

// Publish event
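public class OrderService
{
    private readonly IPublishEndpoint _publishEndpoint;

    // IPublishEndpoint is registered by AddMassTransit
    public OrderService(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    public async Task CreateOrderAsync(Order order)
    {
        // Save order (SaveAsync is the service's own persistence method, not shown)
        await SaveAsync(order);

        // Publish event to every subscribed consumer
        await _publishEndpoint.Publish(new OrderCreated
        {
            OrderId = order.Id,
            CustomerId = order.CustomerId,
            Total = order.Total
        });
    }
}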
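
The snippets above use an OrderCreated message without showing its definition. A minimal sketch of such a contract follows; the property types (Guid, string, decimal) are assumptions chosen to match how the consumer and publisher use the message, not something the original code confirms.

```csharp
using System;

// Hypothetical contract for the OrderCreated event.
// Property types are assumptions; adjust them to your domain model.
public record OrderCreated
{
    public Guid OrderId { get; init; }
    public string CustomerId { get; init; } = string.Empty;
    public decimal Total { get; init; }
}
```

MassTransit identifies a message by its full type name, namespace included, so the publisher and every consumer should reference the same contract type, typically from a shared project or package.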

Sagas for Long-Running Processes

// Saga state
public class OrderFulfillmentState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string State { get; set; }
    public Guid OrderId { get; set; } // Guid, because CorrelateById correlates on a Guid
    public string CustomerId { get; set; }
    public DateTime CreatedAt { get; set; }
}

// Saga state machine
public class OrderFulfillmentStateMachine : MassTransitStateMachine<OrderFulfillmentState>
{
    public State Submitted { get; private set; }
    public State PaymentProcessing { get; private set; }
    public State Shipped { get; private set; }
    public State Completed { get; private set; }

    public Event<OrderSubmitted> Submit { get; private set; }
    public Event<PaymentApproved> PaymentApproved { get; private set; }
    public Event<ShipmentReady> ShipmentReady { get; private set; }

    public OrderFulfillmentStateMachine()
    {
        InstanceState(x => x.State);

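        // Every event needs a correlation so MassTransit can find the saga instance.
        // Assumes PaymentApproved and ShipmentReady also carry a Guid OrderId.
        Event(() => Submit, x => x.CorrelateById(context => context.Message.OrderId));
        Event(() => PaymentApproved, x => x.CorrelateById(context => context.Message.OrderId));
        Event(() => ShipmentReady, x => x.CorrelateById(context => context.Message.OrderId));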

        // MassTransit v8 naming: context.Saga is the saga instance and
        // context.Message is the event payload (v7 used Instance and Data).
        Initially(
            When(Submit)
                .Then(context =>
                {
                    context.Saga.OrderId = context.Message.OrderId;
                    context.Saga.CustomerId = context.Message.CustomerId;
                    context.Saga.CreatedAt = DateTime.UtcNow;
                })
                .TransitionTo(PaymentProcessing)
                .Publish(context => new ProcessPayment { OrderId = context.Message.OrderId })
        );

        During(PaymentProcessing,
            When(PaymentApproved)
                .TransitionTo(Shipped)
                .Publish(context => new ShipOrder { OrderId = context.Saga.OrderId })
        );

        During(Shipped,
            When(ShipmentReady)
                .TransitionTo(Completed)
                .Publish(context => new SendDeliveryNotification { OrderId = context.Saga.OrderId })
        );
    }
}
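
A state machine only runs once it is registered with a saga repository. The sketch below shows one way to register the saga above, assuming the in-memory repository is acceptable; that repository loses all saga state on restart, so it is suited to local experiments, and a durable repository would replace it in production. The host name is carried over from the Basic Setup example.

```csharp
// Program.cs (sketch): registers the saga from this article.
using MassTransit;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMassTransit(x =>
{
    // Register the state machine together with its state type.
    // InMemoryRepository keeps saga state in process memory only.
    x.AddSagaStateMachine<OrderFulfillmentStateMachine, OrderFulfillmentState>()
        .InMemoryRepository();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost");

        // Creates a receive endpoint for the saga (and any registered consumers)
        cfg.ConfigureEndpoints(context);
    });
});

builder.Build().Run();
```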

Message Configuration

builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<OrderCreatedConsumer>();
    
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq-host");
        
        cfg.ReceiveEndpoint("order-service", e =>
        {
            // Automatic retry
            e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));
            
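            // Circuit breaker: trips when 15% of messages fail within the
            // tracking period, once at least 10 messages have been seen
            e.UseCircuitBreaker(cb =>
            {
                cb.TrackingPeriod = TimeSpan.FromMinutes(1);
                cb.TripThreshold = 15;
                cb.ActiveThreshold = 10;
                cb.ResetInterval = TimeSpan.FromSeconds(30);
            });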
            
            // Prefetch limit
            e.PrefetchCount = 16;
            
            // Attach the consumer registered above with AddConsumer
            e.ConfigureConsumer<OrderCreatedConsumer>(context);
        });
    });
});

Request/Reply Pattern

// Request (_requestClient is an injected IRequestClient<GetOrder>)
var response = await _requestClient.GetResponse<OrderResponse>(
    new GetOrder { OrderId = orderId });

Console.WriteLine($"Order total: {response.Message.Total}");

// Handler
public class GetOrderConsumer : IConsumer<GetOrder>
{
    private readonly IOrderRepository _repository;

    public GetOrderConsumer(IOrderRepository repository)
    {
        _repository = repository;
    }

    public async Task Consume(ConsumeContext<GetOrder> context)
    {
        var order = await _repository.GetAsync(context.Message.OrderId);

        // Reply to the requester's response address
        await context.RespondAsync(new OrderResponse
        {
            OrderId = order.Id,
            Total = order.Total
        });
    }
}
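
The request snippet above relies on a request client that has to be registered and injected. The following is a rough sketch of that wiring, assuming a hypothetical OrderQueryService wrapper and hypothetical shapes for GetOrder and OrderResponse (the original does not define them). The 10-second timeout is an arbitrary illustrative value.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message shapes; the article does not show them.
public record GetOrder { public Guid OrderId { get; init; } }
public record OrderResponse { public Guid OrderId { get; init; } public decimal Total { get; init; } }

// Registration, inside AddMassTransit(x => { ... }):
//     x.AddRequestClient<GetOrder>();

// Hypothetical wrapper that hides the request client from callers.
public class OrderQueryService
{
    private readonly IRequestClient<GetOrder> _requestClient;

    public OrderQueryService(IRequestClient<GetOrder> requestClient)
    {
        _requestClient = requestClient;
    }

    public async Task<decimal> GetOrderTotalAsync(Guid orderId, CancellationToken ct = default)
    {
        // Throws RequestTimeoutException if no response arrives in time
        var response = await _requestClient.GetResponse<OrderResponse>(
            new GetOrder { OrderId = orderId },
            ct,
            RequestTimeout.After(s: 10));

        return response.Message.Total;
    }
}
```

Request/reply couples the caller to the responder's availability, so an explicit timeout and a plan for the timeout exception are worth having before relying on this pattern.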

Scheduled Messages

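// _scheduledBus is an injected IMessageScheduler
// Publish an event after a delay
var scheduledTime = DateTime.UtcNow.AddMinutes(30);

await _scheduledBus.SchedulePublish<OrderReminder>(
    scheduledTime,
    new OrderReminder { OrderId = orderId });

// Or schedule a command; a send needs a destination address
// (the queue name here is an assumed example)
await _scheduledBus.ScheduleSend<ProcessOrder>(
    new Uri("queue:process-order"),
    scheduledTime,
    new ProcessOrder { OrderId = orderId });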
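
Scheduling only works when a scheduler is configured on the bus. One possible setup, sketched here under the assumption that the transport's delayed delivery is used rather than an external scheduler such as Quartz or Hangfire, looks like this. With RabbitMQ this approach depends on the delayed message exchange plugin being enabled on the broker.

```csharp
// Program.cs (sketch): enables IMessageScheduler backed by transport-level delays.
using MassTransit;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddMassTransit(x =>
{
    // Registers IMessageScheduler in the container
    x.AddDelayedMessageScheduler();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost");

        // Routes scheduled messages through the broker's delayed delivery
        cfg.UseDelayedMessageScheduler();

        cfg.ConfigureEndpoints(context);
    });
});

builder.Build().Run();
```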

Best Practices

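  1. Handle failures: Configure retries, and watch the _error queue where MassTransit moves a message once retries are exhausted
  2. Correlate messages: Carry a correlation ID so related messages can be traced across services
  3. Design idempotently: Delivery is at-least-once, so a consumer can see the same message twice; make handlers safe to re-run
  4. Monitor: Track message throughput, latency, and error rates
  5. Version messages: Evolve contracts by adding optional properties so older consumers keep working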
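
To make the idempotency point concrete, here is a minimal sketch that skips messages whose ID has already been handled. ProcessedMessageTracker and IdempotentHandler are hypothetical names, and the in-memory dictionary is an assumption made to keep the example self-contained; a real service would more likely record the ID in the same database transaction as the side effect.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: remembers which message IDs were already handled.
public sealed class ProcessedMessageTracker
{
    private readonly ConcurrentDictionary<Guid, bool> _processed = new();

    // Returns true the first time an ID is seen, false for any repeat.
    public bool TryMarkProcessed(Guid messageId) => _processed.TryAdd(messageId, true);
}

// Hypothetical handler that ignores duplicate deliveries.
public sealed class IdempotentHandler
{
    private readonly ProcessedMessageTracker _tracker;
    private int _handledCount;

    public IdempotentHandler(ProcessedMessageTracker tracker)
    {
        _tracker = tracker;
    }

    public int HandledCount => _handledCount;

    public Task HandleAsync(Guid messageId)
    {
        if (!_tracker.TryMarkProcessed(messageId))
        {
            return Task.CompletedTask; // duplicate delivery: do nothing
        }

        // The real side effect (send email, charge card, ...) would go here.
        Interlocked.Increment(ref _handledCount);
        return Task.CompletedTask;
    }
}
```

Inside a MassTransit consumer, context.MessageId (a nullable Guid) is a natural key for this check. Note that this sketch marks the ID before doing the work, so a crash in between would drop the message; storing the ID atomically with the side effect avoids that gap.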

Related Concepts

  • Distributed transactions
  • Event sourcing
  • CQRS pattern
  • Service bus alternatives

Summary

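MassTransit layers sagas, retries, request/reply, and scheduling on top of a transport such as RabbitMQ. Combined with idempotent consumers and monitoring, these features let you build resilient, event-driven systems that handle failures gracefully.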


