跳到主要内容

debezium binlog demo

执行一遍入门版demo,就知道如何消费binblog,binblog的格式如何了。 整体而言,看起来debezium的binlog变更日志还是非常容易解读的。

基础步骤: mysql配置开启binlog,创建读取binlog的用户,给予binlog读取权限,开启debezium connect读取binlog信息,转发到kafka中,用于从kafka里消费。 binlog信息对于变更记录,会提供完整的before和after数据进行对比。

入门 tutorial文档

https://debezium.io/documentation/reference/2.5/tutorial.html#starting-kafka-connect

启动zookeeper和kafka

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 quay.io/debezium/zookeeper:2.4

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper quay.io/debezium/kafka:2.4

启动mysql

mysql里有初始化数据

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw quay.io/debezium/example-mysql:2.4
  • /etc/my.cnf
#cat /etc/my.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/8.0/en/server-configuration-defaults.html

[mysqld]
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M

# Remove leading # to revert to previous value for default_authentication_plugin,
# this will increase compatibility with older clients. For background, see:
# https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_default_authentication_plugin
# default-authentication-plugin=mysql_native_password
skip-host-cache
skip-name-resolve
datadir=/var/lib/mysql
socket=/var/run/mysqld/mysqld.sock
secure-file-priv=/var/lib/mysql-files
user=mysql

pid-file=/var/run/mysqld/mysqld.pid
[client]
socket=/var/run/mysqld/mysqld.sock

!includedir /etc/mysql/conf.d/

开启binlog的配置

  • /etc/mysql/conf.d/mysql.cnf

# cat /etc/mysql/conf.d/mysql.cnf
# For advice on how to change settings please see
# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html

[mysqld]
#
# Remove leading # and set to the amount of RAM for the most important data
# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
# innodb_buffer_pool_size = 128M
#
# Remove leading # to turn on a very important data integrity option: logging
# changes to the binary log between backups.
# log_bin
#
# Remove leading # to set options mainly useful for reporting servers.
# The server defaults are faster for transactions and fast SELECTs.
# Adjust sizes as needed, experiment to find the optimal values.
# join_buffer_size = 128M
# sort_buffer_size = 2M
# read_rnd_buffer_size = 2M
skip-host-cache
skip-name-resolve
#datadir=/var/lib/mysql
#socket=/var/lib/mysql/mysql.sock
#secure-file-priv=/var/lib/mysql-files
user=mysql

# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

#log-error=/var/log/mysqld.log
#pid-file=/var/run/mysqld/mysqld.pid

# ----------------------------------------------
# Enable the binlog for replication & CDC
# ----------------------------------------------

# Enable binary replication log and set the prefix, expiration, and log format.
# The prefix is arbitrary, expiration can be short for integration tests but would
# be longer on a production system. Row-level info is required for ingest to work.
# Server ID is required, but this will vary on production systems
server-id = 223344
log_bin = mysql-bin
expire_logs_days = 1
binlog_format = row

default_authentication_plugin = mysql_native_password

数据库初始脚本, 可以看到创建了debezium用户,并授予了REPLICATION SLAVE, REPLICATION CLIENT权限,支持读取binlog。

-- cat  /docker-entrypoint-initdb.d/inventory.sql   

# In production you would almost certainly limit the replication user must be on the follower (slave)machine,
# to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
#
# However, this grant is equivalent to specifying *any* hosts, which makes this easier since the docker host
# is not easily known to the Docker container. But don't do this in production.
#

CREATE USER 'replicator' IDENTIFIED BY 'replpass';
CREATE USER 'debezium' IDENTIFIED BY 'dbz';
GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium';

# Create the database that we'll use to populate data and watch the effect in the binlog
CREATE DATABASE inventory;
GRANT ALL PRIVILEGES ON inventory.* TO 'mysqluser'@'%';

# Switch to this database
USE inventory;

# Create and populate our products using a single insert with many rows
CREATE TABLE products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);
ALTER TABLE products AUTO_INCREMENT = 101;

mysql> show tables;
+---------------------+
| Tables_in_inventory |
+---------------------+
| addresses |
| customers |
| geom |
| orders |
| products |
| products_on_hand |
+---------------------+
6 rows in set (0.00 sec)

