Event Aggregator Using the Magnum Pipeline

In the past few weeks, both Udi Dahan and Jeremy D. Miller have posted on events. Udi posted about domain events, while Jeremy posted about his use of the event aggregator pattern in StoryTeller. In each case, events are represented as messages and each message is a class in C#. In each post, a small publish/subscribe system is described that allows objects (be it a domain object, domain service, or a controller) to subscribe to messages. Other objects can then use that same system to publish events to the subscribed objects.

Now while you could use MassTransit out of the box to handle this type of event aggregation, it is a bit heavy. The in-memory message transport serializes the message, which makes it impossible to pass a continuation or an object reference as part of an event. There is also a very service-oriented thread model where each consumer runs on a different thread, making synchronization an important concern for unit testing. While it would work, it is not always the shiniest hammer in the toolbox for a UI-based application.

To address this, one of the things I’ve been adding to Magnum over the past few weeks is a new version of the pipeline that handles message distribution in MassTransit. In this implementation, I wanted a way to implement the event aggregator pattern with the same flexibility that I get with MassTransit but designed for an in-process mode of execution. At the same time, I wanted to make sure that I could scale this solution via adapters to extend events to MassTransit for publishing out-of-process.

Note: I use the words event and message interchangeably in this post.

First, I wanted it to be able to handle any object without any constraints on the type. To this end, I came up with a very narrow API that only deals with the publishing of a message.

public interface Pipe
{
	void Send<T>(T message) where T : class;
}

The Send method is fairly obvious: it sends a message to any consumers that are subscribed to the message. With this implementation, consumers that are subscribed to any type to which the message can be assigned will also get the message. Consider the following class structure:

public class CustomerChanged
{
	public Customer Customer { get; set; }
}

public class CustomerRatingDowngraded : 
	CustomerChanged
{
}

A consumer that subscribed to the CustomerChanged type would receive the message if a CustomerRatingDowngraded message was published. It also works for interfaces, as long as the message object being published supports the interface.
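To make the assignability rule concrete, here is a minimal sketch of a dispatcher that delivers a message to every consumer whose subscribed type the message can be assigned to. SimplePipe and AssignabilityDemo are hypothetical names for illustration and are not part of Magnum, and the message classes are simplified to carry a CustomerName string instead of a Customer object.

```csharp
using System;
using System.Collections.Generic;

// Simplified message types (assumption: a plain name instead of a Customer object).
public class CustomerChanged
{
	public string CustomerName { get; set; }
}

public class CustomerRatingDowngraded : CustomerChanged
{
}

// Hypothetical stand-in for a pipe; not Magnum's implementation.
public class SimplePipe
{
	private readonly List<Action<object>> _consumers = new List<Action<object>>();

	public void Subscribe<T>(Action<T> consumer) where T : class
	{
		// Deliver only when the published object can be assigned to T.
		_consumers.Add(message =>
		{
			if (message is T typed)
				consumer(typed);
		});
	}

	public void Send<T>(T message) where T : class
	{
		foreach (Action<object> consumer in _consumers)
			consumer(message);
	}
}

public static class AssignabilityDemo
{
	public static List<string> Run()
	{
		var received = new List<string>();
		var pipe = new SimplePipe();
		pipe.Subscribe<CustomerChanged>(m => received.Add("changed: " + m.CustomerName));
		pipe.Subscribe<CustomerRatingDowngraded>(m => received.Add("downgraded: " + m.CustomerName));

		// The derived message reaches both consumers.
		pipe.Send(new CustomerRatingDowngraded { CustomerName = "Acme" });
		// The base message reaches only the CustomerChanged consumer.
		pipe.Send(new CustomerChanged { CustomerName = "Initech" });
		return received;
	}
}
```

Calling AssignabilityDemo.Run() from a test or a Main method returns three deliveries: two for the downgraded message and one for the plain change. The same type check covers interfaces, since `is T` succeeds for any interface the message object implements.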

An obvious omission from this API is any method of subscribing consumers to the pipeline. To subscribe to the pipeline, an extension method on the Pipe interface creates a new subscription scope. A subscription scope, represented by the ISubscriptionScope interface, makes changes to the pipeline resulting in the creation of a new pipeline. A series of visitors is used to create a new version of the pipeline with the consumers added, along with another visitor to remove the consumers when they unsubscribe. ISubscriptionScope implements IDisposable, so to unsubscribe, your application can just dispose of the scope.

It is interesting to note that much like the Expression class in .NET, pipelines are immutable. Since pipelines cannot be changed, the need to lock parts of the pipeline during message distribution is removed. By removing the need for locking to ensure safe operation in a concurrent environment, performance improves and blocking is eliminated. At the same time, consumers can subscribe and unsubscribe from the pipeline as needed without disrupting the system.

public void Start()
{
	// this creates an empty pipeline that accepts any object
	_eventAggregator = PipeSegment.Input(PipeSegment.End<object>());

	_scope = _eventAggregator.NewSubscriptionScope();
	_scope.Subscribe<CustomerChanged>(message => Trace.WriteLine("Customer changed: " + message.CustomerName));
}

In this example, _eventAggregator and _scope would likely be member variables that would be released when the containing object is stopped or disposed. Multiple subscriptions can be added to a single scope, each one modifying the pipeline as it is added.
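The lock-free behavior that immutability buys can be sketched with a copy-on-write consumer list: Send reads a snapshot that never changes, while subscribing and unsubscribing publish a new snapshot with a compare-and-swap. This is a simplified illustration under my own assumptions, not how Magnum does it (Magnum rebuilds the pipeline with visitors), and CopyOnWritePipe and CopyOnWriteDemo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical sketch; not Magnum's implementation.
public class CopyOnWritePipe
{
	// Each array is treated as immutable once published.
	private Action<object>[] _consumers = new Action<object>[0];

	public void Send<T>(T message) where T : class
	{
		// Read the current snapshot once; no lock is needed because it is never modified.
		foreach (Action<object> consumer in Volatile.Read(ref _consumers))
			consumer(message);
	}

	public IDisposable Subscribe<T>(Action<T> consumer) where T : class
	{
		Action<object> wrapper = message => { if (message is T typed) consumer(typed); };
		Swap(current => current.Concat(new[] { wrapper }).ToArray());

		// Disposing the returned scope publishes a snapshot without this consumer.
		return new Scope(() => Swap(current => current.Where(c => !ReferenceEquals(c, wrapper)).ToArray()));
	}

	private void Swap(Func<Action<object>[], Action<object>[]> change)
	{
		Action<object>[] current, updated;
		do
		{
			current = Volatile.Read(ref _consumers);
			updated = change(current);
		}
		// Retry if another thread published a new snapshot in the meantime.
		while (Interlocked.CompareExchange(ref _consumers, updated, current) != current);
	}

	private sealed class Scope : IDisposable
	{
		private Action _unsubscribe;
		public Scope(Action unsubscribe) { _unsubscribe = unsubscribe; }
		// Exchange guarantees the unsubscribe action runs at most once.
		public void Dispose() { Interlocked.Exchange(ref _unsubscribe, null)?.Invoke(); }
	}
}

public static class CopyOnWriteDemo
{
	public static List<string> Run()
	{
		var received = new List<string>();
		var pipe = new CopyOnWritePipe();
		IDisposable scope = pipe.Subscribe<string>(received.Add);
		pipe.Send("first");
		scope.Dispose();
		pipe.Send("second"); // no longer delivered
		return received;
	}
}
```

A message that is being delivered while a consumer unsubscribes still sees the old snapshot, which is the trade-off for never blocking a publisher.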

When I discuss event-based programming, I often mention the need for visualization tools in order to ensure the system is performing as expected. In the example above, I could use the TracePipeVisitor to verify that the consumer was indeed subscribed to the pipeline (by calling new TracePipeVisitor().Trace(_eventAggregator) and viewing the results in the output window).

Input: 
RecipientList: 
     Filter: Allow Magnum.Specs.Pipeline.Messages.CustomerChanged
     RecipientList: 
          MessageConsumer: 

As consumers are added, the pipeline is built up using a series of PipeSegment classes. The Input segment is the initial entry point to the pipeline and, because of that responsibility, is the only segment that actually changes in the pipeline. The RecipientList is a one-to-many switch that delivers incoming messages to each consumer. The Filter segment only passes a specific type through, preventing unwanted messages from reaching the consumer. The MessageConsumer actually invokes the method that was subscribed to the message.
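The roles of the four segments can be sketched as small classes chained together in the same shape as the trace output. The types below are hypothetical and only borrow the role names from the trace; they are not Magnum's PipeSegment classes, and CustomerChanged is simplified to carry a name.

```csharp
using System;
using System.Collections.Generic;

public class CustomerChanged
{
	public string CustomerName { get; set; }
}

// Hypothetical segment contract; not Magnum's API.
public interface ISegment
{
	void Accept(object message);
}

// Invokes the subscribed method.
public sealed class MessageConsumerSegment<T> : ISegment where T : class
{
	private readonly Action<T> _consumer;
	public MessageConsumerSegment(Action<T> consumer) { _consumer = consumer; }
	public void Accept(object message) { _consumer((T)message); }
}

// Passes only messages assignable to T.
public sealed class FilterSegment<T> : ISegment where T : class
{
	private readonly ISegment _output;
	public FilterSegment(ISegment output) { _output = output; }
	public void Accept(object message)
	{
		if (message is T) _output.Accept(message);
	}
}

// One-to-many switch.
public sealed class RecipientListSegment : ISegment
{
	private readonly ISegment[] _recipients;
	public RecipientListSegment(params ISegment[] recipients) { _recipients = recipients; }
	public void Accept(object message)
	{
		foreach (ISegment recipient in _recipients) recipient.Accept(message);
	}
}

// Entry point; the only segment holding a replaceable reference.
public sealed class InputSegment : ISegment
{
	private volatile ISegment _output;
	public InputSegment(ISegment output) { _output = output; }
	public void ReplaceOutput(ISegment output) { _output = output; }
	public void Accept(object message) { _output.Accept(message); }
}

public static class SegmentDemo
{
	public static List<string> Run()
	{
		var received = new List<string>();
		ISegment consumer = new MessageConsumerSegment<CustomerChanged>(m => received.Add(m.CustomerName));
		ISegment filter = new FilterSegment<CustomerChanged>(new RecipientListSegment(consumer));
		var input = new InputSegment(new RecipientListSegment(filter));

		input.Accept(new CustomerChanged { CustomerName = "Acme" });
		input.Accept("not a customer event"); // stopped by the filter
		return received;
	}
}
```

Because every segment except the input is built once and never modified, subscribing in this sketch would mean building a new chain and handing it to ReplaceOutput, which mirrors why the Input segment is the one that changes.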

In the above example, the message consumer was accepted using the MessageConsumer<T> delegate type, which is analogous to Action<T>, with T being the message type. Another way to subscribe is to implement the IConsume<T> interface as shown below.

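public class ListViewController :
	IConsume<CustomerChanged>
{
	private readonly ListView _customerListView;

	public ListViewController(ListView customerListView)
	{
		_customerListView = customerListView;
	}

	// called by the pipeline whenever a CustomerChanged message is sent
	public void Consume(CustomerChanged message)
	{
		_customerListView.DoSomeUpdate(message.Customer);
	}
}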

A class can implement the IConsume<T> interface to indicate that it is interested in messages of type T. In this case, the CustomerChanged message is of particular interest as it is used to update the user interface in response to a customer change event. The instance of the controller can be subscribed to the pipeline by calling the Subscribe method and passing the object reference itself.

public void BootstrapUserInterfaceControllers()
{
	_customerListViewController = new ListViewController(customerListView);

	_scope = _eventAggregator.NewSubscriptionScope();
	_scope.Subscribe(_customerListViewController);
}
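One plausible way for a Subscribe call that takes an object reference to find the message types it cares about is to inspect the interfaces the object implements. The sketch below is an assumption about how such binding could work, not a description of Magnum's code; the IConsume<T> declared here is a local stand-in, and ConsumerBinder, AuditController, and BinderDemo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Local stand-in for the interface used in the post.
public interface IConsume<T> where T : class
{
	void Consume(T message);
}

public static class ConsumerBinder
{
	// Returns one handler per IConsume<T> the object implements, keyed by message type.
	public static IDictionary<Type, Action<object>> Bind(object consumer)
	{
		var handlers = new Dictionary<Type, Action<object>>();
		foreach (Type contract in consumer.GetType().GetInterfaces())
		{
			if (!contract.IsGenericType || contract.GetGenericTypeDefinition() != typeof(IConsume<>))
				continue;

			Type messageType = contract.GetGenericArguments()[0];
			MethodInfo consume = contract.GetMethod("Consume");
			handlers[messageType] = message => consume.Invoke(consumer, new[] { message });
		}
		return handlers;
	}
}

// Hypothetical consumer interested in two message types.
public class AuditController : IConsume<string>, IConsume<Version>
{
	public readonly List<string> Log = new List<string>();
	public void Consume(string message) { Log.Add("text: " + message); }
	public void Consume(Version message) { Log.Add("version: " + message); }
}

public static class BinderDemo
{
	public static List<string> Run()
	{
		var controller = new AuditController();
		IDictionary<Type, Action<object>> handlers = ConsumerBinder.Bind(controller);

		handlers[typeof(string)]("hello");
		handlers[typeof(Version)](new Version(1, 2));
		return controller.Log;
	}
}
```

A real implementation would likely compile the reflection call into a delegate once per type rather than paying for MethodInfo.Invoke on every message.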

This is the first in a series of posts about the pipeline in Magnum. As I add the remaining functionality, including asynchronous message consumers, aggregate consumers, and automatic binding to the Magnum StateMachine (similar to how sagas are done using MassTransit), I’ll post about how they are used. I encourage you to take a look at the code and particularly the unit tests to see the different ways the pipeline can be used.


About Chris Patterson

Chris is a senior architect for RelayHealth, the connectivity business of the nation's leading healthcare services company. There he is responsible for the architecture and development of applications and services that accelerate care delivery by connecting patients, providers, pharmacies, and financial institutions. Previously, he led the development of a new content delivery platform for TV Guide, enabling the launch of a new entertainment network seen on thousands of cable television systems. In his spare time, Chris is an active open-source developer and a primary contributor to MassTransit, a distributed application framework for .NET. In 2009, he was awarded the Most Valuable Professional award by Microsoft for his technical community contributions.
  • JBland

    How does/can this integrate with IoC? In particular, I'm interested in auto-registration of subscribers (some of which can have varying lifetimes).

  • http://www.lostechies.com/members/phatboyg/default.aspx Chris Patterson

    The next post will cover how that works in a container-agnostic fashion, assuming your container has the ability to enumerate all the registered types that implement an interface. I’ll also provide an example of doing it without a container by just enumerating the types in the assembly.
