使用canal.adapter同步数据到MySQL
通过canal,同步一张表的增量数据,从ming.test01到tt.test01下面。
1.安装canal.adapter
从github上下载 canal.adapter-1.1.4.tar.gz安装包,解压。
[root@mdb01 canal-adapter]# ll total 8 drwxr-xr-x 2 root root 95 Apr 6 14:53 bin drwxrwxrwx 6 root root 119 Apr 6 14:53 conf drwxr-xr-x 2 root root 4096 Apr 6 14:29 lib drwxrwxrwx 3 root root 21 Apr 6 14:49 logs drwxrwxrwx 2 root root 253 Sep 2 2019 plugin
2.启动canal.adapter
修改conf/application.yml
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp # kafka rocketMQ canalServerHost: 192.168.61.16:11111 --canal server 信息 batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: srcDataSources: --源端数据库连接信息,db是ming defaultDS: url: jdbc:mysql://192.168.61.16:3306/ming?useUnicode=true username: canal password: oracle canalAdapters: - instance: example # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: logger - name: rdb key: mysql1 properties: --目标端数据库连接信息,db是tt jdbc.driverClassName: com.mysql.jdbc.Driver jdbc.url: jdbc:mysql://192.168.61.16:3306/tt?useUnicode=true jdbc.username: root jdbc.password: oracle
修改conf/rdb/mytest_user.yml
dataSourceKey: defaultDS destination: example groupId: g1 outerAdapterKey: mysql1 concurrent: true dbMapping: database: ming table: test01 targetTable: tt.test01 targetPk: c1: c1 mapAll: true commitBatch: 3000 # 批量提交的大小
两张表的结构是一样的话,mapAll直接设置为true 如果表结构不一致的话,可以用targetColumns设置 : 从表字段名字: 主表字段名字配置完成后,启动adaptersh start.sh
3.canal配置
修改canal instance.properties,并重启canal
# table regex canal.instance.filter.regex=ming.test01
4.测试
源端
mysql> insert into test01 select 13,13,13,now(); Query OK, 1 row affected (0.30 sec) Records: 1 Duplicates: 0 Warnings: 0 mysql> update test01 set c4=now() where c1=13; Query OK, 1 row affected (0.03 sec) Rows matched: 1 Changed: 1 Warnings: 0 mysql> delete from test01 where c1=13; Query OK, 1 row affected (0.03 sec)
目标端
mysql> select * from test01; +----+------+------+---------------------+ | c1 | c2 | c3 | c4 | +----+------+------+---------------------+ | 13 | 13 | 13 | 2021-04-06 14:54:11 | +----+------+------+---------------------+ 1 row in set (0.00 sec) mysql> select * from test01; +----+------+------+---------------------+ | c1 | c2 | c3 | c4 | +----+------+------+---------------------+ | 13 | 13 | 13 | 2021-04-06 14:56:03 | +----+------+------+---------------------+ 1 row in set (0.00 sec) mysql> select * from test01; Empty set (0.00 sec)
对应的adapeter日志
2021-04-06 14:54:12.145 [pool-8-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"c1":13,"c2":13,"c3":"13","c4":1617692051000}],"database":"ming","destination":"example","es":1617692051000,"groupId":null,"isDdl":false,"old":null,"pkNames":["c1"],"sql":"","table":"test01","ts":1617692051862,"type":"INSERT"} 2021-04-06 14:54:12.244 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"c1":13,"c2":13,"c3":"13","c4":1617692051000},"database":"ming","destination":"example","old":null,"table":"test01","type":"INSERT"} 2021-04-06 14:56:03.453 [pool-8-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"c1":13,"c2":13,"c3":"13","c4":1617692163000}],"database":"ming","destination":"example","es":1617692163000,"groupId":null,"isDdl":false,"old":[{"c4":1617692051000}],"pkNames":["c1"],"sql":"","table":"test01","ts":1617692163452,"type":"UPDATE"} 2021-04-06 14:56:03.468 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"c1":13,"c2":13,"c3":"13","c4":1617692163000},"database":"ming","destination":"example","old":{"c4":1617692051000},"table":"test01","type":"UPDATE"} 2021-04-06 14:59:07.631 [pool-8-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"c1":13,"c2":13,"c3":"13","c4":1617692163000}],"database":"ming","destination":"example","es":1617692347000,"groupId":null,"isDdl":false,"old":null,"pkNames":["c1"],"sql":"","table":"test01","ts":1617692347629,"type":"DELETE"}
5.字段映射
# mapAll: true targetColumns: c1: c1 c2: c2 c3: c3 c5: c4
c1列在targetPk中已经指定了,可以不指定;其他列即使名字没有改变,比如c2 c3,也要写进去,否则这几列抓不到数据。如果只同步部分列的数据,那么就可以不用写所有的列映射关系了。重启sh restart.sh其他注意事项canal-adapter/conf/rdb/mytest_user.yml的groupId应该和canal-adapter/conf/application.yml中的保持一致如果要同步多张表或者多个不同数据源,只要在canal-adapter/conf/rdb/mytest_user.yml和canal-adapter/conf/application.yml中再增加一个groupId即可。
