Branching messages with pipelines

If you’ve read my recent posts on Messaging as a programming model you might gather that I’m a fan of this approach. If nothing else, it’s a great way of breaking down a big use case into a small set of fine-grained, easily maintainable and replaceable steps that make the code very easy to reason about. However, one drawback at first glance is that there doesn’t appear to be an easy way to branch the message as it moves through the sequence of filters. You may recall that, using the essence of the Message Router pattern, we can change which filters are executed by modifying the pipeline while it is executing, registering new steps based on a particular value carried on the message, like so:


public class Filter3
{
    public Filter3(SomeMessage input, PipeLine<SomeMessage> pipeline)
    {
        Console.WriteLine("filter3");
   
        if(input.SomeCondition)
            pipeline.Register(msg => new Filter4(input));
        else
            pipeline.Register(msg => new Filter5(input));
    }
}

However, all this does is attach either Filter4 or Filter5 to the end of the existing list of registered filters. You can obviously register as many filters as you like under each path of the conditional test, but even so they are still added at the end, with no means of continuing back on the original main branch afterwards. A diagram might help explain what I mean:

Alternative Filter Paths

The old

The picture on the left depicts the structure of the filters as the original approach allows. The picture on the right shows a more useful alternative that we’d like as a minimum, and ideally without having to pass the pipeline into the filter to change it. The question is how can we achieve this?

The answer lies in the IFilter interface as described in the Revisited post of the original series. By adopting this simple interface for all our filters we can implement a variation of the composite design pattern where we have filters that contain logic (the leaf) and filters that themselves contain pipelines of filters (the composite) with the addition of a new filter whose sole purpose is to test a condition and determine what to execute next. I’ll use a modified (and contrived) version of the original LogIn example to illustrate the technique. The original example looked like this:

Simple pipeline for logging in a user

It’s a straightforward linear sequence of steps that modify and enrich the message as it passes through. What if, though, for argument’s sake, we want to execute a bunch of different steps depending on the result of the conditional test in the IsUserLoginAllowed filter?

The new

Branching pipeline for logging in a user

Here, with the addition of a few more filters, the message will always pass through the first three steps. Depending on the logic within the IsUserLoginAllowed filter, it will then pass through either the two filters on the left branch or the three on the right branch before joining back up with the main branch to record the fact that a log in was attempted. This final filter will execute regardless of whether the message went left or right. This is a lot more useful, and it is easily achievable through simple composition thanks to our IFilter interface, so let’s look at the code, starting with the interface that all our filters will implement:


public interface IFilter<T>
{
    void Execute(T msg);
}

and the PipeLine class, which also implements the interface:


public class PipeLine<T> : IFilter<T>
{
    public PipeLine<T> Register(IFilter<T> filter)
    {
        _filters.Add(filter);
        return this;
    }

    public void Execute(T input)
    {
        _filters.ForEach(f => f.Execute(input));
    }

    private List<IFilter<T>> _filters = new List<IFilter<T>>();
}

All straightforward and recognisable, as are the first two filters shown here:


public class CheckUserSuppliedCredentials : IFilter<LogInMessage>
{
    public void Execute(LogInMessage input)
    {
        Console.WriteLine("CheckUserSuppliedCredentials");
        if(string.IsNullOrEmpty(input.Username) || string.IsNullOrEmpty(input.Password))
        {
            input.Stop = true;
            input.Errors.Add("Invalid credentials");
        }
    }
}

public class CheckApiKeyIsEnabledForClient : IFilter<LogInMessage>
{
    public CheckApiKeyIsEnabledForClient(IClientDetailsDao dao)
    {
        _dao = dao;
    }

    public void Execute(LogInMessage input)
    {
        Console.WriteLine("CheckApiKeyIsEnabledForClient");
        var details = _dao.GetDetailsFrom(input.ApiKey);

        if(details == null)
        {
            input.Stop = true;
            input.Errors.Add("Client not found");
            return;
        }

        input.ClientDetails = details;
    }

    private readonly IClientDetailsDao _dao;
}

Branch Filters

The next filter, IsUserLoginAllowed, is more interesting and has changed from previous implementations. I refer to it as a Branch filter because, well, it allows your message to branch!


public class IsUserLoginAllowed : IFilter<LogInMessage>
{
    public IsUserLoginAllowed(IFilter<LogInMessage> leftBranch, IFilter<LogInMessage> rightBranch)
    {
        _onDeny = leftBranch;
        _onAllow = rightBranch;
    }

    // only users of ClientX and ClientY can log in
    public void Execute(LogInMessage input)
    {
        Console.WriteLine("IsUserLoginAllowed");
        switch(input.ClientDetails.Name)
        {
            case "ClientX":
            case "ClientY":
                _onAllow.Execute(input);
                break;

            default:
                _onDeny.Execute(input);
                break;
        }
    }

    private IFilter<LogInMessage> _onAllow;
    private IFilter<LogInMessage> _onDeny;
}

Look at the constructor of this filter and notice that it takes two IFilter instances, one for each branch path. These can be normal, individual (leaf) filters but it gets more interesting and useful when they are filters that group other filters together, in other words, composite filters. A Branch filter’s only responsibility is to determine what to do next. If we look at how this is all wired up we’ll see that our Branch filter, IsUserLoginAllowed, takes two composite filters, OnDenyLogIn and OnAllowLogIn:


    var pipeline = new PipeLine<LogInMessage>();
    pipeline.Register(new CheckUserSuppliedCredentials());
    pipeline.Register(new CheckApiKeyIsEnabledForClient(new ClientDetailsDao()));
    pipeline.Register(new IsUserLoginAllowed(new OnDenyLogIn(), new OnAllowLogIn()));
    pipeline.Register(new RecordLogInAttempt());
		
    var msg = new LogInMessage { /* ... set values on the message */ };
    pipeline.Execute(msg);
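Because IsUserLoginAllowed depends only on the IFilter abstraction, the branches don’t have to be the real composites; stand-in filters work just as well, which is handy for unit testing the branch decision in isolation. A quick sketch (RecordingFilter is a hypothetical test double of my own, and I’m assuming ClientDetails exposes a settable Name property):

```csharp
// Hypothetical test double: records whether its Execute was called.
public class RecordingFilter : IFilter<LogInMessage>
{
    public bool WasCalled { get; private set; }

    public void Execute(LogInMessage msg)
    {
        WasCalled = true;
    }
}

// Exercising the branch filter in isolation:
var deny   = new RecordingFilter();
var allow  = new RecordingFilter();
var branch = new IsUserLoginAllowed(deny, allow);

branch.Execute(new LogInMessage
{
    ClientDetails = new ClientDetails { Name = "ClientX" }
});

// For a ClientX message only the allow branch should have run,
// i.e. allow.WasCalled is true and deny.WasCalled is false.
```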

Composite Filters

Composite filters contain their own pipelines, each registering its own related filters, and because all filters, whether leaf or composite, implement the IFilter interface, they can all be treated the same via the Execute method:

composed pipeline for logging in a user

The code is simple, consisting of nothing more than registering filters in a pipeline.

OnDenyLogIn:


public class OnDenyLogIn : IFilter<LogInMessage>
{
    public OnDenyLogIn()
    {
        _pipeline.Register(new LockAccountOnThirdAttempt());
        _pipeline.Register(new PublishLogInFailed());
    }

    public void Execute(LogInMessage input)
    {
         Console.WriteLine("OnDenyLogIn");
         _pipeline.Execute(input);
    }

    private PipeLine<LogInMessage> _pipeline = new PipeLine<LogInMessage>();
}

and OnAllowLogIn:


public class OnAllowLogIn : IFilter<LogInMessage>
{
    public OnAllowLogIn()
    {
        _pipeline.Register(new ValidateAgainstMembershipApi());
        _pipeline.Register(new GetUserData());	
        _pipeline.Register(new PublishLogInSucceeded());
    }
	
    public void Execute(LogInMessage input)
    {
        Console.WriteLine("OnAllowLogIn");
        _pipeline.Execute(input);
    }
	
    private PipeLine<LogInMessage> _pipeline = new PipeLine<LogInMessage>();
}

The fact that composites are self-contained means that their internal pipelines don’t even need to be for the same message type! Even though both of the example composite filters take a LogInMessage via their Execute method, there’s nothing to stop one sending a completely different message type through its internal pipeline and then assigning the result, whatever that may be, to some property on the LogInMessage before it continues on its way through the rest of the sequence. None of the other filters ever needs to know that a different message type was created and processed within a composite filter.
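As a sketch of that idea, a composite might translate the outer message into its own internal type and copy the result back (the MembershipCheckMessage type and the leaf filters inside it are hypothetical, invented purely for illustration):

```csharp
// Hypothetical message type, private to this composite's pipeline.
public class MembershipCheckMessage
{
    public string Username { get; set; }
    public bool IsValid { get; set; }
}

// A composite that accepts a LogInMessage from the outer pipeline
// but runs a different message type through its internal one.
public class VerifyMembership : IFilter<LogInMessage>
{
    public VerifyMembership()
    {
        _pipeline.Register(new LookUpMember());       // hypothetical leaf filter
        _pipeline.Register(new CheckMemberStatus());  // hypothetical leaf filter
    }

    public void Execute(LogInMessage input)
    {
        var internalMsg = new MembershipCheckMessage { Username = input.Username };
        _pipeline.Execute(internalMsg);

        // Copy the result back onto the outer message before it continues.
        if(!internalMsg.IsValid)
        {
            input.Stop = true;
            input.Errors.Add("Membership check failed");
        }
    }

    private PipeLine<MembershipCheckMessage> _pipeline = new PipeLine<MembershipCheckMessage>();
}
```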

Benefits

Composing filters in this way has some important benefits. First, it makes the main pipeline easier to read: there are fewer steps at the top level, and you can see immediately that there are two courses of action that can be taken when the IsUserLoginAllowed branch filter is executed, without having to know the lower level details. Second, grouping related functionality into cohesive units like this not only adheres more closely to the Single Responsibility Principle but also allows some quite complex pipelines to be composed, as the following diagram demonstrates:

more complex pipeline

Unit testing is still easy whether we test individual filters or the whole pipeline, but now we have the added option of testing at a third level of granularity via a composite filter such as OnDenyLogIn or OnAllowLogIn. When we’re happy these work as expected we can plug them in and test them in the context of the whole pipeline. We’re also still able to use aspects to wrap the entire execution path inside a transaction, for example:


var pipeline = new PipeLine<LogInMessage>();
pipeline.Register(new CheckUserSuppliedCredentials());
pipeline.Register(new CheckApiKeyIsEnabledForClient(new ClientDetailsDao()));
pipeline.Register(new IsUserLoginAllowed(new OnDenyLogIn(), new OnAllowLogIn()));
pipeline.Register(new RecordLogInAttempt());
		
var msg = new LogInMessage { /* ... set values on the message */ };
var tranAspect = new TransactionAspect<LogInMessage>(pipeline);
tranAspect.Execute(msg);

Running this would send the message through either branch (depending on the client):


start tran
CheckUserSuppliedCredentials
CheckApiKeyIsEnabledForClient
IsUserLoginAllowed
OnAllowLogIn
ValidateAgainstMembershipApi
GetUserData
PublishLogInSucceeded
RecordLogInAttempt
end tran

or alternatively:


start tran
CheckUserSuppliedCredentials
CheckApiKeyIsEnabledForClient
IsUserLoginAllowed
OnDenyLogIn
LockAccountOnThirdAttempt
PublishLogInFailed
RecordLogInAttempt
end tran

Summary

The adoption of the IFilter interface and the idea of building separate pipelines inside composite filters open up more possibilities for messaging as a programming model, most notably by giving us a way to model more complex business processes via branching paths whilst retaining all the benefits outlined in previous messaging posts. It’s another useful addition to the pipe and filter toolbox, and it gives more weight to the argument for using messaging techniques in our applications.


Messaging with CPS

The great thing about feedback is that it’s an opportunity: an opportunity to learn, improve and sometimes even reject what you previously held to be true. All the questions that arose from my earlier posts got me thinking, and discussions with other developers led me to some new insights as well as more questions in my own mind. I decided to revisit the LogIn example that I used in the original post to see how adopting some of those ideas would work out.

The main change is a big one and centres on the complete removal of the pipe and filter “framework” (I know some people don’t particularly care for it), replaced by CPS, otherwise known as Continuation Passing Style, which basically turns normal sequential steps inside-out from the point of view of the caller. In the context of our pipe and filters approach this means we no longer have separate classes for each filter; instead these become just plain old methods within the class that represents the task or use case at hand. Our use case class is completely self-contained in terms of the steps to be taken by the message, rather than composed from the outside as was the case with the Register methods previously.

What is a Continuation?

Before diving in let’s define what a continuation actually is. Wikipedia has this to say on continuations:

First-class continuations are a language’s ability to completely control the execution order of instructions. They can be used to jump to a function that produced the call to the current function, or to a function that has previously exited. One can think of a first-class continuation as saving the state of the program. However, it is important to note that true first-class continuations do not save program data, only the execution context. This is illustrated by the “continuation sandwich” description:

Say you’re in the kitchen in front of the refrigerator, thinking about a sandwich. You take a continuation right there and stick it in your pocket. Then you get some turkey and bread out of the refrigerator and make yourself a sandwich, which is now sitting on the counter. You invoke the continuation in your pocket, and you find yourself standing in front of the refrigerator again, thinking about a sandwich. But fortunately, there’s a sandwich on the counter, and all the materials used to make it are gone. So you eat it. :-)

It also has plenty to say on Continuation Passing Style.

There is another excellent explanation of continuations and continuation passing style (CPS) which can be found here. It goes into a lot of detail about what CPS can give you as a developer, but essentially, for our purposes, since C# does not support first-class continuations, what we’re talking about here is really an emulation of sorts: callbacks that give us the continuation passing style and let us invoke the rest of the computation from within the currently executing step. Hopefully the LogIn example below illustrates this more clearly:

The LogIn Example

First off, we’ll define a class with the dependencies we need to fulfil the use case:


public class LogInTask
{
    public LogInTask(IClientDetailsDao clientDao, IUserDetailsDao userDao, IMembershipService membershipService, IBus bus)
    {
        _clientDao = clientDao;
        _userDao = userDao;
        _membershipService = membershipService;
        _bus = bus;
    }

    private readonly IClientDetailsDao _clientDao;
    private readonly IUserDetailsDao _userDao;
    private readonly IMembershipService _membershipService;
    private readonly IBus _bus;
}

Next we’ll define a method to do what was originally done by a separate filter class in the pipe and filters approach:


public class LogInTask
{
    ...
	
    private void CheckUserSuppliedCredentials(LogInMessage input, Action onNext)
    {
        if(!string.IsNullOrEmpty(input.Username) && !string.IsNullOrEmpty(input.Password))           
            onNext();
        else
            input.Status = "Invalid credentials";
    }
}

Notice that having checked the supplied credentials we either call back via the onNext invocation or we set a reason for the failure. We’re not throwing an exception, nor implementing an interface like IStopProcessing as defined in Part 2. There’s no need to set a boolean Stop property here.

We define the rest of the methods in a similar manner:


public class LogInTask
{
    ...

    private void CheckApiKeyIsEnabledForClient(LogInMessage input, Action onNext)
    {
        input.ClientDetails = _clientDao.GetDetailsFrom(input.ApiKey);
        if(input.ClientDetails != null)
            onNext();
        else
            input.Status = "ApiKey not valid";
    }
	
    private void IsUserLoginAllowed(LogInMessage input, Action onNext)
    {	
        if(input.ClientDetails.IsEnabled)
            onNext();
        else
            input.Status = "Client not enabled";
    }
	
    private void ValidateAgainstMembershipApi(LogInMessage input, Action onNext)
    {
        if(_membershipService.Validate(input.Username, input.Password))
            onNext();
        else
            input.Status = "User not found";
    }
	
    private void GetUserDetails(LogInMessage input)
    {
        input.UserDetails = _userDao.GetDetailsFrom(input.Username);
    }
	
    private void PublishUserLoggedInEvent(LogInMessage input)
    {
        _bus.Publish(new UserLoggedInEvent{ Name = input.Username });
    }
}

The only thing left to do is to define the method that actually pushes the message through each step, making use of the continuation style. As you saw, some of the methods take the message and an Action delegate to invoke if, and only if, the message has successfully passed through the particular step in question. Our Invoke method looks like this:


public void Invoke(LogInMessage msg)
{
    CheckUserSuppliedCredentials(msg, () => {
        CheckApiKeyIsEnabledForClient(msg, () => {
            IsUserLoginAllowed(msg, () => {
                ValidateAgainstMembershipApi(msg, () => {
                    GetUserDetails(msg);
                    PublishUserLoggedInEvent(msg);
                });
            });
        });
    });
}
	

Hopefully the indentation helps to clarify that the second call is invoked only if the first succeeds. For example, the CheckUserSuppliedCredentials method does not invoke the onNext callback if the supplied values are invalid. Instead it sets the Status property and the rest of the chain is never called. Similarly, if the credentials are valid then CheckApiKeyIsEnabledForClient is invoked, but it too will only allow the rest of the chain to proceed if the call to retrieve the actual details from the database does not result in a null value. Each check in the chain either succeeds, and therefore invokes the next step, or sets a reason why it can’t continue. Once the final check, ValidateAgainstMembershipApi, has succeeded, both the GetUserDetails and PublishUserLoggedInEvent methods are invoked as part of a single continuation.

We can execute this use case like so:


var task = new LogInTask(new ClientDetailsDao(), 
                         new UserDetailsDao(), 
                         new MembershipService(), 
                         new MySimpleBus()
);

var msg = new LogInMessage { 
    Username = "", 
    Password = "1234", 
    ApiKey = Guid.Parse("a9aafe3f-76d3-4304-a39e-cf05edb870f2") 
};

task.Invoke(msg);

We can now interrogate the Status property of the message immediately after the call to Invoke. If all is well it should be empty. If, on the other hand, as the example message shows, the Username property was not supplied, we would get back the reason for the failure:


Console.WriteLine(msg.Status); // Invalid credentials

Just like the original pipe and filters approach, once the message has successfully passed through the steps it will contain data we can return to the client.
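For instance, the surrounding code might surface the outcome like this (a sketch of my own invention; the post doesn’t show a caller, and the UserDetails return type is assumed):

```csharp
// Hypothetical calling code that surfaces the outcome of the task.
public UserDetails LogIn(LogInTask task, LogInMessage msg)
{
    task.Invoke(msg);

    // An empty Status means every step in the chain succeeded.
    if(!string.IsNullOrEmpty(msg.Status))
        throw new InvalidOperationException(msg.Status);

    // Populated by GetUserDetails on the success path.
    return msg.UserDetails;
}
```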

On Error Goto

One way we could extend the example further would be by passing in a continuation callback that should be called in the event of an error. This means that instead of setting the Status property of the message and then evaluating it after the call to Invoke, we instead supply a callback that is executed when the error occurs. This does add complexity to the Invoke method though as it would then look like this:


public void Invoke(LogInMessage msg)
{
    CheckUserSuppliedCredentials(msg, () => {
        CheckApiKeyIsEnabledForClient(msg, () => {
            IsUserLoginAllowed(msg, () => {
                ValidateAgainstMembershipApi(msg, () => {
                    GetUserDetails(msg);
                    PublishUserLoggedInEvent(msg);
                }, error => Log(error));
            }, error => Log(error));
        }, error => Log(error));
    }, error => Log(error));
}
	

But now it’s not only starting to get complex, it’s also getting really ugly, with repetitive calls to the same error handling logic. Of course, if you want to do different things depending on which step failed then you may be able to justify it, but my preferred approach would be to keep error handling outside of the chain by taking advantage of the aspect oriented programming techniques I described in part 2, which are all still completely applicable with this style. Ultimately your choice will be dictated by the context of the use case you’re attempting to implement and how complex it is.
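As a minimal sketch of that preference (the ErrorHandlingAspect class here is my own invention rather than the exact aspect from part 2, and it assumes failures surface as exceptions):

```csharp
// Hypothetical wrapper: one place to catch and record failures,
// leaving the continuation chain free of per-step error callbacks.
public class ErrorHandlingAspect
{
    private readonly LogInTask _task;

    public ErrorHandlingAspect(LogInTask task)
    {
        _task = task;
    }

    public void Invoke(LogInMessage msg)
    {
        try
        {
            _task.Invoke(msg);
        }
        catch(Exception ex)
        {
            // Failures from any step are handled here, once.
            msg.Status = "Log in failed: " + ex.Message;
        }
    }
}
```

The calling code then wraps the task once: new ErrorHandlingAspect(task).Invoke(msg);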

So now that we have an alternative to the original approach let’s compare them.

Advantages

  • In the continuation style we have a single class where the work is defined. This means you can see the entire implementation in one place without having to go and look for the filter class that implements one of the steps. The context of the use case is localised.
  • There are fewer classes to be coded
  • The flow of execution can be controlled more easily without throwing exceptions or requiring interfaces

Disadvantages

  • In the continuation style we have a single class where the work is defined. Er..that looks suspiciously like the first advantage listed above. Yes, that’s right, it can also be a disadvantage because now, unlike the pipe and filters approach, we’ve lost the ability to dynamically compose our pipeline at runtime.
  • More code in one place means higher complexity and potentially more risk of bugs creeping in – in the pipeline approach every filter class does one simple thing, making it highly cohesive and easy to replace without touching other code
  • A complex use case can make for a hard to read Invoke method

I’m sure there are other advantages and disadvantages to both approaches but those are just a few that immediately come to mind. One thing this comparison does give you, though, is options, and if you so choose you could mix and match. Personally I would stick with one style or the other because, as I tried to stress in the original article, I want a consistent and uniform code base as much as possible, mainly because different ways of doing the same thing ultimately lead to a hard to maintain system.

Summary

Continuation passing is a technique we can use with messaging that provides another way of achieving the same goal as the pipe and filters approach: pushing a message through a series of steps to satisfy a given use case. Which one you choose is largely a matter of preference, but like any technique it has to be traded off against the alternatives. Whilst on the surface not having to implement a pipeline “framework” can seem appealing, keep in mind that you lose the ability to compose the pipeline on the fly based on runtime conditions, which can, in my opinion, be too big a price to pay. Personally, whilst I was open to the idea, which prompted me to investigate it and see how it worked (and yes, it does work), I think I’m more likely to stick with the pipeline approach as I just think it has more to offer (flexibility, readability and maintainability, to name three). On the other hand, if you don’t need that degree of flexibility then CPS can make another useful addition to your tool belt. I know which one I prefer though.
