Topic Poller Service
The TOPIC_POLLER is a service type in Polling Source mode. Services of this type can be used to poll updates from an AMQP topic at regular intervals in batches and publish them to a Diffusion topic.
Since this service handles messages from a topic, a single consumer is created for the service. The maxBatchSize configuration parameter allows you to specify the maximum number of messages to retrieve during each poll.
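As a rough illustration of how the batch size and the polling interval combine, the excerpt below assumes that pollIntervalMs in the framework section (as it appears in the sample configuration in the Configuration section) sets the time between polls. It is a sketch rather than a complete service definition, and the values are arbitrary.

```json
{
  "config": {
    "framework": {
      "pollIntervalMs": 10000
    },
    "application": {
      "sourceName": "batchTopic",
      "maxBatchSize": 50
    }
  }
}
```

Under that assumption, the service would retrieve at most 50 messages from batchTopic on each poll, roughly every 10 seconds.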
It is also possible to create a durable subscription on a topic. When a subscription is durable, the AMQP broker retains the subscription even if the consumer disconnects. Messages published to the topic while the consumer is disconnected are stored by the broker. When the consumer reconnects, any missed messages are delivered to it.
If a topic poller service is configured as durable, the client unsubscribes from the subscription only when the adapter or the service is explicitly shut down, or when the service is removed. However, if the service is paused, for example because of a disconnection from the Diffusion server or the AMQP broker, the subscription remains active. This ensures that any messages published to the topic during the downtime are delivered to the consumer when the service resumes.
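A minimal sketch of an application section that requests a durable subscription is shown below. It uses only parameter names from the configuration reference for this service; the topic name batchTopic is illustrative, and the connection settings are omitted on the assumption that they are supplied elsewhere, for example through a shared configuration.

```json
{
  "application": {
    "sourceName": "batchTopic",
    "durable": true,
    "acknowledgeMode": "AUTO_ACKNOWLEDGE",
    "maxBatchSize": 100
  }
}
```

With durable set to true, messages published to batchTopic while the service is paused should be retained by the broker and delivered once the service resumes, as described above.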
There are other configuration parameters that define the behavior of the message consumer, how the message is transformed, and how it is published to a Diffusion topic. These are defined in the following section.
Configuration
The complete configuration for this service consists of the framework-required configuration for a polling source service, defined under the framework section, and the service type-specific configuration, defined under the application section.
{
  "serviceName": "simpleTopicPoller",
  "serviceType": "TOPIC_POLLER",
  "description": "Polls from a topic",
  "config": {
    "framework": {
      "publicationRetries": 5,
      "retryIntervalMs": 5000,
      "pollIntervalMs": 30000,
      "pollTimeoutMs": 300000,
      "topicProperties": {
        "persistencePolicy": "SESSION",
        "timeSeries": false,
        "timeSeriesRetainedRange": "",
        "timeSeriesSubscriptionRange": "",
        "publishValuesOnly": false,
        "dontRetainValue": false
      },
      "payloadConverters": [
        {
          "name": "$TextMessageToStringConverter"
        }
      ]
    },
    "application": {
      "amqpServerConnection": {
        "connectionUrl": "amqp://localhost:5672",
        "username": "admin",
        "password": "admin",
        "maximumNumberOfRetries": 10,
        "initialRetryIntervalMs": 5,
        "maxRetryIntervalMs": 5000
      },
      "sourceName": "batchTopic",
      "diffusionTopicTemplate": "target/batchTopic",
      "messageSelector": "",
      "durable": false,
      "noLocal": false,
      "acknowledgeMode": "AUTO_ACKNOWLEDGE",
      "maxBatchSize": 100
    }
  }
}
Polling service framework configuration
Details about supported configuration parameters for framework configuration for the Polling service can be found here.
Application configuration
The supported application configuration parameters for this service are defined in the table below:
Name | Type | Description | Mandatory | Default value |
---|---|---|---|---|
amqpServerConnection | object | The configuration for the AMQP server connection. | No | This can be defined as a shared configuration. |
sourceName | string | The name of the AMQP topic from which to consume messages. | Yes | n/a |
diffusionTopicTemplate | string | The Diffusion topic template to use for creating Diffusion topics. The template can include any message headers, such as '${JMSDestination}', or user-specified property keys, such as '${aCustomPropertyKey}', as placeholders. The placeholder for a message header will be replaced with the corresponding header value retrieved from the AMQP message, and the placeholder for a property will be replaced with the value of the specified property key. For example, if this configuration’s value is 'queue/${JMSDestination}/${location}', the 'JMSDestination' value is 'country', and the property key 'location' has the value 'london' in a message, the resulting Diffusion topic will be 'queue/country/london'. If such placeholders are included in the template, a new Diffusion topic will be created for each message, which may introduce overhead during message processing. If this parameter is not specified, the value of 'sourceName' will be used instead. If the value of a specified property key is not found or is empty, the service will fail to process the message and will be paused. | No | The value specified in the 'sourceName' |
messageSelector | string | An optional SQL-like expression used to filter messages based on their properties. The selector determines which messages are delivered to the consumer. Only messages that match the specified condition will be received. E.g., 'priority > 5 AND color = 'red''. | No | "" |
noLocal | boolean | A boolean value to specify whether to inhibit the delivery of messages published through its own connection. | No | false |
acknowledgeMode | string | The acknowledgement mode used to confirm the processing of messages. Supported values are | No | AUTO_ACKNOWLEDGE |
durable | boolean | A boolean value to specify whether to create a durable subscription on the configured AMQP topic. | No | false |
maxBatchSize | integer | The maximum number of messages the consumer retrieves from the AMQP topic during each poll. | No | 100 |
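To illustrate the diffusionTopicTemplate and messageSelector parameters described in the table above, the following excerpt sketches an application section that combines them. The property keys location, priority and color are hypothetical message properties assumed to be set by the publisher, and the topic name batchTopic is illustrative; connection settings are omitted.

```json
{
  "application": {
    "sourceName": "batchTopic",
    "diffusionTopicTemplate": "target/batchTopic/${location}",
    "messageSelector": "priority > 5 AND color = 'red'"
  }
}
```

Following the substitution rule described for diffusionTopicTemplate, a matching message whose location property is 'london' would be published to the Diffusion topic 'target/batchTopic/london'. A message without a location property would cause the service to be paused, so a template like this is only suitable when every selected message is expected to carry that property.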
The amqpServerConnection object supports the following parameters:

Name | Type | Description | Mandatory | Default value |
---|---|---|---|---|
connectionUrl | string | The AMQP server connection URL, containing the full connection URL with port and/or other required details. | Yes | n/a |
username | string | The username for the AMQP server connection. | No | n/a |
password | string | The password for the AMQP server connection. | No | n/a |
maximumNumberOfRetries | integer | The maximum number of retries for establishing a connection to the AMQP server. If this parameter is not set, the attempts will be infinite. | No | n/a |
initialRetryIntervalMs | integer | Initial retry interval in milliseconds. It will increase exponentially with each retry until | No | 5 |
maxRetryIntervalMs | integer | Maximum retry interval in milliseconds. | No | 5000 |
The adapter allows you to specify the amqpServerConnection through a shared configuration. If the amqpServerConnection is explicitly defined in the service configuration, a unique AMQP connection instance will be allocated to the service. However, if the sharedConfigName parameter is used to reference a shared configuration, the connection instance will be shared with any other services that reference the same shared configuration.
See the Message Transformation page for details on how AMQP messages are transformed into Diffusion topic types.
See the AMQP connection management page for details on how an AMQP server connection is created and managed.
Multiple instances of this service type can be added to the adapter to consume messages from different topics and publish them to different Diffusion topics, each with its own configuration.