Subscribing and publishing to channels

This page provides example code for subscribing and publishing to channels using StreamLink for JavaScript.

Before reading this page, it may help you to read Channels.

Introduction to channels

A channel is a variation on the standard subscription model that establishes a secure, one-to-one channel of communication between a StreamLink client and a backend DataSource. Channels are used for exchanging messages, differentiating between clients when serving data, and accepting commands from clients (for an example of the latter, see the Watchlist Control Object in Watchlist API design).

A channel has the following key characteristics:

  • A StreamLink client can write data to the channel, and the backend DataSource can securely identify the client as the source of the data.

  • A DataSource can write data to a channel opened by a StreamLink client, and only that StreamLink client will receive the data.

StreamLink clients can send data to any subject (if permissions allow); channels are not unique in this regard. However, channels are by far the most common target of StreamLink publications. Only channels allow a DataSource to identify the source of incoming contributions and to respond to the source exclusively.

In Caplin terminology, data published by a StreamLink client is sometimes called a 'contribution' or 'contrib'.

For more information and examples, see Channels.

To interact with a channel, you can use StreamLink’s standard publish-and-subscribe API or StreamLink’s channel API. The channel API is a high-level construct; internally it uses the standard publish-and-subscribe API.

Subscribing and publishing (contributing) to a subject are separate in StreamLink. StreamLink does not even force you to subscribe to a subject in order to publish to it; however, if you are expecting a response from the subject (as you would with a channel), then subscribe to the subject before you publish to it.

To subscribe and publish to a channel using StreamLink’s standard publish-and-subscribe API, follow the steps below:

  1. Use the streamlinkfactory class to create a StreamLink object.

  2. Call StreamLink.connect to connect to Liberator.

  3. Create a SubscriptionListener object to handle subscription status, error, and data-update events.

  4. Call streamlink.subscribe to register the SubscriptionListener.

  5. To publish a message on the channel, call StreamLink.publishToSubject.

The template below illustrates the steps above, with event handlers and fields stripped from the code to make the structure clearer:

// (1) Create a StreamLink object
var streamLink = caplin.streamlink.StreamLinkFactory.create({
  username : "...",
  password : "...",
  liberator_urls: "rttp://..."
});

// (2) Connect to Liberator
streamLink.connect();

// (3) Create a SubscriptionListener
var subscriptionListener = {
  onSubscriptionStatus : function(subscription, event) { ... },
  onSubscriptionError : function(subscription, event) { ... },
  onRecordUpdate : function(subscription, event) { ... }
};

// (4) Subscribe to the channel
var subject = "...";
streamLink.subscribe(subject, subscriptionListener);

// (5) Publish a message on the channel
var fields = { ... };
var commandListener = {
  onCommandOk : function(subject) { ... },
  onCommandError : function(subject, error) { ... }
};
streamLink.publishToSubject(subject, fields, commandListener);

To subscribe and publish to a channel using StreamLink’s high-level API, follow the steps below:

  1. Use the StreamLinkFactory class to create a StreamLink object.

  2. Call StreamLink.connect to connect to Liberator.

  3. Create a Channel object.

  4. To publish a message on the channel, call Channel.send.

The code below illustrates the steps above, with event handlers and fields stripped from the code to make the structure clearer:

// (1) Create a StreamLink object
var streamLink = caplin.streamlink.StreamLinkFactory.create({
  username : "...",
  password : "...",
  liberator_urls: "rttp://..."
});

// (2) Connect to Liberator
streamLink.connect();

// (3) Create channel
var subject = "...";
var channelListener = {
  onChannelData: function(channel, evt) { ... },
  onChannelStatus: function(channel, evt) { ... },
  onChannelError: function (channel, evt) { ... }
};
var channel = streamLink.createChannel(subject, channelListener, null);

// (4) Publish a message on the channel
var fields = { ... };
var commandListener = {
  onCommandOk : function(subject) { ... },
  onCommandError : function(subject, error) { ... }
};
channel.send(fields, commandListener);

End-to-end example

This example is based on a customised version of the StreamLink for JavaScript (SLJS) interactive example 2.9, and a Java integration adapter compiled from Caplin’s Java Pricing Adapter Template.

