Prash's Blog

Introduction to NServiceBus – Part 2 July 29, 2014

Filed under: C# — prazjain @ 7:59 pm
Tags: ,

Service Bus Host

 

In the previous post, we have created project to send messages on nservicebus, now we will create a project to host the service bus.

Create a class library project “App.Backend”, in solution “ServiceBus”. Add reference to “App.Contracts” project in this new project.

Right Click on “References” -> “Manage NuGet References”. Search online for “NServiceBus RavenDB” and add it.

Right Click on “References” -> “Manage NuGet References”. Search online for “NServiceBus Host” and add it.

This auto-generates a class “EndpointConfig” class. This class configures this endpoint and assigns it the role of server. A host can have more than one endpoint and an endpoint can have only one queue for it.

Lets add a class that will handle messages sent by our earlier application

Create a class PlaceOrderHandler, which just prints out the order id for now

    public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
    {
        public void Handle(PlaceOrder message)
        {
            Console.WriteLine("Order received : " + message.OrderId);
        }
    }

IHandleMessages interface of NServiceBus tell the bus, this implementation will process the message of type (as specified in generic args).

 

Housekeeping

 

Lets cleanup the queues created earlier, so we see new queues there.

In “Console.Request” client project from previous post -> “App.Config”, update the UnicastBusConfig section as follows:

  <UnicastBusConfig>
    <MessageEndpointMappings>
      <add Messages="App.Contracts" Endpoint="App.Backend" />
    </MessageEndpointMappings>
  </UnicastBusConfig>

 

Now is the time to run this application.

Set App.Backend as startup project and start it in debug mode.

Now navigate to the exe for “Console.Request” project and run it.

Once they are both up, you will see output like this :

Console.Request output

Generated new Order id : 7417a25b-c278-46ab-bb35-758177ca822d

App.Backend output

Order received : 7417a25b-c278-46ab-bb35-758177ca822d

Check the queues created, it will look something like this:

check queues

 

These queues were generated by the host project when it started up, and Audit queue contains the message that was sent and processed above.

 

Advanced Concepts

 

Fault Tolerance

 

Our Console.Request application may send a message and continue doing about its business, but while processing the message in App.Backend database transaction is rolled back due to database failure or application crash, then we may loose the message.

So we want to make sure when our database rolls back, our queue also rolls back. Similarly when our database commits, our queue also commits (as in removes the message). NService Bus provides following options to deal with such cases:
Simple Retrying can solve transient exceptions like deadlock in database

<MsmqTransportConfig MaxRetries="5"/>

 

Second level retries (SLR) like database is down and will come back up in couple of seconds or minutes

<SecondLevelRetriesConfig Enabled="true" TimeIncrease="00:00:05" NumberOfRetries="2"/>

 

Messages that always fail are moved to an error queue

<MessageForwardingInCaseOfFaultConfig ErrorQueue="error"/> <!-- for remote server : error@remotemachine -->

For permanent exceptions like message deserialization, nservice bus will not even re-try and move them to error queue directly.

To see what will happen when you add above settings in your App.Backend project -> app.config, change PlaceOrderhandler.cs class as follows:

    public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
    {
        public void Handle(PlaceOrder message)
        {
            throw new Exception("Exception occurred");
            Console.WriteLine("Order received : " + message.OrderId);
        }
    }

Now run App.Backend in debug mode, and navigate to Console.Request.exe path and run it as well.

You will notice that when you sent Order through Console.Request, App.Backend threw an exception in Handle method above. This method will be call 5 times in quick succession because of MsmqTransportConfig maxRetries configuration. If still not succeeding then Second Level Retries (SLR) kick-in they further enable another 5 retries, if that fails it SLR makes another try after 5 seconds as configured which enabled further 5 retries as configuration in MsmqTransportConfig.

When all tries fail, the message is saved in ErrorQueue=”error” as configured above. (You can navigate to the queues and check the saved message there for its contents).

Now lets say, the problem has been fixed and we want to process the messages which were ignored earlier. (remove the line – throw new Exception(“Exception occurred”); )

For this you will need to install “NServiceBus Tools” nuget package.

 

Move the error messages from error queue back to their source queues, run this executable :

<SolutionDirectory>\ServiceBus\packages\NServiceBus.Tools.4.4.3\ReturnToSourceQueue.exe

 

This will move the message back to the incoming queue for App.Backend application.

Now run the App.Backend application (if it is not already running), and you will see, it picks up the messages from its incoming queue and processes them as usual.

 

Distributed Transactions

 

Let’s ensure Network DTC is enabled. Go to “Component Services” from Windows start menu.

dtc_enabled

Install “RavenDB Client” package in App.Backend project.

Now update PlaceOrderHandler.cs as follows to save some data in RavenDB. Code is self explanatory. (place both classes below in same file, unless you want to create a separate file for Order class with just one property)

public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
    {
        public void Handle(PlaceOrder message)
        {
            // Initialize the data store, point url to where db is listening
            var store = new DocumentStore { Url = "http://localhost:8080", DefaultDatabase = "App.Backend" };
            store.Initialize();
            using (var session = store.OpenSession())
            {
                session.Store(new Order() { OrderId = message.OrderId });
                session.SaveChanges();
            }
            //throw new Exception ("Exception!");
            Console.WriteLine("Order received : " + message.OrderId);
        }
    }

    public class Order
    {
        public Guid OrderId { get; set; }
    }

