Custom Publisher

By default, for source and hybrid services, a Publisher instance is supplied by the framework which can be used to perform any operation the publisher allows. If the requirement is to process an update before calling the publish method, a developer of an application can do so in the service handler. The update could be divided into multiple values and be published to multiple Diffusion topics or, based on the update, a Diffusion topic could be removed. However, if an application is already implemented, and the default publication of an update needs to be overridden, a CustomPublisher can be implemented and used with the application. Using a CustomPublisher is an advanced option and is sensible only to override the default publication mechanism in an application. The custom publisher should be application-specific, as it should be built to process application-specific updates with application specific UpdateContext. To use a CustomPublisher a Gateway application has to support its usage. All the DiffusionData provided Gateway adapters supports CustomPublisher.

Supporting CustomPublisher in an application

The first step in supporting a CustomPublisher in an application is to identify any update-specific context that can be supplied with the update into the process method of the CustomPublisher. An implementation of UpdateContext should then be added containing such contexts. For example., for Kafka adapter, the update type can be ConsumerRecord, and its update context could contain a key type and a value type.

For the source or hybrid services in an application, the application must check if the publisher supplied to the add method of such service types is an instance of CustomPublisher. In such cases, the publisher should be cast to CustomPublisher to use the process method in the service handler.

    private static CompletableFuture<?> publish(
        Publisher publisher,
        String defaultTopic,
        Object update) {

        try {
            if (publisher instanceof CustomPublisher) {

                return ((CustomPublisher) publisher).process(
                    update,
                    new UpdateContextImpl(
                        diffusionTopic,
                        customPublisher.getConfiguredTopicProperties(),
                        someUpdateContext));
            }
            else {
                return publisher.publish(defaultTopic, update);
            }
        }
        catch (PayloadConversionException ex) {
            LOG.error("Failed to process update", ex);
        }
        return CompletableFuture.completedFuture(null);
    }

As presented above, the application should support publication for services that are configured with or without a custom publisher, as the use of a custom publisher for a service is dependent on the user’s requirement. If an application supports using a custom publisher for source or hybrid services, users can specify the use of a custom publisher for a specific service by specifying it in the configuration

Implementing CustomPublisher

CustomPublisher is an abstract class and provides an abstract method process that needs to be implemented by a subclass. The subclass should be made available as a standalone JAR that can then be used with a Gateway application. Within the abstract method, logic to process an update for the service needs to be implemented. This method accepts an update to process and an UpdateContext.

The update context is application-specific and is implemented within the Gateway application that supports custom publisher. Hence, the CustomPublisher should include the Gateway application JAR as its dependency so that, the application-specific update context is accessible to the CustomPublisher. With maven, this can be achieved by downloading the application JAR to disk and including it in the POM file with the system scope as follows:

<dependency>
    <groupId>com.diffusiondata.gateway.adapter.kafka</groupId>
    <artifactId>kafka-adapter</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>/builds/engineering/gateway/gateway-framework/user-guide/src/main/resources/kafka-adapter-1.0.0.jar</systemPath>
</dependency>

An example implementation of the abstract method is provided below:

    @Override
    public CompletableFuture<?> process(
        String update,
        ApplicationUpdateContext updateContext) throws PayloadConversionException {

    final Map<String, Object> topicToUpdatesMap = getFinalUpdates(update, context);

        topicToUpdatesMap.forEach((topic, value) -> {
            try {
                publisher
                    .publish(topic, value)
                    .whenComplete((result, throwable) -> {
                        if (throwable != null) {
                            LOG.error("Failed to publish update to {} topic", topic, throwable);
                        }
                    });
            }
            catch (PayloadConversionException ex) {
                LOG.error("Failed to convert update for {} topic", topic, ex);
            }
        });

        return CompletableFuture.completedFuture(null);
    }

In the example above, an update is processed using the supplied context to create a map of Diffusion topic paths and corresponding updates. Each topic is then published with its update. The context in this process method is supplied by the application and is update-specific. If such contexts are required, the user guide of application that supports the custom publisher should be consulted to identify different contexts that are supplied with each update.

The custom publisher specific context required during the construction of the publisher can be specified as a requirement of the publisher configuration that a user needs to specify in the service configuration. See here for details on how a CustomPublisher can be configured to be used for a service. In addition, a custom publisher can also use PayloadConverter to perform any required update conversion. For details on the construction of the subclass, with and without using payload converters and on providing further context for it, see the Javadoc of the CustomPublisher class.

Below is an example of a custom publisher implementation that requires context and payload converters for processing updates.

