Writing sink handlers
Prerequisites: understand Implementing Service Handlers, before proceeding with this step. |
SinkHandler is a type of service handler for sink services. Its purpose is to receive updates from subscribed Diffusion topics and forward them to external systems.
In a sink service, Diffusion topics are subscribed to and when updates are received, they are converted to the format expected by the sink handler and then published to the sink handler.
By default, the Diffusion topic selector configured in the sink service is
subscribed to by the framework. This behaviour can be overridden
with the autoSubscribe
flag in SinkServiceProperties
as defined below.
If overridden, a Subscriber
should be used to subscribe to Diffusion topics.
The subscribed topic selector could map to topics of different types, thus
allowing updates received for a sink service to be of different types. Payload converters
are used to convert Diffusion updates of various types into the type expected by
the sink handler. According to the requirement, the sink handler should specify the expected value type as follows:
@Override
public Class<Object> valueType() {
return Object.class;
}
This method indicates that it can accept updates of any types. The framework uses this method to verify whether the payload converter, which is used for the updates from a specific topic path, generates an output of the expected type. If not, the updates for the subscribed topic path, will be ignored and eventually the topic path will be unsubscribed from.
Payload converters defined in the SinkServiceProperties
or configured in the
service configuration will be used to convert the updates received from the subscriptions.
If payload converters are not defined for the sink service, depending on the topic
type for which the update is received, the framework will use a default converter
and send the update to the sink handler.
The table below illustrates the type of data the default converters will produce
depending on the topic type of the updates received for a topic path.
Diffusion topic type | Type of output value | Default Payload converter used |
---|---|---|
JSON |
String |
$JSON_to_String |
STRING |
String |
Internal converter that returns received String value |
INT64 |
Long |
Internal converter that returns received Long value |
DOUBLE |
Double |
Internal converter that returns received Double value |
BINARY |
byte[] |
$BINARY_to_byte_array |
If the default converter’s output is incompatible with the sink handler’s expected value type, updates for the topic path will be ignored.
If the converter produces data in the expected format, it will be published to
the sink handler using the update
method. This method receives the topic path,
the converted value, and the topic properties that specify the type of the topic
and its properties from which the updates have been received.
In CsvFileSinkHandler, the value received in the update method is written to a file using FileOutputStream.
...
@Override
public CompletableFuture<?> update(String diffusionTopic, String value, TopicProperties properties) {
final CompletableFuture<?> updateCf =
new CompletableFuture<>();
final String fileName = constructFileName(diffusionTopic);
try (FileOutputStream outputStream =
new FileOutputStream(createPathToWrite(fileName).toFile())) {
outputStream.write(value.getBytes());
updateCf.complete(null);
}
catch (IOException ex) {
updateCf.completeExceptionally(ex);
}
return updateCf;
}
...
Documentation tip
Since users can configure payload converters as required in the sink service configuration, in order to specify the correct converter that will produce a value of the type expected by the sink handler, they will need to know the type of data that the sink handler expects.Ensure that this information is documented in the application user guide for the supported sink service type. |
Providing sink service specific properties
The framework provides an option to SinkHandlers to specify certain properties, required to subscribe to Diffusion topics and process updates received from Diffusion topics. These are the payload converters to use to convert data from the Diffusion topic type to the format expected by sink handler and a boolean flag to specify whether the framework should subscribe to the configured topic selector.
The framework retrieves this information by invoking the SinkHandler.getSinkServiceProperties method. By default, the implementation of this method returns null, indicating that the framework should assume the use of default properties. However, if the application developer desires to specify properties different from the defaults, the method must be overridden to return a SinkServiceProperties
object.
A SinkServiceProperties
object may be built using a builder obtained
using the DiffusionGatewayFramework.newSinkServicePropertiesBuilder
method.
SinkServiceProperties for CsvFileSinkHandler is defined as follows:
@Override
public SinkServiceProperties getSinkServiceProperties() throws InvalidConfigurationException {
return newSinkServicePropertiesBuilder()
.payloadConverter("$JSON_to_CSV_String")
.build();
}
Here, the sink handler is specified to use a payload converter with name $JSON_to_CSV_String. This is one of the Payload converters issued in the framework.
The properties that may be set with newSinkServicePropertiesBuilder and the defaults (if not explicitly set) are as follows:-
Property | Description | Default |
---|---|---|
|
A boolean flag to determine whether the framework should subscribe
to the configured topic selector when the service is started. The default is true,
meaning that the |
true |
|
Specifies a payload converter to be used to convert the Diffusion topic value to external format. |
Standard conversions are used. |
Users have the option to configure payload converters within the framework’s configuration for specific services. These user-configured settings take precedence over those defined in the configured sink service properties. |