Messaging as a programming model Part 1

Synopsis

This post, the first of two parts, is rather long, probably the longest one I’ve yet written. In short, it emphasises the architectural benefits of using explicit messaging in normal, in-process, non-distributed applications, and how doing so has enabled me to develop some interesting solutions that have resulted in some of the cleanest, most extendable, testable, modular code I’ve ever written.

Background

I’ve been meaning to describe how I approach software development these days for a good while now but haven’t really known where to begin. I want to document, to anyone interested, the techniques I use to try and achieve good clean, modular code. It’s an approach that has reaped many rewards and I’ve yet to find a way that I like better. That’s not to say that what I’m going to describe is somehow unique or the best way, it’s just an approach that works really well in many situations and has enabled me to build solid, reliable systems. You may agree or disagree, like it or not and that’s fine, feel free to point out the flaws or tell me I’m an idiot and I should do “xyz” instead. I’m always on the lookout to improve so if you have any suggestions I’ll gladly take them on board.

OOP vs Procedural

We’re all used to the idea of the Object Oriented Programming paradigm but despite its ubiquity I’ve never felt like it was really as good as it was promised to be, mainly because it’s too easy to abuse. In OOP there is the idea that we don’t call a function on an object, we “send it a message”, but that can sound a little abstract. What does it mean to send a message as opposed to calling a function as you would in a procedural system? My own interpretation is that in a procedural system you call a function and pass in the structure to be acted upon, whereas with OOP the object owns the state and you just ask the object to do something with it on your behalf by sending it a message. This conforms to the “tell, don’t ask” principle. However, most OO applications are not written this way and the idea of “sending a message” is not really given due thought by most developers.
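The distinction can be sketched in a few lines of C#. The account types below are hypothetical and exist only to contrast the two styles: in the first the caller hands a bare structure to a function, in the second the caller tells the object what to do and the object guards its own state.

```csharp
using System;

// Procedural style: a bare structure plus a function that acts upon it.
public class AccountData
{
    public decimal Balance;
}

public static class AccountFunctions
{
    public static void Withdraw(AccountData account, decimal amount)
    {
        if (amount > account.Balance)
            throw new InvalidOperationException("Insufficient funds");

        account.Balance -= amount;
    }
}

// Object-oriented style: the object owns its state and is told what to do.
public class Account
{
    private decimal _balance;

    public Account(decimal openingBalance)
    {
        _balance = openingBalance;
    }

    public decimal Balance { get { return _balance; } }

    public void Withdraw(decimal amount)
    {
        if (amount > _balance)
            throw new InvalidOperationException("Insufficient funds");

        _balance -= amount;
    }
}
```

Both versions do the same arithmetic. The difference is who is allowed to touch the balance.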

The Failure of OOP?

Even though object oriented languages are dominant today, most applications consist of objects that are nothing more than a bunch of properties which are then passed to some other class that has the behaviour that will act upon them. This feels very similar to the idea of functions in procedural programming. There’s nothing really wrong in doing this as long as you recognise what you are doing and you understand the trade-offs. The usual problem with this approach is that often the structure of the application is not easy to see or reason about. It’s sometimes hard to identify responsibilities within the system or find out where you need to make changes. When you do find out you often discover large methods with many parameters. In short, it’s messy. When the time comes to “enhance” the application we often find ourselves adding more parameters to our methods and having to repeat this all the way down through the various layers in our application till we hit the data layer. This makes for tedious, repetitive, and error-prone development but it is the reality for many enterprise developers every day.

This kind of programming model is not what OOP was invented for, and it makes it look like the paradigm has failed. The evidence is everywhere. How many times do we come across monolithic applications with tons of dependencies, many layers and poorly defined classes with too many responsibilities and poor cohesion? It seems to be the norm no matter how much information is “out there” telling us otherwise. We tend to look at code bases like this, realise intuitively that it’s wrong but still struggle to find a way to put it right that is clear, consistent, and maintainable for our future selves or those that will follow after us.

There is a way though that can give us the benefits we seek.

Messaging as a Programming Model

The “messaging as programming model” approach is the only one I’ve yet encountered where I can achieve clear, consistent code, time and time again with minimal effort. Not only clear and consistent but also modular, composable, easily testable, highly maintainable and extendable, and above all readable. Each invocation conforms to “tell, don’t ask” no matter whether performing a command or query. And yet it too, just like the messy procedural approach described earlier, takes a structure and asks an object to act upon it. The difference is in the uniformity and consistency of the approach, one that allows for some interesting possibilities.

Obviously, it’s not the be all and end all to writing good software but I believe it to be better than what I encounter routinely in application after application. I’m not saying OOP is “out” and messaging is “in”. The two are not mutually exclusive and clearly, as a C# developer I use an object oriented language. All I’m doing is embracing the idea of using messages explicitly in the context of normal OO applications. It’s certainly not typical but neither is it complex. All the normal OO techniques and rules can apply if required, it’s just the “where” and “how” they are applied that’s different.

Anyway, enough with the rambling. Let’s get going.

Pipes, Filters, Routers, WTF?

I’ll start with a simple (non-uml) diagram:

Simple pipeline for logging in a user

To cut to the chase, the main technique I’m using is a variation of the Pipe and Filter pattern as described in the Enterprise Integration Patterns book, a classic you should read if you’re at all interested in message patterns, which as you’re reading this I’ll assume you are.

Each filter in that pattern is loosely connected through an in queue and an out queue. The filters are not aware of each other; they just know how to read a message from the inbound queue, do something with it and put the result on the outbound queue. In my variation of the pattern there are no queues, merely a list of filters executed one after the other that manipulate the message as it passes through, just as the diagram implies. Within a filter your typical object model can apply if necessary but in general this pipeline tends to turn the conventional idea of an object operating on its own state on its head. Here the message is the state and, unlike in most functional languages, it is also mutable. Despite this, the whole pattern lends itself well to concurrency as we’ll see later.

What is a Message?

So the pipeline is the mechanism for processing messages but what qualifies as a message? The answer is simply anything in your application that should be made an explicit concept. We’ve seen this idea of explicitness before but usually in the context of DDD where it’s recommended that you group together a bunch of related fields/parameters and turn them into a single class known as a Value object. This then gives you a place to put related methods that operate on those properties.

Messages are also explicit concepts. They too contain a bunch of properties grouped together but they are just DTOs and have no methods on them at all. They are buckets of data but represent something important in your application. One of the benefits of making things explicit like this is that it increases your ability to reason about the code when you come back to it some months later. For instance, the diagram shows a pipeline that allows a user to log into an application. It’s easy to see the steps involved in logging in and gives you a big picture view without needing to look at the details. The actual code for this conveys the exact same information and is as easy to read as the diagram itself so let’s take a look at it.

A Log in Example

Here’s a simple message we can use when a request to log in arrives:


public class LogInMessage
{
    public string Pin { get; set; }
    public string Username { get; set; }
    public string Password { get; set; }
    public bool IsAllowed { get; set; }
    public Guid ApiKey { get; set; }
    public ClientDetail ClientDetails { get; set; }
    public UserDetail UserDetails { get; set; }
}

Let’s assume you have a user that’s invoking a login method on a web service. In order to log that user in there are a number of steps to be performed. Whilst this alone isn’t a hard problem to solve or on its own something likely to make your code hard to understand, it’s a good simple example to begin with. Remember my goal is to have clear, consistent code across ALL operations to provide a degree of uniformity and to keep the “surface area” of my immediate context small so I don’t get distracted looking at lots of surrounding code each perhaps doing things slightly differently and adding to the noise I have to deal with.

We register the steps we want to sequentially execute with the PipeLine through its Register method:


var loginPipeline = new PipeLine<LogInMessage>();

loginPipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
             .Register(msg => new CheckApiKeyIsEnabledForClient(msg))
             .Register(msg => new IsUserLoginAllowed(msg))
             .Register(msg => new ValidateAgainstMembershipApi(msg))
             .Register(msg => new GetUserDetails(msg));

As you can see, we have a class PipeLine which works with a message of T (we’ll get to the implementation of this in Part 2). The pipeline lets you register lambdas (anonymous functions) that will be called in the order of registration, each time passing in the message you supply. Every time a callback is invoked, an instance of the current filter is created and the message is manipulated in some way before being passed to the next filter.
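Part 2 covers the real implementation. Purely to make the mechanics concrete, here is a minimal sketch of what such a class could look like, assuming Register takes a callback that receives the message and returns the newly created filter, and Execute simply runs those callbacks in order. The Func<T, object> signature, the list-backed storage, the index loop and the DemoMessage and AppendStep types are assumptions made for illustration, not the actual code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch only: the real PipeLine<T> is covered in Part 2.
public class PipeLine<T>
{
    // Callbacks are stored in registration order.
    private readonly List<Func<T, object>> _steps = new List<Func<T, object>>();

    // Returns the pipeline itself so that registrations can be chained.
    public PipeLine<T> Register(Func<T, object> step)
    {
        _steps.Add(step);
        return this;
    }

    // Runs every step against the same message instance. An exception thrown
    // by a filter propagates to the caller and stops the remaining steps.
    public void Execute(T message)
    {
        // Index loop rather than foreach so that a step may register further
        // steps while the pipeline is running.
        for (var i = 0; i < _steps.Count; i++)
            _steps[i](message);
    }
}

// Hypothetical message and filter used to exercise the sketch.
public class DemoMessage
{
    public string Trace { get; set; } = "";
}

public class AppendStep
{
    public AppendStep(DemoMessage input, string name)
    {
        input.Trace += name;
    }
}
```

Registering two AppendStep filters with the names "a" and "b" and executing a fresh DemoMessage leaves its Trace as "ab", which is the in-order, same-instance behaviour described above.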

Invoking the pipeline is as simple as creating a message and executing it:


var msg = new LogInMessage {
    Pin = "1234",
    Username = "Jack A. Nory",
    Password = "@x1z/+==32a",
    ApiKey = Guid.Parse("a9aafe3f-76d3-4304-a39e-cf05edb870f2")
};

loginPipeline.Execute(msg);

It’s easy enough to imagine the path taken when calling execute. You only have to look at the registration to get that big picture view. The details are irrelevant unless you need to make changes. Each filter is stateless and short-lived. By the time all the filters have run, the message will have been enriched or changed in some way leaving you with data you can return to the caller.

Inside Filters

Let’s look at the first filter, CheckUserSuppliedCredentials which is really as simple as this:


public class CheckUserSuppliedCredentials
{
    public CheckUserSuppliedCredentials(LogInMessage input)
    {
        Process(input);
    }

    private void Process(LogInMessage input)
    {
        if(string.IsNullOrEmpty(input.Username))
            throw new UnauthorizedException("Invalid credentials");

        if(string.IsNullOrEmpty(input.Password))
            throw new UnauthorizedException("Invalid credentials");
    }
}

The filter has one responsibility, in this case to ensure that the message contains a username and password, and that’s it, nothing more. If it doesn’t satisfy either of those conditions an exception is thrown. The next filter is a little more complex but only just. Here we are assuming that we have a table in a database somewhere that contains clients that can have their access turned on or off by an ApiKey supplied (in our case) in the request header which (not shown) was extracted and assigned to the message. This filter has a dependency on a data access object in order to retrieve the client’s details which for the sake of simplicity we just new up inline:


public class CheckApiKeyIsEnabledForClient
{
    public CheckApiKeyIsEnabledForClient(LogInMessage input)
    {
        Process(input, new ClientDetailsDao());
    }

    private void Process(LogInMessage input, IClientDetailsDao dao)
    {
        var details = dao.GetDetailsFrom(input.ApiKey);

        if(details == null)
            throw new UnauthorizedException("Client not found");

        input.ClientDetails = details;
    }
}

Again, if the condition isn’t satisfied an exception is thrown and processing of the pipeline stops. If the condition is satisfied the message is enriched by having the ClientDetails property assigned the result of that call. This data is now available for use in subsequent filters.

Flexibility

I’m not going to show every filter implementation here. Suffice to say they all follow the same basic pattern, each having a single responsibility and yet together forming a conceptual whole that is itself a single responsibility, in this case that of logging in. Before moving on though it’s worth mentioning the benefits already gained from this approach. Not only is it easy to reason about what this code will do just by looking at the registration steps, you can also change the order of the steps (within reason) if necessary just by swapping the order that the registration occurs. It’s also incredibly easy to modify when the boss comes along and says that actually wouldn’t it be great if we could easily determine who is using our system by capturing how often they login?

We could do that without any real disruption and with a great deal of confidence that we won’t break any existing code. All we need do is create a new filter:


public class PublishUserLoggedInEvent
{
    public PublishUserLoggedInEvent(LogInMessage input)
    {
        Process(input, GetBus());
    }

    private void Process(LogInMessage input, IBus bus)
    {
        var @loggedInEvent = new UserLoggedInEvent { 
            Name = input.UserDetails.Name,
            Client = input.ClientDetails.Organisation, 
            When = DateTime.Now
        };

        bus.Publish(@loggedInEvent);
    }
}

and register it:


var loginPipeline = new PipeLine<LogInMessage>();

loginPipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
             .Register(msg => new CheckApiKeyIsEnabledForClient(msg))
             .Register(msg => new IsUserLoginAllowed(msg))
             .Register(msg => new ValidateAgainstMembershipApi(msg))
             .Register(msg => new GetUserDetails(msg))

             // add new step to pipeline
             .Register(msg => new PublishUserLoggedInEvent(msg));

Now when our message reaches the new filter then we must have passed through all the previous ones without incident. We can therefore assume the user is now successfully logged in and we can tell the world about it, in this case, by publishing an event over a bus whether that be NServiceBus, RabbitMQ or A.N.Other bus. Any subscribers will receive that message and do something with the data. For a simpler alternative you could just go straight to the database and record that the user logged in in a table. The beauty is that decisions taken now can be swapped out easily at some point in the future without worrying about breaking existing code.
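The simpler database alternative mentioned above might look something like the following sketch. ILoginAuditDao, its RecordLogin method, the RecordUserLogin filter and the in-memory implementation are all hypothetical names introduced here, and the message and detail classes are cut down to the members the filter needs.

```csharp
using System;
using System.Collections.Generic;

// Trimmed-down stand-ins for the types used in the post.
public class UserDetail { public string Name { get; set; } }
public class ClientDetail { public string Organisation { get; set; } }

public class LogInMessage
{
    public UserDetail UserDetails { get; set; }
    public ClientDetail ClientDetails { get; set; }
}

// Hypothetical data access interface; not part of the original example.
public interface ILoginAuditDao
{
    void RecordLogin(string userName, string organisation, DateTime when);
}

// Same shape as PublishUserLoggedInEvent, but writes a row instead of
// publishing an event.
public class RecordUserLogin
{
    public RecordUserLogin(LogInMessage input, ILoginAuditDao dao)
    {
        Process(input, dao);
    }

    private void Process(LogInMessage input, ILoginAuditDao dao)
    {
        dao.RecordLogin(input.UserDetails.Name,
                        input.ClientDetails.Organisation,
                        DateTime.Now);
    }
}

// In-memory implementation, handy for tests; a real one would issue an INSERT.
public class InMemoryLoginAuditDao : ILoginAuditDao
{
    public List<string> Rows { get; } = new List<string>();

    public void RecordLogin(string userName, string organisation, DateTime when)
    {
        Rows.Add(userName + "|" + organisation);
    }
}
```

Swapping one approach for the other is then a one-line change in the registration: replace the PublishUserLoggedInEvent step with a RecordUserLogin step and nothing else in the pipeline needs to know.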

Design-time vs Runtime PipeLines

So, we’ve covered modular, and to some degree we can see how a responsibility is composed of smaller more focused responsibilities but we are making these decisions at design-time. If we want to be a bit more dynamic we can easily build the pipeline at runtime. For instance, one way to do it is to add filters to a pipeline only if a condition is met. This could be done in some kind of fluent Builder or Factory object:


var loginPipeline = new PipeLine<LogInMessage>();

if(someconditionA)
    loginPipeline.Register(msg => new FilterForConditionA(msg));

if(someconditionB)
    loginPipeline.Register(msg => new FilterForConditionB(msg));

if(someconditionC)
    loginPipeline.Register(msg => new FilterForConditionC(msg));

// ...etc

return loginPipeline;
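One possible shape for the fluent Builder mentioned above is sketched here. PipeLineBuilder<T> and its Always, When and Build methods are hypothetical names, and the PipeLine<T> shown is only a compact stand-in so that the example compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Compact stand-in for PipeLine<T>; the real implementation is in Part 2.
public class PipeLine<T>
{
    private readonly List<Func<T, object>> _steps = new List<Func<T, object>>();

    public PipeLine<T> Register(Func<T, object> step)
    {
        _steps.Add(step);
        return this;
    }

    public void Execute(T message)
    {
        for (var i = 0; i < _steps.Count; i++)
            _steps[i](message);
    }
}

// Hypothetical fluent builder: a step is only added when its condition holds.
public class PipeLineBuilder<T>
{
    private readonly PipeLine<T> _pipeline = new PipeLine<T>();

    // Registers a step unconditionally.
    public PipeLineBuilder<T> Always(Func<T, object> step)
    {
        _pipeline.Register(step);
        return this;
    }

    // Registers a step only if the condition is true at build time.
    public PipeLineBuilder<T> When(bool condition, Func<T, object> step)
    {
        if (condition)
            _pipeline.Register(step);

        return this;
    }

    public PipeLine<T> Build()
    {
        return _pipeline;
    }
}
```

With something like this the chain of if statements collapses into a single expression of When(...) calls ending in Build(), which keeps the registration reading as one list of steps.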

Message Routers

Another way is to have one of the filters themselves extend the current pipeline if a particular condition is met:


var pipeline = new PipeLine<SomeMessage>();

pipeline.Register(msg => new Filter1(msg))
        .Register(msg => new Filter2(msg))
        .Register(msg => new Filter3(msg, pipeline));

What’s interesting about this example is that the pipeline itself is passed into Filter3. When that filter is executed the filter can extend the pipeline again based on some condition being met:


public class Filter3
{
    public Filter3(SomeMessage input, PipeLine<SomeMessage> pipeline)
    {
        Console.WriteLine("filter3");
   
        if(input.SomeCondition)
            pipeline.Register(msg => new Filter4(msg));
        else
            pipeline.Register(msg => new Filter5(msg));
    }
}

Now if each filter was to write its name out to the console, then depending on the value of SomeCondition when the message was executed, you would get one of the following two outputs:

filter1
filter2
filter3
filter4

or

filter1
filter2
filter3
filter5

So now not only do we have design-time composability but also the possibility of run-time composability. In effect filter3 is a MessageRouter, another pattern from the Enterprise Integration Patterns book. Its job is to decide where the message should go next and this is what filter3 is doing based on a value on the message itself (Content-based Router). Again very nice, very flexible.

Testability

Hopefully at this point you can see that following this approach gives us a lot of flexibility and yet at the same time is very easy to read and maintain. Not only that but you might also see that writing unit tests for this kind of code becomes nothing short of trivial. The example below shows the CheckUserSuppliedCredentials filter from the earlier example being tested for an empty Username on the message. Each filter can be tested like this, in complete isolation, with message state set up in various ways to exercise the paths through the filter. Alternatively you can easily test an entire pipeline which aligns itself nicely with the concept of testing use cases. Just create a message, execute it, then assert off the changed state of the message itself. I’ve found this to be a real boon. Compared to your typical, monolithic enterprise application which seems to get harder to test as the application grows, testing pipelines provides natural boundaries for your test fixtures and makes reading those tests easy as well:


[TestFixture]
public class CheckUserSuppliedCredentialsTestFixture
{
    [Test]
    public void ShouldThrowExceptionIfUsernameNotSupplied()
    {
        var msg = new LogInMessage { Username = string.Empty };
        
        var exception = Assert.Throws<UnauthorizedException>(() => new CheckUserSuppliedCredentials(msg));

        Assert.That(exception.Message, Is.EqualTo("Invalid credentials"));
    }
}

In my earlier examples I supplied any dependencies for filters such as the IClientDetailsDao inline by just newing up an implementation. Obviously this was for brevity but in practice you won’t want to do this if you’re writing tests or want the flexibility to “inject” different implementations. To get around this all you need do is put the interface on the filter’s constructor and supply the implementation in the pipeline registration instead like this:


var loginPipeline = new PipeLine<LogInMessage>();

loginPipeline.Register(msg => new CheckUserSuppliedCredentials(msg))
             .Register(msg => new CheckApiKeyIsEnabledForClient(msg, new ClientDetailsDao()))
             .Register(msg => new IsUserLoginAllowed(msg))
             .Register(msg => new ValidateAgainstMembershipApi(msg, new MembershipService()))
             .Register(msg => new GetUserDetails(msg, new UserDetailsDao()))
             .Register(msg => new PublishUserLoggedInEvent(msg, GetBus()));

Now it’s trivial to supply “fake” versions when constructing the pipeline in a test fixture.
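As a rough sketch of what that looks like for one filter, here is CheckApiKeyIsEnabledForClient with the dependency moved onto its constructor, together with a hand-rolled fake. FakeClientDetailsDao is a hypothetical name, and the message and detail classes are reduced to the members this filter touches.

```csharp
using System;

public class UnauthorizedException : Exception
{
    public UnauthorizedException(string message) : base(message) { }
}

public class ClientDetail { public string Organisation { get; set; } }

public class LogInMessage
{
    public Guid ApiKey { get; set; }
    public ClientDetail ClientDetails { get; set; }
}

public interface IClientDetailsDao
{
    ClientDetail GetDetailsFrom(Guid apiKey);
}

// The filter from earlier, with the dependency moved onto the constructor.
public class CheckApiKeyIsEnabledForClient
{
    public CheckApiKeyIsEnabledForClient(LogInMessage input, IClientDetailsDao dao)
    {
        var details = dao.GetDetailsFrom(input.ApiKey);

        if (details == null)
            throw new UnauthorizedException("Client not found");

        input.ClientDetails = details;
    }
}

// Hand-rolled fake: returns whatever it was constructed with, no database.
public class FakeClientDetailsDao : IClientDetailsDao
{
    private readonly ClientDetail _result;

    public FakeClientDetailsDao(ClientDetail result)
    {
        _result = result;
    }

    public ClientDetail GetDetailsFrom(Guid apiKey)
    {
        return _result;
    }
}
```

Constructing the filter with new FakeClientDetailsDao(null) exercises the "Client not found" path, while passing a populated ClientDetail exercises the enrichment path. In an NUnit fixture the first case would be an Assert.Throws<UnauthorizedException> around the constructor call, in the same style as the earlier test.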

A Web Api Example

Before moving on, I thought I would round out the login example by showing a more complete view of it within the context of a Web API ApiController.

To keep things cohesive I tend to have Service classes that group related responsibilities which are then passed into the controller. In the case of a user, for example, there might be a number of operations they can perform. Let’s say that in our system a user can log in and they can also be presented with system alerts that give them timely information such as when any maintenance is going to happen, etc. In this example, we have a UserController that takes a UserService via an interface:

public class UserController : ApiController
{
    private readonly IUserService _userService;

    public UserController(IUserService userService)
    {
        _userService = userService;
    }

    [HttpPost]
    [ActionName("login")]
    [RequestExceptionFilter]
    public HttpResponseMessage LogIn(User user)
    {
        var msg = new LogInMessage { Username = user.Username, Password = user.Password };
        _userService.Invoke(msg);
        return Request.CreateResponse(HttpStatusCode.OK, msg.UserDetails);
    }

    [HttpGet]
    [ActionName("alerts")]
    [RequestExceptionFilter]
    public HttpResponseMessage GetAlertsFor(string userId)
    {
        var msg = new AlertsMessage { UserId = userId };
        _userService.Invoke(msg);
        return Request.CreateResponse(HttpStatusCode.OK, msg.Alerts);
    }
}

This is as complex as my controllers get though there are a couple of points I want to emphasise. For instance, the message that is created at the top of the LogIn action is the same object that is used to build the response on its last line. The message is modified as it passes through the various filters and therefore, unless an exception gets thrown, will contain new data that we can return to the caller, in the case of the LogInMessage some information about the user contained in the UserDetails property.

The only other thing to mention is that if an exception is thrown the RequestExceptionFilterAttribute kicks in and returns the appropriate response to the caller such as a 403 Forbidden.

Aesthetic Interfaces

Each controller takes an interface describing the messages that can be executed. I don’t know about you but I think there’s something nice about the following interface definition:

public interface IUserService
{
    void Invoke(LogInMessage msg);
    void Invoke(AlertsMessage msg);
}

For one, it has only void methods, and for another, they all share the same name, differing only by the message type. It too conforms to the simple and readable philosophy that the whole approach seems to excel at.

The implementation of the interface is where the pipelines are created. In this example they are built in the constructor taking in the supplied dependencies:


public sealed class UserService : IUserService
{
    private readonly PipeLine<LogInMessage> _loginPipeline = new PipeLine<LogInMessage>();
    private readonly PipeLine<AlertsMessage> _alertsPipeline = new PipeLine<AlertsMessage>();

    public UserService(IClientDetailsDao clientDao, IMembershipService membershipService, IUserDetailsDal userDetailsDao, IBus bus)
    {
        _loginPipeline
             .Register(msg => new CheckUserSuppliedCredentials(msg))
             .Register(msg => new CheckApiKeyIsEnabledForClient(msg, clientDao))
             .Register(msg => new IsUserLoginAllowed(msg))
             .Register(msg => new ValidateAgainstMembershipApi(msg, membershipService))
             .Register(msg => new GetUserDetails(msg, userDetailsDao))
             .Register(msg => new PublishUserLoggedInEvent(msg, bus));

        _alertsPipeline
             .Register(msg => new CheckApiKeyIsEnabledForClient(msg, clientDao))
             .Register(msg => new IsGetAlertsAllowed(msg))
             .Register(msg => new GetUserAlerts(msg, userDetailsDao));
    }

    public void Invoke(LogInMessage msg)
    {
        _loginPipeline.Execute(msg);
    }

    public void Invoke(AlertsMessage msg)
    {
        _alertsPipeline.Execute(msg);
    }
}

and that’s it. Imagine that structure applied across many controllers and services and hopefully you can see the benefit of such a consistent and flexible approach. The Invoke methods are the simplest implementation possible for now. We’ll flesh these out in part 2 to enable some useful extra functionality.

I know I haven’t touched on the actual pipeline implementation itself yet, I’m going to leave that for part two where I’ll go over the opportunities it provides for cross cutting concerns, again in a flexible, easily maintainable manner. Before finishing Part 1 though, I want to show one other technique I find very useful in the context of desktop or console applications.

Batch Imports

If you’ve never had to write an ETL application you’ve never worked in enterprise development :)

In my current place of work there is a need to take Excel spreadsheets (yes, even in 2013) from clients containing batch orders or parts and import them into the back office system. Unfortunately the contents and structure of these spreadsheets can be quite different depending on the type of import and the particular client involved. These spreadsheets often contain lots of errors and sometimes aren’t even filled in correctly, this being the nature of the enterprise world. Regardless, we have to have a way to deal with this and so we have developed a WinForms application that can take these spreadsheets, parse and validate them, and only when all the errors have been corrected, import the data they contain into the database. You’ll not be surprised to hear that the pipeline approach has proven useful here too.


load = new PipeLine<PartUploadMessage>();
load.Register(msg => new PreSpreadsheetLoad(msg))
    .Register(msg => new ImportExcelDataSet(msg))
    .Register(msg => new ValidateSheet(msg))
    .Register(msg => new ValidateColumnNames(msg))
    .Register(msg => new StripInvalidChars(msg))
    .Register(msg => new CopyToDataTable(msg))
    .Register(msg => new CreateExtraColumns(msg))
    .Register(msg => new CheckStockItemCodeColumnFormatting(msg))
    .Register(msg => new CheckForDuplicateStockItemCode(msg))
    .Register(msg => new ValidateUniqueClient(msg))
    .Register(msg => new ValidateRequiredFields(msg))
    .Register(msg => new ValidateColumnDataTypes(msg))
    .Register(msg => new ValidateContentOfFields(msg))
    .Register(msg => new ValidateCategories(msg))
    .Register(msg => new PostSpreadsheetLoad(msg));

validate = new PipeLine<PartUploadMessage>();
validate.Register(msg => new PreValidate(msg))
    .Register(msg => new CheckForDuplicateStockItemCode(msg))
    .Register(msg => new ValidateUniqueClient(msg))
    .Register(msg => new ValidateRequiredFields(msg))
    .Register(msg => new ValidateContentOfFields(msg))
    .Register(msg => new ValidateCategories(msg))
    .Register(msg => new PostValidate(msg));
 
submit = new PipeLine<PartUploadMessage>();
submit.Register(msg => new PreBackOfficeSubmit(msg))
    .Register(msg => new SubmitPartsToBackOffice(msg))
    .Register(msg => new PostBackOfficeSubmit(msg));

Here we have three pipelines. Each one is invoked by a different button on the user interface. When the user browses to and selects a spreadsheet file on their system the load pipeline is executed passing in a message that contains the filename and various other properties. At the end of execution, a spreadsheet has been loaded and validated and the data is bound to a DataGrid on a form. The user can make changes if required and having done so, can ask for the data to be validated again by clicking a button which will execute the validate pipeline. In our case, we have some quite complicated rules about what combinations of data can co-exist in a row but using a validation pipeline simplifies the problem without creating a spaghetti mess. Notice how we are able to reuse the validation filters that are used in the load pipeline. Keeping filters small and focused enables such reuse. Once validation has succeeded, the user can press a button to invoke the submit pipeline which will begin importing the data into the back office system.

The Hollywood Principle

Whilst there’s nothing new on the surface here apart from having a pipeline for each “Task”, the really interesting story is how does the UI get notified of important events happening within the pipeline? For instance, how do we disable various buttons while validation or submit is underway? How do we update a progress bar on the screen to give the user feedback as to how far through the submit process they are?

Let’s look at a definition for the PartUploadMessage:


public class PartUploadMessage
{
    public string Filename { get; set; }
    public DataSet Data { get; set; }
    public DataTable Parts { get; set; }

    public Action OnBeforeFileLoad { get; set; }
    public Action OnAfterFileLoad { get; set; }
    public Action OnBeforeValidation { get; set; }
    public Action OnAfterValidation { get; set; }
    public Action<int> OnStartPartsUpload { get; set; }
    public Action<string> OnPartUploaded { get; set; }
    public Action OnEndPartsUpload { get; set; }
}

You can see that the first three are the usual kind of mutable state properties. Nothing new there. However, the next set of properties are all callbacks that you can assign a method to, which the various filters will invoke at the appropriate point in the pipelines’ execution lifetime.

For example, the PreSpreadsheetLoad filter will invoke the OnBeforeFileLoad callback whilst the PostSpreadsheetLoad filter will invoke the OnAfterFileLoad callback in the same way. Similarly, the PreValidate and PostValidate filters invoke OnBeforeValidation and OnAfterValidation respectively. Finally, the PreBackOfficeSubmit filter informs the UI that we are about to submit parts by invoking the OnStartPartsUpload callback. It tells the UI how many parts we are going to be submitting by passing in the number of rows from the Parts table which is a property of the message. We could use this to set the Max property of a progress bar:


public class PreBackOfficeSubmit
{
    public PreBackOfficeSubmit(PartUploadMessage input)
    {
        input.OnStartPartsUpload(input.Parts.Rows.Count);
    }
}

The SubmitPartsToBackOffice filter loops through the rows inserting them one by one. Each time a row is inserted it invokes the OnPartUploaded callback with the part number of the current row. This callback can also be used to update the progress bar, incrementing the Value property by 1 each time it is invoked. When all parts are submitted the PostBackOfficeSubmit filter invokes the OnEndPartsUpload callback where the progress bar is reset to zero and the submit button is re-enabled.
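A sketch of how such a filter might be written follows. The post does not show the real one, so the IBackOffice gateway, its InsertPart method, the CountingBackOffice stand-in and the "StockItemCode" column name are all assumptions, and the message is reduced to the two members the loop needs.

```csharp
using System;
using System.Data;

// Reduced PartUploadMessage: only the members this filter touches.
public class PartUploadMessage
{
    public DataTable Parts { get; set; }
    public Action<string> OnPartUploaded { get; set; }
}

// Hypothetical back office gateway; the real insert logic is not shown in the post.
public interface IBackOffice
{
    void InsertPart(DataRow row);
}

public class SubmitPartsToBackOffice
{
    public SubmitPartsToBackOffice(PartUploadMessage input, IBackOffice backOffice)
    {
        foreach (DataRow row in input.Parts.Rows)
        {
            backOffice.InsertPart(row);

            // Tell whoever is listening (typically the UI) which part just went in.
            // "StockItemCode" is an assumed column name.
            if (input.OnPartUploaded != null)
                input.OnPartUploaded(Convert.ToString(row["StockItemCode"]));
        }
    }
}

// Stand-in gateway that just counts inserts.
public class CountingBackOffice : IBackOffice
{
    public int Inserted { get; private set; }

    public void InsertPart(DataRow row)
    {
        Inserted++;
    }
}
```

The filter knows nothing about progress bars. It only raises the callback once per row, and what the listener does with that is entirely up to the UI.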

Initializing the message is just a case of assigning the various values and callbacks:


var msg = new PartUploadMessage {

    Filename = @"C:\path\to\spreadsheet.xls",
    OnBeforeFileLoad = () => this.RunOnMainThread(WhenAboutToLoadSpreadsheet),
    OnAfterFileLoad = () => this.RunOnMainThread(WhenSpreadsheetFinishedLoading),
    OnBeforeValidation = () => this.RunOnMainThread(WhenAboutToValidate),
    OnAfterValidation = () => this.RunOnMainThread(WhenValidationComplete),
    OnStartPartsUpload = (total) => this.RunOnMainThread(() => WhenAboutToSubmitParts(total)),
    OnPartUploaded = (partNumber) => this.RunOnMainThread(() => WhenCurrentPartSubmitted(partNumber)),
    OnEndPartsUpload = () => this.RunOnMainThread(WhenAllPartsSubmitted)

};

Threading

Anyone who’s done any desktop development will be aware that running a potentially long operation on the UI thread can cause the UI to block and stop being responsive which makes for a poor user experience. Luckily, the pipeline approach easily lends itself to asynchronous execution which I’ll go over in Part 2. For now, let’s assume our pipelines are being executed on a background thread but when the time comes to update the UI we need to be back on the main thread or we’ll run into the dreaded “Cross-thread operation not valid” exception message or such like. The RunOnMainThread method in the example is a simple extension method on the Control class which marshals our data back to the main thread so everything works as expected when we try to update control values. For completeness and for those curious, that method looks like this:

public static void RunOnMainThread<TControl>(this TControl control, Action action) where TControl : Control
{
    // InvokeRequired is true when we are not on the thread that owns the control
    if (control.InvokeRequired)
        control.Invoke(action);
    else
        action();
}

Now imagine this spreadsheet import process has to be done for 20 different spreadsheets, all with different structures, data, and validation requirements. The Pipe and Filter approach has allowed our application to accommodate these needs without growing too complex. Anyone looking at the source-code can immediately see the structure of the application, and once they understand how one import pipeline works they understand them all.

One last thing before winding up: using callbacks on the message allows you to get information from the user before allowing the pipeline to continue. Imagine a point of sale system using this technique. The cashier clicks a button that begins, say, a discount operation. As the pipeline is executing, one of the filters may require input from the cashier, perhaps some information that determines how much of a discount the customer is entitled to. When the filter executes it can invoke a callback in the UI that takes the message itself, perhaps showing a message box asking the cashier a question. The value the cashier enters is then assigned to the passed-in message before the dialogue is dismissed and the pipeline continues executing with its newly acquired information. The same thing can be done if the operation needs admin permissions: stop the pipeline by asking for a password or PIN and only allow execution to resume if the input is valid. I have used this technique in the past and it works very well.
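To make the cashier scenario a little more concrete, here is a minimal sketch. The names DiscountMessage, OnDiscountReasonRequired, ApplyDiscount and DiscountExample are all hypothetical and invented for this illustration, as is the 10% staff discount rule. The filter does its work in its constructor, in keeping with the other filters in this post, and the callback simply hard-codes the answer where a real form would show a dialogue.

```csharp
using System;

// Hypothetical message for a point of sale discount operation.
public class DiscountMessage
{
    public decimal Total { get; set; }
    public string DiscountReason { get; set; }
    public decimal DiscountedTotal { get; set; }

    // Supplied by the UI. It receives the message itself so that it can
    // store the cashier's answer on it.
    public Action<DiscountMessage> OnDiscountReasonRequired { get; set; }
}

// Hypothetical filter: asks the UI for input, then carries on.
public class ApplyDiscount
{
    public ApplyDiscount(DiscountMessage msg)
    {
        // This call does not return until the callback has finished,
        // i.e. until the cashier has dismissed the dialogue.
        msg.OnDiscountReasonRequired?.Invoke(msg);

        // Assumed rule, for illustration only: staff get 10% off.
        var rate = msg.DiscountReason == "staff" ? 0.10m : 0m;
        msg.DiscountedTotal = msg.Total * (1 - rate);
    }
}

public static class DiscountExample
{
    public static decimal Run()
    {
        var msg = new DiscountMessage
        {
            Total = 50m,
            // A real form would show a message box here; the answer is hard-coded.
            OnDiscountReasonRequired = m => m.DiscountReason = "staff"
        };

        new ApplyDiscount(msg);
        return msg.DiscountedTotal;
    }
}
```

The filter knows nothing about how the question is asked, only that the message comes back with the answer on it, so the same filter could be driven by a dialogue, a console prompt or a test.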

Moving Towards Autonomous Components and Services

I truly believe that written this way an application is far easier to understand and maintain than the traditional layered approach which in my opinion starts to break down and become a burden once it grows above a certain size, especially as developers come and go each doing things slightly differently. This approach provides a kind of template or blueprint that helps keep code consistent and maintainable with real tangible benefits. It also lends itself well to the idea of Component Based development where each set of related pipelines could be built into their own assemblies or modules containing a vertical slice of the traditional layered approach and installed as Windows Services communicating by commands and events. By adopting explicit messages in your in-process code it becomes almost trivial, should the need arise, to serialize and send those messages across the wire or save them to a database. At this point we are very close to autonomous business components in a SOA based micro-services architecture.
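As a rough illustration of that last step, the sketch below round-trips a message through JSON. PartUploadedEvent and WireExample are hypothetical names, and System.Text.Json is just one serializer chosen for the example, not something the rest of this post relies on. Note that the message carries only data: callbacks such as OnPartUploaded are delegates tied to the running process, so a message meant for the wire would need to leave them out.

```csharp
using System;
using System.Text.Json;

// Hypothetical event message carrying only data, so it can leave the process.
public class PartUploadedEvent
{
    public string PartNumber { get; set; }
    public DateTime UploadedAtUtc { get; set; }
}

public static class WireExample
{
    // Serializes the message to JSON and reads it back, standing in for
    // sending it across the wire or saving it to a database.
    public static PartUploadedEvent RoundTrip(PartUploadedEvent evt)
    {
        string json = JsonSerializer.Serialize(evt);
        return JsonSerializer.Deserialize<PartUploadedEvent>(json);
    }
}
```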

Okay, I think that’s enough to digest for now as we’ve covered quite a lot. In the next post I’ll examine the actual pipeline implementation and how easy it is to extend or enhance to perform some other interesting possibilities, things we usually describe as cross-cutting concerns.

In the meantime, if you have any feedback let me know. I’d be interested to hear what you think.

57 thoughts on “Messaging as a programming model Part 1”

  1. Hello. The resource oriented computing paradigm (ROC) offers this. Send a request to a resource and get a representation back. This means linear scalability (the microkernel handles the threads). Everything is treated as a resource: code, transports, data on the database and everything else.

    http://www.1060research.com/netkernel/

    Take your time to look into it, it has a learning curve, but once you do everything broken about OOP goes away

    Have a great day

      • Cool, it’s a mature production-ready system that once you get the hang of you can never go back :) I come from a nonprogrammer background but can easily build very complex systems very fast, and make changes in them... fast. The overhead caused by the microkernel handling the requests is negligible. A big thing is also that every requested resource is cached automatically by the kernel, so if the same resource is requested again it’s served from cache. Feel free to contact me if you have any thoughts about it.

        Best
        Fredrik

    • Axiom 0a is, pound for pound, by far the biggest part of my religion- “any resource of significance should be given a URI”- and with that I’ll claim to be on your side and to be a proponent of resource oriented systems.

      Yet microkernels and resource orientation are not enough by themselves. What Pipeline/Chain-of-Command/Chain-of-Responsibility/S-exps do is make the planned computation a manipulable resource. “Design time v.s. Runtime Pipelines” starts to show this capability, showing decisions going into deciding what computation is going to be run. It is followed by “Message Routers,” where a Filter modifies the Pipeline of staged work to be done.

      You kind of showed up in the comments and made a tall claim about some other general system. Kindly favor us with some kind of linkage between the world we came here to read about, and whatever it is you have in mind when you raise ROC. It’s unseemly to leave us guessing, with no hints as to where you think ROC is a better substitute. I’m genuinely curious and hopeful that NetKernel or other ROC systems indeed have a cognizance that the topology of computing itself is a resource, one that the agents of the ROC world regularly reflect on and manipulate, and I challenge you to provide some means for us to engage a real discussion here.

  2. I like your style here. Makes me wish we could build some syntactic sugar to make building those pipelines feel like declaring a function that calls a bunch of other functions, like:

    void pipeline Login(LoginMessage msg)
    {
    CheckUserSuppliedCredentials(msg);
    CheckApiKeyIsEnabledForClient(msg, clientDao);
    IsUserLoginAllowed(msg);
    ValidateAgainstMembershipApi(msg, membershipService);
    GetUserDetails(msg, userDetailsDao);
    PublishUserLoggedInEvent(msg, bus);
    }

    although I guess that could be achieved by changing the organization of the codebase after looking at it.

    • Use a Pipeline constructor which accepts IFilter { void exec(T msg) } implementations:
      new Pipeline(new CheckUserSuppliedCredentials(), new CheckApiKeyIsEnabledForClient(clientDao), new IsUserLoginAllowed(), new ValidateAgainstMembershipApi(membershipService), new GetUserDetails(userDetailsDao), new PublishUserLoggedInEvent(bus));

  3. Cool pattern. I can’t get past doing everything in the constructor though (for the filters). I wonder if there is a better way to do it without introducing restrictions or “cumbersomeness”.

    • I know what you mean but I guess we have to live with the limitations of the language. If C# allowed us to declare functions without requiring a class we’d be able to get around it but unfortunately we don’t have that luxury. It’s something you get used to though – well, I did! ;)

      • The alternative which I would recommend is using an IFilter interface, for a few reasons:

        1) In C#, it is generally considered bad practice to perform more than minimal work (e.g. initialization) in the constructor. It’s not only about style, but also because of how the CLR works. While it may not specifically apply to this case, for example, Eric Lippert (one of the C# compiler developers) recently demonstrated in a blog post how the constructor and destructor can in edge cases actually end up being run simultaneously.

        2) If you were to have polymorphic filters (which in an advanced scenario might be desirable), you would be limited to having the base class execute first, then the derived classes. Then you can just as well have a sequence of filters instead. If your polymorphic filters contain virtual methods, no matter which class is doing the call, you will always call the most derived implementation.

        3) Finally, using an interface would give you even cleaner code. For one, the filter will not know anything about how or when it should execute its work; it only knows that in order to be an IFilter, it must provide a Process method that takes a parameter of type TMessage. This is a much better indication for other developers about where they should place their code. Secondly, it would allow you to remove the constructor lambda expression from your Register method. Instead of writing Register(msg => new Foo(msg)), you can simply call Register<Foo>(). Just change the signature to Register<TFilter>(), with TFilter constrained to IFilter and new(), have the Pipeline class call the constructor (through new TFilter()), and then call the Process(TMessage msg) method on the IFilter interface.

        Hope that made sense. It’s not easy writing code examples in comments. ;)

      • Hi Christian,

        I could see the problems you were having attempting to post so I’ve tried to tidy it up for you.

        Thanks for a great comment, I must admit I’ll need to think on it a little bit but I’m always open to alternatives or having the flaws pointed out so thanks again.

  4. Hey Steve,

    Cool post. Pipelines are incredibly popular in Unix, I guess it’s because they allow you to construct the ‘pipeline’ from a set of interchangeable parts (like you said: modularity). If you have module A B and C, and if B and C are interchangeable you could do pipeline(A,B) or pipeline(A,C). I’d say they are incredibly useful for when you need this ability but if the process has no interchangeable parts, seems like it just adds unnecessary overhead and you are better off just coding it in the procedural way (like Scott Mitting’s code). I’d love to hear your thoughts about this.

    • Thanks for the comment. Yes, the fact that Unix employs the same principle shows that in software, nothing is truly new. My main concern is really about having an explicit architecture in the code that’s simple for everyone from juniors to seniors to understand and enhance. Based on the code bases I’ve encountered in my time I think there’s a bigger overhead in poorly structured code that’s hard to maintain than what we get with this approach but I definitely understand your concerns.

      • I have some doubts about this problem. Not sure it would help to apply a style of architecture that plays no role in meeting the functional requirements. For example, the Batch Imports example, I’d argue that you can write the same thing in plain old procedural style and it would look cleaner and therefore would be easier to maintain. i.e. (just the last pipeline)

        function submit(msg) {
        PreBackOfficeSubmit(msg);
        SubmitPartsToBackOffice(msg);
        PostBackOfficeSubmit(msg);
        }

      • Not sure what you mean by “just the last pipeline” but yes, you can argue solutions to problems many ways, that’s what makes software development interesting. I’m just presenting one I’ve used, what you take from it is up to you. Horses for courses, and all that.

        Edit

        Ah I see now, you mean the last of the three in the batch example. Yes it could be as simple as you said but I actually omitted a larger set of filters from that in an effort to keep the line count down. If you only had a very small number of steps then yes, you could keep it simple and do as you suggest, and if/when the steps involved grow more complex refactor to a different solution like the pipeline approach.

    • The Pipeline pattern is a first-class model of computation that is going to happen, and can be modified. Running a pipeline is kind of like running a UNIX pipeline, but in UNIX we have to design all the computational steps of the pipeline and run that. A Pipeline or Chain of Responsibility pattern allows us to have a data-structure holding all of our computation, listing each step one after another, AND we can modify (add to, remove from) that data-structure at runtime (unlike UNIX), constructing new Filters or Commands and adding them to the Pipeline.

      Pipeline is a meta-programming construct that is entirely first-class.

  5. I liked your article; it made me think about software architecture again. Too much time with PHP and I started to forget all that design patterns stuff I used to see in Java / .NET environments.

  6. Messaging destroys locality (by definition), which degrades performance, especially in the marshal-by-value world of web services. There may be a place for messaging, but not as the sole model of programming, in my opinion.

    • “Chain of Responsibility[/Pipeline] passes a sender request along a chain of potential receivers,” so yes, Pipeline is a messaging system. But it’s just one message, being passed through multiple systems. Thus, it resembles a normal, well performing computational system: it uses local data- the one message- being passed into and through various procedural systems.

      I don’t subscribe to your general fear of messaging, but you have far greater work cut out for yourself establishing the idea that a single message being passed into multiple functions can destroy locality and degrade performance. And it’s certainly not marshal by value in any of the examples shown.

      • You’re right. By messaging, I wasn’t referring to pipelining, but to an implementation as described by this snippet at the end of the post:

        “…each set of related pipelines could be built into their own assemblies or modules containing a vertical slice of the traditional layered approach and installed as Windows Services communicating by commands and events.”

        Making independently-running services is great from a maintainability standpoint, but this will tend to lead you toward a marshal by value paradigm, reducing locality and, therefore, degrading performance.

      • Hi Andy,

        The very nature of a distributed program implies that there is no single locality. Any application that talks via commands and events (pub/sub) is going to be asynchronous and temporally decoupled. Granted, it’s a whole slew of different problems (no silver bullet, etc.) but I don’t agree with the premise that performance is degraded. It’s merely one of a number of different approaches all of which have trade-offs.

  7. Over the last couple of years or so I’ve been using messaging more and more for both intra- and inter-application communication. However I’ve never used messages in pipelines as you describe. I like it though! The only issue I have is the use of exceptions to break out of the pipeline. I’ll need to re-read and think about this :-)

    • Hi Franklin, completely agree but we don’t have them in C#. That’s what I have to deal with at work so this is what I do to compensate. Maybe one day F# will become the dominant .Net language but until then…

    • One of the big advantages of Pipeline is that it is a very discrete and isolated computational model that is very easy to reason about and manipulate.

      I also think Monads are the wrong approach & that Pipelines ought to forward along a consistent shared context rather than leave each computational stage free to compose and pass along whatever it wants: consistent context throughout allows far more modularity, and involves far less re-composing of what is to be passed down. Filters can be independent, monadic systems are special f’ing snowflakes all the way down.

      http://eventuallyconsistent.net/2013/08/12/messaging-as-a-programming-model-part-1/#comment-858

  8. I vastly prefer the Chain-of-Responsibility nomenclature, and prefer even vastly more the sometimes used name, the Chain-of-Command pattern.
    “Chain of Responsibility passes a sender request along a chain of potential receivers.”

    http://c2.com/cgi/wiki?ChainOfResponsibilityPattern

    I personally think there’s a huge advantage to considering the thing being passed along as a computational context, rather than a message. The Command patterns, the handlers of the pipeline, work together to sufficiently handle an input, and giving them a joint operating space that includes the data to work on, the chain of command itself, and any of their own contribution is in fact “the whole shebang” for that request’s “subprocess.” Sometimes there may not be inter-relations between the handlers, but that is a special case: in the general case, considering a shared context which is relayed along is a more interesting and collaborative set-up.

    The technique of the Chain-of-Command is, further, very much like the Continuation Passing Style. Your example of passing the Pipeline in to a handler allows the handler to manipulate and stage future operations which are where flow control will carry on to. Instead of a passed in continuation being invoked with a result parameter they can use to complete themselves and stage future flow control upon, Commands are passed the pipeline, giving them a far more useful means for staging. This is an un-fucked CPS, CPS done right. CPS is a decoupling, but it can be pulled inside out while retaining the granted capability, and a Chain of Command is a structure for modelling malleable flow control on.

    I recommend thinking of the Chain of Command pattern as Context Passing Style, where context is both the chain itself, and also the ongoing data of the joint computation.

    I’m not sure why Pipelining and Chain of Command keep being rediscovered with new names. Node.js’s big inflection point upwards was the arrival of Connect, which at its core is a non-dynamic, trivial Chain-of-Command implementation. EIP similarly eschewed using the well-trodden nomenclature, adopting the same terminology as the .NET kids opted to pick up. Yet it’s all chain of command, and Apache was here a long time ago. I have my own recent implementation too. The above said, the chain of command does have a clear and obvious more general predecessor: it is itself a reinvention of a large part of the LISt Processor, aka LISP. Yet I do prefer the Chain of Command greatly, as its Context Passing is more ideologically rooted and speaks to what I see as coremost to computing.

    http://commons.apache.org/proper/commons-chain/

    https://github.com/rektide/ex-cathedra-js/blob/master/test/chain-1.js

    • Good stuff. I realised early the similarity with CoR when I first started doing this. I guess the upshot is there’s a million and one ways to solve a given problem, I’m just putting out there an approach that’s worked for me and is easy to explain to other developers. Thanks for your input.

      • Thanks for your words, and thanks for the great coverage Steve. Passing through by way of Twitter and was happily surprised discovering such an approachable review that- without fanfare and without causing head scratching- touches on some very powerful dynamical computational questions and answers.

  9. pipeline.Register(msg => new Filter1(msg))
    .Register(msg => new Filter2(msg))
    .Register(msg => new Filter3(msg, pipeline));

    Here you have a cyclic reference: pipeline refers to filter, filter refers to the pipeline. It is much better to pass the pipeline by another interface, e.g.:

    interface Filters
    {
    void Register(filter);
    }

    interface Pipeline
    {
    void Add(filter);
    }

    class UserAuthenticationPipeline implements Filters, implements Pipeline;

    And pass the pipeline to Filter3 by Filters interface.

  10. nice article really. some very well-laid out points. it is similar to how Juval Lowy explains Service-Orientation, did you hear his “every class as a service” thing?

  11. Filters, the way you design them, do not have clear Input-Output semantics.
    The flaw is that your messages are mutable objects. That led you to a design where you have void methods that modify their (input) parameters. This is bad for at least two reasons:
    1. It’s not explicit – hence harder to read/understand
    2. Your system may have hard to find side-effects.
    Therefore you should declare your messages as immutable objects. Your filters would then have signatures like:

        Message Process(Message input)

    That would improve your design a lot.

    • * Filters, the way you design them, do not have clear Input-Output semantics.

      I’m pretty sure I mention this.

      * The flaw is, that your messages are mutable objects.

      I think I might have mentioned that too

      * 1. It’s not explicit – hence harder to read/understand

      As opposed to what? As I describe it, it’s way easier to read/understand than your typical layered CRUD app architecture

      * 2. Your system may have hard to find side-effects.

      If filters are fine-grained single responsibilities any side effects are easy to find as it’s only the message that is manipulated within them

      * Therefore you should declare your messages as immutable objects

      If C# supported immutability without having to jump through hoops to do it manually, I’d have done it. As it stands, it’s too expensive

      The fact is, I’ve been doing this for a long time now. I can tell you it works and the benefits from a code readability/maintainability point of view are immeasurable. I’ve seen the results. In my opinion, any downsides are negligible compared to the downsides of the typical C# OOP/procedural hybrid approach that most applications employ. You have to balance the trade-offs of an approach and compare them with the trade-offs of your alternative approaches before dismissing them as unsuitable.

      Thanks for your input

  12. Pingback: Messaging as a programming model Part 2 | Eventually Consistent

  13. Thanks for both of these posts; they’ve been very fun to read!

    I’ve been playing around with these ideas in Clojure using core.async and immutable messages, more for fun than for profit. Here’s what I’ve come up with so far: https://github.com/zachallaun/async-pipeline. There’s quite a bit of additional complexity, which I’m not sure is a good thing, but this has been a lot of fun to think about.

  14. Thx, Steve, for a great article. That’s the way to go :-) I strongly believe in message passing and flow-oriented programming. And all this without the burden of multi-threading or distribution. This is worthwhile in everyday situations. It’s an approach for any application, be it web, desktop, batch or mobile.

    qammm has already pointed to my English blog where I’ve written a bit about this approach – which I call Flow-Design. “Design”, because I think the implementation needs to be preceded by thinking. And for this thinking in message passing terms we need tools, too.

    Also the other way round. Like some have expressed here: the implementation could look different – and still it would count as message passing. So it seems prudent to me to not just focus on one implementation style and still be able to talk about the fundamental idea.

    Some aspects of your presentation struck me as odd:

    1. Why put the processing in a ctor? As long as you use an OO language this – to me – looks like a violation of basic OO principles. Ctors should never fail.
    2. Also why instantiate the filters for every message anew? You can do that, but why force it? You’re foregoing any benefit of being able to keep state between messages.
    3. You’re not using levels of abstraction. Strange. Your load pipeline is very long and thus hard to read – even though it’s just a sequence. Why not allow pipelines to be registered?

    I’d be happy to talk with you more about your experience with message passing. Email me, if you like.

    • Hi Ralf,

      Thanks for the comments, I will indeed email you as I think the comment section is probably not the place for what might turn into a deeper discussion. :)

      Steve

  15. Slightly late to the party perhaps but I like the overall pattern, I just wish there was a better mechanism than throwing exceptions. Exceptions are costly (in terms of speed) and if we stick to the old adage that “exceptions are thrown for exceptional circumstances”, I just don’t consider bad user input to be exceptional (I find it relatively common).
    Any alternative suggestions for error handling? Should the messages have an error state which the pipeline can check and report back to the caller?

