Sagas, State Machines, and Abandoned Carts


    A few weeks ago, there was a post on how to use a saga to coordinate the business requirement of sending an email when a shopping cart is abandoned by a user. The use case demonstrated is fairly common, particularly if you generalize the solution.

    While a general solution can be extracted from the subject matter, I wanted to see how the same approach could be taken using an Automatonymous state machine and MassTransit.

    TL;DR – Go to the Sample Code

    The General Idea

    The idea is that through purely observing the events of a shopping cart, an external system could determine when a potential customer has added items to their cart only to abandon the cart after a period of time. Once the cart was abandoned, a separate event would trigger a process to send the user an email asking them to come back and spend some of those hard earned dollars on some swag.

    The Implementation

    The sample was built using MassTransit, RabbitMQ, and Automatonymous, as well as using Entity Framework to persist the state of the state machine and using Quartz to schedule the timeout event.

    The Observed Events

    A few events are needed to allow the cart activity to be observed. These would be produced by the web application, so a Shopping.Web site was created in the sample. My UI chops are terrible, but what a beautiful web site the template generator creates! The CartController is the majority of the work here. The events are simple interfaces, one for an item being added to the cart and another for when an order is submitted.

    public interface CartItemAdded
    {
        DateTime Timestamp { get; }
        string UserName { get; }
    }
    
    public interface OrderSubmitted
    {
        Guid OrderId { get; }
        DateTime Timestamp { get; }
        Guid CartId { get; }
        string UserName { get; }
    }
    

    The State Machine

    To design the business logic, a state machine is created using Automatonymous. The state machine defines the events observed, how the events are correlated, and the behavior of events in each state. But first, the actual state to be persisted should be defined.

    public class ShoppingCart :
        SagaStateMachineInstance
    {
        public string CurrentState { get; set; }
        public string UserName { get; set; }
        public DateTime Created { get; set; }
        public DateTime Updated { get; set; }
        public Guid? ExpirationId { get; set; }
        public Guid? OrderId { get; set; }
        public Guid CorrelationId { get; set; }
    }
    

    The ShoppingCart is a class, which will be persisted using Entity Framework. The code-first map is in the project, so you can check that out yourself. Now, the state machine.

    public class ShoppingCartStateMachine :
        MassTransitStateMachine<ShoppingCart>
    {
        public ShoppingCartStateMachine()
        {
            InstanceState(x => x.CurrentState);
    

    The state machine class, and the specification of the property for the current state of the machine are defined first.

            Event(() => ItemAdded, x => x.CorrelateBy(cart => cart.UserName, context => context.Message.UserName)
                .SelectId(context => Guid.NewGuid()));
    

    The event that is observed when an item is added to the cart, along with the correlation between the state machine instance and the message are defined. The id generator for the saga instance is also defined.

            Event(() => Submitted, x => x.CorrelateById(context => context.Message.CartId));
    

    The order submitted event, and the correlation for that order.

            Schedule(() => CartExpired, x => x.ExpirationId, x =>
            {
                x.Delay = TimeSpan.FromSeconds(10);
                x.Received = e => e.CorrelateById(context => context.Message.CartId);
            });
    

    In order to schedule the timeout, a schedule is defined, including the time delay for the scheduled event, and the correlation of the event back to the state machine.

    Now, it is time for the actual behavior of the events and how they interact with the state of the ShoppingCart.

            Initially(
                When(ItemAdded)
                    .Then(context =>
                    {
                        context.Instance.Created = context.Data.Timestamp;
                        context.Instance.Updated = context.Data.Timestamp;
                        context.Instance.UserName = context.Data.UserName;
                    })
                    .ThenAsync(context => Console.Out.WriteLineAsync($"Item Added: {context.Data.UserName} to {context.Instance.CorrelationId}"))
                    .Schedule(CartExpired, context => new CartExpiredEvent(context.Instance))
                    .TransitionTo(Active)
                );
    

    Initially defined events that can create a state machine instance. In the above, the properties of the instance are initialized, and then the CartExpired event is scheduled, after which the state is set to Active.

            During(Active,
                When(Submitted)
                    .Then(context =>
                    {
                        if (context.Data.Timestamp > context.Instance.Updated)
                            context.Instance.Updated = context.Data.Timestamp;
                        context.Instance.OrderId = context.Data.OrderId;
                    })
                    .ThenAsync(context => Console.Out.WriteLineAsync($"Cart Submitted: {context.Data.UserName} to {context.Instance.CorrelationId}"))
                    .Unschedule(CartExpired)
                    .TransitionTo(Ordered),
    

    While the shopping cart is active, if the order is submitted, the expiration is canceled (via Unschedule) and the state is set to Ordered.

                When(ItemAdded)
                    .Then(context =>
                    {
                        if (context.Data.Timestamp > context.Instance.Updated)
                            context.Instance.Updated = context.Data.Timestamp;
                    })
                    .ThenAsync(context => Console.Out.WriteLineAsync($"Item Added: {context.Data.UserName} to {context.Instance.CorrelationId}"))
                    .Schedule(CartExpired, context => new CartExpiredEvent(context.Instance)),
    

    If another item is added to the cart, the CartExpired event is scheduled, and the existence of a previously scheduled event (via the ExpirationId property) is used to cancel the previously scheduled event.

                When(CartExpired.Received)
                    .ThenAsync(context => Console.Out.WriteLineAsync($"Item Expired: {context.Instance.CorrelationId}"))
                    .Publish(context => new CartRemovedEvent(context.Instance))
                    .Finalize()
                );
    

    If the CartExpired event is received, the cart removed event is published and the shopping cart is finalized.

            SetCompletedWhenFinalized();
        }
    

    Signals that the state machine instance should be deleted if it is finalized. This is used to tell Entity Framework to delete the row from the database.

        public State Active { get; private set; }
        public State Ordered { get; private set; }
    

    The states of the shopping cart (Initial and Final are built-in states).

        public Schedule<ShoppingCart, CartExpired> CartExpired { get; private set; }
    

    The schedule definition for the CartExpired event.

        public Event<CartItemAdded> ItemAdded { get; private set; }
        public Event<OrderSubmitted> Submitted { get; private set; }
    }
    

    The events that are observed by the state machine (the correlations are defined earlier in the state machine).

    The Plumbing

    To connect the state machine to a bus endpoint, the saga repository is declared, and then the machine and repository are connected to the receive endpoint.

    _machine = new ShoppingCartStateMachine();
    
    SagaDbContextFactory sagaDbContextFactory = () => 
        new SagaDbContext<ShoppingCart, ShoppingCartMap>(SagaDbContextFactoryProvider.ConnectionString);
    
    _repository = new Lazy<ISagaRepository<ShoppingCart>>(
        () => new EntityFrameworkSagaRepository<ShoppingCart>(sagaDbContextFactory));
    

    Once the machine and repository are declared, the receive endpoint is declared on the bus configuration.

    _busControl = Bus.Factory.CreateUsingRabbitMq(x =>
    {
        IRabbitMqHost host = x.Host(...);
    
        x.ReceiveEndpoint(host, "shopping_cart_state", e =>
        {
            e.PrefetchCount = 8;
            e.StateMachineSaga(_machine, _repository.Value);
        });
    
        x.ReceiveEndpoint(host, "scheduler", e =>
        {
            x.UseMessageScheduler(e.InputAddress);
    
            e.PrefetchCount = 1;
    
            e.Consumer(() => new ScheduleMessageConsumer(_scheduler));
            e.Consumer(() => new CancelScheduledMessageConsumer(_scheduler));
        });
    });
    

    There are two endpoints on the tracking service bus, and both are shown as the message scheduler needs to be setup (using Quartz). Refer to the source for the details of configuring and starting the Quartz scheduler, but multiple endpoints can be setup on the bus.

    The key line is the registration:

    e.StateMachineSaga(_machine, _repository.Value);
    

    That’s where the state machine is connected to the endpoint. All of the events in the state machine are added to the exchange bindings in RabbitMQ so the events, when published, make it to the queue for processing by the state machine. It’s important to point out that:

    x.UseMessageScheduler(e.InputAddress);
    

    Is special, in that it’s configuring the bus to use the message scheduler on the scheduler receive endpoint. The subtle references to x and e aren’t obvious, but the input address of the endpoint is passed to the bus. Configuration in MassTransit is evaluated after it is all defined, so it’s possible to do crazy stuff like this.

    Enough Already

    This post has already gotten far too long, but I really wanted to share the experience of building an event-driven work flow using a state machine. I think the experience for a developer is pretty clean and easy to understand.

    Check out the source: Go to the Sample Code

    Share your thoughts!

    MassTransit v3 Update


    MassTransit v3 Update

    It has been nearly six months since the first alpha of MassTransit v3 was released, and a lot of progress has been made. It turns out that rewriting an entire code base takes time and attention. Nonetheless, the new architecture is working out wonderfully and the code is nicely separated by concerns.

    So, about the TPL…

    Make no mistake, the TPL (introduced in .NET 4.0), followed by the addition of async/await (in C# 5) has made the creation of asynchronous code clean and concise. That being said, knowing the exact behavior of the language constructs, and how the compiler translates the keywords is very important. Add to the mix the fact that many third-party assemblies were not designed for asynchronous invocation, and the resulting cesspool is quite a mess. However, it’s not that bad.

    At every layer, MassTransit has been built around the TPL, leveraging async/await for the best performance, and providing pipe and filter composition at every possible extension point. The middleware injection is extensive, and new filter can be created easily to support many advanced use cases such as rate limiting, concurrency restriction, and asynchronous transactions. More filters will be coming, but an initial release has to happen at some point…

    Some New Features

    The past six months have not been entirely about stabilization of the code. There have been several tasty new features added as well.

    External Message Data Storage

    Big messages are inevitable, and big messages can really clog up the works making small message suffer. And there are some brokers that just can’t deal with big messages (cough, Azure Service Bus, cough) at all. To support the transfer of big messages (those messages with large byte arrays or strings), MassTransit now has the ability to send and receive message data outside of the message body.

    A couple of standard repositories are available (in-memory, and file system) with more to come, including Azure Blob Storage and perhaps Amazon S3.

    _repository = new FileSystemMessageDataRepository(dataDirectory);
    

    When sending a message, during message construction the repository is used to store the message data, which returns an address which is written to the message property.

    string data = "Some really long string (or byte array)";
    var message = new BigMessage
    {
        Body = await _repository.PutString(data)
    };
    await endpoint.Send(message);
    

    Like all of MassTransit, the repository is async aware, so the Put is awaited. Then, the message is sent and the reference to the message data is saved in the message body. Reading the message data is as easy as decorating the consumer with the message data behavior, and then just using the property directly.

    x.ReceiveEndpoint("my_queue", e =>
    {
        e.UseMessageData<BigMessage>(_repository);
    
        e.Handle<BigMessage>(context =>
        {
            string body = await context.Message.Body.Value;
            Console.WriteLine(body);
        });
    }
    

    Big message consumers need not be aware of the external storage implementation in use. The consumer only needs to await on the message data property, and the resulting content (either a stream, or the byte[] or string) will be returned asynchronously.

    Message Transformation

    To support external message data, a mechanism for modifying the properties of a message as it passed through the consume pipeline was required. To that end, MassTransit now has the ability to specify a message transform.

    To specify a message transform, add a transform to the receive endpoint configuration.

    x.ReceiveEndpoint("my_queue", e =>
    {
        e.Transform<A>(t =>
        {
            t.Set(p => p.Second, context => "World");
        });
        e.Handle<A>(context => Console.WriteLine(context.Message.Second));
    });
    

    In the example above, the transform is applied to any A message type, and the Second property has the value “World” for any subsequent message filters, including any consumers, handlers, or sagas.

    By using the Set method, the original A message is not modified. A new version of the message is created that contains the new property value. This is in contrast to the Replace method, which changes the original message property.

    Instead of defining a message transform inline, a separate transform specification class can be created. There are many reasons to do this, including separation of code concerns, etc. but it’s become very useful.

    class MessageATransform :
        ConsumeTransformSpecification<A>
    {
        public MessageATransform()
        {
            Set(x => x.First, context => "Hello");
            Set(x => x.Second, context => "World");
        }
    }
    

    The transform is then applied to the receive endpoint.

    x.ReceiveEndpoint("my_queue", x =>
    {
        x.UseTransform<A>(x => x.Get<MessageATransform>());        
        x.Handle<A>(context => Console.WriteLine(context.Message.Second));
    });
    

    Simplified Saga Repository

    To make creating new saga repositories easy, the actual behavior required by a new saga implementation is reduced to two methods. The repository has also been redesigned to support composition and middleware, as well as full async operation, making it a clean and consistent implementation — on par with every other type of message consumer.

    There is probably some tuning and adjustments yet to be addressed, but it’s super sweet so far.

    What’s Left?

    There are a few more things to wrap up before making MassTransit v3 ready for the primary NuGet feed (it’s currently hidden behind the pre-release flag). The exception handling pipeline needs to be well tested and verified, including adding context to the messages in error queues. Really, it’s just a lot of exception and sad-path testing at this point. The majority of the functionality is working very well, including Azure Service Bus.

    If you are ambitious and ready to get started with the latest and greatest, I highly recommend pulling down the most recent pre-release packages and taking them for a spin. There are a couple of complete samples that demonstrate how to use MassTransit in a variety of scenarios.

    Sample-RequestResponse

    A complete request/response example, leveraging the IRequestClient to encapsulate the configuration and endpoint mapping, keeping the requestor code clean and simple. Source Code

    Sample-Courier

    A complete sample for using Courier, the Routing Slip implementation that is included with MassTransit. Examples of how to create and execute routing slips, as well as track the routing slip events and orchestrate those events using Automatonymous are included, as well as using SQLite as a saga repository for the state machine instances. Source Code

    Documentation

    Okay, the documentation still needs a lot of work, but it’s coming along. If you’re a great writer, the more help the better on this part.

    Stay Tuned

    This was meant as an interim update, just to give a status on the development of MassTransit. The initial feedback and encouragement has been great, both on the much simpler API, the overall design of the message pipeline, and the Azure Service Bus support. It’s feedback from developers that helps determine when it is ready for stable release, so test drive the alphas and keep the feedback coming!

    MassTransit 3 API Changes


    When MassTransit 3 was announced, it was also stated that many of the APIs have changed. This post covers some of the most data types, and describes how they are used as well as how they compare to previous version. In addition, the conceptual changes of MassTransit 3 will also be shared.

    The Bus and Receive Endpoints

    In MassTransit 3, a bus is a logical construct which connects to one or more hosts via a single message transport, either RabbitMQ or Azure Service Bus. A bus may include multiple receive endpoints in which each receiving endpoint is connected to a separate queue.

    Previous versions of MassTransit only supported a single queue per service bus (which was configured using the ReceiveFrom method).

    To support the configuration of receive endpoints, as well as advanced configuration specific to each transport, bus creation has changed. The shortest code to create a bus is shown below.

    var hostAddress = new Uri("rabbitmq://localhost/test_virtual_host"); IBusControl bus = Bus.Factory.CreateUsingRabbitMq(x => { x.Host(hostAddress, h => { h.Username("test"); h.Password("password"); }); }); 
    

    Previous versions of MassTransit created a service bus using ServiceBusFactory.New.

    The bus created above has no receive endpoints. However, if a request is sent to an endpoint using the bus’s address as the return address, a temporary queue is created to receive the response. The bus’s queue name is assigned by the broker (in the case of RabbitMQ, for Azure the name can be specified or dynamically generated).

    HINT: Don’t write a log message to display the bus’s address! Merely accessing the address property results in the creation of the temporary queue! How’s that for an awesome side effect! This should probably change to Task<Uri> GetBusAddress() to signal the impact of using it!

    IBus (so long IServiceBus)

    In MassTransit 3, IBus refers to a bus instance. A bus includes methods to obtain send endpoints, as well as publish messages.

    Previously IServiceBus was used, but there were significant changes to the method signatures with dangerous implications from improper use (many methods now return a Task and execute asynchronously). Therefore, changing the interface to IBus was decided to be the best approach to avoid confusing bugs when upgrading existing services.

    public interface IBus { // the address of the bus, which is dynamically created and exclusive // to the bus instance Uri Address { get; } // return a send endpoint for the specified address Task<ISendEndpoint> GetSendEndpoint(Uri address); // publish a message on the bus, using the publish conventions of the // underlying transport (8+ overloads for customizing) Task Publish<T>(T message, CancellationToken cancellationToken); } 
    

    Starting the Bus

    Once a bus is created, an IBusControl interface is returned. The IBusControl interface includes IBus and adds the Start method.

    public interface IBusControl : IBus { // start the bus, as well as any configured receive endpoints Task<BusHandle> Start(CancellationToken cancellationToken); } 
    

    When the application is ready to start the bus, the Start method should be called. This method returns a BusHandle which should be retained until the bus needs to be stopped.

    BusHandle busHandle = await bus.Start(cancellationToken); // application runs, then it's time to stop the service await busHandle.Stop(); 
    

    Consumers

    A consumer is a class that handles (or in this case, consumes) one or more message types. When a message is read from a queue, an instance of the consumer is created using the configured consumer factory.

    The lifecycle of the consumer is managed entirely by the consumer factory. This gives control over the construction and disposal of the consumer to the consumer factory. There are integrations for most of the dependency injection containers included with MassTransit (StructureMap, Autofac, Unity, Ninject, Castle Windsor).

    Consumer Message Handlers

    The consumer declares the handled message types via the IConsumer interface. An example consumer of two message types, A and B, is shown below.

    class AbConsumer : IConsumer<A>, IConsumer<B> { public async Task Consume(ConsumeContext<A> context) { } public async Task Consume(ConsumeContext<B> context) { } } 
    

    Previously, the Consumes<T>.All or Consumes<T>.Context interfaces were used to specify consumers. This was changed to simplify the consumer class definition — the original syntax was clever but not very discoverable.

    The Consume method is asynchronous, and returns a Task that is awaited before acknowledging the message. If the consumer runs to the completion (task status of RanToCompletion), the message is acknowledged and removed from the queue. If the consumer faults (such as throwing an exception, resulting in a task status of Faulted or Canceled), the message is nack’d and remains on the queue.

    Receive Endpoints

    Within a bus, zero or more receiving endpoints can be declared. Each receiving endpoint should have a different queue name, and can also specify the host on which the receive endpoint is to be connected. To configure a receive endpoint for the consumer above, see the example below.

    IBusControl bus = Bus.Factory.CreateUsingRabbitMq(x => { var host = x.Host("rabbitmq://localhost/test_virtual_host", h => { h.Username("testuser"); h.Password("password"); }); // declare the receive endpoint on the host x.ReceiveEndpoint(host, "consumer_queue", e => { // configure the consumer using the default constructor // consumer factory. e.Consumer<AbConsumer>(); }) }); 
    

    Sending a message to the endpoint

    To send a message to the receiving endpoint, as a quick example, the bus would be used to get the send endpoint, and then a message would be sent.

    var address = "rabbitmq://localhost/test_virtual_host/consumer_queue"; ISendEndpoint consumerEndpoint = await bus.GetSendEndpoint(address); var message = new A { Value = "Hello, World."}; await consumerEndpoint.Send(message); 
    

    The message type, A, is a simple object with properties:

    public class A { public string Value { get; set; } } 
    

    Consuming the Message

    Inside the consumer, the Consume method handles the message. All messages are delivered within a ConsumeContext<T> that is specific to the message and consumer. For example, if the consumer handled the message and then published an event it would use the context to publish the event.

    public async Task Consume(ConsumeContext<A> context) { string value = context.Message.Value; await Console.Out.WriteLineAsync(value); var thisJustHappend = new AHandled(value); // this is async, but read below why we don't have to await it context.Publish(thisJustHappened); } 
    

    In the consumer above, the value of the message is written (using the async IO methods) to a file (in this case, just the console — I’m not actually sure if the console is open for async i/o, but roll with it). Then, an event is published using the context. The published event is also just a class.

    public class AHandled { public AHandled(string value) { Value = value; } public string Value { get; private set;} } 
    

    Now, the publish could have been awaited — which completes once the broker acknowledges that the publish has been written to the queue. However, the context is delegating the Publish call, and therefore is able to capture the Task returned and keep track of it inside the consumer context. The message will not be acknowledged until all pending tasks have completed. So the consumer could publish a dozen events without awaiting each one (which would be silly, honestly – since they’re all async) and the framework will handle awaiting until all of the messages have been published and then acknowledge the message being consumed.

    Sending endpoints work the same way.

    public async Task Consume(ConsumeContext<A> context) { var sendEndpoint = await context.GetSendEndpoint(_serviceAddress); sendEndpoint.Send(someCommand); } 
    

    In the above example, the Task returned from sendEndpoint.Send is captured by the consume context (by inserting an intercepter in front of the ISendEndpoint interface) and awaited by the consumer before acknowledging the message.

    More to Come

    This is just a short introduction to some of the API changes, to make it easy to migrate applications to the new version.

    It’s important to remember that if JSON or XML serialization is being used, there is complete interoperability between services using MassTransit 2.x and MassTransit 3. So there is no need to update every service simultaneously — services can be updated as needed.

    There will be more content, as well as updated documentation, as MassTransit becomes ready for general use. Until then, enjoy the alpha bits and share your feedback on the changes!

    MassTransit ]|[


    This article describes an upcoming release of MassTransit. This release is still under development and will be pushed in various pre-release forms before the final release is finished. Early feedback is valuable, and you are encouraged to check out the early bits, but recognize that they are just that – early bits. There will be bugs and features that are not yet complete.

    Background

    MassTransit was started in 2007 after a very exciting ALT.NET meeting in Austin, TX. I met a lot of great, smart people with a lot of great ideas on how to make the .NET community a better place for software developers. It was shortly after that Dru Sellers and I started working on a messaging library for use with MSMQ to solve problems within our respective jobs. I had actually been using MSMQ since the late 90’s from C++, but was starting to make moves into C# and needed a managed solution.

    Originally NServiceBus was considered, but while it was open source, it was not accepting contributions at the time and was specific to less than stellar projects like Spring.NET. Having reviewed the source code, Dru and I came to the conclusion that we could start a new project to address our requirements using frameworks with which we were both more familiar. Since that time, NServiceBus has seen tremendous growth and is now a commercial product from Particular Software (started by Udi Dahan).

    In the past seven years, MassTransit has evolved as requirements for messaging in distributed systems changed. What started as an abstraction layer on top of MSMQ become a comprehensive platform for creating distributed applications, including support for sagas (okay, we can call them process monitors if you prefer), message scheduling (via Quartz), and a powerful routing slip implementation that has been used as the underpinnings of a modern scaled distributed interoperability platform. Each of these features is great standalone, but when combined they make possible some really incredible system design options that are reliable, scalable, and supportable.

    In with the New

    There have been many changes in the .NET framework since version 2.0 was released. The runtime and language (C#) have both evolved, adding powerful new features. The biggest changes appeared in .NET 4 with the introduction of the TPL. The new threading support has made asynchronous programming easier to understand and has greater performance with lower overhead than previous operating system threading models.

    In addition, the big puffy cloud has become mainstream and many applications are being built cloud-first, leading to the use of push messaging systems. When support for RabbitMQ was added, it took a while to get it right and get it cloud-friendly. The first incarnation followed the same pull model that was used for MSMQ, which led to expensive running services with the cloud’s pay-as-you-go model. Refining the transport to support push via AMQP brought the request charges down significantly, but started to identify limitations in the underlying transport model around which MassTransit was built.

    Along with the cloud came Azure and the Azure Service Bus. For many .NET developers, it’s the easiest option for systems being deployed to the cloud. Cloud services are cost effective ways to host applications, but having to run a RabbitMQ cluster in IaaS can be expensive. Support for Azure Service Bus has been limited to an external library due to the difference in messaging topology, particular the topic/queue structure that’s provided on top of SQL service broker. There is also the on-premise Service Bus option, although why somebody would run that over RabbitMQ eludes me.

    RabbitMQ also got a lot more awesome, with many improvements in broker-to-client communcation, including notifications to clients as the topology of cluster nodes changes and messages flow through the broker. The client library for .NET has gone from a limited, basic functionality library to include extensive support for the new features. This again identified some seams in MassTransit that were just not clean and made it difficult to take advantage of the new features.

    The term CQRS, or Command Query Responsibility Segregation, became a lot more normal, and the separation of reads from writes became obvious as more functional programming styles became commonplace. This is also true in messaging systems. For example, in RabbitMQ you send messages to exchanges and read messages from queues. Two completely separate constructs tuned to each side of the message conversation.

    So What’s New With MassTransit 3

    MassTransit v3 is a modern, enterprise-grade framework for creating reliable, scalable, and supportable distributed applications. Version 3 leverages the latest .NET framework features and supports current message brokers including RabbitMQ and Azure Service Bus (additional support for Event Hubs, as well as on-premise Service Bus will be available as well).

    Each message broker will be fully supported at the API level, allowing the distinct advantages of each broker to be utilized. The configuration API for endpoints and consumers is specialized for each transport, while consumers remain agnostic to the transport implementation details. This retains the ability to transplant consumers from one messaging system to another without a rewrite yet take advantage of transport specific features. Message consumers have access to transport-specific parameters and settings as well, in case more specific per-message controls are needed.

    Most modern frameworks provide middleware extension points, and MassTransit 3 is no exception. Full end-to-end middleware injection points, from the transport, post serialization, to handlers and consumers and everything in between, make MassTransit 3 the most customizable and extensible messaging framework. Every middleware component is asynchronous, ensuring maximum thread utilization, as well as scope owning, allowing full in/out processing and exception handling. All of this without any overhead that impacts message processing throughput. Traditional observation-style notification points are also available, including pre/post/fault consumer message handling, as well as pre/post/fault send message handling, either by a specific inbound message type or consumer type, outbound message type or endpoint address, or generic inbound and outbound message paths.

    A new MassTransit Host, which was part of the original releases, but was extracted to become Topshelf, is now included. And with it comes easy “create an assembly and hit F5” approach to building autonomous services with MassTransit. All of the setup, including separate receive endpoints for each consumer, is handled by convention to make getting started easier than ever. And those services are production ready with full logging support.

    Courier, the routing slip implementation for MassTransit, as well as Quartz integration are fully optimized for MassTransit 3. Both scheduling and courier are built into MassTransit, along with full container integration.

    Out with the Old

    Seven years of evolution in a framework means a lot of old code, a lot of which is old solving problems. To keep MassTransit 3 clean and tight, some things had to go.

    MSMQ is no longer supported. There was extensive code in MassTransit supporting features that were not supported by MSMQ out of the box. Modern message brokers have full support for topic- and/or exchange-based routing, which can be complex and error prone. Rather than try to keep MSMQ alive, everything related to supporting publish/subscribe on MSMQ is gone, deleted, left in the archives of Git history. While a send/recieve only transport for MSMQ may be added at some future point, developers can continue to use the 2.x versions for MSMQ applications.

    Magnum, is no longer a reference. Neither is Stact. Both of these poorly maintained libraries have been removed from MassTransit. A lot of the features in those libraries were there to compensate for limited .NET framework features. With .NET 4.5, the need for those extensions is no longer required. In fact, there are many code design changes that make generic support easier and more understandable, eliminating the dynamic code generation that was so heavily leveraged in previous version of MassTransit. This should lead to faster code execution and reduced process startup time.

    Of course, with no Magnum, there is no Magnum state machine. For applications built today, Automatonymous (docs) should be used anyway. It’s a better, more flexible, and more extensible state machine than Magnum implementation.

    A lot of legacy interfaces that were never suggested to be used, as well as tiny little features that were difficult to support and not used, are gone. Some interfaces may remain to make it easier to port v2 applications to v3, but those will be for compatibility only (and, in fact, packaged in a separate MassTransit.Compatibility assembly).

    Timelines

    The first early bits of MassTransit 3 should be available on NuGet soon. This includes support for Azure Service Bus and RabbitMQ. I’ll follow up the bits with some articles on how to get started with the new configuration API, and how to get your first message producers and consumers communicating. Details on the default conventions for RabbitMQ and Azure Service Bus will also be published (RabbitMQ is unchanged and 100% compatible with v2.x services).

    Automatonymous, Courier, and Quartz integration will also have pre-release versions available that are compatible with the new MassTransit 3 bits.

    In Closing

    Many more developers are being pushed into building decoupled message-based applications, and those developers need a clean and modern API that aligns closely to the .NET 4.5 framework. This is even more important as we move into vNext, with .NET now being open source. A fully vNext compatible version of MassTransit is definitely coming as the vNext release approaches. MassTransit 3 is being built to target those developers, as well as those who are maxing out with MassTransit v2.x and need better asynchronous support and extensibility for their applications.

    Watch for the early bits announcement soon!

    Separating Concerns – Part 2: Services


    In the previous article on Separation of Concerns, libraries were explained as a way to decompose an application into separate sets of functions, resulting in code that is easier to maintain and has higher cohesion. This article continues the subject, explaining how applications can benefit from using services and what differentiates them from libraries.

    Services

    A service is a set of related functions usable by multiple applications and accessible through one or more public interfaces. What differentiates a service from a library is the way an application uses it. In the case of a library, functions run in the execution context of the calling application, inheriting the application’s environment, which includes the process, thread, and user context. A service, however, runs in its own execution context and has its own set of policies that govern how it can be used.

    Due to framework limitations and/or the complexity of asynchronous programming, many applications invoke services synchronously — leaving the developer with the impression that the service has “taken control” of the execution context. In reality, the calling application is only blocked while waiting for the service to respond. And since the application is blocked, it behaves like a library function call, at least from the perspective of the debugger. Service invocation, however, invokes a complex exchange of information between the calling application’s process and the service’s process. This exchange is even further complicated by the fact that the application and service processes may exist on separate machines, be written using different languages, and be running on different operating systems. Also, it’s important to recognize that at any time during service invocation, the operating system, the process, the service, or the network can (and eventually will) fail.

    Service Interface

    The blocking scenario above pertains to applications that invoke services using a remote procedure call (or, RPC) form of inter-process communication. Examples include a web browser using a REST implementation over HTTP, a mobile client using a SOAP implementation (also over HTTP), and an intranet application using a binary implementation over TCP. A single service can support one or more of these implementations simultaneously, making the service available to a broader range of applications.

    Another form of inter-process communication is message passing, where the application sends a message to the service. Message passing is inherently asynchronous, the application can send the message and continue processing without waiting for the service to complete. There are many advantages to using asynchronous message passing instead of synchronous RPC, but an asynchronous programming model is also more complex for developers. Messages can also be passed using a durable message store, making it easier to recover from failure without losing service requests and/or commands. Message passing also eliminates temporal coupling, preventing the application from being dependent upon the availability of the service.

    And the advantage is… what?

    Given the complexity of a service, particularly in comparison to a library, why would anyone want to deal with the complexity of a service?

    Quite simply, with a service, the implementation details of the public interface are encapsulated within the service itself and do not become dependencies of the calling application. And since the application is only calling the public interface, the only dependency added to the application is the service interface. The application does not inherit the dependencies of the service, as those dependencies are private implementation details. For this reason alone, when a function has a dependency on another service or when a function depends upon dynamic data, it is better to create a service, encapsulating those dependencies and enabling the service to be managed separately from the applications using it.

    For example, a domain name validation function requires a list of valid domain names. However, the current list of valid domain names is constantly changing. If domain name validation was implemented as a library, the application must also be responsible for maintaining the list of valid domain names. Rather than adding these additional requirements to the application, a service is used to valid the domain name instead.

    So, the advantage of encapsulating dependencies while retaining the ability to reuse functionality is the key benefit of a service. The benefits of a library are also benefits of a service, including high cohesion, as long as the service focuses on a single concern or responsibility.

    In the next installment, frameworks will be explained.

    CRUD is Not a Service


    Introduction

    Many systems implement CRUD (create, read, update, and delete) using a repository pattern. An entity is loaded using a Get method, some business layer logic makes changes to the entity, and ultimately saved using a Put method. This exact pattern is replicated with as many names as there are examples, including EntityManager, EntityDataContext, EntityStorage, etc. In fact, the pattern itself has been completely generalized by libraries such as NHibernate, which provides an ISession interface for performing simple CRUD operations (yes, there are many additional advanced features that make NHibernate much more useful than just a simple CRUD library, but that’s not the point).

    A significant weakness of the repository pattern is ensuring that an entity’s state is valid. When an entity is saved, it is important that the contents are valid. If the contents are invalid, such as the order total not equaling the sum of the order items, the resulting inconsistency can spread throughout the application causing additional, and perhaps greater inconsistencies.

    Most of the time, developers using the repository pattern define classes for entities with properties for the attributes of the entity. And in most cases, the properties allow both reads and writes – making it possible for entity to become invalid. The repository pattern also does not allow intent to be either expressed or captured as changes are made.

    In order to properly validate an entity, the validation logic may need access to additional resources. For example, adding an activity to an itinerary may need to verify that there are seats available for a dining activity. Beyond simple validation, adding the activity to an itinerary may need to allocate an available seat to the itinerary’s owner. Subsequently, removing the activity would require that the previously allocated seat be released so that is available to others.

    As systems evolve, this type of behavior gets added to the repository class, checking for property changes in the Put method and invoking services when changes are found. The more property changes that require external systems to be notified, the more complex the Put method becomes resulting in a greater number of dependencies in the repository class.

    Don’t even ask what happens when the entity is saved and the property changes, having invoked external services are not persisted due to a transaction timeout or deadlock in the storage system. And don’t simply suggest that invoking the services after the save is complete is the right answer, because then what happens when the services cannot be invoked due to a network or availability issue.

    Command Query Separation

    Command Query Separation (or CQS) is a pattern that separates commands (typically write operations) from queries (including reads and operations that do not change data). By separating the concerns of reading and writing, it is possible to tune a system to meet specific scaling requirements without generalizing for both operations.

    The following provides an example of how CQS can be used to implement CRUD operations. With each operation, an approach is presented that allows for separation of concerns as well as an implementation that can scale reads and writes separately.

    Create

    Consider, for example, a dining reservation system for a local restaurant. The Reservation service exposes an API to schedule a reservation where the client specifies the date, number of guests, and the desired seating time. When called, the service checks availability and either adds the reservation to the calendar, or fails and notifies the caller that the requested time is not available. If the reservation is added, a table is reserved for the guests and the table is removed from the available seating. Once all available seating is reserved, the service will no longer accept reservations.

    The scheduling API above is an example of a command. A command tells a service to perform an operation. The service is responsible for implementing the command’s behavior, and is also the ultimate authority as to whether the command can be completed.

    From the perspective of the command’s initiator, the contract is well defined. Submit the required arguments (date, time, and the number of guests), and observe one of the two possible outcomes (scheduled, or unavailable). As long as there are available seats at the requested time, the reservation should succeed. If the command fails due to lack of availability, the initiator can choose to adjust the arguments (such as requesting a later time, or selecting a different date) and resubmit the command, or it can decide instead to try another time to check if an opening becomes available.

    Read

    In order to give the initiator a chance of successfully scheduling a reservation, it’s important that the reservation systems constraints are available so that initiators are able to submit reservations that will be accepted. This can be done many ways, but one way would be to expose the availability through a separate service.

    For example, an application may display the restaurant’s availability to the user so that the user can select a time. At the minimum, having access to the restaurant’s days and hours of operation would allow the user to know when the restaurant is open. However, the restaurant may only take reservations in the evening and on weekends. To convey this information to the application and the user, the availability service may supply more detailed availability including ranges of time and the seating availability for each range.

    The additional information provided by the availability service enables the application to determine in advance if a reservation will be accepted. If there is no seating available at a particular date and time, the application can assume that submitting a reservation for the date and time will fail. The application is not prevented from submitting the reservation, but it is likely that the reservation will fail.

    Update

    Plans change, and likewise the reservation service need to be able to make changes to reservations. Depending upon the type of change, however, the service needs to follow different behaviors.

    For example, if the reservation time changes, the service would need to determine if there was sufficient capacity at the new time for the reservation. On the other hand, if the number of guests increased, the service would need to ensure there was either sufficient seating at the already assigned table or if a larger table was available at the same time. A simple change, such as updating the name on the reservation, might not require any checks at all – unless the new name is identified as a VIP, in which case a check for upgraded tables or perhaps special menu selections would be performed to ensure that the VIP is treated to the best possible service.

    As the above examples clearly show, an update is not just a save operation. An update is a general term given to a wide variety of changes that may be applied to a reservation. Since the behavior of each change is different, each change should also be handled differently. A command should be created to define the contract for each change, and each command should be explicitly named to describe the change (UpdateReservationName, ChangeReservationGuests, ChangeReservationTime).

    While the update has changed from a single “write my object” operation to three separate commands, it is now easier to reason about the behavior of each command. If a new reservation time is requested, the initiator could check the published availability information and predetermine if the time slot is available. The initiator is not prevented from sending the command based on this information, but the likelihood of success is greater.

    Aggregate Roots and Scoping

    An aggregate root is a form of transactional boundary (defined in Domain Driven Design by Eric Evans) which defines the scope of an operation and its related data). For example, if the reservation service managed a list of guests with each reservation, the reservation would be the aggregate root and the list of guests would be contained within the reservation. This means that the addition or removal of a guest would be performed by or with the aggregate root. In practice, such as with a relational database, adding a guest to the reservation would not involve simply inserting into a ReservationGuest table, but actually loading the reservation and adding a guest. The reservation is the root entity, and the guests are a related or child entity.

    The reason for this is that a reservation should be treated as a whole and not a set of related entities. If the system has a rule that a reservation cannot exceed eight guests, and guests are arbitrarily added outside of the reservation, the logic to validate the number of guests ends up in multiple places (just read this as cut-n-paste, which makes it quite obvious why it is a bad thing). Keeping the validation logic as part of the reservation makes the rules easier to discover and understand compared to having validation logic spread across the service.

    Delete

    Continuing with the example, it’s likely that a guest may cancel a reservation. Plans change, and the service should be able to remove a reservation that is no longer required.

    To support canceling a reservation, the service may provide an additional API to cancel a reservation using the reservation number. When the command is received, the service would look up the reservation. If the reservation exists, the reservation would be marked as canceled and removed from the schedule – making the table available for scheduling by other patrons. If the reservation does not exist, the command would fail but the failure does not have any other effects. If the reservation existed but was already canceled, the command could be acknowledged as already canceled (there is no need to cancel a canceled reservation, but not failing ensures that the command is processed idempotently).

    The fact that the reservation existed does not change, so it is important that the history of the reservation is retained. While the service could simply delete the reservation from storage, the stakeholders may want to keep a history of reservations for future use, such as marketing or promotional events, or to follow up to solicit feedback as to why the reservation was canceled.

    Auditing

    When a command is executed, such as adding an activity to an itinerary, it is important to retain an audit trail of changes. This audit trail is important in case the contents of the itinerary are disputed. For example, a customer may argue that they did not add a particular activity or that an activity is missing. Without an audit trail, it would be impossible to determine the contents of an itinerary at a previous point in time or who made any changes to the itinerary.

    Retaining a history of commands executed on the itinerary along with preventing itinerary changes outside of the available commands can provide a reliable audit trail should the need arise. Additionally, ensuring that each command includes the user who initiated the command along with timestamps indicating when the command was initiated and executed can provide a chronological view of the changes made to the entity.

    To summarize a statement commonly made by Greg Young, “So you have an audit trail, how do you know it’s right?”

    By retaining every successful command, it is possible to rebuild the state of a reservation. In fact, in an event-sourced model, the actual commands are used to determine the current state. There are use cases for each approach, so if you have a highly event-based model, event sourcing may be worth consideration.

    Bonus: Transferring Data Between Systems

    In many organizations, separate test and production systems are used so that integrators and developers can test software or configuration changes prior to deploying them on production. For example, an integrator may configure a new customer on the test system prior to moving that configuration into production. More often than not, this transfer is performed using simple CRUD operations – typically behind the facade of an “import/export” link.

    A disadvantage of using bulk CRUD operations when transferring configuration between systems is that the system itself is not a participant in the data import process.

    Using Commands to Transfer Data

    Rather than transfer data at the entity level, the data in the source system should be used to generate a sequence of commands that can be executed on the target system. Those commands could include references to the original commands executed on the source system, along with the time those commands were originally executed and the initiating user details. Retaining this information may be crucial as changes are deployed, ensuring that the user performing the transfer is not made responsible for the actual changes performed by another user.

    Conclusion

    The use of commands to perform the creation, updating, and deleting of data has clear advantages over simple data access layer operations. Change tracking, auditing, and validation are critical to ensure that data is valid. As with most technical choices, whether or not this level of validation is required depends upon your requirements. in my experience, more often than not, the level of detail is required as auditing and change tracking eventually makes its way into the backlog.

    Implementing Routing Slip with MassTransit


    This article introduces MassTransit.Courier, a new project that implements the routing slip pattern on top of MassTransit, a free, open-source, and lightweight message bus for the .NET platform.

    Introduction

    When sagas were originally conceived in MassTransit, they were inspired by an excerpt from Chapter 5 in the book SOA Patterns by Arnon Rotem-Gal Oz. Over the past few months, the community has argued discussed how the use of the word saga has led to confusion and how early implementations included in both NServiceBus and MassTransit do not actually align with the original paper published in 1987 by Princeton University and written by Hector Garcia-Molina and Kenneth Salem in which the term was coined.

    With MassTransit Courier, the intent is to provide a mechanism for creating and executing distributed transactions with fault compensation that can be used alongside the existing MassTransit sagas for monitoring and recovery.

    Background

    Over the past few years building distributed systems using MassTransit, a pattern I consistently see repeated is the orchestration of multiple services into a single business transaction. Using the existing MassTransit saga support to manage the state of the transaction, the actual processing steps are created as autonomous services that are invoked by the saga using command messages. Command completion is observed using an event or response message by the saga, at which point the next processing step is invoked. When the saga has invoked the final service the business transaction is complete.

    As the processing required within a business transaction changes with evolving business requirements, a new version of the saga is required that includes the newly created processing steps. Knowledge of the new services becomes part of the saga, as well as the logic to identify which services need to be invoked for each transaction. The saga becomes rich with knowledge, and with great knowledge comes great responsibility (after all, knowledge is power right?). Now, instead of only orchestrating the transaction, the saga is responsible for identifying which services to invoke based on the content of the transaction. Another concern was the level of database contention on the saga tables. With every service invocation being initiated by the saga, combined with the saga observing service events and responses, the saga tables gets very busy.

    Beyond the complexity of increasing saga responsibilities, more recently the business has requested the ability to selectively route a message through a series of services based on the content of the message. In addition to being able to dynamically route messages, the business needs to allow new services to be created and added to the inventory of available services. And this should be possible without modifying a central control point that dispatches messages to each service.

    Like most things in computer science, this problem has already been solved.

    The Routing Slip Pattern

    A routing slip specifies a sequence of processing steps for a message. As each processing step completes, the routing slip is forwarded to the next step. When all the processing steps have completed, the routing slip is complete.

    A key advantage to using a routing slip is it allows the processing steps to vary for each message. Depending upon the content of the message, the routing slip creator can selectively add processing steps to the routing slip. This dynamic behavior is in contrast to a more explicit behavior defined by a state machine or sequential workflow that is statically defined (either through the use of code, a DSL, or something like Windows Workflow).

    MassTransit Courier

    MassTransit Courier is a framework that implements the routing slip pattern. Leveraging a durable messaging transport and the advanced saga features of MassTransit, MassTransit Courier provides a powerful set of components to simplify the use of routing slips in distributed applications. Combining the routing slip pattern with a state machine such as Automatonymous results in a reliable, recoverable, and supportable approach for coordinating and monitoring message processing across multiple services.

    In addition to the basic routing slip pattern, MassTransit Courier also supports compensations which allow processing steps to store process-related data so that reversible operations can be undone, using either a traditional rollback mechanism or by applying an offsetting operation. For example, a processing step that holds a seat for a patron could release the held seat when compensated.

    MassTransit Courier is free software and is covered by the same open source license as MassTransit (Apache 2.0). You can install MassTransit.Courier into your existing solution using NuGet.

    Activities

    In MassTransit Courier, an Activity refers to a processing step that can be added to a routing slip. To create an activity, create a class that implements the Activity interface.

    public class DownloadImageActivity :
        Activity<DownloadImageArguments, DownloadImageLog>
    {
    }
    

    The Activity interface is generic with two arguments. The first argument specifies the activity’s input type and the second argument specifies the activity’s log type. In the example shown above, DownloadImageArguments is the input type and DownloadImageLog is the log type. Both arguments must be interface types so that the implementations can be dynamically created.

    Implementing an Activity

    An activity must implement two interface methods, Execute and Compensate. The Execute method is called while the routing slip is executing activities and the Compensate method is called when a routing slip faults and needs to be compensated.

    When the Execute method is called, an execution argument is passed containing the activity arguments, the routing slip TrackingNumber, and methods to mark the activity as completed or faulted. The actual routing slip message, as well as any details of the underlying infrastructure, are excluded from the execution argument to prevent coupling between the activity and the implementation. An example Execute method is shown below.

    ExecutionResult Execute(Execution<DownloadImageArguments> execution)
    {
        DownloadImageArguments args = execution.Arguments;
        string imageSavePath = Path.Combine(args.WorkPath, 
            execution.TrackingNumber.ToString());
    
        _httpClient.GetAndSave(args.ImageUri, imageSavePath);
    
        return execution.Completed(new DownloadImageLogImpl(imageSavePath));
    }
    

    Once activity processing is complete, the activity returns an ExecutionResult to the host. If the activity executes successfully, the activity can elect to store compensation data in an activity log which is passed to the Completed method on the execution argument. If the activity chooses not to store any compensation data, the activity log argument is not required. In addition to compensation data, the activity can add or modify variables stored in the routing slip for use by subsequent activities.

    In the example above, the activity creates an instance of a private class that implements the DownloadImageLog interface and stores the log information in the object properties. The object is then passed to the Completed method for storage in the routing slip before sending the routing slip to the next activity.

    When an activity fails, the Compensate method is called for previously executed activities in the routing slip that stored compensation data. If an activity does not store any compensation data, the Compensate method is never called. The compensation method for the example above is shown below.

    CompensationResult Compensate(Compensation<DownloadImageLog> compensation)
    {
        DownloadImageLog log = compensation.Log;
        File.Delete(log.ImageSavePath);
    
        return compensation.Compensated();
    }
    

    Using the activity log data, the activity compensates by removing the downloaded image from the work directory. Once the activity has compensated the previous execution, it returns a CompensationResult by calling the Compensated method. If the compensating actions could not be performed (either via logic or an exception) and the inability to compensate results in a failure state, the Failed method can be used instead, optionally specifying an Exception.

    Building a Routing Slip

    Developers are discouraged from directly implementing the RoutingSlip message type and should instead use a RoutingSlipBuilder to create a routing slip. The RoutingSlipBuilder encapsulates the creation of the routing slip and includes methods to add activities, activity logs, and variables to the routing slip. For example, to create a routing slip with two activities and an additional variable, a developer would write:

    var builder = new RoutingSlipBuilder(NewId.NextGuid());
    builder.AddActivity(“DownloadImage”, “rabbitmq://localhost/execute_downloadimage”, new
        {
            ImageUri = new Uri(“http://images.google.com/someImage.jpg”)
        });
    builder.AddActivity(“FilterImage”, “rabbitmq://localhost/execute_filterimage”);
    builder.AddVariable(“WorkPath”, “\\dfs\work”);
    
    var routingSlip = builder.Build();
    

    Each activity requires a name for display purposes and a URI specifying the execution address. The execution address is where the routing slip should be sent to execute the activity. For each activity, arguments can be specified that are stored and presented to the activity via the activity arguments interface type specify by the first argument of the Activity interface. The activities added to the routing slip are combined into an Itinerary, which is the list of activities to be executed, and stored in the routing slip.

    Managing the inventory of available activities, as well as their names and execution addresses, is the responsibility of the application and is not part of the MassTransit Courier. Since activities are application specific, and the business logic to determine which activities to execute and in what order is part of the application domain, the details are left to the application developer.

    Once built, the routing slip is executed, which sends it to the first activity’s execute URI. To make it easy and to ensure that source information is included, an extension method to IServiceBus is available, the usage of which is shown below.

    bus.Execute(routingSlip); // pretty exciting, eh?
    

    It should be pointed out that if the URI for the first activity is invalid or cannot be reached, an exception will be thrown by the Execute method.

    Hosting Activities in MassTransit

    To host an activity in a MassTransit service bus instance, the configuration namespace has been extended to include two additional subscription methods (thanks to the power of extension methods and a flexible configuration syntax, no changes to MassTransit were required). Shown below is the configuration used to host an activity.

    var executeUri = new Uri(“rabbitmq://localhost/execute_example”);
    var compensateUri = new Uri(“rabbitmq://localhost/compensate_example”);
    
    IServiceBus compensateBus = ServiceBusFactory.New(x =>
        {
            x.ReceiveFrom(compensateUri);
            x.Subscribe(s => s.CompensateActivityHost<ExampleActivity, ExampleLog>(
                _ => new ExampleActivity());
        });
    
    IServiceBus executeBus = ServiceBusFactory.New(x =>
        {
            x.ReceiveFrom(executeUri);
            x.Subscribe(s => s.ExecuteActivityHost<ExampleActivity, ExampleArguments>(
                compensateUri,
                 _ => new ExampleActivity());
        });
    

    In the above example two service bus instances are created, each with their own input queue. For execution, the routing slip is sent to the execution URI, and for compensation the routing slip is sent to the compensation URI. The actual URIs used are up to the application developer, the example merely shows the recommended approach so that the two addresses are easily distinguished. The URIs must be different!

    Monitoring Routing Slips

    During routing slip execution, events are published when the routing slip completes or faults. Every event message includes the TrackingNumber as well as a Timestamp (in UTC, of course) indicating when the event occurred:

    • RoutingSlipCompleted
    • RoutingSlipFaulted
    • RoutingSlipCompensationFailed

    Additional events are published for each activity, including:

    • RoutingSlipActivityCompleted
    • RoutingSlipActivityFaulted
    • RoutingSlipActivityCompensated
    • RoutingSlipActivityCompensationFailed

    By observing these events, an application can monitor and track the state of a routing slip. To maintain the current state, an Automatonymous state machine could be created. To maintain history, events could be stored in a database and then queried using the TrackingNumber of the RoutingSlip.

    Wrapping Up

    MassTransit Courier is a great way to compose dynamic processing steps into a routing slip that can be executed, monitored, and compensated in the event of a fault. When used in combination with the existing saga features of MassTransit, it is possible to coordinate a distributed set of services into a reliable and supportable system.

    IDisposable, Done Right


    IDisposable is a standard interface in the .NET framework that facilitates the deterministic release of unmanaged resources. Since the Common Language Runtime (CLR) uses Garbage Collection (GC) to manage the lifecycle of objects created on the heap, it is not possible to control the release and recovery of heap objects. While there are methods to force the GC to collect unreferenced objects, it is not guaranteed to clear all objects, and it is highly inefficient for an application to force garbage collection as part of the service control flow.

    Implementing IDisposable

    Despite IDisposable having only a single method named Dispose to implement, it is commonly implemented incorrectly. After reading this blog post it should be clear how and when to implement IDisposable, as well as how to ensure that resources are properly disposed when bad things happen (also knows as exceptions).

    First, the IDisposable interface definition:

    public interface IDisposable
    {
        void Dispose();
    }
    

    Next, the proper way to implement IDisposable every single time it is implemented:

    public class DisposableClass :
        IDisposable
    {
        bool _disposed;
    
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
    
        ~DisposableClass()
        {
            Dispose(false);
        }
    
        protected virtual void Dispose(bool disposing)
        {
            if (_disposed)
                return;
    
            if (disposing)
            {
                // free other managed objects that implement
                // IDisposable only
            }
    
            // release any unmanaged objects
            // set the object references to null
    
            _disposed = true;
        }
    }
    

    The pattern above for implementing IDisposable ensures that all references are properly disposed and released. Using the finalizer, along with the associated dispose methods, will ensure that in every case references will be properly released. There are some subtle things going on in the code, however, as described below.

    Dispose()

    The implementation of the Dispose method calls the Dispose(bool disposing) method, passing true, which indicates that the object is being disposed. This method is never automatically called by the CLR, it is only called explicitly by the owner of the object (which in some cases may be another framework, such as ASP.NET or MassTransit, or an object container, such as Autofac or StructureMap).

    ~DisposableClass

    Immediately before the GC releases an object instance, it calls the object’s finalizer. Since an object’s finalizer is only called by the GC, and the GC only calls an objects finalizer when there are no other references to the object, it is clear that the Dispose method will never be called on the object. In this case, the object should release any managed or unmanaged references, allowing the GC to release those objects as well. Since the same object references are being released as those that are released when Dispose is called, this method calls the Dispose(bool disposing) method passing false, indicating that the references objects Dispose method should not be called.

    Dispose(bool)

    All object references and unmanaged resources are released in this method. However, the argument indicates whether or not the Dispose method should be called on any managed object references. If the argument is false, the references to managed objects that implement IDisposable should be set to null, however, the Dispose method on those objects should not be called. The reason being that the owning objects Dispose method was not called (Dispose(false) is only called by the finalizer, and not the Dispose method.

    Overriding Dispose

    In the example above, the Dispose(bool disposing) method is declared as protected virtual. This is to allow classes that inherit from this class to participate in the disposable of the object without impacting the behavior of the base class. In this case, a subclass should override the method as shown below.

    public class SubDisposableClass : 
        DisposableClass
    {
        private bool _disposed;
    
        // a finalizer is not necessary, as it is inherited from
        // the base class
    
        protected override void Dispose(bool disposing)
        {
            if (!_disposed)
            {
                if (disposing)
                {
                    // free other managed objects that implement
                    // IDisposable only
                }
    
                // release any unmanaged objects
                // set object references to null
    
                _disposed = true;
            }
    
            base.Dispose(disposing);
        }
    }
    

    The subclass overrides the method, releasing (and optionally disposing) object references first, and then calling the base method. This ensures that objects are released in the proper order (at least between the subclass and the base class, the proper order of releasing/disposing objects within the subclass itself is the responsibility of the developer).

    Exceptions, Happen

    Prior to .NET 2.0, if an object’s finalizer threw an exception, that exception was swallowed by the runtime. Since .NET 2.0, however, throwing an exception from a finalizer will cause the application to crash, and that’s bad. Therefore, it is important that a finalizer never throw an exception.

    But what about the Dispose method, should it be allowed to throw an exception? The short answer, is no. Except when the answer is yes, which is almost never. Therefore, it is important to wrap any areas of the Dispose(bool disposing) method that could throw an exception in a try/catch block as shown below.

    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
            return;
    
        if (disposing)
        {
            _session.Dispose();
        }
    
        try
        {
            _channelFactory.Close();
        }
        catch (Exception ex)
        {
            _log.Warn(ex);
    
            try
            {
                _channelFactory.Abort();
            }
            catch (Exception cex)
            {
                _log.Warn(cex);
            }
        }
    
        _session = null;
        _channelFactory = null;
    
        _disposed = true;
    }
    

    In the example, **session is a reference to an NHibernate ISession and </strong>channelFactory is a reference to a WCF IChannelFactory. An NHibernate ISession implements IDisposable, so the owner must call Dispose on it when the object is no longer needed. In the case of the IChannelFactory reference, there is no Dispose method, however, the object must be closed (and subsequently aborted in case of an exception). Because either of these methods can throw an exception, it is important to catch the exception (and, as shown above, log it for troubleshooting or perhaps just ignore it) so that it doesn’t cause either the Dispose method or the object’s finalizer to propagate the exception.</p>

    Constructor Exceptions

    On a related topic, when an object’s constructor throws an exception, the runtime considers the object to have never existed. And while the GC will release any object allocated by the constructor, it will not call the Dispose method on any disposable objects. Therefore, if an object is creating references to managed objects in the constructor (or even more importantly, unmanaged objects that consume limited system resources, such as file handles, socket handles, or threads), it should be sure to dispose of those resources in the case of a constructor exception by using a try/catch block.

    While one might be tempted to call _Dispose_ from the constructor to handle an exception, don’t do it. When the constructor throws an exception, technically the object does not exist. Calling methods, particularly virtual methods, should be avoided.

    Of course, in the case of managed objects such as an ISession, it is better to take the object as a dependency on the constructor and have it passed into the object by an object factory (such as a dependency injection container, such as Autofac) and let the object factory manage the lifecycle of the dependency.

    Container Lifecycle Management

    Dependency injection containers are powerful tools, handling object creation and lifecycle management on behalf of the developer. However, it is important to have a clear understanding of how to use the container in the context of an application framework.

    For example, ASP.NET has a request lifecycle for every HTTP request received by the server. To support this lifecycle, containers typically have integration libraries that hook into the framework to ensure proper object disposal. For instance, Autofac has a number of integration libraries for ASP.NET, ASP.NET MVC, ASP.NET Web API, and various other application frameworks. These libraries, when configured into the stack as HttpModules, ensure that objects are properly disposed when each request completes.

    Conclusion

    The reason for IDisposable is deterministic release of references by an object (something that used to happen manually with unmanaged languages by calling delete on an object). Implementing it both properly and consistently helps create applications that have predictable resource usage and more easy to troubleshoot. Therefore, consider the example above as a reference point for how objects should be disposed.

    References:

    Autofac Web Integration

    Microsoft Documentation

    Bonus:

    Resharper Template

    Separating Concerns – Part 1: Libraries


    Introduction

    In large applications, particularly in enterprise applications, separation of concerns is critical to ease maintainability. Without proper separation of concerns, applications become too large and too complex, which in turn makes maintenance and enhancement extremely difficult. Separating application concerns leads to high cohesion, allowing developers to better understand code behavior which leads to easier code maintenance.

    History

    In the previous decade, architects designed applications using an n-tier approach, separating the application into horizontal layers such as user interface, business logic, and data access. This approach is incomplete, however, as it fails to address partitioning applications vertically. Unrelated concerns are commingled, resulting in a confusing architecture which lacks clearly defined boundaries and has low cohesion.

    The other problem with an n-tier architecture is how it is organized from top to bottom, with the topmost layer being the presentation layer or user interface, and the bottommost layer representing the persistence layer or database. Instead of thinking of the architecture as horizontal layers, think of them as rings, as described by the Onion Architecture described by Jeffrey Palermo. (While Jeffrey proposed the pattern name, the architectural patterns have been defined previously by others.)

    Separating Concerns

    Given that a separation of concerns and increasing cohesion are the goals, there are several mechanisms towards achieving them. The solutions that follow include the use of libraries, services, and frameworks as ways to reach these goals.

    The Library

    A library is a set of functions used to build software applications. Rather than requiring an application to be a single project containing every source file, most programming languages provide a means to segregate functionality into libraries. While the facility name varies, a partial list of which includes package, module, gem, jar, and assembly, the result is enabling developers to separate functions physically from the main application project, improving both cohesion and maintainability.

    Core, the new Manager

    A library should not be a collection of unrelated functions, it should contain related functions so that it is highly cohesive. An application developer should be able to select a library for use based on its name and purpose, rather than having to pour through the source code to find the function or functions needed. A library should have a descriptive name and contain a cohesive set of functions towards a singular purpose or responsibility.

    Creating a library named Core containing a large set of unrelated functions is separation of the sake of separation, and that library should not be treated as a library but as part of the application — it should not be reused by other applications.

    Coupling (aka, the Path of Pain)

    When an industry analyst shares their observations about code reuse in the enterprise, the findings indicate that actual code reuse is very low. A main reason that code reuse is so low is tight coupling. Coupling refers to how two libraries (or functions) rely on each other. When a library relies upon another library, the library relied on is referred to as a dependency. When an application relies on a library, it implicitly relies on the library’s dependencies as well. In many larger applications, this can lead straight to dependency hell.

    Since tight coupling can lead to serious maintenance issues during an application’s lifecycle, limiting dependencies should be first and foremost in application and library design. If a function is to be moved from an application to a library, and that function must bring with it a dependency that was not previously required by the target library, the cost of adding the new dependency to the library must be considered. Too often, particularly in the enterprise where code is only reviewed internally by a single development team, poor choices are made when creating libraries. Functions are routinely moved out of the main project and placed into arbitrary libraries with little thought given to the additional dependencies of the library.

    An Example

    As an example, a web application has a set of functions for validating email addresses. The simplest validation methods may only depend upon regular expression functions, which are part of every modern language runtime used today. A more complete validation of an email address may check that the domain is actually valid and has a properly registered MX record in DNS. However, validating the domain involves sending a request to a service and waiting for the response indicating a valid domain before the email address is determined to be valid.

    There are many things wrong in this example. First, the email validation function has a dependency on a domain validation function. Due to the fact that the set of valid domains is continuously changing, the domain validation function itself has a dependency on a domain name service. Of course, the domain name service depends upon a network domain name service, which may subsequently depend upon an internet service as well. By calling one library function, the application has managed to send a request to another machine and block a thread waiting for a response.

    In the case of an error, the disposition of the email address is then unknown. Is it a valid email address that could not be validated due to a network error? Or is it a valid email address but flagged as invalid because the domain name could not be validated due to an internal DNS server not allowing external domains to be returned?

    The coupling in the email validation library is clearly a problem, but what happens as the business requirements evolve over the life of the application? Consider the situation where new accounts are being created by spammers from other countries. To combat the spam accounts, email addresses must now be validated to ensure that the IP address originates from within the United States. The email validation function now has a new dependency, a geolocation service that returns the physical address of a domain. However, the service requires the use of separate endpoints for testing and production. The email address validation function is now dependent upon two services and configuration data to determine which service endpoint to use.

    At this point, it is obvious that the complexity of validating an email address is not something that can be accomplished in a library function.

    This article will continue with Part 2 on services.

    Interview on .NET Rocks Episode 798 Published


    Last weekend, the guys from .NET Rocks! interviewed me for the show, and the show is now available from the usual outlets (iTunes, etc.). You can read the summary on their web site, as well as listen to the show! It was a fun discussion that covered a variety of topics, including MassTransit, Topshelf, Magnum, and other open source themes.

    Check it out!

subscribe via RSS