Persist data to Transformer

Transformer’s Persistence Service provides StreamLink clients and Transformer Modules with access to a persisted key-value store. This page compares the Persistence Service in Transformer 6.2 and Transformer 7.0, and provides examples of persisting data using the StreamLink API and the Java Transformer Module (JTM) API.

If you are reading this page as part of an upgrade to Caplin Platform 7, see Upgrading to Caplin Platform 7.

Requirements

The Persistence Service requires the persistence option in Transformer’s licence.

For more information on activating the Persistence Service, see the page appropriate to your version of Transformer:

Breaking changes in the Persistence Service in Transformer 7

Breaking changes in Transformer 7:

  • Transformer 7’s persistence API is not compatible with modules written for Transformer 6.2’s persistence API

  • Transformer 7’s persistence service uses a new database persistence model, which is incompatible with data stored in Transformer 6.2’s persistence model.

  • Transformer 7’s persistence service does not support Transformer 6.2’s filedb format

The differences between the Persistence Service in Transformer 6.2 and Transformer 7.0 are summarised below:

Transformer 6.2 Transformer 7

Number of database tables

One table

Multiple tables

When are database tables created

Manually, during activation of the Persistence Service

Manually, during installation of each dependent Transformer module

Persistence key

One string, mapped to one primary-key column.

One or more strings, each mapped to a primary-key column.

Persistence value

One string, mapped to one value column.

One or more strings, each mapped to a value column.

SQL data-types

Character data-types.

Character, integer, and decimal data-types.

Table design

Fixed design.

Designed by the module or pipeline developer.

Mapping of data to tables and columns

Specified by the Transformer administrator in the persistence.conf configuration file.

Specified by the developer when persisting, reading, and removing key-value pairs.

Stored procedures

No

Yes (Transformer 7.1.1+)

Transactions

No

No

Database transaction support

The persistence service does not support database transactions. To ensure that parallel writes are not made to the same persistence key, follow the guidance below:

  • In load-balanced deployments of Transformer, enable the source-affinity load-balancing algorithm to ensure that a user’s persistence operations occur in sequence on the same instance of Transformer.

  • [Transformer 7] Do not share database tables between different Transformer modules.

Support for database transactions will be added in a later release of Transformer.

Persisting data using the Transformer 6.2 API

This section provides an example of using the legacy Java Transformer Module (JTM) API, com.caplin.transformer.module, to persist and retrieve values.

The Persistence API is in package com.caplin.transformer.module.persistence.

Configuration

No extra configuration is required for a Transformer module to use Transformer 6.2’s Persistence Service.

Persisting a value

In Transformer 6.2, all persisted values are stored in the same database table. To ensure that the value’s key is unique, prefix the key with any values required to distinguish it from other keys of the same name. In the example below, the key "defaultaccount" is prefixed by the module name and the user’s username.

To persist a value, use the Persistence.put method.

Persistence ps = new PersistenceImpl();

String key = moduleName + username + "defaultaccount";
String value = "account1"
ps.put(new PersistedValue(key, value));

Retrieving a value

To retrieve a value, use the Persistence.get method:

Persistence ps = new PersistenceImpl();

String key = moduleName + username + "defaultaccount";
String value = null;
PersistedValue pv = ps.get(key);
if (pv != null) {
    value = pv.getValue();
}

Retrieving all persisted data for a user

To retrieve all key-value pairs for a user, use the Persistence.query method. The example below retrieves all keys persisted by the module for a specific user:

Persistence ps = new PersistenceImpl();

String keyPrefix = moduleName + username;
PersistedValue[] persistedValues = ps.query(keyPrefix);

Persisting data using the Transformer 7 API

This section provides an example of using the Java Transformer Module (JTM) API, com.caplin.jtm, to persist and retrieve values.

Deprecation notice: Transformer 7.0’s new JTM API, com.caplin.jtm, is a replacement for Transformer 6.2’s JTM API, com.caplin.transformer.module. The original JTM API is now deprecated, and has an end-of-life scheduled for one year following the initial release of Transformer 7.

The Persistence API is in package com.caplin.jtm.persistence.

Creating database tables

Transformer 7’s persistence service, in contrast to Transformer 6.2’s persistence service, is much more than a key-value store persisted to a single database table. As the developer of a Transformer module that uses the Persistence API, you are responsible for designing the database table(s) that your module persists data to.

All Caplin supplied modules that use the Transformer 7 persistence service require a database administrator to create database tables as part of their installation. For detailed instructions, see Activating the Persistence Service 7, Deploying the Watchlist Service, and Deploying the Alerts Service.

The Transformer 7 persistence service supports the following features in database tables:

  • Multiple-column primary keys

  • Multiple value columns

  • Character, integer, and decimal column types

The SQL script below shows an example of a table suitable for use with Transformer 7’s persistence service:

