RedisToDiffusion

RedisToDiffusion is a Streaming Source mode service. It subscribes to Redis values and publishes the results to Diffusion topics.

There can be multiple instances of this service running within the adapter, reflecting data from multiple Redis servers to different Diffusion topic trees, using different configuration.

The RedisToDiffusion service works by detecting Redis value changes and reflecting them to Diffusion topics. Detection and reflection are necessarily two separate phases: the adapter first detects a change and subsequently reflects the new value.

The Redis server is single threaded and executes each client request in sequence. A request that takes an excessive length of time may be recorded in the Redis slow log, but it still runs uninterrupted, so it may degrade service to other clients and raise overall server latency.

The Redis Adapter for Diffusion minimises the impact of its requests:

  1. At the time of initial connection to the Redis server it scans the Redis database in batches, allowing the server time to execute other client requests between scan requests from the adapter. The size of the batch is configured with the batchSize property.

  2. The Redis adapter subscribes to Redis keyspace notifications and issues batches of Redis requests to fetch the value of each affected key, allowing the server time to execute other client requests between batches. Each new value is then reflected onto the Diffusion topic tree, completing the 'detect and reflect' cycle. The maximum batch size is configured with the batchSize property. The time taken by each cycle is measured, and a cycle that takes longer than batchTimeoutSeconds seconds is logged as a timeout failure.
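
The batching described above can be pictured with a minimal sketch. It is not the adapter's implementation: the class BatchSketch and the method partition are hypothetical names, and no Redis client is involved. It only shows how a list of affected keys might be split into requests of at most batchSize keys, so that the server can serve other clients between requests.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; not part of the Redis adapter. */
final class BatchSketch {

    /** Splits keys into consecutive batches of at most batchSize keys. */
    static List<List<String>> partition(List<String> keys, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        for (int start = 0; start < keys.size(); start += batchSize) {
            int end = Math.min(start + batchSize, keys.size());
            // Copy the slice so each batch is independent of the source list
            batches.add(new ArrayList<>(keys.subList(start, end)));
        }
        return batches;
    }
}
```

With five keys and a batchSize of 2, this yields three batches of sizes 2, 2 and 1.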

When the Redis server is under heavy load, the Redis Adapter for Diffusion may detect a change to the same Redis key more than once before reflecting the latest value. Therefore, the RedisToDiffusion service must not be depended on to reflect intermediate Redis values when traffic volume is high.
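
One way to see why intermediate values can be skipped is the following sketch. It assumes, purely for illustration, that detected keys are held in a set until the next fetch; PendingKeys, onNotification and drain are hypothetical names, not the adapter's API, and the class is not thread-safe.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration only; not the adapter's implementation. */
final class PendingKeys {

    // Insertion-ordered set: a key notified several times is held only once
    private final Set<String> pending = new LinkedHashSet<>();

    /** Records that a keyspace notification arrived for the key. */
    void onNotification(String key) {
        pending.add(key);
    }

    /** Returns the keys whose latest values should be fetched, then clears them. */
    List<String> drain() {
        List<String> keys = new ArrayList<>(pending);
        pending.clear();
        return keys;
    }
}
```

If a key changes three times before drain is called, it appears once in the result, so only the value present at fetch time would be reflected.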

If a Redis value is removed, either directly or indirectly, the corresponding Diffusion topic is also removed. A Redis value can be directly removed with commands such as DEL or indirectly by popping the last value from a list value with LPOP.

The complete configuration for this service consists of the framework configuration required for a streaming service and the configuration specific to this service type.

Below is an example of configuration for an instance of the RedisToDiffusion service. Most of the configuration’s format is dictated by the Gateway Framework. Configuration specific to this adapter lies within the "application" object.

Example 1. Sample configuration for a service of type RedisToDiffusion
{
  "serviceName": "exampleRedisService",
  "description": "Example Redis Service",
  "serviceType": "RedisToDiffusion",
  "config": {
    "framework": {
      "publicationRetries": 5,
      "retryIntervalMs": 5000,
      "topicProperties": {
        "persistencePolicy": "SERVICE",
        "timeSeries": false,
        "publishesValueOnly": false,
        "doNotRetainValue": false
      }
    },
    "application": {
      "redisURI": "redis://localhost:6379",
      "batchSize": 2000,
      "batchTimeoutSeconds": 300,
      "keyDelimiter": ":",
      "keyFilter": "*",
      "topicPathPrefix": "redis/"
    },
    "state": "ACTIVE"
  }
}

Application configuration

The Gateway Framework configuration is detailed in the Gateway Framework documentation. Configuration specific to the Redis Adapter for Diffusion is below.

| Property | Type | Description | Mandatory | Default |
| --- | --- | --- | --- | --- |
| redisURI | string | Redis URI. Contains connection details for the Redis connections. | No | redis://localhost |
| keyFilter | string | Glob style expression against which Redis keys must match to be reflected as Diffusion topics. See Redis SCAN command for examples. | No | * |
| batchSize | integer | The maximum number of Redis keys scanned in one go, and the maximum number of Redis values fetched in one go. | No | 1024 |
| batchTimeoutSeconds | integer | The maximum time, in seconds, that the adapter may take to fetch a Redis value and reflect it to a Diffusion topic. | No | 5 |
| keyDelimiter | string | Delimiting character in Redis keys mapped to / in Diffusion topic paths. | No | : |
| topicPathPrefix | string | Topic path prefixed to Diffusion topics, e.g. if topicPathPrefix is redis/0/ then Redis key foo:bar is reflected as redis/0/foo/bar. | No | redis/ |

See Configuring the adapter for a complete example of the adapter configuration, including configuration for the RedisToDiffusion service.
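
The effect of keyDelimiter and topicPathPrefix on the resulting topic path can be sketched as follows. This is an illustration based only on the property descriptions above; TopicPathSketch and toTopicPath are hypothetical names, and the adapter's actual mapping may handle edge cases differently.

```java
/** Hypothetical illustration only; not part of the Redis adapter. */
final class TopicPathSketch {

    /** Maps a Redis key to a topic path by swapping the delimiter for '/'. */
    static String toTopicPath(
        String topicPathPrefix,
        String keyDelimiter,
        String redisKey) {

        // String.replace substitutes literal text, so the delimiter is
        // never interpreted as a regular expression
        return topicPathPrefix + redisKey.replace(keyDelimiter, "/");
    }
}
```

For instance, toTopicPath("redis/0/", ":", "foo:bar") returns redis/0/foo/bar, matching the example given for topicPathPrefix.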

redisURI

This URI encapsulates all Redis connectivity details, as specified in the documentation for the RedisURI class.

For example, a redisURI of rediss://myUser:hunter2@server0:1234/4 connects securely to server0 on port 1234 as user myUser and uses Redis database number 4.
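
The parts of such a URI can be picked apart with the standard java.net.URI class, as in the sketch below. This is not how the adapter or the RedisURI class parses the value; RedisUriSketch and describe are hypothetical names, and the defaults of port 6379 and database 0 are assumptions made for the illustration.

```java
import java.net.URI;

/** Hypothetical illustration only; the adapter's own parsing may differ. */
final class RedisUriSketch {

    /** Summarises the connection details in a Redis URI, omitting the password. */
    static String describe(String redisUri) {
        URI uri = URI.create(redisUri);

        // The "rediss" scheme denotes a secure connection, "redis" a plain one
        boolean tls = "rediss".equals(uri.getScheme());

        // User info has the form "user:password"; only the user is kept
        String userInfo = uri.getUserInfo();
        String user = (userInfo == null) ? "" : userInfo.split(":", 2)[0];

        // A path such as "/4" selects the database; assume 0 when absent
        String path = uri.getPath();
        int database = (path == null || path.length() < 2)
            ? 0
            : Integer.parseInt(path.substring(1));

        // getPort() returns -1 when no port is given; assume 6379 in that case
        int port = (uri.getPort() == -1) ? 6379 : uri.getPort();

        return String.format(
            "tls=%b host=%s port=%d user=%s db=%d",
            tls, uri.getHost(), port, user, database);
    }
}
```

Calling describe with the URI above gives tls=true host=server0 port=1234 user=myUser db=4.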

keyFilter

The Redis Adapter for Diffusion can reflect a subset of the keys in the source Redis server. For example, a keyFilter of accounts:us:checking:* restricts the adapter to only those Redis keys whose names start with "accounts:us:checking:".
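
To illustrate how a glob style filter selects keys, the sketch below translates a simplified glob into a regular expression. It handles only * and ?, whereas Redis patterns also support character classes and escapes, and it is not the matching code used by Redis or by the adapter. KeyFilterSketch and its methods are hypothetical names.

```java
import java.util.regex.Pattern;

/** Hypothetical illustration only; supports just '*' and '?' wildcards. */
final class KeyFilterSketch {

    /** Converts a simplified glob into an equivalent regular expression. */
    static Pattern compile(String glob) {
        StringBuilder regex = new StringBuilder();
        for (char c : glob.toCharArray()) {
            switch (c) {
                case '*':
                    regex.append(".*");   // any run of characters
                    break;
                case '?':
                    regex.append('.');    // exactly one character
                    break;
                default:
                    // Quote everything else so it matches literally
                    regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** Returns true if the whole key matches the glob. */
    static boolean matches(String glob, String key) {
        return compile(glob).matcher(key).matches();
    }
}
```

Under this sketch, accounts:us:checking:1001 matches the filter accounts:us:checking:* while accounts:uk:checking:1001 does not.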

batchSize

Redis is a single threaded server, wherein an excessively large request from one user may degrade service to other users. The Redis Adapter for Diffusion reflects values from Redis to Diffusion in batches to be both efficient and considerate to other Redis users.

This value may require tuning to improve performance.

batchTimeoutSeconds

Each batch has batchTimeoutSeconds seconds to complete before it is logged as a timeout failure. This value may require tuning in performance-sensitive scenarios.
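
The timeout check amounts to comparing a measured duration against the configured limit. The sketch below assumes start and end times captured with System.nanoTime(); BatchTimerSketch and timedOut are hypothetical names, and the adapter's own measurement and logging are not shown.

```java
import java.time.Duration;

/** Hypothetical illustration only; not the adapter's implementation. */
final class BatchTimerSketch {

    /** Returns true if the batch took longer than batchTimeoutSeconds. */
    static boolean timedOut(
        long startNanos,
        long endNanos,
        int batchTimeoutSeconds) {

        Duration elapsed = Duration.ofNanos(endNanos - startNanos);
        // Strictly longer than the limit counts as a timeout failure
        return elapsed.compareTo(Duration.ofSeconds(batchTimeoutSeconds)) > 0;
    }
}
```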

Redis value to Diffusion topic conversion

string

Redis strings are represented as string topics. The adapter currently does not support Redis strings that contain bitmap data or hyperloglogs; such Redis strings are treated as regular string values.

list

Redis lists are reflected as JSON topics holding arrays, e.g. ["thing one", "thing two"].

set

Redis sets are reflected as JSON topics holding arrays, e.g. ["red", "green", "blue"].

zset

Redis sorted sets are reflected as JSON topics holding arrays, where each value-score pair is represented as an inner array, e.g.

[
 ["foo", 1],
 ["bar", 3]
]

hash

Redis hashes are reflected as JSON topics holding an object, e.g.

{
  "foo": "1",
  "bar": "2",
  "baz": "3",
  "qux": "4"
}

ReJSON-RL

Redis JSON values are reflected as Diffusion JSON topics, e.g.

{
    "name":"John",
    "age":30,
    "colours": ["red", "green"],
    "car": null
}

TSDB-TYPE

Redis time-series values are reflected as Diffusion JSON topics holding arrays, where each time-value pair is reflected as an inner array holding the timestamp and the value as numbers.

[
  [1684405854177, 1],
  [1684405863513, 2],
  [1684405902574, 3],
  [1684405904656, 3],
  [1684405998399, 3.14159]
]

Unsupported Redis Data Types

The RedisToDiffusion service does not currently support Redis Streams, Bitmaps, or probabilistic types such as Hyperloglogs.

Performance

Performance of the Redis Adapter for Diffusion has been tested on a server with 24 vCPUs and a 5 gigabyte heap. Each supported Redis data type is tested independently in two scenarios: cold, where the adapter builds a new topic tree from static Redis values of around 1K in size, and hot, where the Redis values are updated while the adapter updates an existing topic tree.

| Type | Redis Values | Seconds | Rate | Scenario |
| --- | --- | --- | --- | --- |
| STRING | 256000 | 5.584 | 45,845.272/s | Cold |
| STRING | 256000 | 5.916 | 43,272.481/s | Hot |
| LIST | 256000 | 56.712 | 4,514.036/s | Cold |
| LIST | 256000 | 64.631 | 3,960.948/s | Hot |
| HASH | 256000 | 36.025 | 7,106.176/s | Cold |
| HASH | 256000 | 59.350 | 4,313.395/s | Hot |
| SET | 256000 | 29.026 | 8,819.679/s | Cold |
| SET | 256000 | 59.127 | 4,329.663/s | Hot |
| ZSET | 256000 | 40.531 | 6,316.153/s | Cold |
| ZSET | 256000 | 90.424 | 2,831.107/s | Hot |
| REJSON_RL | 256000 | 73.479 | 3,483.989/s | Cold |
| REJSON_RL | 256000 | 115.179 | 2,222.627/s | Hot |
| TSDB_TYPE | 256000 | 26.321 | 9,726.074/s | Cold |
| TSDB_TYPE | 256000 | 38.652 | 6,623.202/s | Hot |
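
The Rate column is the number of Redis values divided by the elapsed seconds. A minimal sketch of that arithmetic follows; RateSketch and ratePerSecond are hypothetical names introduced for the illustration.

```java
/** Hypothetical illustration of how the Rate column relates to the others. */
final class RateSketch {

    /** Returns the number of values reflected per second. */
    static double ratePerSecond(long redisValues, double seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("seconds must be positive");
        }
        return redisValues / seconds;
    }
}
```

For the cold STRING row, ratePerSecond(256000, 5.584) is approximately 45,845.27, in line with the tabulated rate.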

Integration with Custom Publisher

The Redis adapter supports using Custom Publishers with the RedisToDiffusion service.

A custom publisher for the Redis adapter should process updates of type Object with an UpdateContext of type RedisUpdateContext. The RedisUpdateContext contains the Redis key from which the update was received, in addition to the default Diffusion topic path and default topic properties. The update supplied to the process method of the custom publisher is the value received from the Redis key. If the received value is of type String, the update supplied to the process method is of type String; otherwise it is JSON.

Below is a sample implementation of a custom publisher for the Redis adapter:

import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Imports for CustomPublisher, Publisher, TopicProperties,
// PayloadConversionException and RedisUpdateContext come from the
// Gateway Framework and the Redis adapter JAR and are omitted here.

public final class RedisCustomPublisher
    extends CustomPublisher<Object, RedisUpdateContext> {

    private final Publisher publisher;
    private final String topicPrefix;

    /**
     * Constructor.
     *
     * @param publisher the publisher used to publish updates
     * @param context the context map, expected to contain "topicPrefix"
     */
    public RedisCustomPublisher(
        Publisher publisher,
        Map<String, Object> context) {

        super(publisher);
        this.publisher = publisher;
        this.topicPrefix = context.get("topicPrefix").toString();
    }

    @Override
    public CompletableFuture<?> process(
        Object update,
        RedisUpdateContext updateContext) throws PayloadConversionException {

        // Details of the update supplied by the Redis adapter
        String redisKey = updateContext.getRedisKey();
        TopicProperties topicProperties = updateContext.getTopicProperties();
        String diffusionTopic = updateContext.getDiffusionTopic();

        // Process the update as required, or simply publish it as follows
        return publisher
            .publish(
                topicPrefix + "/" + redisKey + "/" + diffusionTopic,
                update,
                topicProperties);
    }
}

The RedisUpdateContext is available in the Redis adapter JAR, which should be supplied to the custom publisher as a dependency. For details on including the adapter JAR in the custom publisher, refer to the user guide for Custom Publishers.