Writing hybrid handlers
Prerequisites: See Writing sink handlers before proceeding with this step.
HybridHandler is the type of service handler used for hybrid services. A hybrid handler consumes data from Diffusion topics, performs any required data manipulation, and publishes the result to another Diffusion topic. It therefore acts as both a Diffusion topic subscriber and a publisher.
As defined here, the required configuration parameters, a Publisher instance, a Subscriber instance and a StateHandler instance can be injected into a hybrid handler when the class is instantiated. The Publisher instance can be used in the update method of the hybrid handler to publish data received from a subscribed topic to another Diffusion topic. The Subscriber instance can be used to subscribe to Diffusion topics as required.
The following snippet from JsonDateAppender demonstrates receiving a topic update and publishing it to another topic in the update method.
    @Override
    public CompletableFuture<?> update(String path, String value, TopicProperties properties) {
        try {
            final JsonNode jsonNode = OBJECT_MAPPER.readTree(value);
            if (!jsonNode.isObject()) {
                return CompletableFuture.completedFuture(null);
            }
            final ObjectNode objectNode = (ObjectNode) jsonNode;
            objectNode.put("timestamp", Instant.now().toString());
            publisher
                .publish(targetTopicPrefix + path, objectNode)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        LOG.error("Failed to send updated data from {}", path);
                    }
                });
        }
        catch (JsonProcessingException | PayloadConversionException ex) {
            LOG.error("Failed to process update from {}", path, ex);
        }
        return CompletableFuture.completedFuture(null);
    }
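Here, if the updated value is a JSON object, a timestamp field is added to it and the result is published to another topic, whose path is the target topic prefix followed by the source path. Values that are not JSON objects are ignored, and values that cannot be parsed or converted are logged as errors.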
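The consume, transform and publish flow described above can be tried outside the framework with a minimal, self-contained sketch. Everything in it is an assumption made for illustration: SketchPublisher, RecordingPublisher and TimestampingHandler are hypothetical stand-ins rather than framework types, and a java.util.Map stands in for a JSON object so that no JSON library is needed.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class HybridFlowSketch {

    /** Hypothetical stand-in for the framework's Publisher; not the real interface. */
    interface SketchPublisher {
        CompletableFuture<?> publish(String path, Object value);
    }

    /** In-memory publisher that records each publish call, for demonstration only. */
    static final class RecordingPublisher implements SketchPublisher {
        final List<String> paths = new ArrayList<>();
        final List<Object> values = new ArrayList<>();

        @Override
        public CompletableFuture<?> publish(String path, Object value) {
            paths.add(path);
            values.add(value);
            return CompletableFuture.completedFuture(null);
        }
    }

    /** Simplified hybrid handler: stamps object-like updates and republishes them. */
    static final class TimestampingHandler {
        private final SketchPublisher publisher;
        private final String targetTopicPrefix;

        TimestampingHandler(SketchPublisher publisher, String targetTopicPrefix) {
            this.publisher = publisher;
            this.targetTopicPrefix = targetTopicPrefix;
        }

        CompletableFuture<?> update(String path, Object value) {
            if (!(value instanceof Map)) {
                // Values that are not object-like are ignored.
                return CompletableFuture.completedFuture(null);
            }
            // Copy the entries so the received value is not mutated.
            Map<String, Object> stamped = new LinkedHashMap<>();
            ((Map<?, ?>) value).forEach((k, v) -> stamped.put(String.valueOf(k), v));
            stamped.put("timestamp", Instant.now().toString());
            // Publish under the target prefix and hand the future back to the caller.
            return publisher.publish(targetTopicPrefix + path, stamped);
        }
    }

    public static void main(String[] args) {
        RecordingPublisher publisher = new RecordingPublisher();
        TimestampingHandler handler = new TimestampingHandler(publisher, "enhanced/");
        handler.update("prices/gold", Map.of("price", 10)).join();
        System.out.println(publisher.paths + " " + publisher.values);
    }
}
```

Unlike the JsonDateAppender snippet, this sketch returns the future produced by publish instead of an already completed one, so a caller can observe a publish failure. Whether that is preferable in a real handler depends on how the framework treats a failed update future, which this sketch does not model.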
Providing hybrid service specific properties
HybridHandler is an extension of both SinkHandler and StreamingSourceHandler. It therefore allows developers to supply both sink-specific and source-specific service properties, giving the framework what it needs to consume data from Diffusion topics and publish data to them.
    @Override
    public SinkServiceProperties getSinkServiceProperties() throws InvalidConfigurationException {
        return
            newSinkServicePropertiesBuilder()
                .autoSubscribe(false)
                .build();
    }

    @Override
    public SourceServiceProperties getSourceServiceProperties() throws InvalidConfigurationException {
        return
            newSourceServicePropertiesBuilder()
                .topicType(TopicType.JSON)
                .build();
    }