Queue Poller Service
The QUEUE_POLLER is a service type in Polling Source mode. Services of this type can be used to poll updates from an AMQP queue at regular intervals in batches and publish them to a Diffusion topic.
Since this service handles messages from a queue, the consumerCount
configuration
parameter allows you to specify the number of consumers to be created for the service.
Multiple consumers can be created for load-balancing purposes to achieve higher throughput.
However, since the messages will be retrieved by multiple consumers from different threads,
the order of messages published to a Diffusion topic cannot be guaranteed with multiple
consumers. The maxBatchSize
configuration parameter can be used to specify the
maximum number of messages to retrieve during each poll.
There are other configuration parameters that define the behavior of the message consumer, how the message is transformed, and how it is published to a Diffusion topic. These are defined in the following section.
Configuration
The complete configuration for this service consists of the framework-required configuration for a polling source service defined under the framework section, and service-type specific configuration defined under the application section.
{
"serviceName": "simpleQueuePoller",
"serviceType": "QUEUE_POLLER",
"description": "Polls from a queue",
"config": {
"framework": {
"publicationRetries": 5,
"retryIntervalMs": 5000,
"pollIntervalMs": 30000,
"pollTimeoutMs": 300000,
"topicProperties": {
"persistencePolicy": "SESSION",
"timeSeries": false,
"timeSeriesRetainedRange": "",
"timeSeriesSubscriptionRange": "",
"publishValuesOnly": false,
"dontRetainValue": false
},
"payloadConverters" : [
{
"name": "$TextMessageToStringConverter"
}
]
},
"application": {
"amqpServerConnection": {
"connectionUrl": "amqp://localhost:5672",
"username": "admin",
"password": "admin",
"maximumNumberOfRetries": 10,
"initialRetryIntervalMs": 5,
"maxRetryIntervalMs": 5000
},
"sourceName": "batchQueue",
"diffusionTopicTemplate": "target/batchQueue",
"messageSelector": "",
"durable": false,
"noLocal": false,
"acknowledgeMode": "AUTO_ACKNOWLEDGE",
"consumerCount": 1,
"maxBatchSize": 100
}
}
}
Polling Service Framework Configuration
Details about supported configuration parameters for the framework configuration for the Polling service can be found here.
Application Configuration
The supported application configuration parameters for this service are defined in the table below:
Name | Type | Description | Mandatory | Default value |
---|---|---|---|---|
amqpServerConnection |
object |
The configuration for the AMQP server connection. |
No |
This can be defined as a shared configuration. |
sourceName |
string |
The name of the AMQP queue from which to consume messages. |
Yes |
n/a |
diffusionTopicTemplate |
string |
The Diffusion topic template to use for creating Diffusion topics. The template can include any message headers, such as '${JMSDestination}', or user-specified property keys, such as '${aCustomPropertyKey}', as placeholders. The placeholder for a message header will be replaced with the corresponding header value retrieved from the AMQP message, and the placeholder for a property will be replaced with the value of the specified property key. For example, if this configuration’s value is 'queue/${JMSDestination}/${location}', the 'JMSDestination' value is 'country', and the property key 'location' has the value 'london' in a message, the resulting Diffusion topic will be 'queue/country/london'. If such placeholders are included in the template, a new Diffusion topic will be created for each message, which may introduce overhead during message processing. If this parameter is not specified, the value of 'sourceName' will be used instead. If the value of a specified property key is not found or is empty, the service will fail to process the message and will be paused. |
No |
The value specified in the 'sourceName' |
messageSelector |
string |
An optional SQL-like expression used to filter messages based on their properties. The selector determines which messages are delivered to the consumer. Only messages that match the specified condition will be received. E.g., 'priority > 5 AND color = 'red''. |
No |
"" |
noLocal |
boolean |
A boolean value to specify whether to inhibit the delivery of messages published through its own connection. |
No |
false |
acknowledgeMode |
string |
The acknowledgement mode used to confirm the processing of messages.
Supported values are |
No |
AUTO_ACKNOWLEDGE |
consumerCount |
integer |
The number of consumers to create for consuming messages from a queue. Messages will be distributed among them in a round-robin manner if more than one consumer is created, which may result in out-of-order message processing. |
No |
1 |
maxBatchSize |
integer |
The maximum number of messages a consumer should process from an AMQP queue in the specified interval. |
No |
100 |
Name | Type | Description | Mandatory | Default value |
---|---|---|---|---|
connectionUrl |
string |
The AMQP server connection URL, containing the full connection URL with port and/or other required details. |
Yes |
n/a |
username |
string |
The username for the AMQP server connection. |
No |
n/a |
password |
string |
The password for the AMQP server connection. |
No |
n/a |
maximumNumberOfRetries |
integer |
The maximum number of retries for establishing a connection to the AMQP server. If this parameter is not set, the attempts will be infinite. |
No |
n/a |
initialRetryIntervalMs |
integer |
Initial retry interval in milliseconds. It
will increase exponentially with each retry until |
No |
5 |
maxRetryIntervalMs |
integer |
Maximum retry interval in milliseconds. |
No |
5000 |
The adapter allows you to specify the amqpServerConnection through a shared configuration. If the amqpServerConnection is explicitly defined in the service configuration, a unique AMQP connection instance will be allocated to the service. However, if the sharedConfigName parameter is used to reference a shared configuration, the connection instance will be shared with any other services that reference the same shared configuration. |
See the Message Transformation page for details on how AMQP messages are transformed into Diffusion topic types.
See the AMQP connection management page for details on how an AMQP server connection is created and managed.
There can be multiple instances of this service type added to the adapter to consume messages from different queues and publish them to different Diffusion topics using different configurations.