STREAM_TO_REMOTE
The STREAM_TO_REMOTE
service type can be used to subscribe to Diffusion topics
in the local server and publish their updates to Diffusion topics on the remote
server. Thus, for this service type, the local server is the source server and
the remote server is the target server. This service type also supports the
on-demand publication feature, which can be enabled
to publish to Diffusion topics in the remote server only if there is a demand or a trigger.
There can be multiple instances of this service type added to the adapter to consume data from different Diffusion topics on the local server and publish the updates to Diffusion topics on the remote server using different configurations.
Complete configuration for this service consists of framework required configuration for sink service and this service type specific configuration.
{
"serviceName": "tradeConsumerReplicated",
"description": "Replicates from local topic and publishes to remote",
"serviceType": "STREAM_TO_REMOTE",
"config": {
"sharedConfigName": "remoteDiffusionServer",
"framework": {
"diffusionTopicSelector": "?trade//"
},
"application": {
"topicMappingFunction": "fromLocal/replicated/<path(0)>",
"publicationRetries": 5,
"retryIntervalMs": 5000
}
}
}
Framework configuration
Details about supported configuration parameters for framework configuration for Sink service can be found here.
Application configuration
The supported application configuration parameters for this service type are defined in the table below:
Name | Type | Description | Mandatory | Default value |
---|---|---|---|---|
String |
A function to map the Diffusion topic path on the local server from which an update is received, to construct a topic path used to publish updates on the remote server. |
No |
<path(0)> |
|
publicationRetries |
integer |
Number of retries that will be used to attempt to publish an update to a Diffusion topic when transient errors occur. |
No |
5 |
retryIntervalMs |
integer |
Time interval in milliseconds to retry publication attempts if publication fails due to retryable or transient errors. |
No |
5000 |
updateMode |
UpdateMode |
The update mode to be used when publishing to Diffusion topics on the remote server. This could be: - - - |
No |
STREAMING |
topicProperties |
TopicProperties |
The properties to be used to create the Diffusion topic in the remote Diffusion server. If the 'topicType' value in the topicProperties is not explicitly specified, the topic type will be inferred from the topic type of the source topic in the local Diffusion server. If 'payloadConverters' are specified in the framework configuration, the topic type should also be specified accordingly. See here for details about this configuration. See the description below for more details. |
No |
null |
removeStaleTopic |
boolean |
A boolean value to specify whether to remove the topic created in the remote server, if the source topic in the local server becomes unavailable. |
No |
true |
onDemandPublication |
Configuration to specify on-demand topic publication. |
No |
null |
Apart from the above configuration parameters, this service also requires the remote server connection details. These can be supplied as part of the service configuration or specified as shared configuration and referred to in the service configuration as illustrated above. See here for details on supported Diffusion server configuration parameters. Below is a sample configuration that contains the remote server connection details within the service configuration instead of specifying it in the shared configuration block:
{
"serviceName": "tradeConsumerReplicated",
"description": "Replicates from local topic and publishes to remote",
"serviceType": "STREAM_TO_REMOTE",
"config": {
"framework": {
"diffusionTopicSelector": "?trade//"
},
"application": {
"diffusion": {
"url": "ws://localhost:7080",
"principal": "admin",
"password": "password",
"reconnectIntervalMs": 5000
},
"topicMappingFunction": "fromLocal/replicated/<path(0)>",
"publicationRetries": 5,
"retryIntervalMs": 5000
}
}
}
Data subscription, conversion and publication to remote server
When a STREAM_TO_REMOTE
service is added and started, the framework subscribes
to any topic paths that match the configured Diffusion topic selector in the
local server, defined in the diffusionTopicSelector
within the framework configuration
(if onDemandPublication
is specified in the application configuration, the subscription
occurs only after there is a demand or a trigger in the remote server). Any updates
to the subscribed Diffusion topics are received and converted as configured and published
to a Diffusion topic in the remote server which is defined with topicMappingFunction
.
See here for more details on how the source topic
path is mapped to a target topic path using the given mapping function.
By default, the topic type of the source topic will be replicated to the target topic.
If the source topic is time series, time-series-specific properties which are
timeSeriesRetainedRange
and timeSeriesSubscriptionRange
will also be replicated.
However, if the topicProperties
configuration for the service is also configured
to set the target topic to be time series, time-series-specific properties specified in
the configuration will be used instead of those in the source topic to create the
target topic.
This behavior can be overridden by explicitly setting the topicType
parameter
in the topicProperties
for the service. If payloadConverters
configuration
is specified in the framework configuration, the topicType
should also be specified accordingly.
If the configured payload converter creates an update which is not compatible with a Diffusion topic type, a payload conversion exception will be logged.
For a deeper understanding of how payload converters are used, please refer here. See here for the list of all issued payload converters by the framework. |
As an illustration, consider the following sample configuration for a service of
type STREAM_TO_REMOTE
. This service subscribes to the Diffusion topic doubles
,
which produces updates of type DOUBLE
. The topicType
parameter is not explicitly
specified. Hence, the type of topic to be published to the remote server will also
be of type DOUBLE
. In this example, since payloadConverters
is not explicitly
configured, an internal default payload converter will be used. This converter directly
passes the Double
value to the service, which is then published to the Diffusion topic.
Since topicProperties
is not specified, default topic properties will be used.
{
"serviceName": "doublesPublisher",
"serviceType": "STREAM_TO_REMOTE",
"description": "Consumes from Doubles Diffusion topic and publishes to remote server",
"config": {
"sharedConfigName": "remoteDiffusionServer",
"framework": {
"diffusionTopicSelector": "doubles"
}
}
}
With the same doubles
topic of DOUBLE
topic type in the local server, in the
example below, a custom payload converter is specified to be used that transforms
the data from the source topic, and the topic type is specified to be String
.
Thus, the transformed update from the source topic will be converted to STRING
topic type and published to the remote server. If the topicType
is not explicitly
specified, the target topic to be created in the remote server is expected to be of
DOUBLE
type. Hence, if the supplied payload converter does not produce the output
that is compatible with DOUBLE
, a payload converter exception will be logged.
{
"serviceName": "doublesPublisher",
"serviceType": "STREAM_TO_REMOTE",
"description": "Consumes from Doubles Diffusion topic, transforms and publishes to remote server",
"config": {
"sharedConfigName": "remoteDiffusionServer",
"framework": {
"diffusionTopicSelector": "doubles",
"payloadConverters": [
{
"name": "customPayloadConverterThatTransformsData"
}
]
},
"application": {
"topicType": "STRING"
}
}
}
If the requirement is simply to publish to STRING
topics in the remote server
from doubles
topics in the local server, the configuration can be simplified as follows:
{
"serviceName": "doublesPublisher",
"serviceType": "STREAM_TO_REMOTE",
"description": "Consumes from Doubles Diffusion topic, and publishes to STRING topics in the remote server",
"config": {
"sharedConfigName": "remoteDiffusionServer",
"framework": {
"diffusionTopicSelector": "doubles"
},
"application": {
"topicType": "STRING"
}
}
}
See Configuring the adapter for a complete example of the configuration for the adapter with configuration for STREAM_TO_REMOTE
services.
See Diffusion session management
to understand how remote Diffusion sessions are created
and used for the services of STREAM_FROM_REMOTE
service type.