MassTransit v3 Update

It has been nearly six months since the first alpha of MassTransit v3 was released, and a lot of progress has been made. It turns out that rewriting an entire code base takes time and attention. Nonetheless, the new architecture is working out wonderfully and the code is nicely separated by concerns.

So, about the TPL…

Make no mistake, the TPL (introduced in .NET 4.0), followed by the addition of async/await (in C# 5), has made the creation of asynchronous code clean and concise. That being said, it is very important to know the exact behavior of the language constructs and how the compiler translates the keywords. Add to the mix the fact that many third-party assemblies were not designed for asynchronous invocation, and the resulting cesspool is quite a mess. However, it’s not that bad.

At every layer, MassTransit has been built around the TPL, leveraging async/await for the best performance, and providing pipe and filter composition at every possible extension point. The middleware injection is extensive, and new filters can be created easily to support many advanced use cases such as rate limiting, concurrency restriction, and asynchronous transactions. More filters will be coming, but an initial release has to happen at some point…
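To make the pipe and filter idea concrete, here is a minimal, self-contained sketch of a concurrency-restricting filter. It is an illustration of the pattern only, not MassTransit's actual API: `IFilterSketch<T>`, `ConcurrencyLimitFilterSketch<T>`, and `PipeSketch` are hypothetical names invented for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical filter contract (not a MassTransit type):
// do some work, then call next to continue down the pipe.
public interface IFilterSketch<T>
{
    Task Send(T context, Func<T, Task> next);
}

// Limits how many contexts may be inside the rest of the pipe at once.
public class ConcurrencyLimitFilterSketch<T> : IFilterSketch<T>
{
    readonly SemaphoreSlim _limit;

    public ConcurrencyLimitFilterSketch(int maxConcurrent)
    {
        _limit = new SemaphoreSlim(maxConcurrent);
    }

    public async Task Send(T context, Func<T, Task> next)
    {
        // Wait asynchronously for a free slot instead of blocking a thread.
        await _limit.WaitAsync();
        try
        {
            await next(context);
        }
        finally
        {
            _limit.Release();
        }
    }
}

public static class PipeSketch
{
    // Composes the filters so that the first filter in the list runs outermost
    // and the terminal delegate (for example, a consumer) runs last.
    public static Func<T, Task> Build<T>(IList<IFilterSketch<T>> filters, Func<T, Task> terminal)
    {
        Func<T, Task> pipe = terminal;
        for (int i = filters.Count - 1; i >= 0; i--)
        {
            var filter = filters[i];
            var next = pipe;
            pipe = context => filter.Send(context, next);
        }
        return pipe;
    }
}
```

A rate-limiting or transaction filter would follow the same shape under these assumptions: do something before and after awaiting `next`, and let the rest of the pipe stay unaware of it.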

Some New Features

The past six months have not been entirely about stabilization of the code. There have been several tasty new features added as well.

External Message Data Storage

Big messages are inevitable, and big messages can really clog up the works, making small messages suffer. And there are some brokers that just can’t deal with big messages (cough, Azure Service Bus, cough) at all. To support the transfer of big messages (those messages with large byte arrays or strings), MassTransit now has the ability to send and receive message data outside of the message body.

A couple of standard repositories are available (in-memory, and file system) with more to come, including Azure Blob Storage and perhaps Amazon S3.

_repository = new FileSystemMessageDataRepository(dataDirectory);

When sending a message, the repository is used during message construction to store the message data. The repository returns an address, which is assigned to the message property.

string data = "Some really long string (or byte array)";
var message = new BigMessage
{
    Body = await _repository.PutString(data)
};
await endpoint.Send(message);

Like all of MassTransit, the repository is async aware, so the Put is awaited. The message is then sent, and only the reference to the message data is saved in the message body. Reading the message data is as easy as decorating the consumer with the message data behavior, and then just using the property directly.

x.ReceiveEndpoint("my_queue", e =>
{
    e.UseMessageData<BigMessage>(_repository);

    // The handler must be async because it awaits the message data
    e.Handle<BigMessage>(async context =>
    {
        string body = await context.Message.Body.Value;
        Console.WriteLine(body);
    });
});

Big message consumers need not be aware of the external storage implementation in use. The consumer only needs to await on the message data property, and the resulting content (either a stream, or the byte[] or string) will be returned asynchronously.
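The idea behind this feature is often called a claim check: the payload is stored out of band and only its address travels in the message. The following is a minimal sketch of that idea, not MassTransit's implementation. `InMemoryClaimCheckStore`, its `PutString` and `GetString` methods, and the `urn:msgdata:` address format are hypothetical, chosen only for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a message data repository (not a MassTransit type).
public class InMemoryClaimCheckStore
{
    readonly ConcurrentDictionary<Uri, string> _items =
        new ConcurrentDictionary<Uri, string>();

    // Stores the payload and returns the address that travels in the message.
    public Task<Uri> PutString(string value)
    {
        var address = new Uri("urn:msgdata:" + Guid.NewGuid().ToString("N"));
        _items[address] = value;
        return Task.FromResult(address);
    }

    // Resolves an address back to the payload on the receiving side.
    public Task<string> GetString(Uri address)
    {
        string value;
        if (_items.TryGetValue(address, out value))
            return Task.FromResult(value);

        throw new KeyNotFoundException("No message data stored at " + address);
    }
}
```

In this sketch the sender would await `PutString` and place the returned address in the message, and the receiver would await `GetString` with that address. Swapping the dictionary for a file system or blob store would not change either side, which is the point made above about consumers not needing to know the storage implementation.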

Message Transformation

To support external message data, a mechanism for modifying the properties of a message as it passes through the consume pipeline was required. To that end, MassTransit now has the ability to specify a message transform.

To specify a message transform, add a transform to the receive endpoint configuration.

x.ReceiveEndpoint("my_queue", e =>
{
    e.Transform<A>(t =>
    {
        t.Set(p => p.Second, context => "World");
    });
    e.Handle<A>(context => Console.WriteLine(context.Message.Second));
});

In the example above, the transform is applied to any A message type, and the Second property has the value “World” for any subsequent message filters, including any consumers, handlers, or sagas.

By using the Set method, the original A message is not modified. A new version of the message is created that contains the new property value. This is in contrast to the Replace method, which changes the original message property.
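The difference between the two behaviors can be sketched without any MassTransit types. This is only an illustration of copy versus in-place semantics as described above. The `A` class shape and the `TransformSketch` helper are assumptions made for the example, and a real transform is configured through the endpoint rather than called directly.

```csharp
using System;

// Assumed message shape for the example.
public class A
{
    public string First { get; set; }
    public string Second { get; set; }
}

// Hypothetical helper, not part of MassTransit.
public static class TransformSketch
{
    // "Set" style: build a new message with the new value and leave
    // the original instance untouched.
    public static A SetSecond(A original, string value)
    {
        return new A { First = original.First, Second = value };
    }

    // "Replace" style: change the property on the original instance.
    public static A ReplaceSecond(A original, string value)
    {
        original.Second = value;
        return original;
    }
}
```

With the copy approach, anything still holding the original message continues to see the original value, while filters further down the pipe see the new one.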

Instead of defining a message transform inline, a separate transform specification class can be created. There are many reasons to do this, separation of code concerns among them, and it has turned out to be very useful.

class MessageATransform :
    ConsumeTransformSpecification<A>
{
    public MessageATransform()
    {
        Set(x => x.First, context => "Hello");
        Set(x => x.Second, context => "World");
    }
}

The transform is then applied to the receive endpoint.

x.ReceiveEndpoint("my_queue", e =>
{
    // Distinct parameter names (e, t) avoid shadowing the outer x
    e.UseTransform<A>(t => t.Get<MessageATransform>());
    e.Handle<A>(context => Console.WriteLine(context.Message.Second));
});

Simplified Saga Repository

To make creating new saga repositories easy, the actual behavior required by a new saga implementation is reduced to two methods. The repository has also been redesigned to support composition and middleware, as well as full async operation, making it a clean and consistent implementation, on par with every other type of message consumer.

There are probably some tuning and adjustments yet to be made, but it’s super sweet so far.

What’s Left?

There are a few more things to wrap up before making MassTransit v3 ready for the primary NuGet feed (it’s currently hidden behind the pre-release flag). The exception handling pipeline needs to be well tested and verified, including adding context to the messages in error queues. Really, it’s just a lot of exception and sad-path testing at this point. The majority of the functionality is working very well, including Azure Service Bus.

If you are ambitious and ready to get started with the latest and greatest, I highly recommend pulling down the most recent pre-release packages and taking them for a spin. There are a couple of complete samples that demonstrate how to use MassTransit in a variety of scenarios.

Sample-RequestResponse

A complete request/response example, leveraging the IRequestClient to encapsulate the configuration and endpoint mapping, keeping the requestor code clean and simple. Source Code

Sample-Courier

A complete sample for using Courier, the Routing Slip implementation that is included with MassTransit. Examples of how to create and execute routing slips, as well as track the routing slip events and orchestrate those events using Automatonymous are included, as well as using SQLite as a saga repository for the state machine instances. Source Code

Documentation

Okay, the documentation still needs a lot of work, but it’s coming along. If you’re a great writer, the more help the better on this part.

Stay Tuned

This was meant as an interim update, just to give a status on the development of MassTransit. The initial feedback and encouragement have been great, on the much simpler API, the overall design of the message pipeline, and the Azure Service Bus support. It’s feedback from developers that helps determine when it is ready for stable release, so test drive the alphas and keep the feedback coming!

About Chris Patterson

Chris is a senior architect for RelayHealth, the connectivity business of the nation's leading healthcare services company. There he is responsible for the architecture and development of applications and services that accelerate care delivery by connecting patients, providers, pharmacies, and financial institutions. Previously, he led the development of a new content delivery platform for TV Guide, enabling the launch of a new entertainment network seen on thousands of cable television systems. In his spare time, Chris is an active open-source developer and a primary contributor to MassTransit, a distributed application framework for .NET. In 2009, he was awarded the Most Valuable Professional award by Microsoft for his technical community contributions.
  • niaher

    Thank you for your great work. I am wondering if the support for Azure Service Bus also means support for the on-premises version of it, Service Bus for Windows Server?

  • PhatBoyG

    I have not set Service Bus for Windows up personally (I tried, and failed, the requirements are too invasive for my development machine with all the Azure work I’m doing). But the API is supposed to be the same, so it _should_ work with it.

    That said, there may be some configuration API tweaks to specify the connection details that are missing for an on-premise installation. That’s just a guess, it might work. If you have an installation of it that is internet-facing, I could verify with it, I just failed at configuring a standalone instance of it.

  • I’m curious if CrossTown is being updated along with MT3? In many environments, having that Java option seems to hedge MT above other .NET only solutions.

    Great work here Chris!

  • Rob van Geloven

    Just wondering: the V2 api request / response didn’t need a queue specified using the PublishRequest extension.

    The v3 interface does (as far as I can tell) and doesn’t have this extension. Is there any way to replicate the extension’s functionality within MassTransit v3?

  • Anno Winson

    @PhatBoyG
    “the resulting content (either a stream, or the byte[] or string)”

    I only found the methods for string and byte[], but not for Stream. How to use Streams as claim check to minimize memory consumption?