Messaging as a programming model Part 2

This topic has generated far more interest than I could have imagined beforehand. Thanks to everyone for the feedback, good, bad, and indifferent; it's much appreciated.

This post is part two of two on using messaging techniques within your everyday in-process applications. You can find the first part here.

A Quick Recap

All object oriented applications have an object model of sorts, though in my experience it's usually an accidental model rather than one that was designed. Not many applications require a domain model, but those that do have Domain Driven Design and, more recently, CQRS and Event Sourcing as tools, techniques, and methodologies to help developers discover and build useful models of the business domain in which they work. Plain object models, usually found in CRUD applications, tend to have nothing but the bog standard four tenets of object orientation to fall back on. Whilst in theory this should be enough, these types of applications are typically large and messy from the get go and steadily grow worse over time. It would be nice if there were a few more techniques that developers could reach for to help them produce better object models when working on these projects. Messaging is one such technique.

Messages are explicit concepts in code that can be acted upon in many ways, enabling useful functionality that is ordinarily quite difficult to implement, especially in those areas that have to span the entire application. These are known as cross-cutting concerns and include things such as logging, transaction handling, and retry logic around certain operations. The naive approach sees developers littering their methods with Log("whatever") statements and repeating them all over the code base. Once we've adopted the messaging mindset, though, there is a better way: Aspect Oriented Programming. Because we're using the Pipe and Filter pattern we're in a good position to tackle those concerns, but first I'm going to dig deeper into the PipeLine class mentioned in part one and later flesh out a few simple alternatives.

The PipeLine

At last we come to the implementation of the PipeLine class. I know you've been wondering how you'll cope with the sheer complexity of it; I bet you've even had a few sleepless nights waiting for this moment. Well, here it is in all its glory:


public class PipeLine<T>
{
    private readonly List<Action<T>> _actions = new List<Action<T>>();

    public void Execute(T input)
    {
        _actions.ForEach(ac => ac(input));
    }

    public PipeLine<T> Register(Action<T> action)
    {
        _actions.Add(action);
        return this;
    }
}



Wow, complicated right? er…no, actually. It’s probably as simple as one could hope for.

So, we have a private list of Action of T with T being the type of Message that you declare when creating an instance of the PipeLine.





The Register method simply adds the callback you supply to the internal list and then returns itself through the this reference allowing the pipeline to be built by chaining the Register methods together:



var pipeline = new PipeLine<LogInMessage>();

pipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
        .Register(msg => new CheckApiKeyIsEnabledForClient(msg))
        .Register(msg => new ValidateAgainstMembershipApi(msg))
        .Register(msg => new GetUserDetails(msg));



Each callback of course is now constrained, in this example, to work only with LogInMessage so it stands to reason that with this pipeline instance we can only use filters that take a LogInMessage parameter.

Having built the pipeline we call the Execute method to have the filters invoked one by one. As you can see, we simply iterate over the list with the List&lt;T&gt;.ForEach method, passing the supplied message (input) to each callback:


public void Execute(T input)
{
_actions.ForEach(ac => ac(input));
}



But Wait... There's More

Whilst that's all you need at the most basic level, other questions arise. How can we stop the pipeline from processing without having to throw an exception? What do we do if an exception does occur in one of the filters, and how do we handle it? How can we make our pipeline execute asynchronously, or in the context of a transaction? Because we've implemented the Pipe and Filter pattern, all these questions can be answered easily by implementing some simple aspects that we can wrap around our pipeline, and we don't need any fancy frameworks to do it.

AOP

Wikipedia supplies the following definition for Aspect Oriented Programming:


"Aspect-oriented programming (AOP) is a programming paradigm that
aims to increase modularity by allowing the separation of
cross-cutting concerns"



It's amazing how often we come across code that mixes the responsibility of the object at hand with other concerns such as logging. It makes code harder to reason about, methods larger, and changes more likely to break things. Extracting these behaviours can be a difficult task when code is poorly structured, and more often than not no effort is made to extract them at all. Some inversion of control libraries such as Castle Windsor have support for AOP built in, using reflection-based libraries like DynamicProxy, but with our pipeline we don't need any of that. In fact, like everything we've seen so far, it's almost trivial to implement ourselves. So let's start answering some of those questions and a few more besides.

Exception Logging

If we wanted to, we could simply add a try..catch block around the ForEach iteration in the PipeLine implementation above and bingo, exception handling implemented. Job done, right? Not quite. We don't always want to handle exceptions the same way. Sometimes we may want to only log an exception; other times we may want to log it and roll back a transaction. Maybe we want to retry a set number of times and only log the exception if we still fail after all the retries have been attempted. Sounds like a difficult undertaking, right? Not at all, thanks to AOP.

Here’s an aspect we can use to log exceptions for us:


public class ExceptionLoggingAspect<T>
{
    public ExceptionLoggingAspect(Action<T> action)
    {
        Handle = action;
    }

    public void Execute(T input)
    {
        try
        {
            Handle(input);
        }
        catch (Exception ex)
        {
            if (Log.IsErrorEnabled)
                Log.ErrorFormat("*** ERROR ***: {0}", ex);
        }
    }

    private Action<T> Handle { get; set; }

    private static readonly ILog Log = LogManager.GetLogger("MyLog");
}



Before explaining how this works let’s see it in use first. In part one I showed the Invoke method of a UserService class and said that I would flesh it out later. Well now we can do this using our aspect:


public void Invoke(LogInMessage input)
{
    // pipeline is the PipeLine<LogInMessage> built earlier
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(pipeline.Execute);
    errorHandler.Execute(input);
}



Here we’ve wrapped the Execute method of our pipeline by passing it into the constructor of an ExceptionLoggingAspect which immediately assigns it to the private Handle property. Notice that the constructor and Handle property expect an Action of T, in other words a callback that uses our Message type, whatever that may be. When we invoke Execute on the aspect it in turn invokes the method assigned to the Handle property, again passing through our message, and does so wrapped inside a try..catch block. Should an exception occur now inside one of our filters the aspect will catch the exception and let us log it. In the example shown I’m assuming a dependency on Log4Net but it can be whatever you like.

The upshot of this approach is that we can now decide whether we want an exception to be caught and logged by using composition, rather than having it forced on us as it would have been had we baked it into the plain vanilla PipeLine implementation.

General Logging

Notice that the ExceptionLoggingAspect only logs errors according to the logging level as defined in our Log4Net configuration. It does so by testing the IsErrorEnabled property. What if we want to be able to turn on logging so that we get to write out the message name as and when the user executes various pipelines within our application? Maybe we want to see how the user is using the application by looking at what messages they execute. For that we just need to define an ordinary message logging aspect:


public class MessageLoggingAspect<T>
{
    public MessageLoggingAspect(Action<T> action)
    {
        Handle = action;
    }

    public void Execute(T input)
    {
        // write out the message's class name when DEBUG is enabled
        if (Log.IsDebugEnabled)
            Log.DebugFormat("Message: {0}", typeof(T).Name);

        Handle(input);
    }

    private Action<T> Handle { get; set; }

    private static readonly ILog Log = LogManager.GetLogger("MyLog");
}



and use it like so:


public void Invoke(LogInMessage input)
{
    var messageLogger = new MessageLoggingAspect<LogInMessage>(pipeline.Execute);
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(messageLogger.Execute);
    errorHandler.Execute(input);
}



Each time we compose the aspects we end up creating a chain of method calls as each method we pass in will be assigned to the current aspect’s Handle property. So now calling Execute on the errorHandler invokes Execute on messageLogger which finally invokes Execute on the pipeline itself. As long as the DEBUG logging level is enabled in our Log4Net configuration file then each time we call the Invoke method, our message’s class name will be written to the log file. Should an error occur we will also capture the exception in the log file. If we were to keep the log level at ERROR in our Log4Net config file then our MessageLoggingAspect would still be invoked, it just wouldn’t write anything to our file because we’ve told Log4Net to only log errors.

Okay, let's keep building this up to handle other cross-cutting concerns.

Automatic Retries

One of my favourite frameworks, NServiceBus, has a nice retry feature that kicks in whenever an exception is thrown whilst it is processing a message from a queue. It will, depending upon how you configure it, attempt to process the message over and over until a set number of retries has been attempted. Wouldn't it be cool if we could do that too?


public class RetryAspect<T>
{
    public RetryAspect(Action<T> action)
    {
        Handle = action;

        // get these from a config file in real code
        _maxRetries = 3;
        _slideTime = 5000;
    }

    public void Execute(T input)
    {
        try
        {
            Handle(input);
        }
        catch (Exception)
        {
            _currentRetry++;
            if (_currentRetry <= _maxRetries)
            {
                // wait a little longer before each attempt: 5s, 10s, 15s
                Thread.Sleep(_slideTime * _currentRetry);
                Execute(input);
            }
            else
            {
                throw;
            }
        }
    }

    private Action<T> Handle { get; set; }

    private readonly int _maxRetries;
    private int _currentRetry = 0;
    private readonly int _slideTime;
}



At last, some code with a bit of meat on it. Here we have an aspect that will retry an operation if an exception is thrown by recursively calling the Execute method on itself until the maximum number of attempts has been reached. Notice that the retry operation waits a little bit longer each time a retry is needed to give any infrastructure problems such as a momentary drop out on the network a chance to recover. In this case, the first retry waits for 5 seconds, the second for 10 seconds, and the third 15 seconds before finally raising an exception. I’ve hard coded the values here but ordinarily you’d read these from a config file to allow changes without recompiling.

Notice too that I'm using Thread.Sleep here. This is definitely more controversial as, in general, there shouldn't really ever be a reason to use it, but I'm lazy! There are issues around the various .Net Timer classes, such as callbacks being executed on a different thread or exceptions being swallowed. As the whole point of this aspect is to retry on the same thread when an exception is thrown, I chose to implement it this way for now. Most of the time I use the retry aspect in conjunction with an asynchronous aspect anyway, so sleeping a ThreadPool thread shouldn't really be a problem. Feel free to argue or provide your own implementation though.

Let’s build up the Invoke method again:


public void Invoke(LogInMessage input)
{
    var retryHandler = new RetryAspect<LogInMessage>(pipeline.Execute);
    var messageLogger = new MessageLoggingAspect<LogInMessage>(retryHandler.Execute);
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(messageLogger.Execute);
    errorHandler.Execute(input);
}



Now we get automatic retries for little effort, along with message logging and exception logging. I've used this technique when integrating with a certain UK based delivery courier whose web service is, to put it politely, sometimes unreliable. However, a few seconds after a failed call I usually get success. This technique comes in very handy for that kind of scenario.

Transactions

I think you might be getting the idea by now so I’ll just show the code for transaction handling:


public class TransactionAspect<T>
{
    public TransactionAspect(Action<T> action)
    {
        Handle = action;
    }

    public void Execute(T input)
    {
        using (var scope = new TransactionScope())
        {
            Handle(input);
            scope.Complete();
        }
    }

    private Action<T> Handle { get; set; }
}



public void Invoke(LogInMessage input)
{
    var messageLogger = new MessageLoggingAspect<LogInMessage>(pipeline.Execute);
    var tranHandler = new TransactionAspect<LogInMessage>(messageLogger.Execute);
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(tranHandler.Execute);
    errorHandler.Execute(input);
}



As expected, it's very similar to all the previous aspects. The only thing to take note of here is that I inserted the tranHandler instance between the messageLogger and the errorHandler. This ensures that should an exception occur the transaction won't commit. If instead I had it ordered like so:


public void Invoke(LogInMessage input)
{
    var messageLogger = new MessageLoggingAspect<LogInMessage>(pipeline.Execute);
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(messageLogger.Execute);
    var tranHandler = new TransactionAspect<LogInMessage>(errorHandler.Execute);
    tranHandler.Execute(input);
}



then a raised exception would be logged by the ExceptionLoggingAspect, but as it is not re-thrown it is considered handled, and so the transaction would commit, which is not the behaviour we want. Sometimes the order of our aspects is important; other times it makes no difference.

Asynchronous Execution

Sometimes we want our pipeline to execute on a background thread. You may remember the example I gave for a WinForms batch import application in part one. Running long operations on the UI thread is expensive and blocks the UI from responding to a user’s input. Once again though providing an aspect to wrap our pipeline solves the problem easily:


public class AsyncAspect<T>
{
    public AsyncAspect(Action<T> action)
    {
        Handle = action;
    }

    public void Execute(T input)
    {
        // run the rest of the chain on a ThreadPool thread
        ThreadPool.QueueUserWorkItem(state => Handle(input));
    }

    private Action<T> Handle { get; set; }
}



and here is our Invoke method:


public void Invoke(LogInMessage input)
{
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(pipeline.Execute);
    var asyncHandler = new AsyncAspect<LogInMessage>(errorHandler.Execute);
    asyncHandler.Execute(input);
}



Whilst the message itself is mutable, it is not shared: each thread can safely modify its own message without affecting any others. That doesn't stop you from accessing some shared resource in one of your filters, though, so you still need to be careful about how you write multi-threaded code.

Authorization

Authorization is another one of those pesky cross-cutting concerns, yet it's trivial with aspects. This one checks the current principal assigned to the thread, which you may have set during a log in operation. If the returned GenericIdentity (or WindowsIdentity) has not been authenticated then we simply throw an exception and stop the pipeline from being processed.


public class AuthenticationAspect<T>
{
    public AuthenticationAspect(Action<T> action)
    {
        Handle = action;
    }

    public void Execute(T input)
    {
        var identity = Thread.CurrentPrincipal.Identity;

        if (identity.IsAuthenticated)
        {
            Handle(input);
        }
        else
        {
            throw new Exception("Unable to authenticate. You are not authorised to perform this operation");
        }
    }

    private Action<T> Handle { get; set; }
}



and now our final Invoke method implementation:


public void Invoke(LogInMessage input)
{
    var retryHandler = new RetryAspect<LogInMessage>(pipeline.Execute);
    var messageLogger = new MessageLoggingAspect<LogInMessage>(retryHandler.Execute);
    var tranHandler = new TransactionAspect<LogInMessage>(messageLogger.Execute);
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(tranHandler.Execute);
    var asyncHandler = new AsyncAspect<LogInMessage>(errorHandler.Execute);
    var authHandler = new AuthenticationAspect<LogInMessage>(asyncHandler.Execute);
    authHandler.Execute(input);
}



If we ignore the fact that this example happens to show all these aspects wrapping a login pipeline (no I wouldn’t do this, pretend it’s something else!) then we can see that a lot of potentially complicated requirements can be fulfilled in a trivial manner all because we adopted explicit messaging. Now our pipeline requires that the user must be authenticated before we start processing on a background thread, in the context of a transaction, with error logging, message logging, and automatic retries.

Okay, I know I keep banging on about messages, pipes, and filters, but let's nail it down because it's worth emphasising. The reason we can get so many benefits out of relatively trivial code is that we only have one parameter: the message passed to the pipeline via the Execute method. In the usual OOP/procedural hybrid approach we tend to find lots of methods, each requiring a different number of parameters, which makes it very difficult to provide a consistent, uniform way of handling them. When you adopt the messaging programming model, whether you use Pipes and Filters or Message Routers or what have you, the fact that you are passing around just one single concept opens the door to numerous possibilities, like the various aspects shown here.

Loose Ends

I’m going to conclude by showing one or two other techniques you can choose to adopt if you have a need. I don’t always use these techniques and there are no doubt numerous different ways of achieving the same results but they’re worth mentioning for reference.

Stop message processing without throwing an exception

The first is one of those questions I asked earlier: how do you stop the pipeline from continuing to process a message when you don't want to raise an exception? Maybe you consider throwing an exception bad form unless the circumstances are truly exceptional. Perhaps you just want to drop a message in a given scenario, in effect making the operation idempotent. One way would be to declare an interface along the following lines:


public interface IStopProcessing
{
    bool Stop { get; set; }
}



and modify the PipeLine implementation to only allow messages that implement it:


public class PipeLine<T> where T : IStopProcessing
{
    private readonly List<Action<T>> _actions = new List<Action<T>>();

    public void Execute(T input)
    {
        _actions.ForEach(ac =>
        {
            if (!input.Stop)
                ac(input);
        });
    }

    public PipeLine<T> Register(Action<T> action)
    {
        _actions.Add(action);
        return this;
    }
}



This extra constraint on the message means that at any point in the pipeline a filter can set the Stop property on the message to true and no other filters will ever be invoked from that point onward.

This example is the same as that from part one checking that a user has supplied a valid user name and/or password but this time we don’t throw exceptions:


public class CheckUserSuppliedCredentials
{
    public CheckUserSuppliedCredentials(LogInMessage input)
    {
        Process(input);
    }

    private void Process(LogInMessage input)
    {
        // same checks as in part one, but no exceptions this time
        if (string.IsNullOrEmpty(input.Username) || string.IsNullOrEmpty(input.Password))
        {
            input.Stop = true;
            input.Errors.Add("Invalid credentials");
        }
    }
}



The only other addition here is an “Errors” property on the message that you add to when stopping the pipeline so that the caller can interrogate it later.
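To make that concrete, here is one possible shape for such a message. This is a sketch of my own rather than code from the article: it assumes the IStopProcessing interface shown earlier, and models the Errors collection as a simple list of strings.

```csharp
using System.Collections.Generic;

// Assumed interface, as declared earlier
public interface IStopProcessing
{
    bool Stop { get; set; }
}

public class LogInMessage : IStopProcessing
{
    public string Username { get; set; }
    public string Password { get; set; }

    // Set by a filter to halt the pipeline without throwing
    public bool Stop { get; set; }

    // Filled in by whichever filter stops processing, so the
    // caller can interrogate the failures afterwards
    public List<string> Errors { get; private set; }

    public LogInMessage()
    {
        Errors = new List<string>();
    }
}
```

A filter simply sets Stop and appends to Errors; the caller inspects both once Execute returns.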

Guarantee filter execution even when an exception is thrown

Sometimes when an exception is thrown we use the try..catch..finally construct or even just try..finally to ensure some resource or other is cleaned up. How can we do this so that some filters are guaranteed to run no matter what? Yet again, it’s just a variation of our standard pipeline implementation:


public class PipeLine<T>
{
    private readonly List<Action<T>> _actions = new List<Action<T>>();
    private readonly List<Action<T>> _finallyActions = new List<Action<T>>();

    public void Execute(T input)
    {
        try
        {
            _actions.ForEach(ac => ac(input));
        }
        finally
        {
            _finallyActions.ForEach(ac => ac(input));
        }
    }

    public PipeLine<T> Register(Action<T> action)
    {
        _actions.Add(action);
        return this;
    }

    public PipeLine<T> RegisterFinally(Action<T> action)
    {
        _finallyActions.Add(action);
        return this;
    }
}



Here we've added a second list that holds the callbacks that must be executed no matter what. You just need to register those that should always run using the new RegisterFinally method, and as you can see in the Execute method, when the finally block is entered those "finally" filters are executed one by one.

Debug the current state of the message as it passes through the pipeline

Looking at the message after it has been modified step by step (or filter by filter) is easy enough if so desired. Just create yourself a filter that will print out the current state of the message to the Console or wherever you like:


public class DebugFilter<T>
{
    public DebugFilter(T input)
    {
        Console.WriteLine("Message State: {0}", input);
    }
}



This filter is different to all the others we've seen so far: it's generic, so we can reuse it with any message type. The only other requirement is that your message overrides its ToString method to return its values as a formatted string. Now we can insert our filter at various points within the chain when registering our pipeline filters:
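For illustration, a cut-down LogInMessage with such a ToString override might look like this (my sketch, with only a couple of properties shown; the format is entirely up to you):

```csharp
using System;

public class LogInMessage
{
    public string Username { get; set; }
    public Guid ApiKey { get; set; }

    // DebugFilter relies on this override to print the message state
    public override string ToString()
    {
        return string.Format("Username: {0}, ApiKey: {1}", Username, ApiKey);
    }
}
```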



var pipeline = new PipeLine<LogInMessage>();

pipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
        .Register(msg => new DebugFilter<LogInMessage>(msg))
        .Register(msg => new CheckApiKeyIsEnabledForClient(msg))
        .Register(msg => new ValidateAgainstMembershipApi(msg))
        .Register(msg => new DebugFilter<LogInMessage>(msg))
        .Register(msg => new GetUserDetails(msg));



Now we get to see the state of the message at each point on its journey through the pipeline, which can be a useful debugging aid, though I wouldn't necessarily insert one after each and every step; this is just for show.

Finally…

At last I’m done. To sum up, a lot of applications don’t need a domain model but care should still be taken to create a good object model. Unfortunately the majority of today’s applications don’t really have an explicit architecture or object model. The lack of a well-defined structure tends to make these applications hard to reason about, increases their complexity, and makes it hard to safely make changes.

Making a conscious decision to implement Messaging as a first-class concept in your application not only gives you the ability to easily implement some otherwise hard to achieve features but also provides a uniform and consistent structure that everyone on the team should be able to understand and work with. It’s easy to compose tasks or use cases by chaining together more fine grained responsibilities in the shape of filters, and as a consequence makes unit testing relatively easy too. It also reduces the proliferation of parameters that often have to be propagated down the layers each time some extra piece of data is required on existing methods.

The pipe and filter pattern has been around a long time but its use has often been limited to distributed messaging and message queues. By bringing the pattern into your normal in-process application code you open yourself up to learning to build your software in a more “functional” way whilst at the same time managing complexity and introducing some much needed structure. The best part is you don’t have to introduce any complex frameworks or dependencies to do it yet the benefits are real and tangible.

I’ve had a lot of success with this programming style in recent times and have applications in production using it. They are some of the easiest applications to work on, tend to be more reliable and, in my experience, less prone to introducing unexpected side effects due to their highly modular nature. It makes sense where there are a lot of procedural, sequential steps to invoke.

To re-iterate what I said at the beginning of part one, it’s not the only way to achieve modular code nor perhaps the best (no silver bullets, etc) but I do believe it produces a consistent, easy to read structure, something that is often lacking in modern enterprise applications. I also wouldn’t recommend building an entire application this way, but as I’ve said before, you can use it in tandem with normal OO techniques. Unfortunately in C# we have to use classes for the filters/steps of our pipeline. It would be nice if we could create functions without needing a class but I guess the answer to that would be to move to a functional language like F# which isn’t practical for most developers in the enterprise workplace.

All that said I think making concepts explicit in the form of messages has real merit and I’d encourage you to give it a go, play around with it and see if it’s something you could benefit from in your applications too.

Messaging as a programming model Part 1

Synopsis

This post, the first of two parts, is rather long, probably the longest one I've yet written. In short, it emphasises the application architecture benefits gained from adopting explicit messaging in normal, in-process, non-distributed applications, and shows how doing so has enabled me to develop some interesting and cool solutions using techniques that have resulted in some of the cleanest, most extendable, testable, modular code I've ever written.

Background

I’ve been meaning to describe how I approach software development these days for a good while now but haven’t really known where to begin. I want to document, to anyone interested, the techniques I use to try and achieve good clean, modular code. It’s an approach that has reaped many rewards and I’ve yet to find a way that I like better. That’s not to say that what I’m going to describe is somehow unique or the best way, it’s just an approach that works really well in many situations and has enabled me to build solid, reliable systems. You may agree or disagree, like it or not and that’s fine, feel free to point out the flaws or tell me I’m an idiot and I should do “xyz” instead. I’m always on the lookout to improve so if you have any suggestions I’ll gladly take them on board.

OOP vs Procedural

We're all used to the idea of the Object Oriented Programming paradigm, but despite its ubiquity I've never felt it was really as good as it was promised to be, mainly because it's too easy to abuse. In OOP there is the idea that we don't call a function on an object, we "send it a message", but that can sound a little abstract. What does it mean to send a message as opposed to calling a function as you would in a procedural system? My own interpretation is that in a procedural system you call a function and pass in the structure to be acted upon, whereas in OOP the object owns the state and you just ask the object to do something with it on your behalf by sending it a message. This conforms to the "tell, don't ask" principle. However, most OO applications are not written this way, and the idea of "sending a message" is not really given due thought by most developers.
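To illustrate the difference, here's a small sketch of my own (the Account class isn't from this article): the procedural style reads the object's state and applies the rule outside it, whilst the "tell, don't ask" style sends the object a Withdraw message and lets it guard its own state.

```csharp
using System;

public class Account
{
    public decimal Balance { get; private set; }

    public Account(decimal openingBalance)
    {
        Balance = openingBalance;
    }

    // "Tell, don't ask": the object owns both the state and the rule
    public void Withdraw(decimal amount)
    {
        if (amount > Balance)
            throw new InvalidOperationException("Insufficient funds");

        Balance -= amount;
    }
}

// Procedural style asks for state and decides outside the object:
//     if (account.Balance >= amount) { /* mutate from outside */ }
// Messaging style simply tells the object what to do:
//     account.Withdraw(amount);
```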

The Failure of OOP?

Even though object oriented languages are dominant today, most applications consist of objects that are little more than a bunch of properties passed to some other class that has the behaviour to act upon them. This feels very similar to the idea of functions in procedural programming. There's nothing really wrong with doing this as long as you recognise what you are doing and understand the trade-offs. The usual problem with this approach is that the structure of the application is not easy to see or reason about. It's sometimes hard to identify responsibilities within the system or find out where you need to make changes, and when you do, you often discover large methods with many parameters. In short, it's messy. When the time comes to "enhance" the application we often find ourselves adding more parameters to our methods and having to repeat this all the way down through the various layers until we hit the data layer. This makes for tedious, repetitive, and error prone development, but it is the reality for many enterprise developers every day.

This kind of programming model is not what OOP was invented for, and it makes it look like the paradigm has failed. The evidence is everywhere. How many times do we come across monolithic applications with tons of dependencies, many layers, and poorly defined classes with too many responsibilities and poor cohesion? It seems to be the norm no matter how much information is "out there" telling us otherwise. We tend to look at code bases like this, realise intuitively that something is wrong, but still struggle to find a way to put it right that is clear, consistent, and maintainable for our future selves or those who will follow after us.

There is a way though that can give us the benefits we seek.

Messaging as a Programming Model

The “messaging as programming model” approach is the only one I’ve yet encountered where I can achieve clear, consistent code, time and time again with minimal effort. Not only clear and consistent but also modular, composable, easily testable, highly maintainable and extendable, and above all readable. Each invocation conforms to “tell, don’t ask” no matter whether performing a command or query. And yet it too, just like the messy procedural approach described earlier, takes a structure and asks an object to act upon it. The difference is in the uniformity and consistency of the approach, one that allows for some interesting possibilities.

Obviously, it’s not the be all and end all to writing good software but I believe it to be better than what I encounter routinely in application after application. I’m not saying OOP is “out” and messaging is “in”. The two are not mutually exclusive and clearly, as a C# developer I use an object oriented language. All I’m doing is embracing the idea of using messages explicitly in the context of normal OO applications. It’s certainly not typical but neither is it complex. All the normal OO techniques and rules can apply if required, it’s just the “where” and “how” they are applied that’s different.

Anyway, enough with the rambling. Let’s get going.

Pipes, Filters, Routers, WTF?

To cut to the chase, the main technique I’m using is a variation of the Pipe and Filter pattern as described in the Enterprise Integration Patterns book, a classic you should read if you’re at all interested in message patterns, which as you’re reading this I’ll assume you are.

Each filter in that pattern is loosely connected through an in queue and an out queue. The filters are not aware of each other; they just know how to read a message from the inbound queue, do something with it, and put the result on the outbound queue. In my variation of the pattern there are no queues, merely a list of filters executed one after the other that manipulate the message as it passes through, just as the diagram implies. Within a filter your typical object model can apply if necessary, but in general this pipeline turns the conventional idea of an object operating on its own state on its head. Here the message is the state and, unlike in most functional languages, it is also mutable. Despite this, the whole pattern lends itself well to concurrency, as we'll see later.

What is a Message?

So the pipeline is the mechanism for processing messages but what qualifies as a message? The answer is simply anything in your application that should be made an explicit concept. We’ve seen this idea of explicitness before but usually in the context of DDD where it’s recommended that you group together a bunch of related fields/parameters and turn them into a single class known as a Value object. This then gives you a place to put related methods that operate on those properties.

Messages are also explicit concepts. They too contain a bunch of properties grouped together but they are just DTOs and have no methods on them at all. They are buckets of data but represent something important in your application. One of the benefits of making things explicit like this is that it increases your ability to reason about the code when you come back to it some months later. For instance, the diagram shows a pipeline that allows a user to log into an application. It’s easy to see the steps involved in logging in and gives you a big picture view without needing to look at the details. The actual code for this conveys the exact same information and is as easy to read as the diagram itself so let’s take a look at it.

Here’s a simple message we can use when a request to log in arrives:


public class LogInMessage
{
    public string Pin { get; set; }
    public string Username { get; set; }
    public string Password { get; set; }
    public bool IsAllowed { get; set; }
    public Guid ApiKey { get; set; }
    public ClientDetail ClientDetails { get; set; }
    public UserDetail UserDetails { get; set; }
}



Let’s assume you have a user that’s invoking a login method on a web service. In order to log that user in there are a number of steps to be performed. Whilst this alone isn’t a hard problem to solve or on its own something likely to make your code hard to understand, it’s a good simple example to begin with. Remember my goal is to have clear, consistent code across ALL operations to provide a degree of uniformity and to keep the “surface area” of my immediate context small so I don’t get distracted looking at lots of surrounding code each perhaps doing things slightly differently and adding to the noise I have to deal with.

We register the steps we want to sequentially execute with the PipeLine through its Register method:



var pipeline = new PipeLine<LogInMessage>();

pipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
        .Register(msg => new CheckApiKeyIsEnabledForClient(msg))
        .Register(msg => new ValidateAgainstMembershipApi(msg))
        .Register(msg => new GetUserDetails(msg));



As you can see, we have a class PipeLine which works with a message of T (we’ll get to the implementation of this in Part 2). The pipeline lets you register lambdas (anonymous functions) that will be called in the order of registration, each time passing in the message you supply. Every time a callback is invoked, an instance of the current filter is created and the message is manipulated in some way before being passed to the next filter.

Invoking the pipeline is as simple as creating a message and executing it:


var msg = new LogInMessage {
    Pin = "1234",
    ApiKey = Guid.Parse("a9aafe3f-76d3-4304-a39e-cf05edb870f2")
};

loginPipeline.Execute(msg);



It’s easy enough to imagine the path taken when calling execute. You only have to look at the registration to get that big picture view. The details are irrelevant unless you need to make changes. Each filter is stateless and short-lived. By the time all the filters have run, the message will have been enriched or changed in some way leaving you with data you can return to the caller.

Inside Filters

Let's look at the first filter, CheckUserSuppliedCredentials, which really is as simple as this:


public class CheckUserSuppliedCredentials
{
    public CheckUserSuppliedCredentials(LogInMessage input)
    {
        Process(input);
    }

    private void Process(LogInMessage input)
    {
        if(string.IsNullOrEmpty(input.Username))
            throw new UnauthorizedException("Invalid credentials");

        if(string.IsNullOrEmpty(input.Password))
            throw new UnauthorizedException("Invalid credentials");
    }
}



The filter has one responsibility, in this case to ensure that the message contains a username and password, and that's it, nothing more. If either of those conditions isn't satisfied an exception is thrown. The next filter is a little more complex, but only just. Here we assume there's a table in a database somewhere containing clients whose access can be turned on or off by an ApiKey supplied (in our case) in the request header, which (not shown) was extracted and assigned to the message. This filter has a dependency on a data access object in order to retrieve the client's details, which for the sake of simplicity we just new up inline:


public class CheckApiKeyIsEnabledForClient
{
    public CheckApiKeyIsEnabledForClient(LogInMessage input)
    {
        Process(input, new ClientDetailsDao());
    }

    private void Process(LogInMessage input, IClientDetailsDao dao)
    {
        var details = dao.GetDetailsFrom(input.ApiKey);

        if(details == null)
            throw new UnauthorizedException("Invalid api key");

        input.ClientDetails = details;
    }
}



Again, if the condition isn’t satisfied an exception is thrown and processing of the pipeline stops. If the condition is satisfied the message is enriched by having the ClientDetails property assigned the result of that call. This data is now available for use in subsequent filters.

Flexibility

I'm not going to show every filter implementation here. Suffice to say they all follow the same basic pattern, each having a single responsibility and yet together forming a conceptual whole that is itself a single responsibility, in this case that of logging in. Before moving on though, it's worth mentioning the benefits already gained from this approach. Not only is it easy to reason about what this code will do just by looking at the registration steps, you can also change the order of the steps (within reason) simply by swapping the order in which registration occurs. It's also incredibly easy to modify when the boss comes along and says wouldn't it be great if we could easily determine who is using our system by capturing how often they log in?

We could do that without any real disruption and with a great deal of confidence that we won’t break any existing code. All we need do is create a new filter:


public class PublishUserLoggedInEvent
{
    public PublishUserLoggedInEvent(LogInMessage input)
    {
        Process(input, GetBus());
    }

    private void Process(LogInMessage input, IBus bus)
    {
        var loggedInEvent = new UserLoggedInEvent {
            Name = input.UserDetails.Name,
            Client = input.ClientDetails.Organisation,
            When = DateTime.Now
        };

        bus.Publish(loggedInEvent);
    }
}



and register it:



var loginPipeline = new PipeLine<LogInMessage>();

loginPipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
    .Register(msg => new CheckApiKeyIsEnabledForClient(msg))
    .Register(msg => new ValidateAgainstMembershipApi(msg))
    .Register(msg => new GetUserDetails(msg))

    // add new step to pipeline
    .Register(msg => new PublishUserLoggedInEvent(msg));



Now, when our message reaches the new filter, it must have passed through all the previous ones without incident. We can therefore assume the user has successfully logged in and we can tell the world about it, in this case by publishing an event over a bus, whether that be NServiceBus, RabbitMQ, or A.N.Other bus. Any subscribers will receive that message and do something with the data. For a simpler alternative you could go straight to the database and record the login in a table. The beauty is that decisions taken now can be swapped out easily at some point in the future without worrying about breaking existing code.

Design-time vs Runtime PipeLines

So, we’ve covered modular, and to some degree we can see how a responsibility is composed of smaller more focused responsibilities but we are making these decisions at design-time. If we want to be a bit more dynamic we can easily build the pipeline at runtime. For instance, one way to do it is to add filters to a pipeline only if a condition is met. This could be done in some kind of fluent Builder or Factory object:



var pipeline = new PipeLine<SomeMessage>();

// the filter names here are placeholders
if(someConditionA)
    pipeline.Register(msg => new FilterA(msg));

if(someConditionB)
    pipeline.Register(msg => new FilterB(msg));

if(someConditionC)
    pipeline.Register(msg => new FilterC(msg));

// ...etc



Message Routers

Another way is to have one of the filters themselves extend the current pipeline if a particular condition is met:


var pipeline = new PipeLine<SomeMessage>();

pipeline.Register(msg => new Filter1(msg))
.Register(msg => new Filter2(msg))
.Register(msg => new Filter3(msg, pipeline));



What’s interesting about this example is that the pipeline itself is passed into Filter3. When that filter is executed the filter can extend the pipeline again based on some condition being met:


public class Filter3
{
public Filter3(SomeMessage input, PipeLine<SomeMessage> pipeline)
{
Console.WriteLine("filter3");

if(input.SomeCondition)
pipeline.Register(msg => new Filter4(msg));
else
pipeline.Register(msg => new Filter5(msg));
}
}



Now if each filter was to write its name out to the console, then depending on the value of SomeCondition when the message was executed, you would get one of the following two outputs:

filter1
filter2
filter3
filter4


or

filter1
filter2
filter3
filter5


So now not only do we have design-time composability but also run-time composability. In effect filter3 is a Message Router, another pattern from the Enterprise Integration Patterns book. Its job is to decide where the message should go next, which filter3 does based on a value on the message itself (a Content-Based Router). Again very nice, very flexible.

Testability

Hopefully at this point you can see that following this approach gives us a lot of flexibility and yet at the same time is very easy to read and maintain. Not only that, but you might also see that writing unit tests for this kind of code becomes nothing short of trivial. The example below shows the CheckUserSuppliedCredentials filter from the earlier example being tested for an empty Username on the message. Each filter can be tested like this, in complete isolation, with message state set up in various ways to exercise the paths through the filter. Alternatively you can easily test an entire pipeline, which aligns nicely with the concept of testing use cases. Just create a message, execute it, then assert off the changed state of the message itself. I've found this to be a real boon. Compared to your typical monolithic enterprise application, which seems to get harder to test as it grows, testing pipelines provides natural boundaries for your test fixtures and makes reading those tests easy as well:


[TestFixture]
public class CheckUserSuppliedCredentialsTestFixture
{
    [Test]
    public void Should_throw_when_username_is_empty()
    {
        var msg = new LogInMessage { Username = "", Password = "password" };

        var exception = Assert.Throws<UnauthorizedException>(() => new CheckUserSuppliedCredentials(msg));

        Assert.That(exception.Message, Is.EqualTo("Invalid credentials"));
    }
}



In my earlier examples I supplied any dependencies for filters such as the IClientDetailsDao inline by just newing up an implementation. Obviously this was for brevity but in practice you won’t want to do this if you’re writing tests or want the flexibility to “inject” different implementations. To get around this all you need do is put the interface on the filter’s constructor and supply the implementation in the pipeline registration instead like this:



var loginPipeline = new PipeLine<LogInMessage>();

loginPipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
    .Register(msg => new CheckApiKeyIsEnabledForClient(msg, new ClientDetailsDao()))
    .Register(msg => new ValidateAgainstMembershipApi(msg, new MembershipService()))
    .Register(msg => new GetUserDetails(msg, new UserDetailsDao()))
    .Register(msg => new PublishUserLoggedInEvent(msg, GetBus()));



Now it’s trivial to supply “fake” versions when constructing the pipeline in a test fixture.
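As a sketch of what that looks like, assuming IClientDetailsDao exposes the GetDetailsFrom(Guid) method seen in the earlier filter (the fake class and test names below are my own, not from the post):

```csharp
// A hand-rolled stub invented for this sketch; a mocking library
// such as Moq would do just as well.
public class FakeClientDetailsDao : IClientDetailsDao
{
    public ClientDetail DetailsToReturn { get; set; }

    public ClientDetail GetDetailsFrom(Guid apiKey)
    {
        return DetailsToReturn;
    }
}

[TestFixture]
public class CheckApiKeyIsEnabledForClientTestFixture
{
    [Test]
    public void Should_enrich_the_message_with_client_details()
    {
        var details = new ClientDetail();
        var msg = new LogInMessage { ApiKey = Guid.NewGuid() };

        // Wire the filter up with the fake instead of the real dao
        var pipeline = new PipeLine<LogInMessage>();
        pipeline.Register(m => new CheckApiKeyIsEnabledForClient(m, new FakeClientDetailsDao { DetailsToReturn = details }));

        pipeline.Execute(msg);

        // Assert off the changed state of the message itself
        Assert.That(msg.ClientDetails, Is.SameAs(details));
    }
}
```

The same shape works for whole-pipeline tests: register every filter with fakes, execute one message, and assert on its final state.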

A Web Api Example

Before moving on, I thought I would round out the login example by showing a more complete view of it within the context of a Web API ApiController.

To keep things cohesive I tend to have Service classes that group related responsibilities which are then passed into the controller, so for example, in the case of a user there might be a number of operations they can perform. Let's say that in our system a user can log in and they can also be presented with system alerts that give them timely information such as when any maintenance is going to happen, etc. In this example, we have a UserController that takes a UserService via an interface:

public class UserController : ApiController
{
    private readonly IUserService _userService;

    public UserController(IUserService userService)
    {
        _userService = userService;
    }

    [HttpPost]
    [RequestExceptionFilter]
    public HttpResponseMessage LogIn(LogInMessage msg)
    {
        _userService.Invoke(msg);
        return Request.CreateResponse(HttpStatusCode.OK, msg.UserDetails);
    }

    [HttpGet]
    [RequestExceptionFilter]
    public HttpResponseMessage Alerts(int userId)
    {
        var msg = new AlertsMessage { UserId = userId };
        _userService.Invoke(msg);

        // the pipeline will have enriched the message with the alert data
        return Request.CreateResponse(HttpStatusCode.OK, msg.Alerts);
    }
}


This is as complex as my controllers get though there are a couple of points I want to emphasise. For instance, the message created in each action method is the same object that is used as part of the response. The message is modified as it passes through the various filters and therefore, unless an exception gets thrown, will contain new data that we can return to the caller, in the case of the LogInMessage some information about the user contained in the UserDetails property.

The only other thing to mention is that if an exception is thrown the RequestExceptionFilterAttribute kicks in and returns the appropriate response to the caller such as a 403 Forbidden.

Aesthetic Interfaces

Each controller takes an interface describing the messages that can be executed. I don’t know about you but I think there’s something nice about the following interface definition:

public interface IUserService
{
    void Invoke(LogInMessage msg);
    void Invoke(AlertsMessage msg);
}


For one, it has only void methods; for another, they all share the same name, differing only by the message type. It too conforms to the simple and readable philosophy that the whole approach seems to excel at.

The implementation of the interface is where the pipelines are created. In this example they are built in the constructor taking in the supplied dependencies:


public sealed class UserService : IUserService
{
    private readonly PipeLine<LogInMessage> _loginPipeline = new PipeLine<LogInMessage>();
    private readonly PipeLine<AlertsMessage> _alertsPipeline = new PipeLine<AlertsMessage>();

    public UserService(IClientDetailsDao clientDao, IMembershipService membershipService, IUserDetailsDao userDetailsDao, IBus bus)
    {
        _loginPipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
            .Register(msg => new CheckApiKeyIsEnabledForClient(msg, clientDao))
            .Register(msg => new ValidateAgainstMembershipApi(msg, membershipService))
            .Register(msg => new GetUserDetails(msg, userDetailsDao))
            .Register(msg => new PublishUserLoggedInEvent(msg, bus));

        _alertsPipeline.Register(msg => new CheckApiKeyIsEnabledForClient(msg, clientDao));
        // ...remaining alert filters registered here
    }

    public void Invoke(LogInMessage msg)
    {
        _loginPipeline.Execute(msg);
    }

    public void Invoke(AlertsMessage msg)
    {
        _alertsPipeline.Execute(msg);
    }
}



and that’s it. Imagine that structure applied across many controllers and services and hopefully you can see the benefit of such a consistent and flexible approach. The Invoke methods are the simplest implementation possible for now. We’ll flesh these out in part 2 to enable some useful extra functionality.

I know I haven’t touched on the actual pipeline implementation itself yet, I’m going to leave that for part two where I’ll go over the opportunities it provides for cross cutting concerns, again in a flexible, easily maintainable manner. Before finishing Part 1 though, I want to show one other technique I find very useful in the context of desktop or console applications.

Batch Imports

If you’ve never had to write an ETL application you’ve never worked in enterprise development :)

In my current place of work there is a need to take Excel spreadsheets (yes, even in 2013) from clients containing batch orders or parts and import them into the back office system. Unfortunately the contents and structure of these spreadsheets can be quite different depending on the type of import and the particular client involved. These spreadsheets often contain lots of errors and sometimes aren't even filled in correctly, this being the nature of the enterprise world. Regardless, we have to have a way to deal with this and so we have developed a WinForms application that can take these spreadsheets, parse and validate them, and only when all the errors have been corrected, import the data they contain into the database. You'll not be surprised to hear that the pipeline approach has proven useful here too.


var load = new PipeLine<PartUploadMessage>();
var validate = new PipeLine<PartUploadMessage>();
var submit = new PipeLine<PartUploadMessage>();

load.Register(msg => new ImportExcelDataSet(msg))
    .Register(msg => new ValidateSheet(msg))
    .Register(msg => new ValidateColumnNames(msg))
    .Register(msg => new StripInvalidChars(msg))
    .Register(msg => new CopyToDataTable(msg))
    .Register(msg => new CreateExtraColumns(msg))
    .Register(msg => new CheckStockItemCodeColumnFormatting(msg))
    .Register(msg => new CheckForDuplicateStockItemCode(msg))
    .Register(msg => new ValidateUniqueClient(msg))
    .Register(msg => new ValidateRequiredFields(msg))
    .Register(msg => new ValidateColumnDataTypes(msg))
    .Register(msg => new ValidateContentOfFields(msg))
    .Register(msg => new ValidateCategories(msg));

validate.Register(msg => new PreValidate(msg))
    .Register(msg => new CheckForDuplicateStockItemCode(msg))
    .Register(msg => new ValidateUniqueClient(msg))
    .Register(msg => new ValidateRequiredFields(msg))
    .Register(msg => new ValidateContentOfFields(msg))
    .Register(msg => new ValidateCategories(msg))
    .Register(msg => new PostValidate(msg));

submit.Register(msg => new PreBackOfficeSubmit(msg))
    .Register(msg => new SubmitPartsToBackOffice(msg))
    .Register(msg => new PostBackOfficeSubmit(msg));



Here we have three pipelines. Each one is invoked by a different button on the user interface. When the user browses to and selects a spreadsheet file on their system the load pipeline is executed passing in a message that contains the filename and various other properties. At the end of execution, a spreadsheet has been loaded and validated and the data is bound to a DataGrid on a form. The user can make changes if required and having done so, can ask for the data to be validated again by clicking a button which will execute the validate pipeline. In our case, we have some quite complicated rules about what combinations of data can co-exist in a row but using a validation pipeline simplifies the problem without creating a spaghetti mess. Notice how we are able to reuse the validation filters that are used in the load pipeline. Keeping filters small and focused enables such reuse. Once validation has succeeded, the user can press a button to invoke the submit pipeline which will begin importing the data into the back office system.

The Hollywood Principle

Whilst there’s nothing new on the surface here apart from having a pipeline for each “Task”, the really interesting story is how does the UI get notified of important events happening within the pipeline? For instance, how do we disable various buttons while validation or submit is underway? How do we update a progress bar on the screen to give the user feedback as to how far through the submit process they are?

Let’s look at a definition for the PartUploadMessage:


public class PartUploadMessage
{
    public string Filename { get; set; }
    public DataSet Data { get; set; }
    public DataTable Parts { get; set; }

    public Action OnBeforeFileLoad { get; set; }
    public Action OnAfterFileLoad { get; set; }
    public Action OnBeforeValidation { get; set; }
    public Action OnAfterValidation { get; set; }
    public Action<int> OnStartPartsUpload { get; set; }
    public Action<string> OnPartUploaded { get; set; }
    public Action OnEndPartsUpload { get; set; }
}



You can see that the first three are the usual kind of mutable state properties. Nothing new there. However, the next set of properties are all callbacks that you can assign a method to, which the various filters will invoke at the appropriate point in the pipelines’ execution lifetime.

For example, the PreSpreadsheetLoad filter will invoke the OnBeforeFileLoad callback whilst the PostSpreadsheetLoad filter will invoke the OnAfterFileLoad callback in the same way. Similarly, the PreValidate and PostValidate filters invoke OnBeforeValidation and OnAfterValidation respectively. Finally, the PreBackOfficeSubmit filter informs the UI that we are about to submit parts by invoking the OnStartPartsUpload callback. It tells the UI how many parts we are going to be submitting by passing in the number of rows from the Parts table which is a property of the message. We could use this to set the Max property of a progress bar:


public class PreBackOfficeSubmit
{
    public PreBackOfficeSubmit(PartUploadMessage input)
    {
        input.OnStartPartsUpload(input.Parts.Rows.Count);
    }
}



The SubmitPartsToBackOffice filter loops through the rows inserting them one by one. Each time a row is inserted it invokes the OnPartUploaded callback with the part number of the current row. This callback can also be used to update the progress bar, incrementing the Value property by 1 each time it is invoked. When all parts are submitted the PostBackOfficeSubmit filter invokes the OnEndPartsUpload callback where the progress bar is reset to zero and the submit button is re-enabled.
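Sketched out, that loop might look like the following; the IBackOfficePartsDao dependency and the "StockItemCode" column name are my own assumptions, as the post doesn't show the real data access code:

```csharp
// Hypothetical dependency; the real back office access code isn't shown in the post.
public interface IBackOfficePartsDao
{
    void Insert(DataRow row);
}

public class SubmitPartsToBackOffice
{
    public SubmitPartsToBackOffice(PartUploadMessage input, IBackOfficePartsDao dao)
    {
        foreach (DataRow row in input.Parts.Rows)
        {
            dao.Insert(row);

            // Report progress back to the UI after each row; the column
            // name is a guess, any identifier worth displaying would do.
            input.OnPartUploaded(row["StockItemCode"].ToString());
        }
    }
}
```

Because the callbacks live on the message, the filter knows nothing about progress bars or buttons; it just announces what it has done.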

Initializing the message is just a case of assigning the various values and callbacks:


// control names here are illustrative
var msg = new PartUploadMessage {
    Filename = fileName,
    OnBeforeFileLoad = () => this.RunOnMainThread(() => loadButton.Enabled = false),
    OnAfterFileLoad = () => this.RunOnMainThread(() => loadButton.Enabled = true),
    OnStartPartsUpload = count => this.RunOnMainThread(() => progressBar.Maximum = count),
    OnPartUploaded = part => this.RunOnMainThread(() => progressBar.Increment(1)),
    OnEndPartsUpload = () => this.RunOnMainThread(() => progressBar.Value = 0)
};



Anyone who’s done any desktop development will be aware that running a potentially long operation on the UI thread can cause the UI to block and stop being responsive which makes for a poor user experience. Luckily, the pipeline approach easily lends itself to asynchronous execution which I’ll go over in Part 2. For now, let’s assume our pipelines are being executed on a background thread but when the time comes to update the UI we need to be back on the main thread or we’ll run into the dreaded “Cross-thread operation not valid” exception message or such like. The RunOnMainThread method in the example is a simple extension method on the Control class which marshals our data back to the main thread so everything works as expected when we try to update control values. For completeness and for those curious, that method looks like this:

public static void RunOnMainThread<TControl>(this TControl control, Action action) where TControl : Control
{
    // Marshal the call onto the UI thread if we're not already on it
    if (control.InvokeRequired)
        control.Invoke(action);
    else
        action();
}


Now imagine this spreadsheet import process has to be done for 20 different spreadsheets, all with different structures, data, and validation requirements. The Pipe and Filter approach has allowed our application to accommodate these needs without growing too complex. Anyone looking at the source code can immediately see the structure of the application, and once they understand how one import pipeline works they understand them all.

One last thing before winding up: using callbacks on the message allows you to get information from the user before allowing the pipeline to continue. Imagine a point of sale system using this technique. The cashier clicks a button that begins a discount operation. As the pipeline is executing, one of the filters may require input from the cashier, perhaps asking for some information that determines how much of a discount the customer is entitled to. When the filter executes it can invoke a callback in the UI that takes the message itself, perhaps showing a message box asking the cashier a question. The value the cashier enters is then assigned to the passed-in message before dismissing the dialogue and allowing the pipeline to continue executing with its newly acquired information. The same thing can be done if the operation needs admin permissions. Stop the pipeline by asking for a password or pin and only allow execution to resume if the input is valid. I have used this technique in the past and it works very well.
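To make that concrete, here is a sketch of the idea; every name below is invented for illustration, only the technique comes from the text:

```csharp
// All names here are illustrative.
public class DiscountMessage
{
    public decimal DiscountPercent { get; set; }

    // Assigned by the UI; shows a dialogue and writes the cashier's
    // answer back onto the message before returning.
    public Action<DiscountMessage> OnDiscountInputRequired { get; set; }
}

public class AskCashierForDiscount
{
    public AskCashierForDiscount(DiscountMessage input)
    {
        // The pipeline pauses here until the callback returns, i.e. until
        // the cashier has dismissed the dialogue.
        input.OnDiscountInputRequired(input);

        // An example of gating further execution on the acquired input.
        if (input.DiscountPercent > 20m)
            throw new UnauthorizedException("Discount requires admin approval");
    }
}
```

Because the filter only sees the message, the same filter works whether the answer comes from a message box, a pin pad, or a test fixture supplying canned input.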

Moving Towards Autonomous Components and Services

I truly believe that written this way, an application is far easier to understand and maintain than the traditional layered approach, which in my opinion starts to break down and become a burden once it grows above a certain size, especially as developers come and go, each doing things slightly differently. This approach provides a kind of template or blueprint that helps keep code consistent and maintainable, with real tangible benefits. It also lends itself well to the idea of Component Based development, where each set of related pipelines could be built into its own assembly or module containing a vertical slice of the traditional layered approach and installed as Windows Services communicating by commands and events. By adopting explicit messages in your in-process code it becomes almost trivial, should the need arise, to serialize and send those messages across the wire or save them to a database. At this point we are very close to autonomous business components in a SOA-based microservices architecture.

Okay, I think that’s enough to digest for now as we’ve covered quite a lot. In the next post I’ll examine the actual pipeline implementation and how easy it is to extend or enhance to perform some other interesting possibilities, things we usually describe as cross-cutting concerns.

In the meantime, if you have any feedback let me know. I’d be interested to hear what you think.