New Rebus processes messages through two pipelines: one for outgoing messages and one for incoming messages.
When you configure the bus, you can log the contents of the two pipelines (including each step's documentation, if the step has been decorated with the [StepDocumentation("bla bla bla")] attribute) by calling .Options(o => o.LogPipeline(verbose: true|false)) in the configuration, like so:
    Configure.With(new BuiltinHandlerActivator())
        .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "test"))
        .Options(o => o.LogPipeline(verbose: true))
        .Start();
which will output something like this to whichever logger is configured:
    ------------------------------------------------------------------------------
    Message pipelines
    ------------------------------------------------------------------------------
    Send pipeline:
        Rebus.Pipeline.Send.AssignGuidMessageIdStep
            Assigns a new GUID as the 'rbs2-msg-id' header if the message being sent
            does not already have a message ID.

        Rebus.Pipeline.Send.AssignReturnAddressStep
            If the outgoing message does not already have a 'rbs2-return-address'
            header, this step will assign the input queue address of the sending
            endpoint as the return address.

        Rebus.Pipeline.Send.AssignDateTimeOffsetHeader
            Sets the 'rbs2-senttime' header of the outgoing message to the current
            local time as a DateTimeOffset serialized with the 'O' format string.

        Rebus.Pipeline.Send.FlowCorrelationIdStep
            Sets the 'rbs2-corr-id' header of the outgoing message to one of the
            following three things:
            1) The correlation ID of the message currently being handled.
            2) The message ID of the message currently being handled.
            3) The message´s own message ID.

        Rebus.Pipeline.Send.SerializeOutgoingMessageStep
            Serializes the outgoing message using the configured serializer, storing
            the resulting transport message back to the context.

        Rebus.Pipeline.Send.SendOutgoingMessageStep
            Final step that uses the current transport to send the transport message
            found in the context to all addresses found by looking up the
            DestinationAddress object from the context.

    Receive pipeline:
        Rebus.Retry.Simple.SimpleRetryStrategyStep
            Wraps the invocation of the entire receive pipeline in an exception
            handler, tracking the number of times the received message has been
            attempted to be delivered. If the maximum number of delivery attempts is
            reached, the message is moved to the error queue.

        Rebus.Pipeline.Receive.HandleDeferredMessagesStep
            If the incoming message should not be handled now, this step saves the
            message until it is time to deliver the message.
            This is done by checking if the incoming message has a
            'rbs2-deferred-until' header with a desired time to be delivered.

        Rebus.Pipeline.Receive.DeserializeIncomingMessageStep
            Deserializes the current transport message using the configured
            serializer, saving the deserialized message back to the context.

        Rebus.Pipeline.Receive.ActivateHandlersStep
            Looks at the incoming message and decides how to handle it. A
            HandlerInvokers object is saved to the context to be invoked later.

        Rebus.Pipeline.Receive.LoadSagaDataStep
            Looks at the handler invokers in the context and sees if there´s one or
            more saga handlers in there. If that´s the case, relevant saga data is
            loaded/created, and the rest of the pipeline gets invoked. Afterwards,
            all the created/loaded saga data is updated appropriately.

        Rebus.Pipeline.Receive.DispatchIncomingMessageStep
            Gets all the handler invokers from the current context and invokes them
            in order. Please note that each invoker might choose to ignore the
            invocation internally. If no invokers were found, a
            RebusApplicationException is thrown.
    ------------------------------------------------------------------------------
As you can see, it’s fairly easy to get an understanding of what exactly is going on with these messages going in and out 🙂
When the bus needs either of the pipelines, it uses the IPipeline service to get them. IPipeline is pretty simple; it looks like this:
    public interface IPipeline
    {
        IEnumerable<IOutgoingStep> SendPipeline();
        IEnumerable<IIncomingStep> ReceivePipeline();
    }
Now, if you’ve read my previous post about how Rebus uses Injectionist to satisfy its dependencies, you can probably envision by now a decorator-based approach to modify the pipeline(s) in order to implement new functionality… and you are absolutely right, that is exactly what I’m about to show you 🙂
In fact, this scenario where you want to use a decorator to inject something into either of the pipelines is so common that Rebus has a nifty decorator prepared to cover most scenarios: PipelineStepInjector. The PipelineStepInjector is an IPipeline decorator that can be configured to inject new steps into either pipeline, at a position relative to another step.
For example, if I have created a new implementation of IIncomingStep, LogResolvedMessageHandlersStep, that logs the type names of resolved message handlers, a natural place to inject that step would be right after ActivateHandlersStep, whose responsibility it is to resolve all implementations compatible with IHandleMessages<SomeMessage> from the configured handler activator.
In order to inject this new step, I could do it like this:
    injectionist.Register<IPipeline>(c =>
    {
        var pipeline = c.Get<IPipeline>();
        var stepToInject = new LogResolvedMessageHandlersStep();

        return new PipelineStepInjector(pipeline)
            .OnReceive(stepToInject, PipelineRelativePosition.After, typeof(ActivateHandlersStep));
    }, isDecorator: true);
which will cause my LogResolvedMessageHandlersStep to be positioned right after ActivateHandlersStep.
Nifty, huh? 😀
Please note that PipelineStepInjector is not meant to be able to cover all scenarios. I could easily come up with complex injection rules that could not readily be handled simply by positioning each step relative to one of the existing steps, but so far it has covered quite a few scenarios nicely.
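To make the idea of "positioning a step relative to an existing step" a bit more concrete, here is a minimal, self-contained sketch of how a decorator around IPipeline could do it. Please note that this is not Rebus' actual PipelineStepInjector: IIncomingStep, IOutgoingStep and the step classes are empty stand-ins that only borrow their names from the post, and FixedPipeline, InjectAfterDecorator and Demo are hypothetical names made up for the illustration. The sketch only handles the "after" case on the receive pipeline, and it quietly does nothing if the anchor step is not found.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Empty stand-ins (assumption): the real Rebus step interfaces also define
// how a step is invoked, which is left out here.
public interface IIncomingStep { }
public interface IOutgoingStep { }

// Same shape as the IPipeline interface shown earlier in the post.
public interface IPipeline
{
    IEnumerable<IOutgoingStep> SendPipeline();
    IEnumerable<IIncomingStep> ReceivePipeline();
}

// Placeholder steps named after steps from the logged receive pipeline.
public class DeserializeIncomingMessageStep : IIncomingStep { }
public class ActivateHandlersStep : IIncomingStep { }
public class LoadSagaDataStep : IIncomingStep { }
public class LogResolvedMessageHandlersStep : IIncomingStep { }

// Hypothetical fixed pipeline that serves as the thing being decorated.
public class FixedPipeline : IPipeline
{
    public IEnumerable<IOutgoingStep> SendPipeline() => Enumerable.Empty<IOutgoingStep>();

    public IEnumerable<IIncomingStep> ReceivePipeline() => new IIncomingStep[]
    {
        new DeserializeIncomingMessageStep(),
        new ActivateHandlersStep(),
        new LoadSagaDataStep(),
    };
}

// Hypothetical decorator: passes the inner pipeline's steps through and yields
// one extra step right after the first step of the anchor type.
public class InjectAfterDecorator : IPipeline
{
    readonly IPipeline _inner;
    readonly IIncomingStep _stepToInject;
    readonly Type _anchorType;

    public InjectAfterDecorator(IPipeline inner, IIncomingStep stepToInject, Type anchorType)
    {
        _inner = inner;
        _stepToInject = stepToInject;
        _anchorType = anchorType;
    }

    // The send pipeline is passed through untouched.
    public IEnumerable<IOutgoingStep> SendPipeline() => _inner.SendPipeline();

    public IEnumerable<IIncomingStep> ReceivePipeline()
    {
        var injected = false;

        foreach (var step in _inner.ReceivePipeline())
        {
            yield return step;

            // Inject only once, immediately after the first matching anchor step.
            if (!injected && step.GetType() == _anchorType)
            {
                yield return _stepToInject;
                injected = true;
            }
        }
    }
}

public static class Demo
{
    // Returns the type names of the decorated receive pipeline, in order.
    public static List<string> StepNames()
    {
        IPipeline pipeline = new InjectAfterDecorator(
            new FixedPipeline(),
            new LogResolvedMessageHandlersStep(),
            typeof(ActivateHandlersStep));

        return pipeline.ReceivePipeline().Select(s => s.GetType().Name).ToList();
    }

    public static void Main()
    {
        foreach (var name in StepNames()) Console.WriteLine(name);
    }
}
```

Running Demo.Main() lists the three original steps with LogResolvedMessageHandlersStep sitting directly after ActivateHandlersStep. Because the decorator only wraps whatever IPipeline it is handed, several of them can be stacked on top of each other, which is the property the Injectionist-based registration above relies on.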