MySQL Change Data Capture: A Definitive Guide

Learn how to implement Change Data Capture (CDC) in MySQL using Debezium and Kafka.

Change Data Capture (CDC) monitors changes like inserts, updates, and deletes in real time. When detected, these changes can be integrated as a stream of change events into other systems and applications that can react to or process them.

CDC's ability to propagate changes in real time makes it useful for many different applications. For example, in event-driven systems like microservices, CDC exchanges data between services and can drive event-based updates of materialized views within those services, enabling up-to-date query responses without complex joins or aggregations. In data warehousing, CDC captures changes from different source systems to perform timely, incremental updates of the warehouse. CDC is also useful for cache invalidation, where it tracks changes at the source and triggers a mechanism to refresh or invalidate cached entries. Lastly, in audit logging, CDC can capture every change made to the data and provide a detailed record of transactions.

In this article, you'll learn the importance of CDC, become familiar with some of its use cases, and explore how to implement CDC with a MySQL database.

How to Implement CDC with MySQL

There are various ways you can implement CDC with MySQL. Debezium, Kafka Connect with a JDBC connector, and Maxwell are some of the most popular solutions.

Debezium

Debezium is an open-source platform that offers a user-friendly way to implement CDC with MySQL. By connecting Debezium to a MySQL database, you can detect changes at the level of individual rows. In particular, Debezium monitors the binary log (binlog) of the MySQL database. As soon as an insert, update, or delete operation happens, the change is recorded in the binlog, where Debezium captures it and streams it to a Kafka topic. This makes Debezium especially well suited for real-time replication and streaming. To set up Debezium, you need to enable binary logging in your MySQL database and configure the Debezium connector to establish a connection with Kafka.
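
Before deploying the connector, it helps to confirm that binary logging is enabled and uses row-based format, since Debezium reads row-level changes from the binlog. A minimal check from the MySQL CLI (on MySQL 8, binary logging is enabled by default) looks like this:

SHOW VARIABLES LIKE 'log_bin';        -- should return ON
SHOW VARIABLES LIKE 'binlog_format';  -- Debezium expects ROW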

Kafka Connect with a Connector

You can also implement CDC using Kafka Connect and a connector such as JDBC. In this setup, a JDBC connector periodically executes SQL queries to detect and fetch new and modified data from the database. These queries usually track data changes based on specific columns, like a last-updated timestamp or an incrementing ID. In contrast to Debezium's log-based CDC, which monitors transaction logs, this approach actively queries the database at regular intervals to detect changes. As soon as any changes in the data are detected, the connector then streams these changes to Kafka.
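
To make the polling approach concrete, a timestamp-based query issued by such a connector on each poll would look roughly like the following. The <span class="code-exp">last_updated</span> column and the literal timestamp are illustrative placeholders, not part of any particular connector's configuration:

SELECT * FROM customers
WHERE last_updated > '2024-01-26 06:00:00'  -- timestamp of the previous poll
ORDER BY last_updated;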

Compared to Debezium's log-based approach, using Kafka Connect with a JDBC connector is more query-heavy. It can also be less comprehensive, because it may not capture all types of changes in the database. For example, once a row is deleted, it no longer appears in the query results, which means this method cannot reliably capture deletions.

Maxwell

Alternatively, you can implement CDC with MySQL using Maxwell, which is an open-source application that can read MySQL binlogs, detect data changes, and output them as JSON to Kafka or other streaming platforms. Maxwell was built to be lightweight and easy to deploy. It can detect all data changes at the row level (inserts, updates, and deletes). The setup for doing CDC with Maxwell and MySQL involves configuring Maxwell to access the MySQL binlog, similar to Debezium.
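
As a rough sketch, running Maxwell against a local MySQL server and producing change events to Kafka might look like the command below, following Maxwell's quickstart. The <span class="code-exp">maxwell</span> user, password, host, and broker address are assumptions you'd replace with your own values:

bin/maxwell --user='maxwell' --password='XXXXXX' --host='127.0.0.1' \
    --producer=kafka --kafka.bootstrap.servers=localhost:9092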

Practical Use Cases for CDC Using Debezium and MySQL

CDC is useful for a wide variety of applications, including but not limited to the following:

  • Microservices: Debezium can detect changes in the MySQL database of a microservice and export them as events. For example, imagine an e-commerce service where a new order is placed. This data change is captured by Debezium and published to a dedicated Kafka topic. Other services that are subscribed to this topic receive the event and update their data or perform actions based on the change. This setup is closely related to the outbox pattern, in which a service writes events to a dedicated outbox table in the same transaction as the business data, so that other microservices stay consistent without communicating with each other directly (see the sketch after this list).
  • Real-time data loading: Debezium can keep a data warehouse in sync with operational databases. Imagine an online retail company that uses a MySQL database to manage its inventory, orders, and customer data. When any new transaction occurs, the inventory changes, or a new customer is added, Debezium captures these data updates and streams them to the data warehouse. In the data warehouse, this data can be used to generate reports and analytics for insights into sales trends, customer behavior, and inventory turnover.
  • Cache invalidation: Consider an online store with a website that receives a high amount of traffic. Product details like price, availability, and description are stored in a MySQL database but are also cached in a distributed cache like Redis. The latter allows quicker access to the data and a reduced database load. However, prices, stock, and product details may change quickly and frequently. Whenever changes occur in the MySQL database, they need to be reflected in the cache to prevent the display of outdated or incorrect information. In this context, Debezium can detect changes like price changes or stock updates by monitoring the binary log of the MySQL database. The changes are streamed to Kafka, which is connected to a service that determines the relevant cache entry that requires updating based on the change.
  • Audit logging: Debezium can capture all data changes to provide an audit trail. For example, in a financial system, Debezium could track changes in a MySQL database, like transactions, account balance modifications, or updates to customer data. Each of these changes is recorded in detail and provides a trail that can be used for compliance or security.
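
As a rough illustration of the outbox pattern mentioned in the microservices example above, the service writes each event to a dedicated outbox table in the same transaction as the business change, and Debezium streams rows from that table. The column names below follow the convention used by Debezium's outbox event router, but treat this as a sketch rather than a required schema:

CREATE TABLE outbox (
    id VARCHAR(36) PRIMARY KEY,           -- unique event ID, typically a UUID
    aggregatetype VARCHAR(255) NOT NULL,  -- used to route the event to a topic, e.g., 'order'
    aggregateid VARCHAR(255) NOT NULL,    -- ID of the affected entity, used as the Kafka message key
    type VARCHAR(255) NOT NULL,           -- event type, e.g., 'OrderCreated'
    payload JSON                          -- the event body
);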

Implementing CDC with MySQL

Imagine that you work for a retail company that uses a MySQL database to capture all transactions, inventory updates, and customer interactions. However, the data warehouse used for analytics and reporting solely relies on batch processes for updates, which has resulted in delays in reflecting the most recent data. To address this issue, your task is to implement CDC with Debezium, which connects to a MySQL database called <span class="code-exp">inventory</span> and monitors the changes in this database. For example, when a new customer is entered into the <span class="code-exp">customers</span> table in the <span class="code-exp">inventory</span> database, Debezium should capture these changes in real time and stream these events to Kafka.

The technical architecture involves setting up a Debezium MySQL connector for the relevant table in the MySQL database. The connector monitors the database's binlog for data changes and streams each change event to Kafka, where downstream systems such as the data warehouse can consume it.

Prerequisites

For this tutorial, you'll need ZooKeeper, Kafka, a Kafka Connect service with the Debezium MySQL connector, and a MySQL server. All of these services will run in separate Docker containers.

To start the services needed for this tutorial, follow the guides linked below:

Note: Kafka Connect exposes a REST API to manage the Debezium MySQL connector. Make sure to specify <span class="code-exp">--link mysql-database:mysql-database</span> in the Docker command that starts Kafka Connect, where <span class="code-exp">mysql-database</span> is the name of the Docker container where the MySQL server is running.
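
If you don't have those guides at hand, the commands below sketch one way to start the containers with the Debezium 2.4 images, each in its own terminal. The container names, the <span class="code-exp">my-secret-pw</span> root password, and the storage topic names passed to Kafka Connect are assumptions used throughout this tutorial; adjust them to match your setup:

docker run -it --rm --name zookeeper -p 2181:2181 quay.io/debezium/zookeeper:2.4
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:2.4
docker run -it --rm --name mysql-database -p 3306:3306 -e MYSQL_ROOT_PASSWORD=my-secret-pw mysql:8.0
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 \
    -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
    -e STATUS_STORAGE_TOPIC=my_connect_statuses \
    --link kafka:kafka --link mysql-database:mysql-database quay.io/debezium/connect:2.4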

Setting Up a MySQL Database

After you've started the Docker container where the MySQL server is running, you can proceed to create a database inside the MySQL server. First, execute the command below in a terminal to get access to run commands inside the Docker container:

docker exec -it mysql-database bash

Once inside the container, you can run commands as you would in a typical Linux environment. Run the following command to access the MySQL CLI:

mysql -u root -p

When prompted, enter the root password to log into the MySQL CLI. Next, create the database <span class="code-exp">inventory</span> by executing the following commands:

CREATE DATABASE IF NOT EXISTS inventory;
USE inventory;

Once the database is set up, you can create the table <span class="code-exp">customers</span> inside the database to contain the first and last name of each customer, as well as their email and address. You can do this by executing the following command in the MySQL CLI:

CREATE TABLE customers (
    id INT AUTO_INCREMENT PRIMARY KEY,
    first_name VARCHAR(100),
    last_name VARCHAR(100),
    email VARCHAR(100),
    address VARCHAR(100)
);

You can now insert some artificial data into the table using the following command:

INSERT INTO customers (id, first_name, last_name, email, address) VALUES
(1005, 'Alice', 'Johnson', 'alice.johnson@example.com', '123 Maple Street, Springfield, USA'),
(1006, 'Brian', 'Smith', 'brian.smith@sample.com', '456 Oak Avenue, Riverdale, Canada'),
(1007, 'Catherine', 'Lee', 'catherine.lee@demo.com', '789 Pine Road, Lakeside, UK'),
(1008, 'David', 'Martinez', 'david.martinez@site.com', '101 Birch Lane, Sunnyvale, Australia');

The command fills the <span class="code-exp">customers</span> table within the <span class="code-exp">inventory</span> database with four rows. Each row represents one customer. Executing the following command in the MySQL CLI will display the table:

SELECT * FROM customers;

If the inserts succeeded, you should see this output in the CLI:

+------+------------+-----------+---------------------------+--------------------------------------+
| id   | first_name | last_name | email                     | address                              |
+------+------------+-----------+---------------------------+--------------------------------------+
| 1005 | Alice      | Johnson   | alice.johnson@example.com | 123 Maple Street, Springfield, USA   |
| 1006 | Brian      | Smith     | brian.smith@sample.com    | 456 Oak Avenue, Riverdale, Canada    |
| 1007 | Catherine  | Lee       | catherine.lee@demo.com    | 789 Pine Road, Lakeside, UK          |
| 1008 | David      | Martinez  | david.martinez@site.com   | 101 Birch Lane, Sunnyvale, Australia |
+------+------------+-----------+---------------------------+--------------------------------------+
4 rows in set (0.00 sec)

In the following steps, you'll add a new customer to this table and see how Debezium captures this change event. But first, you need to establish a connection between Debezium and the MySQL database.

Deploying the MySQL Connector

At this point, you've started the Debezium services and created the MySQL database <span class="code-exp">inventory</span>. You can now connect Debezium with the MySQL database so that Debezium can detect any changes in the database. To do so, you have to register the Debezium MySQL connector to monitor the binlog of the <span class="code-exp">inventory</span> database.

Before registering the connector, reviewing its configuration can help you understand how it works. The connector is configured by the following JSON document:

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql-database",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "",
    "database.server.id": "1",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory"
  }
}

Let's take a look at some of the attributes:

  • <span class="code-exp">name</span>: The name of the connector.
  • <span class="code-exp">config</span>: This field contains all configurations of the connector.
  • <span class="code-exp">tasks.max</span>: This describes how many tasks should operate simultaneously.
  • <span class="code-exp">database.hostname</span>: This is the name of the container where the MySQL server is running.
  • <span class="code-exp">database.port</span>: The port of the MySQL server.
  • <span class="code-exp">database.user</span>: The name of the user that uses the MySQL server.
  • <span class="code-exp">database.password</span>: The password that you specified during the creation of the MySQL server.
  • <span class="code-exp">database.server.id</span>: The unique ID of the database.
  • <span class="code-exp">topic.prefix</span>: A topic prefix that will be used as the prefix for all Kafka topics.
  • <span class="code-exp">database.include.list</span>: This field specifies the name of the database that will be monitored for changes.
  • <span class="code-exp">schema.history.internal.kafka.bootstrap.servers</span>: The Kafka broker that will be used by the connector to store the history of the database schemas.

To register the Debezium MySQL connector, open a new terminal and use the following <span class="code-exp">curl</span> command, which submits a POST request with the previous JSON document that describes the new connector:

curl -H "Accept:application/json" localhost:8083/connectors/
["inventory-connector"](basi -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql-database", "database.port": "3306", "database.user": "root", "database.password": "my-secret-pw", "database.server.id": "1", "topic.prefix": "dbserver1", "database.include.list": "inventory", "schema.history.internal.kafka.bootstrap.servers": "0.0.0.0:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory" } }'

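Kafka Connect's REST API also lets you list all registered connectors, which is a quick way to confirm that the registration went through:

curl -H "Accept:application/json" localhost:8083/connectors/

If registration succeeded, the response includes <span class="code-exp">["inventory-connector"]</span>.
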
To verify that <span class="code-exp">inventory-connector</span> was connected successfully to the database, execute the following command in the terminal:

curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

You should see the output below, which indicates the connector is monitoring the database <span class="code-exp">inventory</span>:

HTTP/1.1 200 OK
Date: Fri, 26 Jan 2024 06:07:16 GMT
Content-Type: application/json
Content-Length: 553
Server: Jetty(9.4.51.v20230217)
{
   "name":"inventory-connector",
   "config":{
      "connector.class":"io.debezium.connector.mysql.MySqlConnector",
     ... output omitted...
   },
   "tasks":[
      {
         "connector":"inventory-connector",
         "task":0
      }
   ],
   "type":"source"
}

Viewing Change Events

At this point, Debezium has started monitoring the <span class="code-exp">inventory</span> database for any data changes. All change events will be written to a topic with the specified <span class="code-exp">dbserver1</span> prefix. In particular, <span class="code-exp">dbserver1.inventory.customers</span> will be the topic that captures change events for the <span class="code-exp">customers</span> table in the <span class="code-exp">inventory</span> database. In the next step, you'll see how Debezium captures "create" events triggered by adding new customers to the database.

First, you must start the <span class="code-exp">watch-topic</span> utility to watch the <span class="code-exp">dbserver1.inventory.customers</span> topic for any change events. The <span class="code-exp">watch-topic</span> utility will display the change event records as JSON in the terminal. To start the utility, open a new terminal and execute the following command:

docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.4 watch-topic -a -k dbserver1.inventory.customers
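
If the watcher doesn't show anything, you can check whether the topic has been created by listing the topics on the broker. One way to do this, assuming the Debezium Kafka image keeps its Kafka installation under <span class="code-exp">/kafka</span>, is:

docker exec -it kafka /kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

The list should include <span class="code-exp">dbserver1.inventory.customers</span> once the connector has taken its initial snapshot of the table.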

You can now add a new customer to the table and observe the captured change events in the terminal where the <span class="code-exp">watch-topic</span> utility is running. To add a new customer, return to the terminal with the MySQL CLI and run the command below:

INSERT INTO customers (id, first_name, last_name, email, address) VALUES
(1009, 'Laura', 'Williams', 'laura.williams@example.com', '234 Elm Street, Newtown, USA');

This MySQL command adds a new customer to the <span class="code-exp">customers</span> table in the database. In the terminal where the <span class="code-exp">watch-topic</span> utility is running, you should see the create event that was captured by Debezium:

… output omitted …
"payload":{
      "before":null,
      "after":{
         "id":1009,
         "first_name":"Laura",
         "last_name":"Williams",
         "email":"laura.williams@example.com",
         "address":"234 Elm Street, Newtown, USA"
      },
      "source":{
         "version":"2.4.2.Final",
         "connector":"mysql",
         "name":"dbserver1",
         "ts_ms":1706249487000,
         "snapshot":"false",
         "db":"inventory",
         "sequence":null,
         "table":"customers",
         "server_id":1,
         "gtid":null,
         "file":"binlog.000004",
         "pos":2115,
         "row":0,
         "thread":11,
         "query":null
      },
      "op":"c",
      "ts_ms":1706249487283,
      "transaction":null
   }

In this create event record, the <span class="code-exp">before</span> field contains the state of the row before the create event occurred. Since there was no row before, the state is simply <span class="code-exp">null</span>. The <span class="code-exp">after</span> field includes the state of the row after the event occurred. In particular, it contains the information of the row that was added to the table. The <span class="code-exp">source</span> field contains all the source metadata for the event (for example, the connector name and the name of the binlog file where the event was captured).
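
The <span class="code-exp">op</span> field identifies the operation: <span class="code-exp">c</span> for create, <span class="code-exp">u</span> for update, <span class="code-exp">d</span> for delete, and <span class="code-exp">r</span> for rows read during the connector's initial snapshot. For example, updating the customer you just added produces an event with <span class="code-exp">op</span> set to <span class="code-exp">u</span> and with both <span class="code-exp">before</span> and <span class="code-exp">after</span> populated:

UPDATE customers
SET address = '567 Cedar Court, Newtown, USA'
WHERE id = 1009;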

To verify that the system works and that new data was added to the <span class="code-exp">customers</span> table, go back to the MySQL CLI and execute the following command:

SELECT * FROM customers;

In the case of a successful create event, you should see the modified table with the new row:

+------+------------+-----------+----------------------------+--------------------------------------+
| id   | first_name | last_name | email                      | address                              |
+------+------------+-----------+----------------------------+--------------------------------------+
| 1005 | Alice      | Johnson   | alice.johnson@example.com  | 123 Maple Street, Springfield, USA   |
| 1006 | Brian      | Smith     | brian.smith@sample.com     | 456 Oak Avenue, Riverdale, Canada    |
| 1007 | Catherine  | Lee       | catherine.lee@demo.com     | 789 Pine Road, Lakeside, UK          |
| 1008 | David      | Martinez  | david.martinez@site.com    | 101 Birch Lane, Sunnyvale, Australia |
| 1009 | Laura      | Williams  | laura.williams@example.com | 234 Elm Street, Newtown, USA         |
+------+------------+-----------+----------------------------+--------------------------------------+
5 rows in set (0.00 sec)

Conclusion

Implementing CDC ensures that data across different systems and applications remains synchronized and up to date. This is important to maintain data integrity, improve decision-making processes, and enable real-time analytics and reporting. In this tutorial, you learned how to implement CDC in a MySQL database using Debezium.

If you need to power analytics from your MySQL data, consider exploring Propel. For example, you can ingest your MySQL CDC events into a Propel Webhook Data Pool and expose them via low-latency analytics APIs to power dashboards and reports. You can also enrich and transform your MySQL data for analytics using Propel's Materialized Views. Think of it as gathering your data in one main spot for different uses.

Alternatively, Propel also integrates with ELT platforms like Fivetran and Airbyte, as well as data warehouses like BigQuery and Snowflake. This allows you to ingest data from any of these platforms and then use Propel to power customer-facing analytics.
