MediatR Pipeline Examples

A while ago, I blogged about using MediatR to build a processing pipeline for requests, in the form of commands and queries, in your application. MediatR is a library I built (well, extracted from client projects) to help organize my applications into a CQRS architecture, with distinct messages and handlers for every request in the system.

So when processing requests gets more complicated, we often rely on a mediator pipeline to provide a home for these extra behaviors. It doesn’t always show up – I’ll start without one before deciding to add it. I’ve also not built it directly into MediatR – because frankly, it’s hard and there are existing tools to do so with modern DI containers. First, let’s look at the simplest pipeline that could possibly work:

public class MediatorPipeline<TRequest, TResponse> 
  : IRequestHandler<TRequest, TResponse>
  where TRequest : IRequest<TResponse>
{
    private readonly IRequestHandler<TRequest, TResponse> _inner;

    public MediatorPipeline(IRequestHandler<TRequest, TResponse> inner)
    {
        _inner = inner;
    }

    public TResponse Handle(TRequest message)
    {
        return _inner.Handle(message);
    }
}

Nothing exciting here: it just calls the inner handler, the real handler. But we now have a baseline on which we can layer additional behaviors.
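To make the wrapping concrete, here’s a minimal sketch of what a container’s decorator support does for us, wired by hand. The IRequest and IRequestHandler declarations are simplified stand-ins for MediatR’s interfaces so the sample compiles on its own, and Ping, PingHandler and HandWiredPipeline are hypothetical names used only for illustration.

```csharp
using System;

// Simplified stand-ins for MediatR's interfaces (assumption: trimmed for the sketch).
public interface IRequest<TResponse> { }

public interface IRequestHandler<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    TResponse Handle(TRequest message);
}

// Hypothetical request and its "real" handler.
public class Ping : IRequest<string>
{
    public string Text { get; set; } = "";
}

public class PingHandler : IRequestHandler<Ping, string>
{
    public string Handle(Ping message) => message.Text + " Pong";
}

// The baseline pipeline: a decorator that only delegates to the inner handler.
public class MediatorPipeline<TRequest, TResponse> : IRequestHandler<TRequest, TResponse>
    where TRequest : IRequest<TResponse>
{
    private readonly IRequestHandler<TRequest, TResponse> _inner;

    public MediatorPipeline(IRequestHandler<TRequest, TResponse> inner)
    {
        _inner = inner;
    }

    public TResponse Handle(TRequest message) => _inner.Handle(message);
}

public static class HandWiredPipeline
{
    // A container with decorator support would do this for every closed handler type.
    public static IRequestHandler<Ping, string> Build()
        => new MediatorPipeline<Ping, string>(new PingHandler());
}
```

Calling HandWiredPipeline.Build().Handle(...) goes through the pipeline first and then into PingHandler. The caller only ever sees an IRequestHandler, which is why behaviors can be added later without touching calling code.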

Let’s get something more interesting going!

Contextual Logging and Metrics

Serilog has an interesting feature where it lets you define contexts for logging blocks. With a pipeline, this becomes trivial to add to our application:

public class MediatorPipeline<TRequest, TResponse> 
  : IRequestHandler<TRequest, TResponse>
  where TRequest : IRequest<TResponse>
{
    private readonly IRequestHandler<TRequest, TResponse> _inner;

    public MediatorPipeline(IRequestHandler<TRequest, TResponse> inner)
    {
        _inner = inner;
    }

    public TResponse Handle(TRequest message)
    {
        using (LogContext.PushProperty(LogConstants.MediatRRequestType, typeof(TRequest).FullName))
        {
            return _inner.Handle(message);
        }
    }
}

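In our logs, every event written while our handler runs now carries the request type as a property (as long as the logger is configured with Enrich.FromLogContext()), so we can filter or group log output by request. We can do a bit more. What about metrics? Also trivial to add: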

using (LogContext.PushProperty(LogConstants.MediatRRequestType, typeof(TRequest).FullName))
using (Metrics.Time(Timers.MediatRRequest))
{
    return _inner.Handle(message);
}

That Metrics.Time call is just a simple wrapper around the .NET Timer classes, with some configuration checking etc. Those are the easy ones. What about something more interesting?
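For reference, a timing wrapper in the spirit of Metrics.Time could be sketched like this. It is an assumption on my part rather than the real implementation: it uses Stopwatch, skips the configuration checking, and records into a hypothetical in-memory list instead of a real metrics backend.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class Timers
{
    // Hypothetical timer name; the real constant's value isn't shown in the post.
    public const string MediatRRequest = "MediatR.Request";
}

public static class Metrics
{
    private static readonly object Sync = new object();

    // Hypothetical sink: (timer name, elapsed milliseconds) pairs.
    public static readonly List<KeyValuePair<string, long>> Recorded =
        new List<KeyValuePair<string, long>>();

    // Starts timing now; the measurement is recorded when the scope is disposed.
    public static IDisposable Time(string timerName) => new TimerScope(timerName);

    private sealed class TimerScope : IDisposable
    {
        private readonly string _name;
        private readonly Stopwatch _stopwatch = Stopwatch.StartNew();

        public TimerScope(string name)
        {
            _name = name;
        }

        public void Dispose()
        {
            _stopwatch.Stop();
            lock (Sync)
            {
                Recorded.Add(new KeyValuePair<string, long>(_name, _stopwatch.ElapsedMilliseconds));
            }
        }
    }
}
```

Because the scope is an IDisposable, it stacks naturally with the LogContext using block, and the elapsed time is recorded even when the handler throws.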

Validation and Authorization

Oftentimes, we have to share handlers between different applications, so it’s important to have an application-agnostic means of applying cross-cutting concerns. Rather than bury our concerns in framework or application-specific extensions (like, say, an action filter), we can instead embed this behavior in our pipeline. First, with validation, we can use a tool like FluentValidation with validator handlers for a specific type:

public interface IMessageValidator<in T>
{
    IEnumerable<ValidationFailure> Validate(T message);
}

What’s interesting here is that our message validator is contravariant, meaning I can have a validator of a base type work for messages of a derived type. That means we can declare common validators for base types or interfaces that your message inherits/implements. In practice this lets me share common validation amongst multiple messages simply by implementing an interface.
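As a small illustration of that contravariance, here’s a sketch in which one validator written against an interface serves a concrete message. ValidationFailure is a simplified stand-in for FluentValidation’s type so the sample compiles on its own, and IHasEmail, CreateUser, EmailValidator and ContravarianceDemo are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Simplified stand-in for FluentValidation's ValidationFailure (assumption).
public class ValidationFailure
{
    public ValidationFailure(string propertyName, string errorMessage)
    {
        PropertyName = propertyName;
        ErrorMessage = errorMessage;
    }

    public string PropertyName { get; }
    public string ErrorMessage { get; }
}

public interface IMessageValidator<in T>
{
    IEnumerable<ValidationFailure> Validate(T message);
}

// Hypothetical shared contract implemented by several messages.
public interface IHasEmail
{
    string Email { get; }
}

public class CreateUser : IHasEmail
{
    public string Email { get; set; } = "";
}

// Written once against the interface, reusable for every message implementing it.
public class EmailValidator : IMessageValidator<IHasEmail>
{
    public IEnumerable<ValidationFailure> Validate(IHasEmail message)
    {
        if (string.IsNullOrWhiteSpace(message.Email) || !message.Email.Contains("@"))
            yield return new ValidationFailure(nameof(IHasEmail.Email), "A valid email is required.");
    }
}

public static class ContravarianceDemo
{
    // This conversion compiles only because T is declared "in" on IMessageValidator.
    public static IMessageValidator<CreateUser> ForCreateUser() => new EmailValidator();
}
```

A container that honors variance can then hand the pipeline for CreateUser an EmailValidator without any registration specific to CreateUser. Whether a given container resolves contravariant services this way is something to check in its documentation.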

Inside my pipeline, I can execute my validation by taking a dependency on the validators for my message:

public class MediatorPipeline<TRequest, TResponse> 
  : IRequestHandler<TRequest, TResponse>
  where TRequest : IRequest<TResponse>
{
    private readonly IRequestHandler<TRequest, TResponse> _inner;
    private readonly IEnumerable<IMessageValidator<TRequest>> _validators;

    public MediatorPipeline(IRequestHandler<TRequest, TResponse> inner,
        IEnumerable<IMessageValidator<TRequest>> validators)
    {
        _inner = inner;
        _validators = validators;
    }

    public TResponse Handle(TRequest message)
    {
        using (LogContext.PushProperty(LogConstants.MediatRRequestType, typeof(TRequest).FullName))
        using (Metrics.Time(Timers.MediatRRequest))
        {
            // Each validator returns its failures directly, so flatten them into one list
            var failures = _validators
                .SelectMany(v => v.Validate(message))
                .Where(f => f != null)
                .ToList();
            if (failures.Any())
                throw new ValidationException(failures);
            
            return _inner.Handle(message);
        }
    }
}

This bundles up all my validation failures into a single exception. The downside of this approach is I’m using exceptions to provide control flow, so if this is a problem, I can wrap up my responses into some sort of Result object that contains potential validation failures. In practice it seems fine for the applications we build.
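If exceptions as control flow do become a problem, the Result alternative could look roughly like the sketch below. Everything in it is an assumption for illustration: Result and ValidationStep are hypothetical names, ValidationFailure is a stand-in type, and validators are passed as plain delegates to keep the sample self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified stand-in for FluentValidation's ValidationFailure (assumption).
public class ValidationFailure
{
    public ValidationFailure(string propertyName, string errorMessage)
    {
        PropertyName = propertyName;
        ErrorMessage = errorMessage;
    }

    public string PropertyName { get; }
    public string ErrorMessage { get; }
}

// Hypothetical wrapper: either a value or the failures that prevented producing one.
public class Result<T>
{
    private Result(T value, IReadOnlyList<ValidationFailure> failures)
    {
        Value = value;
        Failures = failures;
    }

    public T Value { get; }
    public IReadOnlyList<ValidationFailure> Failures { get; }
    public bool IsValid => Failures.Count == 0;

    public static Result<T> Success(T value)
        => new Result<T>(value, new ValidationFailure[0]);

    public static Result<T> Invalid(IEnumerable<ValidationFailure> failures)
        => new Result<T>(default(T), failures.ToList());
}

public static class ValidationStep
{
    // Runs every validator; calls the inner handler only when nothing failed.
    public static Result<TResponse> Run<TRequest, TResponse>(
        TRequest message,
        IEnumerable<Func<TRequest, IEnumerable<ValidationFailure>>> validators,
        Func<TRequest, TResponse> inner)
    {
        var failures = validators
            .SelectMany(validate => validate(message))
            .Where(f => f != null)
            .ToList();

        return failures.Any()
            ? Result<TResponse>.Invalid(failures)
            : Result<TResponse>.Success(inner(message));
    }
}
```

The trade-off is that every caller now has to check IsValid before touching Value, which is the ceremony the exception-based version avoids.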

Again, my calling code INTO my handler (the Mediator) has no knowledge of these new behaviors, nor does my handler. I go to one spot to augment and extend behaviors across my entire system. Keep in mind, however, I still place my validators beside my message, handler, view etc. using feature folders.

Authorization is similar, where I define an authorizer of a message:

public interface IMessageAuthorizer
{
    void Evaluate<TRequest>(TRequest request) where TRequest : class;
}

Then in my pipeline, check authorization:

public class MediatorPipeline<TRequest, TResponse> 
  : IRequestHandler<TRequest, TResponse>
  // "class" is needed because IMessageAuthorizer.Evaluate only accepts reference types
  where TRequest : class, IRequest<TResponse>
{
    private readonly IRequestHandler<TRequest, TResponse> _inner;
    private readonly IEnumerable<IMessageValidator<TRequest>> _validators;
    private readonly IMessageAuthorizer _authorizer;

    public MediatorPipeline(IRequestHandler<TRequest, TResponse> inner,
        IEnumerable<IMessageValidator<TRequest>> validators,
        IMessageAuthorizer authorizer
        )
    {
        _inner = inner;
        _validators = validators;
        _authorizer = authorizer;
    }

    public TResponse Handle(TRequest message)
    {
        using (LogContext.PushProperty(LogConstants.MediatRRequestType, typeof(TRequest).FullName))
        using (Metrics.Time(Timers.MediatRRequest))
        {
            // Authorize before doing any other work
            _authorizer.Evaluate(message);
            
            // Each validator returns its failures directly, so flatten them into one list
            var failures = _validators
                .SelectMany(v => v.Validate(message))
                .Where(f => f != null)
                .ToList();
            if (failures.Any())
                throw new ValidationException(failures);
            
            return _inner.Handle(message);
        }
    }
}

The actual implementation of the authorizer will go through a series of security rules, find matching rules, and evaluate them against my request. Some examples of security rules might be:

  • Do any of your roles have permission?
  • Are you part of the ownership team of this resource?
  • Are you assigned to a special group that this resource is associated with?
  • Do you have the correct training to perform this action?
  • Are you in the correct geographic location and/or citizenship?

Things can get pretty complicated, but again, all encapsulated for me inside my pipeline.
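To give a feel for the “find matching rules, then evaluate them” flow, here’s a minimal sketch of an authorizer with a single role-based rule. ISecurityRule, UserContext, RequiresRole, RuleBasedAuthorizer and DeleteUser are hypothetical names, and throwing on failure is my assumption for the sketch, chosen because Evaluate returns void and it mirrors the validation step.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface IMessageAuthorizer
{
    void Evaluate<TRequest>(TRequest request) where TRequest : class;
}

// Hypothetical rule abstraction: does the rule match, and does the user pass it?
public interface ISecurityRule
{
    bool AppliesTo(object request);
    bool IsSatisfiedBy(object request, UserContext user);
}

// Hypothetical description of the current user.
public class UserContext
{
    public ISet<string> Roles { get; } = new HashSet<string>();
}

// Hypothetical request used for the example.
public class DeleteUser
{
    public int UserId { get; set; }
}

// "Do any of your roles have permission?" expressed as a rule for one request type.
public class RequiresRole<TRequest> : ISecurityRule
{
    private readonly string _role;

    public RequiresRole(string role)
    {
        _role = role;
    }

    public bool AppliesTo(object request) => request is TRequest;

    public bool IsSatisfiedBy(object request, UserContext user) => user.Roles.Contains(_role);
}

public class RuleBasedAuthorizer : IMessageAuthorizer
{
    private readonly IEnumerable<ISecurityRule> _rules;
    private readonly UserContext _user;

    public RuleBasedAuthorizer(IEnumerable<ISecurityRule> rules, UserContext user)
    {
        _rules = rules;
        _user = user;
    }

    public void Evaluate<TRequest>(TRequest request) where TRequest : class
    {
        // Find the rules matching this request, then require every one of them to pass.
        var denied = _rules
            .Where(rule => rule.AppliesTo(request))
            .Any(rule => !rule.IsSatisfiedBy(request, _user));

        if (denied)
            throw new UnauthorizedAccessException("Not authorized to send " + typeof(TRequest).Name);
    }
}
```

Requests with no matching rule pass through untouched in this sketch. A stricter system might instead deny by default, which is a policy decision rather than something the pipeline dictates.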

Finally, what about potential augmentations or reactions to a request?

Pre/post processing

In addition to some specific processing needs, like logging, metrics, authorization, and validation, there are things I can’t predict one message or group of messages might need. For those, I can build some generic extension points:

public interface IPreRequestHandler<in TRequest>
{
    void Handle(TRequest request);
}
public interface IPostRequestHandler<in TRequest, in TResponse>
{
    void Handle(TRequest request, TResponse response);
}
public interface IResponseHandler<in TResponse>
{
    void Handle(TResponse response);
}

Next I update my pipeline to include calls to these extensions (if they exist):

public class MediatorPipeline<TRequest, TResponse> 
  : IRequestHandler<TRequest, TResponse>
  // "class" is needed because IMessageAuthorizer.Evaluate only accepts reference types
  where TRequest : class, IRequest<TResponse>
{
    private readonly IRequestHandler<TRequest, TResponse> _inner;
    private readonly IEnumerable<IMessageValidator<TRequest>> _validators;
    private readonly IMessageAuthorizer _authorizer;
    private readonly IEnumerable<IPreRequestHandler<TRequest>> _preProcessors;
    private readonly IEnumerable<IPostRequestHandler<TRequest, TResponse>> _postProcessors;
    private readonly IEnumerable<IResponseHandler<TResponse>> _responseProcessors;

    public MediatorPipeline(IRequestHandler<TRequest, TResponse> inner,
        IEnumerable<IMessageValidator<TRequest>> validators,
        IMessageAuthorizer authorizer,
        IEnumerable<IPreRequestHandler<TRequest>> preProcessors,
        IEnumerable<IPostRequestHandler<TRequest, TResponse>> postProcessors,
        IEnumerable<IResponseHandler<TResponse>> responseProcessors
        )
    {
        _inner = inner;
        _validators = validators;
        _authorizer = authorizer;
        _preProcessors = preProcessors;
        _postProcessors = postProcessors;
        _responseProcessors = responseProcessors;
    }

    public TResponse Handle(TRequest message)
    {
        using (LogContext.PushProperty(LogConstants.MediatRRequestType, typeof(TRequest).FullName))
        using (Metrics.Time(Timers.MediatRRequest))
        {
            // Authorize before doing any other work
            _authorizer.Evaluate(message);
            
            // Pre-processors may supplement or clean up the message before validation
            foreach (var preProcessor in _preProcessors)
                preProcessor.Handle(message);
            
            // Each validator returns its failures directly, so flatten them into one list
            var failures = _validators
                .SelectMany(v => v.Validate(message))
                .Where(f => f != null)
                .ToList();
            if (failures.Any())
                throw new ValidationException(failures);
            
            var response = _inner.Handle(message);
            
            // Post-processors see both the request and the response
            foreach (var postProcessor in _postProcessors)
                postProcessor.Handle(message, response);
                
            // Response processors only care about the response
            foreach (var responseProcessor in _responseProcessors)
                responseProcessor.Handle(response);
                
            return response;
        }
    }
}

So what kinds of things might I accomplish here?

  • Supplementing my request with additional information not to be found in the original request (in one case, barcode sequences)
  • Data cleansing or fixing (for example, a scanned barcode needs padded zeroes)
  • Limiting results of paged result models via configuration
  • Notifications based on the response

I could put all sorts of things like these inside the handlers themselves, but when I want to apply a general policy across many handlers, a pre- or post-processor accomplishes it quite easily.
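The barcode cleansing case, for example, might be sketched as a pre-processor like this one. IHasBarcode, ScanItem and BarcodePaddingHandler are hypothetical names, and the 12-character length is an assumed value rather than anything from a real system.

```csharp
using System;

public interface IPreRequestHandler<in TRequest>
{
    void Handle(TRequest request);
}

// Hypothetical contract for any message that carries a scanned barcode.
public interface IHasBarcode
{
    string Barcode { get; set; }
}

// Hypothetical message used for the example.
public class ScanItem : IHasBarcode
{
    public string Barcode { get; set; } = "";
}

// Runs before validation and the real handler, for every message implementing IHasBarcode.
public class BarcodePaddingHandler : IPreRequestHandler<IHasBarcode>
{
    private const int BarcodeLength = 12; // assumed length, for illustration only

    public void Handle(IHasBarcode request)
    {
        // Scanners may drop leading zeroes, so trim whitespace and pad back to full length.
        request.Barcode = request.Barcode.Trim().PadLeft(BarcodeLength, '0');
    }
}
```

Because IPreRequestHandler is contravariant, the same handler can be supplied to the pipeline for ScanItem and for any other message implementing IHasBarcode, so the policy lives in one place.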

Whether you have specific or generic needs, a mediator pipeline can be a great place to apply domain-centric behaviors to all requests, or only to matching requests based on generic rules, across your entire application.

About Jimmy Bogard

I'm a technical architect with Headspring in Austin, TX. I focus on DDD, distributed systems, and any other acronym-centric design/architecture/methodology. I created AutoMapper and am a co-author of the ASP.NET MVC in Action books.
  • rdev

    I’m using services.AddMediatR(…) from “MediatR.Extensions.Microsoft.DependencyInjection” to register MediatR but receive an error “The type ‘MyType.MediatorPipeline`2[TRequest,TResponse]‘ is not assignable to service ‘MediatR.IRequestHandler`2′” when adding the MediatorPipeline class above.

    • jbogard

      Github issue if you could please :)

      • rdev

        Done.

  • Peter Parker

    Can you do a post showing the unit tests for the complete mediator?

    • Dejan Miličić

      +1

      • Michael Lennon

        ++1 :) That would be most welcome

  • Dejan Miličić

    I think _securityHandler.Evaluate(message) should be _authorizer.Evaluate(message) in code examples.
    Is authorizer firing exceptions in the same manner failed validation is firing exceptions?

  • Andreas Kroll

    Hi Jimmy,

    great Post. This was exactly what I was searching to add to the awesome programming model of using MediatR. Perfect timing!

    I want to move into angular2 programming, and I was wondering if you ever thought about bridging client/server gap with MediatR?
    Wouldn’t it be a great thing if you could issue a Request on the client and react to it on the server?
    I thought about using SignalR 2 with a sort of protocol to establish safe delivery of requests and responses.

    • Betty

      It fits quite well with a message based framework like Redux. SignalR or plain WebAPI doesn’t make much difference except if you need to push messages from server to client.

      • Andreas Kroll

        Hi Betty,

        thanks for the reply. I also looked at Redux, because it is an interesting concept having the state immutable and only modify it using reducer functions.

  • Gideon Korir

    I’ve been meaning to do this but sadly the default aspnet container doesn’t support this I’ll have to look for another container because Windsor isn’t supported on .NET core yet :( . How do you deal with the scenario where your handler does two distinct things and you would like to measure metrics separately? e.g. I have a command that includes File to upload + data to save to db; I want to measure how long the save takes vs how long the db transaction takes

    • Andreas Kroll

      Shouldn’t you have two messages according to “separation of concern” anyway?

      You could have a message for your functional unit of work that itself sends two messages (one for File upload, one for data to save) and include both results into its own?

      • Gideon Korir

        True I should and probably I should have 3 metrics in that case:

        1. The total time it took to do the operation
        2. Part of the time in (1) it took for the upload
        3. Part of the time in (2) it took for the tx

        The design would still hold, I’m happy now :)

  • plasticsyntax

    Hi Jimmy –

    Good post. Thanks for MediatR, I’m really enjoying working with it!

    Have you had to deal with a situation where you have to share a partial or complete set of authorization data with the client?

    In my case, my client app needs to show/hide GUI elements depending on permissions the user has – CanCreateUsers, CanDeleteUsers, etc. To keep my code DRY, I have a SecurityContext class that I can share with the GUI that is also used for authorization in my pipeline. Since rule-per-request is also useful, I also implement custom request specific rules but obviously don’t share with client as they are Request data specific. The mix I’ve ended up with is manageable enough, but still curious if you’ve encountered requirements such as mine.

  • I really like the approach I ended up with when designing MicroBus (it’s a Mediator as well but uses Send, Publish and Query). I also decided to bake in the concept of a pipeline which really simplified the registration of global handlers and chaining them all together. I’d be interested to know what you think of this approach?

    If you’re interested you can check it out on github https://github.com/Lavinski/Enexure.MicroBus

    • jbogard

      I will! Incorporating a pipeline directly is something that quite a few people have asked for, definitely something I’m looking at for 3.0.

  • Rowan Beckworth

    Cool stuff. This is just the decorator pattern in action right? Just used this to add cross cutting logging to my command handlers using ShortBus and SimpleInjector. It’s kind of mind blowing the first time you realise how powerful this approach is. Never looking back from command/queries in my projects now!

    • jbogard

      Yep!

  • jerome

    Hey Jimmy,

    I was using version 1.0 and I had one handler implementing IPostRequestHandler.
    when I tried to take 2.0 I don’t see IPostRequestHandler anymore.

    why did you remove it and how do I replace it now?

    thanks!

    • jbogard

      You can create the pipeline yourself is the new way.

      • jerome

        ok. any reason for removing it?

        • jbogard

          Yep, right now I saw better capabilities in the DI containers.

          But honestly, with MediatR 3, I’m looking at having a pipeline supported by MediatR. Those original interfaces were too limiting.

          • jerome

            cool. thanks!

  • I won’t really call it a pipe_line_ because of just one class handling so many different responsibilities. I believe multiple handlers, granulated per their internal function like LoggingHandler, ValidatingHandler, AuthorizationHandler, which form a real pipe, would be a better model of pipes and filters. Then you can use functional composition to assemble the pipe.

    • jbogard

      I’m on it :)