CREATE TABLE IF NOT EXISTS TRANSFORMER_EXAMPLE_MODULE (
    EXAMPLE_USER VARCHAR(250) NOT NULL,
    EXAMPLE_KEY VARCHAR(250) NOT NULL,
    EXAMPLE_VALUE VARCHAR(4096) NOT NULL,
    PRIMARY KEY (EXAMPLE_USER, EXAMPLE_KEY)
);

The table has a composite primary-key (EXAMPLE_USER and EXAMPLE_KEY) and a value column (EXAMPLE_VALUE). The position of the EXAMPLE_USER column as the leftmost column of the primary key is deliberate; it optimises queries that return all the keys persisted for a specific user (see Retrieving all key-value pairs for a user, below).

For further examples of table designs, see the SQLite reference scripts included in the Persistence Service Client, the Watchlist Service, and the Alerts Service.

Caplin does not support SQLite in production. The SQLite scripts are provided as reference implementations for database administrators to adapt to the syntax of their own database servers.

Writing to a database table

To persist a value, use the Persistence.upsert method. Keys and values are passed to the method as java.util.Map collections of column values keyed by column names. The example below references the key and value columns created by the script, bootstrap.sql, above.

// transformerAccessor is passed to the module in the TransformerModule.initialise method
Persistence ps = transformerAccessor.getPersistence();

java.util.Map<String,String> keys = new java.util.HashMap<String,String>();
keys.put("EXAMPLE_USER", username);
keys.put("EXAMPLE_KEY", "defaultaccount");

java.util.Map<String,String> data = new java.util.HashMap<String,String>();
data.put("EXAMPLE_VALUE", "account1");

ps.upsert("TRANSFORMER_EXAMPLE_MODULE", keys, data);

Querying a database table

To retrieve a value, use the Persistence.get method. The method returns a java.util.List of java.util.Map objects, where each Map object represents a resultset row.

The example below references the key columns created by the script, bootstrap.sql, above.

// transformerAccessor is passed to the module in the TransformerModule.initialise method
Persistence ps = transformerAccessor.getPersistence();

java.util.Map<String,String> keys = new java.util.HashMap<String,String>();
keys.put("EXAMPLE_USER", username);
keys.put("EXAMPLE_KEY", "defaultaccount");

String value = null;
java.util.List<java.util.Map<String,String>> rows
    = ps.get("TRANSFORMER_EXAMPLE_MODULE", keys);
if (!rows.isEmpty()) {
    java.util.Map<String,String> row = rows.get(0);
    value = row.get("EXAMPLE_VALUE");
}

Retrieving all rows in a table for a specific user

To retrieve all values persisted to a database table for a specific user, use the Persistence.get method.

The code example below is similar to that used for retrieving a single persisted value, except that this time the keys map contains only a single key column: EXAMPLE_USER.

// transformerAccessor is passed to the module in the TransformerModule.initialise method
Persistence ps = transformerAccessor.getPersistence();

java.util.Map<String,String> keys = new java.util.HashMap<String,String>();
keys.put("EXAMPLE_USER", username);

String value = null;
java.util.List<java.util.Map<String,String>> rows
    = ps.get("TRANSFORMER_EXAMPLE_MODULE", keys);
for (java.util.Map<String,String> row : rows) {
    System.out.println(row.get("EXAMPLE_KEY") + "=" + row.get("EXAMPLE_VALUE"));
}

Calling a stored procedure

The Transformer persistence API supports calling stored procedures with both input and output parameters. The API does not support retrieving result sets from stored procedures.

The H2 database does not support output parameters for stored procedures. The H2 JDBC driver allows you to access a stored procedure’s return value as a virtual output parameter, P0.

To call a database stored procedure, use the Persistence.call method.

The example MySQL DDL and Java code below shows how to call stored procedures that store and retrieve values.

MySQL DDL: table and stored procedures
CREATE TABLE `user_properties` (
  `username` varchar(50) NOT NULL,
  `property` varchar(50) NOT NULL,
  `value` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`username`,`property`)
);

DELIMITER $$

CREATE
  PROCEDURE `setUserDefaultAccount`(
    IN username VARCHAR(50),
    IN account VARCHAR(255)
  )
  BEGIN
    INSERT INTO `user_properties` (`username`, `property`, `value`)
      VALUES (username, 'defaultaccount', account);
  END$$

CREATE
  PROCEDURE `getUserDefaultAccount`(
    IN username VARCHAR(50),
    OUT account VARCHAR(255)
  )
  BEGIN
    SELECT `value` INTO account
      FROM `user_properties`
      WHERE `username` = username AND `property` = 'defaultaccount';
  END$$

DELIMITER ;
Calling stored procedure setUserDefaultAccount
// transformerAccessor is passed to the module in the TransformerModule.initialise method
Persistence ps = transformerAccessor.getPersistence();

String[] inputParameters = new String[2];
inputParameters[0] = username;
inputParameters[1] = "account1"

