A while ago, on an NServiceBus-based project I am currently involved with, we needed to publish certain pieces of information at fixed intervals. I was not sure how best to implement this in an NServiceBus service, so I asked for help on Twitter, which resulted in a nifty piece of advice from Andreas Öhlund: set up a timer to do a bus.SendLocal at the specified interval.
That’s exactly what we did, and I think we ended up with a pretty nifty piece of code that I want to show off 🙂
PS: bus.SendLocal(message) effectively does a bus.Send(((UnicastBus)bus).Address, message), i.e. it puts a message in the service’s own input queue, MSMQ and all.
First, we have an API that looks like this (looking a little funny, I know – wait and see…):

    public interface ISchedule
    {
        void Every(TimeSpan interval, Func<IMessage> messageFactoryMethod);
    }

– which is implemented like this (registered as a singleton in the container):

    using System;
    using System.Collections.Generic;
    using NServiceBus;

    public class ServerBasedTimerSchedule : ISchedule, IDisposable
    {
        readonly IBus bus;
        readonly List<System.Timers.Timer> timers = new List<System.Timers.Timer>();

        public ServerBasedTimerSchedule(IBus bus)
        {
            this.bus = bus;
        }

        public void Every(TimeSpan interval, Func<IMessage> messageFactoryMethod)
        {
            var timer = new System.Timers.Timer();
            // On every tick, create a fresh message and put it in our own input queue
            timer.Elapsed += (_, __) => bus.SendLocal(messageFactoryMethod());
            timer.Interval = interval.TotalMilliseconds;
            timer.Start();
            // Keep a reference so the timer can be disposed along with the schedule
            timers.Add(timer);
        }

        public void Dispose()
        {
            timers.ForEach(timer => timer.Dispose());
        }
    }

The System.Timers.Timer is a timer that raises its Elapsed event on a thread pool thread each time the specified interval has passed. It’s pretty easy to use, and it fits nicely with this scenario.
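To get a feel for the timer on its own, here is a minimal sketch with no NServiceBus involved. The TimerDemo class and its CountTicks method are names made up for this illustration; the sketch just counts Elapsed callbacks until a target is reached or a timeout expires.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper for illustration only, not part of the scheduling code.
public static class TimerDemo
{
    // Starts a timer with the given interval, waits until Elapsed has fired
    // `ticks` times (or until `timeout` expires), and returns how many
    // callbacks were observed.
    public static int CountTicks(TimeSpan interval, int ticks, TimeSpan timeout)
    {
        var count = 0;
        var reached = new TaskCompletionSource<bool>();

        using (var timer = new System.Timers.Timer(interval.TotalMilliseconds))
        {
            timer.Elapsed += (sender, e) =>
            {
                // Elapsed is raised on a thread pool thread, so the counter
                // is incremented atomically.
                if (Interlocked.Increment(ref count) >= ticks)
                    reached.TrySetResult(true);
            };
            timer.AutoReset = true; // the default: keep firing until stopped
            timer.Start();

            // Block the calling thread until enough ticks were seen or we give up.
            reached.Task.Wait(timeout);
            timer.Stop();
        }

        return Volatile.Read(ref count);
    }
}
```

Calling TimerDemo.CountTicks(TimeSpan.FromMilliseconds(50), 3, TimeSpan.FromSeconds(5)) should normally return after roughly 150 ms. Note that the callbacks run on thread pool threads, not on the caller's thread, which is why the schedule class hands the work over to the bus rather than doing it inside the callback.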
Now, in combination with this nifty class of extension goodness:

    public static class TimeSpanExtensions
    {
        public static TimeSpan Seconds(this int seconds)
        {
            return TimeSpan.FromSeconds(seconds);
        }

        public static TimeSpan Minutes(this int minutes)
        {
            return TimeSpan.FromMinutes(minutes);
        }

        // ... etc + for doubles as well
    }

– we can schedule our tasks like so:

    public class ScheduleRealTimeDataPublishing : IWantToRunAtStartup
    {
        readonly ISchedule schedule;

        public ScheduleRealTimeDataPublishing(ISchedule schedule)
        {
            this.schedule = schedule;
        }

        public void Run()
        {
            schedule.Every(5.Seconds(), () => new PublishRealTimeDataMessage());
        }

        public void Stop()
        {
        }
    }

Now, why is this good? It’s good because the actual task will then be carried out by whoever implements IHandleMessages<PublishRealTimeDataMessage> in the service, processing the tasks with all the benefits of the usual NServiceBus message processing pipeline.
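For completeness, the receiving end could look roughly like the sketch below. The handler class name and its HandledCount property are hypothetical, and the IMessage and IHandleMessages<T> declarations are simplified stand-ins (assumptions on my part) so the snippet compiles without the NServiceBus assembly; in the real service those types come from NServiceBus.

```csharp
using System;

// Simplified stand-ins for the NServiceBus types (assumptions, so that the
// sketch compiles on its own). In the service, use the real ones instead.
public interface IMessage { }

public interface IHandleMessages<T>
{
    void Handle(T message);
}

// The message carries no data; it is merely a trigger that says "publish now".
public class PublishRealTimeDataMessage : IMessage { }

// Hypothetical handler: the name and the counter are for illustration only.
public class PublishRealTimeDataHandler : IHandleMessages<PublishRealTimeDataMessage>
{
    public int HandledCount { get; private set; }

    public void Handle(PublishRealTimeDataMessage message)
    {
        // In the real service this is where the data would be gathered and
        // published, inside the usual message processing pipeline.
        HandledCount++;
    }
}
```

Because the handler is invoked through the input queue just like any other message handler, it needs no knowledge of timers at all.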
Nifty, huh?
Looking over the simplicity and elegance of this solution, I’m kind of embarrassed to admit that my first take on this was to implement the timer almost exactly like above, except that instead of calling bus.SendLocal in the Elapsed callback, we had a huge event handler that simulated most of our message processing pipeline, including NHibernateMessageModule, transactions, and whatnot…
Please note that ServerBasedTimerSchedule is not thread-safe: its Every method adds to a plain List<T> without any locking. In this form, Every should only be called from within the Run and Stop methods of implementors of IWantToRunAtStartup (such as ScheduleRealTimeDataPublishing), as these are run sequentially.
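If the schedule ever needs to be called from several threads, one possible approach is to guard the timer list with a lock. The following is only a sketch under that assumption: LockingTimerSchedule, its TimerCount property, and the disposed flag are hypothetical additions, and IMessage and IBus are reduced stand-ins (the real IBus has many more members) so the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Reduced stand-ins for the NServiceBus types (assumptions for this sketch).
public interface IMessage { }
public interface IBus { void SendLocal(IMessage message); }

public interface ISchedule
{
    void Every(TimeSpan interval, Func<IMessage> messageFactoryMethod);
}

// Hypothetical variant of the schedule that can be called concurrently.
public class LockingTimerSchedule : ISchedule, IDisposable
{
    readonly IBus bus;
    readonly object gate = new object();
    readonly List<System.Timers.Timer> timers = new List<System.Timers.Timer>();
    bool disposed;

    public LockingTimerSchedule(IBus bus)
    {
        this.bus = bus;
    }

    public void Every(TimeSpan interval, Func<IMessage> messageFactoryMethod)
    {
        var timer = new System.Timers.Timer(interval.TotalMilliseconds);
        timer.Elapsed += (sender, e) => bus.SendLocal(messageFactoryMethod());

        lock (gate)
        {
            if (disposed)
            {
                // Do not leak a timer that nobody would ever dispose.
                timer.Dispose();
                throw new ObjectDisposedException("LockingTimerSchedule");
            }
            timers.Add(timer);
            timer.Start();
        }
    }

    // Number of timers currently registered (handy for diagnostics and tests).
    public int TimerCount
    {
        get { lock (gate) { return timers.Count; } }
    }

    public void Dispose()
    {
        lock (gate)
        {
            disposed = true;
            timers.ForEach(timer => timer.Dispose());
            timers.Clear();
        }
    }
}
```

The lock only protects the bookkeeping; the Elapsed callbacks still run on thread pool threads and do nothing but put a message in the input queue, so the design of the original class is otherwise unchanged.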