The SLJS interactive examples are packaged with Liberator and are available from Liberator’s web interface (http://localhost:18080/docs/sljs/interactive on a fresh install of Liberator).

The subject written to in this example, /TEMPLATE/CHANNEL, is not initially configured as a channel. This keeps the example simple and illustrates that StreamLink clients can send data to any subject. Instructions to convert /TEMPLATE/CHANNEL to a true channel are provided at the end of the example.

Requirements

To run this example, you will need:

  • Caplin Deployment Framework 7

  • Caplin Liberator 7

  • Caplin StreamLink for JavaScript (SLJS) 7

  • A licence for the Caplin Integration Suite (CIS) 7, which gives you the right to download the CIS 7 libraries from the Caplin Download Portal or from the Caplin Software Repository.

  • A web server on the same host as Liberator (for example, Apache httpd), with a website accessible at host "localhost"

  • OpenJDK 8 or OpenJDK 11

  • [optional] A Java IDE (for example, Eclipse or IntelliJ)

Backend set up

Follow the steps below to set up Liberator and the Java integration adapter used in this example.

  1. Install the Caplin Deployment Framework 7.

  2. Deploy Liberator 7 to the Deployment Framework.

  3. Follow instructions on the Pricing Adapter Template page to run the Pricing Adapter Template in a Java IDE and connect to your Liberator.

    By default, the Pricing Adapter Template is configured to connect to Liberator via Transformer. This example does not use Transformer. To configure the Pricing Adapter Template to connect directly to Liberator, edit the file blade/blade_config/bootstrap.conf and change define ROUTE_VIA_TRANSFORMER TRUE to define ROUTE_VIA_TRANSFORMER FALSE.

At this stage, the subject /TEMPLATE/CHANNEL is not configured as a channel.

Front end set up

Follow the instructions below to prepare Apache to serve the front-end resources used in this example.

  1. Copy the SLJS file streamlink.js to your local website’s /js directory

  2. Copy the example file channel-example.html (below) to your local website’s / directory

The file, channel-example.html, is a customised version of the SLJS interactive example 2.9 (Channels):

<!DOCTYPE html>
<html>

<head>
  <script type="text/javascript" src="js/streamlink.js"></script>
  <link href="examplepage.css" type="text/css" rel="stylesheet" />
</head>

<body onunload="streamLink.disconnect();">

<p>Key : <input id="key" value="operation"></input></p>
<p>Value : <input id="value" value="Ping"></input> <br/></p>
<p><button onclick="sendThroughChannel();">Send through channel</button></p>

<pre id="channelLog"></pre>
<pre id='displayLog'>Log:</pre>

<script>
//Create a StreamLink object
var streamLink = caplin.streamlink.StreamLinkFactory.create({
  liberator_urls: "rttp://localhost:18080",
  username:"admin",
  password:"admin"
});

// Connect to Liberator
streamLink.connect();

// Create a StreamLink Channel to provide bi-directional communication with a record
var subject = "/TEMPLATE/CHANNEL";
var channelListener = {
  onChannelData: function(channel, evt) {
    log("ChannelSnippet.onChannelData() " + evt);
    for (field in evt.getFields()) {
      fields[field] = evt.getFields()[field];
    }
    render();
  },
  onChannelStatus: function(channel, evt) {
    log("ChannelSnippet.onChannelStatus() " + evt);
  },
  onChannelError: function (channel, evt) {
    log("ChannelSnippet.onChannelError() " + evt);
  }
};
var channel = streamLink.createChannel(subject, channelListener, null);

// This function is called by the button 'Send through channel' on the UI
var sendThroughChannel = function() {
  var displayLog = document.getElementById("displayLog");
  displayLog.scrollTop = displayLog.scrollHeight;

  // Assemble the field data to send through the channel
  var fieldData = {};
  fieldData[document.getElementById('key').value] = document.getElementById('value').value;

  // Send field data through channel providing a CommandListener for asynchronous callbacks
  channel.send(fieldData, {
    onCommandOk: function(subject) {
      log("Channel send succeeded for subject: " + subject);
    },
    onCommandError: function(subject, evt) {
      log("ChannelSnippet.onCommandError() subject: " + subject);
    }
  });
};

// Show the latest fields of the subject subscribed by the channel
var fields = {};
var channelLog = document.getElementById("channelLog");
function render() {
  var currentObjString = "Subject "+subject+"\n";
  for (field in fields) {
    currentObjString += "   "+field + " : " + fields[field] + "\n";
  }
  channelLog.innerHTML = currentObjString;
}

// Write to the log area.
var logArea = document.getElementById("displayLog");
function log(line)  {
  logArea.innerHTML += "\n"+line;
}
</script>

</body>
</html>

Sequence diagram

The sequence diagram below shows the relationship between the StreamLink JS example code above and the event handlers in the Pricing Adapter Template.

StreamLinkJSStreamLinkJSLiberatorLiberatorAdapterAdapterStreamLinkJS subscribes to /TEMPLATE/CHANNEL(/TEMPLATE/CHANNEL)requeststreamLink.createChannel()(/TEMPLATE/CHANNEL)requestChannelListener.onChannelOpen()operation="Hello"description="Please send..."ScheduledExecutorService(simulated update from backend)operation="Hello"description="Please send..."ChannelListener.onChannelData()StreamLinkJS sends a message to /TEMPLATE/CHANNEL(/TEMPLATE/CHANNEL)operation="Ping"Channel.send()(/TEMPLATE/CHANNEL)operation="Ping"ChannelListener.onMessageReceived()(/TEMPLATE/CHANNEL)operation="Pong"(/TEMPLATE/CHANNEL)operation="Pong"ChannelListener.onChannelData()

Converting /TEMPLATE/CHANNEL to a channel

This step has been left until last to keep the first part of the example simple and to demonstrate that a StreamLink client can, if Liberator permits, send data to any subject.

The example above uses a literal subject rather than a mapped subject. This doesn’t stop the adapter from receiving client contributions, but it does have the following consequences:

  • When the adapter’s ChannelListener.onRecordReceived method receives a message (contribution) from a client, there is no way for the adapter to determine securely which user sent the message

  • When the adapter replies to a client contribution, there is no way for the adapter to reply to only the client that sent the message. The reply is sent to all clients subscribed to /TEMPLATE/CHANNEL. For example, if both Alice and Bob subscribe to /TEMPLATE/CHANNEL, and Alice sends the message [operation="Ping"], then both Alice and Bob receive the message [operation="Pong"].

This is rarely the desired behaviour. The solution is to map each user’s subscription request for /TEMPLATE/CHANNEL to its own, unique subject. This is the key change that converts /TEMPLATE/CHANNEL into a channel; all other changes we need to make follow logically from this one change.

Changes to Liberator’s configuration

In this section, we make the following changes to Liberator’s configuration:

  • map subscription requests for /TEMPLATE/CHANNEL…​ to /TEMPLATE/<username>/CHANNEL…​

  • update the data service definition for /TEMPLATE/CHANNEL to match the new mapped subject pattern.

To map /TEMPLATE/CHANNEL…​ to /TEMPLATE/<username>/CHANNEL…​, add the following line of configuration to the file blade/Liberator/etc/rttpd.conf in the source code for the Pricing Adapter Template:

object-map /TEMPLATE/CHANNEL%1  /TEMPLATE/%u/CHANNEL%1

The first argument to object-map is the subject that the client requested. The second argument is the subject that Liberator should map it to.

There are two substitution macros in the arguments:

  • %1 acts as a group-capture in the first parameter and as a substitution macro in the second parameter. This ensures that any suffix to the requested subject /TEMPLATE/CHANNEL becomes a suffix of the mapped subject. For an example, see /TEMPLATE/CHANNEL/TRADE in the table below.

  • %u in the second parameter is substituted with the StreamLink user’s username.

The effect of this mapping is illustrated in the table below:

Username Requested subject Mapped subject

alice

/TEMPLATE/CHANNEL

/TEMPLATE/alice/CHANNEL

bob

/TEMPLATE/CHANNEL

/TEMPLATE/bob/CHANNEL

alice

/TEMPLATE/CHANNEL/TRADE

/TEMPLATE/alice/CHANNEL/TRADE

bob

/TEMPLATE/CHANNEL/TRADE

/TEMPLATE/bob/CHANNEL/TRADE

Now that /TEMPLATE/CHANNEL has been mapped to a new subject format, we need to update the include-pattern for the data service definition for /TEMPLATE/CHANNEL. In the same file, blade/Liberator/etc/rttpd.conf, change the value of the add-data-service > include-pattern configuration item to the regular expression ^/TEMPLATE/[^/]+/CHANNEL :

add-data-service
  service-name        ${ADAPTER_NAME}ChannelSvc${THIS_LEG}
  include-pattern     ^/TEMPLATE/[^/]+/CHANNEL
  ⋮
end-data-service

Changes to the adapter’s code

In the previous section, we updated the data-service definition for /TEMPLATE/CHANNEL to match the new mapped version of the subject, /TEMPLATE/<username>/CHANNEL. Similarly, in this section we update the subject prefix (the 'namespace') that the adapter’s ChannelListener is registered against.

The original code used the PrefixNamespace object to describe a literal namespace prefix (/TEMPLATE/CHANNEL). The new mapped version of the subject, however, includes a dynamic element (the user’s username), and so we need a RegexNamespace object to describe the namespace as a pattern:

public void initialise() {
  dataSource.addChannelListener(new RegexNamespace("^/TEMPLATE/[^/]+/CHANNEL"), this);
  
}

See also: