MiniBus

I’ve been hacking away recently on a little MSMQ side project I call MiniBus that allows me to reliably integrate between small applications and services, etc. with a dose of NServiceBus-style goodness. This kind of begs the question though, why not just use NServiceBus? Well, for a start, getting a business to pay for it can be a difficult conversation, and if you’re going to use it at all then really it should be used as part of an overall system architecture that can justify it such as the new mobile eCommerce platform we’ve recently been building. Whilst it is clearly capable of handling simple application integration scenarios too, that isn’t what it was really designed for and can be overkill. It’s also quite complex and requires a good understanding of distributed systems architecture, something not all developers are willing to learn about. As much as I love NServiceBus (and hey, it must be good ’cause its managed to convince some people that I have a clue about architecture, but don’t tell my boss!) sometimes the project at hand calls for a simpler approach.

I often find myself working on small integration projects when we bring new clients on board. Some of the older apps that we have that already do similar work are written in such a way that there exists a direct temporal coupling between us and our clients, or even to various other parts of our own system. It makes for a rather brittle architecture. My desire to improve this situation had me looking around for something that, in NServiceBus terms, would essentially give me a bus.Send but not bus.Publish. All I needed was the ability to break that coupling and let me integrate apps reliably by offloading work, when appropriate, to other services but…I also wanted some of the nice things that NServiceBus has like automatic retries, JSON serialization, transaction support, and error queues for failed messages. Oh, and I also wanted something small and lean and I think I’ve achieved that goal so I’m happy with the end result.

MiniBus is not an ESB and it doesn’t do pub-sub, but it does have some bus-like properties. In a nutshell, it offers the following features:

  • Send to one or more queues (local and/or remote)
  • Load balancing via round-robin dispatching
  • Read messages synchronously or asynchronously
  • Choice of XML or JSON serialization
  • Enlist in ambient transactions
  • Configurable automatic retries
  • Move to error queue on failure
  • Automatically create local queues
  • Install MSMQ if not detected
  • Simple logging support
  • Ability to return error messages back to the read queue

Whilst the source code and full documentation can be found on GitHub here’s a quick overview of using MiniBus. Essentially you create a bus instance via the BusBuilder class and then use it to either send messages:


// create a bus for sending messages
IBus bus = new BusBuilder()
    .WithLogging(new FileLogger())
    .InstallMsmqIfNeeded()
    .DefineErrorQueue("MiniBus.errors")
    .DefineWriteQueue("MiniBus.messages1")
    .DefineWriteQueue("MiniBus.messages2@remotepc")
    .DefineWriteQueue("MiniBus.messages3")
    .CreateLocalQueuesAutomatically()
    .EnlistInAmbientTransactions()
    .JsonSerialization()
    .CreateBus();

// create your message type
var bob = new Person { p.Name = "Bob", Age = 22 };

// send it
bus.Send(bob);

or receive them:


// create a bus for reading messages
IBus bus = new BusBuilder()
    .WithLogging(new FileLogger())
    .DefineErrorQueue("MiniBus.errors")
    .DefineReadQueue("MiniBus.messages1")
    .JsonSerialization()
    .NumberOfRetries(3)
    .CreateBus();

// register a message handler
bus.RegisterHandler<Person>(new PersonHandler());

// process messages on the read queue
bus.Receive<Person>();

Receiving messages can be done synchronously as shown in the example or asynchronously. If, for example, you want to process messages only at specific times then the Receive method is the better choice. If you want to process messages as and when they arrive then call the ReceiveAsync method as soon as you’ve created the bus and configured your handler. Either way, the handler specific code runs in the context of a transaction and should an exception occur MiniBus will retry if you’ve configured the bus to do so. If after the maximum number of retries the message still can’t be successfully processed, MiniBus will move it to an error queue. When you’re ready to try again, the ReturnAllErrorMessages method will move failed messages off the error queue and back to the read queue. On the subject of handlers, you simply create one or more classes that implement the IHandleMessage<T> interface and then register them with the bus.

MiniBus also has a rudimentary “load balancing” option via the AutoDistributeOnSend method. Ordinarily, having defined multiple write queues, sending a message would mean that a copy is sent to all of those queues which is fine if you need different handlers for the same message type. On the other hand, if you want to spread the load by having two or more different endpoints deal with messages in the same way setting the AutoDistributeOnSend option will cause MiniBus to adopt a round-robin style balancing approach whereby the first call to Send will send the message to the first queue, the second call to Send will send the message to the second queue, and so on before going back to the first. This means that no single endpoint is tasked with having to do all the work. In other words, you gain some parallelism.

In order to keep it small and simple, MiniBus has no dependencies on any other library. Logging is up to the consuming application. An interface is provided (ILogMessages) that you can use to wrap your logging library of choice. If you don’t like specifying queue names in code, it’s up to you to read them out of your config file and set the various bus properties. Finally, If you need a hosting process I recommend TopShelf for Windows Services. Right now, MiniBus leaves all of this up to you though that might change in future iterations.

There’s probably a lot more that could be done with MiniBus but you have to draw the line somewhere. There’s no point in attempting to make another ESB when there’s already plenty of great options out there like NServiceBus, and Rebus, another cool library. With MiniBus I had a simple use case in mind and I’ve more or less built what I needed but I think it could be useful to others too and for that reason I think it’s worth sharing so I’ve decided to put it “out there” by making it available on both GitHub and Nuget. It’s small, simple to use and reliable too so feel free to give it a go, and if you find a use for it then great! If you do happen to find any issues though, be sure to let me know or better yet, submit a patch! :)

