Integration with custom publisher
The AMQP adapter supports using custom publishers with its source services.
A custom publisher for the AMQP adapter must process updates of type jakarta.jms.Message
with an UpdateContext of type AmqpUpdateContext. In addition to the default Diffusion
topic path and default topic properties, the AmqpUpdateContext contains the AMQP source
name (the queue or topic name). The update passed to the process method of the custom
publisher is the message received from the AMQP source, as a jakarta.jms.Message.
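To make the role of the update context more concrete, the following is a minimal, self-contained sketch of how a custom publisher might combine the default Diffusion topic path with the AMQP source name. SketchUpdateContext is a hypothetical stand-in written for this illustration, not the adapter's AmqpUpdateContext, and its field names (in particular sourceName) are assumptions; consult the adapter's API documentation for the real accessors.

```java
import java.util.Map;

public final class UpdateContextSketch {

    /** Hypothetical stand-in for AmqpUpdateContext; not the adapter's real class. */
    static final class SketchUpdateContext {
        final String sourceName;                   // AMQP queue or topic name (assumed field name)
        final String diffusionTopic;               // default Diffusion topic path
        final Map<String, String> topicProperties; // default topic properties

        SketchUpdateContext(
            String sourceName,
            String diffusionTopic,
            Map<String, String> topicProperties) {
            this.sourceName = sourceName;
            this.diffusionTopic = diffusionTopic;
            this.topicProperties = topicProperties;
        }
    }

    /** Builds a topic path that nests the AMQP source name under the default path. */
    static String targetTopic(SketchUpdateContext context) {
        return context.diffusionTopic + "/" + context.sourceName;
    }

    public static void main(String[] args) {
        SketchUpdateContext context =
            new SketchUpdateContext("test_topic", "target/updates", Map.of());
        System.out.println(targetTopic(context)); // prints "target/updates/test_topic"
    }
}
```

Routing by source name is only one possible use; a publisher could equally ignore the source name and publish to the default topic path unchanged.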
Below is a sample implementation of the custom publisher for the AMQP adapter:
package com.diffusiondata.gateway.adapter.amqp;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

import jakarta.jms.Message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Also import the Gateway Framework types used below (CustomPublisher,
// Publisher, PayloadConverter, PayloadConversionException) and the adapter's
// TextMessageToStringConverter from their respective JARs.

public final class AmqpCustomPublisher
    extends CustomPublisher<Message, AmqpUpdateContext> {

    private static final Logger LOG =
        LoggerFactory.getLogger(AmqpCustomPublisher.class);

    public static final String CUSTOM_MESSAGE_SUFFIX =
        "-This is a custom message suffix.";

    private final Publisher publisher;
    private final PayloadConverter<Message, String> defaultConverter;

    /**
     * Constructor.
     */
    @SuppressWarnings("unchecked") // the converter map is not typed per entry
    public AmqpCustomPublisher(
        Publisher publisher,
        Map<String, Object> configContext,
        Map<String, ? extends PayloadConverter<?, ?>> payloadConverterMap) {

        super(publisher);
        this.publisher = publisher;

        // Look up the converter that the service configuration must supply.
        this.defaultConverter =
            (PayloadConverter<Message, String>) payloadConverterMap.get("$TextMessageToStringConverter");

        if (defaultConverter == null) {
            throw new IllegalArgumentException(
                "Required converter: '" + TextMessageToStringConverter.getName() +
                    "' not found");
        }
    }

    /**
     * NOTE - process the update as required, convert it with the payload
     * converter and publish it using the 'publisher'.
     */
    @Override
    public CompletableFuture<?> process(
        Message message,
        AmqpUpdateContext updateContext) throws PayloadConversionException {

        final String diffusionTopic = updateContext.getDiffusionTopic();

        LOG.info(
            "Publishing message {} to {}",
            message,
            diffusionTopic);

        // Convert the message to a String, append the suffix and publish.
        return publisher
            .publish(
                diffusionTopic,
                defaultConverter.convert(message) + CUSTOM_MESSAGE_SUFFIX)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    LOG.error("Failed to publish record to Diffusion " +
                        "topic", ex);
                }
            });
    }
}
In the example above, the custom publisher expects an instance of the
$TextMessageToStringConverter payload converter to convert the TextMessage to a String.
A custom publisher is not obliged to require a converter from the payload converter map
passed to its constructor, but one that does should document the requirement. An adapter
user should then configure the customPublisher for the service to include the required
payload converter, as in the following service configuration:
{
  "serviceName": "simpleTopicListener",
  "serviceType": "TOPIC_LISTENER",
  "description": "Listens to updates from a topic",
  "config": {
    "sharedConfigName": "rabbitMqBroker",
    "framework": {
      "customPublisher": {
        "className": "com.diffusiondata.gateway.adapter.amqp.AmqpCustomPublisher",
        "payloadConverters": [
          {
            "name": "$TextMessageToStringConverter"
          }
        ]
      }
    },
    "application": {
      "sourceName": "test_topic",
      "diffusionTopicTemplate": "target/test_topic"
    }
  }
}
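The constructor of the sample publisher fails fast when a required converter is missing from the payload converter map. That lookup pattern can be sketched in isolation as below. This is a minimal illustration under stated assumptions: the Converter interface is a hypothetical stand-in for the framework's PayloadConverter, and the converter name "$UpperCase" is invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

public final class ConverterLookupSketch {

    /** Hypothetical stand-in for the framework's PayloadConverter interface. */
    interface Converter<I, O> {
        O convert(I input);
    }

    /** Returns the named converter, or fails fast with a descriptive message. */
    @SuppressWarnings("unchecked") // the map does not carry per-entry type information
    static <I, O> Converter<I, O> requireConverter(
        Map<String, ? extends Converter<?, ?>> converters,
        String name) {

        Converter<?, ?> converter = converters.get(name);
        if (converter == null) {
            throw new IllegalArgumentException(
                "Required converter: '" + name + "' not found");
        }
        return (Converter<I, O>) converter;
    }

    public static void main(String[] args) {
        Map<String, Converter<?, ?>> converters = new HashMap<>();
        Converter<String, String> upper = String::toUpperCase;
        converters.put("$UpperCase", upper); // hypothetical converter name

        Converter<String, String> found = requireConverter(converters, "$UpperCase");
        System.out.println(found.convert("hello")); // prints "HELLO"
    }
}
```

Using the same name for the lookup and for the error message keeps the failure report consistent with what the service configuration is expected to contain.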
The AmqpUpdateContext class is available in the AMQP adapter JAR, which should be supplied
to the custom publisher as a dependency. For details on including the adapter JAR in
the custom publisher, refer to the user guide for Custom Publishers.