Category Archives: nservicebus

Rebus and message expiration

If you have worked with NServiceBus, and you’ve tried Rebus as well, you might be wondering where that nifty [TimeToBeReceived] attribute went. You know, that attribute that you would decorate your messages with when you wanted to have MSMQ expire the messages for you, to prevent your queues from clogging up with irrelevant messages if the recipient was down.

The answer is: There’s no such attribute in Rebus! And that’s because that would require your messages assembly to reference Rebus, and that should not be a requirement. Rebus allows your messages to be POCOs, so your messages assembly should be allowed to stay “plain” in that POCO sense.

How to set time to be received then?

With Rebus you set the time to be received property of your messages by attaching a special Rebus header to your messages: Headers.TimeToBeReceived. The value must be an HH:mm:ss string, e.g. "00:01:30" to set a 1 minute and 30 seconds expiration timeout on a message.

E.g. like so:

var somethingHappened = new SomethingHappened { Time = DateTime.UtcNow };
bus.AttachHeader(somethingHappened, Headers.TimeToBeReceived, "00:00:30");
bus.Publish(somethingHappened);

This header will be detected later on by the transport implementation (e.g. MsmqMessageQueue), which will use the appropriate settings when sending the message.

In MSMQ’s case, the message will then be deleted by the queueing system when the timeout expires, which is a nifty feature that can be used to avoid flooding the queueing system with expired and irrelevant messages.
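Since the header value is just a string, it can be handy to produce it from a TimeSpan instead of typing it by hand. Here is a minimal sketch of such a helper. The names TimeToBeReceivedFormat and Format are made up for illustration and are not part of Rebus, and the sketch assumes timeouts shorter than 24 hours, because the HH part cannot carry whole days.

```csharp
using System;

// Hypothetical helper, NOT part of Rebus: formats a TimeSpan as an HH:mm:ss string
public static class TimeToBeReceivedFormat
{
    public static string Format(TimeSpan timeout)
    {
        // Assumption: the header cannot express days, so reject anything outside (0, 24h)
        if (timeout <= TimeSpan.Zero || timeout >= TimeSpan.FromDays(1))
        {
            throw new ArgumentOutOfRangeException(
                "timeout", "Expected a timeout greater than zero and shorter than 24 hours");
        }

        // Fractions of a second are dropped by this format string
        return timeout.ToString(@"hh\:mm\:ss");
    }
}
```

With that in place, the header value in the example above could be produced with TimeToBeReceivedFormat.Format(TimeSpan.FromSeconds(30)).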

But doesn’t that require a lot of tedious manual work?

Yes – which is why I suggest you use some of the nifty hooks in Rebus to reduce the amount of tedium involved. More on that in the next post.

Announcing: Rebus

Preface

Friends and acquaintances might know that I’ve spent the last 6 months or so of my spare time hacking on something I call Rebus. To cut a long story short, I can tell you that I am a happy NServiceBus user who became very sad when NServiceBus went from being free to requiring a commercial license – not because I think NServiceBus isn’t worth spending money on, but because I think it hurts the adoption of NServiceBus.

Speaking for myself, I have already refrained from using NServiceBus in a couple of smaller projects, where it would otherwise have been an awesome addition.

I seriously considered forking the 2.0 version of NServiceBus, which is licensed under Apache v. 2.0, but I was overwhelmed by the sheer size of the project. This led me to re-implement my favorite features of NServiceBus as Rebus, trying to focus and stay lean along the way.

And here it is: Rebus

Rebus – the core – depends on .NET 4 only. And since .NET 4 has MSMQ, ADO.NET, and binary serialization, you get to send and receive messages transactionally and persist your stuff in SQL Server without anything but Rebus core.

3rd party integration (like e.g. MongoDB, NewtonSoft JSON serializer, etc) is provided through small, dedicated add-on projects. This granularity makes it easier to manage your own dependencies.

Update: And since JSON serialization was deemed to be the preferred form of message serialization, it has been ILMerged into Rebus core since right after this announcement. Therefore, JSON serialization is included in the box and is the default.

There are currently two ways to get started with Rebus:

  1. Instantiate RebusBus manually, filling in the ginormous constructor with implementations for how you want the bus to run.
  2. Use the configuration API.

Let’s try the configuration API – with the following snippet of XML in your app.config:

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <configSections>
    <section name="log4net" type="log4net.Config.Log4NetConfigurationSectionHandler, log4net"/>
    <section name="rebus" type="Rebus.Configuration.RebusConfigurationSection, Rebus" />
  </configSections>
 
  <rebus inputQueue="myService.inputQueue" workers="5">
    <endpoints>
      <add messages="AnotherService.Messages" endpoint="anotherService.inputQueue"/>
    </endpoints>
  </rebus>
 
  <log4net>...</log4net>
</configuration>

and the following piece of code in your app:

var myAdapter = new MyFavoriteContainerAdapter(myFavoriteContainer);
 
Configure.With(myAdapter)
    .Logging(c => c.Log4Net())
    .DetermineEndpoints(c => c.FromRebusConfigurationSection())
    .Transport(c => c.UseMsmqAndGetInputQueueNameFromAppConfig())
    .Serialization(c => c.UseJsonSerializer())
    .CreateBus()
    .Start();
 
// IBus is in the container now :)

you should be up and running with a Rebus bus.

That was a small teaser – there are more blog posts on the way, I promise ;)

So, what’s the state of this?

Well, that’s actually the whole point of this blog post – I’m currently in a situation where I really feel like putting Rebus to some serious use, but I cannot justify foisting it on a client until I have a big project to prove its worth. And I cannot prove its worth in a big project until I have a client I can foist it on.

That’s where you come in! :)

You:
  • You’re building something – it’s not totally mission critical, as in human lives depend on it, but it’s still something.
  • You want some nifty service bus technology to help you build a loosely coupled architecture that will allow your project to flex and bend and integrate with stuff in many years to come.
  • You wanted to use NServiceBus because it’s cool, but you can’t afford the cost up front.
  • You’re reading this blog post, or someone you know read it and is now telling you about it…

If that’s you, then you should meet Rebus!

Rebus:
  • Absolutely free to use and abuse.
  • Very very similar to NServiceBus, allowing you to migrate to NServiceBus some time in the future if Rebus is not enough for you.
  • Pretty simple.
  • Comes with unlimited¹ support from its author.
  • Did I mention it was free?

A match made in heaven?

If you have the slightest bit of interest in what I’m suggesting, please contact me – either via email or on Twitter – and then we’ll talk about what Rebus can do for you. Oh, and please don’t hesitate to contact me – even if you’re NOT planning on using Rebus, I’d like to know why you picked NServiceBus, MassTransit, or Rhino Service Bus instead.

If you’re just eager to try it out, feel free to Install-Package -Pre one or more of Rebus’ NuGet packages.

  1. Within reason, of course – I have a day job, a wife, and two kids, so I’m not answering emails 24/7 – but I’ll go to great lengths to help you get a smooth adoption of Rebus – because that’s just how much I care :)

The duality of request/reply vs. publish/subscribe #2

In my last post, I described how the mechanics of publish/subscribe actually mirror those of request/reply.

In this post, I’ll look at the two operations from another angle: What do they mean?

What does it mean when you bus.Send?

Sending means either that the sender wants to

  • Command that another service does something.
  • Request that another service does something, and yields one or more replies¹.

This means that the sender knows stuff about the other service, but that other service will most likely not know or care about who’s sending. In other words, the sender depends on that other service!

What does it mean when you bus.Publish?

Publishing means that the publisher wants to

  • Broadcast an event that contains information on something that has happened.

This means that the publisher most likely does stuff inside itself, maybe updates some internal state, and then goes on and publishes information on some aspect of what has happened. In doing this, the publisher will most likely not know or care about who’s receiving. In other words, the subscriber depends on the publisher!

Summing it up with a picture

Consider this illustration, where service dependencies are shown with arrows:

Again, see how comparing Send to Publish is actually like comparing two mirror images when the other mirror image is upside-down?

  1. Note that the request/reply pattern may impose unwanted temporal coupling in an architecture and should probably be used only in integration scenarios orchestrated by a saga.

The duality of request/reply vs. publish/subscribe #1

A question I often meet in relation to messaging frameworks like NServiceBus and Rebus, is this: Where do messages go?

The confusion often comes from comparing how bus.Publish works with how bus.Send works.

In this post, I’d like to describe the two operations and show that they are mirror images of each other – except maybe not as much a mirror image as a transposition.

Sending messages

In the case where you’re doing a bus.Send(message), the answer is trivial: The message gets sent to the endpoint specified in the sender’s endpoint mapping for that message type. Let’s say our sender is equipped with this snippet of XML¹ in its app.config:

<UnicastBusConfig>
  <MessageEndpointMappings>
    <add Messages="MyService.Messages" Endpoint="my_service"/>
  </MessageEndpointMappings>
</UnicastBusConfig>

If we assume that message is an instance of a class from the MyService.Messages assembly, in this case a bus.Send(message) will be translated into bus.Send("my_service", message).
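To make that translation a bit more concrete, here is a rough sketch of how a lookup from message type to endpoint could work when mappings are keyed by the name of the messages assembly. This is illustrative only: the class AssemblyBasedEndpointMappings and its methods are hypothetical, and neither NServiceBus nor Rebus is claimed to be implemented this way.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch, not taken from NServiceBus or Rebus
public class AssemblyBasedEndpointMappings
{
    readonly Dictionary<string, string> endpointsByAssemblyName = new Dictionary<string, string>();

    // Corresponds to one <add Messages="..." Endpoint="..."/> element
    public void Add(string messagesAssemblyName, string endpoint)
    {
        endpointsByAssemblyName[messagesAssemblyName] = endpoint;
    }

    // Finds the endpoint that owns the given message type, based on its assembly name
    public string GetEndpointFor(Type messageType)
    {
        var assemblyName = messageType.Assembly.GetName().Name;

        string endpoint;
        if (!endpointsByAssemblyName.TryGetValue(assemblyName, out endpoint))
        {
            throw new InvalidOperationException(
                string.Format("No endpoint mapping found for {0} (assembly {1})", messageType, assemblyName));
        }

        return endpoint;
    }
}
```

A bus.Send(message) could then ask GetEndpointFor(message.GetType()) for the destination before handing the message to the transport.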

Publishing messages

But where do messages go when they’re published? Well, they go to whomever subscribed to that particular message type – and with NServiceBus (and, for that matter, with Rebus as well) subscribers get subscribed by sending some kind of subscription message, which is basically saying: “Hey there, Mr. Publisher – I’m some_subscriber, and I’d like to subscribe to MyService.Messages.SomeParticularMessage”.

From this point on, the publisher will store the mapping of the message type along with the subscriber’s address, allowing a bus.Publish(message) method to be implemented something along the lines of

public void Publish(object message)
{
    foreach(var subscriberEndpoint in GetSubscribersFor(message.GetType()))
    {
        Send(subscriberEndpoint, message);
    }
}
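The GetSubscribersFor call in the snippet above needs something to look in. As a rough sketch, the publisher’s side of the bookkeeping could be as simple as the in-memory store below. The class name InMemorySubscriptionStorage is an assumption made for this example, and a real publisher would most likely persist its subscriptions somewhere durable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory store: maps message types to subscriber endpoints
public class InMemorySubscriptionStorage
{
    readonly object gate = new object();
    readonly Dictionary<Type, HashSet<string>> subscribers = new Dictionary<Type, HashSet<string>>();

    // Would be called when a subscription message arrives at the publisher
    public void Subscribe(Type messageType, string subscriberEndpoint)
    {
        lock (gate)
        {
            HashSet<string> endpoints;
            if (!subscribers.TryGetValue(messageType, out endpoints))
            {
                endpoints = new HashSet<string>();
                subscribers[messageType] = endpoints;
            }

            // HashSet ignores duplicates, so subscribing twice is harmless
            endpoints.Add(subscriberEndpoint);
        }
    }

    // Returns a snapshot, so callers can iterate without holding the lock
    public string[] GetSubscribersFor(Type messageType)
    {
        lock (gate)
        {
            HashSet<string> endpoints;
            return subscribers.TryGetValue(messageType, out endpoints)
                ? endpoints.ToArray()
                : new string[0];
        }
    }
}
```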

So – how do we understand this on a higher level, allowing us to never ever confuse these things again? Let’s dive into…

The duality

Consider these two sequence-like diagrams:

See how request/reply and publish/subscribe are actually the same patterns? The reason these two are often confused is that the Send operation is often contrasted with Publish, when in fact it would be more fitting to see the subscription message (i.e. subscription request) as the counterpart of Send. Thus, Publishing is more like replying. And thus, Send is actually the transposition of Publish.

Now, when you realize this, you’re never going to confuse these things again :) In the next post, I’ll touch a little bit on another difference between Send and Publish.

  1. The snippet is an endpoint mapping in NServiceBus format, which can also be understood by Rebus when it’s running with the DetermineDestinationFromNServiceBusEndpointMappings implementation of IDetermineDestination

I will be speaking at Miracle Open World 2012

In April, I will be doing two presentations at Miracle’s Open World conference. It looks like a lot of cool people are going, and it’s my first time at MOW, so it goes without saying that I’m excited about it!

First, I’ll be doing a brand new intro to NServiceBus, which I have used extensively for the last two years. Even though I wish it was free for everyone to use, NServiceBus continues to be an awesome framework, so I’d like to continue to spread the word about it – you can read my abstract here: Ride the Bus!.

After that, it seems I’ll be topping off day one with a brand new, condensed, platform-agnostic and pure MongoDB tour – this one will not do the usual “and this is NoSQL, and this is what characterizes a document DB”-intro, this will be full-on and to the point. You can read about it here: So you want to liberate your data?

I hope to see a lot of engaged people there :)

How to set the current culture in NServiceBus

Today we were experiencing some weird behavior when running an integration test with DillPickle, where – apparently – values of doubles would lose their decimal point when they were transferred in messages from our test to an NServiceBus service.

Stopping the service and inspecting the message in the queue quickly revealed a message that looked somewhat like this:

<Messages>
    <SetCurrentValueMessage>
        <Value>13.56</Value>
    </SetCurrentValueMessage>
</Messages>

which is all fine and dandy.

Now, I’m used to being Danish, so I know that we’re somewhat deviant in regards to our decimal point – “,” – so we quickly diagnosed the problem: Our integration tests were running with the invariant culture, to allow us to parse Gherkin files in English and use “.” as the decimal point – but Windows and everything else was running with da-DK, so 13.56 would be improperly deserialized to the value 1356 when it reached our NServiceBus service.
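The effect is easy to reproduce in isolation. The sketch below is not our actual test code: the class DecimalPointDemo is made up, and instead of relying on da-DK being installed it builds a Danish-like number format by hand (“,” as decimal point, “.” as group separator), which is an assumption about what matters here.

```csharp
using System;
using System.Globalization;

// Hypothetical demo class, only meant to reproduce the decimal point problem
public static class DecimalPointDemo
{
    // Stand-in for da-DK number formatting: "," is the decimal point, "." groups digits
    public static NumberFormatInfo DanishLikeFormat()
    {
        var format = (NumberFormatInfo)CultureInfo.InvariantCulture.NumberFormat.Clone();
        format.NumberDecimalSeparator = ",";
        format.NumberGroupSeparator = ".";
        return format;
    }

    // What the sender meant: "." is the decimal point
    public static double ParseAsSender(string text)
    {
        return double.Parse(text, CultureInfo.InvariantCulture);
    }

    // What the receiver did: "." is treated as a group separator and dropped
    public static double ParseAsReceiver(string text)
    {
        return double.Parse(text, DanishLikeFormat());
    }
}
```

Here ParseAsSender("13.56") gives 13.56, while ParseAsReceiver("13.56") gives 1356, without any exception to warn you.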

Solution: Normalize the culture of all the processes of our system.

Our first attempt was to modify the culture in our endpoint configuration like so:

public class EndpointConfiguration : IConfigureThisEndpoint, AsA_Server
{
    public EndpointConfiguration()
    {
        // does NOT work!
        Thread.CurrentThread.CurrentCulture = CultureInfo.InvariantCulture;
        Thread.CurrentThread.CurrentUICulture = CultureInfo.InvariantCulture;
    }
}

but obviously this did not work, because NServiceBus does not deserialize messages on this thread!

Our solution was to create a message module, which seems to get called before transport messages are deserialized, setting the culture in there – like so:

public class SetCurrentCultureMessageModule : IMessageModule
{
    public void HandleBeginMessage()
    {
        // works!
        Thread.CurrentThread.CurrentCulture = CultureInfo.InvariantCulture;
        Thread.CurrentThread.CurrentUICulture = CultureInfo.InvariantCulture;
    }
 
    public void HandleEndMessage()
    {
    }
 
    public void HandleError()
    {
    }
}

In the future I’ll make sure that the culture is explicitly set in all processes of systems I am building. It’s kind of scary that errors could happen where stuff like “debit account 100.00” could be mis-interpreted as “debit account 10000”!! :-o

Assuring that those IWantToRunAtStartup can actually run at startup

When building NServiceBus services based on the generic host, you may need to do some stuff whenever your service starts up and shuts down. The way to do that is to create classes that implement IWantToRunAtStartup, which will be picked up by the host and registered in the container as an implementation of that interface.

When the time comes to run whoever wants to run at startup, the host does a

container.ResolveAll<IWantToRunAtStartup>()

to get all the relevant instances (or something similar if you aren’t using Windsor…).

If, however, one or more instances cannot be instantiated due to missing dependencies, you will get no kind of warning or error whatsoever!¹ This means that the service will silently ignore the fact that one or more IWantToRunAtStartups could not be instantiated and run.

In order to avoid this error, I have written a test that looks somewhat like this:

[Test]
public void WhoeverWantsToRunAtStartupCanActuallyRun()
{
  var container = new WindsorContainer();
 
  PerformTheUsualRegistration(container);
 
  var typesThatWantToRun = from type in typeof (MyService.EndpointConfiguration).Assembly.GetTypes()
                         where typeof(IWantToRunAtStartup).IsAssignableFrom(type) 
                             && !type.IsAbstract && !type.IsInterface 
                         select type;
 
  ManuallyRegister(container, typesThatWantToRun);
  RegisterFakeBus(container);
 
  var typesThatCouldRun = container.ResolveAll<IWantToRunAtStartup>().Select(c => c.GetType());
  var typesThatCouldNotRun = typesThatWantToRun.Except(typesThatCouldRun);
 
  if (typesThatCouldNotRun.Any())
  {
    Assert.Fail(string.Join(Environment.NewLine + Environment.NewLine,
          typesThatCouldNotRun.Select(t => GenerateErrorDetailsFor(t, container)).ToArray()));
  }
}
 
string GenerateErrorDetailsFor(Type type, IWindsorContainer container)
{
  // first, register the class as itself if it has not already been done
  if (!container.Kernel.HasComponent(type))
  {
    container.Register(Component.For(type).Named(type.Name + " that wants to run"));
  }
 
  // next, make Windsor throw an exception with all the nasty details...
  var exceptionText = "";
 
  try
  {
    container.Resolve(type);
  }
  catch (HandlerException e)
  {
    exceptionText = e.Message;
  }
 
  return string.Format(@"Class: {0}
 
Reason:
 
{1}", type.Name, exceptionText);
}
 
void PerformTheUsualRegistration(IWindsorContainer container)
{
  container.Install(FromAssembly.Containing<MyService.EndpointConfiguration>());
}
 
void ManuallyRegister(IWindsorContainer container, IEnumerable<Type> typesThatWantToRun)
{
  container.Register(typesThatWantToRun
                        .Select(t => Component.For<IWantToRunAtStartup>().ImplementedBy(t))
                        .ToArray());
}
 
void RegisterFakeBus(IWindsorContainer container) 
{
  container.Register(Component.For<IBus>().Instance(MockRepository.GenerateMock<IBus>()));
}

I admit that the code is kind of clunky even though I distilled the interesting parts from some of the plumbing in our real test… moreover, our real test is iterating through all the possible configurations our container can have – one for each environment – so you can probably imagine that it’s not pretty :)

But who cares??? The test has proven almost infinitely useful already! Whenever something that wants to run at startup cannot run at startup, TeamCity gives us error messages like this:

Class: RunSomething
 
Reason:
 
Can't create component 'RunSomething that wants to run'  as it has dependencies to be satisfied. 
RunSomething that wants to run is waiting for the following dependencies: 
 
Services: 
- OneOfOurProjects.Api.ISomethingElse which was not registered.

  1. At least this is the case when using Castle Windsor – I don’t know if this is also the behavior of other IoC containers… Maybe someone can clarify this…?

Trifork Geek Nights

Just wanted to say that I will be speaking at a couple of Trifork Geek Nights in November and December, about Castle Windsor and NServiceBus respectively.

First,

Advanced Windsor-Tricks

A quick introduction followed by some of the more advanced features of my favorite IoC container. If you’re new to IoC, or you are wondering what the fuss is about, or you are interested in letting your container do some more work for you, you should come to this one :)

Will be held on

from 4:30 pm to 6:30 pm.

And then,

Distributed systems in .NET with NServiceBus

Introduction to messaging in .NET with NServiceBus. Will give an introduction to some fundamental messaging patterns and go on to show how these can be put to use with NServiceBus.

If you’re new to IoC, you can probably benefit from showing up at the “Advanced Windsor-Tricks” geek night before going to this one, because NServiceBus is relying heavily on having a container.

Will be held on

from 4:30 pm to 6:30 pm as well.

Hope to see some enthusiastic coders there.

PS: The Geek Nights will be held in Danish :)

Scheduling recurring tasks in NServiceBus

A while ago, on a project I am currently involved with which is based on NServiceBus, we needed to publish certain pieces of information at fixed intervals. I was not totally clear in my head on how this could be implemented in an NServiceBus service, so I asked for help on Twitter, which resulted in a nifty piece of advice from Andreas Öhlund: Set up a timer to do a bus.SendLocal at the specified interval.

That’s exactly what we did, and I think we ended up with a pretty nifty piece of code that I want to show off :)

PS: bus.SendLocal(message) effectively does a bus.Send(((UnicastBus)bus).Address, message) – i.e. it puts a message, MSMQ and all, in the service’s own input queue.

First, we have an API that looks like this (looking a little funny, I know – wait and see…):

public interface ISchedule
{
    void Every(TimeSpan interval, Func<IMessage> messageFactoryMethod);
}

- which is implemented like this (registered as a singleton in the container):

public class ServerBasedTimerSchedule : ISchedule, IDisposable
{
    readonly IBus bus;
    readonly List<System.Timers.Timer> timers = new List<System.Timers.Timer>();
 
    public ServerBasedTimerSchedule(IBus bus)
    {
        this.bus = bus;
    }
 
    public void Every(TimeSpan interval, Func<IMessage> messageFactoryMethod)
    {
        var timer = new System.Timers.Timer();
        timer.Elapsed += (_, __) => bus.SendLocal(messageFactoryMethod());
        timer.Interval = interval.TotalMilliseconds;
        timer.Start();
        timers.Add(timer);
    }
 
    public void Dispose()
    {
        timers.ForEach(timer => timer.Dispose());
    }
}

The System.Timers.Timer is a timer which uses the thread pool to schedule callbacks at the specified interval. It’s pretty easy to use, and it fits nicely with this scenario.

Now, in combination with this nifty class of extension goodness:

public static class TimeSpanExtensions
{
    public static TimeSpan Seconds(this int seconds)
    {
        return TimeSpan.FromSeconds(seconds);
    }
 
    public static TimeSpan Minutes(this int minutes)
    {
        return TimeSpan.FromMinutes(minutes);
    }
 
    // ... etc + for doubles as well
}

- we can schedule our tasks like so:

public class ScheduleRealTimeDataPublishing : IWantToRunAtStartup
{
    readonly ISchedule schedule;
 
    public ScheduleRealTimeDataPublishing(ISchedule schedule)
    {
        this.schedule = schedule;
    }
 
    public void Run()
    {
        schedule.Every(5.Seconds(), () => new PublishRealTimeDataMessage());
    }
 
    public void Stop()
    {
    }
}

Now, why is this good? It’s good because the actual task will then be carried out by whoever implements IHandleMessages<PublishRealTimeDataMessage> in the service, processing the tasks with all the benefits of the usual NServiceBus message processing pipeline.

Nifty, huh?

Looking over the simplicity and elegance of this solution, I’m kind of embarrassed to tell that my first take on this was to implement the timer almost exactly like above, except instead of bus.SendLocal in the Elapsed-callback, we had a huge event handler that simulated most of our message processing pipeline – including NHibernateMessageModule, transactions, and whatnot….

Please note that ServerBasedTimerSchedule is not thread-safe, because its list of timers is not synchronized – in this form its Every method should only be used from within the Run and Stop methods of implementors of IWantToRunAtStartup, as these are run sequentially.
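If you do need to call Every from several threads, one option is to guard the timer list with a lock. The following is only a sketch of that idea and not what we are running: the class LockingTimerSchedule is hypothetical, and to keep it self-contained it takes an Action<object> as a stand-in for bus.SendLocal and a Func<object> instead of Func<IMessage>.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical variation of the schedule above; Action<object> stands in for bus.SendLocal
public class LockingTimerSchedule : IDisposable
{
    readonly object gate = new object();
    readonly List<System.Timers.Timer> timers = new List<System.Timers.Timer>();
    readonly Action<object> sendLocal;

    public LockingTimerSchedule(Action<object> sendLocal)
    {
        this.sendLocal = sendLocal;
    }

    public void Every(TimeSpan interval, Func<object> messageFactoryMethod)
    {
        var timer = new System.Timers.Timer(interval.TotalMilliseconds);
        timer.Elapsed += (sender, args) => sendLocal(messageFactoryMethod());

        // Only the list needs protection; each timer is owned by this method until added
        lock (gate)
        {
            timers.Add(timer);
        }

        timer.Start();
    }

    public void Dispose()
    {
        lock (gate)
        {
            timers.ForEach(timer => timer.Dispose());
            timers.Clear();
        }
    }
}
```

The Elapsed callbacks still run on thread pool threads, so whatever is passed in as sendLocal must itself be safe to call concurrently.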