Updates

I’ll keep this page updated with changes as and when they happen

24/11/2013

MiniBus 0.1.3 released

  • ReturnErrorMessages renamed to ReturnAllErrorMessages
  • Added ReturnErrorMessage method to move specific message from error queue
  • Added Fail fast option
  • The fail fast option is useful for scenarios where it’s important that messages are processed in order. As soon as an error is detected no more messages are processed and the failing message stays on the read queue.

    Advertisements
    MiniBus

    Integrating with External Clients via NServiceBus

    I thought I’d write up how we’ve been using NServiceBus at work to help us handle a rather large integration project that we’re in the middle of and how much simpler the task was than it might otherwise have been without such an awesome tool.

    We have a new client who are essentially outsourcing part of their business to us. That means we have to be able to receive orders from them and also be able to send them order dispatch notifications and stock updates via web services that they own. Their order data is fed into an existing system and eventually ends up populating a couple of tables that contain the outgoing data, one for dispatches and one for stock levels. This data needs to be read from these tables and sent to the client via calls to their web services.

    Potentially this could be a tricky problem as there’s no guarantee that the web service will actually be there at the time we make the call. Each call could hang around for a while before timing out which would slow the system down too. If we had chosen to read this data and invoke the web service within the same process then this would meet the definition of temporal coupling where we package together two modules (data reads and web service calls) because we believe they need to happen at the same time but this can and probably should be avoided. Even if we are able to successfully read and send data it still doesn’t mean we are guaranteed to succeed. What if there’s a validation issue? How do we handle a failed attempt to call them? Can we just throw the data away and wait until we have another request to make or do we need some kind of error handling and retry mechanism?

    Thankfully, in NServiceBus we have a tool at our disposal that while it wasn’t necessarily designed for this scenario, it actually handles it with ease.

    My solution to this problem was to create a simple console application that can be run under the Windows Task Scheduler that periodically reads data from these tables and creates messages that it Bus.Sends to a queue – two queues actually, one for dispatches, one for stock updates, so that each endpoint can be individually monitored to assess the volume of messages it is having to work with (we expect that stock updates will outnumber dispatches by a large margin).

    At a high level it looks like this:

    About

    The client process is essentially nothing more than the following snippet running every 10 minutes:

    using(var scope = new TransactionScope())
    {
    	LoadMessages();
    	SendMostRecentStockMessages();
    	SendAllDispatchMessages();
    	MarkAsSent();
    	scope.Complete();
    }
    

    Digging deeper, I split the LoadMessages method into two more methods – ReadStockMessages() and ReadDispatchMessages(). Here I used Dapper (another favourite tool of mine) as a simple way to map table columns and rows into the actual message types I want to place on the queue:

    stockMessages = cnn.Query<StockMessage>("SELECT * FROM StockTable").ToList();
    
    disptachMessages = cnn.Query<DispatchMessage>("SELECT * FROM DispatchTable").ToList();
    

    The SendMostRecentStockMessages() and SendAllDispatchMessages() methods basically iterate over the messages pushing them on to their respective queues:

    dispatchMessages.ForEach(msg => Bus.Send(msg));
    

    The logic for sending Stock messages was a little more involved as we didn’t want to send all messages for a given part number, only the most recent as the client is only interested in knowing what the current stock level is not what it might have been multiple times since we last polled the database. A little LINQ query helped here.

    Once the messages are on the queue I then update a DateSent field in each row so that the system can move them to an audit table. All this is done using the TransactionScope (and therefore the Microsoft DTC) so that if any problems arise with bad data then no messages will be sent. The nice thing about NServiceBus is that it takes care of configuring the DTC for you when you first download it meaning that you don’t really need to give it much thought at all.

    With the transaction scope completed and messages safely on their respective queues NServiceBus kicks in and invokes the appropriate handlers. Anyone familiar with NServiceBus will be aware of the Profile feature that allow us to do something different with the received message depending on the profile selected. We used the profiles mainly as a means to help us test the client’s web services by invoking their test urls in the Integration profile and the live urls in the Production profile. With profiles testing becomes simple.

    For me, the stand out feature of NServiceBus though is the error handling which takes us back to the original statement about temporal coupling. If we had not gone with NServiceBus and attempted to read data and send it in one hit we would have been faced with having to roll our own error handling and somehow ensure that any failures did not result in lost data. Once again, this is taken care of in NServiceBus automatically. Any failure that occurs now results in the message being moved to an error queue for later inspection by an admin who can, once the initial problem is resolved, move the message back to its original queue at which point NServiceBus will then attempt to send it again. If it goes through, great. If not, it’s back to the error queue and we try to resolve whatever the issue is this time. It’s reassuring to know that no matter what goes wrong we never lose data! It may be that the data is now too old and no longer worth sending. In this case we can just delete the message from the error queue but at least we are in a position to make that decision.

    It has to be said that NServiceBus when used as intended, i.e. in a pub/sub environment costs money but in this scenario we’ve used it on a single machine (which is free under the Express Edition licence) with the sole purpose of providing a robust integration point and it works very well. We don’t need publish and subscribe right now but what business doesn’t need reliability and durability? The fact that NServiceBus gives this to us in a simple manner makes it a great way to provide integration between disparate systems where fault tolerance is a must-have and it is something I’d encourage anyone faced with a similar problem to consider.

    Integrating with External Clients via NServiceBus