启动kafka connect

支持读取mysql binlog信息发送到kafka中

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link kafka:kafka --link mysql:mysql quay.io/debezium/connect:2.4

注册mysql binlog

通过api提交注册信息,指明观察某个库表的binlog变更信息

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "topic.prefix": "dbserver1", "database.include.list": "inventory", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schemahistory.inventory" } }'

具体的注册参数,可以看到数据库账号密码信息,还有观察的库表database.include.list, kafka topic topic.prefix

{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"topic.prefix": "dbserver1",
"database.include.list": "inventory",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.inventory"
}
}

查看注册的connectors

curl -H "Accept:application/json" localhost:8083/connectors/


["inventory-connector"]
curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

HTTP/1.1 200 OK
Date: Thu, 06 Feb 2020 22:12:03 GMT
Content-Type: application/json
Content-Length: 531
Server: Jetty(9.4.20.v20190813)

{
"name": "inventory-connector",
...
"tasks": [
{
"connector": "inventory-connector",
"task": 0
}
]
}

然后注册流程就完毕了,可以观察数据库的数据变更下的binlog kafka信息。

开启kafka查看数据

docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.4 watch-topic -a -k dbserver1.inventory.customers

可以看到topic名字是topic-prefix然后加上具体的库表信息。

要查看另一个库表的变更topic信息,topic里可以更换一下

docker run -it --rm --name watcher3 --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.4 watch-topic -a -k dbserver1.inventory.orders

数据变更查看

访问kafka topic,查看kafka connect发送过来的binlog信息,看起来主要是首次插入数据时候的变更信息。

docker run -it --rm --name watcher --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.4 watch-topic -a -k dbserver1.inventory.customers
[5/5]WARNING: Using default NODE_ID=1, which is valid only for non-clustered installations.Starting in ZooKeeper mode using NODE_ID=1.Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using configuration config/server.properties.
Using KAFKA_LISTENERS=PLAINTEXT://172.17.0.7:tcp://172.17.0.3:9092 and KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic dbserver1.inventory.customers:

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"2.4.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1710753466000,"snapshot":"first_in_data_collection","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710753466731,"transaction":null}}


{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1002}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"2.4.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1710753466000,"snapshot":"true","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710753466732,"transaction":null}}


{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1003}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"2.4.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1710753466000,"snapshot":"true","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710753466732,"transaction":null}}


{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope","version":1},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"2.4.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1710753466000,"snapshot":"last_in_data_collection","db":"inventory","sequence":null,"table":"customers","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":157,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1710753466732,"transaction":null}}

对应的库表

mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email
|+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+

update 记录

update一条记录,收到两条kafka信息

UPDATE customers SET first_name='Anne Marie' WHERE id=1004;

一条信息是upate 1d为1004,kafka 消息的key

{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Key"
},
"payload": {
"id": 1004
}
}

一条信息是详细的数据变更,kafka消息的value

{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope",
"version": 1
},
"payload": {
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1710754961000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 401,
"row": 0,
"thread": 8,
"query": null
},
"op": "u",
"ts_ms": 1710754961501,
"transaction": null
}
}

大量的信息都是无用的schema注册信息,有用的数据变更记录是这几条, op为u说明是update事件,payload里提供了update前和update后的完整record记录, 信息量非常充分里。

不过变更了哪个field,看起来像是需要自己去对比一下。

  "payload": {
"before": {
"id": 1004,
"first_name": "Anne",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1710754961000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 401,
"row": 0,
"thread": 8,
"query": null
},
"op": "u",
"ts_ms": 1710754961501,
"transaction": null
}

删除一条记录

DELETE FROM customers WHERE id=1004;    

key

{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Key"
},
"payload": {
"id": 1004
}
}

valude


{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope",
"version": 1
},
"payload": {
"before": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1710755489000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 1163,
"row": 0,
"thread": 8,
"query": null
},
"op": "d",
"ts_ms": 1710755489993,
"transaction": null
}
}

