Writing sink handlers

Prerequisites: understand Implementing Service Handlers, before proceeding with this step.

SinkHandler is a type of service handler for sink services. Its purpose is to receive updates from subscribed Diffusion topics and forward them to external systems.

sink service
Figure 1. Data publishing sequence in Sink handler

In a sink service, Diffusion topics are subscribed to and when updates are received, they are converted to the format expected by the sink handler and then published to the sink handler.

By default, the Diffusion topic selector configured in the sink service is subscribed to by the framework. This behaviour can be overridden with the autoSubscribe flag in SinkServiceProperties as defined below. If overridden, a Subscriber should be used to subscribe to Diffusion topics. The subscribed topic selector could map to topics of different types, thus allowing updates received for a sink service to be of different types. Payload converters are used to convert Diffusion updates of various types into the type expected by the sink handler. According to the requirement, the sink handler should specify the expected value type as follows:

    @Override
    public Class<Object> valueType() {
        return Object.class;
    }

This method indicates that it can accept updates of any types. The framework uses this method to verify whether the payload converter, which is used for the updates from a specific topic path, generates an output of the expected type. If not, the updates for the subscribed topic path, will be ignored and eventually the topic path will be unsubscribed from.

Payload converters defined in the SinkServiceProperties or configured in the service configuration will be used to convert the updates received from the subscriptions. If payload converters are not defined for the sink service, depending on the topic type for which the update is received, the framework will use a default converter and send the update to the sink handler. The table below illustrates the type of data the default converters will produce depending on the topic type of the updates received for a topic path.

Diffusion topic type Type of output value Default Payload converter used

JSON

String

$JSON_to_String

STRING

String

Internal converter that returns received String value

INT64

Long

Internal converter that returns received Long value

DOUBLE

Double

Internal converter that returns received Double value

BINARY

byte[]

$BINARY_to_byte_array

If the default converter’s output is incompatible with the sink handler’s expected value type, updates for the topic path will be ignored.

If the converter produces data in the expected format, it will be published to the sink handler using the update method. This method receives the topic path, the converted value, and the topic properties that specify the type of the topic and its properties from which the updates have been received.

In CsvFileSinkHandler, the value received in the update method is written to a file using FileOutputStream.

    ...
    @Override
    public CompletableFuture<?> update(String diffusionTopic, String value, TopicProperties properties) {

        final CompletableFuture<?> updateCf =
            new CompletableFuture<>();

        final String fileName = constructFileName(diffusionTopic);

        try (FileOutputStream outputStream =
                 new FileOutputStream(createPathToWrite(fileName).toFile())) {

            outputStream.write(value.getBytes());
            updateCf.complete(null);
        }
        catch (IOException ex) {
            updateCf.completeExceptionally(ex);
        }

        return updateCf;
    }
    ...
Documentation tip
Since users can configure payload converters as required in the sink service configuration, in order to specify the correct converter that will produce a value of the type expected by the sink handler, they will need to know the type of data that the sink handler expects.
Ensure that this information is documented in the application user guide for the supported sink service type.

Providing sink service specific properties

The framework provides an option to SinkHandlers to specify certain properties, required to subscribe to Diffusion topics and process updates received from Diffusion topics. These are the payload converters to use to convert data from the Diffusion topic type to the format expected by sink handler and a boolean flag to specify whether the framework should subscribe to the configured topic selector.

The framework retrieves this information by invoking the SinkHandler.getSinkServiceProperties method. By default, the implementation of this method returns null, indicating that the framework should assume the use of default properties. However, if the application developer desires to specify properties different from the defaults, the method must be overridden to return a SinkServiceProperties object.

A SinkServiceProperties object may be built using a builder obtained using the DiffusionGatewayFramework.newSinkServicePropertiesBuilder method.

SinkServiceProperties for CsvFileSinkHandler is defined as follows:

    @Override
    public SinkServiceProperties getSinkServiceProperties() throws InvalidConfigurationException {
        return newSinkServicePropertiesBuilder()
            .payloadConverter("$JSON_to_CSV_String")
            .build();
    }

Here, the sink handler is specified to use a payload converter with name $JSON_to_CSV_String. This is one of the Payload converters issued in the framework.

The properties that may be set with newSinkServicePropertiesBuilder and the defaults (if not explicitly set) are as follows:-

Property Description Default

autoSubscribe

A boolean flag to determine whether the framework should subscribe to the configured topic selector when the service is started.

The default is true, meaning that the diffusionTopicSelector specified in the service configuration will be automatically subscribed to by the framework. If set to false, Diffusion topics should be explicitly subscribed as required in the SinkHandler using the Subscriber passed in the GatewayApplication.addSink method.

true

payloadConverter

Specifies a payload converter to be used to convert the Diffusion topic value to external format.

Standard conversions are used.
See here for more details.

Users have the option to configure payload converters within the framework’s configuration for specific services. These user-configured settings take precedence over those defined in the configured sink service properties.