A while ago, on an NServiceBus-based project I am currently involved with, we needed to publish certain pieces of information at fixed intervals. I was not entirely clear on how this could be implemented in an NServiceBus service, so I asked for help on Twitter, which resulted in a nifty piece of advice from Andreas Öhlund: set up a timer that does a bus.SendLocal at the specified interval.
That’s exactly what we did, and I think we ended up with a pretty nifty piece of code that I want to show off 🙂
PS: bus.SendLocal(message) effectively does a bus.Send(((UnicastBus)bus).Address, message) – i.e. it puts a message, MSMQ and all, in the service’s own input queue.
First, we have an API that looks like this (looking a little funny, I know – wait and see…):
```csharp
public interface ISchedule
{
    void Every(TimeSpan interval, Func<IMessage> messageFactoryMethod);
}
```
– which is implemented like this (registered as a singleton in the container):
```csharp
public class ServerBasedTimerSchedule : ISchedule, IDisposable
{
    readonly IBus bus;
    readonly List<System.Timers.Timer> timers = new List<System.Timers.Timer>();

    public ServerBasedTimerSchedule(IBus bus)
    {
        this.bus = bus;
    }

    public void Every(TimeSpan interval, Func<IMessage> messageFactoryMethod)
    {
        var timer = new System.Timers.Timer();
        // On every tick, create a fresh message and put it in our own input queue
        timer.Elapsed += (_, __) => bus.SendLocal(messageFactoryMethod());
        timer.Interval = interval.TotalMilliseconds;
        timer.Start();
        // Keep a reference so the timers can be disposed along with the schedule
        timers.Add(timer);
    }

    public void Dispose()
    {
        timers.ForEach(timer => timer.Dispose());
    }
}
```
The System.Timers.Timer is a timer which uses the thread pool to schedule callbacks at the specified interval. It’s pretty easy to use, and it fits nicely with this scenario.
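To see the thread pool behaviour for yourself, here is a minimal, stand-alone sketch (no NServiceBus involved). The class and method names are made up for this illustration. It waits for a number of Elapsed callbacks and reports whether all of them ran on thread pool threads:

```csharp
using System;
using System.Threading;

public static class TimerDemo
{
    // Hypothetical helper: blocks until the timer has ticked 'ticksToWaitFor'
    // times, then returns true if every callback ran on a thread pool thread.
    public static bool AllTicksOnThreadPool(TimeSpan interval, int ticksToWaitFor)
    {
        var ticks = 0;
        var offPool = 0;
        var done = new ManualResetEvent(false);

        using (var timer = new System.Timers.Timer(interval.TotalMilliseconds))
        {
            timer.Elapsed += (sender, e) =>
            {
                // Callbacks may run concurrently, so only touch shared state atomically
                if (!Thread.CurrentThread.IsThreadPoolThread)
                    Interlocked.Increment(ref offPool);

                if (Interlocked.Increment(ref ticks) >= ticksToWaitFor)
                    done.Set();
            };

            timer.Start();
            done.WaitOne();
        }

        return offPool == 0;
    }
}
```

Calling `TimerDemo.AllTicksOnThreadPool(TimeSpan.FromMilliseconds(50), 3)` blocks for roughly three ticks. One thing to keep in mind is that a slow Elapsed handler can overlap with the next tick, which is another reason to keep the callback as small as a single bus.SendLocal.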
Now, in combination with this nifty class of extension goodness:
```csharp
public static class TimeSpanExtensions
{
    public static TimeSpan Seconds(this int seconds)
    {
        return TimeSpan.FromSeconds(seconds);
    }

    public static TimeSpan Minutes(this int minutes)
    {
        return TimeSpan.FromMinutes(minutes);
    }

    // ... etc + for doubles as well
}
```
– we can schedule our tasks like so:
```csharp
public class ScheduleRealTimeDataPublishing : IWantToRunAtStartup
{
    readonly ISchedule schedule;

    public ScheduleRealTimeDataPublishing(ISchedule schedule)
    {
        this.schedule = schedule;
    }

    public void Run()
    {
        // Send a PublishRealTimeDataMessage to ourselves every 5 seconds
        schedule.Every(5.Seconds(), () => new PublishRealTimeDataMessage());
    }

    public void Stop()
    {
    }
}
```
Now, why is this good? It’s good because the actual task will then be carried out by whoever implements IHandleMessages<PublishRealTimeDataMessage> in the service, processing the tasks with all the benefits of the usual NServiceBus message processing pipeline.
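For completeness, the receiving end could look something like the sketch below. This is not our actual handler: the RealTimeDataUpdated event and its Timestamp property are hypothetical, the message class is assumed to be a plain marker message, and the code assumes the NServiceBus 2.x-style API where IBus is property-injected into handlers.

```csharp
using System;
using NServiceBus;

// Assumption: the scheduled message carries no data of its own
public class PublishRealTimeDataMessage : IMessage
{
}

// Hypothetical event, made up for this example
public class RealTimeDataUpdated : IMessage
{
    public DateTime Timestamp { get; set; }
}

public class PublishRealTimeDataHandler : IHandleMessages<PublishRealTimeDataMessage>
{
    // Set by the container when the handler is created
    public IBus Bus { get; set; }

    public void Handle(PublishRealTimeDataMessage message)
    {
        // Runs inside the normal message processing pipeline,
        // so message modules, transactions and retries all apply
        Bus.Publish(new RealTimeDataUpdated { Timestamp = DateTime.UtcNow });
    }
}
```

The point is that the timer callback stays trivial, while the handler gets the same treatment as any other message handler in the service.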
Nifty, huh?
Looking over the simplicity and elegance of this solution, I’m kind of embarrassed to admit that my first take on this was to implement the timer almost exactly like above, except that instead of bus.SendLocal in the Elapsed callback, we had a huge event handler that simulated most of our message processing pipeline, including NHibernateMessageModule, transactions, and whatnot….
Please note that ServerBasedTimerSchedule is not thread-safe: its Every method adds to a plain List without any locking. In this form, Every should only be called from within the Run and Stop methods of implementors of IWantToRunAtStartup, as these are run sequentially.
Where/how did you register ISchedule? I’m trying to do it during init(IWantCustomInitialization) and it is not resolving as I think IBus has not been created yet…
I register everything in the Init method of my EndpointConfig, but at this point there’s no dependency injection! This makes sense, because the Init method is typically where you would provide an IoC container of your own.
Why not just make a class implement IWantToRunAtStartup?
If you want it to run before everything else, make the class that implements IConfigureThisEndpoint implement IWantToRunAtStartup as well. It will then be run before any other IWantToRunAtStartup.
If you’re in doubt as to how to supply a container and register stuff in it, check this out (the “Containers and Dependency Injection” section)
This is exactly what I was looking for. Super, simple, elegant API.
Thank you!
Nice (and elegant) solution to your real-time problem.
But how would you approach this when trying to guarantee the publishing of a message at a much longer interval?
I need to run a task once a month that assigns holiday credits to employees, and if it doesn’t run, that’s a big problem.
I especially want to avoid sending the message twice if the server goes down on the day when the task is supposed to run.
I’d probably schedule a check at a much shorter interval – say 1 hour or so – and then each check would see if it was “that time of the month” 🙂
I know this doesn’t take advantage of the nifty declarative API in ISchedule, but it would be a pretty simple and pragmatic solution to that problem.
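The check itself could be sketched roughly like this. All names are made up for the example, and it assumes that the time of the last successful run is stored somewhere durable (ideally updated in the same transaction as the work itself), so that a restart neither skips the month nor runs the task twice:

```csharp
using System;

public static class MonthlyTaskCheck
{
    // Hypothetical helper: returns true when the task should run now,
    // i.e. the scheduled day has been reached and the task has not
    // already run in the current month.
    public static bool IsDue(DateTime? lastRun, DateTime now, int dayOfMonth)
    {
        // Clamp so that e.g. "the 31st" still fires in shorter months
        var effectiveDay = Math.Min(dayOfMonth, DateTime.DaysInMonth(now.Year, now.Month));

        if (now.Day < effectiveDay)
            return false;

        if (lastRun == null)
            return true;

        var last = lastRun.Value;
        return last.Year != now.Year || last.Month != now.Month;
    }
}
```

The hourly handler would then load the last run time, call `IsDue`, and only do the work (and record the new run time) when it returns true. If the server is down on the scheduled day, the first check after it comes back up still sees the task as due.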
Your comment does raise some questions, though – maybe NServiceBus would benefit from a formalized scheduling API, similar to how the Timeout Service works? I wonder if NSB 3 will have some news in this regard…