Queue Poller Service

The QUEUE_POLLER is a service type in Polling Source mode. Services of this type can be used to poll updates from an AMQP queue at regular intervals in batches and publish them to a Diffusion topic.

Since this service handles messages from a queue, the consumerCount configuration parameter allows you to specify the number of consumers to be created for the service. Multiple consumers can be created for load-balancing purposes to achieve higher throughput. However, since the messages will be retrieved by multiple consumers from different threads, the order of messages published to a Diffusion topic cannot be guaranteed with multiple consumers. The maxBatchSize configuration parameter can be used to specify the maximum number of messages to retrieve during each poll.

There are other configuration parameters that define the behavior of the message consumer, how the message is transformed, and how it is published to a Diffusion topic. These are defined in the following section.

Configuration

The complete configuration for this service consists of the framework-required configuration for a polling source service defined under the framework section, and service-type specific configuration defined under the application section.

Example 1. Sample configuration for a service of type QUEUE_POLLER
    {
      "serviceName": "simpleQueuePoller",
      "serviceType": "QUEUE_POLLER",
      "description": "Polls from a queue",
      "config": {
          "framework": {
            "publicationRetries": 5,
            "retryIntervalMs": 5000,
            "pollIntervalMs": 30000,
            "pollTimeoutMs": 300000,
            "topicProperties": {
              "persistencePolicy": "SESSION",
              "timeSeries": false,
              "timeSeriesRetainedRange": "",
              "timeSeriesSubscriptionRange": "",
              "publishValuesOnly": false,
              "dontRetainValue": false
            },
            "payloadConverters" : [
              {
                "name": "$TextMessageToStringConverter"
              }
            ]
          },
          "application": {
            "amqpServerConnection": {
              "connectionUrl": "amqp://localhost:5672",
              "username": "admin",
              "password": "admin",
              "maximumNumberOfRetries": 10,
              "initialRetryIntervalMs": 5,
              "maxRetryIntervalMs": 5000
            },
            "sourceName": "batchQueue",
            "diffusionTopicTemplate": "target/batchQueue",
            "messageSelector": "",
            "durable": false,
            "noLocal": false,
            "acknowledgeMode": "AUTO_ACKNOWLEDGE",
            "consumerCount": 1,
            "maxBatchSize": 100
          }
        }
    }

Polling Service Framework Configuration

Details about supported configuration parameters for the framework configuration for the Polling service can be found here.

Application Configuration

The supported application configuration parameters for this service are defined in the table below:

QUEUE_LISTENER service’s application configuration
Name Type Description Mandatory Default value

amqpServerConnection

object

The configuration for the AMQP server connection.

No

This can be defined as a shared configuration.

sourceName

string

The name of the AMQP queue from which to consume messages.

Yes

n/a

diffusionTopicTemplate

string

The Diffusion topic template to use for creating Diffusion topics. The template can include any message headers, such as '${JMSDestination}', or user-specified property keys, such as '${aCustomPropertyKey}', as placeholders. The placeholder for a message header will be replaced with the corresponding header value retrieved from the AMQP message, and the placeholder for a property will be replaced with the value of the specified property key. For example, if this configuration’s value is 'queue/${JMSDestination}/${location}', the 'JMSDestination' value is 'country', and the property key 'location' has the value 'london' in a message, the resulting Diffusion topic will be 'queue/country/london'. If such placeholders are included in the template, a new Diffusion topic will be created for each message, which may introduce overhead during message processing. If this parameter is not specified, the value of 'sourceName' will be used instead. If the value of a specified property key is not found or is empty, the service will fail to process the message and will be paused.

No

The value specified in the 'sourceName'

messageSelector

string

An optional SQL-like expression used to filter messages based on their properties. The selector determines which messages are delivered to the consumer. Only messages that match the specified condition will be received. E.g., 'priority > 5 AND color = 'red''.

No

""

noLocal

boolean

A boolean value to specify whether to inhibit the delivery of messages published through its own connection.

No

false

acknowledgeMode

string

The acknowledgement mode used to confirm the processing of messages. Supported values are AUTO_ACKNOWLEDGE, CLIENT_ACKNOWLEDGE, and DUPS_OK_ACKNOWLEDGE. These correspond to the session acknowledgement modes supported by the JMS API.

No

AUTO_ACKNOWLEDGE

consumerCount

integer

The number of consumers to create for consuming messages from a queue. Messages will be distributed among them in a round-robin manner if more than one consumer is created, which may result in out-of-order message processing.

No

1

maxBatchSize

integer

The maximum number of messages a consumer should process from an AMQP queue in the specified interval.

No

100

amqpServerConnection configuration parameters
Name Type Description Mandatory Default value

connectionUrl

string

The AMQP server connection URL, containing the full connection URL with port and/or other required details.

Yes

n/a

username

string

The username for the AMQP server connection.

No

n/a

password

string

The password for the AMQP server connection.

No

n/a

maximumNumberOfRetries

integer

The maximum number of retries for establishing a connection to the AMQP server. If this parameter is not set, the attempts will be infinite.

No

n/a

initialRetryIntervalMs

integer

Initial retry interval in milliseconds. It will increase exponentially with each retry until maxRetryIntervalMs is reached.

No

5

maxRetryIntervalMs

integer

Maximum retry interval in milliseconds.

No

5000

The adapter allows you to specify the amqpServerConnection through a shared configuration. If the amqpServerConnection is explicitly defined in the service configuration, a unique AMQP connection instance will be allocated to the service. However, if the sharedConfigName parameter is used to reference a shared configuration, the connection instance will be shared with any other services that reference the same shared configuration.

See the Message Transformation page for details on how AMQP messages are transformed into Diffusion topic types.

See the AMQP connection management page for details on how an AMQP server connection is created and managed.

There can be multiple instances of this service type added to the adapter to consume messages from different queues and publish them to different Diffusion topics using different configurations.