Subscription service

SUBSCRIPTION_SERVICE is a service type that operates in Streaming Source mode. It can be used to subscribe to Kafka topics and publish the received updates to Diffusion topics.

Multiple instances of this service type can be added to the adapter, each with its own configuration, to consume data from different Kafka topics and publish it to different Diffusion topics.

The complete configuration for this service consists of the framework configuration required for a streaming service, plus configuration specific to this service type.

Example 1. Sample configuration for a service of type SUBSCRIPTION_SERVICE
    {
      "serviceName": "fxRegexSubscription",
      "serviceType": "SUBSCRIPTION_SERVICE",
      "description": "Consumes from Kafka topic starting with 'fx' with key of type 'String' and value of type 'JSON'",
      "config": {
        "sharedConfigName": "localKafkaCluster",
        "framework": {
          "topicProperties": {
            "persistencePolicy": "SERVER",
            "timeSeries": true,
            "timeSeriesRetainedRange": "last 1H",
            "doNotRetainValue": false
          }
        },
        "application": {
          "consumerGroupId": "diffusionKafkaAdapter",
          "consumerCount": 3,
          "keyType": "STRING",
          "valueType": "JSON",
          "diffusionTopicPattern": "kafka/${topic}/${partition}",
          "configuration": {
            "auto.offset.reset": "latest"
          },
          "kafkaTopics": ["fx.*"],
          "regexSubscription": true
        }
      }
    }

Streaming service framework configuration

Details about the supported framework configuration parameters for a streaming service can be found here.

Application configuration

The supported application configuration parameters for this service are defined below:

Subscription service application configuration

kafkaTopics (list of strings; mandatory; no default)
    List of Kafka topic names, or a regex pattern, to subscribe to.

regexSubscription (boolean; optional; default: false)
    Flag specifying whether the subscription is for a Kafka topic pattern rather than literal topic names.

consumerGroupId (string; optional; default: Diffusion_Kafka_Adapter_ConsumerGroup)
    A unique string that identifies the consumer group for the Kafka consumers created for the service.

consumerCount (number; optional; default: 1)
    Number of Kafka consumers to be created for the service.

keyType (MessageType; mandatory unless key.deserializer is specified explicitly in the configuration map; no default)
    Data type of the key for the subscribed Kafka topics. Supported values are defined below. The value of this parameter is used to set key.deserializer; hence, if MessageType does not cover a required type, key.deserializer can be set directly in the configuration map instead.

valueType (MessageType; mandatory unless value.deserializer is specified explicitly in the configuration map; no default)
    Data type of the value for the subscribed Kafka topics. Supported values are defined below. The value of this parameter is used to set value.deserializer; hence, if MessageType does not cover a required type, value.deserializer can be set directly in the configuration map instead.

diffusionTopicPattern (string; optional; default: ${topic}/${partition})
    Diffusion topic pattern used to create, and publish to, Diffusion topics. The pattern can contain ${topic}, ${key} and ${partition}, which are replaced by the Kafka topic name, record key and partition respectively. For example, if this parameter is 'diffusion/${topic}/${key}/${partition}', the Kafka topic is 'a.b', the key is "key" and the partition is 0, the resulting Diffusion topic is 'diffusion/a.b/key/0'.

createTopicHierarchy (boolean; optional; default: false)
    Kafka supports the '.', '-' and '_' characters in its topic names. When the Kafka topic name is used in the created Diffusion topic name, such characters can be kept as they are, or a Diffusion topic hierarchy can be created by replacing them with '/'. Set this flag to true if a hierarchical Diffusion topic name should be created from the Kafka topic name. For example, if diffusionTopicPattern is '${topic}/${key}/${partition}', createTopicHierarchy is true, the Kafka topic is 'a.b', the key is "key" and the partition is 0, the resulting Diffusion topic is 'a/b/key/0'.

pollTimeoutMs (number; optional; default: 500)
    Kafka consumer poll timeout in milliseconds.

updateMode (UpdateMode; optional; default: STREAMING)
    Update mode used to publish data to Diffusion topics.
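
The substitution performed for diffusionTopicPattern, and the hierarchy flattening enabled by createTopicHierarchy, can be sketched as follows (illustrative Python; the function name is hypothetical and this is not the adapter's implementation):

```python
def diffusion_topic(pattern, kafka_topic, key, partition, create_hierarchy=False):
    """Build a Diffusion topic path from a Kafka record's metadata."""
    if create_hierarchy:
        # Kafka allows '.', '-' and '_' in topic names; map them to '/'
        # to build a Diffusion topic hierarchy from the Kafka topic name.
        kafka_topic = kafka_topic.replace(".", "/").replace("-", "/").replace("_", "/")
    # Replace the supported placeholders with the record's metadata.
    return (pattern
            .replace("${topic}", kafka_topic)
            .replace("${key}", str(key))
            .replace("${partition}", str(partition)))

print(diffusion_topic("diffusion/${topic}/${key}/${partition}", "a.b", "key", 0))
# -> diffusion/a.b/key/0
print(diffusion_topic("${topic}/${key}/${partition}", "a.b", "key", 0,
                      create_hierarchy=True))
# -> a/b/key/0
```

Both outputs match the worked examples given for diffusionTopicPattern and createTopicHierarchy above.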

Apart from the above parameters, the configuration of a KAFKA_CLUSTER sharedConfig can also be included within this configuration, either when defining a separate sharedConfig instance is not required or preferred, or when any of the sharedConfig parameters are to be overridden.

MessageType

The following values can be set as MessageType for keyType and valueType:

MessageType values

BYTEARRAY, BYTEBUFFER, BYTES, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, UUID, JSON, AVRO
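
As noted for keyType and valueType, the chosen MessageType drives the key.deserializer and value.deserializer consumer properties. The sketch below shows one plausible mapping onto the standard Kafka deserializer classes; it is an assumption for illustration, not the adapter's actual mapping, and JSON/AVRO are omitted because they are handled by dedicated deserializers (Confluent's Avro deserializer in the AVRO case):

```python
# Illustrative mapping only: the classes the adapter actually configures
# may differ. key.deserializer/value.deserializer can always be set
# directly in the configuration map instead.
SERDE_PKG = "org.apache.kafka.common.serialization."
DESERIALIZERS = {
    "BYTEARRAY":  SERDE_PKG + "ByteArrayDeserializer",
    "BYTEBUFFER": SERDE_PKG + "ByteBufferDeserializer",
    "BYTES":      SERDE_PKG + "BytesDeserializer",
    "DOUBLE":     SERDE_PKG + "DoubleDeserializer",
    "FLOAT":      SERDE_PKG + "FloatDeserializer",
    "INTEGER":    SERDE_PKG + "IntegerDeserializer",
    "LONG":       SERDE_PKG + "LongDeserializer",
    "SHORT":      SERDE_PKG + "ShortDeserializer",
    "STRING":     SERDE_PKG + "StringDeserializer",
    "UUID":       SERDE_PKG + "UUIDDeserializer",
}

def consumer_properties(key_type, value_type, extra=None):
    """Build a consumer property map; an explicit key.deserializer or
    value.deserializer in the configuration map takes precedence."""
    props = dict(extra or {})
    props.setdefault("key.deserializer", DESERIALIZERS[key_type])
    props.setdefault("value.deserializer", DESERIALIZERS[value_type])
    return props
```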

UpdateMode

The following values can be set as UpdateMode for the updateMode configuration parameter:

UpdateMode values

SIMPLE
    Simple asynchronous update mode. Each publish is a separate update of the topic, sending the full value to the server. This allows normal publishing and also applying patches to JSON topics. Updates are sent asynchronously to the server. If an update fails with a transient, retryable error, a retry is performed with the last value set for the topic.

SIMPLE_SYNC
    Simple synchronous update mode. Each publish is a separate update of the topic, sending the full value to the server. This allows normal publishing and also applying patches to JSON topics. Updates are sent synchronously to the server. If an update fails with a transient, retryable error, a retry is performed, and successive updates are published to the server only after all retry attempts are exhausted. This mode can therefore be less performant than the others.

STREAMING
    Streaming update mode. Sending updates to the server takes advantage of Diffusion's delta capabilities, so only the changes to the value are sent to the server. This significantly reduces bandwidth. Updates are sent asynchronously to the server. If an update fails with a transient, retryable error, a retry is performed with the last value set on the update stream. In this mode it is not possible to apply patches to JSON topics.

Data consumption, conversion and publication to Diffusion

In the SUBSCRIPTION_SERVICE service type of the Kafka adapter, the list of Kafka topics or the topic pattern defined in the kafkaTopics parameter of the application configuration is subscribed to. Updates received from those Kafka topics are converted as configured and published to the Diffusion topics defined by diffusionTopicPattern.

If a payload converter or topic type is not defined in the service configuration for this service type, the default converter, $Simple_Record_to_JSON, is used. This converter accepts a Kafka record and transforms it into a JSON value containing the key, value and partition data of the record. The output data is then published to a JSON Diffusion topic.

Consequently, with the following simple configuration, updates from the sampleTopic Kafka topic are converted to JSON format and published to the Diffusion topic kafka/sampleTopic/someKey.

{
  "serviceName": "simpleService",
  "serviceType": "SUBSCRIPTION_SERVICE",
  "description": "Consumes from `sampleTopic` Kafka topic",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "application": {
      "kafkaTopics": [
        "sampleTopic"
      ],
      "keyType": "STRING",
      "valueType": "STRING",
      "diffusionTopicPattern": "kafka/${topic}/${key}"
    }
  }
}

With the above configuration, the Diffusion topic kafka/sampleTopic/someKey is updated with the following JSON content:

  {
    "key": "someKey",
    "value": "example data",
    "partition": 1
  }

$Simple_Record_to_JSON can also be configured explicitly in the configuration. This converter supports an optional headers parameter, which adds headers of the Kafka record to the output JSON data. This field is a list of header names to extract; setting it to "$all" includes all header key-value pairs.

Below is a simple configuration with $Simple_Record_to_JSON configured, followed by the final JSON data created by this payload converter for publication to the Diffusion topic.

{
  "serviceName": "simpleService",
  "serviceType": "SUBSCRIPTION_SERVICE",
  "description": "Consumes from `sampleTopic` Kafka topic",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "payloadConverters": [
        {
          "name": "$Simple_Record_to_JSON",
          "parameters": {
            "headers": [
              "$all"
            ]
          }
        }
      ]
    },
    "application": {
      "kafkaTopics": [
        "sampleTopic"
      ],
      "keyType": "STRING",
      "valueType": "STRING",
      "diffusionTopicPattern": "kafka/${topic}/${partition}"
    }
  }
}
Example 2. Created JSON data
  {
    "key": "someKey",
    "value": "example data",
    "partition": 1,
    "headers": {
      "ID": "99",
      "tag": "red"
    }
  }
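
The conversion performed by $Simple_Record_to_JSON, including the headers filtering, can be sketched as follows (illustrative Python; the function is hypothetical and a plain dict stands in for a Kafka ConsumerRecord):

```python
import json

def simple_record_to_json(record, headers=None):
    """Sketch of the converter's output: a JSON object holding the
    record's key, value and partition, plus any requested headers."""
    out = {
        "key": record["key"],
        "value": record["value"],
        "partition": record["partition"],
    }
    if headers:
        # "$all" includes every header; otherwise only the named ones.
        if "$all" in headers:
            out["headers"] = dict(record["headers"])
        else:
            out["headers"] = {k: v for k, v in record["headers"].items()
                              if k in headers}
    return json.dumps(out)

record = {"key": "someKey", "value": "example data", "partition": 1,
          "headers": {"ID": "99", "tag": "red"}}
print(simple_record_to_json(record, headers=["$all"]))
```

With headers omitted, the output contains only the key, value and partition fields, matching the default behaviour described above.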

Avro data conversion

The Kafka adapter provides payload converters that can be used to convert Avro data for publication to JSON Diffusion topics.

To consume Avro data:

  • the AVRO MessageType must be set for the valueType configuration

  • if the key of the records is also of Avro type, the AVRO MessageType must be set for the keyType configuration

The adapter uses the Kafka Avro deserializer (provided by Confluent) to deserialize Avro data, so you must set the additional configuration settings the deserializer requires for the Kafka consumer, such as schema.registry.url.

Example 3. Sample configuration for a source service of type SUBSCRIPTION_SERVICE that consumes Avro value data from a Kafka topic, using the Confluent schema registry
{
  "serviceName": "simpleService",
  "serviceType": "SUBSCRIPTION_SERVICE",
  "description": "Consumes from `orders` Kafka topic",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "payloadConverters": [
        {
          "name": "$Avro_Value_to_JSON"
        }
      ]
    },
    "application": {
      "configuration": {
    "schema.registry.url": "<schemaRegistryURL>",
        "basic.auth.credentials.source": "USER_INFO",
        "basic.auth.user.info": "<API key>:<API secret>"
      },
      "kafkaTopics": [
        "orders"
      ],
      "keyType": "STRING",
      "valueType": "AVRO",
      "diffusionTopicPattern": "kafka/avro/${topic}/${partition}"
    }
  }
}

When consuming Avro data from Kafka topics, the Kafka Avro deserializer will convert the byte array key and value data to `GenericRecord`s. The payload converter then converts the Avro data to JSON, which is published to a JSON Diffusion topic. There are two varieties of Avro record converter:

Avro value to JSON conversion

An Avro value to JSON payload converter is available to convert only the Avro value data of a Kafka record and publish it to a JSON topic. With this converter, only the value of the Kafka record is published to the Diffusion JSON topic.

To use the Avro value to JSON converter:

  • $Avro_Value_to_JSON must be configured in the payloadConverters configuration in the framework configuration section for this service type.

Avro record to JSON conversion

An Avro record to JSON payload converter is available to convert all details of a Kafka record (key, value, partition, and headers) into JSON format and publish it to a JSON topic. The Avro value will be converted to JsonNode and appended to the final result. This converter can also be configured to convert the Avro key if the Kafka record contains the key in Avro format. The name of this converter is $Avro_Record_to_JSON.

Similar to $Simple_Record_to_JSON, this converter can be configured to include headers in the final JSON data. The configuration parameters supported by this converter are defined below.

Supported configuration parameters for the $Avro_Record_to_JSON converter

headers (list of strings; optional; by default no headers are added)
    List of headers to include in the created JSON data. "$all" can be set as the value to include all header key-value pairs.

hasAvroKey (boolean; optional; default: false)
    Specifies whether the key is of Avro type. If true, the key is also converted to JSON data.

Below is a simple configuration with $Avro_Record_to_JSON configured, followed by the final JSON data created by this payload converter for publication to the Diffusion topic.

    {
      "serviceName": "simpleService",
      "serviceType": "SUBSCRIPTION_SERVICE",
      "description": "Consumes from `sampleTopic` Kafka topic",
      "config": {
        "sharedConfigName": "localKafkaCluster",
        "framework": {
          "payloadConverters": [
            {
              "name": "$Avro_Record_to_JSON",
              "parameters": {
                "headers": [
                  "$all"
                ],
                "hasAvroKey": "false"
              }
            }
          ]
        },
        "application": {
          "configuration": {
            "schema.registry.url": "<schemaRegistryURL>",
            "basic.auth.credentials.source": "USER_INFO",
            "basic.auth.user.info": "<API key>:<API secret>"
          },
          "kafkaTopics": [
            "sampleTopic"
          ],
          "keyType": "STRING",
          "valueType": "AVRO",
          "diffusionTopicPattern": "kafka/${topic}/${partition}"
        }
      }
    }
Example 4. Created JSON data
  {
    "key": "someKey",
    "value": {
        "requestTime" : 9999,
        "employeeNames" : [ "Fred", "Bill" ]
    },
    "partition": 1,
    "headers": {
      "ID": "99",
      "tag": "red"
    }
  }

Other supported Payload converters

Kafka value to JSON conversion

A Kafka value to JSON payload converter is available to extract the value of a Kafka record and convert it to JSON format. This converter can therefore be used to publish the value of a Kafka record to a Diffusion JSON topic. It accepts input data of type ConsumerRecord, and valueType in the application configuration must be configured correctly for it to be used. It can be used with any MessageType as valueType except BYTEBUFFER, BYTES and AVRO. For a BYTEARRAY MessageType, a JSON value containing the String representation of the byte array value is constructed.

To use the Kafka value to JSON converter:

  • $Kafka_value_to_JSON must be configured in the payloadConverters configuration in the framework configuration section for this service type.

  • valueType must be configured as required.

Kafka value to Diffusion Binary conversion

A Kafka value to Diffusion Binary payload converter is available to extract the value of a Kafka record and convert it to Diffusion Binary format. This converter can therefore be used to publish the value of a Kafka record to a Diffusion Binary topic. It accepts input data of type ConsumerRecord, and valueType in the application configuration must be BYTEARRAY for this converter to be used.

To use the Kafka value to Diffusion Binary converter:

  • $Kafka_value_to_BINARY must be configured in the payloadConverters configuration in the framework configuration section for this service type.

  • valueType must be configured as BYTEARRAY.

Kafka value to String conversion

A Kafka value to String payload converter is available to extract the value of a Kafka record and convert it to a String value. This converter accepts input data of type ConsumerRecord and returns the string representation of the record's value.

If the value of the consumer record is of type BYTEARRAY, the byte array value should contain a string. By default, the UTF-8 character set is used to decode the bytes into a string. This can be changed by specifying the charSet configuration parameter for the payload converter as follows:

  "payloadConverters": [
    {
      "name": "$Kafka_value_to_String",
      "parameters": {
        "charSet": "ASCII"
      }
    }
  ]

To use the Kafka value to String converter:

  • $Kafka_value_to_String must be configured in the payloadConverters configuration in the framework configuration section for this service type.

  • If the value is of type BYTEARRAY and the character set to be used to decode the bytes is not the default UTF-8 character set, the required character set should be configured as shown above.
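
The decoding step described above can be sketched as follows (illustrative Python; the function name is hypothetical and this is not the adapter's code):

```python
def value_to_string(value, char_set="UTF-8"):
    """Return the string representation of a Kafka record value.

    A byte-array value is decoded with the configured character set
    (the charSet parameter, UTF-8 by default); any other value is
    rendered via its string representation.
    """
    if isinstance(value, (bytes, bytearray)):
        return bytes(value).decode(char_set)
    return str(value)

print(value_to_string(b"example data", char_set="ASCII"))
# -> example data
```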

See Configuring the adapter for a complete example of the adapter configuration, including the configuration of a SUBSCRIPTION_SERVICE service.