Integration with custom publisher

The AMQP adapter supports using Custom Publishers with the source services.

The custom publisher for the AMQP adapter should process updates of type jakarta.jms.Message with an UpdateContext of type AmqpUpdateContext. The AmqpUpdateContext contains the AMQP source name (queue/topic name), in addition to the default Diffusion topic path and default topic properties. The update supplied to the process method of the custom publisher will be the update received from the AMQP source and will be of jakarta.jms.Message type.

Below is a sample implementation of the custom publisher for the AMQP adapter:

package com.diffusiondata.gateway.adapter.amqp;

public final class AmqpCustomPublisher
    extends CustomPublisher<Message, AmqpUpdateContext> {

    private static final Logger LOG =
        LoggerFactory.getLogger(AmqpCustomPublisher.class);

    public static final String CUSTOM_MESSAGE_SUFFIX =
        "-This is a custom message suffix.";

    private final Publisher publisher;
    private final PayloadConverter<Message, String> defaultConverter;

    /**
     * Constructor.
     */
    public AmqpCustomPublisher(
        Publisher publisher,
        Map<String, Object> configContext,
        Map<String, ? extends PayloadConverter<?, ?>> payloadConverterMap) {

        super(publisher);
        this.publisher = publisher;
        this.defaultConverter =
            (PayloadConverter<Message, String>) payloadConverterMap.get("$TextMessageToStringConverter");

        if (defaultConverter == null) {
            throw new IllegalArgumentException(
                "Required converter: '" + TextMessageToStringConverter.getName() +
                    "' not found");
        }
    }

    /**
     * NOTE - process the update as required, convert with the payload
     * converter and publish using the 'publisher'
     */
    @Override
    public CompletableFuture<?> process(
        Message message,
        AmqpUpdateContext updateContext) throws PayloadConversionException {

        final String diffusionTopic = updateContext.getDiffusionTopic();
        LOG.info(
            "Publishing message {} to {}",
            message,
            diffusionTopic);

        return publisher
            .publish(
                diffusionTopic,
                defaultConverter.convert(message) + CUSTOM_MESSAGE_SUFFIX)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    LOG.error("Failed to publish record to diffusion " +
                        "topic", ex);
                }
            });

    }
}

In the example above, the custom publisher expects an instance of $TextMessageToStringConverter payload converter to convert the TextMessage to a String. An adapter user should configure the customPublisher for the service to include the required payload converter configuration. Expecting a payload converter in the payload converter map argument within the constructor is optional but can be expected and documented as required.

Example 1. Example of a service configuration that specifies using the CustomPublisher defined above:
    {
      "serviceName": "simpleTopicListener",
      "serviceType": "TOPIC_LISTENER",
      "description": "Listens to updates from a topic",
      "config": {
        "sharedConfigName": "rabbitMqBroker",
        "framework": {
          "customPublisher": {
            "className": "com.diffusiondata.gateway.adapter.amqp.AmqpCustomPublisher",
            "payloadConverters": [
              {
                "name": "$TextMessageToStringConverter"
              }
            ]
          }
        },
        "application": {
          "sourceName": "test_topic",
          "diffusionTopicTemplate": "target/test_topic"
        }
      }
    }

The AmqpUpdateContext is available in the AMQP adapter JAR, which should be supplied to the custom publisher as a dependency. For details on including the adapter JAR in the custom publisher, refer to the user guide for Custom Publishers.