public final class ApplicationCustomPublisher
    extends CustomPublisher<String, ApplicationUpdateContext> {

    private final Publisher publisher;
    private final PayloadConverter<Object, JSON> objectToJsonConverter;
    private final PayloadConverter<Object, String> objectToStringConverter;
    private final String someContext;

    /**
     * Constructor.
     */
    public ApplicationCustomPublisher(
        Publisher publisher,
        Map<String, Object> configContext,
        Map<String, ? extends PayloadConverter<?, ?>> payloadConverterMap) {

        super(publisher);
        this.publisher = publisher;
        this.someContext = configContext.get("someContext").toString();

        this.objectToJsonConverter =
            (PayloadConverter<Object, JSON>) payloadConverterMap.get("$Object_to_JSON");

        if (objectToJsonConverter == null) {
            throw new IllegalArgumentException(
                "No object to JSON converter found");
        }

        this.objectToStringConverter =
            (PayloadConverter<Object, String>) payloadConverterMap.get("$Object_to_String");

        if (objectToStringConverter == null) {
            throw new IllegalArgumentException(
                "No object to String converter found");
        }
    }

    @Override
    public CompletableFuture<?> process(
        String update,
        ApplicationUpdateContext updateContext) throws PayloadConversionException {

        String someUpdateContext = updateContext.getUpdateContext();
        TopicProperties topicProperties = updateContext.getTopicProperties();
        String diffusionTopic = updateContext.getDiffusionTopic();

        // TODO process the update as required, convert with the payload
        //  converters and publish using the 'publisher'

        return CompletableFuture.completedFuture(null);
    }
}
The custom publisher is responsible for converting the update to the appropriate Diffusion topic type as required.

The framework will not use any payload converter internally when a custom publisher is configured for a service. Therefore, if an update needs to be converted into a specific type, the payload converters can be fetched in the constructor and validated, as shown in the example above. The required payload converters should be documented for the custom publisher so that an adapter user can configure the publisher as required for a service. For the publisher mentioned above, the required configuration will be as follows:

{
      "config": {
        "framework": {
          "customPublisher": {
            "className": "com.diffusiondata.gateway.adapter.ApplicationCustomPublisher",
            "parameters": {
              "someContext": "example/"
            },
            "payloadConverters": [
              {
                "name": "$Object_to_JSON"
              },
              {
                "name": "$Object_to_String"
              }
            ]
          }
        }
      }
}

See below for more details on configuring the custom publisher for a service.

Using CustomPublisher in an application

The implementation of the CustomPublisher is provided as a standalone JAR that needs to be plugged in with the application during its startup, i.e., the JAR should be added to the classpath when starting the application. For this, the JAR file can be placed in a directory, and the path of the directory can be set as the value for the gateway.ext.dir system property. The framework will load this directory into the classpath during application startup. A service in the application can then be configured to use the custom publisher. However, this will only work if both the application and the CustomPublisher are compatible with each other. This means the application must support using the CustomPublisher, and the CustomPublisher must be able to handle updates supplied by the application.

CustomPublisher configuration

The configuration for source or hybrid services in an application can be configured in the following way to use the custom publisher.

    {
      "serviceName": "enhancedPoller",
      "serviceType": "POLLING_SOURCE",
      "config": {
        "framework": {
          "pollIntervalMs": 500,
          "customPublisher": {
            "className": "com.diffusiondata.gateway.publisher.MultiTopicPublisher",
            "parameters": {
              "margin": "0.1"
            }
          }
        }
      }
    },

As presented above, to specify the use of a custom publisher for a source or hybrid service, the customPublisher parameter should be specified in the framework configuration. The parameters for the customPublisher are defined below:

Key Description Mandatory

className

The full class name of the custom publisher.

Yes

parameters

The parameters required for instantiation of the custom publisher. These should be documented in the user guide of the custom publisher.

No

payloadConverters

The array of payloadConverter details required by the Custom publisher. These should be documented in the user guide of the custom publisher, so that users can specify them in the configuration. See below for detailed configuration parameters.

No

payloadConverters

payloadConverters contains an array of payload converter details. One or more converters can be specified as required.

Parameters for each payload converter detail
Parameter Name Type Description Mandatory

name

String

Name of the payload converter. This information would be documented in the user guide of the CustomPublisher.

yes

parameters

Map of String to primitive Object

Parameters required by the payload converter. These are specific to the payload converter and will be documented in the application user guide for the respective converter. See here for configuration details of framework-issued converters.

no

Thus, with above service configuration, if the implementation of the CustomPublisher, i.e., com.diffusiondata.gateway.publisher.MultiTopicPublisher is available in the classpath, the framework will supply its instance to the add method for the POLLING_SOURCE serviceType. The service will then use this publisher when processing the updates from the source.