有用的信息, op为d说明是delete, after是null说明数据没了

 "payload": {
"before": {
"id": 1004,
"first_name": "Anne Marie",
"last_name": "Kretchmar",
"email": "annek@noanswer.org"
},
"after": null,
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1710755489000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 1163,
"row": 0,
"thread": 8,
"query": null
},
"op": "d",
"ts_ms": 1710755489993,
"transaction": null
}
}

还有第一条没什么用的

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}}        null  

插入新增数据

INSERT INTO customers VALUES (default, "Sarah", "Thompson", "kitt@acme.com");

key

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1005}} 

value


{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field": "first_name"
},
{
"type": "string",
"optional": false,
"field": "last_name"
},
{
"type": "string",
"optional": false,
"field": "email"
}
],
"optional": true,
"name": "dbserver1.inventory.customers.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"id": 1005,
"first_name": "Sarah",
"last_name": "Thompson",
"email": "kitt@acme.com"
},
"source": {
"version": "2.4.2.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1710755876000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 2001,
"row": 0,
"thread": 8,
"query": null
},
"op": "c",
"ts_ms": 1710755876703,
"transaction": null
}
}

op为c,代表create。

尝试观察 information schema

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "schema-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "topic.prefix": "dbserver1", "database.include.list": "information_schema", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schemahistory.information" } }'

然后查询

curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/schema-connector
HTTP/1.1 200 OK
Date: Mon, 18 Mar 2024 11:50:32 GMT
Content-Type: application/json
Content-Length: 546
Server: Jetty(9.4.51.v20230217)

{"name":"schema-connector","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.user":"debezium","topic.prefix":"dbserver1","schema.history.internal.kafka.topic":"schemahistory.information","database.server.id":"184054","tasks.max":"1","database.hostname":"mysql","database.password":"dbz","name":"schema-connector","schema.history.internal.kafka.bootstrap.servers":"kafka:9092","database.port":"3306","database.include.list":"information_schema"},"tasks":[{"connector":"schema-connector","task":0}],"type":"source"}

kafka 监听下

docker run -it --rm --name watcher3 --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.4 watch-topic -a -k dbserver1.information_schema.tables

测试下新建库表

CREATE TABLE inventory.products2 (  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,  name VARCHAR(255) NOT NULL,
description VARCHAR(512), weight FLOAT
);

测试不出来, 没有监听到信息,看chagtp的意思是information schema的变更貌似没有binlog信息。

[root@geedev ~]# docker run -it --rm --name watcher3 --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.4 watch-topic -a -k dbserver1.information_schema.tables                                           
WARNING: Using default NODE_ID=1, which is valid only for non-clustered installations.Starting in ZooKeeper mode using NODE_ID=1.Using ZOOKEEPER_CONNECT=172.17.0.2:2181Using configuration config/server.properties.Using KAFKA_LISTENERS=PLAINTEXT://172.17.0.9:tcp://172.17.0.3:9092 and KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.9:9092 Using KAFKA_BROKER=172.17.0.3:9092Contents of topic dbserver1.information_schema.tables:


chatgpt建议使用triggers将information表的信息迁移到其他数据库,然后监听其他数据库。

Debezium 是一个开源的分布式平台,用于捕获和发布数据库的变更事件。它支持多种数据库,包括 MySQL。虽然 Debezium 可以监听 MySQL 数据库的变更事件,但它并不直接支持监听信息模式(information_schema)表的变更。

信息模式表包含了关于数据库、表、列、索引等元数据的信息,它是一个只读的数据库,不能直接通过 Debezium 来监听它的变更。

如果你想要监听信息模式表的变更,可以考虑以下几种方法:

使用 MySQL 的触发器(Triggers):你可以在信息模式表上创建触发器,当有数据更改时,触发器会执行相应的操作,例如将变更信息记录到其他表中。然后,使用 Debezium 监听其他表的变更事件。

使用外部工具:你可以编写自定义的脚本或应用程序,定期查询信息模式表的变更,并将变更信息记录到其他存储或发送到其他系统。然后,使用 Debezium 监听其他存储或系统的变更事件。

需要注意的是,以上方法都需要你自己编写代码或配置来实现。Debezium 本身并不直接支持监听信息模式表的变更。但通过结合其他工具和技术,你可以实现监听信息模式表的变更,并使用 Debezium 监听其他表或系统的变更事件。

