Get and publish streaming data

This tutorial shows you how to send messages to, and receive messages from, a streaming server using Caplin’s MessageService library.

caplin.services.messaging.MessageService is an interface that adapters for different data streaming libraries implement so that they all provide data in a consistent way. You can then write components that work with many different streaming services without customising them for each one.

Setup

To use the MessageService, your application needs to create an instance of a class that implements the MessageService interface, and register it with the ServiceRegistry.

var messageService = new test.blades.messagesexample.workbench.ExampleMessageService();
caplin.core.ServiceRegistry.registerService("caplin.message-service", messageService);

This would normally be done when the application initialises.
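To make the interface more concrete, here is a minimal sketch of what a purely in-memory implementation might look like. InMemoryMessageService is a hypothetical name used only for illustration; it is not part of Caplin’s libraries, it skips caplin.implement so that it runs on its own, and it assumes the argument order used by the examples in this tutorial. A real adapter would talk to a streaming server and report status changes and errors as well.

```javascript
// Hypothetical in-memory stand-in for a MessageService implementation.
// It loops published data straight back to local subscribers.
function InMemoryMessageService() {
    this.listenersByTopic = {};
}

// Register a listener for a topic (options are accepted but ignored in this sketch).
InMemoryMessageService.prototype.subscribe = function(sTopic, listener, mOptions) {
    if (!this.listenersByTopic[sTopic]) {
        this.listenersByTopic[sTopic] = [];
    }
    this.listenersByTopic[sTopic].push(listener);
};

// Deliver the data to every local subscriber of the topic, then report success.
InMemoryMessageService.prototype.publish = function(sTopic, mData, publishListener) {
    var listeners = this.listenersByTopic[sTopic] || [];
    for (var i = 0; i < listeners.length; i++) {
        listeners[i].onDataUpdate(sTopic, mData, {});
    }
    if (publishListener) {
        publishListener.onSuccess(sTopic);
    }
};

// Usage: subscribe a simple listener, then publish one update to the same topic.
var received = [];
var service = new InMemoryMessageService();
service.subscribe("/FX/EURGBP", {
    onDataUpdate: function(sTopic, mData) { received.push(mData.BestBid); },
    onStatusUpdate: function() {},
    onError: function() {}
});
service.publish("/FX/EURGBP", { BestBid: "0.8512" });
console.log(received);
```

A stand-in like this can be handy in a workbench or a test, because components that only depend on the MessageService interface should not need to know which implementation was registered.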

Using the MessageService

The MessageService has two methods: publish, which sends data, and subscribe, which receives data.

1. Receiving Data

To receive data, you need a listener object that is notified when the data change. The methods it must implement are documented in the caplin.services.messaging.SubscriptionListener interface.

The interface has three methods. onDataUpdate is called when new data are published. onStatusUpdate is called when the status of the topic you are subscribed to changes, perhaps because the data can no longer be provided or because the connection to the streaming server has failed. onError is called if the subscription fails, which might happen if the server refuses to provide the requested data, either because it doesn’t have them or because you’re not permitted to view them.

function MyDataListener() {}
caplin.implement(MyDataListener, caplin.services.messaging.SubscriptionListener);

MyDataListener.prototype.onDataUpdate = function(sTopic, mData, mMetaData) {
    console.log("Update message received for", sTopic);
    for (var field in mData) {
        console.log("   " + field, "=", mData[field]);
    }
};

MyDataListener.prototype.onStatusUpdate = function(sTopic, sStatus) {
    console.log("Status changed for", sTopic, "new status =", sStatus);
};

MyDataListener.prototype.onError = function(sTopic, sErrorType) {
    console.log("Subscription error for", sTopic, "reason =", sErrorType);
};

You also need to know the name that the streaming service gives to the data you want to receive. Publish/subscribe systems often call this name the 'topic'. If you’re using Caplin’s streaming server, Liberator, it’s commonly referred to as a 'subject'.

To start receiving updates for the data you’re interested in, get the MessageService from the ServiceRegistry and call subscribe, passing in the topic name (in this case "/FX/EURGBP") and an instance of your SubscriptionListener. The example below uses the MyDataListener constructor defined above.

var dataListener = new MyDataListener();
var messageService = caplin.core.ServiceRegistry.getService("caplin.message-service");
messageService.subscribe("/FX/EURGBP", dataListener);

If your message service supports extra options, you can often pass these in as a third argument. The Caplin StreamLinkMessageService, for example, supports restricting the fields that you subscribe to, like so:

messageService.subscribe("/FX/EURGBP", dataListener, {fields: ["BestBid", "BestAsk"]});

The options supported vary from one message service to another. Check the documentation that comes with your message service for details.

If your subscription fails, the onError method on your listener will be called with some information about what went wrong. The error type passed in will be one of the strings specified in caplin.services.messaging.SubscriptionError.

If your subscription succeeds, you’ll be notified of changes in status as well as receiving messages. For example, the service might be temporarily unable to send you the data you’ve requested, perhaps because your server has lost its link to another server from which it obtains those data, so you might be missing messages. In that case your listener receives an onStatusUpdate call with a status of caplin.services.messaging.SubscriptionStatus.STALE. When the service is again able to send you those data, you’ll receive a status of caplin.services.messaging.SubscriptionStatus.OK.
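As an illustration of how a listener might react to these status changes, the sketch below remembers the most recent data and flags them as possibly out of date while the subscription is stale. StaleAwareListener and its describe method are hypothetical names. The STATUS object is a local stand-in with assumed string values; real code would compare against the constants on caplin.services.messaging.SubscriptionStatus and would normally declare the interface with caplin.implement.

```javascript
// Local stand-ins for the SubscriptionStatus constants; the string values are assumptions.
var STATUS = { OK: "OK", STALE: "STALE" };

// Hypothetical listener that keeps the last update and whether it may be out of date.
function StaleAwareListener() {
    this.lastData = null;
    this.isStale = false;
}

StaleAwareListener.prototype.onDataUpdate = function(sTopic, mData, mMetaData) {
    this.lastData = mData;
};

StaleAwareListener.prototype.onStatusUpdate = function(sTopic, sStatus) {
    // Anything other than STALE is treated as "not stale" in this sketch.
    this.isStale = (sStatus === STATUS.STALE);
};

StaleAwareListener.prototype.onError = function(sTopic, sErrorType) {
    // The subscription failed, so there are no data to show.
    this.lastData = null;
};

// Describe a field's current value, marking it when messages may have been missed.
StaleAwareListener.prototype.describe = function(sField) {
    if (this.lastData === null) {
        return "no data";
    }
    return this.lastData[sField] + (this.isStale ? " (stale)" : "");
};

// Usage: simulate the callbacks a message service would make.
var listener = new StaleAwareListener();
listener.onDataUpdate("/FX/EURGBP", { BestBid: "0.8512" }, {});
listener.onStatusUpdate("/FX/EURGBP", STATUS.STALE);
var whileStale = listener.describe("BestBid");
listener.onStatusUpdate("/FX/EURGBP", STATUS.OK);
var afterRecovery = listener.describe("BestBid");
console.log(whileStale, "|", afterRecovery);
```

Keeping the last known value visible but marked as stale is one possible design choice; a component could equally blank the value until the status returns to OK.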

2. Publishing Data

To be notified about the status of a message you send, you’ll need an object that implements caplin.services.messaging.PublishListener.

function MyPublishListener() {}
caplin.implement(MyPublishListener, caplin.services.messaging.PublishListener);

MyPublishListener.prototype.onSuccess = function(sSubject) {
    console.log("Data was successfully published to", sSubject);
};

MyPublishListener.prototype.onError = function(sSubject, sErrorType) {
    console.log("There was a problem publishing data to", sSubject, ":", sErrorType);
};

Then, to publish data, call publish with the topic name, the data, and the listener:

var messageService = caplin.core.ServiceRegistry.getService("caplin.message-service");
var publishListener = new MyPublishListener();
messageService.publish("/CHESS", {
    gameId: 27,
    moveNumber: 9,
    move: "Nf3 Nc6"
}, publishListener);

If the message is successfully sent, the onSuccess method will be called on your listener, while if the message cannot be sent, you’ll receive an onError callback.

The onError callback is passed one of the values on caplin.services.messaging.PublishError to indicate the kind of failure.
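If you would rather handle both outcomes in one place, a small adapter can funnel the two callbacks into a single function. The sketch below is one way this might be done. createPublishListener, fakeMessageService and the "EXAMPLE_ERROR" string are all hypothetical and exist only so the example runs on its own; a real message service may also expect the listener to be declared with caplin.implement, as shown earlier.

```javascript
// Hypothetical helper: adapts the two listener callbacks to one callback(error, subject).
function createPublishListener(fCallback) {
    return {
        onSuccess: function(sSubject) {
            fCallback(null, sSubject);
        },
        onError: function(sSubject, sErrorType) {
            fCallback(new Error("Publish to " + sSubject + " failed: " + sErrorType), sSubject);
        }
    };
}

// Stand-in service for demonstration only: it accepts "/CHESS" and rejects anything else.
// "EXAMPLE_ERROR" is a made-up value, not one of the PublishError constants.
var fakeMessageService = {
    publish: function(sTopic, mData, listener) {
        if (sTopic === "/CHESS") {
            listener.onSuccess(sTopic);
        } else {
            listener.onError(sTopic, "EXAMPLE_ERROR");
        }
    }
};

// Record what happened to each publish attempt.
var outcomes = [];
function recordOutcome(error, sSubject) {
    outcomes.push((error ? "failed: " : "published: ") + sSubject);
}

fakeMessageService.publish("/CHESS", { gameId: 27, moveNumber: 9 }, createPublishListener(recordOutcome));
fakeMessageService.publish("/UNKNOWN", { gameId: 27 }, createPublishListener(recordOutcome));
console.log(outcomes);
```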