State Machine for Managing Sagas


MassTransit supports sagas, which are long-lived transactions consisting of multiple events. The saga support makes it easy to orchestrate the events into a process, but it doesn’t do much to help with state management. Since state management is fairly common, I felt it necessary to add some support for a state machine.

In MassTransit, a saga is defined by creating a class and attaching some interfaces for the messages that will be consumed. The messages are the events, and the class is the saga. An example saga is shown below.

 

public class DrinkPreparationSaga :
        InitiatedBy < NewOrderMessage >,
        Orchestrates < PaymentCompleteMessage >,
        ISaga
    {
        public void Consume(NewOrderMessage message)
        {
        }
        public void Consume(PaymentCompleteMessage message)
        {
        }
    }

 

I’ve omitted the code for each consumer to keep it short, but the business logic defined would handle the fact that once the drink is prepared, it should not be served until the PaymentCompleteMessage has been received. Storing this state is an application concern.

While I was at QCon San Francisco, I attended a tutorial session on DSLs. It was here that I got the idea to somehow make a DSL for defining the behavior of a state machine that could be used with sagas in MassTransit. It’s taken a couple of months, but I think I’ve managed to put something together that simplifies state management in sagas. With the addition of a new state machine in Magnum, along with persistence support for NHibernate, it is now easy to manage this state automatically.

To demonstrate this, below is the state machine for the above process.

 

public class DrinkPreprationStateMachine :
		StateMachine < DrinkPreprationStateMachine >,
		ISaga,
		InitiatedBy < NewOrderMessage >,
		Orchestrates < PaymentCompleteMessage >,
		Orchestrates < DrinkPreparedMessage >
	{
		static DrinkPreprationStateMachine()
		{
			Define(() =>
				{
					Initially(
						When(NewOrder)
							.Then(saga =>
								{
									// start preparing the drink
								})
							.TransitionTo(PreparingDrink));
					During(PreparingDrink,
						When(DrinkPrepared)
					       	.TransitionTo(WaitingForPayment),
						When(PaymentComplete)
					       	.TransitionTo(WaitingForDrink));
					During(WaitingForPayment,
					    When(PaymentComplete)
					       	.Then(saga =>
					       		{
					       			// publish drink ready message
					       		})
					       	.Complete());
					During(WaitingForDrink,
						When(DrinkPrepared)
					       	.Then(saga =>
					       		{
					       			// publish drink ready message
					       		})
					       	.Complete());
				});
		}
		// event and state definitions not shown, but are simple properties
	}

 

As shown above, all the logic of states, events, and transitions is wrapped into a clean fluent interface. The During() blocks define what is to be done when events are received during a particular state. The When() blocks define the behavior when the specified events occur. All state transitions are explicit, allowing them to be captured as part of the state machine.

There are two types of events. BasicEvent supports an event without any accompanying data. The DataEvent < V > allows data to be associated with an event. In MassTransit, each DataEvent would have a matching Consume() handler. For example, Event < NewOrder > would have a public void Consume(NewOrder message) that would call RaiseEvent(NewOrderEvent, message) to pass the event to the state machine along with the message.

The state machine also supports inspection, allowing the definition to be output for verification that the intent was properly conveyed by the interface. This not only makes it possible to verify the flow between states, but also could allow the creation of a graph to display the states, events, and transitions in a visual manner. The provided state machine inspector currently only outputs to the trace window, but could easily be enhanced by somebody with some skills.

Also, not shown above, is the ability to specify an expression using the .And() method to evaluate the data associated with an event to determine if that event is handled by that state event action. This expression is kept as an expression, allowing the details to be output using the inspector as well.

Currently, the saga must still implement the Consume() method for each message and call RaiseEvent() on the state machine to pass the message and trigger the event. I plan to add some new message sinks to the message pipeline to make those methods unnecessary, mapping the messages directly into the event handlers within the state machine. This isn’t done yet, but it is planned.

The StateMachine also provides an IUserType implementation for NHibernate in the Magnum.Infrastructure assembly. This makes it easy to persist the state using NHibernate, storing the current state as a string. This is just the default implementation, any other could be built if your needs are different.

I originally presented this syntax during the Virtual ALT.NET meeting last week. It was a last minute presentation, so I wasn’t sure I covered all the details. Hopefully this will help provide some guidance on how it is used, along with plans for the future.

MassTransit Turns One Year Old, Celebrations Held Around the World