In the previous post I showed how Rebus’ subscription storage could report itself as “centralized”, which would provide a couple of benefits regarding configuration. In the post before the previous post, I showed how Rebus could be extended in order to execute message handlers inside a System.Transactions.TransactionScope, which was easy to do, without touching a single bit in Rebus core.
This time, I’ll combine the stuff I talked about in the two posts, and show how the Azure Service Bus transport can hook itself into the right places in order to function as a multicast-enabled transport, i.e. a transport that natively supports publish/subscribe and thus can relieve Rebus of some of its work.
Again, let’s take it from the outside and in – I would love it if Rebus could be configured to use Azure Service Bus simply by doing something like this:
1 2 3 4 5 |
var activator = new BuiltinHandlerActivator(); Configure.With(activator) .Transport(t => t.UseAzureServiceBus(connectionString, "my_input_queue")) .Start(); |
to configure each endpoint, and then await bus.Subscribe<SomeMessage>() and await bus.Publish(new SomeMessage("woohoo it works!!")) in order to subscribe to messages and publish them, and then just sit back and see published messages flow to their subscribers without any additional work.
Now, what would it require to make that work?
Subscriptions must be stored somewhere
No matter which kind of technology you use to move messages around, and no matter how complex logic they support, I bet they somehow build on some kind of message queue building block – i.e. a thing, into which you may put messages meant for some specific recipient to receive, and out of which one specific recipient can get its messages – and then everything that the transport can do with the messages, like routing, filtering, fan-out, etc., is implemented as logic that hooks into places and does stuff, but it will always end out with messages going into message queues.
Since Rebus is made to be able to run right on top of some pretty basic queueing systems, like e.g. MSMQ, and then has some of its functions going on in “user space” (like pub/sub messaging), it has an abstraction for something that persists subscriptions: ISubscriptionStorage – this is the way a publisher “remembers” which queues to send to when it publishes a message.
When a transport offers its own pub/sub mechanism (and Azure Service Bus does that via topics and subscriptions) it means that it effectively works as a centralized implementation of ISubscriptionStorage (as described in the previous post) – and in fact, it turns out that that is the proper place to hook in in order to take advantage of the native pub/sub mechanism.
Let’s do it 🙂
How to replace the subscription storage
Similar to what I showed in the previous Rebus extension example, the UseAzureServiceBus function above is an extension method – it uses Injectionist to register an instance of the transport, and then it sets up resolvers to use the transport as the primary ITransport and ISubscriptionStorage implementations – it looks like this:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
public static AzureServiceBusTransportSettings UseAzureServiceBus(this StandardConfigurer<ITransport> configurer, string connectionStringNameOrConnectionString, string inputQueueAddress) { var connectionString = GetConnectionString(connectionStringNameOrConnectionString); var settingsBuilder = new AzureServiceBusTransportSettings(); // register instance that implements ITransport and ISubscriptionStorage configurer .OtherService<AzureServiceBusTransport>() .Register(c => { var transport = new AzureServiceBusTransport(connectionString, inputQueueAddress); if (settingsBuilder.PrefetchingEnabled) { transport.PrefetchMessages(settingsBuilder.NumberOfMessagesToPrefetch); } if (settingsBuilder.AutomaticPeekLockRenewalEnabled) { transport.AutomaticallyRenewPeekLock(); } return transport; }); // resolve ISubscriptionStorage by forwarding to the transport instance configurer .OtherService<ISubscriptionStorage>() .Register(c => c.Get<AzureServiceBusTransport>()); // resolve ITransport by forwarding to the transport instance configurer.Register(c => c.Get<AzureServiceBusTransport>()); return settingsBuilder; } |
Now, the Azure Service Bus transport just needs to perform some meaningful actions as the subscription storage it is now claiming to be. Let’s take a look at
How to be a subscription storage
Subscription storages need to implement this interface:
1 2 3 4 5 6 7 8 9 10 |
public interface ISubscriptionStorage { Task<string[]> GetSubscriberAddresses(string topic); Task RegisterSubscriber(string topic, string subscriberAddress); Task UnregisterSubscriber(string topic, string subscriberAddress); bool IsCentralized { get; } } |
where you already know that the IsCentralized property must return true, indicating that subscribers can register themselves directly. And then, because it’s Azure Service Bus, we just need to
- ensure the given topic exists, and
- create a subscription on the given topic with ForwardTo set to the subscriber’s input queue
in order to start receiving the subscribed-to events. And that is in fact what the Azure Service Bus transport is doing now 🙂
Only thing left for this to work, is this:
Rebus must publish in the right way
When Rebus publishes a message, it goes through the following sequence:
- Asks the subscription storage for subscribers of the given topic
- For each subscriber: Asks the transport to send the message to that subscriber
Now, since the Azure Service Bus transport is both subscription storage and transport, we’ll take advantage of this fact by having GetSubscriberAddresses return only one single – fake! – subscriber address, on the form subscribers/<topic>(*).
And then, when the transport detects a destination address starting with the subscribers/ prefix (which cannot be a valid Azure Service Bus queue name), the transport will use a TopicClient for the right topic to publish the message instead of the usual QueueClient.
Conclusion
Rebus (since 0.90.8) can take advantage of Azure Service Bus’ native support for publish/subscribe, which means that you need not worry about routing of events or configuring any other kind of subscription storage.
(*) Azure Service Bus does not support “,”, “+”, and other characters in topics, and these characters can often be found in .NET type names (which Rebus likes to use as topics), Azure Service Bus transport will normalize topics by removing these illegal characters. In fact, all non-digit non-letter characters will be replaced by “_”, causing "System.String, mscorlib" to become a topic named "system_string__mscorlib".