Now run the App.Backend project, and navigate to Console.Request.exe and run it to send request.

When you see request generated and processed, navigate to http://localhost:8080 and check documents to see the newly inserted Order object.

To see how exception handling and transaction work, uncomment Exception line in PlaceOrderHandler.Handle method and run App.Backend again.

This time you will when handling a new order from Console.Request, App.Backend throws exception and goes through all the retry logic (as configured), but no data is saved!

Even though we had a session.SaveChanges() call, no data was saved in RavenDB, this is because NServiceBus encloses each message handling method inside a transaction. So if a message handler completes successfully, transaction is committed else transaction is rolledback.

(You can now comment out the exception line again before proceeding ahead)

 

Dependency Injection

 

If you look at our PlaceOrderHandler class, you can see we instantiate DocumentStore and Session objects there. These are good candidates for dependency injection.

Update the PlaceOrderHandler code as below:

 public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
    {
        public IDocumentStore Store { get; set; }
        public IDocumentSession Session { get; set; }
 
        public void Handle(PlaceOrder message)
        {
            Session.Store(new Order() { OrderId = message.OrderId });
            Session.SaveChanges();
            Console.WriteLine("Order received : " + message.OrderId);
        }
    }

Now lets add a class RavenBootstrapper.cs in App.Backend project, which will instantiate these dependencies and register them with NServiceBus container. NServiceBus container will then inject them into PlaceOrderHandler properties. Below is the code for RavenBootstrapper.cs:

public class RavenBootstrapper : INeedInitialization
    {
        public void Init()
        {
            Configure.Instance.RavenDBStorage("RavenDB", "App.Backend");
            Configure.Instance.Configurer.ConfigureComponent<IDocumentStore>(
                () =>
                    {
                        // Initialize the data store, point url to where db is listening
                        var store = new DocumentStore { Url = "http://localhost:8080", DefaultDatabase = "App.Backend" };
                        store.Initialize();
                        store.JsonRequestFactory.DisableRequestCompression = true;
                        return store;
                    }
                    , DependencyLifecycle.SingleInstance);

            Configure.Instance.Configurer.ConfigureComponent<IDocumentSession>(() =>
                {
                    return Configure.Instance.Builder.Build<IDocumentStore>().OpenSession();
                }
                , DependencyLifecycle.InstancePerCall);

        }
    }

INeedInitialization interface, as the name suggests, says to NServiceBus that it needs to be initialialized. It is here that we have configured and registered a DocumentStore and DocumentSession.

DocumentStore has dependency lifecycle of SingleInstance because we only need one instance of DataStore for the life of this application, but DocumentSession has dependency lifecycle of InstancePerCall because this needs to be instantiated for every call.

In NServiceBus a single message can have multiple handlers, so a single Order message above can be sent to multiple handlers. In such cases we do not want to have SaveChanges() call in every handler, we can have it once at the end, when all handlers have been invoked for that message.

So now lets update PlaceOrderHandler Handle method as below:

public void Handle(PlaceOrder message)
        {
            Session.Store(new Order() { OrderId = message.OrderId });
            Console.WriteLine("Order received : " + message.OrderId);
        }

To make sure SaveChanges is called on Session, update the RavenBootstrapper.cs file as below:

namespace App.Backend
{
    public class RavenBootstrapper : INeedInitialization
    {
        public void Init()
        {
            Configure.Instance.RavenDBStorage("RavenDB", "App.Backend");
            Configure.Instance.Configurer.ConfigureComponent<IDocumentStore>(
                () =>
                    {
                        // Initialize the data store, point url to where db is listening
                        var store = new DocumentStore { Url = "http://localhost:8080", DefaultDatabase = "App.Backend" };
                        store.Initialize();
                        store.JsonRequestFactory.DisableRequestCompression = true;
                        return store;
                    }
                    , DependencyLifecycle.SingleInstance);

            Configure.Instance.Configurer.ConfigureComponent<IDocumentSession>(() =>
                {
                    return Configure.Instance.Builder.Build<IDocumentStore>().OpenSession();
                }
                , DependencyLifecycle.InstancePerUnitOfWork);

            Configure.Instance.Configurer.ConfigureComponent<RavenUnitOfWork>(DependencyLifecycle.InstancePerUnitOfWork);
        }
    }

    public class RavenUnitOfWork : IManageUnitsOfWork
    {
        public IDocumentSession Session { get; set; }

        public void Begin()
        {
            
        }

        public void End(Exception ex = null)
        {
            if (ex==null) Session.SaveChanges();
        }
    }
}

Over here, we have created a RavenUnitOfWork, which implements IManageUnitsOfWork, this is registered with NServiceBus which manages its lifecycle as InstancePerUnitOfWork (i.e per message processing across multiple handlers).

We have also update the dependency life cycle of DocumentSession to be InstancePerUnitOfWork, because now we want instance to be created / disposed only once after the message has been processed through all its handlers (rather instantiating and disposing than per handler).

 

Lets conclude the post here, in this post we have seen:

  • How to write a backend application using NServiceBus
  • How to deal with exception handling and recover unprocessed messages.
  • How to interact with database.
  • How to use Dependency Injection
Advertisements
 

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s