Database reader service
The DATABASE_READER
is a service type of Streaming Source mode. It can be used to start the Debezium connector engine and begin streaming changed data values from the database to Diffusion topics.
There can be multiple instances of this service type added to the adapter to consume data from different tables in a database, using different configurations.
The complete configuration for this service consists of the framework required configuration for the streaming service and this service type-specific configuration.
Streaming service Framework configuration
Details about supported configuration parameters for the framework configuration of the Streaming service can be found here.
Application configuration
The supported application configuration parameters for this service are defined in the table below:
Name | Type | Description | Mandatory | Default value |
---|---|---|---|---|
debeziumConfigs |
map |
A map of configurations for the Debezium Engine for a specific Debezium connector. A list of supported Debezium configurations for a specific database can be found in its official documentation. |
no |
empty map |
topicMapping |
The type of topic mapping strategy to be used. It defines how data change events per row are mapped to a Diffusion topic. |
no |
OBJECT |
|
includeSchema |
boolean |
A flag to specify whether to capture the schema of the table. |
no |
false |
snapshot |
boolean |
A flag to specify whether to capture a snapshot of the database. This will depend on the value set for the snapshot.mode configuration property for the Debezium engine. |
no |
false |
diffusionTopicPattern |
string |
The Diffusion topic pattern to use in order to create/publish to Diffusion topics. For |
no |
${database}/${table} |
TopicMappingStrategy
Each event (insert/update/delete/read(when doing a snapshot)) from the database is captured and published to the server. The adapter supports three different methods to map these events to a Diffusion topic.
-
Object method: A table is mapped to a JSON topic, with each record being a JSON object keyed by the table’s primary key. If a table does not have a primary key, updates for this table are ignored.
-
Row method: Each table row is mapped to an individual JSON topic. The topic is created using the value of the
diffusionTopicPattern
configuration with primary keys as postfix. Example: database/table/pk1.
If a table does not have a primary key, and this setting is used, updates for this table are ignored. -
Raw method: As per row, but the topic contents are exactly as received from Debezium. This includes schema information and table row data, before and after the change.
If a table has a composite primary key, values of those keys will be escaped and concatenated together with , to formulate a complete primary key combination. This will be used in Object and Row topic mapping, as defined above.
|
{
"serviceName": "addressesTableReader",
"serviceType": "DATABASE_READER",
"description": "Reads 'addresses' table and publishes to Diffusion with OBJECT type topic mapping",
"config": {
"sharedConfigName": "mySqlDatabaseConnector",
"framework": {
"topicProperties": {
"persistencePolicy": "SESSION"
}
},
"application": {
"debeziumConfigs": {
"database.server.name": "addressesTableReader",
"name": "addressesTableReader",
"database.server.id": "8194",
"database.include.list": "inventory",
"table.include.list": "inventory.addresses"
},
"snapshot": false,
"includeSchema": true,
"diffusionTopicPattern": "prefix/${database}/${table}"
}
}
}
Integration with Custom Publisher
The CDC adapter supports using Custom Publishers with the Database reader services to process the contents of the table before publication.
The custom publisher for the CDC adapter should process JSON
updates using an
UpdateContext
of type CdcUpdateContext
. The CdcUpdateContext
contains the
database name and table name from which the update is received, in addition to
the default Diffusion topic path and default topic properties. The JSON update
provided to the custom publisher’s process
method will depend on the
topicMappingStrategy
configured for the service.
Below is a sample implementation of the custom publisher for CDC adapter:
public final class CdcCustomPublisher
extends CustomPublisher<JSON, CdcUpdateContext> {
private final Publisher publisher;
public CdcCustomPublisher(
Publisher publisher,
Map<String, Object> configContext,
Map<String, ? extends PayloadConverter<?, ?>> payloadConverterMap) {
super(publisher);
this.publisher = publisher;
}
@Override
public CompletableFuture<?> process(
JSON update,
CdcUpdateContext updateContext) throws PayloadConversionException {
String database = updateContext.getDatabase();
String table = updateContext.getTable();
TopicProperties topicProperties = updateContext.getTopicProperties();
String diffusionTopic = updateContext.getDiffusionTopic();
// TODO process the JSON update as required and publish using the 'publisher'
return CompletableFuture.completedFuture(null);
}
}
The CdcUpdateContext
is available in the CDC adapter JAR, which should be supplied
to the custom publisher as a dependency. For details on including the adapter JAR
in the custom publisher, refer to the userguide for Custom Publishers.