Notifications from RavenDB server

Introduction

Nowadays, we all too often need to make data available to our clients as soon as we get our hands on it (that is, as soon as it is stored in the database). Depending on the technology stack and on how the data arrives, this can be hard to achieve. RavenDB, however, features the Changes API, which notifies you when something has made its way into the database or has been changed. Several types of notifications can be pushed to you, and while their names are self-explanatory, their usage scenarios are worth discussing:

  • ForAllDocuments
    • This is a general-purpose notification, well suited to a rather static database that doesn’t change frequently;
  • ForDocumentsStartingWith
    • This can be used for change notifications on all documents of one type (e.g. Trades, Users), provided their IDs share a common prefix;
  • ForDocument
    • The most fine-grained document change notification: it lets you track a single document for changes (for example, to notify the current user that the document has been changed server-side and that they should refresh);
  • ForAllIndexes
    • A general-purpose notification, similar to ForAllDocuments, except that it monitors indexes rather than documents;
    • Note! You will receive notifications only for indexes defined by you – thus, you will not get a notification for the Raven/DocumentsByEntityName index;
  • ForIndex
    • The most fine-grained index change notification, triggered every time an indexed document is changed;
    • Note! If you have an index which targets documents of type People, then every time a People document gets changed you will receive an index changed notification (this happens regardless of whether the document ends up in the index or not);
  • ForBulkInsert
    • Lets you know when a bulk insert operation starts (DocumentChangeTypes.BulkInsertStarted), finishes (DocumentChangeTypes.BulkInsertEnded), or fails (DocumentChangeTypes.BulkInsertError);
  • ForAllReplicationConflicts
    • Lets you know when a replication conflict has happened. The payload tells you the type of replication that failed (ReplicationConflictTypes.DocumentReplicationConflict or ReplicationConflictTypes.AttachmentReplicationConflict) and the attempted operation (ReplicationOperationTypes.Put or ReplicationOperationTypes.Delete).

Subscribe, handle, and unsubscribe

There are two ways of subscribing to these notifications. One is to create a type implementing IObserver<T> (where T is the notification type published by the source you are subscribing to):

//after the await, the result is the observable itself, not a task
var indexChanges = await _store.Changes().ForIndex("MyIndex").Task;

indexChanges.Subscribe(new IndexChangeObserver());

//IndexChangeObserver is your custom type that implements IObserver<IndexChangeNotification>
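
For reference, such an observer might look like the minimal sketch below. ConsoleChangeObserver<T> is a hypothetical name, not part of RavenDB or Rx. It depends only on the standard System.IObserver<T> interface, and it is written generically so that the same class can be reused for any notification type.

```csharp
using System;

// Hypothetical observer for illustration; not part of RavenDB or Rx.
// It relies only on the standard System.IObserver<T> contract.
public class ConsoleChangeObserver<T> : IObserver<T>
{
    // Number of notifications seen so far.
    public int Received { get; private set; }

    // Last error reported by the source, if any.
    public Exception LastError { get; private set; }

    // Called once for every notification pushed by the source.
    public void OnNext(T notification)
    {
        Received++;
        Console.WriteLine("Change received: " + notification);
    }

    // Called when the source reports a failure.
    public void OnError(Exception error)
    {
        LastError = error;
        Console.WriteLine("Change subscription failed: " + error.Message);
    }

    // Called when the source will send no more notifications.
    public void OnCompleted()
    {
        Console.WriteLine("Change subscription completed.");
    }
}
```

Under these assumptions, the IndexChangeObserver used above could be declared as class IndexChangeObserver : ConsoleChangeObserver<IndexChangeNotification> { }, with any index-specific handling added by changing the method bodies.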

Or, and I recommend this approach, by adding a reference to Reactive Extensions and handling notifications the Rx way:

var indexChanges = await _store.Changes().ForIndex("MyIndex").Task;

indexChanges.Subscribe(
   notification =>
   {
      //notification received
   },
   error =>
   {
      //error happened
   });

If you opt for the second approach, you can easily add more behaviour to your subscription. For instance, let's say that your database changes a lot, and you don't want to be notified of each change, but only once a batch of changes has finished:

var indexChanges = await _store.Changes().ForIndex("MyIndex").Task;

indexChanges
   .Throttle(TimeSpan.FromSeconds(1))
   .Subscribe(
      notification =>
      {
         //notification received
      },
      error =>
      {
         //error happened
      });

You can see that I am using the Throttle method. It causes notifications to be held back until a full second has passed without a new one arriving; only then does the subscriber receive the latest notification, and the earlier ones in the burst are dropped.
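
To see this behaviour without a RavenDB server, here is a small sketch that assumes the System.Reactive package is referenced. ThrottleDemo is a hypothetical name, a Subject<int> stands in for the notification stream, and the quiet period is shortened to 200 ms so the demo runs quickly.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical demo class; Subject<int> replaces the RavenDB notification stream.
public static class ThrottleDemo
{
    // Pushes two bursts through Throttle and returns what the subscriber saw.
    public static List<int> Run()
    {
        var received = new List<int>();

        using (var source = new Subject<int>())
        using (source
            .Throttle(TimeSpan.FromMilliseconds(200))
            .Subscribe(n => { lock (received) { received.Add(n); } }))
        {
            // First burst: three values with no pause between them.
            source.OnNext(1);
            source.OnNext(2);
            source.OnNext(3);

            // Quiet period longer than 200 ms: only the last value of the burst gets through.
            Thread.Sleep(600);

            // Second burst: a single value, followed by another quiet period.
            source.OnNext(4);
            Thread.Sleep(600);
        }

        lock (received) { return new List<int>(received); }
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

With these timings the program should print 3, 4: the values 1 and 2 are discarded because a newer value arrived before the quiet period elapsed.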

Unsubscribing from the notifications follows the usual Reactive Extensions pattern: call Dispose on the subscription token you received when subscribing:

var indexChanges = await _store.Changes().ForIndex("MyIndex").Task;

//Subscribe returns an IDisposable that represents the subscription
var subscriptionToken = indexChanges.Subscribe(notification => { });

subscriptionToken.Dispose(); //unsubscribing from notifications

Error handling

When it comes to error handling, RavenDB makes your life easier in the following ways:

  • If the server is offline when you try to connect to it, the client will silently keep trying to connect, indefinitely;
  • If the connection between the client and the server goes down, the client will likewise keep trying to reestablish it. Ayende describes in a blog post how the server saves the notifications your client has missed for about a minute, so that a client which reconnects within that window receives everything it missed. I think that post is obsolete, as I wasn’t able to reproduce this behavior, nor do I see anything in the RavenDB codebase that would achieve it. After further investigation, I found out that it was removed in this commit, due to this issue. In other words, notifications raised while the connection is down are lost.

 

All in all, the Changes API is a neat feature of RavenDB that can make some tasks incredibly easy to achieve. However, care must be taken when using it, as the client might miss server-side events. This translates to: do not use this feature to implement your own caching. RavenDB already has several caches in place, so there is no need to roll your own.