Branching messages with pipelines

If you’ve read my recent posts on Messaging as a programming model you might gather that I’m a fan of this approach. If nothing else, it’s a great way of breaking a big use case down into a small set of fine-grained, easily maintainable and replaceable steps that make the code very easy to reason about. However, one apparent drawback at first glance is that there doesn’t seem to be an easy way to branch the message as it moves through the sequence of filters. You may recall that, using the essence of the Message Router pattern, we can change which filters are executed by modifying the pipeline whilst it is executing, registering new steps based on a particular value carried on the message, like so:


public class Filter3
{
    public Filter3(SomeMessage input, PipeLine<SomeMessage> pipeline)
    {
        Console.WriteLine("filter3");
   
        if(input.SomeCondition)
            pipeline.Register(msg => new Filter4(msg));
        else
            pipeline.Register(msg => new Filter5(msg));
    }
}

However, all this does is attach either Filter4 or Filter5 to the end of the existing list of registered filters. You can obviously register as many filters as you like under each branch of the conditional, but even so they are still added at the end, with no way to then continue executing back on the original main branch. A diagram might be best here to help explain what I mean:

Alternative Filter Paths

The old

The picture on the left depicts the structure of the filters as the original approach allows. The picture on the right shows a more useful alternative that we’d like as a minimum, and ideally without having to pass the pipeline into the filter to change it. The question is how can we achieve this?

The answer lies in the IFilter interface as described in the Revisited post of the original series. By adopting this simple interface for all our filters we can implement a variation of the composite design pattern: filters that contain logic (the leaves) and filters that themselves contain pipelines of filters (the composites), with the addition of a new kind of filter whose sole purpose is to test a condition and determine what to execute next. I’ll use a modified (and contrived) version of the original LogIn example to illustrate the technique. The original example looked like this:

Simple pipeline for logging in a user

It’s a straightforward linear sequence of steps that modify and enrich the message as it passes through. What if though, for argument’s sake, we want to execute a bunch of different steps depending on the result of the conditional test in the IsUserLoginAllowed filter?

The new

Branching pipeline for logging in a user

Here, with the addition of a few more filters, the message will always pass through the first three steps, but depending on the logic within the IsUserLoginAllowed filter it will then pass either through the two filters on the left branch or through the three on the right branch before joining back up with the main branch to record the fact that a log-in was attempted. This final filter will execute regardless of whether the message went left or right. This is a lot more useful and easily achievable through simple composition thanks to the use of our IFilter interface, so let’s look at the code, starting with the interface that all our filters will implement:


public interface IFilter<T>
{
    void Execute(T msg);
}

and the PipeLine class, which also implements the interface:


public class PipeLine<T> : IFilter<T>
{
   public PipeLine<T> Register(IFilter<T> filter)
   {
      _filters.Add(filter);
      return this;
   }	

   public void Execute(T input)
   {
       _filters.ForEach(f => f.Execute(input));
   }

   List<IFilter<T>> _filters = new List<IFilter<T>>();	
}

All straightforward and recognisable, as are the first two filters shown here:


public class CheckUserSuppliedCredentials : IFilter<LogInMessage>
{
   public void Execute(LogInMessage input)
   {
        Console.WriteLine("CheckUserSuppliedCredentials");
        if(string.IsNullOrEmpty(input.Username) || string.IsNullOrEmpty(input.Password))
        {
            input.Stop = true;
            input.Errors.Add("Invalid credentials");
        }
   }
}

public class CheckApiKeyIsEnabledForClient : IFilter<LogInMessage>
{
    public CheckApiKeyIsEnabledForClient(IClientDetailsDao dao)
    {
        _dao = dao;
    }

    public void Execute(LogInMessage input)
    {
        Console.WriteLine("CheckApiKeyIsEnabledForClient");
        var details = _dao.GetDetailsFrom(input.ApiKey);

        if(details == null)
        {
            input.Stop = true;
            input.Errors.Add("Client not found");
        }

        input.ClientDetails = details;
    }
}
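
Incidentally, the filters in this post flag failure by setting Stop and Errors on the message rather than throwing exceptions, so the LogInMessage assumed here differs slightly from the one shown in the original series. A rough sketch of the shape these filters imply would be:

public class LogInMessage
{
    public LogInMessage()
    {
        Errors = new List<string>();
    }

    public string Username { get; set; }
    public string Password { get; set; }
    public Guid ApiKey { get; set; }
    public ClientDetail ClientDetails { get; set; }

    // flow control used by the filters in this post
    public bool Stop { get; set; }
    public List<string> Errors { get; private set; }
}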

Branch Filters

The next filter, IsUserLoginAllowed, is more interesting and has changed from previous implementations. I refer to it as a Branch filter because, well, it allows your message to branch!


public class IsUserLoginAllowed : IFilter<LogInMessage>
{
	public IsUserLoginAllowed(IFilter<LogInMessage> leftBranch, IFilter<LogInMessage> rightBranch)
	{
		_onDeny = leftBranch;
		_onAllow = rightBranch;
	}

	// only users of ClientX and ClientY can log in
	public void Execute(LogInMessage input)
	{
		Console.WriteLine("IsUserLoginAllowed");
		switch(input.ClientDetails.Name)
		{
			case "ClientX":
			case "ClientY":
				_onAllow.Execute(input);
				break;
			
			default:
				_onDeny.Execute(input);
				break;
		}
	}
	
	private IFilter<LogInMessage> _onAllow;
	private IFilter<LogInMessage> _onDeny;
}

Look at the constructor of this filter and notice that it takes two IFilter instances, one for each branch path. These can be normal, individual (leaf) filters but it gets more interesting and useful when they are filters that group other filters together, in other words, composite filters. A Branch filter’s only responsibility is to determine what to do next. If we look at how this is all wired up we’ll see that our Branch filter, IsUserLoginAllowed, takes two composite filters, OnDenyLogIn and OnAllowLogIn:


    var pipeline = new PipeLine<LogInMessage>();
    pipeline.Register(new CheckUserSuppliedCredentials());
    pipeline.Register(new CheckApiKeyIsEnabledForClient(new ClientDetailsDao()));
    pipeline.Register(new IsUserLoginAllowed(new OnDenyLogIn(), new OnAllowLogIn()));
    pipeline.Register(new RecordLogInAttempt());
		
    var msg = new LogInMessage { /* ... set values on the message */ };
    pipeline.Execute(msg);

Composite Filters

Composite filters contain their own pipelines, each registering their own related filters, and because all filters, whether leaf or composite, implement the IFilter interface they can all be treated the same via the Execute method:

composed pipeline for logging in a user

The code is simple, consisting of nothing more than registering filters in a pipeline.

OnDenyLogIn:


public class OnDenyLogIn : IFilter<LogInMessage>
{
    public OnDenyLogIn()
    {
        _pipeline.Register(new LockAccountOnThirdAttempt());
        _pipeline.Register(new PublishLogInFailed());
    }

    public void Execute(LogInMessage input)
    {
         Console.WriteLine("OnDenyLogIn");
         _pipeline.Execute(input);
    }

    private PipeLine<LogInMessage> _pipeline = new PipeLine<LogInMessage>();
}

and OnAllowLogIn:


public class OnAllowLogIn : IFilter<LogInMessage>
{
    public OnAllowLogIn()
    {
        _pipeline.Register(new ValidateAgainstMembershipApi());
        _pipeline.Register(new GetUserData());	
        _pipeline.Register(new PublishLogInSucceeded());
    }
	
    public void Execute(LogInMessage input)
    {
        Console.WriteLine("OnAllowLogIn");
        _pipeline.Execute(input);
    }
	
    private PipeLine<LogInMessage> _pipeline = new PipeLine<LogInMessage>();
}

The fact that composites are self-contained means that their own internal pipelines don’t even need to be for the same message type! Even though both of the example composite filters take a LogInMessage via their Execute method, there’s nothing to stop them sending a completely different message type through the internal pipeline and then assigning the result, whatever that may be, to some property on the LogInMessage before it continues on its way through the rest of the sequence. None of the other filters ever needs to know that a different message type was created and processed within a composite filter.
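
To make that concrete, here’s a rough sketch of such a composite filter. The AccountStatusMessage, the two internal filters and the AccountStatus property are all hypothetical names invented purely for illustration:

public class EnrichWithAccountStatus : IFilter<LogInMessage>
{
    public EnrichWithAccountStatus()
    {
        // hypothetical filters that only understand AccountStatusMessage
        _pipeline.Register(new LoadAccountRecord());
        _pipeline.Register(new CalculateAccountStatus());
    }

    public void Execute(LogInMessage input)
    {
        // the internal pipeline works on its own message type...
        var statusMsg = new AccountStatusMessage { Username = input.Username };
        _pipeline.Execute(statusMsg);

        // ...and only the result is copied back onto the outer message
        input.AccountStatus = statusMsg.Status;
    }

    private PipeLine<AccountStatusMessage> _pipeline = new PipeLine<AccountStatusMessage>();
}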

Benefits

Composing filters in this way has some important benefits. First, it makes the main pipeline easier to read: there are fewer steps at the top level and you can see immediately that there are two courses of action that can be taken when the IsUserLoginAllowed branch filter is executed, without having to know the lower-level details. Second, grouping related functionality into cohesive whole units like this not only adheres more closely to the Single Responsibility Principle but also allows some quite complex pipelines to be composed, as the following diagram demonstrates:

more complex pipeline

Unit testing is still easy whether we test individual filters or the whole pipeline, but now we have the added option of testing at a third level of granularity via a composite filter such as OnDenyLogIn or OnAllowLogIn. When we’re happy these work as expected we can plug them in and test them in the context of the whole pipeline. Additionally, we’re also still able to use aspects to wrap the entire execution path inside a transaction, for example:


var pipeline = new PipeLine<LogInMessage>();
pipeline.Register(new CheckUserSuppliedCredentials());
pipeline.Register(new CheckApiKeyIsEnabledForClient(new ClientDetailsDao()));
pipeline.Register(new IsUserLoginAllowed(new OnDenyLogIn(), new OnAllowLogIn()));
pipeline.Register(new RecordLogInAttempt());
		
var msg = new LogInMessage { /* ... set values on the message */ };
var tranAspect = new TransactionAspect<LogInMessage>(pipeline);
tranAspect.Execute(msg);
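
The TransactionAspect itself isn’t shown in this post. As a minimal sketch, assuming we want the start tran/end tran behaviour shown below and lean on System.Transactions, it could simply wrap any IFilter<T>:

public class TransactionAspect<T> : IFilter<T>
{
    public TransactionAspect(IFilter<T> inner)
    {
        _inner = inner;
    }

    public void Execute(T input)
    {
        Console.WriteLine("start tran");
        using (var scope = new TransactionScope())
        {
            // run the wrapped filter (here, the whole pipeline) inside one transaction
            _inner.Execute(input);
            scope.Complete();
        }
        Console.WriteLine("end tran");
    }

    private readonly IFilter<T> _inner;
}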

Running this would send the message through either branch (depending on the client):


start tran
CheckUserSuppliedCredentials
CheckApiKeyIsEnabledForClient
IsUserLoginAllowed
OnAllowLogIn
ValidateAgainstMembershipApi
GetUserData
PublishLogInSucceeded
RecordLogInAttempt
end tran

or alternatively:


start tran
CheckUserSuppliedCredentials
CheckApiKeyIsEnabledForClient
IsUserLoginAllowed
OnDenyLogIn
LockAccountOnThirdAttempt
PublishLogInFailed
RecordLogInAttempt
end tran

Summary

The adoption of the IFilter interface and the idea of building separate pipelines inside composite filters opens up more possibilities for messaging as a programming model, most notably by giving us a way to model more complex business processes via branching paths whilst still retaining all the benefits outlined in previous messaging posts. It’s another useful addition to the pipe and filter toolbox, adding variation, and gives more weight to the argument for using messaging techniques in our applications.


Messaging as a programming model Part 1

Synopsis

This post – the first of two parts – is rather long, probably the longest one I’ve yet written. In short, it emphasises the benefits in terms of application architecture gained from adopting explicit messaging in normal, in-process, non-distributed applications, and how it has enabled me to develop some interesting and cool solutions using techniques that have resulted in some of the cleanest, most extendable, testable, modular code I’ve ever written.

Background

I’ve been meaning to describe how I approach software development these days for a good while now but haven’t really known where to begin. I want to document, for anyone interested, the techniques I use to try and achieve good, clean, modular code. It’s an approach that has reaped many rewards and I’ve yet to find a way that I like better. That’s not to say that what I’m going to describe is somehow unique or the best way; it’s just an approach that works really well in many situations and has enabled me to build solid, reliable systems. You may agree or disagree, like it or not, and that’s fine; feel free to point out the flaws or tell me I’m an idiot and that I should do “xyz” instead. I’m always on the lookout to improve, so if you have any suggestions I’ll gladly take them on board.

OOP vs Procedural

We’re all used to the idea of the Object Oriented Programming paradigm but despite its ubiquity I’ve never felt it was really as good as it was promised to be, mainly because it’s too easy to abuse. In OOP there is the idea that we don’t call a function on an object, we “send it a message”, but that can sound a little abstract. What does it mean to send a message as opposed to calling a function as you would in a procedural system? My own interpretation is that in a procedural system you call a function and pass in the structure to be acted upon, whereas with OOP the object owns the state and you just ask the object to do something with it on your behalf by sending it a message. This conforms to the “tell, don’t ask” principle. However, most OO applications are not written this way and the idea of “sending a message” is not really given due thought by most developers.

The Failure of OOP?

Even though object-oriented languages are dominant today, most applications consist of objects that are nothing more than a bunch of properties which are then passed to some other class that has the behaviour that will act upon them. This feels very similar to the idea of functions in procedural programming. There’s nothing really wrong in doing this as long as you recognise what you are doing and you understand the trade-offs. The usual problem with this approach is that the structure of the application is often not easy to see or reason about. It’s sometimes hard to identify responsibilities within the system or find out where you need to make changes, and when you do, you often discover large methods with many parameters. In short, it’s messy. When the time comes to “enhance” the application we often find ourselves adding more parameters to our methods and having to repeat this all the way down through the various layers of our application till we hit the data layer. This makes for tedious, repetitive, and error-prone development, but it is the reality for many enterprise developers every day.

This kind of programming model is not what OOP was invented for, and it makes it look like the paradigm has failed. The evidence is everywhere. How many times do we come across monolithic applications with tons of dependencies, many layers, and poorly defined classes with too many responsibilities and poor cohesion? It seems to be the norm no matter how much information is “out there” telling us otherwise. We tend to look at code bases like this, realise intuitively that something is wrong, but still struggle to find a way to put it right that is clear, consistent, and maintainable for our future selves or those who will follow after us.

There is a way though that can give us the benefits we seek.

Messaging as a Programming Model

The “messaging as programming model” approach is the only one I’ve yet encountered where I can achieve clear, consistent code, time and time again with minimal effort. Not only clear and consistent but also modular, composable, easily testable, highly maintainable and extendable, and above all readable. Each invocation conforms to “tell, don’t ask” no matter whether performing a command or query. And yet it too, just like the messy procedural approach described earlier, takes a structure and asks an object to act upon it. The difference is in the uniformity and consistency of the approach, one that allows for some interesting possibilities.

Obviously, it’s not the be-all and end-all of writing good software but I believe it to be better than what I encounter routinely in application after application. I’m not saying OOP is “out” and messaging is “in”. The two are not mutually exclusive and clearly, as a C# developer, I use an object-oriented language. All I’m doing is embracing the idea of using messages explicitly in the context of normal OO applications. It’s certainly not typical but neither is it complex. All the normal OO techniques and rules can still apply if required; it’s just the “where” and “how” they are applied that’s different.

Anyway, enough with the rambling. Let’s get going.

Pipes, Filters, Routers, WTF?

I’ll start with a simple (non-uml) diagram:

Simple pipeline for logging in a user

To cut to the chase, the main technique I’m using is a variation of the Pipe and Filter pattern as described in the Enterprise Integration Patterns book, a classic you should read if you’re at all interested in message patterns, which as you’re reading this I’ll assume you are.

Each filter in that pattern is loosely connected through an in queue and an out queue. The filters are not aware of each other; they just know how to read a message from the inbound queue, do something with it, and put the result on the outbound queue. In my variation of the pattern there are no queues, merely a list of filters executed one after the other that manipulate the message as it passes through, just as the diagram implies. Within a filter your typical object model can apply if necessary, but in general this pipeline tends to turn the conventional idea of an object operating on its own state on its head. Here the message is the state and, unlike in most functional languages, it is also mutable. Despite this, the whole pattern lends itself well to concurrency, as we’ll see later.

What is a Message?

So the pipeline is the mechanism for processing messages but what qualifies as a message? The answer is simply anything in your application that should be made an explicit concept. We’ve seen this idea of explicitness before but usually in the context of DDD where it’s recommended that you group together a bunch of related fields/parameters and turn them into a single class known as a Value object. This then gives you a place to put related methods that operate on those properties.

Messages are also explicit concepts. They too contain a bunch of properties grouped together but they are just DTOs and have no methods on them at all. They are buckets of data but represent something important in your application. One of the benefits of making things explicit like this is that it increases your ability to reason about the code when you come back to it some months later. For instance, the diagram shows a pipeline that allows a user to log into an application. It’s easy to see the steps involved in logging in and gives you a big picture view without needing to look at the details. The actual code for this conveys the exact same information and is as easy to read as the diagram itself so let’s take a look at it.

A Log in Example

Here’s a simple message we can use when a request to log in arrives:


public class LogInMessage
{
    public string Pin { get; set; }
    public string Username { get; set; }
    public string Password { get; set; }
    public bool IsAllowed { get; set; }
    public Guid ApiKey { get; set; }
    public ClientDetail ClientDetails { get; set; }
    public UserDetail UserDetails { get; set; }
}

Let’s assume you have a user that’s invoking a login method on a web service. In order to log that user in there are a number of steps to be performed. Whilst this alone isn’t a hard problem to solve or on its own something likely to make your code hard to understand, it’s a good simple example to begin with. Remember my goal is to have clear, consistent code across ALL operations to provide a degree of uniformity and to keep the “surface area” of my immediate context small so I don’t get distracted looking at lots of surrounding code each perhaps doing things slightly differently and adding to the noise I have to deal with.

We register the steps we want to execute sequentially with the PipeLine through its Register method:


var loginPipeline = new PipeLine<LogInMessage>();

loginPipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
             .Register(msg => new CheckApiKeyIsEnabledForClient(msg))
             .Register(msg => new IsUserLoginAllowed(msg))
             .Register(msg => new ValidateAgainstMembershipApi(msg))
             .Register(msg => new GetUserDetails(msg));

As you can see, we have a class PipeLine which works with a message of T (we’ll get to the implementation of this in Part 2). The pipeline lets you register lambdas (anonymous functions) that will be called in the order of registration, each time passing in the message you supply. Every time a callback is invoked, an instance of the current filter is created and the message is manipulated in some way before being passed to the next filter.
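
The implementation is left for Part 2, but as a rough sketch of the shape being assumed here (error handling and other niceties omitted), a pipeline along these lines would satisfy the registration and execution code in this post:

public class PipeLine<T>
{
    // each registered callback typically just news up a filter, passing the message in
    public PipeLine<T> Register(Action<T> filter)
    {
        _filters.Add(filter);
        return this;
    }

    public void Execute(T input)
    {
        // invoke each callback in registration order with the same message instance
        _filters.ForEach(f => f(input));
    }

    private readonly List<Action<T>> _filters = new List<Action<T>>();
}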

Invoking the pipeline is as simple as creating a message and executing it:


var msg = new LogInMessage {
    Pin = "1234",
    Username = "Jack A. Nory",
    Password = "@x1z/+==32a",
    ApiKey = Guid.Parse("a9aafe3f-76d3-4304-a39e-cf05edb870f2")
};

loginPipeline.Execute(msg);

It’s easy enough to imagine the path taken when calling Execute. You only have to look at the registration to get that big-picture view. The details are irrelevant unless you need to make changes. Each filter is stateless and short-lived. By the time all the filters have run, the message will have been enriched or changed in some way leaving you with data you can return to the caller.

Inside Filters

Let’s look at the first filter, CheckUserSuppliedCredentials which is really as simple as this:


public class CheckUserSuppliedCredentials
{
    public CheckUserSuppliedCredentials(LogInMessage input)
    {
        Process(input);
    }

    private void Process(LogInMessage input)
    {
        if(string.IsNullOrEmpty(input.Username))
            throw new UnauthorizedException("Invalid credentials");

        if(string.IsNullOrEmpty(input.Password))
            throw new UnauthorizedException("Invalid credentials");
    }
}

The filter has one responsibility, in this case to ensure that the message contains a username and password, and that’s it, nothing more. If either condition isn’t satisfied an exception is thrown. The next filter is a little more complex, but only just. Here we are assuming there is a table in a database somewhere containing clients whose access can be turned on or off via an ApiKey supplied (in our case) in the request header, which (not shown) was extracted and assigned to the message. This filter has a dependency on a data access object in order to retrieve the client’s details, which for the sake of simplicity we just new up inline:


public class CheckApiKeyIsEnabledForClient
{
    public CheckApiKeyIsEnabledForClient(LogInMessage input)
    {
        Process(input, new ClientDetailsDao());
    }

    private void Process(LogInMessage input, IClientDetailsDao dao)
    {
        var details = dao.GetDetailsFrom(input.ApiKey);

        if(details == null)
            throw new UnauthorizedException("Client not found");

        input.ClientDetails = details;
    }
}

Again, if the condition isn’t satisfied an exception is thrown and processing of the pipeline stops. If the condition is satisfied the message is enriched by having the ClientDetails property assigned the result of that call. This data is now available for use in subsequent filters.

Flexibility

I’m not going to show every filter implementation here. Suffice it to say they all follow the same basic pattern, each having a single responsibility and yet together forming a conceptual whole that is itself a single responsibility, in this case that of logging in. Before moving on though, it’s worth mentioning the benefits already gained from this approach. Not only is it easy to reason about what this code will do just by looking at the registration steps, you can also change the order of the steps (within reason) if necessary just by swapping the order in which registration occurs. It’s also incredibly easy to modify when the boss comes along and says, actually, wouldn’t it be great if we could easily determine who is using our system by capturing how often they log in?

We could do that without any real disruption and with a great deal of confidence that we won’t break any existing code. All we need do is create a new filter:


public class PublishUserLoggedInEvent
{
    public PublishUserLoggedInEvent(LogInMessage input)
    {
        Process(input, GetBus());
    }

    private void Process(LogInMessage input, IBus bus)
    {
        var @loggedInEvent = new UserLoggedInEvent { 
            Name = input.UserDetails.Name,
            Client = input.ClientDetails.Organisation, 
            When = DateTime.Now
        };

        bus.Publish(@loggedInEvent);
    }
}

and register it:


var loginPipeline = new PipeLine<LogInMessage>();

loginPipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
             .Register(msg => new CheckApiKeyIsEnabledForClient(msg))
             .Register(msg => new IsUserLoginAllowed(msg))
             .Register(msg => new ValidateAgainstMembershipApi(msg))
             .Register(msg => new GetUserDetails(msg))

             // add new step to pipeline
             .Register(msg => new PublishUserLoggedInEvent(msg));

Now when our message reaches the new filter we must have passed through all the previous ones without incident. We can therefore assume the user is now successfully logged in and we can tell the world about it, in this case by publishing an event over a bus, whether that be NServiceBus, RabbitMQ or A.N.Other bus. Any subscribers will receive that message and do something with the data. For a simpler alternative you could just go straight to the database and record in a table that the user logged in. The beauty is that decisions taken now can be swapped out easily at some point in the future without worrying about breaking existing code.

Design-time vs Runtime PipeLines

So, we’ve covered modular, and to some degree we can see how a responsibility is composed of smaller, more focused responsibilities, but we are making these decisions at design time. If we want to be a bit more dynamic we can easily build the pipeline at runtime. For instance, one way to do it is to add filters to a pipeline only if a condition is met. This could be done in some kind of fluent Builder or Factory object:


var loginPipeline = new PipeLine<LogInMessage>();

if(someConditionA)
    loginPipeline.Register(msg => new FilterForConditionA(msg));

if(someConditionB)
    loginPipeline.Register(msg => new FilterForConditionB(msg));

if(someConditionC)
    loginPipeline.Register(msg => new FilterForConditionC(msg));

// ...etc

return loginPipeline;

Message Routers

Another way is to have one of the filters itself extend the current pipeline if a particular condition is met:


var pipeline = new PipeLine<SomeMessage>();

pipeline.Register(msg => new Filter1(msg))
        .Register(msg => new Filter2(msg))
        .Register(msg => new Filter3(msg, pipeline));

What’s interesting about this example is that the pipeline itself is passed into Filter3. When that filter is executed it can extend the pipeline based on some condition being met:


public class Filter3
{
    public Filter3(SomeMessage input, PipeLine<SomeMessage> pipeline)
    {
        Console.WriteLine("filter3");
   
        if(input.SomeCondition)
            pipeline.Register(msg => new Filter4(msg));
        else
            pipeline.Register(msg => new Filter5(msg));
    }
}

Now, if each filter were to write its name out to the console then, depending on the value of SomeCondition when the message was executed, you would get one of the following two outputs:

filter1
filter2
filter3
filter4

or

filter1
filter2
filter3
filter5

So now not only do we have design-time composability but also the possibility of run-time composability. In effect Filter3 is a Message Router, another pattern from the Enterprise Integration Patterns book. Its job is to decide where the message should go next, and this is what Filter3 is doing based on a value on the message itself (a Content-Based Router). Again, very nice, very flexible.

Testability

Hopefully at this point you can see that following this approach gives us a lot of flexibility and yet at the same time is very easy to read and maintain. Not only that, but you might also see that writing unit tests for this kind of code becomes nothing short of trivial. The example below shows the CheckUserSuppliedCredentials filter from the earlier example being tested for an empty Username on the message. Each filter can be tested like this, in complete isolation, with message state set up in various ways to exercise the paths through the filter. Alternatively, you can easily test an entire pipeline, which aligns nicely with the concept of testing use cases. Just create a message, execute it, then assert against the changed state of the message itself. I’ve found this to be a real boon. Compared to your typical monolithic enterprise application, which seems to get harder to test as the application grows, testing pipelines provides natural boundaries for your test fixtures and makes reading those tests easy as well:


[TestFixture]
public class CheckUserSuppliedCredentialsTestFixture
{
    [Test]
    public void ShouldThrowExceptionIfUsernameNotSupplied()
    {
        var msg = new LogInMessage { Username = string.Empty };
        
        var exception = Assert.Throws<UnauthorizedException>(() => new CheckUserSuppliedCredentials(msg));

        Assert.That(exception.Message, Is.EqualTo("Invalid credentials"));
    }
}

In my earlier examples I supplied any dependencies for filters such as the IClientDetailsDao inline by just newing up an implementation. Obviously this was for brevity but in practice you won’t want to do this if you’re writing tests or want the flexibility to “inject” different implementations. To get around this all you need do is put the interface on the filter’s constructor and supply the implementation in the pipeline registration instead like this:


var loginPipeline = new PipeLine<LogInMessage>();

loginPipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
             .Register(msg => new CheckApiKeyIsEnabledForClient(msg, new ClientDetailsDao()))
             .Register(msg => new IsUserLoginAllowed(msg))
             .Register(msg => new ValidateAgainstMembershipApi(msg, new MembershipService()))
             .Register(msg => new GetUserDetails(msg, new UserDetailsDao()))
             .Register(msg => new PublishUserLoggedInEvent(msg, GetBus()));

Now it’s trivial to supply “fake” versions when constructing the pipeline in a test fixture.
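
For example, a whole-pipeline test might look something like this, where FakeClientDetailsDao is a hypothetical hand-rolled fake implementing IClientDetailsDao and returning canned ClientDetail data:

[TestFixture]
public class LogInPipelineTestFixture
{
    [Test]
    public void ShouldEnrichMessageWithClientDetailsForKnownApiKey()
    {
        var pipeline = new PipeLine<LogInMessage>();
        pipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
                .Register(msg => new CheckApiKeyIsEnabledForClient(msg, new FakeClientDetailsDao()));

        var msg = new LogInMessage {
            Username = "jack",
            Password = "secret",
            ApiKey = Guid.NewGuid()
        };

        pipeline.Execute(msg);

        // assert against the state of the message after the pipeline has run
        Assert.That(msg.ClientDetails, Is.Not.Null);
    }
}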

A Web Api Example

Before moving on, I thought I would round out the login example by showing a more complete view of it within the context of a Web API ApiController.

To keep things cohesive I tend to have Service classes that group related responsibilities, which are then passed into the controller. So, for example, in the case of a user there might be a number of operations they can perform. Let’s say that in our system a user can log in and they can also be presented with system alerts that give them timely information such as when any maintenance is going to happen, etc. In this example, we have a UserController that takes a UserService via an interface:

public class UserController : ApiController
{
    private readonly IUserService _userService;

    public UserController(IUserService userService)
    {
        _userService = userService;
    }

    [HttpPost]
    [ActionName("login")]
    [RequestExceptionFilter]
    public HttpResponseMessage LogIn(User user)
    {
        var msg = new LogInMessage { Username = user.Username, Password = user.Password };
        _userService.Invoke(msg);
        return Request.CreateResponse(HttpStatusCode.OK, msg.UserDetails);
    }

    [HttpGet]
    [ActionName("alerts")]
    [RequestExceptionFilter]
    public HttpResponseMessage GetAlertsFor(string userId)
    {
        var msg = new AlertsMessage { UserId = userId };
        _userService.Invoke(msg);
        return Request.CreateResponse(HttpStatusCode.OK, msg.Alerts);
    }
}

This is as complex as my controllers get, though there are a couple of points I want to emphasise. For instance, the message created at the start of the LogIn action is the same object that is used as part of the response at the end of it. The message is modified as it passes through the various filters and therefore, unless an exception gets thrown, will contain new data that we can return to the caller, in the case of the LogInMessage some information about the user contained in the UserDetails property.

The only other thing to mention is that if an exception is thrown the RequestExceptionFilterAttribute kicks in and returns the appropriate response to the caller such as a 403 Forbidden.
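
The attribute itself isn’t shown in the post; a hedged sketch of what it could look like in ASP.NET Web API, mapping the UnauthorizedException thrown by the filters onto a status code, might be:

public class RequestExceptionFilterAttribute : ExceptionFilterAttribute
{
    public override void OnException(HttpActionExecutedContext context)
    {
        if (context.Exception is UnauthorizedException)
        {
            // translate domain exceptions thrown inside the pipeline into HTTP responses
            context.Response = context.Request.CreateResponse(
                HttpStatusCode.Forbidden, context.Exception.Message);
        }
    }
}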

Aesthetic Interfaces

Each controller takes an interface describing the messages that can be executed. I don’t know about you but I think there’s something nice about the following interface definition:

public interface IUserService
{
    void Invoke(LogInMessage msg);
    void Invoke(AlertsMessage msg);
}

For one, it has only void methods, and for another, the methods all have the same name, differing only by message type. It too conforms to the simple and readable philosophy that the whole approach seems to excel at.

The implementation of the interface is where the pipelines are created. In this example they are built in the constructor taking in the supplied dependencies:


public sealed class UserService : IUserService
{
    public UserService(IClientDetailsDao clientDao, IMembershipService membershipService, IUserDetailsDao userDetailsDao, IBus bus)
    {
        _loginPipeline
             .Register(msg => new CheckUserSuppliedCredentials(msg))
             .Register(msg => new CheckApiKeyIsEnabledForClient(msg, clientDao))
             .Register(msg => new IsUserLoginAllowed(msg))
             .Register(msg => new ValidateAgainstMembershipApi(msg, membershipService))
             .Register(msg => new GetUserDetails(msg, userDetailsDao))
             .Register(msg => new PublishUserLoggedInEvent(msg, bus));

        _alertsPipeline
             .Register(msg => new CheckApiKeyIsEnabledForClient(msg, clientDao))
             .Register(msg => new IsGetAlertsAllowed(msg))
             .Register(msg => new GetUserAlerts(msg, userDetailsDao));
    }

    public void Invoke(LogInMessage msg)
    {
        _loginPipeline.Execute(msg);
    }

    public void Invoke(AlertsMessage msg)
    {
        _alertsPipeline.Execute(msg);
    }

    // pipelines are built once in the constructor and reused for every request
    private readonly PipeLine<LogInMessage> _loginPipeline = new PipeLine<LogInMessage>();
    private readonly PipeLine<AlertsMessage> _alertsPipeline = new PipeLine<AlertsMessage>();
}

and that’s it. Imagine that structure applied across many controllers and services and hopefully you can see the benefit of such a consistent and flexible approach. The Invoke methods are the simplest implementation possible for now. We’ll flesh these out in part 2 to enable some useful extra functionality.

I know I haven’t touched on the actual pipeline implementation itself yet; I’m going to leave that for part two, where I’ll go over the opportunities it provides for cross-cutting concerns, again in a flexible, easily maintainable manner. Before finishing Part 1 though, I want to show one other technique I find very useful in the context of desktop or console applications.

Batch Imports

If you’ve never had to write an ETL application you’ve never worked in enterprise development :)

In my current place of work there is a need to take Excel spreadsheets (yes, even in 2013) from clients containing batch orders or parts and import them into the back office system. Unfortunately the contents and structure of these spreadsheets can be quite different depending on the type of import and the particular client involved. These spreadsheets often contain lots of errors and sometimes aren’t even filled in correctly, this being the nature of the enterprise world. Regardless, we have to have a way to deal with this, and so we have developed a WinForms application that can take these spreadsheets, parse and validate them, and, only when all the errors have been corrected, import the data they contain into the database. You’ll not be surprised to hear that the pipeline approach has proven useful here too.


load = new PipeLine<PartUploadMessage>();
load.Register(msg => new PreSpreadsheetLoad(msg))
    .Register(msg => new ImportExcelDataSet(msg))
    .Register(msg => new ValidateSheet(msg))
    .Register(msg => new ValidateColumnNames(msg))
    .Register(msg => new StripInvalidChars(msg))
    .Register(msg => new CopyToDataTable(msg))
    .Register(msg => new CreateExtraColumns(msg))
    .Register(msg => new CheckStockItemCodeColumnFormatting(msg))
    .Register(msg => new CheckForDuplicateStockItemCode(msg))
    .Register(msg => new ValidateUniqueClient(msg))
    .Register(msg => new ValidateRequiredFields(msg))
    .Register(msg => new ValidateColumnDataTypes(msg))
    .Register(msg => new ValidateContentOfFields(msg))
    .Register(msg => new ValidateCategories(msg))
    .Register(msg => new PostSpreadsheetLoad(msg));

validate = new PipeLine<PartUploadMessage>();
validate.Register(msg => new PreValidate(msg))
    .Register(msg => new CheckForDuplicateStockItemCode(msg))
    .Register(msg => new ValidateUniqueClient(msg))
    .Register(msg => new ValidateRequiredFields(msg))
    .Register(msg => new ValidateContentOfFields(msg))
    .Register(msg => new ValidateCategories(msg))
    .Register(msg => new PostValidate(msg));
 
submit = new PipeLine<PartUploadMessage>();
submit.Register(msg => new PreBackOfficeSubmit(msg))
    .Register(msg => new SubmitPartsToBackOffice(msg))
    .Register(msg => new PostBackOfficeSubmit(msg));

Here we have three pipelines. Each one is invoked by a different button on the user interface. When the user browses to and selects a spreadsheet file on their system the load pipeline is executed passing in a message that contains the filename and various other properties. At the end of execution, a spreadsheet has been loaded and validated and the data is bound to a DataGrid on a form. The user can make changes if required and having done so, can ask for the data to be validated again by clicking a button which will execute the validate pipeline. In our case, we have some quite complicated rules about what combinations of data can co-exist in a row but using a validation pipeline simplifies the problem without creating a spaghetti mess. Notice how we are able to reuse the validation filters that are used in the load pipeline. Keeping filters small and focused enables such reuse. Once validation has succeeded, the user can press a button to invoke the submit pipeline which will begin importing the data into the back office system.
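
As a rough sketch of how this might be wired up (the handler names, helper and message field here are hypothetical, and for now assume the pipelines run on a background thread as the Threading section below discusses):

// hypothetical WinForms handlers; each button kicks its pipeline off the UI thread
private void btnLoad_Click(object sender, EventArgs e)
{
    _msg = CreatePartUploadMessage(openFileDialog.FileName);
    Task.Run(() => load.Execute(_msg));
}

private void btnValidate_Click(object sender, EventArgs e)
{
    Task.Run(() => validate.Execute(_msg));
}

private void btnSubmit_Click(object sender, EventArgs e)
{
    Task.Run(() => submit.Execute(_msg));
}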

The Hollywood Principle

Whilst there’s nothing new on the surface here apart from having a pipeline for each “Task”, the really interesting story is how the UI gets notified of important events happening within the pipeline. For instance, how do we disable various buttons while validation or submission is underway? How do we update a progress bar on the screen to give the user feedback as to how far through the submit process they are?

Let’s look at a definition for the PartUploadMessage:


public class PartUploadMessage
{
    public string Filename { get; set; }
    public DataSet Data { get; set; }
    public DataTable Parts { get; set; }

    public Action OnBeforeFileLoad { get; set; }    
    public Action OnAfterFileLoad { get; set; }
    public Action OnBeforeValidation { get; set; }
    public Action OnAfterValidation { get; set; }
    public Action<int> OnStartPartsUpload { get; set; }
    public Action<string> OnPartUploaded { get; set; }
    public Action OnEndPartsUpload { get; set; }
}

You can see that the first three are the usual kind of mutable state properties. Nothing new there. However, the next set of properties are all callbacks that you can assign a method to, which the various filters will invoke at the appropriate point in the pipelines’ execution lifetime.

For example, the PreSpreadsheetLoad filter will invoke the OnBeforeFileLoad callback whilst the PostSpreadsheetLoad filter will invoke the OnAfterFileLoad callback in the same way. Similarly, the PreValidate and PostValidate filters invoke OnBeforeValidation and OnAfterValidation respectively. Finally, the PreBackOfficeSubmit filter informs the UI that we are about to submit parts by invoking the OnStartPartsUpload callback. It tells the UI how many parts we are going to be submitting by passing in the number of rows from the Parts table, which is a property of the message. We could use this to set the Maximum property of a progress bar:


public class PreBackOfficeSubmit
{
    public PreBackOfficeSubmit(PartUploadMessage input)
    {
        input.OnStartPartsUpload(input.Parts.Rows.Count);
    }
}

The SubmitPartsToBackOffice filter loops through the rows inserting them one by one. Each time a row is inserted it invokes the OnPartUploaded callback with the part number of the current row. This callback can also be used to update the progress bar, incrementing the Value property by 1 each time it is invoked. When all parts are submitted the PostBackOfficeSubmit filter invokes the OnEndPartsUpload callback where the progress bar is reset to zero and the submit button is re-enabled.
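
A minimal sketch of what that filter might look like; the back office gateway and the StockItemCode column name are assumptions made for illustration:

public class SubmitPartsToBackOffice
{
    public SubmitPartsToBackOffice(PartUploadMessage input)
    {
        foreach (DataRow row in input.Parts.Rows)
        {
            // push the row into the back office system (hypothetical gateway)...
            _backOffice.Insert(row);

            // ...then tell the UI which part was just uploaded so it can advance the progress bar
            input.OnPartUploaded(row["StockItemCode"].ToString());
        }
    }

    private readonly IBackOfficeGateway _backOffice = new BackOfficeGateway();
}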

Initializing the message is just a case of assigning the various values and callbacks:


var msg = new PartUploadMessage {

    Filename = @"C:\path\to\spreadsheet.xls",
    OnBeforeFileLoad = () => this.RunOnMainThread(WhenAboutToLoadSpreadsheet),
    OnAfterFileLoad = () => this.RunOnMainThread(WhenSpreadsheetFinishedLoading),
    OnBeforeValidation = () => this.RunOnMainThread(WhenAboutToValidate),
    OnAfterValidation = () => this.RunOnMainThread(WhenValidationComplete),
    OnStartPartsUpload = (total) => this.RunOnMainThread(() => WhenAboutToSubmitParts(total)),
    OnPartUploaded = (partNumber) => this.RunOnMainThread(() => WhenCurrentPartSubmitted(partNumber)),
    OnEndPartsUpload = () => this.RunOnMainThread(WhenAllPartsSubmitted)

};

Threading

Anyone who’s done any desktop development will be aware that running a potentially long operation on the UI thread can cause the UI to block and stop being responsive which makes for a poor user experience. Luckily, the pipeline approach easily lends itself to asynchronous execution which I’ll go over in Part 2. For now, let’s assume our pipelines are being executed on a background thread but when the time comes to update the UI we need to be back on the main thread or we’ll run into the dreaded “Cross-thread operation not valid” exception message or such like. The RunOnMainThread method in the example is a simple extension method on the Control class which marshals our data back to the main thread so everything works as expected when we try to update control values. For completeness and for those curious, that method looks like this:

public static void RunOnMainThread<TControl>(this TControl control, Action action, params object[] args) where TControl : Control
{
    if (control.InvokeRequired)
        control.Invoke(action, args);
    else
        action();
}

Now imagine this spreadsheet import process has to be done for 20 different spreadsheets, all with different structures, data, and validation requirements. The Pipe and Filter approach has allowed our application to accommodate these needs without growing too complex. Anyone looking at the source-code can immediately see the structure of the application, and once they understand how one import pipeline works they understand them all.

One last thing before winding up: using callbacks on the message also allows you to get information from the user before allowing the pipeline to continue. Imagine a point of sale system using this technique. The cashier clicks a button that perhaps begins a discount operation. As the pipeline is executing, one of the filters may require input from the cashier, perhaps asking for some information that determines how much of a discount the customer is entitled to. When the filter executes it can invoke a callback in the UI that takes the message itself, perhaps showing a message box asking the cashier a question. The value the cashier enters is then assigned to the passed-in message before dismissing the dialogue and allowing the pipeline to continue executing with its newly acquired information. The same thing can be done if the operation needs admin permissions: stop the pipeline by asking for a password or pin and only allow execution to resume if the input is valid. I have used this technique in the past and it works very well.
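
As a rough sketch of that idea (the message shape and names are hypothetical), a filter can simply invoke a callback that the UI assigned to the message, let it populate the message, and then carry on:

public class ApplyManagerDiscount
{
    public ApplyManagerDiscount(DiscountMessage input)
    {
        // blocks until the UI has shown its prompt and filled in the discount details
        input.OnDiscountInputRequired(input);

        if (input.DiscountPercent > 0)
            input.Total -= input.Total * (input.DiscountPercent / 100m);
    }
}

public class DiscountMessage
{
    public decimal Total { get; set; }
    public decimal DiscountPercent { get; set; }
    public Action<DiscountMessage> OnDiscountInputRequired { get; set; }
}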

Moving Towards Autonomous Components and Services

I truly believe that, written this way, an application is far easier to understand and maintain than the traditional layered approach, which in my opinion starts to break down and become a burden once it grows above a certain size, especially as developers come and go, each doing things slightly differently. This approach provides a kind of template or blueprint that helps keep code consistent and maintainable, with real, tangible benefits. It also lends itself well to the idea of Component Based development, where each set of related pipelines could be built into their own assemblies or modules containing a vertical slice of the traditional layered approach and installed as Windows Services communicating by commands and events. By adopting explicit messages in your in-process code it becomes almost trivial, should the need arise, to serialize and send those messages across the wire or save them to a database. At this point we are very close to autonomous business components in an SOA-based microservices architecture.

Okay, I think that’s enough to digest for now as we’ve covered quite a lot. In the next post I’ll examine the actual pipeline implementation and how easy it is to extend or enhance to open up some other interesting possibilities, things we usually describe as cross-cutting concerns.

In the meantime, if you have any feedback let me know. I’d be interested to hear what you think.
