Deploying and Using Alibaba Canal

Source: 这里教程网  Time: 2026-03-01 15:15:01  Author:

Alibaba Canal overview

Canal is Alibaba's component for incremental subscription to, and consumption of, MySQL binlog data.

Use cases

canal-python is a Canal client, so its use cases are simply Canal's use cases, which the Canal introduction already outlines. Some concrete examples:

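1. Replace polling as the way to detect database changes, avoiding the database load that polling causes.

2. Update a search engine in real time from database changes. For example, in e-commerce, when product information changes, sync it immediately to a product search engine such as Elasticsearch or Solr.

3. Update a cache in real time from database changes. For example, in e-commerce, sync changes to product prices and stock levels to Redis as they happen.

4. Off-site database backup and data synchronization.

5. Trigger business logic from database changes. For example, in e-commerce, an order that stays unpaid for longer than some period is cancelled automatically; as soon as we receive the status change of that order row, we can push a notification to the user.

6. Convert database changes into your own data format and publish them to a message queue such as Kafka for its consumers.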
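As a rough illustration of use cases 3 and 6, the sketch below assumes change records shaped like the dicts printed by the canal-python script later in this article (keys db, table, event_type and data, where data holds before/after for updates). The JSON message layout and the "<db>:<table>:<pk>" cache-key scheme are hypothetical choices for illustration, not something Canal defines; actually publishing to Kafka or deleting keys from Redis is left out.

```python
import json


def to_queue_message(change):
    """Serialize one change record into a JSON string for a message queue (use case 6).

    The message field names are a hypothetical layout.
    """
    return json.dumps({
        'source': f"{change['db']}.{change['table']}",
        'event_type': change['event_type'],
        'data': change['data'],
    }, sort_keys=True)


def cache_keys(change, pk_column):
    """Cache keys to invalidate for one change (use case 3).

    Uses a hypothetical '<db>:<table>:<pk>' key scheme. An UPDATE may change the
    key column itself, so both the before and the after row are considered.
    """
    data = change['data']
    rows = [data['before'], data['after']] if 'before' in data else [data]
    return sorted({f"{change['db']}:{change['table']}:{row[pk_column]}" for row in rows})


update = {'db': 'ming', 'table': 't1', 'event_type': 2,
          'data': {'before': {'a': '1'}, 'after': {'a': '2'}}}
print(to_queue_message(update))
print(cache_keys(update, 'a'))  # ['ming:t1:1', 'ming:t1:2']
```

Collecting keys in a set means an UPDATE that leaves the key column unchanged yields a single key; sorting just makes the result deterministic.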

Installing Canal

tar -zxvf canal.deployer-1.1.4.tar.gz
[root@mdb01 canal]# ll
total 4
drwxr-xr-x 2 root root   93 Jul 19 15:18 bin
drwxr-xr-x 5 root root  123 Jul 19 14:25 conf
drwxr-xr-x 2 root root 4096 Jul 19 14:25 lib
drwxrwxrwx 4 root root   34 Jul 19 14:29 logs

The instance configuration file is conf/example/instance.properties:

[root@mdb01 example]# ll
total 176
-rw-r--r-- 1 root root 172032 Jul 19 15:19 h2.mv.db
-rwxrwxrwx 1 root root   2041 Jul 19 14:34 instance.properties
-rw-r--r-- 1 root root    342 Jul 19 21:11 meta.dat
[root@mdb01 example]# more instance.properties |grep -v '^#'
canal.instance.gtidon=false

canal.instance.master.address=192.168.61.16:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

canal.instance.tsdb.enable=true

canal.instance.dbUsername=canal
canal.instance.dbPassword=oracle
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false

canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=

canal.mq.topic=example
canal.mq.partition=0
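The filter.regex value is written as .*\\..* in the file but appears as ^.*\..*$ in the startup log, because the .properties format treats a backslash as an escape character: the doubled backslash in the file becomes a single one, leaving the regex .*\..* (any schema name, a literal dot, any table name). The sketch below mimics that with a deliberately simplified reader (assumptions: key=value lines only, no ':' separators or line continuations, and only the doubled-backslash escape is decoded) and tests the resulting pattern against schema.table names; re.fullmatch stands in for the ^...$ anchors visible in the log.

```python
import re


def load_properties(text):
    # Simplified .properties reader (assumptions: one key=value pair per line,
    # no ':' separators or line continuations; only the double-backslash escape is decoded).
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(('#', '!')):
            continue  # blank line or comment
        key, sep, value = line.partition('=')
        if sep:
            props[key.strip()] = value.strip().replace('\\\\', '\\')
    return props


SAMPLE = r"""
# comment lines like these are what grep -v '^#' hides
canal.instance.master.address=192.168.61.16:3306
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex=.*\\..*
canal.instance.filter.black.regex=
"""

props = load_properties(SAMPLE)
table_filter = props['canal.instance.filter.regex']
print(table_filter)                                 # .*\..*
print(bool(re.fullmatch(table_filter, 'ming.t1')))  # True: schema "ming", table "t1"
print(bool(re.fullmatch(table_filter, 'ming')))     # False: no schema.table dot
```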

Creating the database user

CREATE USER canal IDENTIFIED BY 'oracle';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
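Besides this account, Canal reads row-based binlog events, so the source MySQL needs binary logging enabled with binlog_format set to ROW, as Canal's quick-start guide requires. A minimal pre-flight check, assuming you have already collected the output of SHOW VARIABLES into a {name: value} dict (fetching it, for example through a MySQL driver, is left out); the function name is hypothetical:

```python
def check_binlog_settings(variables):
    """Return the problems found in a {variable_name: value} mapping taken from
    SHOW VARIABLES; an empty list means both checked settings are fine."""
    problems = []
    if str(variables.get('log_bin', '')).upper() != 'ON':
        problems.append("log_bin is not ON: binary logging is disabled")
    fmt = str(variables.get('binlog_format', '')).upper()
    if fmt != 'ROW':
        problems.append(f"binlog_format is {fmt or 'unset'}; Canal needs ROW")
    return problems


print(check_binlog_settings({'log_bin': 'ON', 'binlog_format': 'ROW'}))        # []
print(check_binlog_settings({'log_bin': 'ON', 'binlog_format': 'STATEMENT'}))  # ['binlog_format is STATEMENT; Canal needs ROW']
```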

The start/stop scripts are in the bin directory; start (or restart) Canal with sh restart.sh:

[root@mdb01 canal]# cd bin
[root@mdb01 bin]# ll
total 20
-rw-r--r-- 1 root root    7 Jul 19 15:18 canal.pid
-rwxr-xr-x 1 root root   58 Sep  2  2019 restart.sh
-rwxr-xr-x 1 root root 1181 Sep  2  2019 startup.bat
-rwxr-xr-x 1 root root 3167 Sep  2  2019 startup.sh
-rwxr-xr-x 1 root root 1356 Sep  2  2019 stop.sh
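The canal.pid file next to the scripts presumably holds the process ID of the running server (an assumption based on its name and its 7-byte size, enough for a PID plus a newline). A small POSIX-only sketch that checks whether that process is still alive using signal 0, which performs the existence and permission check without delivering any signal; the default path is derived from the /u01/canal install directory shown in this article:

```python
import os


def pid_alive(pid):
    """POSIX only: os.kill with signal 0 checks that the process exists without signalling it."""
    if pid <= 0:
        return False          # 0 and negative values address process groups, not a single PID
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False          # no such process
    except PermissionError:
        return True           # process exists but belongs to another user
    return True


def canal_running(pid_file='/u01/canal/bin/canal.pid'):
    """Read the PID from the pid file (assumed to contain only a number) and check it."""
    try:
        with open(pid_file) as f:
            pid = int(f.read().strip())
    except (OSError, ValueError):
        return False          # missing/unreadable file or non-numeric content
    return pid_alive(pid)


print(canal_running())
```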

The instance logs are in the logs/example directory:

[root@mdb01 example]# pwd
/u01/canal/logs/example
[root@mdb01 example]# ll
total 28
-rw-r--r-- 1 root root 21582 Jul 19 21:11 example.log
-rw-r--r-- 1 root root  2090 Jul 19 21:11 meta.log

After a successful start, the log shows:

2020-07-19 15:18:18.596 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-07-19 15:18:18.605 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-07-19 15:18:18.606 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2020-07-19 15:18:18.616 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-07-19 15:18:18.842 [destination = example , address = /192.168.61.16:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-07-19 15:18:18.842 [destination = example , address = /192.168.61.16:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-07-19 15:18:37.804 [destination = example , address = /192.168.61.16:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000019,position=4,serverId=1573854809,gtid=<null>,timestamp=1595139828000] cost : 18915ms , the next step is binlog dump
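The last log line records where Canal starts reading: binlog file mysql-bin.000019 at position 4 (the first event, right after the 4-byte magic header every binlog file begins with), plus a timestamp in milliseconds since the Unix epoch. The sketch below pulls those fields out of the EntryPosition[...] text. The parsing is our own, based only on the log line shown above; it splits on ",name=" rather than on every comma because a GTID set can itself contain commas.

```python
import re
from datetime import datetime, timezone

LINE = ("---> find start position successfully, EntryPosition[included=false,"
        "journalName=mysql-bin.000019,position=4,serverId=1573854809,gtid=<null>,"
        "timestamp=1595139828000] cost : 18915ms , the next step is binlog dump")


def parse_entry_position(line):
    """Extract the fields of EntryPosition[...] from a Canal log line, or return None."""
    m = re.search(r'EntryPosition\[(.*?)\]', line)
    if m is None:
        return None
    # A value runs until the next ",<name>=" or the end, so commas inside a GTID set survive.
    fields = dict(re.findall(r'(\w+)=(.*?)(?=,\w+=|$)', m.group(1)))
    gtid = fields.get('gtid')
    return {
        'journal': fields['journalName'],
        'position': int(fields['position']),
        'server_id': int(fields['serverId']),
        'gtid': None if gtid == '<null>' else gtid,
        # the timestamp is in milliseconds since the Unix epoch
        'time_utc': datetime.fromtimestamp(int(fields['timestamp']) / 1000, tz=timezone.utc),
    }


pos = parse_entry_position(LINE)
print(pos['journal'], pos['position'], pos['time_utc'].isoformat())
# mysql-bin.000019 4 2020-07-19T06:23:48+00:00
```

06:23:48 UTC corresponds to 14:23:48 at UTC+8.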

Canal Python client

canal-python is a Python client for Canal. It communicates with the Canal server over a TCP socket, and the messages exchanged are encoded with Google Protocol Buffers 3.0. GitHub repository:

https://github.com/haozi3156666/canal-python
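Because TCP delivers a byte stream rather than discrete messages, a protobuf-over-TCP protocol needs a way to mark where each serialized message ends. A common scheme, assumed here rather than taken from Canal's documentation, is a 4-byte big-endian length prefix before each message; check the canal-python connector source for the framing it actually uses. A sketch of that scheme, with hypothetical function names:

```python
import struct

# Assumed framing: a 4-byte big-endian unsigned length, then that many payload bytes.
HEADER = struct.Struct('>I')


def encode_frame(payload: bytes) -> bytes:
    """Prefix a serialized message with its length."""
    return HEADER.pack(len(payload)) + payload


def split_frames(buffer: bytes):
    """Split received bytes into complete payloads; return (payloads, leftover).

    The leftover is an incomplete frame that has to wait for more data from the socket.
    """
    payloads = []
    offset = 0
    while len(buffer) - offset >= HEADER.size:
        (length,) = HEADER.unpack_from(buffer, offset)
        end = offset + HEADER.size + length
        if end > len(buffer):
            break  # header has arrived, but the body is not complete yet
        payloads.append(buffer[offset + HEADER.size:end])
        offset = end
    return payloads, buffer[offset:]


stream = encode_frame(b'abc') + encode_frame(b'') + encode_frame(b'xyz')[:5]
print(split_frames(stream))  # ([b'abc', b''], b'\x00\x00\x00\x03x')
```

In a real client, each complete payload would then be handed to the generated protobuf class's ParseFromString.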

The example given on GitHub is wrong: it does not display the old (before) values of an UPDATE correctly. Here is a corrected version:

import time

from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2

client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username=b'root', password=b'oracle')
# subscribe to every schema.table (regex .*\..*)
client.subscribe(client_id=b'1001', destination=b'example', filter=b'.*\\..*')

try:
    while True:
        message = client.get(100)  # fetch up to 100 entries
        entries = message['entries']
        for entry in entries:
            entry_type = entry.entryType
            # transaction begin/end markers carry no row data
            if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN, EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
                continue
            row_change = EntryProtocol_pb2.RowChange()
            row_change.MergeFromString(entry.storeValue)
            header = entry.header
            database = header.schemaName
            table = header.tableName
            event_type = header.eventType
            for row in row_change.rowDatas:
                format_data = dict()
                if event_type == EntryProtocol_pb2.EventType.DELETE:
                    # a deleted row only has before-image columns; collect all of them
                    for column in row.beforeColumns:
                        format_data[column.name] = column.value
                elif event_type == EntryProtocol_pb2.EventType.INSERT:
                    # an inserted row only has after-image columns; collect all of them
                    for column in row.afterColumns:
                        format_data[column.name] = column.value
                else:
                    # UPDATE: keep old and new values in two separate dicts.
                    # The chained form format_data['before'] = format_data['after'] = dict()
                    # would make both keys share one dict, so the after-image overwrites the before-image.
                    format_data['before'] = dict()
                    format_data['after'] = dict()
                    for column in row.beforeColumns:
                        format_data['before'][column.name] = column.value
                    for column in row.afterColumns:
                        format_data['after'][column.name] = column.value
                data = dict(
                    db=database,
                    table=table,
                    event_type=event_type,
                    data=format_data,
                )
                print(data)
        time.sleep(1)
finally:
    # reached when the loop is interrupted (e.g. Ctrl+C)
    client.disconnect()
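The chained assignment format_data['before'] = format_data['after'] = dict() is presumably what broke the UPDATE before-values: it binds both keys to one and the same dictionary object, so filling in the after-image overwrites the before-image. A self-contained demonstration, with a namedtuple standing in (hypothetically) for the protobuf column messages:

```python
from collections import namedtuple

# Stand-in for the protobuf Column message: only .name and .value are used.
Column = namedtuple('Column', 'name value')


def buggy(before_cols, after_cols):
    data = {}
    data['before'] = data['after'] = dict()  # ONE dict object bound to both keys
    for c in before_cols:
        data['before'][c.name] = c.value
    for c in after_cols:
        data['after'][c.name] = c.value      # overwrites the values just stored as "before"
    return data


def fixed(before_cols, after_cols):
    # two independent dicts, one per image
    return {
        'before': {c.name: c.value for c in before_cols},
        'after': {c.name: c.value for c in after_cols},
    }


before = [Column('a', '1')]
after = [Column('a', '2')]
print(buggy(before, after))  # {'before': {'a': '2'}, 'after': {'a': '2'}}
print(fixed(before, after))  # {'before': {'a': '1'}, 'after': {'a': '2'}}
```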

Changing data in the database

mysql> insert into t1 select 1;
Query OK, 1 row affected (0.02 sec)
Records: 1  Duplicates: 0  Warnings: 0

mysql> update t1 set a=2 where a=1;
Query OK, 1 row affected (0.17 sec)
Rows matched: 1  Changed: 1  Warnings: 0

mysql> delete from t1 where a=2;
Query OK, 1 row affected (0.02 sec)

Client output:

connected to 127.0.0.1:11111
Auth succed
Subscribe succed
{'db': 'ming', 'table': 't1', 'event_type': 1, 'data': {'a': '1'}}
{'db': 'ming', 'table': 't1', 'event_type': 2, 'data': {'before': {'a': '1'}, 'after': {'a': '2'}}}
{'db': 'ming', 'table': 't1', 'event_type': 3, 'data': {'a': '2'}}
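The event_type field is printed as a bare number. Matching the output against the statements above, 1, 2 and 3 correspond to INSERT, UPDATE and DELETE; inside the client itself, the protobuf enum can translate the number with EntryProtocol_pb2.EventType.Name(event_type). A small formatter (hypothetical name describe) that turns the printed dicts into readable lines, covering only the three values observed here:

```python
# Numbers as observed in the output above: 1 = INSERT, 2 = UPDATE, 3 = DELETE.
EVENT_NAMES = {1: 'INSERT', 2: 'UPDATE', 3: 'DELETE'}


def describe(change):
    """Render one change dict from the client script as a single readable line."""
    name = EVENT_NAMES.get(change['event_type'], f"EVENT_{change['event_type']}")
    where = f"{change['db']}.{change['table']}"
    data = change['data']
    if 'before' in data and 'after' in data:
        return f"{name} {where}: {data['before']} -> {data['after']}"
    return f"{name} {where}: {data}"


for change in [
    {'db': 'ming', 'table': 't1', 'event_type': 1, 'data': {'a': '1'}},
    {'db': 'ming', 'table': 't1', 'event_type': 2, 'data': {'before': {'a': '1'}, 'after': {'a': '2'}}},
    {'db': 'ming', 'table': 't1', 'event_type': 3, 'data': {'a': '2'}},
]:
    print(describe(change))
# INSERT ming.t1: {'a': '1'}
# UPDATE ming.t1: {'a': '1'} -> {'a': '2'}
# DELETE ming.t1: {'a': '2'}
```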
