Messaging as a programming model – Revisited

Well, I know it hasn't been long, but such was the feedback I received from my Messaging as a programming model blog posts that I thought I would quickly follow up with some revisions and extra background information: a) to clarify one or two decisions, and b) to incorporate one or two additions and modifications. So here goes.

Why are you doing it that way?

Why are you doing processing in the constructor and, while we’re at it, why the lambda in the Register method too?

A few comments brought this to light and really made me stop and think about it. Why did I choose to do it that way? As I sat and thought about it I realised that I hadn't actively chosen it as such; I had basically arrived at the first solution and failed to keep iteratively questioning my own decisions. Sometimes you're so close to what you're doing that you forget to step back and look at things objectively. Having a code review performed by 18,000 eyes, though, soon highlights your deficiencies and I'm thankful for that. I was basically treating the constructor as just a procedure call and, whilst it works, it's not good. Taking on board various suggestions (thanks everybody, but particularly to Christian Palmstierna and Ralf Westphal) I went back and refactored it into a better solution (one that could no doubt be improved further still), which I present here.

First of all, create an interface that all filters will implement:

public interface IFilter<T>
{
    void Execute(T msg);
}

Change the PipeLine class to store a list of IFilter instances instead of Action of T:

public class PipeLine<T>
{
    private readonly List<IFilter<T>> _filters = new List<IFilter<T>>();

    public PipeLine<T> Register(IFilter<T> filter)
    {
        _filters.Add(filter);
        return this;
    }

    public void Execute(T input)
    {
        _filters.ForEach(f => f.Execute(input));
    }
}

Now the registration of the filters looks like this:

var loginPipeline = new PipeLine<LogInMessage>();

loginPipeline.Register(new CheckUserSuppliedCredentials())
             .Register(new CheckApiKeyIsEnabledForClient(new ClientDetailsDao()))
             .Register(new IsUserLoginAllowed())
             .Register(new ValidateAgainstMembershipApi(new MembershipService()))
             .Register(new GetUserDetails(new UserDetailsDao()))
             .Register(new PublishUserLoggedInEvent(new GetBus()));

The filters themselves implement the interface as the following refactored example from the previous post shows:


public class CheckUserSuppliedCredentials : IFilter<LogInMessage>
{
    public void Execute(LogInMessage input)
    {
        if(string.IsNullOrEmpty(input.Username) || 
                  string.IsNullOrEmpty(input.Password))
        {
            input.Stop = true;
            input.Errors.Add("Invalid credentials");
        }
    }
}

The end result is that the processing is now done in the Execute method instead of the constructor, and registration of the filters no longer uses a lambda expression. This not only looks cleaner but is more efficient too.
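
A side benefit is that the constructor is now free to do what constructors do best: receive dependencies, as the registration code above already hints at. As a rough sketch (the IsApiKeyEnabled method and the ApiKey property are made up for illustration), a filter with a dependency might now look like this:

public class CheckApiKeyIsEnabledForClient : IFilter<LogInMessage>
{
    private readonly ClientDetailsDao _dao;

    // Dependencies arrive through the constructor...
    public CheckApiKeyIsEnabledForClient(ClientDetailsDao dao)
    {
        _dao = dao;
    }

    // ...and the processing happens in Execute.
    public void Execute(LogInMessage input)
    {
        if (!_dao.IsApiKeyEnabled(input.ApiKey))   // hypothetical DAO call
        {
            input.Stop = true;
            input.Errors.Add("API key is not enabled for this client");
        }
    }
}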

PipeLine Composition

I had to kick myself for not even thinking of this one. My thanks again to Ralf Westphal (who has been developing a technique called Flow Design for the last few years) for highlighting it. The question was: why did I have such a long pipeline for the load file example?

As a reminder this is what it looked like:


load = new PipeLine<PartUploadMessage>();
load.Register(msg => new PreSpreadsheetLoad(msg))
    .Register(msg => new ImportExcelDataSet(msg))
    .Register(msg => new ValidateSheet(msg))
    .Register(msg => new ValidateColumnNames(msg))
    .Register(msg => new StripInvalidChars(msg))
    .Register(msg => new CopyToDataTable(msg))
    .Register(msg => new CreateExtraColumns(msg))
    .Register(msg => new CheckStockItemCodeColumnFormatting(msg))
    .Register(msg => new CheckForDuplicateStockItemCode(msg))
    .Register(msg => new ValidateUniqueClient(msg))
    .Register(msg => new ValidateRequiredFields(msg))
    .Register(msg => new ValidateColumnDataTypes(msg))
    .Register(msg => new ValidateContentOfFields(msg))
    .Register(msg => new ValidateCategories(msg))
    .Register(msg => new PostSpreadsheetLoad(msg));

validate = new PipeLine<PartUploadMessage>();
validate.Register(msg => new PreValidate(msg))
    .Register(msg => new CheckForDuplicateStockItemCode(msg))
    .Register(msg => new ValidateUniqueClient(msg))
    .Register(msg => new ValidateRequiredFields(msg))
    .Register(msg => new ValidateContentOfFields(msg))
    .Register(msg => new ValidateCategories(msg))
    .Register(msg => new PostValidate(msg));

The load pipeline instance reuses validation filters that are also registered in the validate pipeline instance, but why do that? Why not instead remove them from the load pipeline and compose a new pipeline made up of the two? What a great idea, so simple, and yet I never considered it. I am indeed an idiot.

The best part is that no new code is required, just a change in how the pipelines are registered:


load = new PipeLine<PartUploadMessage>();
load.Register(msg => new PreSpreadsheetLoad(msg))
    .Register(msg => new ImportExcelDataSet(msg))
    .Register(msg => new StripInvalidChars(msg))
    .Register(msg => new CopyToDataTable(msg))
    .Register(msg => new CreateExtraColumns(msg))
    .Register(msg => new PostSpreadsheetLoad(msg));

validate = new PipeLine<PartUploadMessage>();
validate.Register(msg => new PreValidate(msg))
    .Register(msg => new ValidateSheet(msg))
    .Register(msg => new ValidateColumnNames(msg))
    .Register(msg => new CheckForDuplicateStockItemCode(msg))
    .Register(msg => new ValidateUniqueClient(msg))
    .Register(msg => new ValidateRequiredFields(msg))
    .Register(msg => new ValidateContentOfFields(msg))
    .Register(msg => new ValidateCategories(msg))
    .Register(msg => new PostValidate(msg));

Thinking about it, in the original implementation the load pipeline instance had more than one responsibility: it both loaded and validated. By employing pipeline composition we can now create a new pipeline composed of the other two to achieve the same result:


import = new PipeLine<PartUploadMessage>();
import.Register(load.Execute)
    .Register(validate.Execute);

Now the new import pipeline instance executes both the load and validation pipelines sequentially. That’s pretty cool!

Note that if we swap from the original pipeline implementation to the new IFilter implementation then the pipeline itself needs to implement the interface to allow composition, because the Register method now takes an IFilter of T instead of an Action of T:

public class PipeLine<T> : IFilter<T>
{
    private readonly List<IFilter<T>> _filters = new List<IFilter<T>>();

    public PipeLine<T> Register(IFilter<T> filter)
    {
        _filters.Add(filter);
        return this;
    }

    public void Execute(T input)
    {
        _filters.ForEach(f => f.Execute(input));
    }
}

Our composition now looks like this:


import = new PipeLine<PartUploadMessage>();
import.Register(load)
    .Register(validate);

which is even cleaner. Nice.

But what about immutability?

This I'm not concerned about so much. C# is not a functional language (despite the addition of LINQ and its functional leanings) and, unlike F#, does not support immutability natively. Attempting it would, I feel, be too expensive and not really worth the effort or the extra complexity it would add. The overriding goal in my mind was simply to adopt the idea of an explicit message so one could easily reason about the path taken through the system, not to try and bend the language to meet some notion of theoretical purity. In C# it's not even a trade-off; it just doesn't make sense.

Different Strokes

Clearly there are a number of ways the implementation could have been different. Going through this process of questioning myself and allowing others to question me has confirmed that there are of course many ways to skin a cat. As I alluded to early on, I don't think OOP is sufficient on its own; you only have to look at the codebases produced every day by enterprise teams all over to see that. The trouble is, despite saying that, I still struggle to break free from the object oriented shackles myself because they've been rammed into my head for the last twenty years. It's a hard habit to break, but one thing that gives me inspiration is this talk by Bret Victor on the future of programming, which is delivered as if the year is 1973 and looks ahead to what programming might look like in 40 years' time, i.e. today! At the end he talks about being open to new ways of thinking, and that's my goal here. I'm trying to explore other ways of tackling the problems I face at work every day rather than churning out the same old service layer, repository, DTO approach that, frankly, I think just fails miserably.

Object orientation isn't bad per se, but I think it becomes cumbersome when used at a "macro" level. In other words, I think projects, applications, codebases, whatever you want to call them, are too large. If we broke them down into smaller, more focused components we'd have a better chance of building systems that last and that are easily maintainable and extendable. Smaller components, though, have to communicate, and that's where messaging comes in again. I'm very interested in the micro-services architecture as I think it has a lot of benefits. If you're interested there's a great talk by Fred George on this very topic that's worth watching.

Messaging is a different paradigm to object orientation in a number of ways, so you do have to think a little differently. I have to keep reminding myself that an application written this way is not an object oriented program in the traditional sense. Just as design patterns have different forces and consequences within OOP, I think the same applies across programming paradigms. Sticking to the object oriented paradigm feels so natural that you might never consider anything different, and yet I think messaging is different because it more closely adheres to the "tell, don't ask" principle, i.e. one-way communication. The cornerstones of object orientation are still useful of course, but messaging adds something extra with different trade-offs and benefits.

Flow Based Programming

It appears that there is a movement underway to bring something called Flow Based Programming (FBP) to the masses through a Kickstarter project called NoFlo, which looks very exciting. Until this last week I had never heard of either FBP or NoFlo, but at its core the key concept appears to be message passing between asynchronous components. There is apparently a major Canadian bank that has had an FBP system in production for the best part of 40 years, which truly goes to show that there are no new ideas in software. Clearly there is value in adopting a messaging approach and I think it's another technique worth adding to your toolbox.

Finally…(again!)

Thanks to everyone who got in touch for all the feedback, especially to Ralf for his encouragement by email; he has since written a number of posts in response to these articles, which I link to here:

Messaging as a programming model – let’s get real.
Flows – Visualizing the Messaging Programming Model
Messaging for More Decoupling


Messaging as a programming model Part 2

This topic has generated a lot of interest, more than I could have imagined beforehand. Thanks to everyone for all the feedback, good, bad and indifferent, it’s much appreciated.

This post is part two of two on using messaging techniques within your everyday in-process applications. You can find the first part here.

A Quick Recap

All object oriented applications have an object model of sorts, though in my experience it's usually an accidental model rather than one that was "designed". Not many applications require a domain model, but those that do have Domain Driven Design and, more recently, CQRS and Event Sourcing as tools, techniques, and methodologies to help developers discover and build useful models of the business domain in which they work. Plain object models tend to have nothing but the bog standard four tenets of object orientation to fall back on, usually in the context of CRUD applications. Whilst in theory this should be enough, these types of applications are typically large and messy from the get-go and steadily grow worse over time. It would be nice if there were a few more techniques that developers could reach for to help them produce better object models when working on these projects. Messaging is one such technique.

Messages are explicit concepts in code that can be acted upon in many ways, enabling some useful functionality that can ordinarily be quite difficult to implement, especially in those areas that have to span the entire application. These are known as cross cutting concerns and include things such as logging, transaction handling, and retry logic around certain operations. The naive approach sees developers littering their methods with Log("whatever") statements and repeating them all over the codebase. There is, though, a better way once we've adopted the messaging mindset: Aspect Oriented Programming. As we're using the Pipe and Filters pattern we're in a good position to tackle those concerns, but first I'm going to dig deeper into the PipeLine class mentioned in part one and later flesh out a few simple alternatives.

The PipeLine

At last we come to the implementation of the PipeLine class. I know you've been wondering how you'll cope with the sheer complexity of it; I bet you've even had a few sleepless nights waiting for this moment. Well, here it is in all its glory:


public class PipeLine<T>
{
    private readonly List<Action<T>> _actions = new List<Action<T>>();

    public void Execute(T input)
    {
        _actions.ForEach(ac => ac(input));
    }

    public PipeLine<T> Register(Action<T> action)
    {
        _actions.Add(action);
        return this;
    }
}

Wow, complicated, right? Er… no, actually. It's probably as simple as one could hope for.

So, we have a private list of Action of T with T being the type of Message that you declare when creating an instance of the PipeLine.


var pipeline = new PipeLine<LogInMessage>();

The Register method simply adds the callback you supply to the internal list and then returns itself through the this reference allowing the pipeline to be built by chaining the Register methods together:


var loginPipeline = new PipeLine<LogInMessage>();

loginPipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
             .Register(msg => new CheckApiKeyIsEnabledForClient(msg))
             .Register(msg => new IsUserLoginAllowed(msg))
             .Register(msg => new ValidateAgainstMembershipApi(msg))
             .Register(msg => new GetUserDetails(msg));

Each callback is now constrained, in this example, to work only with LogInMessage, so it stands to reason that with this pipeline instance we can only use filters that take a LogInMessage parameter.
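
For reference, LogInMessage itself is just a plain class carrying the data the filters act on. Its exact shape isn't shown here, but based on the properties the filters use it might look something like this sketch:

public class LogInMessage
{
    public string Username { get; set; }
    public string Password { get; set; }

    // Set by a filter to halt further processing (see Loose Ends below).
    public bool Stop { get; set; }

    // Populated by filters so the caller can see what went wrong.
    public List<string> Errors { get; set; }

    public LogInMessage()
    {
        Errors = new List<string>();
    }
}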

Having built the pipeline we call the Execute method to have the callbacks invoked one by one. As you can see, we simply iterate over the list with the List<T>.ForEach method, passing the supplied message (input) to each callback:


public void Execute(T input)
{
    _actions.ForEach(ac => ac(input));
}

But Wait… There's More

Whilst that’s all you need at the most basic level there are other questions that arise. How can we stop the pipeline from processing without having to throw an exception? What do we do if an exception does occur in one of the filters, how do we handle it? How can we make our pipeline execute asynchronously or make it execute in the context of a transaction? Because we’ve implemented the Pipe and Filter pattern all these questions can be answered easily by implementing some simple Aspects that we can wrap around our pipeline and we don’t need any fancy frameworks to do it.

AOP

Wikipedia supplies the following definition for Aspect Oriented Programming:


"aspect-oriented programming (aop) is a programming paradigm that
aims to increase modularity by allowing the separation of 
cross-cutting concerns"

It's amazing how often we come across code that mixes the responsibility of the object at hand with other concerns such as logging. It makes code harder to reason about, methods larger, and changes more likely to break something. Extracting these behaviours can be a difficult task when code is poorly structured, and more often than not no effort is made to extract them at all. Some inversion of control libraries such as Castle Windsor have support for AOP built in using reflection based libraries like DynamicProxy, but with our pipeline we don't need any of that. In fact, like everything we've seen so far, it's almost trivial to implement ourselves. So let's start answering some of those questions and a few more besides.

Exception Logging

If we wanted to, we could simply add a try..catch block around the ForEach iteration in the PipeLine implementation above and bingo, exception handling implemented. Job done, right? Not quite. We don't always want to handle exceptions the same way. Sometimes we may want to only log an exception; other times we may want to log it and roll back a transaction. Maybe we want to retry a set number of times and only log the exception if we still fail after all the retries have happened. Sounds like a difficult undertaking, right? Not at all, thanks to AOP.

Here’s an aspect we can use to log exceptions for us:


public class ExceptionLoggingAspect<T>
{
    public ExceptionLoggingAspect(Action<T> action)
    {
        Handle = action;
    }
	
    public void Execute(T input)
    {
        try
        {
            Handle(input);
        }
        catch (Exception ex)
        {
            if(Log.IsErrorEnabled)
                Log.ErrorFormat("*** ERROR ***: {0}", ex);
        }
    }
	
    private Action<T> Handle { get; set; }

    private static readonly ILog Log = LogManager.GetLogger("MyLog");
}

Before explaining how this works let’s see it in use first. In part one I showed the Invoke method of a UserService class and said that I would flesh it out later. Well now we can do this using our aspect:


public void Invoke(LogInMessage input)
{
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(_loginPipeline.Execute);
    errorHandler.Execute(input);
}

Here we’ve wrapped the Execute method of our pipeline by passing it into the constructor of an ExceptionLoggingAspect which immediately assigns it to the private Handle property. Notice that the constructor and Handle property expect an Action of T, in other words a callback that uses our Message type, whatever that may be. When we invoke Execute on the aspect it in turn invokes the method assigned to the Handle property, again passing through our message, and does so wrapped inside a try..catch block. Should an exception occur now inside one of our filters the aspect will catch the exception and let us log it. In the example shown I’m assuming a dependency on Log4Net but it can be whatever you like.

The upshot of this approach means that now we can decide if we want an exception to be caught and logged by using composition rather than having it forced on us as it would have been if we had put it in the plain vanilla PipeLine implementation.

General Logging

Notice that the ExceptionLoggingAspect only logs errors according to the logging level as defined in our Log4Net configuration. It does so by testing the IsErrorEnabled property. What if we want to be able to turn on logging so that we get to write out the message name as and when the user executes various pipelines within our application? Maybe we want to see how the user is using the application by looking at what messages they execute. For that we just need to define an ordinary message logging aspect:


public class MessageLoggingAspect<T>
{
    public MessageLoggingAspect(Action<T> action)
    {
        Handle = action;
    }
	
    public void Execute(T input)
    {
        if(Log.IsDebugEnabled)
            Log.Debug("Message Received: {0}", input.GetType().Name); 

        Handle(input);               
    }
	
    private Action<T> Handle { get; set; }

    private static readonly ILog Log = LogManager.GetLogger("MyLog");
}

and use it like so:


public void Invoke(LogInMessage input)
{ 
    var logHandler = new MessageLoggingAspect<LogInMessage>(_loginPipeline.Execute);
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(logHandler.Execute);
    errorHandler.Execute(input);
}

Each time we compose the aspects we end up creating a chain of method calls, as each method we pass in is assigned to the current aspect's Handle property. So now calling Execute on errorHandler invokes Execute on logHandler, which finally invokes Execute on the pipeline itself. As long as the DEBUG logging level is enabled in our Log4Net configuration file then each time we call the Invoke method our message's class name will be written to the log file. Should an error occur we will also capture the exception in the log file. If we were to keep the log level at ERROR in our Log4Net config file then our MessageLoggingAspect would still be invoked; it just wouldn't write anything to our file because we've told Log4Net to only log errors.
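
If the intermediate variables feel noisy, the same chain can be written as one nested expression; this is purely a stylistic alternative, not a change to the technique:

public void Invoke(LogInMessage input)
{
    // The outermost aspect is written first; execution flows inward
    // to the pipeline and any exception bubbles back out to the logger.
    new ExceptionLoggingAspect<LogInMessage>(
        new MessageLoggingAspect<LogInMessage>(
            _loginPipeline.Execute).Execute).Execute(input);
}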

Okay, let's keep building this up to handle other cross cutting concerns.

Automatic Retries

One of my favourite frameworks, NServiceBus, has a nice retry feature that kicks in whenever an exception is thrown whilst it is processing a message on a queue. It will, depending upon how you configure it, attempt to process the message over and over until a set number of retries has been attempted. Wouldn't it be cool if we could do that too?


public class RetryAspect<T>
{
    public RetryAspect(Action<T> action)
    {
        Handle = action;		

        // get these from config file
        _maxRetries = 3; 
        _slideTime = 5000;
    }
	
    public void Execute(T input)
    {
        try
        {
            Handle(input);
        }
        catch (Exception)
        {
            _currentRetry++;
            if (_currentRetry <= _maxRetries)
            {
                Thread.Sleep(_slideTime * _currentRetry);
                Execute(input);
            }
            else
            {
                throw;
            }
        }
    }

    private Action<T> Handle { get; set; }	

    private int _maxRetries;
    private int _currentRetry = 0;
    private int _slideTime;
}

At last, some code with a bit of meat on it. Here we have an aspect that will retry an operation when an exception is thrown, by recursively calling the Execute method on itself until the maximum number of attempts has been reached. Notice that the retry operation waits a little longer each time a retry is needed, to give any infrastructure problem, such as a momentary drop-out on the network, a chance to recover. In this case the first retry waits for 5 seconds, the second for 10 seconds, and the third for 15 seconds before finally re-throwing the exception. I've hard coded the values here but ordinarily you'd read them from a config file to allow changes without recompiling.
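
Reading those values from configuration is straightforward enough; here is a sketch of the constructor using System.Configuration, with made-up appSettings key names:

public RetryAspect(Action<T> action)
{
    Handle = action;

    // Hypothetical keys; fall back to the defaults if they're missing.
    _maxRetries = int.Parse(
        ConfigurationManager.AppSettings["MaxRetries"] ?? "3");
    _slideTime = int.Parse(
        ConfigurationManager.AppSettings["SlideTime"] ?? "5000");
}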

Notice too that I'm using Thread.Sleep here. This is definitely more controversial as, in general, there shouldn't ever really be a reason to use it, but I'm lazy! There are issues around the various .NET Timer classes, such as callbacks being executed on a different thread or exceptions being swallowed. As the whole point of this aspect is to retry on the same thread when an exception is thrown, I chose to implement it this way for now. Most of the time I use the retry aspect in conjunction with an asynchronous aspect anyway, so sleeping a ThreadPool thread shouldn't really be a problem. Feel free to argue or provide your own implementation though.

Let’s build up the Invoke method again:


public void Invoke(LogInMessage input)
{ 
    var retryHandler = new RetryAspect<LogInMessage>(_loginPipeline.Execute);
    var logHandler = new MessageLoggingAspect<LogInMessage>(retryHandler.Execute);
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(logHandler.Execute);
    errorHandler.Execute(input);
} 

Now we get automatic retries for little effort, along with message logging and exception logging. I've used this technique when integrating with a certain UK based delivery courier whose web service is, to put it politely, sometimes unreliable. However, a few seconds after a failed call I usually get success, so this technique comes in very handy for that kind of scenario.

Transactions

I think you might be getting the idea by now so I’ll just show the code for transaction handling:


public class TransactionAspect<T>
{
    public TransactionAspect(Action<T> action)
    {
        Handle = action;
    }

    public void Execute(T input)
    {
        using(var scope = new TransactionScope())
        {
            Handle(input);
            scope.Complete();
        }		
    }
	
    private Action<T> Handle { get; set; }
}


public void Invoke(LogInMessage input)
{ 
    var retryHandler = new RetryAspect<LogInMessage>(_loginPipeline.Execute);
    var logHandler = new MessageLoggingAspect<LogInMessage>(retryHandler.Execute);
    var tranHandler = new TransactionAspect<LogInMessage>(logHandler.Execute);
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(tranHandler.Execute);
    errorHandler.Execute(input);
} 

As expected, it's very similar to all the previous aspects. The only thing to take note of here is that I inserted the tranHandler instance between the logHandler and the errorHandler. This is to ensure that, should an exception occur, the transaction won't commit. If I had ordered it like so:


public void Invoke(LogInMessage input)
{ 
    var retryHandler = new RetryAspect<LogInMessage>(_loginPipeline.Execute);
    var logHandler = new MessageLoggingAspect<LogInMessage>(retryHandler.Execute);
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(logHandler.Execute);
    var tranHandler = new TransactionAspect<LogInMessage>(errorHandler.Execute);
    tranHandler.Execute(input);
} 

then a raised exception would be logged by the ExceptionLoggingAspect but, because it is not re-thrown, it counts as handled and the transaction would commit, which is not the behaviour we want. Sometimes the order of our aspects is important; other times it makes no difference.

Asynchronous Execution

Sometimes we want our pipeline to execute on a background thread. You may remember the example I gave for a WinForms batch import application in part one. Running long operations on the UI thread is expensive and blocks the UI from responding to a user’s input. Once again though providing an aspect to wrap our pipeline solves the problem easily:


public class AsyncAspect<T>
{
    public AsyncAspect(Action<T> action)
    {
        Handle = action;		
    }

    public void Execute(T input)
    {
        ThreadPool.QueueUserWorkItem(i => Handle(input));
    }

    private Action<T> Handle { get; set; }
}

and here is our Invoke method:


public void Invoke(LogInMessage input)
{ 
    var retryHandler = new RetryAspect<LogInMessage>(_loginPipeline.Execute);
    var logHandler = new MessageLoggingAspect<LogInMessage>(retryHandler.Execute);
    var tranHandler = new TransactionAspect<LogInMessage>(logHandler.Execute);
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(tranHandler.Execute);    
    var asyncHandler = new AsyncAspect<LogInMessage>(errorHandler.Execute);
    
    asyncHandler.Execute(input);
} 

Whilst the message itself is mutable, it is not shared. Each thread can safely modify its own message without affecting any others, but that doesn't stop you from accessing some shared resource in one of your filters, so you still need to be careful about how you write multi-threaded code.
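
As a contrived illustration of the kind of thing to watch out for, a filter that writes to a shared collection would need its own synchronisation (AuditLogIn is a made-up example, written in the same constructor style as the other part one filters):

public class AuditLogIn
{
    // Shared across every thread running the pipeline,
    // so access must be synchronised.
    private static readonly List<string> _audit = new List<string>();
    private static readonly object _sync = new object();

    public AuditLogIn(LogInMessage input)
    {
        lock (_sync)
        {
            _audit.Add(input.Username);
        }
    }
}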

Authorization

Authorization is another one of those pesky cross cutting concerns, yet it is trivial to handle with an aspect. This one checks the current principal assigned to the thread, which you may have set during a log in operation. If the returned GenericIdentity (or WindowsIdentity) has not been authenticated then we simply throw an exception and stop the pipeline from being processed.


public class AuthenticationAspect<T>
{
    public AuthenticationAspect(Action<T> action)
    {
        Handle = action;
    }

    public void Execute(T input)
    {
        var identity = Thread.CurrentPrincipal.Identity;

        if (identity.IsAuthenticated)
        {
            Handle(input);
        }
        else
        {
            throw new Exception("Unable to authenticate. You are not authorised to perform this operation");
        }
    }

    private Action<T> Handle { get; set; }
}

and now our final Invoke method implementation:


public void Invoke(LogInMessage input)
{ 
    var retryHandler = new RetryAspect<LogInMessage>(_loginPipeline.Execute);
    var logHandler = new MessageLoggingAspect<LogInMessage>(retryHandler.Execute);
    var tranHandler = new TransactionAspect<LogInMessage>(logHandler.Execute);
    var errorHandler = new ExceptionLoggingAspect<LogInMessage>(tranHandler.Execute);    
    var asyncHandler = new AsyncAspect<LogInMessage>(errorHandler.Execute);
    var authHandler = new AuthenticationAspect<LogInMessage>(asyncHandler.Execute);

    authHandler.Execute(input);
} 

If we ignore the fact that this example happens to show all these aspects wrapping a login pipeline (no I wouldn’t do this, pretend it’s something else!) then we can see that a lot of potentially complicated requirements can be fulfilled in a trivial manner all because we adopted explicit messaging. Now our pipeline requires that the user must be authenticated before we start processing on a background thread, in the context of a transaction, with error logging, message logging, and automatic retries.

Okay, I know I keep banging on about messages, pipes, and filters, but let's nail it down because it's worth emphasising. The reason we are able to get so many benefits out of relatively trivial code is that we only have one parameter: the message being passed to the pipeline via the Execute method. In our normal OOP/procedural hybrid approach we tend to find lots of methods, each requiring a different number of parameters, which makes it very difficult to provide a consistent, uniform way of handling them. When you adopt the messaging programming model, whether you use Pipes and Filters or Message Routers or what have you, the fact that you are passing around just one single concept opens the door to numerous possibilities, allowing some very interesting things to be done, like the various aspects shown here.
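
As an aside, because every aspect shares the same Action of T shape, the wrapping itself can be folded into small extension methods. This isn't part of the original implementation, just a possible convenience sketch:

public static class AspectExtensions
{
    public static Action<T> WithRetry<T>(this Action<T> handler)
    {
        return new RetryAspect<T>(handler).Execute;
    }

    public static Action<T> WithMessageLogging<T>(this Action<T> handler)
    {
        return new MessageLoggingAspect<T>(handler).Execute;
    }

    public static Action<T> WithExceptionLogging<T>(this Action<T> handler)
    {
        return new ExceptionLoggingAspect<T>(handler).Execute;
    }
}

which would let the Invoke method read from the inside out:

public void Invoke(LogInMessage input)
{
    Action<LogInMessage> handler = _loginPipeline.Execute;
    handler = handler.WithRetry()
                     .WithMessageLogging()
                     .WithExceptionLogging();
    handler(input);
}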

Loose Ends

I’m going to conclude by showing one or two other techniques you can choose to adopt if you have a need. I don’t always use these techniques and there are no doubt numerous different ways of achieving the same results but they’re worth mentioning for reference.

Stop message processing without throwing an exception

The first is one of those questions I asked earlier: how do you stop the pipeline from continuing to process messages when you don't want to raise an exception? Maybe you consider throwing an exception bad form unless the circumstances are truly exceptional. Perhaps you just want to drop a message in a given scenario, in effect making the operation idempotent. One way would be to declare an interface along the following lines:


public interface IStopProcessing
{
    bool Stop { get; set; }
}

and modify the PipeLine implementation to only allow messages that implement it:


public class PipeLine<T> where T : IStopProcessing
{
    private List<Action<T>> _actions = new List<Action<T>>();

    public void Execute(T input)
    {
        _actions.ForEach(ac =>
        {
            if (!input.Stop)
                ac(input);
        });
    }

    public PipeLine<T> Register(Action<T> action)
    {
        _actions.Add(action);
        return this;
    }
}

This extra constraint on the message means that at any point in the pipeline a filter can set the Stop property on the message to true, and no further filters will be invoked from that point onward.
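
For this to compile, the message class must of course implement the interface; continuing the hypothetical LogInMessage sketch from earlier:

public class LogInMessage : IStopProcessing
{
    public string Username { get; set; }
    public string Password { get; set; }

    // Required by IStopProcessing.
    public bool Stop { get; set; }

    public List<string> Errors { get; set; }

    public LogInMessage()
    {
        Errors = new List<string>();
    }
}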

This example is the same as the one from part one, checking that a user has supplied a valid user name and/or password, but this time we don't throw exceptions:


public class CheckUserSuppliedCredentials
{
    public CheckUserSuppliedCredentials(LogInMessage input)
    {
        Process(input);
    }

    private void Process(LogInMessage input)
    {
        if(string.IsNullOrEmpty(input.Username) || 
                  string.IsNullOrEmpty(input.Password))
        {
            input.Stop = true;
            input.Errors.Add("Invalid credentials");
        }
    }
}

The only other addition here is an "Errors" property on the message that you add to when stopping the pipeline, so that the caller can interrogate it later.
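
The calling code can then inspect the message once the pipeline has finished, along these lines:

var msg = new LogInMessage { Username = "jbloggs", Password = "" };
loginPipeline.Execute(msg);

if (msg.Errors.Count > 0)
{
    // Report failure back to the user without an exception in sight.
    msg.Errors.ForEach(Console.WriteLine);
}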

Guarantee filter execution even when an exception is thrown

Sometimes when an exception is thrown we use the try..catch..finally construct, or even just try..finally, to ensure some resource or other is cleaned up. How can we arrange for certain filters to be guaranteed to run no matter what? Yet again, it's just a variation on our standard pipeline implementation:


public class PipeLine<T>
{
    private List<Action<T>> _actions = new List<Action<T>>();
    private List<Action<T>> _finallyActions = new List<Action<T>>();

    public void Execute(T input)
    {
        try
        {
            _actions.ForEach(ac => ac(input));
        }
        finally
        {
            _finallyActions.ForEach(ac => ac(input));
        }
    }

    public PipeLine<T> Register(Action<T> action)
    {
        _actions.Add(action);
        return this;
    }

    public PipeLine<T> RegisterFinally(Action<T> action)
    {
        _finallyActions.Add(action);
        return this;
    }
}

Here we've added a second list to hold the callbacks that must be executed no matter what. You just need to register those that should always run using the new RegisterFinally method and, as you can see in the Execute method, when the finally block is entered those "finally" filters will be executed one by one.
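
Registering the guaranteed filters is then just another link in the chain; CleanupTempFiles here is a made-up example:

var pipeline = new PipeLine<PartUploadMessage>();

pipeline.Register(msg => new ImportExcelDataSet(msg))
        .Register(msg => new CopyToDataTable(msg))
        .RegisterFinally(msg => new CleanupTempFiles(msg));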

Debug the current state of the message as it passes through the pipeline

Looking at the message after it has been modified step by step (or filter by filter) is easy enough if so desired. Just create yourself a filter that will print out the current state of the message to the console or wherever you like:


public class DebugFilter<T>
{
    public DebugFilter(T input)
    {
        Console.WriteLine("Message State: {0}", input);
    }
}

This filter is different to all the others we've seen so far: it's a generic filter, so we can reuse it with any message. The only other requirement is that your message override its ToString method to return its values as a formatted string (a sketch of this follows at the end of this section). Now we can insert our filter at various points within the chain when registering our pipeline filters:


var loginPipeline = new PipeLine<LogInMessage>();

loginPipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
             .Register(msg => new DebugFilter<LogInMessage>(msg))             
             .Register(msg => new CheckApiKeyIsEnabledForClient(msg))
             .Register(msg => new DebugFilter<LogInMessage>(msg))
             .Register(msg => new IsUserLoginAllowed(msg))
             .Register(msg => new DebugFilter<LogInMessage>(msg))             
             .Register(msg => new ValidateAgainstMembershipApi(msg))
             .Register(msg => new DebugFilter<LogInMessage>(msg))
             .Register(msg => new GetUserDetails(msg))
             .Register(msg => new DebugFilter<LogInMessage>(msg));

Now we get to see the state of the message at each point on its journey through the pipeline, which can be a useful debugging aid, though I wouldn't necessarily insert one after each and every step – this is just for show.
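
For completeness, the ToString override on the message might look something like this sketch:

public override string ToString()
{
    return string.Format("Username: {0}, Stop: {1}, Errors: [{2}]",
        Username, Stop, string.Join(", ", Errors));
}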

Finally…

At last I'm done. To sum up: a lot of applications don't need a domain model, but care should still be taken to create a good object model. Unfortunately the majority of today's applications don't really have an explicit architecture or object model, and the lack of a well-defined structure tends to make them hard to reason about, increases their complexity, and makes it hard to make changes safely.

Making a conscious decision to implement messaging as a first-class concept in your application not only gives you the ability to implement some otherwise hard to achieve features easily, but also provides a uniform and consistent structure that everyone on the team should be able to understand and work with. It's easy to compose tasks or use cases by chaining together fine grained responsibilities in the shape of filters, which as a consequence makes unit testing relatively easy too. It also reduces the proliferation of parameters that often have to be propagated down through the layers each time some extra piece of data is required by existing methods.

The pipe and filter pattern has been around a long time but its use has often been limited to distributed messaging and message queues. By bringing the pattern into your normal in-process application code you open yourself up to learning to build your software in a more “functional” way whilst at the same time managing complexity and introducing some much needed structure. The best part is you don’t have to introduce any complex frameworks or dependencies to do it yet the benefits are real and tangible.

I’ve had a lot of success with this programming style in recent times and have applications in production using it. They are some of the easiest applications to work on, tend to be more reliable and, in my experience, less prone to introducing unexpected side effects due to their highly modular nature. It makes sense where there are a lot of procedural, sequential steps to invoke.

To re-iterate what I said at the beginning of part one, it's not the only way to achieve modular code, nor perhaps the best (no silver bullets, etc.), but I do believe it produces a consistent, easy to read structure, something that is often lacking in modern enterprise applications. I also wouldn't recommend building an entire application this way; as I've said before, you can use it in tandem with normal OO techniques. Unfortunately in C# we have to use classes for the filters/steps of our pipeline. It would be nice if we could create functions without needing a class, but I guess the answer to that would be to move to a functional language like F#, which isn't practical for most developers in the enterprise workplace.

All that said, I think making concepts explicit in the form of messages has real merit and I'd encourage you to give it a go; play around with it and see if it's something you could benefit from in your applications too.
