Subscription service
The SUBSCRIPTION_SERVICE is a service type of Streaming Source mode. It can be used to subscribe to Kafka topics and publish to Diffusion topics.
Multiple instances of this service type can be added to the adapter, each with its own configuration, to consume data from different Kafka topics and publish to different Diffusion topics.
The complete configuration for this service consists of the framework configuration required for a streaming service and the configuration specific to this service type.
{
    "serviceName": "fxRegexSubscription",
    "serviceType": "SUBSCRIPTION_SERVICE",
    "description": "Consumes from Kafka topic starting with 'fx' with key of type 'String' and value of type 'JSON'",
    "config": {
        "sharedConfigName": "localKafkaCluster",
        "framework": {
            "topicProperties": {
                "persistencePolicy": "SERVER",
                "timeSeries": true,
                "timeSeriesRetainedRange": "last 1H",
                "doNotRetainValue": false
            }
        },
        "application": {
            "consumerGroupId": "diffusionKafkaAdapter",
            "consumerCount": 3,
            "keyType": "STRING",
            "valueType": "JSON",
            "diffusionTopicPattern": "kafka/${topic}/${partition}",
            "configuration": {
                "auto.offset.reset": "latest"
            },
            "kafkaTopics": ["fx.*"],
            "regexSubscription": true
        }
    }
}
Streaming service Framework configuration
Details about the supported framework configuration parameters for a Streaming service can be found here.
Application configuration
The supported application configuration parameters for this service are defined in the table below:
Name | Type | Description | Mandatory | Default value
---|---|---|---|---
kafkaTopics | Comma-separated string values | List of Kafka topics, or a regex pattern, to subscribe to | yes | n/a
regexSubscription | boolean | Flag to specify whether the subscription is for a Kafka topic pattern | no | false
consumerGroupId | string | A unique string that identifies the consumer group for the Kafka consumers created for the service | no | Diffusion_Kafka_Adapter_ConsumerGroup
consumerCount | number | Number of Kafka consumers to be created for the service | no | 1
keyType | MessageType | Data type of 'key' for the defined Kafka topic subscriptions. Supported values are defined below. The value of this config is used to set the key deserializer of the Kafka consumer | Yes, if the key deserializer is not set explicitly in 'configuration' | n/a
valueType | MessageType | Data type of 'value' for the defined Kafka topic subscriptions. Supported values are defined below. The value of this config is used to set the value deserializer of the Kafka consumer | Yes, if the value deserializer is not set explicitly in 'configuration' | n/a
diffusionTopicPattern | String | Diffusion topic pattern used to create and publish to Diffusion topics. The pattern can contain ${topic}, ${key} and ${partition}, which will be replaced by the Kafka topic, key and partition value respectively. For example, if this configuration's value is 'diffusion/${topic}/${key}/${partition}', the Kafka topic is 'a.b', the key is "key", and the partition is 0, then the resulting Diffusion topic will be 'diffusion/a.b/key/0'. | no | ${topic}/${partition}
createTopicHierarchy | boolean | Kafka supports the use of '.', '-' and '_' characters in its topic names. If the Kafka topic name is to be used in the created Diffusion topic name, such characters can be used as they are, or a Diffusion topic hierarchy can be created by replacing them with '/'. Set this flag to true if a hierarchical Diffusion topic name should be created from the Kafka topic name. For example, if this flag is true and the Kafka topic name is 'a.b-c', the resulting Diffusion topic path will be 'a/b/c'. | no | false
pollTimeoutMs | number | Kafka consumer poll timeout in milliseconds | no | 500
updateMode | UpdateMode | Update mode used to publish data to Diffusion topics. Supported values are defined below | no | STREAMING
Apart from the above configs, the configuration of a KAFKA_CLUSTER sharedConfig can also be included within this configuration, if defining a separate sharedConfig instance is not required or preferred, or if any of the sharedConfig configuration parameters is to be overridden.
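For instance, here is a minimal sketch of such an inline override, assuming the KAFKA_CLUSTER sharedConfig exposes a bootstrapServers parameter and that inline cluster parameters sit alongside the other application parameters (both are assumptions; consult the KAFKA_CLUSTER sharedConfig documentation for the exact parameter names and placement):
"config": {
    "application": {
        "bootstrapServers": "localhost:9092",
        "kafkaTopics": ["sampleTopic"],
        "keyType": "STRING",
        "valueType": "STRING"
    }
}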
MessageType
The following values can be set as the MessageType for the keyType and valueType configuration parameters:
BYTEARRAY, BYTEBUFFER, BYTES, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, UUID, JSON, AVRO
UpdateMode
The following values can be set as the UpdateMode for the updateMode configuration parameter:
UpdateMode | Description
---|---
SIMPLE | Simple asynchronous update mode. In this mode, each publish is a separate update of the topic, sending the full value to the server. This allows for normal publishing and also for applying patches to JSON topics. Updates are sent asynchronously to the server. If an update fails with a transient, retryable error, a retry is performed with the last value set for the topic.
SIMPLE_SYNC | Simple synchronous update mode. In this mode, each publish is a separate update of the topic, sending the full value to the server. This allows for normal publishing and also for applying patches to JSON topics. Updates are sent synchronously to the server. If an update fails with a transient, retryable error, a retry is performed. Only after all retry attempts are exhausted are successive updates published to the server. Hence, this mode can be less performant than the others.
STREAMING | Streaming update mode. In this mode, sending updates to the server takes advantage of Diffusion's 'delta' capabilities, so that only changes to the value are sent to the server. This significantly reduces bandwidth. Updates are sent asynchronously to the server. If an update fails with a transient, retryable error, a retry is performed with the last value set on the update stream. In this mode it is not possible to apply patches to JSON topics.
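For example, to publish full values asynchronously instead of using the default delta streaming, updateMode can be set alongside the other application configuration parameters. A minimal sketch (topic name and data types are placeholders):
"application": {
    "kafkaTopics": ["sampleTopic"],
    "keyType": "STRING",
    "valueType": "JSON",
    "updateMode": "SIMPLE"
}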
Data consumption, conversion and publication to Diffusion
In the SUBSCRIPTION_SERVICE service type of the Kafka adapter, the list of Kafka topics or the topic pattern defined in the kafkaTopics parameter within the application configuration will be subscribed to. Any updates on those Kafka topics are received, converted as configured, and published to the Diffusion topic defined by diffusionTopicPattern.
If a payload converter or topic type is not defined in the service configuration for this service type, the default converter, $Simple_Record_to_JSON, will be used. This converter accepts a Kafka record and transforms it into a JSON value containing the key, value and partition data of the record. The output data is then published to a JSON Diffusion topic.
Consequently, with the following simple configuration, updates from the sampleTopic Kafka topic will be converted to JSON format and published to the Diffusion topic kafka/sampleTopic/someKey.
{ "serviceName": "simpleService", "serviceType": "SUBSCRIPTION_SERVICE", "description": "Consumes from `sampleTopic` Kafka topic", "config": { "sharedConfigName": "localKafkaCluster", "application": { "kafkaTopics": [ "sampleTopic" ], "keyType": "STRING", "valueType": "STRING", "diffusionTopicPattern": "kafka/${topic}/${key}" } } }
With the above configuration, the kafka/sampleTopic/someKey Diffusion topic will be published with the following JSON content:
{
    "key": "someKey",
    "value": "example data",
    "partition": 1
}
$Simple_Record_to_JSON can also be configured explicitly in the configuration. This converter supports an optional headers parameter, which adds headers of the Kafka record to the output JSON data. This field is a list of header names to be extracted; "$all" includes all header key-value pairs.
Below is a simple configuration with $Simple_Record_to_JSON configured, followed by the final JSON data created with this payload converter and published to the Diffusion topic.
{ "serviceName": "simpleService", "serviceType": "SUBSCRIPTION_SERVICE", "description": "Consumes from `sampleTopic` Kafka topic", "config": { "sharedConfigName": "localKafkaCluster", "framework": { "payloadConverters": [ { "name": "$Simple_Record_to_JSON", "parameters": { "headers": [ "$all" ] } } ] }, "application": { "kafkaTopics": [ "sampleTopic" ], "keyType": "STRING", "valueType": "STRING", "diffusionTopicPattern": "kafka/${topic}/${partition}" } } }
{
    "key": "someKey",
    "value": "example data",
    "partition": 1,
    "headers": {
        "ID": "99",
        "tag": "red"
    }
}
Avro data conversion
The Kafka adapter provides payload converters that can be used to convert Avro data and publish it to a JSON Diffusion topic.
To consume Avro data:
- the AVRO MessageType must be set for the valueType configuration.
- if the key of the records is also of Avro type, the AVRO MessageType must be set for the keyType configuration.
The application uses the Kafka Avro deserializer (provided by Confluent) to deserialize Avro data. Hence, you must set the additional configuration settings required by the Kafka consumer, such as schema.registry.url.
Below is an example configuration of a SUBSCRIPTION_SERVICE typed source service to consume Avro value data from a Kafka topic that uses the Confluent schema registry:
{
    "serviceName": "simpleService",
    "serviceType": "SUBSCRIPTION_SERVICE",
    "description": "Consumes from `orders` Kafka topic",
    "config": {
        "sharedConfigName": "localKafkaCluster",
        "framework": {
            "payloadConverters": [
                {
                    "name": "$Avro_Value_to_JSON"
                }
            ]
        },
        "application": {
            "configuration": {
                "schema.registry.url": "<schemaRegistryURL>",
                "basic.auth.credentials.source": "USER_INFO",
                "basic.auth.user.info": "<API key>:<API secret>"
            },
            "kafkaTopics": [
                "orders"
            ],
            "keyType": "STRING",
            "valueType": "AVRO",
            "diffusionTopicPattern": "kafka/avro/${topic}/${partition}"
        }
    }
}
When consuming Avro data from Kafka topics, the Kafka Avro deserializer will convert the byte array key and value data to `GenericRecord`s. The payload converter then converts the Avro data to JSON, which is published to a JSON Diffusion topic. There are two varieties of Avro record converter:
Avro value to JSON conversion
An Avro value to JSON payload converter is available to convert only the Avro value data from a Kafka record and publish it to a JSON topic. This means that, with this converter, only the value of the Kafka record will be published to the Diffusion JSON topic.
To use the Avro value to JSON converter:
- $Avro_Value_to_JSON must be configured in the payloadConverters configuration in the framework configuration section for this service type.
Avro record to JSON conversion
An Avro record to JSON payload converter is available to convert all details of a Kafka record (key, value, partition, and headers) into JSON format and publish them to a JSON topic. The Avro value will be converted to a JsonNode and appended to the final result. This converter can also be configured to convert the Avro key if the Kafka record contains the key in Avro format. The name of this converter is $Avro_Record_to_JSON.
Similar to $Simple_Record_to_JSON, this converter can also be configured to include headers in the final JSON data. Below are the configuration parameters that can be configured for this converter.
Parameter Name | Type | Description | Mandatory | Default value
---|---|---|---|---
headers | List of strings | List of headers to include in the created JSON data | no | By default, no headers are added
hasAvroKey | boolean | Specifies whether the key is of Avro type. If true, the key will also be converted to JSON data | no | false
Below is a simple configuration with $Avro_Record_to_JSON configured, followed by the final JSON data created with this payload converter and published to the Diffusion topic.
{
    "serviceName": "simpleService",
    "serviceType": "SUBSCRIPTION_SERVICE",
    "description": "Consumes from `sampleTopic` Kafka topic",
    "config": {
        "sharedConfigName": "localKafkaCluster",
        "framework": {
            "payloadConverters": [
                {
                    "name": "$Avro_Record_to_JSON",
                    "parameters": {
                        "headers": [
                            "$all"
                        ],
                        "hasAvroKey": false
                    }
                }
            ]
        },
        "application": {
            "configuration": {
                "schema.registry.url": "<schemaRegistryURL>",
                "basic.auth.credentials.source": "USER_INFO",
                "basic.auth.user.info": "<API key>:<API secret>"
            },
            "kafkaTopics": [
                "sampleTopic"
            ],
            "keyType": "STRING",
            "valueType": "AVRO",
            "diffusionTopicPattern": "kafka/${topic}/${partition}"
        }
    }
}
{
    "key": "someKey",
    "value": {
        "requestTime": 9999,
        "employeeNames": [ "Fred", "Bill" ]
    },
    "partition": 1,
    "headers": {
        "ID": "99",
        "tag": "red"
    }
}
Other supported payload converters
Kafka value to JSON conversion
A Kafka value to JSON payload converter is available to extract the value of a Kafka record and convert it to JSON format. This means the converter can be used to publish the value of the Kafka record to a Diffusion JSON topic. This converter accepts input data of type ConsumerRecord. The value of valueType in the application configuration should be correctly configured to use this converter. It can be used with any MessageType as valueType except BYTEBUFFER, BYTES, and AVRO. For BYTEARRAY as the MessageType, a JSON value containing the String representation of the byte array value will be constructed.
To use the Kafka value to JSON converter:
- $Kafka_value_to_JSON must be configured in the payloadConverters configuration in the framework configuration section for this service type (see the sketch below).
- valueType must be configured as required.
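As an illustration, a minimal framework snippet enabling this converter could look as follows (a sketch; valueType would be set to a supported type, such as STRING, in the application section):
"framework": {
    "payloadConverters": [
        {
            "name": "$Kafka_value_to_JSON"
        }
    ]
}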
Kafka value to Diffusion Binary conversion
A Kafka value to Diffusion Binary payload converter is available to extract the value of a Kafka record and convert it to the Diffusion Binary format. This means the converter can be used to publish the value of the Kafka record to a Diffusion Binary topic. This converter accepts input data of type ConsumerRecord. The value of valueType in the application configuration should be BYTEARRAY to use this converter.
To use the Kafka value to Diffusion Binary converter:
- $Kafka_value_to_BINARY must be configured in the payloadConverters configuration in the framework configuration section for this service type (see the sketch below).
- valueType must be configured as BYTEARRAY.
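Along the same lines, a minimal sketch for the Binary converter, with valueType set to BYTEARRAY as required:
"framework": {
    "payloadConverters": [
        {
            "name": "$Kafka_value_to_BINARY"
        }
    ]
},
"application": {
    "valueType": "BYTEARRAY"
}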
Kafka value to String conversion
A Kafka value to String payload converter is available to extract the value of a Kafka record and convert it to a String value. This converter accepts input data of type ConsumerRecord. The string representation of the value of the supplied ConsumerRecord will be returned by the converter.
If the value of the consumer record is of type BYTEARRAY, the byte array value should contain a string. By default, the UTF-8 charset will be used to decode the bytes into a string. This can be configured by specifying the charSet configuration parameter for the payload converter, as follows:
"payloadConverters": [
{
"name": "$Kafka_value_to_String",
"parameters": {
"charSet": "ASCII"
}
}
]
To use the Kafka value to String converter:
- $Kafka_value_to_String must be configured in the payloadConverters configuration in the framework configuration section for this service type.
- If the value is of type BYTEARRAY and the character set to be used to decode the bytes is other than the default UTF-8 character set, the required character set should be configured as shown above.
See Configuring the adapter for a complete example of the adapter configuration, including the configuration for a SUBSCRIPTION_SERVICE service.
Integration with Custom Publisher
The Kafka adapter supports using Custom Publishers with subscription services to process Kafka records before publication.
A custom publisher for the Kafka adapter should process updates of type ConsumerRecord with an UpdateContext of type KafkaUpdateContext. The KafkaUpdateContext contains the key type and the value type, in addition to the default Diffusion topic path and the default topic properties.
Below is a sample implementation of a custom publisher for the Kafka adapter:
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Framework and adapter types (CustomPublisher, Publisher, PayloadConverter,
// PayloadConversionException, MessageType, TopicProperties, JSON and
// KafkaUpdateContext) are imported from the framework and Kafka adapter JARs.

public final class KafkaCustomPublisher
        extends CustomPublisher<ConsumerRecord<?, ?>, KafkaUpdateContext> {

    private final Publisher publisher;
    private final PayloadConverter<ConsumerRecord<?, ?>, JSON> simpleRecordToJsonConverter;

    @SuppressWarnings("unchecked")
    public KafkaCustomPublisher(
        Publisher publisher,
        Map<String, Object> configContext,
        Map<String, ? extends PayloadConverter<?, ?>> payloadConverterMap) {

        super(publisher);
        this.publisher = publisher;

        // Look up the converter configured for this service, which converts
        // a Kafka ConsumerRecord into a Diffusion JSON value.
        this.simpleRecordToJsonConverter =
            (PayloadConverter<ConsumerRecord<?, ?>, JSON>) payloadConverterMap
                .get("$Simple_Record_to_JSON");
        if (simpleRecordToJsonConverter == null) {
            throw new IllegalArgumentException(
                "No simple record converter found");
        }
    }

    @Override
    public CompletableFuture<?> process(
        ConsumerRecord<?, ?> update,
        KafkaUpdateContext updateContext) throws PayloadConversionException {

        // Context supplied by the adapter for each update.
        MessageType valueType = updateContext.getValueType();
        MessageType keyType = updateContext.getKeyType();
        TopicProperties topicProperties = updateContext.getTopicProperties();
        String diffusionTopic = updateContext.getDiffusionTopic();

        // TODO process the update as required, convert with the payload
        // converter and publish using the 'publisher'
        return CompletableFuture.completedFuture(null);
    }
}
In the example above, the custom publisher expects in its constructor, via the payload converter map, an instance of a payload converter that converts a ConsumerRecord into JSON. An adapter user should therefore configure the customPublisher for the service to include the required payload converter configuration. Expecting a payload converter in the payload converter map argument of the constructor is optional; a custom publisher can expect and document such converters as required.
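For example, here is a hypothetical sketch of the service configuration pairing this custom publisher with the converter it expects (the customPublisher block and its publisherName parameter are illustrative assumptions; see the Custom Publishers user guide for the exact configuration keys):
"framework": {
    "customPublisher": {
        "publisherName": "KafkaCustomPublisher"
    },
    "payloadConverters": [
        {
            "name": "$Simple_Record_to_JSON"
        }
    ]
}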
The KafkaUpdateContext is available in the Kafka adapter JAR, which should be supplied to the custom publisher as a dependency. For details on including the adapter JAR in the custom publisher, refer to the user guide for Custom Publishers.