RedisToDiffusion
RedisToDiffusion is a Streaming Source mode service. It subscribes to Redis values and publishes the results to Diffusion topics.
There can be multiple instances of this service running within the adapter, reflecting data from multiple Redis servers to different Diffusion topic trees, using different configuration.
The RedisToDiffusion service functions by detecting Redis value changes and reflecting them to Diffusion topics. Detection and reflection are necessarily two separate phases: the adapter detects a change and subsequently reflects the new value.
The Redis server is single-threaded and executes client requests in sequence. A request that takes an excessive length of time may be recorded in the Redis slow log, but it still runs uninterrupted, which can degrade service to other clients and raise overall server latency.
The Redis Adapter for Diffusion minimises the impact of its requests:
- At the time of initial connection to the Redis server it scans the Redis database in batches, allowing the server time to execute other client requests between scan requests from the adapter. The size of the batch is configured with the batchSize property.
- The Redis adapter subscribes to Redis keyspace notifications and issues batches of Redis requests to fetch the value of each affected key, allowing the server time to execute other client requests. Each new value is then reflected onto the Diffusion topic tree, completing the 'detect and reflect' cycle. The maximum batch size is configured with the batchSize property. The time taken by each cycle is measured, and if a cycle takes longer than batchTimeoutSeconds seconds it is logged as a timeout failure.
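The batching described above can be illustrated with a minimal Java sketch. This is not the adapter's implementation: the class name KeyBatcher and its method are hypothetical, and the sketch only shows how a list of affected keys might be split into requests of at most batchSize keys each.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
public final class KeyBatcher {
    private KeyBatcher() {
    }

    /** Splits keys into consecutive batches holding at most batchSize keys each. */
    public static List<List<String>> partition(List<String> keys, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        final List<List<String>> batches = new ArrayList<>();
        for (int from = 0; from < keys.size(); from += batchSize) {
            final int to = Math.min(from + batchSize, keys.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(keys.subList(from, to)));
        }
        return batches;
    }
}
```

Issuing one request per batch, rather than one request for all keys, is what gives the server the opportunity to serve other clients between requests.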
When the Redis server is under heavy load, the Redis Adapter for Diffusion may detect more than one change to the same Redis key before it reflects the latest value. Therefore, the RedisToDiffusion service must not be depended on to reflect intermediate Redis values when traffic volume is high.
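The reason intermediate values can be missed is easier to see in code. The following is a minimal sketch under stated assumptions, not the adapter's implementation; the class ChangeCoalescer and its methods are hypothetical. Pending change notifications are held in a set, so a key that changes twice before the next fetch is fetched only once, and only the value present at fetch time is reflected.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, for illustration only.
public final class ChangeCoalescer {
    // Insertion-ordered set: a key appears at most once while it is pending.
    private final Set<String> pending = new LinkedHashSet<>();

    /** Records that a key changed; repeat notifications for a pending key collapse into one. */
    public synchronized void notifyChanged(String key) {
        pending.add(key);
    }

    /** Removes and returns up to maxBatch pending keys, in first-notified order. */
    public synchronized List<String> drain(int maxBatch) {
        final List<String> batch = new ArrayList<>();
        final Iterator<String> it = pending.iterator();
        while (it.hasNext() && batch.size() < maxBatch) {
            batch.add(it.next());
            it.remove();
        }
        return batch;
    }
}
```

In this sketch, three notifications for the keys a, b and a yield a single batch containing a and b; whatever value a held between its two changes is never fetched.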
If a Redis value is removed, either directly or indirectly, the corresponding Diffusion topic is also removed. A Redis value can be directly removed with commands such as DEL or indirectly by popping the last value from a list value with LPOP.
The complete configuration for this service consists of the configuration the framework requires for a streaming service, plus configuration specific to this service type.
Below is an example configuration for an instance of the RedisToDiffusion service. Most of the configuration's format is dictated by the Gateway Framework. Configuration specific to this adapter lies within the "application" object.
RedisToDiffusion
{
    "serviceName": "exampleRedisService",
    "description": "Example Redis Service",
    "serviceType": "RedisToDiffusion",
    "config": {
        "framework": {
            "publicationRetries": 5,
            "retryIntervalMs": 5000,
            "topicProperties": {
                "persistencePolicy": "SERVICE",
                "timeSeries": false,
                "publishesValueOnly": false,
                "doNotRetainValue": false
            }
        },
        "application": {
            "redisURI": "redis://localhost:6379",
            "batchSize": 2000,
            "batchTimeoutSeconds": 300,
            "keyDelimiter": ":",
            "keyFilter": "*",
            "topicPathPrefix": "redis/"
        },
        "state": "ACTIVE"
    }
}
Application configuration
The Gateway Framework configuration is detailed here. Configuration specific to the Redis Adapter for Diffusion is below.
Property | Type | Description | Mandatory | Default |
---|---|---|---|---|
redisURI | string | Redis URI. Contains connection details for the Redis connections. | No | redis://localhost |
keyFilter | string | Glob style expression against which Redis keys must match to be reflected as Diffusion topics. See Redis SCAN command for examples. | No | * |
batchSize | integer | The maximum number of Redis keys scanned in one go, and the maximum number of Redis values fetched in one go | No | 1024 |
batchTimeoutSeconds | integer | The maximum time, in seconds, that the adapter may take to fetch a Redis value and reflect it to a Diffusion topic | No | 5 |
keyDelimiter | string | Delimiting character in Redis keys mapped to | No | : |
topicPathPrefix | string | Topic path prefixed to Diffusion topics, e.g. if | No | redis/ |
See Configuring the adapter for a complete example of the adapter configuration, including configuration for the RedisToDiffusion service.
- redisURI
  This URI encapsulates all Redis connectivity details, as specified in the documentation for the RedisURI class. For example, a redisURI of rediss://myUser:hunter2@server0:1234/4 will connect securely to server0 on port 1234, as user myUser, and use Redis database number 4.
- keyFilter
  The Redis Adapter for Diffusion can reflect a subset of the keys in the source Redis server. For example, a keyFilter of accounts:us:checking:* restricts the adapter to only those Redis keys starting with accounts:us:checking:
- batchSize
  Redis is a single-threaded server, wherein an excessively large request from one user may degrade service to other users. The Redis Adapter for Diffusion reflects values from Redis to Diffusion in batches to be both efficient and considerate to other Redis users. This value may require tuning to improve performance.
- batchTimeoutSeconds
  Each batch has batchTimeoutSeconds seconds to complete before it is logged as a timeout failure. This value may require tuning in performance-sensitive scenarios.
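To make the redisURI example above concrete, the following sketch breaks such a URI into its parts using only java.net.URI. It is an illustration under stated assumptions: the adapter relies on the RedisURI class for this, whereas the hypothetical RedisUriParts class below handles only the simple scheme://user:password@host:port/database form and falls back to the standard Redis port 6379 and database 0 when they are absent.

```java
import java.net.URI;

// Hypothetical helper, for illustration only; not the RedisURI class used by the adapter.
public final class RedisUriParts {
    public final boolean secure;
    public final String user;
    public final String host;
    public final int port;
    public final int database;

    private RedisUriParts(boolean secure, String user, String host, int port, int database) {
        this.secure = secure;
        this.user = user;
        this.host = host;
        this.port = port;
        this.database = database;
    }

    public static RedisUriParts parse(String text) {
        final URI uri = URI.create(text);
        // The "rediss" scheme denotes a TLS connection.
        final boolean secure = "rediss".equals(uri.getScheme());
        // User info has the form user:password; keep only the user name.
        final String userInfo = uri.getUserInfo();
        final String user = userInfo == null ? null : userInfo.split(":", 2)[0];
        // getPort() returns -1 when the URI does not specify a port.
        final int port = uri.getPort() == -1 ? 6379 : uri.getPort();
        // The path, if present, holds the database number, e.g. "/4".
        final String path = uri.getPath();
        final int database = path == null || path.length() <= 1
            ? 0
            : Integer.parseInt(path.substring(1));
        return new RedisUriParts(secure, user, uri.getHost(), port, database);
    }
}
```

Calling RedisUriParts.parse("rediss://myUser:hunter2@server0:1234/4") gives a secure connection to server0 on port 1234 as myUser, using database 4, which matches the description of redisURI above.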
Redis value to Diffusion topic conversion
string
Redis strings are represented as string topics. The adapter currently does not support Redis strings that contain bitmap data or hyperloglogs; such Redis strings are treated as regular string values.
zset
Redis sorted sets are reflected as JSON topics holding arrays, where each value-score pair is represented as an inner array, e.g.
[
    ["foo", 1],
    ["bar", 3]
]
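The shape of this conversion can be sketched in plain Java. The adapter's own conversion code is not shown in this document, so the class SortedSetJson below is hypothetical; it assumes the sorted set has already been read into a map of member to score, in score order, and writes each pair as an inner array.

```java
import java.util.Map;

// Hypothetical helper, for illustration only.
public final class SortedSetJson {
    private SortedSetJson() {
    }

    /** Renders member/score pairs as a JSON array of [member, score] arrays. */
    public static String toJson(Map<String, Double> membersInScoreOrder) {
        final StringBuilder json = new StringBuilder("[");
        for (Map.Entry<String, Double> entry : membersInScoreOrder.entrySet()) {
            if (json.length() > 1) {
                json.append(',');
            }
            json.append("[\"")
                .append(escape(entry.getKey()))
                .append("\",")
                .append(number(entry.getValue()))
                .append(']');
        }
        return json.append(']').toString();
    }

    // Simplification: escapes only backslashes and double quotes, not control characters.
    private static String escape(String text) {
        return text.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Whole scores are written without a decimal point; infinite or NaN scores are not handled.
    private static String number(double score) {
        return score == Math.rint(score) && !Double.isInfinite(score)
            ? Long.toString((long) score)
            : Double.toString(score);
    }
}
```

Given an insertion-ordered map holding foo with score 1 and bar with score 3, toJson returns [["foo",1],["bar",3]], the same structure as the example above.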
hash
Redis hashes are reflected as JSON topics holding an object, e.g.
{
    "foo": "1",
    "bar": "2",
    "baz": "3",
    "qux": "4"
}
ReJSON-RL
Redis JSON values are reflected as Diffusion JSON topics, e.g.
{
    "name": "John",
    "age": 30,
    "colours": ["red", "green"],
    "car": null
}
TSDB-TYPE
Redis time-series values are reflected as Diffusion JSON topics holding arrays, where each time-value pair is reflected as an inner array holding the timestamp and the value as numbers.
[
    [1684405854177, 1],
    [1684405863513, 2],
    [1684405902574, 3],
    [1684405904656, 3],
    [1684405998399, 3.14159]
]
Performance
Performance of the Redis Adapter for Diffusion has been tested on a server with 24 vCPUs and a 5 gigabyte heap. Each supported Redis data type is tested independently in two scenarios. Cold: the adapter builds a new topic tree from static Redis values of around 1K in size. Hot: the Redis values are updated while the adapter updates an existing topic tree.
Type | Redis Values | Seconds | Rate | Scenario |
---|---|---|---|---|
STRING | 256000 | 5.584 | 45,845.272/s | Cold |
STRING | 256000 | 5.916 | 43,272.481/s | Hot |
LIST | 256000 | 56.712 | 4,514.036/s | Cold |
LIST | 256000 | 64.631 | 3,960.948/s | Hot |
HASH | 256000 | 36.025 | 7,106.176/s | Cold |
HASH | 256000 | 59.350 | 4,313.395/s | Hot |
SET | 256000 | 29.026 | 8,819.679/s | Cold |
SET | 256000 | 59.127 | 4,329.663/s | Hot |
ZSET | 256000 | 40.531 | 6,316.153/s | Cold |
ZSET | 256000 | 90.424 | 2,831.107/s | Hot |
REJSON_RL | 256000 | 73.479 | 3,483.989/s | Cold |
REJSON_RL | 256000 | 115.179 | 2,222.627/s | Hot |
TSDB_TYPE | 256000 | 26.321 | 9,726.074/s | Cold |
TSDB_TYPE | 256000 | 38.652 | 6,623.202/s | Hot |
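The Rate column is consistent with the number of Redis values divided by the elapsed seconds. The short sketch below, with a hypothetical Throughput class, shows that arithmetic so the same figure can be derived when repeating the test on other hardware.

```java
// Hypothetical helper, for illustration only.
public final class Throughput {
    private Throughput() {
    }

    /** Returns the number of values processed per second. */
    public static double ratePerSecond(long values, double seconds) {
        if (seconds <= 0) {
            throw new IllegalArgumentException("seconds must be positive");
        }
        return values / seconds;
    }
}
```

For the first row of the table, Throughput.ratePerSecond(256000, 5.584) gives roughly 45,845 values per second.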
Integration with Custom Publisher
The Redis adapter supports using Custom Publishers with the RedisToDiffusion service.
The custom publisher for the Redis adapter should process updates of type Object with an UpdateContext of type RedisUpdateContext. The RedisUpdateContext contains the Redis key from which the update was received, in addition to the default Diffusion topic path and default topic properties. The update supplied to the process method of the custom publisher is the update received from the Redis key. If the received value is of type String, the update supplied to the process method will be of type String; otherwise it will be JSON.
Below is a sample implementation of a custom publisher for the Redis adapter:
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// CustomPublisher, Publisher, TopicProperties and PayloadConversionException
// are supplied by the Gateway Framework; RedisUpdateContext is supplied by the
// Redis adapter JAR. Import them from the packages of the versions in use.

public final class RedisCustomPublisher
    extends CustomPublisher<Object, RedisUpdateContext> {

    private final Publisher publisher;
    private final String topicPrefix;

    /**
     * Constructor.
     */
    public RedisCustomPublisher(
        Publisher publisher,
        Map<String, Object> context) {

        super(publisher);
        this.publisher = publisher;
        // Expects a "topicPrefix" entry in the supplied context.
        this.topicPrefix = context.get("topicPrefix").toString();
    }

    @Override
    public CompletableFuture<?> process(
        Object update,
        RedisUpdateContext updateContext) throws PayloadConversionException {

        final String redisKey = updateContext.getRedisKey();
        final TopicProperties topicProperties = updateContext.getTopicProperties();
        final String diffusionTopic = updateContext.getDiffusionTopic();

        // process update as required, or simply publish as follows
        return publisher
            .publish(
                topicPrefix + "/" + redisKey + "/" + diffusionTopic,
                update,
                topicProperties);
    }
}
The RedisUpdateContext is available in the Redis adapter JAR, which should be supplied to the custom publisher as a dependency. For details on including the adapter JAR in the custom publisher, refer to the user guide for Custom Publishers.