Change Data Capture Plugin

You can use plugins in Macrometa to extend the functionality of your streams.

The Change Data Capture (CDC) plugin captures change data from relational databases such as MySQL, MS SQL, PostgreSQL, H2, and Oracle. When a change event occurs on a database table, the CDC source receives the event in key-value format.

CDC has two available modes: polling and listening.

  • Polling mode checks the database for changes at a defined interval. This mode uses the column specified in polling.column to capture INSERT and UPDATE change events from an RDBMS. You can track changes by timestamp or by sequence number.
  • Listening mode notifies you in real time when the database logs changes. This mode uses database logs to capture MySQL INSERT, UPDATE, and DELETE change events.

Parameters

Insert the following parameters into the provided template to create a CDC plugin.

| Name | Data Type | Default Value | Optional? | Dynamic? | Description |
|---|---|---|---|---|---|
| url | STRING | N/A | No | No | Database URL. Format: jdbc:mysql://:/<database_name> |
| mode | STRING | listening | Yes | No | Change data capture mode. Choose polling or listening. |
| jdbc.driver.name | STRING | N/A | Yes | No | JDBC driver class name for the database connection. Required if you are using polling mode. |
| username | STRING | N/A | No | No | Username for accessing the database. This user needs the following privileges on the table specified in table.name: SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, and REPLICATION CLIENT. |
| password | STRING | N/A | No | No | Password for the user accessing the database. |
| pool.properties | STRING | N/A | Yes | No | Connection pool parameters, specified as key-value pairs. |
| datasource.name | STRING | N/A | Yes | No | If you use a data source, then you do not need to provide a URL, username, or password. Data source connections have higher priority than URL-based connections. You can only use a data source with polling mode and a stream processor. |
| table.name | STRING | N/A | No | No | Name of the table to be monitored for changes. |
| polling.column | STRING | N/A | Yes | No | When using polling mode, this field indicates whether records are tracked with an incremental sequence of numbers or a timestamp. Set to id to track updates with a number sequence or last_updated to use a timestamp. Number sequences are only compatible with insertion operations. |
| polling.interval | INT | 1 | Yes | No | Time in seconds between polls for changes. Only applicable for polling mode. |
| operation | STRING | N/A | No | No | Change event operations you want to monitor: insert, update, or delete. To choose multiple, separate each value with a comma. Not case-sensitive. Required if you are using listening mode. When monitoring multiple operations, the operation for each event is returned as a transport property, trp:operation, which can be read when mapping the events. |
| connector.properties | STRING | Empty | Yes | No | Debezium connector properties, specified as a comma-separated string. Only applicable for listening mode. Properties specified here have higher priority than other parameters. |
| database.server.id | STRING | Random integer between 5400 and 6400 | Yes | No | ID to use when joining the MySQL database to read the binlog. Use a unique integer between 1 and 2^32. Only applicable for listening mode. If none is specified, a random integer is chosen. |
| database.server.name | STRING | (host)_(port) | Yes | No | Logical name that serves as the namespace for the database server. Only applicable for listening mode. |
| wait.on.missed.record | BOOL | false | Yes | No | Set to true to hold processing when a missing or out-of-order record is detected. Missing records are identified by gaps in the polling.column sequence, so this works with number sequences; timestamp values are not sequential and are not suited to it. Enabling this parameter lowers performance, so use it only if you anticipate out-of-order records from concurrent writers. |
| missed.record.waiting.timeout | INT | -1 | Yes | No | Timeout in seconds to retry for missing or out-of-order records. The default value -1 waits indefinitely. |
| polling.history.size | INT | 10 | Yes | No | Number of recent polling results exposed to metrics. When set to 10, the last 10 polling results are exposed to metrics. Only usable if you are gathering metrics. |
| cron.expression | STRING | Empty | Yes | No | Schedule specified as a cron expression. When the system time matches the cron expression, the records that were inserted or updated are printed. Only applicable for polling mode. |
| plugin.name | STRING | decoderbufs | Yes | No | Logical decoding output plugin to use when creating the connection to the database. This is mainly required for PostgreSQL. |

Template

Use the following template to create a CDC plugin:

@source(type="cdc", url="<STRING>", mode="<STRING>", jdbc.driver.name="<STRING>", username="<STRING>", password="<STRING>", pool.properties="<STRING>", datasource.name="<STRING>", table.name="<STRING>", polling.column="<STRING>", polling.interval="<INT>", operation="<STRING>", connector.properties="<STRING>", database.server.id="<STRING>", database.server.name="<STRING>", wait.on.missed.record="<BOOL>", missed.record.waiting.timeout="<INT>", polling.history.size="<INT>", cron.expression="<STRING>", plugin.name="<STRING>", @map(...))

Examples

The following CDC plugin examples assume you are using a table called students and a database called SimpleDB.

Example 1

This example listens to row insertions:

CREATE SOURCE inputStream WITH (type = 'cdc' , url = 'jdbc:mysql://localhost:3306/SimpleDB', username = 'user', password = 'password', table.name = 'students', operation = 'insert', map.type='keyvalue', attributes.id = 'id', attributes.name = 'name') (id string, name string);

You can adjust this plugin to listen for updates or deletions by adding update or delete to the operation field separated by commas. For example: operation = 'insert,update,delete'
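As a sketch of the multi-operation case, the statement below listens for all three operations and maps the trp:operation transport property to a stream attribute. The attribute name operation and the mapping attributes.operation = 'trp:operation' are assumptions modeled on the attributes.* entries in Example 1, so verify the syntax in your environment before relying on it.

```sql
-- Sketch: listen for inserts, updates, and deletes on the students table.
-- Assumption: trp:operation is mapped the same way as a regular attribute.
CREATE SOURCE inputStream WITH (
    type = 'cdc',
    url = 'jdbc:mysql://localhost:3306/SimpleDB',
    username = 'user',
    password = 'password',
    table.name = 'students',
    operation = 'insert,update,delete',
    map.type = 'keyvalue',
    attributes.id = 'id',
    attributes.name = 'name',
    attributes.operation = 'trp:operation'
) (id string, name string, operation string);
```

Carrying the operation as a stream attribute lets downstream queries tell the three kinds of events apart.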

Example 2

This example polls for row insertions:

CREATE SOURCE inputStream WITH (type = 'cdc', mode='polling', polling.column = 'id', jdbc.driver.name = 'com.mysql.jdbc.Driver', url = 'jdbc:mysql://localhost:3306/SimpleDB', username = 'user', password = 'password', table.name = 'students', map.type='keyvalue', attributes.id = 'id', attributes.name = 'name') (id int, name string);

The polling.column field is set to id, indicating that polling is tracked with an incremental sequence of numbers. A URL-based connection in polling mode requires a URL, username, password, and JDBC driver name.
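The polling.interval parameter defaults to 1 second. A variation of Example 2 that polls every five seconds might look like the following sketch. The value 5 is arbitrary, and the quoted form is an assumption that follows how Example 5 passes numeric settings.

```sql
-- Sketch: Example 2 with an explicit polling interval of five seconds.
-- Assumption: numeric settings are passed as quoted strings, as in Example 5.
CREATE SOURCE inputStream WITH (
    type = 'cdc',
    mode = 'polling',
    polling.column = 'id',
    polling.interval = '5',
    jdbc.driver.name = 'com.mysql.jdbc.Driver',
    url = 'jdbc:mysql://localhost:3306/SimpleDB',
    username = 'user',
    password = 'password',
    table.name = 'students',
    map.type = 'keyvalue',
    attributes.id = 'id',
    attributes.name = 'name'
) (id int, name string);
```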

Example 3

This example polls for row insertions through a data source connection and tracks them with a number sequence in the polling column:

CREATE SOURCE inputStream WITH (type = 'cdc', mode='polling', polling.column = 'id', datasource.name = 'SimpleDB', table.name = 'students', map.type='keyvalue', attributes.id = 'id', attributes.name = 'name') (id int, name string);

The polling column is numeric. The datasource.name parameter is only valid with a stream processor.

Example 4

This example polls for row insertions and uses a timestamp in the polling column:

CREATE SOURCE inputStream WITH (type = 'cdc', mode='polling', polling.column = 'last_updated', datasource.name = 'SimpleDB', table.name = 'students', map.type='keyvalue') (name string);

The polling.column field is set to last_updated, indicating that polling will be tracked with a timestamp.

Example 5

This example polls for row insertions and waits up to ten seconds for missing or out-of-order records caused by concurrent writers:

CREATE SOURCE inputStream WITH (type='cdc', jdbc.driver.name='com.mysql.jdbc.Driver', url='jdbc:mysql://localhost:3306/SimpleDB', username='user', password='password', table.name='students', mode='polling', polling.column='id', operation='insert', wait.on.missed.record='true', missed.record.waiting.timeout='10', map.type='keyvalue', attributes.id='id', attributes.name='name') (id int, name string);

The polling column is numeric.

Example 6

This example connects to an Oracle database and listens to row insertions to a table called sweetproductiontable:

CREATE SOURCE insertSweetProductionStream WITH (type = 'cdc', url = 'jdbc:oracle:thin://localhost:1521/ORCLCDB', username='c##xstrm', password='xs', table.name='DEBEZIUM.sweetproductiontable', operation = 'insert', connector.properties='oracle.outserver.name=DBZXOUT,oracle.pdb=ORCLPDB1', map.type = 'keyvalue') (ID int, NAME string, WEIGHT int);
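The plugin.name parameter listed under Parameters is mainly needed for PostgreSQL. A listening-mode source for a PostgreSQL copy of the students table might be sketched as follows. The host, port, and the choice of pgoutput (a logical decoding plugin that Debezium supports) are assumptions, and the table name may need a schema prefix depending on your database.

```sql
-- Sketch: listen for inserts on a PostgreSQL table.
-- Assumptions: localhost:5432, the pgoutput plugin, and an unqualified table name.
CREATE SOURCE inputStream WITH (
    type = 'cdc',
    url = 'jdbc:postgresql://localhost:5432/SimpleDB',
    username = 'user',
    password = 'password',
    table.name = 'students',
    operation = 'insert',
    plugin.name = 'pgoutput',
    map.type = 'keyvalue',
    attributes.id = 'id',
    attributes.name = 'name'
) (id string, name string);
```

If plugin.name is omitted, the default decoderbufs is used, which requires that plugin to be installed on the database server.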