Defining payload converters for service handlers

Payload converters for source services

Each source service utilises a payload converter to transform data into the required format for publication to the Diffusion topic. The Payload converter in source service properties can be explicitly defined as follows, but it is not mandatory.

    @Override
    public SourceServiceProperties getSourceServiceProperties() throws InvalidConfigurationException {
        return
            newSourceServicePropertiesBuilder()
                .topicType(TopicType.JSON)
                .payloadConverter("$CSV_to_JSON")
                .build();
    }

If defined together with the topicType, this converter will be used by the source service, and the topic type will be used to validate that the defined payload converter generates data that is compatible with the topic type.

If neither a payload converter nor a topic type is defined, by default, the $Object_to_JSON converter will be used. Hence, by default, it is expected that the source service handler will publish JSON String data.

If a payload converter is not explicitly specified but a topic type is defined, the converter to be used will be inferred from the topicType.

More than one payload converter can be specified, as explained here. These associated payload converters will be applied to all services associated with this service handler unless the user configures the service to use a specific set of payload converters.

image
Figure 1. Use of payload converter in a source service

Various APIs are available for specifying the payload converter in source service properties. Multiple payload converters can be specified, and they will be executed in sequence to convert data. Refer to the Java doc for more details.

Here are a few examples of setting the payload converter in ServiceProperties:

Example 1. Example 1: Defining a payload converter using its name
    ...
    @Override
    public SourceServiceProperties getSourceServiceProperties() throws InvalidConfigurationException {
        return
            newSourceServicePropertiesBuilder()
                .payloadConverter("$CSV_to_JSON")
                .build();
    }
    ...

In the above code, $CSV_to_JSON is an issued payload converter This converter can be set with or without configuration. In this case, it is set without any configuration. With this approach, an instance of $CSV_to_JSON converter will be created for the service by the framework and used when publishing data to Diffusion to convert CSV data into JSON.

Example 2. Example 2: Defining a payload converter with configuration parameters
    ...
    @Override
    public SourceServiceProperties getSourceServiceProperties()
        throws InvalidConfigurationException {
        return
            newSourceServicePropertiesBuilder()
                .payloadConverter(
                    "$CSV_to_JSON",
                    Collections.singletonMap(
                        "headers",
                        Arrays.asList("name", "age", "profession")))
                .build();
    }
    ...

In the above code, the $CSV_to_JSON converter is used by specifying its configuration parameters. With this approach, an instance of $CSV_to_JSON converter will be created, using the configured parameters, by the framework and used in the service when publishing data to Diffusion to convert CSV data into JSON.

Example 3. Example 3: Defining a payload converter, by passing a converter instance
    ...
    @Override
    public SourceServiceProperties getSourceServiceProperties()
        throws InvalidConfigurationException {

        return newSourceServicePropertiesBuilder()
            .payloadConverter(new CsvToJSONConverter(customCsvMapper, customCsvSchema))
            .build();
    }
    ...

In this example, an instance of CsvToJSONConverter is created with the required parameters and passed into the SourceServicePropertiesBuilder. With this approach, the passed converter instance will be used in the service when publishing data to Diffusion to convert CSV data into JSON.

Following this approach, it is possible to instantiate an anonymous class that implements the PayloadConverter, as follows:

    ...
    @Override
    public SourceServiceProperties getSourceServiceProperties()
        throws InvalidConfigurationException {

        return
            newSourceServicePropertiesBuilder()
                .payloadConverter(
                    new PayloadConverter<Long, String>() {
                        @Override
                        public String convert(Long input) throws PayloadConversionException {
                            return String.valueOf(input);
                        }
                    })
                .build();
    }
    ...
Lambda expressions are not supported for the passing of converter instances.
When defining an issued payload converter in service properties, consult the converter’s documentation to understand whether it requires configuration parameters and, if so, which configuration parameters are defined.

Payload converters for sink services

Each sink service also utilises a payload converter to convert data from the subscribed Diffusion topic into the format expected by the sink handler. Similar to source services, defining a payload converter in sink service properties can be done as follows, but it is not mandatory.

    @Override
    public SinkServiceProperties getSinkServiceProperties() throws InvalidConfigurationException {
        return newSinkServicePropertiesBuilder()
            .payloadConverter("$JSON_to_CSV_String")
            .build();
    }

More than one payload converter can be specified, as explained in here. These associated payload converters will be applied to all services associated with this service handler, unless the user configures the service to use a specific set of payload converters.

image
Figure 2. Use of payload converter in a sink service

Similar to SourceServicePropertiesBuilder, there are similar APIs available for specifying the payload converter in sink service properties. Multiple payload converters can be specified, and they will be executed in sequence to convert data. Refer to the Java doc for more details.

Example 4. Defining payload converter with its name in getSinkServiceProperties for sink service
    ...
    @Override
    public SinkServiceProperties getSinkServiceProperties() throws InvalidConfigurationException {
        return newSinkServicePropertiesBuilder()
            .payloadConverter("$JSON_to_CSV_String")
            .build();
    }
    ...

In the above code snippet, $JSON_to_CSV_String is also an issued payload converter. It will be used by the sink service instance when sending updates to the sink handler to convert JSON data from Diffusion into CSV string format.

Similar to SourceServicePropertiesBuilder, for SinkServicePropertiesBuilder, payload converters can be specified with or without configuration parameters or by passing the converter instance.

Defining multiple payload converters

As defined in payload converter overview, a chain of converters can be specified for a service handler in ServiceProperties. Below is an example of setting such a chain of converters in a sink service property. The similar API applies for SourceServicePropertiesBuilder.

    ...
    @Override
    public SinkServiceProperties getSinkServiceProperties() throws InvalidConfigurationException {
        return
            newSinkServicePropertiesBuilder()
                .payloadConverter(OBJECT_TO_LONG)
                .payloadConverter(OBJECT_TO_JSON)
                .build();
    }
    ...
Payload converter names for converters issued in the framework are defined as Java constant variables and can be accessed in the PayloadConverter interface.