try {
  ps.call("setUserDefaultAccount", arguments);
} catch(PesistenceException e) {
  System.out.println(e.getMessage());
}
Calling stored procedure getUserDefaultAccount
// transformerAccessor is passed to the module in the TransformerModule.initialise method
Persistence ps = transformerAccessor.getPersistence();

String[] inputParameters = new String[1];
inputParameters[0] = username;

java.util.Map<String,String> outputParameters;

try {
  outputParameters = ps.call("getUserDefaultAccount", arguments);
  System.out.println(outputParameters.get("ACCOUNT")); (1)
} catch(PesistenceException e) {
  System.out.println(e.getMessage());
}
1 The Transformer persistence API converts output parameter names to uppercase (in this case, accountACCOUNT).

The Persistence Service Client module publishes a private namespace for each StreamLink user, where they can read and update persisted generic-records, and a control channel to allow each StreamLink user to create and delete records within their private namespace.

Subjects published by the Persistence Service Client module
Subject Description

/PRIVATE/PERSISTENCE/CONTROL

The control channel. You create and delete persisted records by contributing messages to this channel.

/PRIVATE/PERSISTENCE/RECORD/record_identifier

Individual record channels. You write data to a persisted record by publishing data to it.

/PRIVATE/PERSISTENCE/CONTAINER/ALL

[Transformer 7.1.1] Container of all records persisted by the user.

Security

A StreamLink user cannot access, change, or delete another StreamLink user’s persisted records.

  • The record namespace, /PRIVATE/PERSISTENCE/RECORD, is mapped internally by Liberator to a namespace private to each StreamLink user: /PRIVATE/PERSISTENCE/username/RECORD.

  • The control subject, /PRIVATE/PERSISTENCE/CONTROL, is mapped internally by Liberator to a subject private to each StreamLink user: /PRIVATE/PERSISTENCE/username/CONTROL.

  • The container subject, /PRIVATE/PERSISTENCE/CONTAINER/ALL, is mapped internally by Liberator to a subject private to each StreamLink user: /PRIVATE/PERSISTENCE/username/CONTAINER/ALL

  • The Persistence Service Client persists each record under a persistence key that includes the StreamLink user’s username.

Creating a persisted record

Before you can write to a persistence record, you must create it.

To create a persisted record, send the following command message to the control channel, `/PRIVATE/PERSISTENCE/CONTROL.

Request message
Field Data Type Example Description

OperationId

String

1

A unique identifier for this operation. The identifier will be returned in the control channel’s response message, and will enable you to associate the response with the operation.

Depending on Liberator’s configuration and licence, it may be possible for a user to run concurrent sessions. The identifier you assign to OperationId must be unique across all the user’s sessions.

CreateRecord

String

defaultblotter

An identifier for the record.

The Persistence Service Client will respond with the following message on the control channel:

Response message
Field Data Type Example Description

OperationId

String

1

The unique identifier for this operation, as provided by you in the request message.

CreateResult

Integer

1

[Transformer 7 only] 1 if the operation succeeded, or -1 if the operation failed.

OperationFinished

Boolean

true

true if the operation succeeded, or false if the operation failed.

Reading a persisted record

Subscribe to the record’s subject. For example, to read the record with a record identifier of 'defaultblotter', subscribe to /PRIVATE/PERSISTENCE/RECORD/defaultblotter.

Writing to a persisted record

To write data to record, it must exist first. You can test whether a record exists by subscribing to it.

Writing data to a record does not return a success value. To confirm that a write was successful, subscribe to the subject beforehand and listen for the update that occurs after a successful write.

To write data to a record, follow the steps below:

  1. Subscribe to the persisted record’s subject.

    • If nodata is returned, then the record does not exist. Create the record (see Creating a persisted record) and re-subscribe to the record’s subject.

  2. To write data to the record, publish fields to the persisted record’s subject.

    • Your fields are merged with any previously contributed fields in the record.

    • You cannot delete a previously contributed field, but you can set the field to an empty value.

    • If the contribution is successful, your subscription to the record’s subject will receive an update.

Deleting a persisted record

To delete a persisted record, send the following command message to the control channel, `/PRIVATE/PERSISTENCE/CONTROL.

Request message
Field Data Type Example Description

OperationId

String

1

A unique identifier for this operation. The identifier will be returned in the control channel’s response message, and will enable you to associate the response with the operation.

Depending on your Liberator’s configuration and licence, it may be possible for a user to run concurrent sessions. The identifier you assign to OperationId must be unique across all the user’s sessions.

DeleteRecord

String

blotter

The record’s identifier.

The Persistence Service Client will respond with the following message on the control channel:

Response message
Field Data Type Example Description

OperationId

String

1

The unique identifier for this operation, as provided by you in the request message.

DeleteResult

Integer

1

[Transformer 7 only] 1 if the operation succeeded, or -1 if the operation failed.

OperationFinished

Boolean

true

true if the operation succeeded, or false if the operation failed.