Enter Rebus.fm

Since the inception of Rebus more than four years ago, I have been a happy user of this small messaging library, which aims to be intuitive and natural to configure and use.

I am not the only one who has been using it, though. Over the course of Rebus’ lifetime I have had contact with several companies who were using it to solve their business problems, and I have always been delighted to hear about both good and bad experiences with it.

Now I have decided to take it to the next level: I will begin to offer commercial services as a complement to Rebus from my company, Rebus.fm:

rebus.fm logo

Rebus has always been and will continue to be absolutely free and open source (with the MIT license).

This just means that serious Rebus users can receive a higher degree of commitment from me in the form of support and guaranteed bug fixes, and also in the form of supplementary tooling in the future.

This does not mean that I am ending my close cooperation with my employer, d60. We will continue to work together and support each other, which also means that I can welcome d60 as the first “Rebus.fm partner” (but I’ll talk some more about that later…. :o) ).

If this sounds interesting to you, I suggest you go and read some more about Rebus.fm at the Rebus.fm homepage. And don’t hesitate to get in touch if you have questions or comments.

How to use Rebus in a web application

If you’re building web applications, you will often encounter situations where delegating work to some asynchronous background process can be beneficial. Let’s take a very simple example: Sending emails!

We’re in the cloud

Let’s say we’re hosting the website in Azure, as an Azure Web Site, and we need to send an email at the end of some web request. The most obvious way of doing that could be to new up an SmtpClient with our SendGrid credentials, and then just construct a MailMessage and send it.

While this solution is simple, it’s not good, because it makes it impossible for us to have higher uptime than SendGrid (or whichever email provider you have picked). In fact, every time we add some kind of synchronous call to the outside from our system, we impose their potential availability problems on ourselves.

We can do better 🙂

Let’s make it asynchronous

Now, instead of sending the email directly, let’s use Rebus in our website and delegate the integration to external systems, SendGrid included, to a message handler in the background.

This way, at the end of the web request, we just do this:
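
A sketch of what that could look like, assuming a Web API controller with an IBus injected (the controller and model names are mine; the SendEmail message type is the one handled by the backend below):

```csharp
public class SignupController : ApiController
{
    readonly IBus _bus;

    public SignupController(IBus bus)
    {
        _bus = bus;
    }

    public async Task<IHttpActionResult> Post(SignupModel model)
    {
        // handle the actual signup here...

        // ...and then delegate the email sending to the backend
        await _bus.Send(new SendEmail(model.Email, "Welcome!", "Thanks for signing up."));

        return Ok();
    }
}
```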

and then our work handling the web request is over. Now we just need to have something handle the SendEmail message.

Where to host the backend?

We could configure Rebus to use either Azure Service Bus or Azure Storage Queues to transport messages. If we do that, we can host the backend anywhere, including as a process running on a 386 with a 3G modem in the attic of an abandoned building somewhere, but I’ve got a way that’s even cooler: Just host it in the web site!

This way, we can have the backend be subject to the same auto-scaling and whatnot we might have configured for the web site, and if we’re a low traffic site, we can even get away with hosting it on the Free tier.

Moreover, our backend can be Git-deployed together with the website, which makes for a super-smooth experience.

How to do it?

It’s a good idea to consider the backend a separate application, even though we choose to deploy it together with the web site as if they were one. This is a simple example of how processes and applications are really orthogonal concepts – in general, it’s limiting to attempt to enforce a 1-to-1 relationship between processes and applications(*).

What we should do is to have a 1-to-1 relationship between IoC container instances and applications, because that’s what IoC containers are for: To function as a container of one, logical application. In this case that means that we’ll spin up one Windsor container (or whichever IoC container is your favorite) for the web application, and one for the backend. In an OWIN Startup configuration class, it might look like this:
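
A sketch of how that Startup class might look – the installer class names are made up for the example:

```csharp
public class Startup
{
    public void Configuration(IAppBuilder app)
    {
        // one container for the web application...
        var webContainer = new WindsorContainer()
            .Install(FromAssembly.Containing<WebInstaller>());

        // ...and a completely separate one for the backend
        var backendContainer = new WindsorContainer()
            .Install(FromAssembly.Containing<BackendInstaller>());

        app.UseWindsorContainer(webContainer);

        // make both containers get disposed when the OWIN host shuts down
        app.RegisterForDisposal(webContainer);
        app.RegisterForDisposal(backendContainer);
    }
}
```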

In the code sample above, UseWindsorContainer and RegisterForDisposal are extension methods on IAppBuilder. UseWindsorContainer replaces Web API’s IHttpControllerActivator with a WindsorCompositionRoot like the one Mark Seemann wrote, and RegisterForDisposal looks like this:
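
A minimal version of RegisterForDisposal could hook into OWIN's host.OnAppDisposing cancellation token, like this:

```csharp
public static void RegisterForDisposal(this IAppBuilder app, IDisposable disposable)
{
    var context = new OwinContext(app.Properties);
    var token = context.Get<CancellationToken>("host.OnAppDisposing");

    if (token != CancellationToken.None)
    {
        // dispose the thing when the OWIN host shuts down
        token.Register(disposable.Dispose);
    }
}
```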

which is how you make something be properly disposed when an OWIN-based application shuts down. Moreover, I’m using Windsor’s installer mechanism to register stuff in the containers.

Rebus configuration

The next thing to do is to make sure that I configure Rebus correctly – since I have two separate applications, I will also treat them as such when I set up Rebus. This means that my web tier will have a one-way client, because it only needs to be able to bus.Send, whereas the backend will have a full configuration.

The one-way client might be configured like this:
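
Something like this, say – here sketched with the Azure Service Bus transport (the choice of transport and the connection string variable are assumptions of mine; the backend queue name is from the text below):

```csharp
Configure.With(new CastleWindsorContainerAdapter(webContainer))
    .Transport(t => t.UseAzureServiceBusAsOneWayClient(connectionString))
    .Routing(r => r.TypeBased().Map<SendEmail>("backend"))
    .Start();
```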

thus registering an IBus instance in the container which is capable of sending SendEmail messages, which will be routed to the queue named backend.

And then, the backend might be configured like this:
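
Again a sketch under the same assumptions as above – a full configuration with backend as its input queue:

```csharp
Configure.With(new CastleWindsorContainerAdapter(backendContainer))
    .Transport(t => t.UseAzureServiceBus(connectionString, "backend"))
    .Options(o => o.SetNumberOfWorkers(2))
    .Start();
```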

Only thing left is to write the SendEmailHandler:

Email message handler
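
The handler could be sketched like this – it news up an SmtpClient with the SendGrid credentials mentioned earlier (the SMTP host, the credentials, and the exact properties on SendEmail are assumptions of mine):

```csharp
public class SendEmailHandler : IHandleMessages<SendEmail>
{
    public async Task Handle(SendEmail message)
    {
        using (var client = new SmtpClient("smtp.sendgrid.net")
        {
            Credentials = new NetworkCredential("username", "password")
        })
        using (var mail = new MailMessage("noreply@example.com", message.To)
        {
            Subject = message.Subject,
            Body = message.Body
        })
        {
            await client.SendMailAsync(mail);
        }
    }
}
```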

Conclusion

Hosting a Rebus endpoint inside an Azure Web Site can be compelling for several reasons, not least because the deployment of a cohesive web+backend unit becomes trivially smooth.

I’ve done this several times myself, in web sites with multiple processes, including sagas and timeouts stored in SQL Azure, and basically everything Rebus can do – and so far, it has worked flawlessly.

Depending on your requirements, you might need to flick on the “Always On” setting in the portal

(screenshot: the “Always On” setting in the Azure portal)

so that the site keeps running even though it’s not serving web requests.

Final words

If anyone has experiences doing something similar to this in Azure or with another cloud service provider, I’d be happy to hear about it 🙂


(*) This 1-to-1-ness is, in my opinion, a thing that the microservices community does not mention enough. I like to think about processes and applications much the same way as Udi Dahan describes it.

Another Rebus extension example

In the previous post I showed how Rebus’ subscription storage could report itself as “centralized”, which would provide a couple of benefits regarding configuration. In the post before the previous post, I showed how Rebus could be extended in order to execute message handlers inside a System.Transactions.TransactionScope, which was easy to do, without touching a single bit in Rebus core.

This time, I’ll combine the stuff I talked about in the two posts, and show how the Azure Service Bus transport can hook itself into the right places in order to function as a multicast-enabled transport, i.e. a transport that natively supports publish/subscribe and thus can relieve Rebus of some of its work.

Again, let’s take it from the outside and in – I would love it if Rebus could be configured to use Azure Service Bus simply by doing something like this:
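
Something like this, say (the connection string variable and queue name are mine):

```csharp
var bus = Configure.With(activator)
    .Transport(t => t.UseAzureServiceBus(connectionString, "my-endpoint"))
    .Start();
```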

to configure each endpoint, and then await bus.Subscribe() and await bus.Publish(new SomeMessage("woohoo it works!!")) in order to subscribe to messages and publish them, and then just sit back and see published messages flow to their subscribers without any additional work.

Now, what would it require to make that work?

Subscriptions must be stored somewhere

No matter which kind of technology you use to move messages around, and no matter how complex the logic it supports, I bet it somehow builds on some kind of message queue building block – i.e. a thing into which you may put messages meant for some specific recipient, and out of which that one specific recipient can get its messages. Everything else the transport can do with the messages, like routing, filtering, fan-out, etc., is implemented as logic that hooks in at various places and does stuff, but it always ends out with messages going into message queues.

Since Rebus is made to be able to run right on top of some pretty basic queueing systems, like e.g. MSMQ, and then has some of its functions going on in “user space” (like pub/sub messaging), it has an abstraction for something that persists subscriptions: ISubscriptionStorage – this is the way a publisher “remembers” which queues to send to when it publishes a message.

When a transport offers its own pub/sub mechanism (and Azure Service Bus does that via topics and subscriptions) it means that it effectively works as a centralized implementation of ISubscriptionStorage (as described in the previous post) – and in fact, it turns out that that is the proper place to hook in in order to take advantage of the native pub/sub mechanism.

Let’s do it 🙂

How to replace the subscription storage

Similar to what I showed in the previous Rebus extension example, the UseAzureServiceBus function above is an extension method – it uses Injectionist to register an instance of the transport, and then it sets up resolvers to use the transport as the primary ITransport and ISubscriptionStorage implementations – it looks like this:
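
The actual code isn't reproduced here, but its essence could be sketched like this (the parameter names are mine, and the transport's constructor arguments are simplified):

```csharp
public static void UseAzureServiceBus(
    this StandardConfigurer<ITransport> configurer,
    string connectionString, string inputQueueAddress)
{
    // register the transport as the primary ITransport implementation
    // (constructor arguments simplified here)
    configurer.Register(c => new AzureServiceBusTransport(connectionString, inputQueueAddress));

    // ...and let the same instance double as the subscription storage
    configurer.OtherService<ISubscriptionStorage>()
        .Register(c => c.Get<AzureServiceBusTransport>());
}
```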

Now, the Azure Service Bus transport just needs to perform some meaningful actions as the subscription storage it is now claiming to be. Let’s take a look at

How to be a subscription storage

Subscription storages need to implement this interface:
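
As of Rebus2, the interface looks like this:

```csharp
public interface ISubscriptionStorage
{
    // called by publishers when it's time to publish something
    Task<string[]> GetSubscriberAddresses(string topic);

    Task RegisterSubscriber(string topic, string subscriberAddress);

    Task UnregisterSubscriber(string topic, string subscriberAddress);

    // true if subscribers can register themselves directly
    bool IsCentralized { get; }
}
```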

where you already know that the IsCentralized property must return true, indicating that subscribers can register themselves directly. And then, because it’s Azure Service Bus, we just need to

  1. ensure the given topic exists, and
  2. create a subscription on the given topic with ForwardTo set to the subscriber’s input queue

in order to start receiving the subscribed-to events. And that is in fact what the Azure Service Bus transport is doing now 🙂

Only thing left for this to work, is this:

Rebus must publish in the right way

When Rebus publishes a message, it goes through the following sequence:

  1. Asks the subscription storage for subscribers of the given topic
  2. For each subscriber: Asks the transport to send the message to that subscriber

Now, since the Azure Service Bus transport is both subscription storage and transport, we’ll take advantage of this fact by having GetSubscriberAddresses return only one single – fake! – subscriber address, of the form subscribers/&lt;topic&gt;(*).

And then, when the transport detects a destination address starting with the subscribers/ prefix (which cannot be a valid Azure Service Bus queue name), the transport will use a TopicClient for the right topic to publish the message instead of the usual QueueClient.

Conclusion

Rebus (since 0.90.8) can take advantage of Azure Service Bus’ native support for publish/subscribe, which means that you need not worry about routing of events or configuring any other kind of subscription storage.


(*) Azure Service Bus does not support “,”, “+”, and certain other characters in topics, and these characters can often be found in .NET type names (which Rebus likes to use as topics) – therefore, the Azure Service Bus transport will normalize topics by replacing these illegal characters. In fact, all non-digit, non-letter characters will be replaced by “_”, causing "System.String, mscorlib" to become a topic named "system_string__mscorlib".

Storing subscriptions

TL;DR: With Rebus2, subscription storages can be configured to be “centralized”, which means that all endpoints are allowed to register themselves as subscribers directly in the subscription storage. This way, since the central subscription storage is shared between all endpoints, no additional routing information is needed in order to implement pub/sub.

Long version:

Rebus has always had a subscription storage, which is the persistence abstraction that a publisher would use to remember who subscribed to what.

Basic workings of the subscription mechanism

In the most basic scenario, it works like this:

1: Subscriber locates publisher

A subscriber, some_subscriber wants to subscribe to all string events.

In order to do this, the subscriber asks The Router (I’ll talk some more about The Router in another blog post, I promise 🙂 ): “Router – who owns System.String, mscorlib?” to which The Router replies: “some_publisher” (The Router is just concise like that…)

2: Subscriber subscribes

The subscriber now sends a SubscribeRequest to some_publisher, which is basically saying something like “hi, I’m some_subscriber – I’d like to subscribe to System.String, mscorlib”.

3: Publisher remembers

Having received the SubscribeRequest, the publisher then saves this information to its subscription storage (e.g. a table in SQL Server) in the form of this simple tuple: ("System.String, mscorlib", "some_subscriber").

4: Publisher publishes

For all eternity (or until the subscriber unsubscribes again), the publisher will then publish its string events by checking its subscription storage for subscribers of System.String, mscorlib.

Let’s pretend that it gets these two bad boys:

  • ("System.String, mscorlib", "some_subscriber")
  • ("System.String, mscorlib", "another_subscriber")

With these two in hand, the publisher will then just go on and send the event to each subscriber directly, i.e. to some_subscriber and another_subscriber in this case.

To sum it up

The basic pub/sub sequence can be sketched as this, which more accurately could be called “sub/pub”, because that’s the order of events – first, steps 1, 2, and 3 (router lookup omitted for clarity)

subscribe

and then step 4

pub

A nifty trick with new Rebus

Check out the current layout of the ISubscriptionStorage interface:
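
In Rebus2, it looks like this:

```csharp
public interface ISubscriptionStorage
{
    Task<string[]> GetSubscriberAddresses(string topic);

    Task RegisterSubscriber(string topic, string subscriberAddress);

    Task UnregisterSubscriber(string topic, string subscriberAddress);

    bool IsCentralized { get; }
}
```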

In Rebus2 (i.e. versions 0.90.* and on), the subscription storage has gained the ability to be “centralized” – i.e. an implementation of Rebus’ ISubscriptionStorage can return true from the IsCentralized property, which causes Rebus to bypass the publisher lookup in the router and the sending of the SubscribeRequest, thereby shortcutting the subscription process a great deal.

Which makes perfect sense, because it doesn’t matter who makes the ("System.String, mscorlib", "some_subscriber") registration in the subscription storage, as long as the publisher gets it when it’s time to publish some messages. Therefore – if a subscription storage is configured to be centralized, the subscription process is as short as this:

1: Subscriber subscribes

The subscriber saves its subscription to the subscription storage (e.g. a table in SQL Server) in the form of this simple tuple: ("System.String, mscorlib", "some_subscriber").

and then we would already be ready for

4: Publisher publishes

(which I will not repeat here)

To sum it up

When the subscription storage is centralized, the subscription process can be sketched like this:

subscribe2

When to use what?

The distributed way

Pros: The basic IsCentralized = false way is the most distributed form of publish/subscribe, since subscribers only need to know the addresses of the publishers they depend on, and need not worry about having a connection to a subscription database somewhere.

With the non-centralized solution, each publisher can have its own subscription storage, which can e.g. be a local SQL Server Express, a JSON file, or whatever each publisher feels like. This makes the non-centralized way more resilient to failures, because there’s no central database required for publishers to work.

Cons: With the non-centralized solution, all subscribers must have an endpoint mapping in their routers, mapping one single publisher as the owner of each event type. More discipline is required during development, and there’s simply more moving parts at runtime.

The centralized way

Pros: The new IsCentralized = true way is easier to get started with, and it may be easier to grasp for newcomers. It also alleviates all subscribers of the burden of having to map publishers as owners of all the different types of events.

Cons: All publishers and subscribers are required to have the centralized subscription storage configured, and thus everyone needs to be able to connect to the same central database.

A Rebus extension example

In my previous post about the ready-made pipeline step injector, I showed how to easily inject a new pipeline step into the pipeline at a position relative to another step.

A concrete example of doing this could be the Rebus.TransactionScope extension, which wraps the execution of handlers in a .NET System.Transactions.TransactionScope.

Let’s take this one from the outside-in. I want a configuration experience that looks like this:
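
Something like this (the transport and queue name are just placeholders of mine – the interesting part is the Options call):

```csharp
Configure.With(activator)
    .Transport(t => t.UseMsmq("my-endpoint"))
    .Options(o => o.HandleMessagesInsideTransactionScope())
    .Start();
```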

In the example shown, HandleMessagesInsideTransactionScope is an extension method on Rebus’ OptionsConfigurer.

You know about extension methods? They’re great: they allow you to bring in a NuGet package from the outside, which contributes a function that magically pops up somewhere else, e.g. on a fluent configuration builder thingie – exactly what we need 🙂

From now on, since you’ve read the post about the PipelineStepInjector, it’s so simple that you can probably already imagine what I’m about to show you – here’s the implementation:
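
A sketch of its essence, using the PipelineStepInjector from the previous post:

```csharp
public static void HandleMessagesInsideTransactionScope(this OptionsConfigurer configurer)
{
    configurer.Decorate<IPipeline>(c =>
    {
        var pipeline = c.Get<IPipeline>();

        // position our step right before the dispatch step
        return new PipelineStepInjector(pipeline)
            .OnReceive(new TransactionScopeIncomingStep(),
                PipelineRelativePosition.Before,
                typeof(DispatchIncomingMessageStep));
    });
}
```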

As you can see, this is a fairly declarative way of positioning our TransactionScopeIncomingStep right before the message is dispatched (in DispatchIncomingMessageStep).

Only thing left is to show the actual implementation of the step:
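
In essence, it wraps the rest of the pipeline in an async-flowing transaction scope:

```csharp
[StepDocumentation("Executes the rest of the pipeline inside a System.Transactions.TransactionScope")]
class TransactionScopeIncomingStep : IIncomingStep
{
    public async Task Process(IncomingStepContext context, Func<Task> next)
    {
        // TransactionScopeAsyncFlowOption.Enabled requires .NET 4.5.1 -
        // it makes the ambient transaction flow to continuations
        using (var scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
        {
            await next();

            scope.Complete();
        }
    }
}
```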

How about that? 😀


Please note that the Rebus.TransactionScope extension requires at least .NET 4.5.1, because that’s the version of the framework that introduced transaction scopes that would flow to continuations properly when using async/await.

One way of extending the Rebus pipeline

New Rebus processes messages through two pipelines: one for outgoing messages and one for incoming messages.

When you configure the bus, you can log the contents of the two pipelines (including their documentation if they have been decorated with the [StepDocumentation("bla bla bla")] attribute) by calling .Options(o => o.LogPipeline(verbose:true|false)) in the configuration, like so:

which will output something like this to whichever logger is configured:

As you can see, it’s fairly easy to get an understanding of what exactly is going on with these messages going in and out 🙂

When the bus needs either of the pipelines, it will use the IPipeline service to get them – IPipeline is pretty simple, it looks like this:
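
Roughly like this:

```csharp
public interface IPipeline
{
    IOutgoingStep[] SendPipeline();

    IIncomingStep[] ReceivePipeline();
}
```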

Now, if you’ve read my previous post about how Rebus uses Injectionist to satisfy its dependencies, you can probably envision by now a decorator-based approach to modify the pipeline(s) in order to implement new functionality… and you are absolutely right, that is exactly what I’m about to show you 🙂

In fact, this scenario where you want to use a decorator to inject something into either of the pipelines is so common that it has a nifty decorator prepared to cover most scenarios: PipelineStepInjector. The PipelineStepInjector is a IPipeline decorator that can be configured to inject new steps into either pipeline, at a position relative to another step.

For example, if I have created a new implementation of IIncomingStep, LogResolvedMessageHandlersStep, that logs the type names of resolved message handlers, a natural place to inject that step would be right after ActivateHandlersStep, whose responsibility it is to resolve all implementations compatible with IHandleMessages&lt;SomeMessage&gt; from the configured handler activator.

In order to inject this new step, I could do it like this:
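
For example like this (the transport and queue name are placeholders of mine):

```csharp
Configure.With(activator)
    .Transport(t => t.UseMsmq("my-endpoint"))
    .Options(o => o.Decorate<IPipeline>(c =>
    {
        var pipeline = c.Get<IPipeline>();

        return new PipelineStepInjector(pipeline)
            .OnReceive(new LogResolvedMessageHandlersStep(),
                PipelineRelativePosition.After,
                typeof(ActivateHandlersStep));
    }))
    .Start();
```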

which will cause my LogResolvedMessageHandlersStep to be positioned right after ActivateHandlersStep.

Nifty, huh? 😀

Please note that PipelineStepInjector is not meant to be able to cover all scenarios. I could easily come up with complex injection rules that could not readily be handled simply by positioning each step relative to one of the existing steps, but so far it has covered quite a few scenarios nicely.

Basic extension model in Rebus: Injectionist

The new Rebus (i.e. “Rebus 2”, which is currently available from version 0.90.* on NuGet) is very easy to extend, due to the fact that the configuration API is backed by a very simple IoC-like container: the “Injectionist”.

The Injectionist is based on a simple general principle: For each type of service that you register (e.g. ISubscriptionStorage), you are allowed to register ONE primary implementation of that service (e.g. SqlServerSubscriptionStorage), and any number of decorators to that service (e.g. CachingSubscriptionStorage, etc.).

All registrations in the Injectionist are functions that must return objects – so for example to register our subscription storage example from above, we would do it like this:
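
Something like this (the constructor arguments of the storage implementations are omitted here for brevity):

```csharp
// ONE primary implementation per service type...
injectionist.Register<ISubscriptionStorage>(
    c => new SqlServerSubscriptionStorage(/* constructor arguments omitted */));

// ...and any number of decorators, each wrapping the previous registration
injectionist.Decorate<ISubscriptionStorage>(
    c => new CachingSubscriptionStorage(c.Get<ISubscriptionStorage>()));
```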

and then, since the RebusBus instance is registered like this:
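
I won't reproduce the full registration here, but its shape is something like this – a function that resolves the bus' dependencies from the same Injectionist instance (the exact constructor arguments have changed over time, so take this as a sketch):

```csharp
injectionist.Register<IBus>(c => new RebusBus(
    c.Get<IWorkerFactory>(),
    c.Get<IRouter>(),
    c.Get<ITransport>(),
    c.Get<IPipeline>(),
    c.Get<IPipelineInvoker>()));
```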

the configuration API can end up calling injectionist.Get<IBus>() in order to build a bus instance with the chosen implementations of the various abstractions.

In addition to making it easy to gradually build a configuration, the Injectionist enables YOU to intercept almost anything by registering your own decorators…

Want to mutate incoming/outgoing messages? There’s several ways of doing that, but one way would be to register a decorator for the ISerializer interface.

Want to measure the time it takes to dispatch a message? Again, there’s several ways to do that, but one way could be to decorate the IPipelineInvoker interface.

And so on 🙂

Introducing Rebus 2

I’ve been meaning to do this for a while, and while I am aware of the danger of doing it, I’ve done it anyways: I’ve rewritten Rebus!

Why would you do that?

Rebus was initially made to make me happy. I made it as an implementation of the parts of NServiceBus that I liked best in order to use it on a couple of projects, and thus I did not put too much effort into making a long-lasting design. Also, I aligned all APIs very closely to their NServiceBus counterparts, although I did loosen up on this as time went by.

In addition to this, Rebus was my first open-source project, and to this date, more than 20 besides me have contributed code to it, and many more are using it. It’s been fun to manage a project with actual contributors, and at times it’s been challenging to govern the project with regards to features, quality, ease-of-use, responsiveness, etc.

To sum it up: I have learned a lot since I started the Rebus project!

And usually, when you learn things, you want to put those things to use. At this point, I feel I’ve learned so much that I needed to put those things back into Rebus.

Why would you do it like that?

Rebus has – since its inception almost 5 years ago – done a good job of keeping messages flowing and developers happy, so why would I rewrite it from scratch?

I’ve been meaning to improve a lot of things in Rebus for quite a while now, like reducing complexity, introducing message pipelines for incoming and outgoing messages, etc. I could probably have undertaken this as an incremental task, but my feeling is that the end result would not be nearly as clean as it will be with a rewrite.

Therefore, my conclusion was this: Rebus is small enough that a rewrite was the best way to refactor it (given the changes I wanted to make, obviously)!

Also, I did not actually throw old Rebus away. Rebus 2 is async to the core, and the OWIN-pipeliney design is heavily inspired by my (very brief) work with the NServiceBus team, but I kept everything aligned with old Rebus as far as it made sense, thus allowing me to port most of the integration implementations forward with very few modifications.

What has changed?
  • Rebus 2 is async to the core. All APIs are, in their most basic incarnation, async. Therefore, you await bus.Send(yourMessages) now. This means that one single physical thread can do a lot of work while consuming almost no memory and almost no CPU.
  • Rebus has pipelines. Incoming and outgoing messages are passed through pipelines, which allows for extreme flexibility in how Rebus behaves. Several of Rebus’ “built-in” functionalities like e.g. compression and encryption are simply extra pipeline steps that are applied to incoming and outgoing messages.
  • Routing is based on topics. Topics based on .NET type names (which is how old Rebus worked) is thus just one case of a more general scheme where any string can be a topic.
  • There’s no batching. Batching (i.e. the fact that each and every transport message was actually a container of zero or more logical messages) provided almost no benefit, considering the amount of code it took to implement it. You can easily implement batching yourself in those few cases where you want it.
  • There’s an in-memory transport, an Amazon SQS transport, an Azure Storage Queues transport, a Jil serializer, and more cool stuff to come!
  • Idempotent sagas can be had by deriving your saga handler from IdempotentSaga instead of Saga.
  • …and much more 🙂
Show me the code!

Chances are that many Rebus users will not experience too many differences. Check this out – here’s how I start the usual MSMQ-enabled endpoint, capable of receiving requests and delivering replies:
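
A sketch of such an endpoint (SomeMessage and SomeReply are placeholder types of mine):

```csharp
using (var activator = new BuiltinHandlerActivator())
{
    activator.Handle<SomeMessage>(async (bus, message) =>
    {
        // deliver the reply back to whoever sent the request
        await bus.Reply(new SomeReply());
    });

    Configure.With(activator)
        .Transport(t => t.UseMsmq("endpoint"))
        .Start();

    Console.WriteLine("Press ENTER to quit");
    Console.ReadLine();
}
```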

and then, in order to send SomeMessage to the endpoint, do this:
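
For example like this – a one-way-ish client with an endpoint mapping for SomeMessage:

```csharp
using (var activator = new BuiltinHandlerActivator())
{
    var bus = Configure.With(activator)
        .Transport(t => t.UseMsmq("client"))
        .Routing(r => r.TypeBased().Map<SomeMessage>("endpoint"))
        .Start();

    await bus.Send(new SomeMessage());
}
```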

Get started

Rebus 2 is initially released as Rebus 0.90.0, along with all of its integration packages. You can install-package rebus to get started right away.

How to upgrade

Because of the many changes, upgrading from Rebus 0.84.0 (the last “Rebus 1” version) to 0.90.0 (the first “Rebus 2 beta” version) may not be trivial.

Rebus 1 endpoints do not readily work with Rebus 2 endpoints, so you will need to upgrade all endpoints at the same time or manually bridge things while you upgrade.

Bugfixes might still be released for the 0.84.0 version of Rebus, but if you have the opportunity to pick a version, I recommend you go with 0.90.0.

As always, I’ll be very grateful to hear about your experiences with it – good or bad!

Happy messaging 😀

Passing user context in message headers with Rebus

When you’re building messaging systems, I bet you’ve run into a situation where you’ve had the need to pass some extra information along with your messages: Some kind of metadata that plays a role in addressing a cross-cutting concern, like e.g. bringing along contextual information about the user that initiated the current message correspondence.

To show how that can easily be achieved with Rebus, I’ve added the UserContextHeaders sample to the sample repository. This sample demonstrates how an “ambient user context” can be established in a client application which then automagically flows along with all messages sent as a consequence of the initiating message.

In this blog post, I’ll go through the key parts that make up the solution – if you’re interested in the details, I suggest you go download the source code.

First, let’s

See how Rebus is configured

As you can see, I’ve added some extra setup in an extension method called AutomaticallyTransferUserContext – it looks like this:
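
In essence, it could be sketched like this (the hook signatures are simplified here – check the sample for the real thing):

```csharp
public static void AutomaticallyTransferUserContext(this IRebusEvents events)
{
    // called for every outgoing message
    events.MessageSent += EncodeUserContextHeadersIfPossible;

    // called for every incoming message
    events.MessageContextEstablished += DecodeUserContextHeadersIfPossible;
}
```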

It’s pretty clear that it hooks into Rebus’ MessageSent and MessageContextEstablished events which are called for all outgoing messages and all incoming messages respectively.

First, let’s see what we must do when we’re

Sending messages

Let’s see what’s inside that EncodeUserContextHeadersIfPossible method from the snippet above:
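
Its essence could be sketched like this – AmbientUserContext and its Encode method are stand-ins of mine for the thread-bound user context in the sample, and the header key is a placeholder too:

```csharp
static void EncodeUserContextHeadersIfPossible(IBus bus, string destination, object message)
{
    // look for an ambient user context on the current thread
    var userContext = AmbientUserContext.Current;
    if (userContext == null) return;

    // attach it as a header that flows along with the outgoing message
    bus.AttachHeader(message, "current-user-context", userContext.Encode());
}
```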

As you can see, it checks for the presence of an ambient user context (most likely attached to the currently executing thread), encoding that context in a message header if one was found. This way, the user context will be included on all outgoing messages, no matter if they’re sent, published or replied.

Now, let’s see how we’re

Handling incoming messages

Let’s just check out DecodeUserContextHeadersIfPossible mentioned in the configuration extension above:
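
Sketched along the same lines as the encoding side (again with placeholder names of mine):

```csharp
static void DecodeUserContextHeadersIfPossible(IBus bus, IMessageContext context)
{
    object encodedUserContext;
    if (!context.Headers.TryGetValue("current-user-context", out encodedUserContext)) return;

    // stash the decoded user context in the message context's items
    context.Items["current-user-context"] = UserContext.Decode((string)encodedUserContext);
}
```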

As you can see, it checks for the presence of a user context header on the incoming message, decoding it and adding it to the message context if it was there. This means that message handlers can access the context like this:
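
For example something like this (the item key and UserContext type are the placeholders from above):

```csharp
var context = MessageContext.GetCurrent();
var userContext = (UserContext)context.Items["current-user-context"];
```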

These were the main parts that make up the automatically flowing user context – there’s few more details in the sample code, like e.g. an implementation of a thread-bound ambient user context and constructor-injected UserContext into message handlers – pretty sweet, if I have to say so myself 🙂 please head over to the UserContextHeaders sample for some working code to look at.

Ways of scaling out with Rebus #5: MSMQ

I deliberately waited until the last post in my series on how to scale your Rebus work to talk about how to do it with MSMQ. The thing is, MSMQ doesn’t lend itself well to the competing consumers pattern and thus requires some other mechanism in order to be able to effectively distribute work between a number of workers.

NServiceBus achieves this ability with its Distributor process, which is a sophisticated load balancer that communicates with its worker processes in order to avoid overloading them with work, etc.

The Rebus way of doing this is much simpler. I’ve long contemplated building a simple load balancer – one that blindly distributes work in round-robin fashion, which I believe would provide a sufficiently powerful and wonderfully unsophisticated way of scaling out your chunks of work.

This means that in a producer/consumer scenario, you would send all the work to the distributor, which would forward pieces of work to consumers in equal portions. It can be shown like this:

rebus msmq load balancer

This is so simple, it cannot fail.

Which is why I’ve made the Rebus.MsmqLoadBalancer package, containing a LoadBalancerService.

Once you’ve executed the following Powershell spell:
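
The spell in question being the usual NuGet incantation:

```powershell
Install-Package Rebus.MsmqLoadBalancer -ProjectName YourProject
```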

the load balancer service can be started like this (assuming YourProject was a console application):
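
Something like this – note that the constructor arguments and the worker queue addresses are my guesses at the shape of the API, so check the package for the actual signature:

```csharp
var service = new LoadBalancerService(
    "distributor", // the input queue to receive work from
    new List<string>
    {
        "workers@machine1",
        "workers@machine2",
        "workers@machine3",
    });

service.Start();

Console.WriteLine("Press ENTER to stop the load balancer");
Console.ReadLine();
```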

In this example, the load balancer will receive messages from the local distributor queue and forward them, distributing them evenly among the three workers residing on machine1, machine2, and machine3.

If you want to put this bad boy to some serious action, you’ll most likely want to wrap it in some kind of host process – either hosting it in the same process as the producer of your work, or in a Topshelf service or something.

One thing to note though: I haven’t had the chance to try the load balancer myself yet, mainly because my own horizontal scaling experience comes from working with RabbitMQ and Azure Service Bus only.

If you’re interested in using it, please don’t hesitate to contact me with questions etc., and I’ll be happy to help you on your way!