Publishing service

The PUBLISHING_SERVICE is a service type of Sink mode. It can be used to publish updates received from Diffusion topic subscriptions to Kafka topics.

Multiple instances of this service type can be added to the adapter to publish data from different Diffusion topics to different Kafka topics, each with its own configuration.

The complete configuration for this service consists of the framework configuration required for a sink service and the configuration specific to this service type.

Example 1. Sample configuration for a service of type PUBLISHING_SERVICE
{
  "serviceName": "kafkaPublisherService",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Consumes from `?users//` Diffusion topic selector which is of JSON dataType and publishes to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "?users//"
    },
    "application": {
      "configuration": {
        "client.id": "diffusionGatewayProducer",
        "request.timeout.ms": "30000"
      },
      "kafkaTopicPattern": "diffusion.${topic}",
      "keyValue": "key"
    }
  }
}

Sink service framework configuration

Details about the supported framework configuration parameters for a sink service can be found here.

Application configuration

The supported application configuration parameters for this service type are described below:

Publishing service application configuration

kafkaTopicPattern
    Type: string
    Mandatory: no
    Default value: ${topic}
    Description: The Kafka topic pattern used to create and publish to Kafka topics. The pattern can contain ${topic}, which will be replaced by the Diffusion topic path, with each '/' in the path replaced by '.'. For example, if this parameter is set to 'diffusion.${topic}' and the Diffusion topic is 'a/b', the resulting Kafka topic will be 'diffusion.a.b'.

keyValue
    Type: string
    Mandatory: no
    Default value: n/a
    Description: The value of the 'key' to be set for Kafka records created by this service. If not set, the Diffusion topic path will be used.

valueType
    Type: MessageType
    Mandatory: no
    Default value: n/a
    Description: The expected type of value for the Kafka producer. A corresponding value serializer class name is set for the Kafka producer based on this parameter. If not explicitly set, the serializer class name will be deduced from the type of the received update. See here for more details.

In addition to the parameters above, the configuration of the KAFKA_CLUSTER sharedConfig can also be included directly within this configuration, either when defining a separate sharedConfig instance is not required or when any of the sharedConfig parameters need to be overridden for this service, as shown in the sketch below.
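
For example, a service could carry its own cluster connection details instead of referencing a shared KAFKA_CLUSTER instance. The following is only a sketch: it assumes the KAFKA_CLUSTER sharedConfig exposes a bootstrapServers parameter and that inlined parameters sit alongside the other application parameters; refer to the KAFKA_CLUSTER sharedConfig documentation for the exact parameter names and placement.

{
  "serviceName": "inlineClusterPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Publishes to Kafka using cluster details defined inline",
  "config": {
    "framework": {
      "diffusionTopicSelector": "?users//"
    },
    "application": {
      "bootstrapServers": "<bootstrapServers>",
      "configuration": {
        "client.id": "diffusionGatewayProducer"
      },
      "kafkaTopicPattern": "diffusion.${topic}"
    }
  }
}

When the connection details are inlined like this, a separate sharedConfigName reference may not be needed for the service.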

Data subscription, conversion and publication

When a PUBLISHING_SERVICE service is added and started, the framework subscribes to any topic paths that match the Diffusion topic selector defined by the diffusionTopicSelector parameter within the framework configuration. Any updates on the subscribed topics are converted as required or as configured, and are used to create the value of a Kafka record. The "key" of the record is set to the value defined in the keyValue configuration, which will be of type String, and the partition is calculated internally from the "key" value. The resulting Kafka record is then published to the Kafka topic defined by kafkaTopicPattern.
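
As a concrete illustration (the topic names and key value below are hypothetical), consider an application section such as the following:

{
  "application": {
    "kafkaTopicPattern": "diffusion.${topic}",
    "keyValue": "userUpdates"
  }
}

An update received on the Diffusion topic users/emea/1 would be published as a Kafka record with key userUpdates on the Kafka topic diffusion.users.emea.1, since each / in the Diffusion topic path is replaced by . in the Kafka topic name. If keyValue were omitted, the record key would default to the Diffusion topic path, users/emea/1.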

The valueType configuration parameter in the service configuration defines the value serializer to be used for configuring the KafkaProducer, which publishes created records to Kafka topics. If the valueType parameter is not set, the framework inspects the type of update published to decide the value serializer to use for the KafkaProducer.

The type of update published by the framework depends on whether the payloadConverters configuration parameter is set for the service. If configured, the update type will be the output type of the specified payload converter (or the output type of the last converter in the list if multiple converters are defined). If the payloadConverters configuration parameter is not set, the update type depends on the type of the Diffusion topic path from which the update is received. The table below illustrates the mapping between different Diffusion topic types and the corresponding types of updates published to the sink service when a payload converter is not configured.

Diffusion topic type and update value type mapping

Diffusion topic type    Update value type
JSON                    String
STRING                  String
INT64                   Long
DOUBLE                  Double
BINARY                  bytes

For a deeper understanding of how payload converters are used, please refer here. See here for the list of all payload converters provided by the framework.

If the sink service cannot deduce the serializer from the type of the received update, an exception will be thrown. Therefore, when configuring the payload converter for the sink service, ensure that the final output type of the converter matches one of the following:

  • byte[]

  • ByteBuffer

  • Bytes

  • Double

  • Float

  • Integer

  • Long

  • Short

  • String

  • UUID

  • JsonNode

  • GenericContainer

The framework checks compatibility between the type of updates received from Diffusion topics, the type of data to which these updates are converted (if payload converters are configured), and the type of data expected by the sink service. Any incompatible data that may result in a serialization exception will be ignored, and the topics from which such updates are received will be unsubscribed. Therefore, it is essential to appropriately configure payloadConverters, kafkaTopicPattern, and valueType to publish to Kafka topics with the desired value types.

As an illustration, consider the following sample configuration for a sink service of type PUBLISHING_SERVICE. This service subscribes to the Diffusion topic doubles, which produces updates of type DOUBLE. The configuration specifies that the service should publish updates from the doubles Diffusion topic to the doubles Kafka topic. In this example, since payloadConverters is not explicitly configured, an internal default payload converter will be used. This converter passes the Double value directly to the sink service, which then publishes it to the Kafka topic.

{
  "serviceName": "doublesPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Consumes from Doubles Diffusion topic and publishes to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "doubles"
    },
    "application": {
      "kafkaTopicPattern": "doubles",
      "valueType": "DOUBLE"
    }
  }
}

However, if the value type is incorrectly set to STRING as follows, updates from the doubles Diffusion topic will be ignored, leading to the eventual unsubscription of the doubles Diffusion topic for this service.

{
  "serviceName": "doublesPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Consumes from Doubles Diffusion topic and publishes to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "doubles"
    },
    "application": {
      "kafkaTopicPattern": "doubles",
      "valueType": "STRING"
    }
  }
}

Meanwhile, a sink service with the following configuration will receive updates from any topics that match the configured ?data// selector. These topics can be of different Diffusion topic types. The updates from different topic types will be published to corresponding Kafka topics, with the serializer deduced from the types of updates received.

{
  "serviceName": "dynamicPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Consumes from Diffusion topics and publishes to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "?data//"
    },
    "application": {
      "kafkaTopicPattern": "diffusion.${topic}"
    }
  }
}

The sink service can handle different types of updates received from Diffusion topics and publish them to Kafka topics as configured. However, if the subscription is to a specific Diffusion topic path that publishes values of a single type, or if payloadConverters is configured (ensuring that updates of the same type are always passed to the sink service), setting valueType pre-defines the serializer to be used by the service. This is more efficient than inspecting each update's value type to determine the serializer to use.

JSON to Avro conversion

The JSON to Avro payload converter can be used to consume JSON data from a Diffusion topic and publish it to an Avro Kafka topic. This converter is provided by the framework. See here for details about the converter.

To use the JSON to Avro converter:

  • $JSON_to_Avro must be configured in the payloadConverters configuration within the framework configuration section for the sink service. This converter requires schemaFilePath, which is a mandatory configuration parameter.

To produce Avro data:

  • the AVRO message type must also be set as the valueType configuration.

The application uses the Kafka Avro serializer (provided by Confluent) to serialize the GenericRecord created by the converter. Hence, you must set the additional configuration settings required by the Kafka producer, such as schema.registry.url.

Example 2. Sample configuration for a sink service of type PUBLISHING_SERVICE that consumes JSON data from a Diffusion topic and publishes it to an Avro Kafka topic
{
  "serviceName": "avroPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Published Avro data to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "sourceTopic",
      "payloadConverters": [
        {
          "name": "$JSON_to_Avro",
          "parameters": {
            "schemaFilePath": "/data/resources/ordersSchema.avsc"
          }
        }
      ]
    },
    "application": {
      "configuration": {
        "client.id": "diffusionGatewayProducer",
        "schema.registry.url": "<schemaRegistryURL>",
        "basic.auth.credentials.source": "USER_INFO",
        "basic.auth.user.info": "<API key>:<API secret>"
      },
      "kafkaTopicPattern": "diffusion.${topic}",
      "keyValue": "key",
      "valueType": "AVRO"
    }
  }
}
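
The schema file referenced by schemaFilePath above is not shown in this documentation. As an illustration only, a minimal Avro schema of the kind that could be placed at /data/resources/ordersSchema.avsc might look like the following (the record and field names are hypothetical):

{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.orders",
  "fields": [
    { "name": "orderId", "type": "string" },
    { "name": "quantity", "type": "int" },
    { "name": "price", "type": "double" }
  ]
}

The JSON updates received from the configured Diffusion topic would be expected to match the structure described by this schema so that the converter can build a valid GenericRecord for the Avro serializer.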

See Configuring the adapter for a complete example of an adapter configuration that includes a PUBLISHING_SERVICE service.