使用OGG将oracle增量数据实时同步至kafka(无安全认证模式)

来源:这里教程网 时间:2026-03-03 23:21:02 作者:

本文将简单讲下使用ogg将oracle增量数据实时同步至kafka(无安全认证模式)的配置操作,后续也将更新在kafka试用不同认证模式下对应的ogg的配置方式。 一、环境描述 1.1 环境 简介

IP 操作系统 数据库版本 OGG版本
源端RAC 192.168.238.121/122 RHEL6.4 oracle1120 4 Oracle GoldenGate 11.2.1.0.1 for Oracle on Linux x86-64
目标端kafka集群 192.168.238.141 kafka 集群其中一个节点) RHEL6.4 kafka_2.11-1.1.0 Oracle GoldenGate for Big Data 12.3.0.1.4 on Linux x86-64  

注:下面操作展示不涉及kafka集群搭建,默认 kafka 集群已经搭建完

检查生产库

2 .1附加日志检查 SQL> select supplemental_log_data_min from v$database; SUPPLEME -------- YES                       -------------若不是YES,需要打开 打开语句: SQL> alter database add supplemental log data; SQL> ALTER SYSTEM SWITCH LOGFILE;

 

2 .2打开 force log SQL> select force_logging from v$database; FOR --- YES SQL> ALTER DATABASE FORCE LOGGING;

 

2 .3查询压缩表SQL> select owner,table_name from dba_tables where owner in ('CBSRUN2') and compression='ENABLED';SQL> select table_owner,table_name from dba_tab_partitions where table_owner in ('CBSRUN2') and compression='ENABLED';

 

2 .4查询不支持的列的表格

SQL> select owner,table_name,column_name,data_type from dba_tab_columns where owner in ('CBSRUN2') and data_type in ('ANYDATA','ANYDATASET','ANYTYPE','BFILE','BINARY_INTEGER','MLSLABEL','PLS_INTEGER','TIMEZONE_ABBR','TIMEZONE_REGION','URITYPE,UROWID');

2 .5 查询 物化  

  查看要复制的表的日志信息是完整的,确保是logging。把nologing变成logging。 SQL> select owner,table_name from dba_tables where owner in ('CBSRUN2') and logging='NO' and temporary='N'; SQL> alter table xue.tt1291 logging; SQL> select object_name,owner from dba_objects where object_type='MATERIALIZED VIEW';

2 .6生产库 创建管理用户 --创建管理用户 SQL> create tablespace odc_tps datafile '+DATA/trac/datafile/odc01.dbf' size 100M autoextend on; SQL> create user odc identified by odc default tablespace odc_tps; --授权,DBA权限在安装成功后可以收回 GRANT CONNECT TO odc; GRANT ALTER ANY TABLE TO odc; GRANT ALTER SESSION TO odc; GRANT CREATE SESSION TO odc; GRANT FLASHBACK ANY TABLE TO odc; GRANT SELECT ANY DICTIONARY TO odc; GRANT SELECT ANY TABLE TO odc; GRANT RESOURCE TO odc; GRANT DBA TO odc; --安装成功后可以收回dba权限,但是要授予UNLIMITED TABLESPACE权限。 GRANT UNLIMITED TABLESPACE TO odc; 或者alter user odc quota unlimited on users; 安装软件 生产和备份端安装步骤一样,只有软件及环境变量可能不一样 3 .1创建安装目录或文件系统

  # su - oracle $ mkdir /backup/ogg

3 .2配置 library环境变量

LINUX  :  LIBRARY_PATH例如:export LIBRARY_PATH=/backup/ogg:$LIBPATHexoprt LIBRARY_PATH=$ORACLE_HOME/lib:$LIBPATH

3 .3安装软件

1. 解压软件安装包$ cd /odc$ unzip ogg112101_fbo_ggs_Linux_x64_ora11g_64bit.zip$ tar xf fbo_ggs_Linux_x64_ora11g_64bit.tar

2. 创建 子工作目录解压并解包成功后在安装目录(如:/odc)执行$ ./ggsciggsci>create subdirs

以上所创建的目录的作用:

N ame

purpose

dirch k

Checkpoint files

用来存放检查点 (checkpoint) 文件 , 次检查点是 ogg 自己的检查点与实例不同

dirdat

GoldenGate trails

用来存放 TRAIL 文件

dirdef

Data definition files

用来存放通过 DEFGEN 工具生成的源或目标端数据定义文件

dirprm

Parameter files

用来存放配置参数文件

dirpcs

Process status files

用来存放进程状态文件

dirrpt

Report files

用来存放进程报告文件

dirsql

SQL script files

用来存放 SQL 脚本文件

dirtmp

Temporary files

