本文将简单讲下使用ogg将oracle增量数据实时同步至kafka(无安全认证模式)的配置操作,后续也将更新在kafka试用不同认证模式下对应的ogg的配置方式。 一、环境描述 1.1 环境 简介
| IP | 操作系统 | 数据库版本 | OGG版本 | |
| 源端RAC | 192.168.238.121/122 | RHEL6.4 | oracle1120 4 | Oracle GoldenGate 11.2.1.0.1 for Oracle on Linux x86-64 |
| 目标端kafka集群 | 192.168.238.141 ( kafka 集群其中一个节点) | RHEL6.4 | kafka_2.11-1.1.0 | Oracle GoldenGate for Big Data 12.3.0.1.4 on Linux x86-64 |
注:下面操作展示不涉及kafka集群搭建,默认 kafka 集群已经搭建完 。
二 、 检查生产库
2 .1附加日志检查 SQL> select supplemental_log_data_min from v$database; SUPPLEME -------- YES -------------若不是YES,需要打开 打开语句: SQL> alter database add supplemental log data; SQL> ALTER SYSTEM SWITCH LOGFILE;
2 .2打开 force log SQL> select force_logging from v$database; FOR --- YES SQL> ALTER DATABASE FORCE LOGGING;
2 .3查询压缩表SQL> select owner,table_name from dba_tables where owner in ('CBSRUN2') and compression='ENABLED';SQL> select table_owner,table_name from dba_tab_partitions where table_owner in ('CBSRUN2') and compression='ENABLED';
2 .4查询不支持的列的表格
SQL> select owner,table_name,column_name,data_type from dba_tab_columns where owner in ('CBSRUN2') and data_type in ('ANYDATA','ANYDATASET','ANYTYPE','BFILE','BINARY_INTEGER','MLSLABEL','PLS_INTEGER','TIMEZONE_ABBR','TIMEZONE_REGION','URITYPE,UROWID');
2 .5 查询 物化 视 图
查看要复制的表的日志信息是完整的,确保是logging。把nologing变成logging。 SQL> select owner,table_name from dba_tables where owner in ('CBSRUN2') and logging='NO' and temporary='N'; SQL> alter table xue.tt1291 logging; SQL> select object_name,owner from dba_objects where object_type='MATERIALIZED VIEW';
2 .6生产库 创建管理用户 --创建管理用户 SQL> create tablespace odc_tps datafile '+DATA/trac/datafile/odc01.dbf' size 100M autoextend on; SQL> create user odc identified by odc default tablespace odc_tps; --授权,DBA权限在安装成功后可以收回 GRANT CONNECT TO odc; GRANT ALTER ANY TABLE TO odc; GRANT ALTER SESSION TO odc; GRANT CREATE SESSION TO odc; GRANT FLASHBACK ANY TABLE TO odc; GRANT SELECT ANY DICTIONARY TO odc; GRANT SELECT ANY TABLE TO odc; GRANT RESOURCE TO odc; GRANT DBA TO odc; --安装成功后可以收回dba权限,但是要授予UNLIMITED TABLESPACE权限。 GRANT UNLIMITED TABLESPACE TO odc; 或者alter user odc quota unlimited on users; 三 、 安装软件 生产和备份端安装步骤一样,只有软件及环境变量可能不一样 3 .1创建安装目录或文件系统
# su - oracle $ mkdir /backup/ogg
3 .2配置 library环境变量
LINUX : LIBRARY_PATH例如:export LIBRARY_PATH=/backup/ogg:$LIBPATHexoprt LIBRARY_PATH=$ORACLE_HOME/lib:$LIBPATH
3 .3安装软件
1. 解压软件安装包$ cd /odc$ unzip ogg112101_fbo_ggs_Linux_x64_ora11g_64bit.zip$ tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar
2. 创建 子工作目录解压并解包成功后在安装目录(如:/odc)执行$ ./ggsciggsci>create subdirs
以上所创建的目录的作用:
|
N ame |
purpose |
|
|
dirch k |
Checkpoint files |
用来存放检查点 (checkpoint) 文件 , 次检查点是 ogg 自己的检查点与实例不同 |
|
dirdat |
GoldenGate trails |
用来存放 TRAIL 文件 |
|
dirdef |
Data definition files |
用来存放通过 DEFGEN 工具生成的源或目标端数据定义文件 |
|
dirprm |
Parameter files |
用来存放配置参数文件 |
|
dirpcs |
Process status files |
用来存放进程状态文件 |
|
dirrpt |
Report files |
用来存放进程报告文件 |
|
dirsql |
SQL script files |
用来存放 SQL 脚本文件 |
|
dirtmp |
Temporary files |
当事物所需要的内存超过已分配内存时 , 默认存储在这个目录 |
3. 创建管理 进程
创建 管理进程 所需 参数:
|
参数 |
阀值 |
定义 |
|
port |
7809 |
mgr 进程使用的 TCP/IP 端口侦听请求,默认端口为 7809 |
|
DYNAMICPORTLIST |
7800-7810 |
指定可用动态 TCP/IP 端口列表,用于源端与目标端进程通信的绑定 ,最大支持 256 个端口 |
|
PURGEOLDEXTRACTS |
./dirdat/sm* |
定期清理抽取出的过期文件,可以被设置在 Manager, Extract, and Replicat 参数文件中, oracle 建议设置在 Manager 中 |
|
USECHECKPOINTS |
根据任何 MINKEEP 规则,允许清除那些已经 被Extract 和 Replicat 进程检测过的 , 保证数据不丢失。 | |
|
MINKEEPHOURS |
24 |
trail 文件的 保留时间,超过这个时间则删除 |
|
autorestart |
extract * |
指定 进程失败时自动重启 |
|
retries |
10 |
指定尝试 重新启动进程的次数 (默认 尝试 2 次 ) |
|
waitminutes |
10 |
指定重新 启动进程的 等待 时间 ,直到必要资源变得可用或一些其他的事件发生。默认的延迟时间为 2 分钟。 |
具体 设置如 下 : ggsci>edit param mgr 输入: port 7809 DYNAMICPORTLIST 7800-7820 PURGEOLDEXTRACTS ./dirdat/cz*, USECHECKPOINTS, MINKEEPHOURS 24 autorestart extract * retries 10 waitminutes 10 ggsci>edit param ./GLOBALS 输入: GGSCHEMA odc CHECKPOINTTABLE odc.ggs_checkpoint ---保存退出 ggsci>start mgr
四、 为表格添加 supplement log
ggsci>dblogin userid odc password odc ggsci>add trandata CBSRUN2.SAVJ_ATOMJOUR0ggsci>info trandata CBSRUN2.SAVJ_ATOMJOUR0
五 、 创建抽取进程
抽取进程要 配置的参数:
|
参数 |
阀值 |
定义 |
|
extract |
sm_ext |
指定进程名和类型 |
|
setenv |
(NLS_LANG="SIMPLIFIED CHINESE_CHINA.ZHS16GBK") |
配置系统环境变量 |
|
userid / password |
odc |
指定 OGG 连接数据库的用户名和密码 |
|
exttrail |
./dirdat/sm |
指定写入到本地的哪个队列 |
|
tranlogoptions |
指定在解析数据库日志时所需要的特殊参数 | |
|
altarchivelogdest |
instance or a11g1 +MCDATA/archlog,altarchivelogdest instance ora11g2 +MCDATA/archlog |
指定 归档 路径 |
|
FETCHOPTIONS |
指定 ogg 获取数据的方式 | |
|
FETCHPKUPDATECOLS |
复制进程出现丢失 update 记录( missing update )并且更新的是主键, update 将转换成 insert 。 ( 当使用了 HANDLECOLLISIONS 时,请使用该参数。 ) | |
|
ddl |
include objname oggtest.* exclude objtype 'TRIGGER' |
使用 DDL 参数,指定 DDL 的支持和过滤 DDL 操作。 |
|
table |
oggtest.* |
定义需要复制的表,后面需以 ; 结尾 |
具体参数 设置如 下 :
ggsci>add extract cz_ext tranlog threads 2 begin now ggsci>add exttrail ./dirdat/cz extract cz_ext 注: tranlog:表示数据抓取的来源是数据库的redo数据。 threads 2:表示我们数据库有多少个redo threads,单实例基本上是1或者不设,rac就自己设置了,配置过RAC的基本都了解。 begin now:表示我们在启动这个抓取进程的就去抓取数据。 ./dirdat:表示trail文件的目录 sm:trail文件的前缀 extract sm_ext:值指定给那个进程用的(sm_ext)。 ggsci>edit param cz_ext 参数: extract cz_ext setenv (NLS_LANG="SIMPLIFIED CHINESE_CHINA.ZHS16GBK") userid odc,password odc exttrail ./dirdat/cz tranlogoptions dblogreader FETCHOPTIONS FETCHPKUPDATECOLS table oggtest. * ; ggsci>start cz_ext
-- 观察,能够挖掘再继续下面的操作
六、 增长 pump进程
传输进程要 配置的参数:
|
参数 |
阀值 |
定义 |
|
extract |
sm_dmp |
指定进程名和类型 |
|
userid / password |
odc |
指定 OGG 连接数据库的用户名和密码 |
|
rmthost |
192.168.56.33 |
指定目标端主机 IP |
|
mgrport |
7809 |
指定管理进程端口号 |
|
rmttrail |
./dirdat/sm |
指定目标端保存队列文件的目录 |
|
passthru |
采用 pass-through 模式处理表 | |
|
table |
oggtest.* |
定义需要复制的表,后面需以 ; 结尾 |
ggsci>add extract cz_dmp EXTTRAILSOURCE ./dirdat/cz ggsci>ADD RMTTRAIL ./dirdat/cz, EXTRACT cz_dmp 注: EXTTRAILSOURCE:指定提取文件作为数据源 ADD RMTTRAIL:在目标数据库上创建一个trail ggsci>edit param cz_dmp 参数: extract cz_dmp userid odc,password odc rmthost 192.168.238.141, mgrport 7809 rmttrail ./dirdat/cz passthru table oggtest. *; ggsci>start sm_dmp
七、目标 kafka端创建 replicat进程
创建测试topic ./kafka-topics.sh --create --zookeeper 192.168.238.141:2181,192.168.238.142:2181,192.168.238.143:2181 --replication-factor 3 --partitions 3 --topic oggtest 模拟测试任务发布订阅 生产者 ./kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic oggtest 消费者 ./kafka-console-consumer.sh --bootstrap-server node1:9092 --topic oggtest --from-beginning 确认kafka可用正常使用后,创建ogg 应用进程,将源端增量数据同步到kafka集群 具体参数如下: ggsci> Add replicat rep_ka exttrail ./dirdat/cz REPLICAT rep_ka sourcedefs ./dirdef/kafka.def TARGETDB LIBFILE libggjava.so SET property= dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP oggtest .t_ogg , TARGET oggtest.t_ogg; 从/ogg/AdapterExamples/big-data/kafka中将kafka.props,custom_kafka_producer.properties复制到/ogg/dirprm并根据实际情况修改 kafka.props配置如下 gg.handlerlist = kafkahandler //handler 类型 gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //kafka 相关配置 #gg.handler.kafkahandler.TopicName=oggtest //kafka 的 topic 名称,无需手动创建 gg.handler.kafkahandler.topicMappingTemplate=oggtest gg.handler.kafkahandler.format=json // 传输文件的格式,支持 json , xml 等 gg.handler.kafkahandler.mode=op //OGG for Big Data 中传输模式,即 op 为一次 SQL 传输一次, tx 为一次事务传输一次 #Sample gg.classpath for Apache Kafka gg.classpath=dirprm/:/kafka/kafka_2.12-1.1.1/libs/*:/ogg/:/ogg/lib/*
// 相关库文件的引用 custom_kafka_producer.properties配置如下 bootstrap.servers=192.168.238.141:9092,192.168.238.142:9092,192.168.238.143:9092 //kafkabroker 的地址 acks=1 reconnect.backoff.ms=1000 // 重连延时 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer # 100KB per partition batch.size=102400 linger.ms=10000 配置完后,启动应用进程观察 ggsci> start rep_kaSending START request to MANAGER ...REPLICAT REP_KA starting 八、测试同步效果
测试方法比较简单,直接在源端的数据表中insert,update,delete操作即可。
SQL> conn oggtest/oggtest
Connected.
SQL> select * from t_ogg;
no rows selected
SQL> desc t_ogg;
Name Null? Type
----------------------------------------- -------- ----------------------------
ID NOT NULL NUMBER(38)
TEXT_NAME VARCHAR2(20)
SQL> insert into t_ogg values(1,'test');
1 row created.
SQL> commit;
Commit complete.
查看源端trail文件状态
$ ls -l /backup/ogg/dirdat/tc*
-rw-rw-rw- 1 oracle oinstall 1180 Nov 9 17:05 /backup/ogg/dirdat/tc000000
查看目标端trail文件状态
# ls -l /data/gg/dirdat/tc*
-rw-r----- 1 root root 1217 Nov 9 17:05 /data/gg/dirdat/tc000000
检查实时同步到kafka的效果,在Oracle源端更新表的同时,使用kafka客户端自带的脚本去查看这里配置的ggtopic这个kafkatopic下的消息:
SQL> insert into t_ogg values(2,'test2');
1 row created.
SQL> commit;
Commit complete.
目标端Kafka的同步情况:
# bin/kafka-console-consumer.sh --zookeeper 192.168.238.141:2181 --
from-beginning --topic ggtopic
{"table":"OGGTEST.T_OGG","op_type":"I","op_ts":"2025-11-09
09:05:25.067082","current_ts":"2025-11-
09T17:59:20.943000","pos":"00000000000000001080","after":
{"ID":"1","TEXT_NAME":"test"}}
{"table":" OGGTEST.T_OGG","op_type":"I","op_ts":"2025-11-09
10:02:06.827204","current_ts":"2025-11-
09T18:02:12.323000","pos":"00000000000000001217","after":
{"ID":"2","TEXT_NAME":"test2"}}
可以看到,Oracle的增量数据已实时同步到Kafka。
