Ape-DTS 是一款高效、轻量级且功能强大的 开源工具,专注于解决 数据迁移、同步、校验、订阅与加工的需求。无论是 将自建的 MySQL/PostgreSQL 数据库迁移到云端,还是 在不同数据库间进行数据迁移,Ape-DTS 都能为您提供 便捷且可靠的解决方案。它特别适合于将自建 MySQL 数据库迁移到其他 MySQL 环境(如云端 MySQL、KubeBlocks MySQL),或者其他分析型数据库(例如 ClickHouse、StarRocks),以及消息队列(例如 Kafka)等场景。
https://github.com/apecloud/ape-dts

欢迎扫码添加小助手,备注「DTS」,进入交流群。

为什么选择 Ape-DTS
Ape-DTS 是一款旨在实现 any-to-any 的数据迁移工具:
库表结构迁移,
数据全量迁移、
增量迁移,
数据校验、
订正、
复查,
数据订阅、
加工 等能力。
支持广泛: 目前已支持数据库包括 MySQL,Postgres,Redis,Mongo,StarRocks,ClickHouse,Kafka 等。
简单轻量: 不依赖第三方组件和额外存储,完整镜像解压后小于 100 MB。
性能突出,使用 Rust 开发。
Ape-DTS 支持 MySQL,Postgres,Redis,Mongo,StarRocks,ClickHouse,Kafka 等数据库间的迁移,具体如下:
MySQL -> MySQL
PostgreSQL -> PostgreSQL
MongoDB -> MongoDB
Redis -> Redis
MySQL -> Kafka
PostgreSQL -> Kafka
MySQL -> StarRocks
MySQL -> ClickHouse
MySQL -> TiDB
PostgreSQL -> StarRocks
全量迁移
✅
✅
✅
✅
✅
✅
✅
✅
✅
✅
增量同步
✅
✅
✅
✅
✅
✅
✅
✅
✅
✅
数据校验/订正/复查
✅
✅
✅
✅
库表结构迁移
✅
✅
✅
✅
✅
✅
功能亮点
支持多种数据库间的
同构、
异构数据迁移和同步。
支持全量、增量任务的
断点续传。
支持数据校验、订正。
支持库、表、列级别的过滤和路由。
针对不同源、目标、任务类型,实现不同的并发算法,提高性能。
可加载用户 Lua 脚本,加工正在迁移/同步的数据。
支持将数据发送到 Kafka,供用户自主消费。
支持以 HTTP Server 的方式启动 Ape-DTS 并拉取增量数据,用户可使用 HTTP Client 获取数据并自主消费。
Ape-DTS 的性能表现
Ape-DTS 在多种场景下展现了卓越的性能表现。本文使用 sysbench 生成全量和增量数据,分别使用 Ape-DTS 和 Debezium 执行迁移任务,并对比结果。
以下是 MySQL -> MySQL 的测试,源端 MySQL和目标端 MySQL 均在 8C16G BCC(百度智能云云服务器)机器上使用 Docker 部署。
可以看到,Ape-DTS 的全量/增减迁移性能都显著优于 Debezium。在相同的节点规格(4C8G)下,Ape-DTS 的全量迁移性能约为 Debezium 的
31 倍,Ape-DTS 的增量迁移性能约为 Debezium 的
9 倍。
测试一:全量数据迁移
同步方式
节点规格
RPS(Rows per Second)
源 MySQL 负荷(CPU/内存)
目标 MySQL 负荷(CPU/内存)
Ape-DTS
1C2G
71428
8.2% / 5.2%
211% / 5.1%
Ape-DTS
2C4G
99403
14.0% / 5.2%
359% / 5.1%
Ape-DTS
4C8G
126582
13.8% / 5.2%
552% / 5.1%
Debezium
4C8G
4051
21.5% / 5.2%
51.2% / 5.1%
测试二:增量数据迁移
同步方式
节点规格
RPS(Rows per Second)
源 MySQL 负荷(CPU/内存)
目标 MySQL 负荷(CPU/内存)
Ape-DTS
1C2G
15002
18.8% / 5.2%
467% / 6.5%
Ape-DTS
2C4G
24692
18.1% / 5.2%
687% / 6.5%
Ape-DTS
4C8G
26287
18.2% / 5.2%
685% / 6.5%
Debezium
4C8G
2951
20.4% / 5.2%
98% / 6.5%
镜像对比
工具
镜像大小
Ape-DTS
86.4 MB
Debezium
1.38 GB
如何使用 Ape-DTS 迁移 MySQL?
以下是 Ape-DTS 在
自建 MySQL 数据库迁移到 KubeBlocks MySQL 场景中的实际使用示例。
更多示例可参考: https://github.com/apecloud/ape-dts/tree/main/docs/en/tutorial。
配置文件差异总结
针对不同的任务,配置文件中的
extract_type,
sink_type 等其他配置不同。
以下是一个简略的配置文件差异总结。更多具体配置,可参考官网上教程、模板及配置说明:https://github.com/apecloud/ape-dts。
任务类型
extract_type
sink_type
特殊配置
说明
库表结构迁移
struct
struct
do_dbs
指定需要迁移的数据库
全量迁移
snapshot
write
parallel_type=snapshot
全量抓取数据快照
增量同步
cdc
write
server_id,
do_events
基于 binlog 同步数据变更
数据校验
snapshot
check
日志输出
比较源库与目标库数据一致性
数据订正
check_log
write
check_log_dir
指定校验结果日志的路径,用于订正任务
数据复查
check_log
check
check_log_dir
指定校验结果日志的路径
1. 准备工作
1.1 环境准备
工具准备
该示例中使用的 DTS_IMAGE 版本如下:
APE_DTS_IMAGE="docker.io/apecloud/ape-dts:2.0.12"
源库
用本地 Docker 搭建 MySQL。
docker run -d --name some-mysql-1 \
--platform linux/x86_64 \
-it \
-p 3307:3306 -e MYSQL_ROOT_PASSWORD="123456" \ "$MYSQL_IMAGE" --lower_case_table_names=1 --character-set-server=utf8 --collation-server=utf8_general_ci \
--datadir=/var/lib/mysql \
--user=mysql \
--server_id=1 \
--log_bin=/var/lib/mysql/mysql-bin.log \
--max_binlog_size=100M \
--gtid_mode=ON \
--enforce_gtid_consistency=ON \
--binlog_format=ROW \
--default_time_zone=+08:00
将主机的端口
3307 映射到容器的端口
3306。
设置 MySQL root 密码为
123456。
使用由
$MYSQL_IMAGE 指定的镜像。
记录源端的 URL:
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled
目标库
我们在 ACK 上使用
KubeBlocks 搭建了 MySQL 集群,更多集群运维操作可参考
KubeBlocks MySQL Examples。
-
创建集群。
# 创建 MySQL 集群kbcli cluster create mycluster --cluster-definition mysql -n demo
-
暴露服务。这里我们通过 LoadBalancer 暴露服务地址。
# 将集群暴露至公网kbcli cluster expose mycluster --type internet --enable=true -ndemo
查看到公网地址为:
47.xx.xx.xx,记录为你的
目标地址。
记录目标端地址:
url=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled
1.2 数据准备
登录本地 MySQL,创建测试用的数据库和表。
mysql -h127.0.0.1 -uroot -p123456 -P3307
CREATE DATABASE test_db;
CREATE TABLE test_db.tb_1(id int, value int, primary key(id));
CREATE TABLE test_db.tb_2(id int, value text, primary key(id));
INSERT INTO test_db.tb_1 VALUES(1,1),(2,2),(3,3),(4,4);
INSERT INTO test_db.tb_2 VALUES(5,'a'),(6,'b'),(7,'c'),(8,'d');
2. 库表结构迁移
创建任务配置
请将以下示例配置中的 extractor.url 和 sinker.url 替换为前面记录的源端 URL 和目标端 URL。
核心配置:
extract_type=struct 和
sink_type=struct:表示迁移的是库表结构。
do_dbs:指定需要迁移的数据库。
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
extract_type=struct
db_type=mysql
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled
[sinker]
sink_type=struct
db_type=mysql
url=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled
[filter]
do_dbs=test_db
[parallelizer]
parallel_type=serial
[pipeline]
buffer_size=100
checkpoint_interval_secs=1
EOL
启动任务
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \"$APE_DTS_IMAGE" /task_config.ini
检查目标库
-
登录目标库。
mysql -h<目标地址> -uape_test -pApe123456789
-
查看数据。
mysql> SHOW CREATE TABLE test_db.tb_1;
mysql> SHOW CREATE TABLE test_db.tb_2;
3. 同步全量数据
创建任务配置
核心配置:
extract_type=snapshot:表示全量迁移,抓取源数据库的快照。
sink_type=write:表示将数据写入目标数据库。
parallel_type=snapshot 和
parallel_size:控制快照并发级别,提高全量迁移效率。
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=snapshot
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled
[sinker]
db_type=mysql
sink_type=write
url=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled
[filter]
do_dbs=test_db
do_events=insert
[parallelizer]
parallel_type=snapshot
parallel_size=8
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
启动任务
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \"$APE_DTS_IMAGE" /task_config.ini
检查目标库
-
登录目标库。
mysql -h<目标地址> -uape_test -pApe123456789
-
检查数据。
mysql> SELECT * FROM test_db.tb_1;
+----+-------+
| id | value |
+----+-------+
| 1 | 1 |
| 2 | 2 |
| 3 | 3 |
| 4 | 4 |
+----+-------+
mysql> SELECT * FROM test_db.tb_2;
+----+-------+
| id | value |
+----+-------+
| 5 | a |
| 6 | b |
| 7 | c |
| 8 | d |
+----+-------+
4. 增量任务
创建任务配置
核心配置:
extract_type=cdc:表示增量同步,基于源库的 binlog 或 WAL 日志抓取数据变更。
sink_type=write:表示将数据写入目标数据库。
server_id:Ape-DTS 在该 MySQL 复制组中的标识,由用户指定,取值 [1-2^32 - 1],不得与该复制组中其他 server_id 相同。
do_events:指定需同步的事件类型(如
insert,
update,
delete)。
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=cdc
server_id=2000
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled
[filter]
do_dbs=test_db
do_events=insert,update,delete
[sinker]
db_type=mysql
sink_type=write
batch_size=200
url=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled
[parallelizer]
parallel_type=rdb_merge
parallel_size=8
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
启动任务
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \"$APE_DTS_IMAGE" /task_config.ini
修改源库数据
-
登录本地 MySQL。
mysql -h127.0.0.1 -uroot -p123456 -uroot -P3307
-
修改数据。
DELETE FROM test_db.tb_1 WHERE id=1;
UPDATE test_db.tb_1 SET value=2000000 WHERE id=2;
INSERT INTO test_db.tb_2 VALUES(9, 'f');
检查目标库
-
登录目标 MySQL。
mysql -h<目标地址> -uape_test -pApe123456789
-
查看目标端数据。
mysql> SELECT * FROM test_db.tb_1;
+----+---------+
| id | value |
+----+---------+
| 2 | 2000000 |
| 3 | 3 |
| 4 | 4 |
+----+---------+
mysql> SELECT * FROM test_db.tb_2;
+----+-------+
| id | value |
+----+-------+
| 5 | a |
| 6 | b |
| 7 | c |
| 8 | d |
| 9 | f |
+----+-------+
可以看到增量数据都已经同步了。
5. 数据校验
在目标端修改数据
-
登录目标 MySQL。
mysql -h<目标地址> -uape_test -pApe123456789
-
在目标端修改数据,构造和源库的差异。
DELETE FROM test_db.tb_1 WHERE id=4; # 删除tb_1数据UPDATE test_db.tb_1 SET value=1 WHERE id=2; # 修改tb_1数据DELETE FROM test_db.tb_2 WHERE id=5; # 删除tb_2数据
创建任务配置
核心配置:
extract_type=snapshot:校验基于源库的全量数据快照。
sink_type=check:表示校验目标库与源库数据是否一致。
输出日志:校验的差异会记录在日志文件中(如缺失数据和不一致的数据)。
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=snapshot
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled
[sinker]
db_type=mysql
sink_type=check
url=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled
[filter]
do_dbs=test_db
do_events=insert
[parallelizer]
parallel_type=rdb_check
parallel_size=8
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
启动任务
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
-v "/tmp/ape_dts/check_data_task_log/:/logs/" \"$APE_DTS_IMAGE" /task_config.ini
查看校验结果
以
源库为基准,检查目标库的数据缺失和不同,校验结果以日志文件输出。
-
检查数据缺失。
cat /tmp/ape_dts/check_data_task_log/check/miss.log
可以看到具体的缺失数据信息。
{"log_type":"Miss","schema":"test_db","tb":"tb_1","id_col_values":{"id":"4"},"diff_col_values":{}}
{"log_type":"Miss","schema":"test_db","tb":"tb_2","id_col_values":{"id":"5"},"diff_col_values":{}}
-
检查数据差异。
cat /tmp/ape_dts/check_data_task_log/check/diff.log
可以看到输出如下,diff_col_values展示了差异的具体内容。
{"log_type":"Diff","schema":"test_db","tb":"tb_1","id_col_values":{"id":"2"},"diff_col_values":{"value":{"src":"2000000","dst":"1"}}}
6. 数据订正
根据校验日志,反查源库,订正目标库。
创建任务配置
核心配置:
extract_type=check_log:表示基于校验日志执行数据订正任务。
sink_type=write:将订正后的数据写回目标库。
check_log_dir:指定校验日志的路径,用于订正任务。
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=check_log
url=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabled
check_log_dir=./check_data_task_log
[sinker]
db_type=mysql
sink_type=write
url=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled
[filter]
do_events=*
[parallelizer]
parallel_type=rdb_check
parallel_size=8
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
启动任务
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
-v "/tmp/ape_dts/check_data_task_log/check/:/check_data_task_log/" \"$APE_DTS_IMAGE" /task_config.ini
查看订正后的结果
-
登录目标库。
mysql -h<目标地址> -uape_test -pApe123456789
-
查看数据。
mysql> SELECT * FROM test_db.tb_1;
+----+---------+
| id | value |
+----+---------+
| 2 | 2000000 |
| 3 | 3 |
| 4 | 4 |
+----+---------+
mysql> SELECT * FROM test_db.tb_2;
+----+-------+
| id | value |
+----+-------+
| 5 | a |
| 6 | b |
| 7 | c |
| 8 | d |
| 9 | f |
+----+-------+
可以看到目标端被删除和更新的数据,都已经被订正了。
7. 数据复查
根据校验日志,反查源库,再次校验目标库
和全量校验的区别在于校验数据的范围:数据复查限定在校验出的 缺失/不同 的数据
修改目标库数据,构造和源库的差异
-
登录目标库。
mysql -h<目标地址> -uape_test -pApe123456789
-
修改数据。
DELETE FROM test_db.tb_1 WHERE id=4; # 删除tb_1数据
创建任务配置
核心配置:
extract_type=check_log:基于校验日志。
sink_type=check 用于复查目标库数据。
check_log_dir:指定校验日志的路径。
cat <<EOL > /tmp/ape_dts/task_config.ini[extractor]db_type=mysqlextract_type=check_logurl=mysql://root:123456@127.0.0.1:3307?ssl-mode=disabledcheck_log_dir=./check_data_task_log[sinker]db_type=mysqlsink_type=checkurl=mysql://ape_test:Ape123456789@<目标地址>:3306?ssl-mode=disabled[filter]do_events=*[parallelizer]parallel_type=rdb_checkparallel_size=8[pipeline]buffer_size=16000checkpoint_interval_secs=1EOL
启动任务
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
-v "/tmp/ape_dts/check_data_task_log/check/:/check_data_task_log/" \
-v "/tmp/ape_dts/review_data_task_log/:/logs/" \"$APE_DTS_IMAGE" /task_config.ini
查看复查结果
-
查看数据缺失。
cat /tmp/ape_dts/review_data_task_log/check/miss.log
可以看到输入日志显示 {“id”:“4”} 缺失。
{"log_type":"Miss","schema":"test_db","tb":"tb_1","id_col_values":{"id":"4"},"diff_col_values":{}}
-
查看数据差异。
/tmp/ape_dts/review_data_task_log/check/diff.log 为空,符合预期。
更多教程
更多任务类型、教程、任务配置,请参考
Ape-DTS 主页。
