Integration with custom publisher

The Diffusion adapter supports using Custom Publishers with the STREAM_FROM_REMOTE and LOCAL_TO_LOCAL services.

A custom publisher for the Diffusion adapter should process updates of type Object with an UpdateContext of type DiffusionUpdateContext. The DiffusionUpdateContext contains the source Diffusion topic, in addition to the default Diffusion topic path and default topic properties. The update passed to the custom publisher's process method is the update received from the source topic and has the same type as the source topic.
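
Because the update arrives typed as Object, a custom publisher usually has to inspect its runtime type before handling it. The sketch below shows one way to do that. It assumes that string, int64 and double source topics deliver String, Long and Double values; that mapping and the UpdateTypeSketch class are illustrative assumptions, not part of the adapter's API, so check the adapter documentation for the types actually delivered.

```java
// Hypothetical helper, not part of the Diffusion adapter.
final class UpdateTypeSketch {

    // Returns a short description of the update based on its runtime type.
    static String describe(Object update) {
        if (update == null) {
            return "null";
        }
        if (update instanceof String) {
            // Assumed to correspond to a string source topic.
            return "string:" + update;
        }
        if (update instanceof Long) {
            // Assumed to correspond to an int64 source topic.
            return "int64:" + update;
        }
        if (update instanceof Double) {
            // Assumed to correspond to a double source topic.
            return "double:" + update;
        }
        // Any other type (for example JSON or binary values) is reported
        // by its class name.
        return "other:" + update.getClass().getName();
    }
}
```

In a real publisher, each branch would hand the update to the matching payload converter instead of building a string.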

Below is a sample implementation of a custom publisher for the Diffusion adapter:

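import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Imports for the Gateway Framework and Diffusion adapter types
// (CustomPublisher, Publisher, PayloadConverter, DiffusionUpdateContext
// and the other framework types) are omitted here.

public final class DiffusionCustomPublisher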
    extends CustomPublisher<Object, DiffusionUpdateContext> {

    private final Publisher publisher;
    private final PayloadConverter<Object, JSON> objectToJsonConverter;
    private final PayloadConverter<Object, String> objectToStringConverter;

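    /**
     * Constructor. Looks up the required payload converters by name and
     * fails fast if either of them has not been configured.
     */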
    public DiffusionCustomPublisher(
        Publisher publisher,
        Map<String, Object> configContext,
        Map<String, ? extends PayloadConverter<?, ?>> payloadConverterMap) {

        super(publisher);
        this.publisher = publisher;
        this.objectToJsonConverter =
            (PayloadConverter<Object, JSON>) payloadConverterMap.get(ObjectToJSONConverter.getName());

        if (objectToJsonConverter == null) {
            throw new IllegalArgumentException(
                "No object to JSON converter found");
        }

        this.objectToStringConverter =
            (PayloadConverter<Object, String>) payloadConverterMap.get(ObjectToStringConverter.getName());

        if (objectToStringConverter == null) {
            throw new IllegalArgumentException(
                "No object to String converter found");
        }
    }

    @Override
    public CompletableFuture<?> process(
        Object update,
        DiffusionUpdateContext updateContext) throws PayloadConversionException {

        String sourceDiffusionTopic = updateContext.getSourceDiffusionTopic();
        TopicProperties topicProperties = updateContext.getTopicProperties();
        String diffusionTopic = updateContext.getDiffusionTopic();

        // TODO process the update as required, convert with the payload
        //  converters and publish using the 'publisher'

        return CompletableFuture.completedFuture(null);
    }
}

In the example above, the custom publisher expects payload converter instances that convert Object to JSON and Object to String. An adapter user should configure the customPublisher for the service to include the required payload converter configuration. Requiring payload converters in the constructor's payload converter map argument is optional; a custom publisher that does require them should document that requirement.
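
As a rough illustration of how the converter lookup and the body of the process method could fit together, here is a minimal sketch. To stay compilable without the adapter JAR it uses StandInConverter and StandInPublisher, which are hypothetical stand-ins for the framework's PayloadConverter and Publisher types and may not match their real signatures. The converter name passed to the constructor and the rule for deriving the target topic path are assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the framework's PayloadConverter type.
interface StandInConverter<I, O> {
    O convert(I input) throws Exception;
}

// Hypothetical stand-in for the framework's Publisher type.
interface StandInPublisher {
    CompletableFuture<?> publish(String path, Object value);
}

final class ProcessSketch {

    private final StandInPublisher publisher;
    private final StandInConverter<Object, String> toStringConverter;

    ProcessSketch(
        StandInPublisher publisher,
        Map<String, ? extends StandInConverter<?, ?>> converters,
        String converterName) {

        this.publisher = publisher;
        this.toStringConverter = requireConverter(converters, converterName);
    }

    // Looks up a converter by name and fails fast when it is missing,
    // mirroring the null checks in the sample constructor.
    @SuppressWarnings("unchecked")
    static <O> StandInConverter<Object, O> requireConverter(
        Map<String, ? extends StandInConverter<?, ?>> converters,
        String name) {

        StandInConverter<?, ?> converter = converters.get(name);
        if (converter == null) {
            throw new IllegalArgumentException(
                "No payload converter named '" + name + "' found");
        }
        return (StandInConverter<Object, O>) converter;
    }

    // Converts the update and publishes it to a path derived from the
    // default topic path and the source topic (an assumed naming rule).
    CompletableFuture<?> process(
        Object update,
        String sourceTopic,
        String defaultTopic) throws Exception {

        String value = toStringConverter.convert(update);
        String target = defaultTopic + "/" + sourceTopic;
        return publisher.publish(target, value);
    }
}
```

Failing in the constructor rather than in process means a missing converter is reported when the publisher is created, before any update is received.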

The DiffusionUpdateContext class is available in the Diffusion adapter JAR, which should be supplied to the custom publisher as a dependency. For details on including the adapter JAR in the custom publisher, refer to the user guide for Custom Publishers.