MiniBus

I’ve been hacking away recently on a little MSMQ side project I call MiniBus that lets me reliably integrate small applications and services with a dose of NServiceBus-style goodness. This rather raises the question though: why not just use NServiceBus? Well, for a start, getting a business to pay for it can be a difficult conversation, and if you’re going to use it at all then it really should be part of an overall system architecture that can justify it, such as the new mobile eCommerce platform we’ve recently been building. Whilst it is clearly capable of handling simple application integration scenarios too, that isn’t what it was really designed for, and it can be overkill. It’s also quite complex and requires a good understanding of distributed systems architecture, something not all developers are willing to learn about. As much as I love NServiceBus (and hey, it must be good ’cause it’s managed to convince some people that I have a clue about architecture, but don’t tell my boss!), sometimes the project at hand calls for a simpler approach.

I often find myself working on small integration projects when we bring new clients on board. Some of our older apps that do similar work are written in such a way that there’s a direct temporal coupling between us and our clients, or even between various parts of our own system. It makes for a rather brittle architecture. My desire to improve this situation had me looking around for something that, in NServiceBus terms, would essentially give me bus.Send but not bus.Publish. All I needed was the ability to break that coupling and integrate apps reliably by offloading work, when appropriate, to other services, but I also wanted some of the nice things that NServiceBus has, like automatic retries, JSON serialization, transaction support, and error queues for failed messages. Oh, and I wanted something small and lean, and I think I’ve achieved that goal, so I’m happy with the end result.

MiniBus is not an ESB and it doesn’t do pub-sub, but it does have some bus-like properties. In a nutshell, it offers the following features:

  • Send to one or more queues (local and/or remote)
  • Load balancing via round-robin dispatching
  • Read messages synchronously or asynchronously
  • Choice of XML or JSON serialization
  • Enlist in ambient transactions
  • Configurable automatic retries
  • Move to error queue on failure
  • Automatically create local queues
  • Install MSMQ if not detected
  • Simple logging support
  • Ability to return error messages back to the read queue

Whilst the source code and full documentation can be found on GitHub, here’s a quick overview of using MiniBus. Essentially, you create a bus instance via the BusBuilder class and then use it either to send messages:


// create a bus for sending messages
IBus bus = new BusBuilder()
    .WithLogging(new FileLogger())
    .InstallMsmqIfNeeded()
    .DefineErrorQueue("MiniBus.errors")
    .DefineWriteQueue("MiniBus.messages1")
    .DefineWriteQueue("MiniBus.messages2@remotepc")
    .DefineWriteQueue("MiniBus.messages3")
    .CreateLocalQueuesAutomatically()
    .EnlistInAmbientTransactions()
    .JsonSerialization()
    .CreateBus();

// create your message type
var bob = new Person { Name = "Bob", Age = 22 };

// send it
bus.Send(bob);

or receive them:


// create a bus for reading messages
IBus bus = new BusBuilder()
    .WithLogging(new FileLogger())
    .DefineErrorQueue("MiniBus.errors")
    .DefineReadQueue("MiniBus.messages1")
    .JsonSerialization()
    .NumberOfRetries(3)
    .CreateBus();

// register a message handler
bus.RegisterHandler<Person>(new PersonHandler());

// process messages on the read queue
bus.Receive<Person>();

Receiving messages can be done synchronously, as shown in the example, or asynchronously. If, for example, you want to process messages only at specific times, then the Receive method is the better choice. If you want to process messages as and when they arrive, call the ReceiveAsync method as soon as you’ve created the bus and configured your handler. Either way, the handler-specific code runs in the context of a transaction, and should an exception occur, MiniBus will retry if you’ve configured the bus to do so. If, after the maximum number of retries, the message still can’t be processed successfully, MiniBus will move it to an error queue. When you’re ready to try again, the ReturnAllErrorMessages method will move failed messages off the error queue and back to the read queue. On the subject of handlers, you simply create one or more classes that implement the IHandleMessage<T> interface and then register them with the bus.
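
For example, the PersonHandler registered above might look something like this (a minimal sketch, assuming IHandleMessage<T> exposes a single Handle method; check the documentation on GitHub for the exact signature):


// a handler for Person messages
public class PersonHandler : IHandleMessage<Person>
{
    public void Handle(Person p)
    {
        // runs inside the receive transaction; throwing here triggers the
        // configured retries and, once they're exhausted, the error queue
        Console.WriteLine("Hello, {0}", p.Name);
    }
}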

MiniBus also has a rudimentary “load balancing” option via the AutoDistributeOnSend method. Ordinarily, having defined multiple write queues, sending a message means that a copy is sent to all of those queues, which is fine if you need different handlers for the same message type. On the other hand, if you want to spread the load by having two or more endpoints deal with messages in the same way, setting the AutoDistributeOnSend option causes MiniBus to adopt a round-robin style of balancing whereby the first call to Send sends the message to the first queue, the second call to Send sends it to the second queue, and so on before going back to the first. This means that no single endpoint is tasked with doing all the work; in other words, you gain some parallelism.
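
As a rough sketch (the queue names and message values here are purely illustrative, and I’m assuming AutoDistributeOnSend is switched on via the builder like the other options):


// create a bus that round-robins across its write queues
IBus bus = new BusBuilder()
    .WithLogging(new FileLogger())
    .DefineErrorQueue("MiniBus.errors")
    .DefineWriteQueue("MiniBus.messages1")
    .DefineWriteQueue("MiniBus.messages2")
    .DefineWriteQueue("MiniBus.messages3")
    .JsonSerialization()
    .AutoDistributeOnSend()
    .CreateBus();

// each Send goes to the next queue in turn rather than to all of them
bus.Send(new Person { Name = "Ann", Age = 31 });  // -> MiniBus.messages1
bus.Send(new Person { Name = "Bob", Age = 22 });  // -> MiniBus.messages2
bus.Send(new Person { Name = "Cat", Age = 45 });  // -> MiniBus.messages3
bus.Send(new Person { Name = "Dan", Age = 28 });  // -> back to MiniBus.messages1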

In order to keep it small and simple, MiniBus has no dependencies on any other library. Logging is up to the consuming application: an interface is provided (ILogMessages) that you can use to wrap your logging library of choice. If you don’t like specifying queue names in code, it’s up to you to read them out of your config file and set the various bus properties. Finally, if you need a hosting process, I recommend TopShelf for Windows services. Right now, MiniBus leaves all of this up to you, though that might change in future iterations.
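
Plugging in your own logger, for instance, could be as simple as this (a sketch that assumes ILogMessages exposes a single Log(string) method; check the source for the exact shape of the interface):


// adapt whatever logging you already use to MiniBus's ILogMessages interface
public class ConsoleLogger : ILogMessages
{
    public void Log(string message)
    {
        Console.WriteLine("{0:u} {1}", DateTime.UtcNow, message);
    }
}

// then hand it to the builder: new BusBuilder().WithLogging(new ConsoleLogger())...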

There’s probably a lot more that could be done with MiniBus, but you have to draw the line somewhere. There’s no point in attempting to make another ESB when there are already plenty of great options out there, like NServiceBus and Rebus, another cool library. With MiniBus I had a simple use case in mind and I’ve more or less built what I needed, but I think it could be useful to others too, so I’ve decided to put it “out there” by making it available on both GitHub and NuGet. It’s small, simple to use, and reliable, so feel free to give it a go, and if you find a use for it then great! If you do happen to find any issues though, be sure to let me know or, better yet, submit a patch! :)

Updates

I’ll keep this page updated with changes as and when they happen.

24/11/2013

MiniBus 0.1.3 released

  • ReturnErrorMessages renamed to ReturnAllErrorMessages
  • Added ReturnErrorMessage method to move a specific message from the error queue
  • Added fail-fast option: useful for scenarios where it’s important that messages are processed in order; as soon as an error is detected, no more messages are processed and the failing message stays on the read queue

    Branching messages with pipelines

    If you’ve read my recent posts on Messaging as a programming model, you might gather that I’m a fan of this approach. If nothing else, it’s a great way of breaking down a big use case into a small set of fine-grained, easily maintainable and replaceable steps that make the code very easy to reason about. However, one of the drawbacks at first glance is that there doesn’t appear to be a way to easily branch the message as it moves through the sequence of filters. You may recall that, using the essence of the Message Router pattern, we can change which filters are executed by modifying the pipeline whilst it is executing, registering new steps based on a particular value carried on the message, like so:

    
    // the original approach: the filter receives the pipeline and, based on a
    // value carried on the message, appends the next filter to execute
    public class Filter3
    {
        public Filter3(SomeMessage input, PipeLine<SomeMessage> pipeline)
        {
            Console.WriteLine("filter3");

            if(input.SomeCondition)
                pipeline.Register(msg => new Filter4(input));
            else
                pipeline.Register(msg => new Filter5(input));
        }
    }
    
    

    However, all this does is attach either Filter4 or Filter5 to the end of the existing list of registered filters. You can obviously register as many filters as you like under each path of the conditional test, but even so they are still added at the end, with no means of then continuing execution back on the original main branch. A diagram might help explain what I mean:

    Alternative Filter Paths

    The old

    The picture on the left depicts the structure of the filters as the original approach allows. The picture on the right shows a more useful alternative that we’d like as a minimum, ideally without having to pass the pipeline into the filter to change it. The question is: how can we achieve this?

    The answer lies in the IFilter interface described in the Revisited post of the original series. By adopting this simple interface for all our filters we can implement a variation of the composite design pattern, where we have filters that contain logic (the leaves) and filters that themselves contain pipelines of filters (the composites), plus a new kind of filter whose sole purpose is to test a condition and determine what to execute next. I’ll use a modified (and contrived) version of the original LogIn example to illustrate the technique. The original example looked like this:

    Simple pipeline for logging in a user

    It’s a straightforward linear sequence of steps that modify and enrich the message as it passes through. What if, though, for argument’s sake, we want to execute a bunch of different steps depending on the result of the conditional test in the IsUserLoginAllowed filter?

    The new

    Branching pipeline for logging in a user

    Here, with the addition of a few more filters, the message will always pass through the first three steps but, depending on the logic within the IsUserLoginAllowed filter, will then either pass through the two filters on the left branch or the three on the right branch before joining back up with the main branch to record the fact that a log in was attempted. This final filter executes regardless of whether the message went left or right. This is a lot more useful and is easily achievable through simple composition thanks to our IFilter interface, so let’s look at the code, starting with the interface that all our filters will implement:

    
    public interface IFilter<T>
    {
        void Execute(T msg);
    }
    
    

    and the PipeLine class, which also implements the interface:

    
    public class PipeLine<T> : IFilter<T>
    {
        public PipeLine<T> Register(IFilter<T> filter)
        {
            _filters.Add(filter);
            return this;
        }

        public void Execute(T input)
        {
            _filters.ForEach(f => f.Execute(input));
        }

        readonly List<IFilter<T>> _filters = new List<IFilter<T>>();
    }
    
    

    All straightforward and recognisable, as are the first two filters shown here:

    
    public class CheckUserSuppliedCredentials : IFilter<LogInMessage>
    {
        public void Execute(LogInMessage input)
        {
            Console.WriteLine("CheckUserSuppliedCredentials");
            if(string.IsNullOrEmpty(input.Username) || string.IsNullOrEmpty(input.Password))
            {
                input.Stop = true;
                input.Errors.Add("Invalid credentials");
            }
        }
    }

    public class CheckApiKeyIsEnabledForClient : IFilter<LogInMessage>
    {
        public CheckApiKeyIsEnabledForClient(IClientDetailsDao dao)
        {
            _dao = dao;
        }

        public void Execute(LogInMessage input)
        {
            Console.WriteLine("CheckApiKeyIsEnabledForClient");
            var details = _dao.GetDetailsFrom(input.ApiKey);

            if(details == null)
            {
                input.Stop = true;
                input.Errors.Add("Client not found");
            }

            input.ClientDetails = details;
        }

        private readonly IClientDetailsDao _dao;
    }
    
    

    Branch Filters

    The next filter, IsUserLoginAllowed, is more interesting and has changed from previous implementations. I refer to it as a Branch filter because, well, it allows your message to branch!

    
    public class IsUserLoginAllowed : IFilter<LogInMessage>
    {
        public IsUserLoginAllowed(IFilter<LogInMessage> leftBranch, IFilter<LogInMessage> rightBranch)
        {
            _onDeny = leftBranch;
            _onAllow = rightBranch;
        }

        // only users of ClientX and ClientY can log in
        public void Execute(LogInMessage input)
        {
            Console.WriteLine("IsUserLoginAllowed");
            switch(input.ClientDetails.Name)
            {
                case "ClientX":
                case "ClientY":
                    _onAllow.Execute(input);
                    break;

                default:
                    _onDeny.Execute(input);
                    break;
            }
        }

        private readonly IFilter<LogInMessage> _onAllow;
        private readonly IFilter<LogInMessage> _onDeny;
    }
    
    

    Look at the constructor of this filter and notice that it takes two IFilter instances, one for each branch path. These can be normal, individual (leaf) filters, but it gets more interesting and useful when they are filters that group other filters together; in other words, composite filters. A Branch filter’s only responsibility is to determine what to do next. If we look at how this is all wired up, we’ll see that our Branch filter, IsUserLoginAllowed, takes two composite filters, OnDenyLogIn and OnAllowLogIn:

    
        var pipeline = new PipeLine<LogInMessage>();
        pipeline.Register(new CheckUserSuppliedCredentials());
        pipeline.Register(new CheckApiKeyIsEnabledForClient(new ClientDetailsDao()));
        pipeline.Register(new IsUserLoginAllowed(new OnDenyLogIn(), new OnAllowLogIn()));
        pipeline.Register(new RecordLogInAttempt());
    		
        var msg = new LogInMessage { /* ... set values on the message */ };
        pipeline.Execute(msg);
    
    

    Composite Filters

    Composite filters contain their own pipelines, each registering their own related filters, and because all filters, whether leaf or composite, implement the IFilter interface, they can all be treated the same via the Execute method:

    composed pipeline for logging in a user

    The code is simple, consisting of nothing more than registering filters in a pipeline.

    OnDenyLogIn:

    
    public class OnDenyLogIn : IFilter<LogInMessage>
    {
        public OnDenyLogIn()
        {
            _pipeline.Register(new LockAccountOnThirdAttempt());
            _pipeline.Register(new PublishLogInFailed());
        }

        public void Execute(LogInMessage input)
        {
            Console.WriteLine("OnDenyLogIn");
            _pipeline.Execute(input);
        }

        private readonly PipeLine<LogInMessage> _pipeline = new PipeLine<LogInMessage>();
    }
    

    and OnAllowLogIn:

    
    public class OnAllowLogIn : IFilter<LogInMessage>
    {
        public OnAllowLogIn()
        {
            _pipeline.Register(new ValidateAgainstMembershipApi());
            _pipeline.Register(new GetUserData());
            _pipeline.Register(new PublishLogInSucceeded());
        }

        public void Execute(LogInMessage input)
        {
            Console.WriteLine("OnAllowLogIn");
            _pipeline.Execute(input);
        }

        private readonly PipeLine<LogInMessage> _pipeline = new PipeLine<LogInMessage>();
    }
    
    

    The fact that composites are self-contained means that their own internal pipelines don’t even need to be for the same message type! Even though both of the example composite filters take a LogInMessage via their Execute method, there’s nothing to stop them sending a completely different message type through the internal pipeline and then assigning the result, whatever that may be, to some property on the LogInMessage before it continues on its way through the rest of the sequence. None of the other filters ever need know that a different message type was created and processed within a composite filter.
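
    As a sketch of what that could look like (the message type, filters, and property names here are hypothetical, not part of the earlier examples), a composite might translate to its own internal message type and copy the result back afterwards:


    public class EnrichWithUserData : IFilter<LogInMessage>
    {
        public EnrichWithUserData()
        {
            // hypothetical leaf filters that implement IFilter<UserDataMessage>
            _pipeline.Register(new LoadProfile());
            _pipeline.Register(new LoadPreferences());
        }

        public void Execute(LogInMessage input)
        {
            // translate to the internal message type...
            var internalMsg = new UserDataMessage { Username = input.Username };
            _pipeline.Execute(internalMsg);

            // ...then copy the result back; the outer pipeline never knows
            // a different message type was used
            input.UserData = internalMsg.Result;
        }

        private readonly PipeLine<UserDataMessage> _pipeline = new PipeLine<UserDataMessage>();
    }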

    Benefits

    Composing filters in this way has some important benefits. First, it makes the main pipeline easier to read: there are fewer steps at the top level, and you can see immediately that there are two courses of action that can be taken when the IsUserLoginAllowed branch filter executes, without having to know the lower level details. Second, grouping related functionality into cohesive units like this not only adheres more closely to the Single Responsibility Principle but also allows some quite complex pipelines to be composed, as the following diagram demonstrates:

    more complex pipeline

    Unit testing is still easy whether we test individual filters or the whole pipeline, but now we have the added option of testing at a third level of granularity via a composite filter such as OnDenyLogIn or OnAllowLogIn. When we’re happy these work as expected, we can plug them in and test them in the context of the whole pipeline. Additionally, we’re still able to use aspects to wrap the entire execution path inside a transaction, for example:

    
    var pipeline = new PipeLine<LogInMessage>();
    pipeline.Register(new CheckUserSuppliedCredentials());
    pipeline.Register(new CheckApiKeyIsEnabledForClient(new ClientDetailsDao()));
    pipeline.Register(new IsUserLoginAllowed(new OnDenyLogIn(), new OnAllowLogIn()));
    pipeline.Register(new RecordLogInAttempt());
    		
    var msg = new LogInMessage { /* ... set values on the message */ };
    var tranAspect = new TransactionAspect<LogInMessage>(pipeline);
    tranAspect.Execute(msg);
    
    

    Running this would send the message through either branch (depending on the client):

    
    start tran
    CheckUserSuppliedCredentials
    CheckApiKeyIsEnabledForClient
    IsUserLoginAllowed
    OnAllowLogIn
    ValidateAgainstMembershipApi
    GetUserData
    PublishLogInSucceeded
    RecordLogInAttempt
    end tran
    
    

    or alternatively:

    
    start tran
    CheckUserSuppliedCredentials
    CheckApiKeyIsEnabledForClient
    IsUserLoginAllowed
    OnDenyLogIn
    LockAccountOnThirdAttempt
    PublishLogInFailed
    RecordLogInAttempt
    end tran
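
    The TransactionAspect itself isn’t shown in this post. As a minimal sketch of one possible implementation (assuming System.Transactions is available, and not necessarily identical to the aspect from the earlier posts), it’s just a decorator that wraps any IFilter<T> in a TransactionScope:


    public class TransactionAspect<T> : IFilter<T>
    {
        public TransactionAspect(IFilter<T> inner)
        {
            _inner = inner;
        }

        public void Execute(T msg)
        {
            Console.WriteLine("start tran");
            using (var scope = new System.Transactions.TransactionScope())
            {
                _inner.Execute(msg);  // run the whole decorated pipeline
                scope.Complete();     // commit only if no filter threw
            }
            Console.WriteLine("end tran");
        }

        private readonly IFilter<T> _inner;
    }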
    
    

    Summary

    The adoption of the IFilter interface and the idea of building separate pipelines inside composite filters open up more possibilities for messaging as a programming model, most notably by giving us a way to model more complex business processes via branching paths whilst still retaining all the benefits outlined in previous messaging posts. It adds another useful variation to the pipe and filter toolbox and gives more weight to the argument for using messaging techniques in our applications.