测试新的

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "abc-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "topic.prefix": "dbserver1", "database.include.list": "abc", "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schemahistory.information" } }'

docker run -it --rm --name watcher4 --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.4 watch-topic -a -k dbserver1.abc.kk

然后在数据库里创建库表, insert数据, kafka topic监听成功.

看起来一条记录有三条重复的kafka监听消息,不过看起来可以通过payload里的ts_ms进行唯一性识别。

[root@geedev ~]# docker run -it --rm --name watcher4 --link zookeeper:zookeeper --link kafka:kafka quay.io/debezium/kafka:2.4 watch-topic -a -k dbserver1.abc.kk 

WARNING: Using default NODE_ID=1, which is valid only for non-clustered installations.
Starting in ZooKeeper mode using NODE_ID=1.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using configuration config/server.properties.
Using KAFKA_LISTENERS=PLAINTEXT://172.17.0.10:tcp://172.17.0.3:9092 and KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.10:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic dbserver1.abc.kk:

[2024-03-18 12:19:24,778] WARN [Consumer clientId=console-consumer, groupId=console-consumer-10757] Error while fetching metadata with correlation id 2 : {dbserver1.abc.kk=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)


null {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":true,"field":"a"},{"type":"int32","optional":true,"field":"b"}],"optional":true,"name":"dbserver1.abc.kk.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":true,"field":"a"},{"type":"int32","optional":true,"field":"b"}],"optional":true,"name":"dbserver1.abc.kk.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.abc.kk.Envelope","version":1},"payload":{"before":null,"after":{"a":1,"b":1},"source":{"version":"2.4.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1710764408000,"snapshot":"false","db":"abc","sequence":null,"table":"kk","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":3364,"row":0,"thread":8,"query":null},"op":"c","ts_ms":1710764408697,"transaction":null}}


null {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":true,"field":"a"},{"type":"int32","optional":true,"field":"b"}],"optional":true,"name":"dbserver1.abc.kk.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":true,"field":"a"},{"type":"int32","optional":true,"field":"b"}],"optional":true,"name":"dbserver1.abc.kk.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.abc.kk.Envelope","version":1},"payload":{"before":null,"after":{"a":1,"b":1},"source":{"version":"2.4.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1710764408000,"snapshot":"false","db":"abc","sequence":null,"table":"kk","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":3364,"row":0,"thread":8,"query":null},"op":"c","ts_ms":1710764421565,"transaction":null}}

null {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":true,"field":"a"},{"type":"int32","optional":true,"field":"b"}],"optional":true,"name":"dbserver1.abc.kk.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":true,"field":"a"},{"type":"int32","optional":true,"field":"b"}],"optional":true,"name":"dbserver1.abc.kk.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"name":"event.block","version":1,"field":"transaction"}],"optional":false,"name":"dbserver1.abc.kk.Envelope","version":1},"payload":{"before":null,"after":{"a":1,"b":1},"source":{"version":"2.4.2.Final","connector":"mysql","name":"dbserver1","ts_ms":1710764408000,"snapshot":"false","db":"abc","sequence":null,"table":"kk","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":3364,"row":0,"thread":8,"query":null},"op":"c","ts_ms":1710764443249,"transaction":null}}

看起来kafka重复的问题是datasource id与其他connector重复导致的, connect一直在重启, 然后重复触发。

org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.                                                                                     
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:63)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1299)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1079)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:631)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:932)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.debezium.DebeziumException: A replica with the same server_uuid/server_id as this replica has connected to the source; the first event 'mysql-bin.000003' at 2104, the last event read from './mysql-bin.000003'
at 3715, the last byte read from './mysql-bin.000003' at 3715. Error code: 1236; SQLSTATE: HY000.
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1254)
... 5 more
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: A replica with the same server_uuid/server_id as this replica has connected to the source; the first event 'mysql-bin.000003' at 2104, the last event read f
rom './mysql-bin.000003' at 3715, the last byte read from './mysql-bin.000003' at 3715. at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:1043)
... 3 more

删除其他已经存在的connectors

curl -X DELETE localhost:8083/connectors/schema-connector
created at 2024-03-18