当事物所需要的内存超过已分配内存时 , 默认存储在这个目录

 

3. 创建管理 进程

创建 管理进程 所需 参数:

参数

阀值

定义

port

7809

mgr 进程使用的 TCP/IP 端口侦听请求,默认端口为 7809

DYNAMICPORTLIST

7800-7810

指定可用动态 TCP/IP 端口列表,用于源端与目标端进程通信的绑定 ,最大支持 256 个端口

PURGEOLDEXTRACTS

./dirdat/sm*

定期清理抽取出的过期文件,可以被设置在 Manager, Extract, and Replicat 参数文件中, oracle 建议设置在 Manager

USECHECKPOINTS

根据任何 MINKEEP 规则,允许清除那些已经 Extract Replicat 进程检测过的 保证数据不丢失。

MINKEEPHOURS

24

trail 文件的 保留时间,超过这个时间则删除

autorestart

extract *

指定 进程失败时自动重启

retries

10

指定尝试 重新启动进程的次数 (默认 尝试 2

waitminutes

10

指定重新 启动进程的 等待 时间 ,直到必要资源变得可用或一些其他的事件发生。默认的延迟时间为 2 分钟。

 

具体 设置如 ggsci>edit param mgr 输入: port 7809 DYNAMICPORTLIST 7800-7820 PURGEOLDEXTRACTS ./dirdat/cz*, USECHECKPOINTS, MINKEEPHOURS 24 autorestart extract * retries 10 waitminutes 10 ggsci>edit param ./GLOBALS 输入: GGSCHEMA odc CHECKPOINTTABLE odc.ggs_checkpoint ---保存退出  ggsci>start mgr

  四、 为表格添加 supplement log

  ggsci>dblogin userid odc password odc   ggsci>add trandata CBSRUN2.SAVJ_ATOMJOUR0ggsci>info trandata CBSRUN2.SAVJ_ATOMJOUR0

 

创建抽取进程

抽取进程要 配置的参数:

参数

阀值

定义

extract

sm_ext

指定进程名和类型  

setenv

(NLS_LANG="SIMPLIFIED CHINESE_CHINA.ZHS16GBK")

配置系统环境变量

userid /  password

odc

指定 OGG 连接数据库的用户名和密码

exttrail

./dirdat/sm

指定写入到本地的哪个队列

tranlogoptions

指定在解析数据库日志时所需要的特殊参数

altarchivelogdest

instance or a11g1  +MCDATA/archlog,altarchivelogdest instance ora11g2 +MCDATA/archlog

指定 归档 路径

FETCHOPTIONS

指定 ogg 获取数据的方式

FETCHPKUPDATECOLS

复制进程出现丢失 update 记录( missing update )并且更新的是主键, update 将转换成 insert ( 当使用了 HANDLECOLLISIONS 时,请使用该参数。 )

ddl

include objname oggtest.* exclude objtype 'TRIGGER'

使用 DDL 参数,指定 DDL 的支持和过滤 DDL 操作。

table

oggtest.*

定义需要复制的表,后面需以 ; 结尾  

 

具体参数 设置如

  ggsci>add extract cz_ext tranlog threads 2 begin now ggsci>add exttrail ./dirdat/cz extract cz_ext 注: tranlog:表示数据抓取的来源是数据库的redo数据。 threads 2:表示我们数据库有多少个redo threads,单实例基本上是1或者不设,rac就自己设置了,配置过RAC的基本都了解。 begin now:表示我们在启动这个抓取进程的就去抓取数据。 ./dirdat:表示trail文件的目录 sm:trail文件的前缀 extract sm_ext:值指定给那个进程用的(sm_ext)。 ggsci>edit param cz_ext 参数: extract cz_ext setenv (NLS_LANG="SIMPLIFIED CHINESE_CHINA.ZHS16GBK") userid odc,password odc exttrail ./dirdat/cz tranlogoptions dblogreader FETCHOPTIONS FETCHPKUPDATECOLS table oggtest. * ; ggsci>start cz_ext

-- 观察,能够挖掘再继续下面的操作

 

  六、 增长 pump进程

传输进程要 配置的参数:

参数

阀值

定义

extract

sm_dmp

指定进程名和类型  

userid /  password

odc

指定 OGG 连接数据库的用户名和密码

rmthost

192.168.56.33

指定目标端主机 IP

mgrport

7809

指定管理进程端口号

rmttrail

./dirdat/sm

指定目标端保存队列文件的目录

passthru

采用 pass-through 模式处理表

table

oggtest.*

定义需要复制的表,后面需以 ; 结尾  

  ggsci>add extract cz_dmp EXTTRAILSOURCE ./dirdat/cz ggsci>ADD RMTTRAIL ./dirdat/cz, EXTRACT cz_dmp 注: EXTTRAILSOURCE:指定提取文件作为数据源 ADD RMTTRAIL:在目标数据库上创建一个trail ggsci>edit param cz_dmp 参数: extract cz_dmp userid odc,password odc rmthost 192.168.238.141, mgrport 7809 rmttrail ./dirdat/cz passthru table oggtest. *; ggsci>start sm_dmp

七、目标 kafka端创建 replicat进程

  创建测试topic ./kafka-topics.sh --create --zookeeper 192.168.238.141:2181,192.168.238.142:2181,192.168.238.143:2181 --replication-factor 3 --partitions 3 --topic oggtest 模拟测试任务发布订阅 生产者 ./kafka-console-producer.sh --broker-list node1:9092,node2:9092,node3:9092 --topic oggtest 消费者 ./kafka-console-consumer.sh --bootstrap-server node1:9092 --topic oggtest --from-beginning 确认kafka可用正常使用后,创建ogg 应用进程,将源端增量数据同步到kafka集群 具体参数如下: ggsci> Add replicat rep_ka exttrail ./dirdat/cz REPLICAT rep_ka sourcedefs ./dirdef/kafka.def TARGETDB LIBFILE libggjava.so SET property= dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP oggtest .t_ogg , TARGET oggtest.t_ogg; 从/ogg/AdapterExamples/big-data/kafka中将kafka.props,custom_kafka_producer.properties复制到/ogg/dirprm并根据实际情况修改 kafka.props配置如下 gg.handlerlist = kafkahandler  //handler 类型 gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties  //kafka 相关配置 #gg.handler.kafkahandler.TopicName=oggtest  //kafka topic 名称,无需手动创建 gg.handler.kafkahandler.topicMappingTemplate=oggtest gg.handler.kafkahandler.format=json  // 传输文件的格式,支持 json xml gg.handler.kafkahandler.mode=op  //OGG for Big Data 中传输模式,即 op 为一次 SQL 传输一次, tx 为一次事务传输一次 #Sample gg.classpath for Apache Kafka gg.classpath=dirprm/:/kafka/kafka_2.12-1.1.1/libs/*:/ogg/:/ogg/lib/*  

// 相关库文件的引用   custom_kafka_producer.properties配置如下 bootstrap.servers=192.168.238.141:9092,192.168.238.142:9092,192.168.238.143:9092   //kafkabroker 的地址 acks=1 reconnect.backoff.ms=1000  // 重连延时 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer # 100KB per partition batch.size=102400 linger.ms=10000 配置完后,启动应用进程观察 ggsci> start rep_kaSending START request to MANAGER ...REPLICAT REP_KA starting 八、测试同步效果

测试方法比较简单,直接在源端的数据表中insert,update,delete操作即可。

SQL> conn oggtest/oggtest

Connected.

SQL> select * from t_ogg;

no rows selected

SQL> desc t_ogg;

 Name                                      Null?    Type

 ----------------------------------------- -------- ----------------------------

 ID                                        NOT NULL NUMBER(38)

 TEXT_NAME                                          VARCHAR2(20)

SQL> insert into t_ogg values(1,'test');

1 row created.

SQL> commit;

Commit complete.

查看源端trail文件状态

$ ls -l /backup/ogg/dirdat/tc*

-rw-rw-rw- 1 oracle oinstall 1180 Nov  9 17:05 /backup/ogg/dirdat/tc000000

查看目标端trail文件状态

# ls -l /data/gg/dirdat/tc*      

-rw-r----- 1 root root 1217 Nov  9 17:05 /data/gg/dirdat/tc000000

检查实时同步到kafka的效果,在Oracle源端更新表的同时,使用kafka客户端自带的脚本去查看这里配置的ggtopic这个kafkatopic下的消息:

SQL> insert into t_ogg values(2,'test2');

1 row created.

SQL> commit;

Commit complete.

目标端Kafka的同步情况:

# bin/kafka-console-consumer.sh --zookeeper  192.168.238.141:2181  --

from-beginning --topic ggtopic

{"table":"OGGTEST.T_OGG","op_type":"I","op_ts":"2025-11-09

09:05:25.067082","current_ts":"2025-11-

09T17:59:20.943000","pos":"00000000000000001080","after":

{"ID":"1","TEXT_NAME":"test"}}

{"table":" OGGTEST.T_OGG","op_type":"I","op_ts":"2025-11-09

10:02:06.827204","current_ts":"2025-11-

09T18:02:12.323000","pos":"00000000000000001217","after":

{"ID":"2","TEXT_NAME":"test2"}}

可以看到,Oracle的增量数据已实时同步到Kafka。

相关推荐

热文推荐