Publishing service
The PUBLISHING_SERVICE is a service type of Sink mode. It publishes updates from Diffusion topic subscriptions to Kafka topics. Multiple instances of this service type can be added to the adapter, each with its own configuration, to publish data from different Diffusion topics to different Kafka topics.
The complete configuration for this service consists of the framework-required configuration for a sink service and configuration specific to this service type.
{
  "serviceName": "kafkaPublisherService",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Consumes from `?users//` Diffusion topic selector which is of JSON dataType and publishes to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "?users//"
    },
    "application": {
      "configuration": {
        "client.id": "diffusionGatewayProducer",
        "request.timeout.ms": "30000"
      },
      "kafkaTopicPattern": "diffusion.${topic}",
      "keyValue": "key"
    }
  }
}
Sink service framework configuration
Details of the supported framework configuration parameters for a sink service can be found here.
Application configuration
The supported application configuration parameters for this service type are defined in the table below:
Name | Type | Description | Mandatory | Default value |
---|---|---|---|---|
kafkaTopicPattern | string | The Kafka topic pattern used to create and publish to Kafka topics. The pattern can contain ${topic}, which will be replaced by the Diffusion topic path. | no | ${topic} |
keyValue | string | The value of the 'key' set for Kafka records created by this service. If not set, the Diffusion topic path will be used. | no | n/a |
valueType | string | The expected type of value for the Kafka producer. A corresponding value serializer class name is set for the Kafka producer based on this configuration. If not explicitly set, the serializer class name will be deduced from the type of the received update. See here for more details. | no | n/a |
Apart from the above parameters, the configuration of a KAFKA_CLUSTER sharedConfig can also be included within this configuration. This is useful when defining a separate sharedConfig instance is not required, or when any of the sharedConfig configuration parameters is to be overridden.
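For illustration, the following is a minimal sketch of a service that includes the cluster connection details inline, instead of referencing a sharedConfig instance via sharedConfigName. It assumes that bootstrapServers is an accepted KAFKA_CLUSTER sharedConfig parameter; the service name and broker address are hypothetical:
{
  "serviceName": "inlineClusterPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "config": {
    "framework": {
      "diffusionTopicSelector": "?users//"
    },
    "application": {
      "bootstrapServers": "localhost:9092",
      "kafkaTopicPattern": "diffusion.${topic}"
    }
  }
}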
Data subscription, conversion and publication
When a PUBLISHING_SERVICE service is added and started, the framework subscribes to any topic paths that match the Diffusion topic selector defined in the diffusionTopicSelector within the framework configuration. Any updates on the subscribed topics are converted as required or configured and used to create the value of a Kafka record. The "key" of the record is set according to the value defined in the keyValue configuration, which will be of type String, and the partition is internally calculated using the "key" value. The resulting Kafka record is then published to the Kafka topic defined in kafkaTopicPattern.
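As a conceptual illustration (not a wire format), an update on a Diffusion topic at path users, published through the first example configuration above (kafkaTopicPattern of diffusion.${topic} and keyValue of key), would produce a Kafka record along these lines; the payload is hypothetical:
{
  "topic": "diffusion.users",
  "key": "key",
  "value": "{ \"name\": \"Alice\" }"
}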
The valueType configuration parameter in the service configuration defines the value serializer to be used when configuring the KafkaProducer, which publishes the created records to Kafka topics. If the valueType parameter is not set, the framework inspects the type of each published update to decide the value serializer to use for the KafkaProducer.
The type of update published by the framework depends on whether the payloadConverters configuration parameter is set for the service. If configured, the update type will be the output type of the specified payload converter (or the output type of the last converter in the list if multiple converters are defined). If the payloadConverters configuration parameter is not set, the update type depends on the type of the Diffusion topic path from which the update is received. The table below illustrates the mapping between different Diffusion topic types and the corresponding types of updates published to the sink service when a payload converter is not configured.
Diffusion topic type | Update value type |
---|---|
JSON | String |
STRING | String |
INT64 | Long |
DOUBLE | Double |
BINARY | bytes |
For a deeper understanding of how payload converters are used, please refer here. See here for the list of all payload converters issued by the framework.
If the sink service cannot deduce the serializer from the type of the received update, an exception will be thrown. Therefore, when configuring a payload converter for the sink service, ensure that the final output type of the converter matches one of the following:
- byte[]
- ByteBuffer
- Bytes
- Double
- Float
- Integer
- Long
- Short
- String
- UUID
- JsonNode
- GenericContainer
The framework checks compatibility between the type of updates received from Diffusion topics, the type of data to which these updates are converted (if payload converters are configured), and the type of data expected by the sink service. Any incompatible data that may result in a serialization exception will be ignored, and the topics from which such updates are received will be unsubscribed. Therefore, it is essential to configure payloadConverters, kafkaTopicPattern, and valueType appropriately to publish to Kafka topics with the desired value types.
As an illustration, consider the following sample configuration for a sink service of type PUBLISHING_SERVICE. This service subscribes to the Diffusion topic doubles, which produces updates of type DOUBLE. The configuration specifies that the service should publish updates from the doubles Diffusion topic to the doubles Kafka topic. In this example, since payloadConverters is not explicitly configured, an internal default payload converter will be used. This converter passes the Double value directly to the sink service, which then publishes it to the Kafka topic.
{
  "serviceName": "doublesPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Consumes from Doubles Diffusion topic and publishes to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "doubles"
    },
    "application": {
      "kafkaTopicPattern": "doubles",
      "valueType": "DOUBLE"
    }
  }
}
However, if the value type is incorrectly set to STRING as follows, updates from the doubles Diffusion topic will be ignored, leading to the eventual unsubscription of the doubles Diffusion topic for this service.
{
  "serviceName": "doublesPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Consumes from Doubles Diffusion topic and publishes to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "doubles"
    },
    "application": {
      "kafkaTopicPattern": "doubles",
      "valueType": "STRING"
    }
  }
}
Meanwhile, a sink service with the following configuration will receive updates from any topics that match the configured ?data// selector. These topics can be of different Diffusion topic types. The updates from the different topic types will be published to the corresponding Kafka topics, with the serializer deduced from the types of the updates received.
{
  "serviceName": "dynamicPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Consumes from Diffusion topics and publishes to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "?data//"
    },
    "application": {
      "kafkaTopicPattern": "diffusion.${topic}"
    }
  }
}
The sink service can handle different types of updates received from Diffusion topics and publish them to Kafka topics as configured. However, if the subscription is to a specific Diffusion topic path that publishes values of a single type, or if payloadConverters is configured (ensuring that updates of the same type are published to the sink service), setting valueType pre-defines the serializer to be used for the service. This is more efficient than dynamically checking each update's value type to select the serializer to be used.
JSON to Avro conversion
The JSON to Avro payload converter can be used to consume JSON data from a Diffusion topic and publish it to an Avro Kafka topic. This converter is issued by the framework. See here for details of the converter.
To use the JSON to Avro converter:
- $JSON_to_Avro must be configured in the payloadConverters configuration in the framework configuration section for the sink service. This converter requires schemaFilePath, which is a mandatory configuration parameter.
To produce Avro data:
- the AVRO value type must also be set for the valueType configuration.
The application uses the Kafka Avro serializer (provided by Confluent) to serialize the GenericRecord created by the converter. Hence, you must set the additional configuration settings required for the Kafka producer, such as schema.registry.url.
A PUBLISHING_SERVICE sink service that consumes JSON data from a Diffusion topic and publishes it to an Avro Kafka topic:
{
  "serviceName": "avroPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Publishes Avro data to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "sourceTopic",
      "payloadConverters": [
        {
          "name": "$JSON_to_Avro",
          "parameters": {
            "schemaFilePath": "/data/resources/ordersSchema.avsc"
          }
        }
      ]
    },
    "application": {
      "configuration": {
        "client.id": "diffusionGatewayProducer",
        "schema.registry.url": "<schemaRegistryURL>",
        "basic.auth.credentials.source": "USER_INFO",
        "basic.auth.user.info": "<API key>:<API secret>"
      },
      "kafkaTopicPattern": "diffusion.${topic}",
      "keyValue": "key",
      "valueType": "AVRO"
    }
  }
}
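For reference, the ordersSchema.avsc file referenced above is a standard Avro schema file. A minimal sketch of what such a file might contain is shown below; the record name and fields are hypothetical:
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.orders",
  "fields": [
    { "name": "orderId", "type": "string" },
    { "name": "quantity", "type": "int" },
    { "name": "price", "type": "double" }
  ]
}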
See Configuring the adapter for a complete example of the adapter configuration, including configuration for a PUBLISHING_SERVICE service.