Custom Publisher
By default, the framework supplies a Publisher instance to source and hybrid services,
which can be used to perform any operation the publisher allows. If an update must be
processed before the publish method is called, the application developer can do so in
the service handler. The update could be divided into multiple values and published to
multiple Diffusion topics or, based on the update, a Diffusion topic could be removed.
However, if an application is already implemented and the default publication of an
update needs to be overridden, a CustomPublisher can be implemented and used with the
application. Using a CustomPublisher is an advanced option and makes sense only when
overriding the default publication mechanism in an application. A custom publisher
should be application-specific, because it is built to process application-specific
updates with an application-specific UpdateContext. To use a CustomPublisher, the
Gateway application has to support its usage. All Gateway adapters provided by
DiffusionData support CustomPublisher.
Supporting CustomPublisher in an application
The first step in supporting a CustomPublisher in an application is to identify any
update-specific context that can be supplied with the update to the process method of
the CustomPublisher. An implementation of UpdateContext containing such context should
then be added. For example, for the Kafka adapter, the update type can be
ConsumerRecord, and its update context could contain a key type and a value type.
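As a rough illustration of the Kafka case above, an update context can be a small immutable value class. The sketch below is self-contained, so it declares its own stand-in UpdateContext marker interface; the shape of the real framework interface, the KafkaUpdateContext class name and its accessors are assumptions made for illustration, not the adapter's actual API.

```java
import java.util.Objects;

/** Stand-in for the framework's UpdateContext type (assumption, so the sketch compiles on its own). */
interface UpdateContext {
}

/** Hypothetical update context carrying the key and value types of a Kafka record. */
final class KafkaUpdateContext implements UpdateContext {
    private final String keyType;
    private final String valueType;

    KafkaUpdateContext(String keyType, String valueType) {
        // Fail fast so that a custom publisher never sees a half-populated context.
        this.keyType = Objects.requireNonNull(keyType, "keyType");
        this.valueType = Objects.requireNonNull(valueType, "valueType");
    }

    String getKeyType() {
        return keyType;
    }

    String getValueType() {
        return valueType;
    }
}
```

Keeping the context immutable means the same instance can safely be handed to a custom publisher that processes updates asynchronously.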
For the source or hybrid services in an application, the application must check
whether the publisher supplied to the add method of such service types is an instance
of CustomPublisher. If it is, the publisher should be cast to CustomPublisher so that
its process method can be used in the service handler.
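    private static CompletableFuture<?> publish(
        Publisher publisher,
        String defaultTopic,
        Object update) {

        try {
            if (publisher instanceof CustomPublisher) {
                // A custom publisher is configured: hand over the update
                // together with its application-specific context.
                final CustomPublisher customPublisher = (CustomPublisher) publisher;
                return customPublisher.process(
                    update,
                    new UpdateContextImpl(
                        defaultTopic,
                        customPublisher.getConfiguredTopicProperties(),
                        someUpdateContext)); // context the application holds for this update
            }
            else {
                // No custom publisher: fall back to the default publication.
                return publisher.publish(defaultTopic, update);
            }
        }
        catch (PayloadConversionException ex) {
            LOG.error("Failed to process update", ex);
        }
        return CompletableFuture.completedFuture(null);
    }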
As shown above, the application should support publication for services configured with or without a custom publisher, because whether a service uses a custom publisher depends on the user's requirements. If an application supports custom publishers for source or hybrid services, users can enable one for a specific service in that service's configuration.
Implementing CustomPublisher
CustomPublisher is an abstract class that declares an abstract method, process, which
a subclass must implement. The subclass should be made available as a standalone JAR
that can then be used with a Gateway application. The logic to process an update for
the service is implemented in this method, which accepts the update to process and an
UpdateContext.
The update context is application-specific and is implemented within the Gateway application that supports custom publishers. Hence, the CustomPublisher should include the Gateway application JAR as a dependency so that the application-specific update context is accessible to the CustomPublisher. With Maven, this can be achieved by downloading the application JAR to disk and including it in the POM file with the system scope, as follows:
    <dependency>
        <groupId>com.diffusiondata.gateway.adapter.kafka</groupId>
        <artifactId>kafka-adapter</artifactId>
        <version>1.0</version>
        <scope>system</scope>
        <systemPath>/builds/engineering/gateway/gateway-framework/user-guide/src/main/resources/kafka-adapter-1.0.0.jar</systemPath>
    </dependency>
An example implementation of the abstract method is provided below:
    @Override
    public CompletableFuture<?> process(
        String update,
        ApplicationUpdateContext updateContext) throws PayloadConversionException {

        // Split the update into one value per Diffusion topic path.
        final Map<String, Object> topicToUpdatesMap = getFinalUpdates(update, updateContext);

        topicToUpdatesMap.forEach((topic, value) -> {
            try {
                publisher
                    .publish(topic, value)
                    .whenComplete((result, throwable) -> {
                        if (throwable != null) {
                            LOG.error("Failed to publish update to {} topic", topic, throwable);
                        }
                    });
            }
            catch (PayloadConversionException ex) {
                LOG.error("Failed to convert update for {} topic", topic, ex);
            }
        });

        // Publications complete asynchronously; failures are logged above.
        return CompletableFuture.completedFuture(null);
    }
In the example above, an update is processed using the supplied context to create a map of
Diffusion topic paths and corresponding updates. Each topic is then published with its update.
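The getFinalUpdates helper is not shown in the example above. As a minimal sketch of what such a helper might do, the code below assumes a hypothetical update format of comma-separated name=value fields and derives one topic path per field under a base topic. The UpdateSplitter class, the update format and the baseTopic parameter (standing in for whatever the update context supplies) are all assumptions for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class UpdateSplitter {
    private UpdateSplitter() {
    }

    /**
     * Splits an update such as "bid=1.1,ask=1.2" into one entry per field,
     * keyed by the topic path "<baseTopic>/<field name>".
     */
    static Map<String, Object> getFinalUpdates(String update, String baseTopic) {
        // LinkedHashMap keeps the fields in the order they appear in the update.
        final Map<String, Object> topicToUpdates = new LinkedHashMap<>();
        for (String field : update.split(",")) {
            final int separator = field.indexOf('=');
            if (separator <= 0) {
                continue; // skip fragments that are not of the form name=value
            }
            final String name = field.substring(0, separator).trim();
            final String value = field.substring(separator + 1).trim();
            if (!name.isEmpty()) {
                topicToUpdates.put(baseTopic + "/" + name, value);
            }
        }
        return topicToUpdates;
    }
}
```

With this sketch, an update of "bid=1.1,ask=1.2" and a base topic of fx/GBPUSD yields two entries, fx/GBPUSD/bid and fx/GBPUSD/ask, each of which would then be published separately.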
The context passed to this process method is supplied by the application and is
update-specific. If such context is required, consult the user guide of the
application that supports the custom publisher to identify the contexts that are
supplied with each update.
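The example process implementation above returns an already-completed future, so a caller cannot tell when the individual publications have finished. If that matters, one option is to collect the per-topic futures and combine them with CompletableFuture.allOf. The sketch below is an illustration only: the PublishAll class is hypothetical, and a BiFunction stands in for the publisher's publish method, which means the checked PayloadConversionException of the real method is not modelled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

final class PublishAll {
    private PublishAll() {
    }

    /**
     * Publishes every entry and returns a future that completes once all
     * publications have completed, or completes exceptionally if any of them fails.
     */
    static CompletableFuture<Void> publishAll(
        Map<String, Object> topicToUpdates,
        BiFunction<String, Object, CompletableFuture<?>> publish) {

        final List<CompletableFuture<?>> pending = new ArrayList<>();
        topicToUpdates.forEach(
            (topic, value) -> pending.add(publish.apply(topic, value)));

        // allOf waits for every future; an empty array yields a completed future.
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]));
    }
}
```

Returning the combined future lets the application decide whether to wait, chain further work, or log a single failure for the whole update.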
Any context that the custom publisher itself needs at construction time can be
declared as part of the publisher configuration that a user specifies in the service
configuration. See the CustomPublisher configuration section for details on how a
CustomPublisher can be configured for a service.
In addition, a custom publisher can use a PayloadConverter to perform any required
update conversion. For details on constructing the subclass, with and without payload
converters, and on providing further context for it, see the Javadoc of the
CustomPublisher class.
Below is an example of a custom publisher implementation that requires context and payload converters for processing updates.
    public final class ApplicationCustomPublisher
        extends CustomPublisher<String, ApplicationUpdateContext> {

        private final Publisher publisher;
        private final PayloadConverter<Object, JSON> objectToJsonConverter;
        private final PayloadConverter<Object, String> objectToStringConverter;
        private final String someContext;

        /**
         * Constructor.
         */
        @SuppressWarnings("unchecked")
        public ApplicationCustomPublisher(
            Publisher publisher,
            Map<String, Object> configContext,
            Map<String, ? extends PayloadConverter<?, ?>> payloadConverterMap) {

            super(publisher);
            this.publisher = publisher;

            // Context supplied through the 'parameters' of the publisher configuration.
            this.someContext = configContext.get("someContext").toString();

            // Fetch and validate the payload converters this publisher depends on.
            this.objectToJsonConverter =
                (PayloadConverter<Object, JSON>) payloadConverterMap.get("$Object_to_JSON");
            if (objectToJsonConverter == null) {
                throw new IllegalArgumentException(
                    "No object to JSON converter found");
            }

            this.objectToStringConverter =
                (PayloadConverter<Object, String>) payloadConverterMap.get("$Object_to_String");
            if (objectToStringConverter == null) {
                throw new IllegalArgumentException(
                    "No object to String converter found");
            }
        }

        @Override
        public CompletableFuture<?> process(
            String update,
            ApplicationUpdateContext updateContext) throws PayloadConversionException {

            String someUpdateContext = updateContext.getUpdateContext();
            TopicProperties topicProperties = updateContext.getTopicProperties();
            String diffusionTopic = updateContext.getDiffusionTopic();

            // TODO process the update as required, convert with the payload
            // converters and publish using the 'publisher'
            return CompletableFuture.completedFuture(null);
        }
    }
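The lookup-and-validate step in the constructor above is repeated for each converter, so it could be factored into a small helper. The following is a minimal sketch under stated assumptions: the Converters class and its require method are hypothetical, and the PayloadConverter interface declared here is only a stand-in with an assumed single convert method, so that the sketch compiles without the framework on the classpath.

```java
import java.util.Map;

/** Stand-in for the framework's PayloadConverter type (assumption, so the sketch compiles on its own). */
interface PayloadConverter<I, O> {
    O convert(I input);
}

final class Converters {
    private Converters() {
    }

    /**
     * Returns the converter registered under the given name, failing fast
     * with a descriptive message when it has not been configured.
     */
    @SuppressWarnings("unchecked")
    static <I, O> PayloadConverter<I, O> require(
        Map<String, ? extends PayloadConverter<?, ?>> converters,
        String name) {

        final PayloadConverter<?, ?> converter = converters.get(name);
        if (converter == null) {
            throw new IllegalArgumentException(
                "Required payload converter '" + name + "' is not configured");
        }
        // Unchecked: the map cannot express each converter's input and output types.
        return (PayloadConverter<I, O>) converter;
    }
}
```

Including the converter name in the exception message tells an adapter user exactly which entry is missing from the payloadConverters configuration.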
The custom publisher is responsible for converting the update to the appropriate Diffusion topic type as required.
The framework will not use any payload converter internally when a custom publisher is configured for a service. Therefore, if an update needs to be converted into a specific type, the payload converters can be fetched in the constructor and validated, as shown in the example above. The required payload converters should be documented for the custom publisher so that an adapter user can configure the publisher as required for a service. For the publisher mentioned above, the required configuration will be as follows:
    {
      "config": {
        "framework": {
          "customPublisher": {
            "className": "com.diffusiondata.gateway.adapter.ApplicationCustomPublisher",
            "parameters": {
              "someContext": "example/"
            },
            "payloadConverters": [
              {
                "name": "$Object_to_JSON"
              },
              {
                "name": "$Object_to_String"
              }
            ]
          }
        }
      }
    }
See below for more details on configuring the custom publisher for a service.
Using CustomPublisher in an application
The implementation of the CustomPublisher is provided as a standalone JAR that needs
to be plugged into the application during its startup, i.e., the JAR should be added
to the classpath when starting the application. To do this, place the JAR file in a
directory and set the path of that directory as the value of the gateway.ext.dir
system property. The framework will load this directory into the classpath during
application startup.
A service in the application can then be configured to use the custom publisher.
However, this will only work if the application and the CustomPublisher are
compatible with each other. This means the application must support using the
CustomPublisher, and the CustomPublisher must be able to handle updates supplied by
the application.
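Before starting the application, it can help to confirm that the directory named by gateway.ext.dir actually contains the custom publisher JAR. The sketch below is a hypothetical pre-flight check, not part of the framework: the ExtensionDirCheck class is an assumption, and the code makes no claim about how the framework itself loads the directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ExtensionDirCheck {
    private ExtensionDirCheck() {
    }

    /**
     * Lists the JAR files directly inside the given directory, or returns an
     * empty list when the value is unset or does not name a directory.
     */
    static List<Path> listJars(String extDir) throws IOException {
        if (extDir == null || extDir.isEmpty()) {
            return Collections.emptyList();
        }
        final Path dir = Paths.get(extDir);
        if (!Files.isDirectory(dir)) {
            return Collections.emptyList();
        }
        // Files.list holds a directory handle, so close the stream when done.
        try (Stream<Path> entries = Files.list(dir)) {
            return entries
                .filter(path -> path.getFileName().toString().endsWith(".jar"))
                .sorted()
                .collect(Collectors.toList());
        }
    }
}
```

Calling ExtensionDirCheck.listJars(System.getProperty("gateway.ext.dir")) in a JVM started with -Dgateway.ext.dir set to the directory path returns the JARs found there; an empty result suggests the property or the directory contents should be checked.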
CustomPublisher configuration
A source or hybrid service in an application can be configured as follows to use a custom publisher:
    {
      "serviceName": "enhancedPoller",
      "serviceType": "POLLING_SOURCE",
      "config": {
        "framework": {
          "pollIntervalMs": 500,
          "customPublisher": {
            "className": "com.diffusiondata.gateway.publisher.MultiTopicPublisher",
            "parameters": {
              "margin": "0.1"
            }
          }
        }
      }
    }
As shown above, to specify the use of a custom publisher for a source or hybrid
service, the customPublisher parameter should be specified in the framework
configuration. The parameters for customPublisher are defined below:
Key | Description | Mandatory |
---|---|---|
className | The full class name of the custom publisher. | Yes |
parameters | The parameters required for instantiation of the custom publisher. These should be documented in the user guide of the custom publisher. | No |
payloadConverters | The array of payload converter details required by the custom publisher. These should be documented in the user guide of the custom publisher, so that users can specify them in the configuration. See below for detailed configuration parameters. | No |
payloadConverters
payloadConverters contains an array of payload converter details. One or more converters can be specified as required.
Parameter Name | Type | Description | Mandatory |
---|---|---|---|
name | String | Name of the payload converter. This information would be documented in the user guide of the custom publisher. | Yes |
parameters | Map of String to primitive Object | Parameters required by the payload converter. These are specific to the payload converter and will be documented in the application user guide for the respective converter. See here for configuration details of framework-issued converters. | No |
Thus, with the above service configuration, if the CustomPublisher implementation,
i.e., com.diffusiondata.gateway.publisher.MultiTopicPublisher, is available on the
classpath, the framework will supply an instance of it to the add method for the
POLLING_SOURCE service type. The service will then use this publisher when processing
updates from the source.