Database reader service

DATABASE_READER is a service type that runs in Streaming Source mode. It starts the Debezium connector engine and streams changed data values from the database to Diffusion topics.

Multiple instances of this service type can be added to the adapter, each with its own configuration, for example to consume data from different tables in a database.

The complete configuration for this service consists of the framework configuration required by the streaming service and the configuration specific to this service type.

Streaming service framework configuration

Details about the supported framework configuration parameters for the Streaming service can be found in the Streaming service framework configuration documentation.

Application configuration

The supported application configuration parameters for this service are listed in the table below:

Database reader service application configuration

| Name | Type | Description | Mandatory | Default value |
|---|---|---|---|---|
| debeziumConfigs | map | A map of configuration properties for the Debezium engine of a specific Debezium connector. The properties supported for each database are listed in the official Debezium documentation for that connector. | no | empty map |
| topicMapping | TopicMappingStrategy | The topic mapping strategy to use. It defines how row-level data change events are mapped to Diffusion topics. | no | OBJECT |
| includeSchema | boolean | Whether to capture the schema of the table. If true, the schema is published to a separate Diffusion topic for each configured table in the database. | no | false |
| snapshot | boolean | Whether to capture a snapshot of the database. Whether a snapshot is actually taken also depends on the value of the snapshot.mode property configured for the Debezium engine. | no | false |
| diffusionTopicPattern | string | The pattern used to create, and publish to, Diffusion topics. The pattern can contain ${database} and ${table}, which are replaced by the database name and the table name, respectively. For example, with the pattern 'diffusion/${database}/${table}', the database name 'database' and the table name 'table', the resulting Diffusion topic is 'diffusion/database/table'. For the ROW and RAW topic mapping strategies, the primary key value of the row is also appended to the final Diffusion topic. | no | ${database}/${table} |
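The placeholder substitution described for diffusionTopicPattern can be sketched in plain Java. This is a minimal illustration, not the adapter's code: the class and method names (TopicPatternSketch, resolve, rowTopic) are hypothetical, and rowTopic assumes the primary key is appended as an extra '/'-separated path segment, as in the database/table/pk1 example for the Row method.

```java
/**
 * Hypothetical illustration of diffusionTopicPattern substitution.
 * Not part of the adapter API.
 */
final class TopicPatternSketch {

    private TopicPatternSketch() {
    }

    /** Replaces the ${database} and ${table} placeholders (literal match, no regex). */
    static String resolve(String pattern, String database, String table) {
        return pattern
            .replace("${database}", database)
            .replace("${table}", table);
    }

    /**
     * ROW/RAW mapping: the row's primary key value is appended to the topic.
     * ASSUMPTION: it is appended as a further '/'-separated segment.
     */
    static String rowTopic(String pattern, String database, String table, String primaryKey) {
        return resolve(pattern, database, table) + "/" + primaryKey;
    }
}
```

With the pattern from the sample configuration, 'prefix/${database}/${table}', the database 'inventory' and the table 'addresses', resolve returns 'prefix/inventory/addresses'.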

TopicMappingStrategy

Each event captured from the database (insert, update, delete, or read when taking a snapshot) is published to the Diffusion server. The adapter supports three methods for mapping these events to Diffusion topics, selected with the topicMapping parameter:

  • Object method (OBJECT): Each table is mapped to a single JSON topic. The topic value is a JSON object containing one entry per record, keyed by the table's primary key. If a table does not have a primary key, updates for that table are ignored.

  • Row method (ROW): Each table row is mapped to its own JSON topic. The topic path is the value of the diffusionTopicPattern configuration with the primary key appended as a suffix, for example database/table/pk1.
    If a table does not have a primary key and this method is used, updates for that table are ignored.

  • Raw method (RAW): As for the Row method, but the topic contents are exactly as received from Debezium. This includes the schema information and the table row data before and after the change.
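The difference between the Object and Row methods can be modelled with a small, self-contained sketch. It is not the adapter's implementation: ChangeEvent, Op and the apply methods are hypothetical names, rows are plain Java maps rather than Diffusion JSON values, and the handling of deletes (removing the entry or the row topic) is an assumption, since this page does not describe it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical model of the OBJECT and ROW mapping methods.
 * Rows are plain maps instead of Diffusion JSON values.
 */
final class TopicMappingSketch {

    enum Op { INSERT, UPDATE, DELETE, READ }

    /** A simplified change event; primaryKey is null if the table has none. */
    record ChangeEvent(Op op, String primaryKey, Map<String, Object> after) { }

    /** OBJECT: one topic value per table, keyed by primary key. */
    static void applyToObjectTopic(Map<String, Map<String, Object>> tableValue, ChangeEvent event) {
        if (event.primaryKey() == null) {
            return; // tables without a primary key are ignored
        }
        if (event.op() == Op.DELETE) {
            tableValue.remove(event.primaryKey()); // ASSUMPTION: delete removes the entry
        } else {
            tableValue.put(event.primaryKey(), event.after());
        }
    }

    /** ROW: one topic per row, below the resolved table path. */
    static void applyToRowTopics(Map<String, Map<String, Object>> topics, String tablePath, ChangeEvent event) {
        if (event.primaryKey() == null) {
            return; // tables without a primary key are ignored
        }
        String topic = tablePath + "/" + event.primaryKey();
        if (event.op() == Op.DELETE) {
            topics.remove(topic); // ASSUMPTION: delete removes the row topic
        } else {
            topics.put(topic, event.after());
        }
    }
}
```

In this model the Object method keeps every record of a table in one map, while the Row method keeps one map entry, standing in for one topic, per primary key.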

If a table has a composite primary key, the values of its key columns are escaped and concatenated with a comma (,) to form the complete primary key. This combined key is used by the Object and Row mapping methods described above.
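A minimal sketch of forming the combined key is shown below. The escaping rule used here (prefixing backslashes and commas with a backslash, so that a comma inside a value cannot be confused with the separator) is an assumption for illustration only; this page does not specify the adapter's actual escaping rules, and CompositeKeySketch is a hypothetical name.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical illustration of combining composite primary key values. */
final class CompositeKeySketch {

    private CompositeKeySketch() {
    }

    /** ASSUMPTION: escape '\' as "\\" first, then ',' as "\,". */
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace(",", "\\,");
    }

    /** Escapes each key column value and joins them with ','. */
    static String combine(List<String> keyValues) {
        return keyValues.stream()
            .map(CompositeKeySketch::escape)
            .collect(Collectors.joining(","));
    }
}
```

For key values 10 and 20 this produces 10,20; a value that itself contains a comma, such as x,y, is emitted as x\,y under the assumed rule.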
Example 1. Sample DATABASE_READER service configuration snippet
{
  "serviceName": "addressesTableReader",
  "serviceType": "DATABASE_READER",
  "description": "Reads 'addresses' table and publishes to Diffusion with OBJECT type topic mapping",
  "config": {
    "sharedConfigName": "mySqlDatabaseConnector",
    "framework": {
      "topicProperties": {
        "persistencePolicy": "SESSION"
      }
    },
    "application": {
      "debeziumConfigs": {
        "database.server.name": "addressesTableReader",
        "name": "addressesTableReader",
        "database.server.id": "8194",
        "database.include.list": "inventory",
        "table.include.list": "inventory.addresses"
      },
      "snapshot": false,
      "includeSchema": true,
      "diffusionTopicPattern": "prefix/${database}/${table}"
    }
  }
}

Integration with Custom Publisher

The CDC adapter supports using Custom Publishers with Database reader services to process table contents before they are published.

A custom publisher for the CDC adapter should process JSON updates using an UpdateContext of type CdcUpdateContext. In addition to the default Diffusion topic path and the default topic properties, the CdcUpdateContext contains the names of the database and table from which the update was received. The content of the JSON update passed to the custom publisher's process method depends on the topic mapping strategy (topicMapping) configured for the service.

Below is a sample custom publisher implementation for the CDC adapter:

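import java.util.Map;
import java.util.concurrent.CompletableFuture;
// CustomPublisher, Publisher, PayloadConverter, PayloadConversionException,
// TopicProperties and JSON must also be imported from the Gateway Framework
// and Diffusion client libraries, and CdcUpdateContext from the CDC adapter JAR.

public final class CdcCustomPublisher
    extends CustomPublisher<JSON, CdcUpdateContext> {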

    private final Publisher publisher;

    public CdcCustomPublisher(
        Publisher publisher,
        Map<String, Object> configContext,
        Map<String, ? extends PayloadConverter<?, ?>> payloadConverterMap) {

        super(publisher);
        this.publisher = publisher;
    }

    @Override
    public CompletableFuture<?> process(
        JSON update,
        CdcUpdateContext updateContext) throws PayloadConversionException {

        String database = updateContext.getDatabase();
        String table = updateContext.getTable();
        TopicProperties topicProperties = updateContext.getTopicProperties();
        String diffusionTopic = updateContext.getDiffusionTopic();

        // TODO process the JSON update as required and publish using the 'publisher'

        return CompletableFuture.completedFuture(null);
    }
}
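As one possible way to fill in the TODO in process(), the hypothetical helper below removes selected columns from every record before publication. It assumes the Object mapping method, where an update is a JSON object keyed by primary key, and models that object as nested Java maps. Converting the Diffusion JSON update to and from maps, and publishing through the Publisher, are not shown. ColumnFilterSketch and withoutColumns are illustrative names, not part of the adapter API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper: returns a copy of an OBJECT-mapped table value
 * with the given columns removed from every record. The input is not modified.
 */
final class ColumnFilterSketch {

    private ColumnFilterSketch() {
    }

    static Map<String, Map<String, Object>> withoutColumns(
        Map<String, Map<String, Object>> tableValue, Set<String> hiddenColumns) {

        Map<String, Map<String, Object>> result = new LinkedHashMap<>();
        tableValue.forEach((primaryKey, record) -> {
            // Copy each record so the caller's map is left unchanged.
            Map<String, Object> copy = new LinkedHashMap<>(record);
            copy.keySet().removeAll(hiddenColumns);
            result.put(primaryKey, copy);
        });
        return result;
    }
}
```

Copying rather than mutating keeps the filtering free of side effects, which matters if the same update object is also used elsewhere.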

The CdcUpdateContext class is available in the CDC adapter JAR, which should be supplied to the custom publisher as a dependency. For details on including the adapter JAR in a custom publisher, refer to the user guide for Custom Publishers.