Configuring the adapter

Prerequisite: See "Configuring Gateway Application" for an overview of how to configure a Gateway application.

The configuration structure for the Kafka adapter is the same as for any Gateway application; it differs only in the configuration for the supported service types, the sharedConfig type, and the global configuration for the application.

A sample configuration file for the Kafka adapter, which contains configuration for publishing and subscribing services, a KAFKA_CLUSTER sharedConfig, and configuration for metrics, is presented below:

Example 1. Sample configuration file
{
  "id": "kafka-adapter-1",
  "framework-version": 1,
  "application-version": 1,
  "diffusion": {
    "url": "ws://localhost:8090",
    "principal": "admin",
    "password": "password",
    "reconnectIntervalMs": 5000
  },
  "sharedConfigs": [
    {
      "sharedConfigName": "localKafkaCluster",
      "sharedConfigType": "KAFKA_CLUSTER",
      "description": "local Kafka cluster details",
      "config": {
        "application": {
          "bootstrapServers": [
            "localhost:9092",
            "localhost:9093"
          ],
          "configuration": {
            "default.api.timeout.ms": 5000,
            "connections.max.idle.ms": 60000,
            "metadata.max.age.ms": 1000
          }
        }
      }
    }
  ],
  "services": [
    {
      "serviceName": "regexSubscription_fx",
      "serviceType": "SUBSCRIPTION_SERVICE",
      "description": "Consumes from Kafka topics whose names start with 'fx', with key of type 'String' and value of type 'JSON'",
      "config": {
        "sharedConfigName": "localKafkaCluster",
        "framework": {
          "payloadConverters": [
            {
              "name": "$Simple_Record_to_JSON",
              "parameters": {
                "headers": [
                  "$all"
                ]
              }
            },
            {
              "name": "$JSON_to_String"
            }
          ],
          "topicProperties": {
            "persistencePolicy": "SERVER",
            "timeSeries": true,
            "timeSeriesRetainedRange": "last 1H",
            "doNotRetainValue": false
          }
        },
        "application": {
          "consumerGroupId": "diffusionKafkaAdapter",
          "consumerCount": 3,
          "keyType": "STRING",
          "valueType": "JSON",
          "diffusionTopicPattern": "kafka/${topic}/${partition}",
          "configuration": {
            "auto.offset.reset": "earliest"
          },
          "kafkaTopics": ["fx.*"],
          "regexSubscription": true
        }
      }
    },
    {
      "serviceName": "listTopicSubscription",
      "serviceType": "SUBSCRIPTION_SERVICE",
      "description": "Consumes from list of Kafka topics",
      "config": {
        "sharedConfigName": "localKafkaCluster",
        "framework": {
          "topicProperties": {
            "persistencePolicy": "SERVER",
            "timeSeries": true,
            "timeSeriesRetainedRange": "last 1H",
            "doNotRetainValue": false
          }
        },
        "application": {
          "updateMode": "SIMPLE",
          "configuration": {
            "auto.offset.reset": "earliest"
          },
          "consumerGroupId": "diffusionKafkaAdapter",
          "consumerCount": 5,
          "kafkaTopics": [
            "sampleTopic",
            "examples"
          ],
          "keyType": "STRING",
          "valueType": "STRING",
          "diffusionTopicPattern": "kafka/${topic}/${partition}"
        }
      }
    },
    {
      "serviceName": "kafkaPublisherService",
      "serviceType": "PUBLISHING_SERVICE",
      "description": "Consumes from JSON Diffusion topic and publishes to Kafka",
      "config": {
        "sharedConfigName": "localKafkaCluster",
        "framework": {
          "diffusionTopicSelector": "?kafka//"
        },
        "application": {
          "configuration": {
            "client.id": "diffusionGatewayProducer",
            "request.timeout.ms": "30000"
          },
          "kafkaTopicPattern": "diffusion.${topic}",
          "keyValue": "key"
        }
      }
    },
    {
      "serviceName": "kafkaPublisherService2",
      "serviceType": "PUBLISHING_SERVICE",
      "description": "Consumes from Doubles Diffusion topic and publishes to Kafka",
      "config": {
        "sharedConfigName": "localKafkaCluster",
        "framework": {
          "diffusionTopicSelector": "doubles"
        },
        "application": {
          "kafkaTopicPattern": "doubles"
        }
      }
    }
  ],
  "global": {
    "framework": {
      "threadPoolSize": 10,
      "mode": "DYNAMIC",
      "metrics": {
        "enabled": true
      }
    },
    "application": {
      "prometheus": {
        "port": 8081,
        "path": "/metrics"
      }
    }
  }
}

Configuration version

The application configuration version this adapter supports is 1.

The framework configuration version expected by the framework used with this adapter is also 1.

Hence, the configuration for the adapter should be created with the following configuration versions:

{
    ...
    "framework-version": 1,
    "application-version": 1
    ...
}