debezium同步binlog到kafka

一.环境安装

1.zk
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz --no-check-certificate

tar -zxvf apache-zookeeper-3.7.0-bin.tar.gz -C /usr/local/
cd /usr/local
ln -sL apache-zookeeper-3.7.0-bin zookeeper
cd /usr/local/apache-zookeeper-3.7.0-bin/conf
cp -rp zoo_sample.cfg zoo.cfg
vi zoo.cfg
- 内容

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/log
clientPort=2181

cd /usr/local/zookeeper/bin && ./zkServer.sh start
2.kafka
wget https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
tar -zxvf kafka_2.13-2.7.0.tgz -C /usr/local
cd /usr/local
ln -sL kafka_2.13-2.7.0 kafka
cd kafka/config
vi server.properties
- 内容
listeners=PLAINTEXT://127.0.0.1:9092
advertised.listeners=PLAINTEXT://127.0.0.1:9092
log.dirs=/usr/local/kafka/kafka-logs
host.name=127.0.0.1
zookeeper.connect=localhost:2181

cd /usr/local/kafka && ./bin/kafka-server-start.sh -daemon config/server.properties
# kafka
3.debezium connector
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.1.Final/debezium-connector-mysql-1.6.1.Final-plugin.tar.gz
mkdir /usr/local/kafka/plugins
tar -zxvf debezium-connector-mysql-1.6.1.Final-plugin.tar.gz -C /usr/local/kafka/plugins
vi /usr/local/kafka/config/connect-standalone.properties
bootstrap.servers=127.0.0.1:9092
plugin.path=/usr/local/kafka/plugins

# listeners=http://localhost:7778 默认是8083

二.standalone读取binlog到kafka

    • 1.数据库授权
      grant select,show view,RELOAD, SHOW DATABASES, replication client,replication slave,lock tables  on *.*  to  kafka@'%' identified by 'kafka';
        • 2.准备connector参数 mysql.properities
          name=mysql_connect_source
          connector.class=io.debezium.connector.mysql.MySqlConnector
          database.hostname=192.168.234.xx
          database.port=3306
          database.user=kafka
          database.password=kafka
          database.server.id=234008001
          database.server.name=kafka_db
          database.include.list=你的数据库名称
          table.include.list=schema.table1,schema.table2
          database.history.kafka.bootstrap.servers=127.0.0.1:9092
          database.history.kafka.topic=kafka_db.debezium
          time.precision.mode=connect
          include.schema.changes=true
          errors.tolerance=all
          # 包含sql原始语句
          include.query=true
          # 默认是 initial 获取该表所有数据;schema_only 当前位置获取;schema_only_recovery 误删 topic 后恢复
          snapshot.mode=schema_only
          database.history.skip.unparseable.ddl=true
            • 3.启动
              nohup connect-standalone.sh  config/connect-standalone.properties  mysql.properities > nohup.out &
                • 4.连接zk查看topic
                  ./bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
                    • 5.命令行消费topic
                      kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic {database.server.name}.{table.include.list之一} --from-beginning
                        • 6.删除topic
                          bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic topic1,topic2

                          三.restapi管理standalone的connector

                          1.建表
                          - 1.1开启消费 
                          cd /usr/local/kafka && ./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic debezium --from-beginning
                          - 1.2源库建表插入数据 
                          insert into dba_test_table(name) values('aa');
                          - 1.3发现kafka上多了一个topic 
                          debezium.debezium.dba_test_table
                          2.观察这个topic
                          - 2.1 insert into dba_test_table(name) values('aa'); 
                          "payload": {
                          "before": null,
                          "after": {
                          "id": 1,
                          "name": "aa",
                          "address": null
                          },
                          "source": {
                          "version": "1.6.1.Final",
                          "connector": "mysql",
                          "name": "debezium",
                          "ts_ms": 1638348372000,
                          "snapshot": "false",
                          "db": "debezium",
                          "sequence": null,
                          "table": "dba_test_table",
                          "server_id": 23400801,
                          "gtid": "66339ee5-3d10-11ec-a611-0242ac110002:1263042",
                          "file": "mysql-bin.000034",
                          "pos": 747079405,
                          "row": 0,
                          "thread": null,
                          "query": null
                          },
                          "op": "c",
                          "ts_ms": 1638348372161,
                          "transaction": null
                          }
                          - 2.2 update dba_test_table set name='bb', address='addr1' where id=1; 
                          "payload": {
                          "before": {
                          "id": 1,
                          "name": "aa",
                          "address": null
                          },
                          "after": {
                          "id": 1,
                          "name": "bb",
                          "address": "addr1"
                          },
                          "source": {
                          "version": "1.6.1.Final",
                          "connector": "mysql",
                          "name": "debezium",
                          "ts_ms": 1638348630000,
                          "snapshot": "false",
                          "db": "debezium",
                          "sequence": null,
                          "table": "dba_test_table",
                          "server_id": 23400801,
                          "gtid": "66339ee5-3d10-11ec-a611-0242ac110002:1307889",
                          "file": "mysql-bin.000034",
                          "pos": 818840233,
                          "row": 0,
                          "thread": null,
                          "query": null
                          },
                          "op": "u",
                          "ts_ms": 1638348630034,
                          "transaction": null
                          }
                          - 2.3 delete from dba_test_table where id=1; 
                          "payload": {
                          "before": {
                          "id": 1,
                          "name": "bb",
                          "address": "addr1"
                          },
                          "after": null,
                          "source": {
                          "version": "1.6.1.Final",
                          "connector": "mysql",
                          "name": "debezium",
                          "ts_ms": 1638348731000,
                          "snapshot": "false",
                          "db": "debezium",
                          "sequence": null,
                          "table": "dba_test_table",
                          "server_id": 23400801,
                          "gtid": "66339ee5-3d10-11ec-a611-0242ac110002:1325649",
                          "file": "mysql-bin.000034",
                          "pos": 847104330,
                          "row": 0,
                          "thread": null,
                          "query": null
                          },
                          "op": "d",
                          "ts_ms": 1638348731068,
                          "transaction": null
                          }
                          - 2.4 modify 修改长度:alter table dba_test_table modify  `address` varchar(64) DEFAULT NULL;    kafka没有输出
                          - 2.5 modify 修改默认值:alter table dba_test_table modify  `address` varchar(64) DEFAULT '';    kafka没有输出
                          - 2.6 add 新增列(带默认值):alter table dba_test_table add  `age` tinyint DEFAULT '18';    kafka没有输出