Publishing a JSON subject from a Java DataSource

This page provides an overview of how to publish a subject of type JSON from a Java DataSource.

The instructions on this page assume you already know how to create a Java DataSource project. For more information on creating a Java DataSource, see the Caplin Platform Developer Tutorial 1.

Requirements

Minimum version of Caplin DataSource for Java: 7.1.9

Third-party dependencies

DataSource for Java does not include a JSON object mapper. Instead, it abstracts JSON operations behind an interface, JsonHandler, and includes two reference implementations that use third-party JSON object mappers: JacksonJsonHandler and GsonJsonHandler.

Choose an implementation of JsonHandler and configure your dependencies accordingly:

Project dependencies for JacksonJsonHandler

Example Gradle configuration for a project using FasterXML’s Jackson Databind for JSON object mapping.

build.gradle (using Caplin repository)
repositories {
    mavenCentral()
    maven {
        url "https://repository.caplin.com"
        credentials {
            username "<username>"
            password "<password>"
        }
    }
}
dependencies {
    implementation 'com.caplin.platform.integration.java:datasource:7.1.21'
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2'
    implementation 'com.github.java-json-tools:json-patch:1.13'
}
build.gradle (using local Caplin libraries)
repositories {
    mavenCentral()
}
dependencies {
    implementation fileTree(dir: 'lib', include: '*.jar') (1)
    implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2'
    implementation 'com.github.java-json-tools:json-patch:1.13'
}
1 Copy Caplin’s datasource-7.1.n-jar-with-dependencies.jar to the lib directory in your Gradle project.
Project dependencies for GsonJsonHandler

Example Gradle configuration for a project using Google’s GSON for JSON object mapping.

build.gradle (using Caplin repository)
repositories {
    mavenCentral()
    maven {
        url "https://repository.caplin.com"
        credentials {
            username "<username>"
            password "<password>"
        }
    }
}
dependencies {
    implementation 'com.caplin.platform.integration.java:datasource:7.1.21'
    implementation 'com.google.code.gson:gson:2.10.1'
    implementation 'com.tananaev:json-patch:1.2'
}
build.gradle (using local Caplin libraries)
repositories {
    mavenCentral()
}
dependencies {
    implementation fileTree(dir: 'lib', include: '*.jar') (1)
    implementation 'com.google.code.gson:gson:2.10.1'
    implementation 'com.tananaev:json-patch:1.2'
}
1 Copy Caplin’s datasource-7.1.n-jar-with-dependencies.jar to the lib directory in your Gradle project.

Configuration

DataSource for Java is not configured to use a specific JsonHandler implementation by default. Before you can publish JSON messages, configure DataSource for Java to use your preferred JsonHandler implementation:

Configuration for JacksonJsonHandler

To use JacksonJsonHandler as the JSON object mapper, choose one of the methods below:

  • Setting the JsonHandler implementation in your adapter’s datasource.conf configuration file:

    json-handler-class com.caplin.datasource.messaging.json.JacksonJsonHandler
  • Setting the JsonHandler implementation in your adapter’s code:

    dataSource.getExtraConfiguration().setJsonHandler(
        new com.caplin.datasource.messaging.json.JacksonJsonHandler());
Configuration for GsonJsonHandler

To use GsonJsonHandler as the JSON object mapper, choose one of the methods below:

  • Setting the JsonHandler implementation in your adapter’s datasource.conf configuration file:

    json-handler-class com.caplin.datasource.messaging.json.GsonJsonHandler
  • Setting the JsonHandler implementation in your adapter’s code:

    dataSource.getExtraConfiguration().setJsonHandler(
        new com.caplin.datasource.messaging.json.GsonJsonHandler());

Overview

JSON messages are published via a CachingPublisher, which caches the last published value for each subject it serves. The CachingPublisher uses its cache to serve new requests for subjects it has already served, and, in the case of JSON data, to determine if it is more efficient to publish an update as a JSON patch or in full.

You provide data for JSON subjects by writing a CachingDataProvider, which you register with a DataSource instance using DataSource.createCachingPublisher(Namespace, CachingDataProvider). This call returns a CachingPublisher, which you must make available to your CachingDataProvider in order for it to publish JsonMessage objects.

When a DataSource receives a JSON subject request, if the CachingPublisher for the subject namespace does not have a cached object for the subject, then the CachingPublisher routes the request to its associated CachingDataProvider. The CachingDataProvider subscribes to data in the backend and publishes a JsonMessage.

[Sequence diagram: StreamLink requests /FX/GBPUSD from Liberator, which requests it from the Java DataSource. The CachingPublisher (/FX/...) does not have the object in its cache, so it calls CachingDataProvider.onRequest(subject). The CachingDataProvider subscribes to GBPUSD in the backend system and calls CachingPublisher.publish(jsonMessage). The CachingPublisher caches the JSON data.]
Serving a request for an uncached object

When a DataSource receives a JSON subject request, if the CachingPublisher for the subject namespace has a cached object for the subject, then the CachingPublisher serves the request directly from its cache:

[Sequence diagram: StreamLink requests /FX/GBPUSD from Liberator, which requests it from the Java DataSource. The CachingPublisher (/FX/...) finds the object in its cache, so the request does not reach the CachingDataProvider or the backend system.]
Serving a request for a cached object
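
The two request paths above can be modelled in a few lines of plain Java. The sketch below is a toy, not Caplin's implementation: ToyCachingPublisher and ToyProvider are hypothetical names, the cache holds raw JSON strings, and a request that arrives while an earlier request for the same subject is still waiting for data is simply forwarded again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model of the request routing described above; not Caplin's implementation. */
final class ToyCachingPublisher {

    /** Hypothetical stand-in for CachingDataProvider.onRequest(subject). */
    interface ToyProvider {
        void onRequest(String subject);
    }

    private final Map<String, String> cache = new ConcurrentHashMap<>();
    private final ToyProvider provider;

    ToyCachingPublisher(ToyProvider provider) {
        this.provider = provider;
    }

    /** Serves a request from the cache, or forwards it to the provider on a miss. */
    Optional<String> request(String subject) {
        String cached = cache.get(subject);
        if (cached != null) {
            return Optional.of(cached);   // cache hit: the provider is not called
        }
        provider.onRequest(subject);      // cache miss: the provider subscribes to the backend
        return Optional.empty();          // data arrives later through publish()
    }

    /** Stores the latest JSON for a subject, as the provider would on each update. */
    void publish(String subject, String json) {
        cache.put(subject, json);
    }

    public static void main(String[] args) {
        List<String> forwarded = new ArrayList<>();
        ToyCachingPublisher publisher = new ToyCachingPublisher(forwarded::add);

        System.out.println(publisher.request("/FX/GBPUSD"));   // miss
        publisher.publish("/FX/GBPUSD", "{\"bid\":\"1.25\"}");
        System.out.println(publisher.request("/FX/GBPUSD"));   // hit
        System.out.println(forwarded);                         // requests seen by the provider
    }
}
```

Running main prints Optional.empty for the first request, Optional[{"bid":"1.25"}] for the second, and [/FX/GBPUSD] for the requests forwarded to the provider: only the uncached request reached it.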

The CachingDataProvider implementation should also publish updates resulting from its subscription to backend data events. When the CachingDataProvider publishes an update to a subject that the CachingPublisher has already cached, the CachingPublisher chooses the most efficient format in which to publish the update: as a JSON patch or as the full image.

[Sequence diagram: the backend system sends an update for GBPUSD to the CachingDataProvider, which calls CachingPublisher.publish(jsonMessage). The CachingPublisher (/FX/...) compares the update to the cached object, decides whether to publish it as a JSON patch or in full, and updates its cache. The Java DataSource sends the update to its subscribed peers.]
Serving updates for a cached object
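
This page does not describe how the CachingPublisher decides between a patch and a full image. As an illustration only, the sketch below assumes the simplest possible rule: encode both and send the shorter one. PatchOrFull and its methods are hypothetical, the objects are flat maps of strings, values are not escaped, and fields are never removed. The patch operations follow the shape defined by JSON Patch (RFC 6902).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Toy size comparison between a full JSON image and a JSON Patch style update. */
final class PatchOrFull {

    /** Encodes a flat string map as a JSON object (no escaping; illustration only). */
    static String fullImage(Map<String, String> fields) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        fields.forEach((key, value) -> json.add("\"" + key + "\":\"" + value + "\""));
        return json.toString();
    }

    /** Encodes one "add" or "replace" operation for every new or changed field. */
    static String patch(Map<String, String> cached, Map<String, String> update) {
        StringJoiner ops = new StringJoiner(",", "[", "]");
        update.forEach((key, value) -> {
            if (!value.equals(cached.get(key))) {
                String op = cached.containsKey(key) ? "replace" : "add";
                ops.add("{\"op\":\"" + op + "\",\"path\":\"/" + key
                        + "\",\"value\":\"" + value + "\"}");
            }
        });
        return ops.toString();
    }

    /** Returns whichever encoding is shorter; ties go to the full image. */
    static String cheapest(Map<String, String> cached, Map<String, String> update) {
        String full = fullImage(update);
        String patch = patch(cached, update);
        return patch.length() < full.length() ? patch : full;
    }

    private static Map<String, String> price(String id, String bid, String ask) {
        Map<String, String> fields = new LinkedHashMap<>();  // keeps insertion order
        fields.put("id", id);
        fields.put("bid", bid);
        fields.put("ask", ask);
        fields.put("currencyPair", "GBPUSD");
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> cached = price("1", "1.2500", "1.2502");
        // One field changed: the patch is shorter than the full image.
        System.out.println(cheapest(cached, price("1", "1.2501", "1.2502")));
        // Three fields changed: the full image is shorter than the patch.
        System.out.println(cheapest(cached, price("2", "1.2600", "1.2602")));
    }
}
```

With this four-field object, changing only the bid gives a 49-character patch against a 64-character full image, so the patch is chosen. Changing id, bid, and ask together makes the patch longer than the full image, so the full image is chosen.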

Example

In this example, we’ll create a CachingDataProvider that generates random pricing data for currency pair subjects (/CCYS/<currency_pair>).

ExampleJsonAdapter.java
import com.caplin.datasource.DataSource;
import com.caplin.datasource.messaging.json.JacksonJsonHandler;
import com.caplin.datasource.namespace.PrefixNamespace;
import com.caplin.datasource.publisher.CachingPublisher;

public class ExampleJsonAdapter
{
    public static void main(final String[] args)
    {
        final DataSource dataSource = DataSource.fromArgs(args);
        dataSource.getExtraConfiguration().setJsonHandler(new JacksonJsonHandler()); (1)

        PricingCachingDataProvider pricingCachingDataProvider =
                new PricingCachingDataProvider(); (2)

        CachingPublisher pricingCachingPublisher = dataSource.createCachingPublisher(
                new PrefixNamespace("/CCYS/"), pricingCachingDataProvider); (3)
        pricingCachingDataProvider.setCachingPublisher(pricingCachingPublisher); (4)

        dataSource.start();
    }
}
1 Set the implementation of JsonHandler that the adapter’s DataSource instance should use to serialise Java objects to JSON. In this example, we use DataSource for Java’s JacksonJsonHandler implementation.
2 Instantiate a CachingDataProvider. In this example we instantiate a PricingCachingDataProvider (see source code below).
3 Register the PricingCachingDataProvider with the adapter’s DataSource instance to create a CachingPublisher.
4 Initialise the PricingCachingDataProvider with its associated CachingPublisher.
PricingCachingDataProvider.java

The CachingDataProvider that responds to subject requests and discards.

import com.caplin.datasource.messaging.json.JsonMessage;
import com.caplin.datasource.publisher.CachingDataProvider;
import com.caplin.datasource.publisher.CachingPublisher;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.*;

public class PricingCachingDataProvider implements CachingDataProvider
{
    private final ScheduledExecutorService executorService =
            Executors.newSingleThreadScheduledExecutor();
    private final Random random = new Random();
    private final Map<String, ScheduledFuture<?>> subscriptions =
            new ConcurrentHashMap<>();
    private CachingPublisher cachingPublisher = null;

    @Override
    public void onRequest(String subject) { (1)
        subscriptions.put(
            subject,
            executorService.scheduleAtFixedRate(() -> {
                Price randomPrice = createRandomPrice(subject.split("/")[2]);
                JsonMessage jsonMessage = cachingPublisher.getCachedMessageFactory()
                        .createJsonMessage(subject, randomPrice);
                cachingPublisher.publish(jsonMessage);
            }, 0L, 1L, TimeUnit.SECONDS)
        );
    }

    @Override
    public void onDiscard(String subject) { (2)
        ScheduledFuture<?> scheduledFuture = subscriptions.remove(subject);
        if (scheduledFuture != null) scheduledFuture.cancel(true);
    }

    public void setCachingPublisher(CachingPublisher cachingPublisher) { (3)
        this.cachingPublisher = cachingPublisher;
    }

    private Price createRandomPrice(String currencyPair) {
        return new Price(
            String.valueOf(random.nextInt()),
            String.valueOf(random.nextDouble()),
            String.valueOf(random.nextDouble()),
            currencyPair
        );
    }
}
1 On subject request, parse the subject and subscribe to a backend process to provide updates. This method is called by the CachingPublisher when it does not have cached data for a subject; thereafter, requests for the same subject are served from the CachingPublisher cache. In this example, we start a simulation that publishes a random price for the subject every second.
2 On subject discard, cancel the backend process that generates updates for the subject. In this example, we cancel the ScheduledFuture that generates a random price every second.
3 The setCachingPublisher method is not part of the CachingDataProvider interface. We use this method to initialise the PricingCachingDataProvider with its associated CachingPublisher.
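
The subscribe-on-request, cancel-on-discard pattern in the callouts above can be tried without any Caplin classes. The sketch below is a plain-Java approximation under stated assumptions: SubscriptionRegistry and its methods are hypothetical, the subject is assumed to have exactly the form /CCYS/<currency_pair>, and the registry ignores a second subscription for a subject that already has a running task. This page does not say whether onRequest can be called twice for the same subject, so that guard is purely defensive.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Plain-Java sketch of the subscribe-on-request, cancel-on-discard pattern. */
final class SubscriptionRegistry implements AutoCloseable {

    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "price-simulator");
                thread.setDaemon(true);   // do not keep the JVM alive
                return thread;
            });
    private final Map<String, ScheduledFuture<?>> subscriptions = new ConcurrentHashMap<>();

    /** Extracts the currency pair from a subject such as /CCYS/GBPUSD. */
    static String currencyPair(String subject) {
        String[] parts = subject.split("/");
        if (parts.length != 3 || !parts[0].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException("Unexpected subject: " + subject);
        }
        return parts[2];
    }

    /** Starts a once-per-second task for the subject unless one is already running. */
    boolean subscribe(String subject, Runnable task) {
        boolean[] started = {false};
        subscriptions.computeIfAbsent(subject, key -> {
            started[0] = true;
            return executor.scheduleAtFixedRate(task, 0L, 1L, TimeUnit.SECONDS);
        });
        return started[0];
    }

    /** Cancels the subject's task; returns false if nothing was subscribed. */
    boolean discard(String subject) {
        ScheduledFuture<?> future = subscriptions.remove(subject);
        return future != null && future.cancel(true);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        try (SubscriptionRegistry registry = new SubscriptionRegistry()) {
            String subject = "/CCYS/GBPUSD";
            String pair = currencyPair(subject);
            System.out.println(registry.subscribe(subject, () -> System.out.println("tick " + pair)));
            System.out.println(registry.subscribe(subject, () -> { }));  // already subscribed
            System.out.println(registry.discard(subject));
            System.out.println(registry.discard(subject));               // nothing left to cancel
        }
    }
}
```

The first subscribe and the first discard return true; the repeated calls return false. Whether a "tick GBPUSD" line appears before the task is cancelled depends on thread timing.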
Price.java

A POJO that represents the data to send as a JSON message. Instances of this class are serialised to JSON by DataSource for Java’s JsonHandler.

public class Price {
    private final String id;
    private final String bid;
    private final String ask;
    private final String currencyPair;

    public Price(String id, String bid, String ask, String currencyPair) {
        this.id = id;
        this.bid = bid;
        this.ask = ask;
        this.currencyPair = currencyPair;
    }

    public String getId() {
        return id;
    }

    public String getBid() {
        return bid;
    }

    public String getAsk() {
        return ask;
    }

    public String getCurrencyPair() {
        return currencyPair;
    }
}
