Integration with custom publisher
The Diffusion adapter supports using Custom Publishers with the STREAM_FROM_REMOTE
and LOCAL_TO_LOCAL
services.
The custom publisher for the Diffusion adapter should process update of type Object
with
UpdateContext
of type DiffusionUpdateContext
. The DiffusionUpdateContext
contains
the source Diffusion topic, in addition to the default Diffusion topic path and
default topic properties. The update supplied to the process
method of the
custom publisher will be the update received from the source topic and will be
of same type as of source topic.
Below is a sample implementation of the custom publisher for Diffusion adapter:
public final class DiffusionCustomPublisher
extends CustomPublisher<Object, DiffusionUpdateContext> {
private final Publisher publisher;
private final PayloadConverter<Object, JSON> objectToJsonConverter;
private final PayloadConverter<Object, String> objectToStringConverter;
/**
* Constructor.
*/
public DiffusionCustomPublisher(
Publisher publisher,
Map<String, Object> configContext,
Map<String, ? extends PayloadConverter<?, ?>> payloadConverterMap) {
super(publisher);
this.publisher = publisher;
this.objectToJsonConverter =
(PayloadConverter<Object, JSON>) payloadConverterMap.get(ObjectToJSONConverter.getName());
if (objectToJsonConverter == null) {
throw new IllegalArgumentException(
"No object to JSON converter found");
}
this.objectToStringConverter =
(PayloadConverter<Object, String>) payloadConverterMap.get(ObjectToStringConverter.getName());
if (objectToStringConverter == null) {
throw new IllegalArgumentException(
"No object to String converter found");
}
}
@Override
public CompletableFuture<?> process(
Object update,
DiffusionUpdateContext updateContext) throws PayloadConversionException {
String sourceDiffusionTopic = updateContext.getSourceDiffusionTopic();
TopicProperties topicProperties = updateContext.getTopicProperties();
String diffusionTopic = updateContext.getDiffusionTopic();
// TODO process the update as required, convert with the payload
// converters and publish using the 'publisher'
return CompletableFuture.completedFuture(null);
}
}
In the example above, the custom publisher expects instances of payload converters
to convert Object
to JSON
and Object
to String
type. An adapter user should
configure the customPublisher for the service to include the required payload converters
configuration. Expecting a payload converter in the payload converter map argument within
the constructor is optional and can be expected and documented as required.
The DiffusionUpdateContext
is available in the Diffusion adapter JAR, which should be supplied
to the custom publisher as a dependency. For details on including the adapter JAR
in the custom publisher, refer to the userguide for Custom Publishers.