Oracle ogg 同步到 kafka 部署实践

来源:这里教程网 时间:2026-03-03 20:14:18 作者:

Oracle ogg 同步到 kafka 部署实践

1.1 概述目标:通过OGG 软件将Oracle数据库表通过给KAFKA 平台,通过KAFKA平台解析JSON文件,在进行大数据库平台展现。 环境: 源端规划 当源端为 Oracle 数据库时,通常的做法是将 O GG 部署于源端的 O racle 数据库服务器 上; 也可以部署到相应的 A DG 环境或者单独的主机(要求与 数据库的操作系统平台相同)。 目标端规划 当目标端的产品为 O GG for Big Data 产品时,目前官方认证过并支持的平台有如下的四种平台:Linux、Windows、IBM-AIX、Solaries、HP-UNIX 基于稳定性考虑,必须选择 O GG for Big Data 产品安装在其中的一种平台上。 下载方式 高版本:Oracle GoldenGate Downloads 历史版本: http://edelivery.oracle.com/osdc/faces/Home.jspx 1.2 概述 1.2.1  文件系统要求 OG G 的安装和运行需要使用文件系统来存放文件。对文件系统的要求主要有: 最好使用专用的文件系统,比如挂载到 / ogg 目录下,通过考虑高可用可以选用集中存储。 文件系统的容量主要与所要复制的数据量相关。如果不确定要复制的数据量,建议从 1 6GB 32GB 起步。 1.2.2 源端到目标端主机的网络连接 O GG 的数据复制操作要求源端主机和目标端主机之间的网络是连通的,未必必须放开特定端口的防火墙限制(如果有防火墙限制)。 当前的要求是:打开源与目标主机之间的 7890   7850 端口的防火墙限制。 1.2.3 创建操作系统用户 建议将 O GG 产品安装以及运行于专用的操作系统用户,这样可以提高维护的安全性。 可以通过下面的步骤来创建操作系统用户 o gg

- - 对于没有 O racle 数据库的环境(本文即 K afka 环境)

> Linux:

groupadd oinstall

useradd -g oinstall -G oinstall  -d /home/ogg ogg

1.3 在源端(Oracle库, Linux 平台)安装 O GG 1.3.1  安装 J DK 1.8

O GG for Big Data 的运行需要 J AVA 1. 8 ,可以安装 J DK 1.8 或者 J RE   1.8 。 检查发现当前环境上 JDK   1.7 不满足要求,考虑到现有环境对JDK1.7 的依赖考虑,我们在ogg用户下面单独配置一个JDK1.8的环境变量:

[ogg@rac2 soft]$ ls -lrt 总用量 568456 drwxr-xr-x 3 ogg oinstall      4096 4月  16 2018 fbo_ggs_Linux_x64_shiphome -rw-r--r-- 1 ogg oinstall      1396 5月  10 2018 OGG-12.3.0.1.4-README.txt -rw-r--r-- 1 ogg oinstall    293566 5月  10 2018 OGG_WinUnix_Rel_Notes_12.3.0.1.4.pdf -rw-r--r-- 1 ogg oinstall 146729827 7月  17 01:08 jdk-8u421-linux-x64.tar.gz -rw-r--r-- 1 ogg oinstall 339837611 7月  17 20:00 V975837-01.zip -rw-r--r-- 1 ogg oinstall  95212174 7月  17 20:00 V979723-01.zip [ogg@rac2 soft]$cd /ogg/jdk1.8 [ogg@rac2 jdk1.8]$ pwd /ogg/jdk1.8 [ogg@rac2 ~]$ cat .bash_profile ORACLE_SID=rac2; export ORACLE_SID ORACLE_BASE=/u01/app/oracle; export ORACLE_BASE ORACLE_HOME=$ORACLE_BASE/product/11.2.0/db_1; export ORACLE_HOME export PATH export JAVA_HOME=/ogg/jdk1.8 export JRE_HOME=/ogg/jdk1.8/jre export CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$ORACLE_HOME/bin:$ORACLE_HOME/OPatch:$PATH LD_LIBRARY_PATH=$ORACLE_HOME/lib LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$ORACLE_HOME/oracm/lib LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:/lib:/usr/lib:/usr/local/lib export LD_LIBRARY_PATH [ogg@rac2 ~]$ java -version java version "1.8.0_421" Java(TM) SE Runtime Environment (build 1.8.0_421-b09) Java HotSpot(TM) 64-Bit Server VM (build 25.421-b09, mixed mode) [ogg@rac2 ~]$

1. 4 在目标端( K afka L inux 平台)安装 O GG for Big Data 1.4.1  安装 J DK 1.8 方法按照上述描述进行操作,可以解决同一系统对软件多版本的依赖 1.4.2  安装 O GG for Big Data 产品

说明: 对于低版本 O GG f or Big Data 产品,我们可以通过解压缩安装的方式,不需要使用 O UI 以及 G lobal Inventory 机制,也没有 o patch 命令(比如通过 o patch 列出所安装的单个补丁)。

首先使用 o gg 用户创建存放 O GG for Big Data 产品的目录。

最好部署在一个独立的文件系统上,具有至少 16GB 以上的可用存储空间。

cd  /data/ogg m kdir ogg_home1

这里通过将 OGG  for Big Data 补丁包进行解压缩的方式,即完成软件的安装。 该补丁包的应用,使 O GG 的版本为 12.3.2.1.6 (Build 001)

c d /data/ogg/ogg_home unzip V979723-01.zip tar xvf  OGG_BigData_Linux_x64_12.3.2.1.1.tar

  o gg 用户设置环境变量,以满足运行 O GG 的要求。

[ogg@node3 ogg_home]$ ./ggsci ./ggsci: error while loading shared libraries: libjvm.so: cannot open shared object file: No such file or directory [ogg@node3 ogg_home]$ ldd ggsci         linux-vdso.so.1 =>  (0x00007ffcd7dd3000)         librt.so.1 => /lib64/librt.so.1 (0x00007fc6e1ffb000)         libdl.so.2 => /lib64/libdl.so.2 (0x00007fc6e1df3000)         libgglog.so => /data/ogg/ogg_home/./libgglog.so (0x00007fc6e19a3000)         libggutil.so => /data/ogg/ogg_home/./libggutil.so (0x00007fc6e177b000)         libggrepo.so => /data/ogg/ogg_home/./libggrepo.so (0x00007fc6e148b000)         libdb-6.1.so => /data/ogg/ogg_home/./libdb-6.1.so (0x00007fc6e1073000)         liblmdb.so => /data/ogg/ogg_home/./liblmdb.so (0x00007fc6e0e5b000)         libggperf.so => /data/ogg/ogg_home/./libggperf.so (0x00007fc6e0c1b000)         libggparam.so => /data/ogg/ogg_home/./libggparam.so (0x00007fc6df3f3000)         libicui18n.so.56 => /data/ogg/ogg_home/./libicui18n.so.56 (0x00007fc6def2b000)         libicuuc.so.56 => /data/ogg/ogg_home/./libicuuc.so.56 (0x00007fc6deb2b000)         libicudata.so.56 => /data/ogg/ogg_home/./libicudata.so.56 (0x00007fc6dccc3000)         libpthread.so.0 => /lib64/libpthread.so.0 (0x00007fc6dcaa3000)         libxerces-c-3.1.so => /data/ogg/ogg_home/./libxerces-c-3.1.so (0x00007fc6dc413000)         libantlr3c.so => /data/ogg/ogg_home/./libantlr3c.so (0x00007fc6dc1f3000)         libjvm.so => not found         libggnnzitp.so => /data/ogg/ogg_home/./libggnnzitp.so (0x00007fc6db77b000)         libm.so.6 => /lib64/libm.so.6 (0x00007fc6db473000)         libc.so.6 => /lib64/libc.so.6 (0x00007fc6db0a3000)         /lib64/ld-linux-x86-64.so.2 (0x00007fc6e2203000)         libnsl.so.1 => /lib64/libnsl.so.1 (0x00007fc6dae83000)         libstdc++.so.6 => /lib64/libstdc++.so.6 (0x00007fc6dab7b000)         libgcc_s.so.1 => /lib64/libgcc_s.so.1 (0x00007fc6da963000)

在使用 J DK 目录设置了 LD_LIBRARY_PATH 环境变量后,要检查该环境变量指向的目录中是否具有以下两个库文件,这两个文件为 O GG 运行所必须:

cd ~ v i .bash_profile # For JDK 1.8 export JAVA_HOME=/data/jdk1.8 export PATH=$JAVA_HOME/bin:$PATHexport LD_LIBRARY_PATH=/usr/java/jdk1.8.0_192-amd64/jre/lib/amd64/server export OGG_HOME=/data/ogg/ogg_home

退出 o gg 用户当前的 s sh 会话并重新连接,使环境变量的设置生效,随后运行 g gsci 命令,确认可以正常执行:

[ogg@node3 ~]$ cd /data/ogg/ogg_home/ [ogg@node3 ogg_home]$ ./ggsci Oracle GoldenGate for Big Data Version 12.3.2.1.1 (Build 005) Oracle GoldenGate Command Interpreter Version 12.3.0.1.2 OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_180712.2305 Linux, x64, 64bit (optimized), Generic on Jul 13 2018 00:46:09 Operating system character set identified as UTF-8. Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.

1.5 在源库( O racle 数据库)的检查与配置 为满足 O GG 的工作条件,必须对源数据库进行必要的设置。本节将检查相关的配置项,并对其中不满足的进行设置。 1.5.1  在数据库上启用对 O GG 的支持 对于 Oracle 11.2.0.4 以及更高版本,必须显式设数据库以启用对 OGG 的支持,这个设置要求对于抽取和复制的全部模式(经典和集成)都是必须的。

alter system set enable_goldengate_replication = true scope = both sid = '*';

show parameter enable_goldengate_replication

1.5.2  数据库的归档模式与附加日志 O GG 要求数据库必须运行于归档日志模式,并且启用附加日志的支持。 以下用来检查数据库的归档模式以及附加日志的配置情况。 说明: O racle 官方推荐在数据库级别启用 F orce logging 模式。为减少全库级别启用 F orce logging 后一些不必要的 R edo 日志量,本文的做法是在表空间级别启用 F orce logging 【可选操作】在数据库级别启用强制日志 如果 上面 force_logging ”字段的返回值为“ YES ”,则已经启用 否则执行 下面的 S QL

SQL> alter database force logging;

 

-- 重新执行上面的检查SQL,确认返回值为“YES”。 回退操作

alter database no force logging;

【必选操作】在数据库级别打开最小附加日志 如果 上面 supplemental_log_data_min ”字段的返回值为“ YES ”,则已经启用,否则执行:

alter database add supplemental log data;

-- 如果如果出现行锁,可以取消重新自行

-- 在全部R AC 节点上执行日志文件的切换

alter system switch logfile;

 

-- 重新执行上面的检查SQL,确认返回值为“YES”。 回退操作

alter database drop  supplemental log data;

  1.5.3  创建用于运行 OGG 的数据库用户

O GG 的运行需要一个数据库用户, O GG 通过该用户建立到数据库的连接,通常采用一个专用的用户。 创建 O GG 需要的数据库用户

执行下面的 SQL 以创建 OGG 的数据库用户 ogg (也可以使用其它名称)。

- - (可选)为 o gg 用户创建专用的表空间: create tablespace oggtbs datafile '+ASM'size 30G autoextend off; create user ogg identified by "xxxxx" default tablespace ts_ogg_data; --   检查并设置密码修改策略,使得口令不需要经常修改: alter user ogg profile PROFILE_APP; --   赋予 ogg 必须的基本权限以及运行 O GG 需要的其它权限: grant connect, resource ,unlimited tablespace to ogg ; grant create session   to ogg ; g rant alter system, select any dictionary to ogg; execute dbms_goldengate_auth.grant_admin_privilege(   'ogg'   );

1.5.4  表空间的 F orce Logging 建议将源端所有需要数据复制的表存放在单独的表空间上,并为这些表空间启用 f orce logging 启用业务数据表空间的 F orce Logging

执行下面的 SQL 以创建 OGG 的数据库用户 ogg (也可以使用其它名称)。 1.6 在源端( O racle 数据库)配置 O GG 1.6.1  创建登录数据库的用户证书

O GG 登录数据库的用户名、口令等以证书的形式保存到系统中,这样在配置文件以及执行管理任务时可以不需要输入口令,从而简化系统的配置,增加系统的安全性。 创建用户证书

执行下面的命令。

add credentialstore

alter CredentialStore add user ogg, password xxxxx alias user_ogg

 

Credential store altered.

 

GGSCI (rac2) 13> dblogin useridalias user_ogg;

Successfully logged into database.

1.6.2  配置并启动 O GG m gr 进程

配置并启动 O GG m anager 进程。

edit param mgr   PORT 7809 dynamicportlist 7810-7820 userid ogg@db_ogg,password ogg autorestart er *,retries 5 ,waitminutes 3 purgeoldextracts ./dirdat/*,usecheckpoints,minkeepdays 3 lagreporthours 2 laginfominutes 30 lagcriticalminutes 45 start mgr Manager started. GGSCI (rac2) 17> info all Program     Status      Group       Lag at Chkpt  Time Since Chkpt MANAGER     RUNNING

1.6.4  对复制表执行附加日志设置 对于 U PDATE 操作, O GG 的数据复制机制要求必须将相关行的主键(或者唯一索引字段)的值记录到 R edo 中,即使这些字段的值在 U PDATE 操作中没有被更改。对于源端任何一条数据的 UPDATE 操作, O GG 会使用主键字段的值作为 W HERE 条件,在目标端对一行记录执行修改,这样可以避免错误的更新多行记录。 为实现上述目标,必须对复制表进行附加日志的设置,从而使 O racle 数据库可以将主键以及唯一字段的值也写入到 R edo 中。本节将执行这个操作,这是通过 O GG a dd trandata 命令完成的。 为所有需要进行复制的表执行 add trandata 命令。

dblogin UserIdAlias user_ogg

 

add trandata owner.table_name

检查表的 SUPPLEMENTAL_LOG是否开启:

info trandata owner.table_name

1.6.5  配置抽取进程 抽取进程负责从源库的 R edo 中将变更的数据捕获并写入到 T rail 文件中。 创建抽取进程 在源端创建抽取进程 e _kafka1

register extract e_kafka1 database

add extract e_kafka1, integrated tranlog, begin now

add exttrail ./dirdat/k1, extract e_kafka1, MegaBytes 256

  编辑抽取进程的参数,将参数信息将保存于 ./dirprm/ e_kafka1 .prm 文件中。

edit params e _ kafka1

 

Extract e_kafka1

UserIDAlias user_ogg

DiscardFile ./dirrpt/e_kafka1.dsc, Append, MegaBytes 128

ExtTrail ./dirdat/k1

DBOptions AllowUnusedColumn

GetTruncates

LogAllSupCols

UpdateRecordFormat compact

WarnLongTrans 2h, CheckInterval 300s

TranLogOptions IntegratedParam( max_sga_size 1024, Parallelism 2 )

 

obey ./dirprm/tabmap;

 

vi  ./dirprm/tabmap

table owner.table_name;

ggsci>   start *

可以使用 info all 检查 e _ kafka1 , p _ kafka1 进程状态是否为 RUNNING 如果状态不是 RUNNING ,则可以使用以下命令查看报错原因,再根据错误原因具体分析和处理:

g gsci > view report e _ kafka1 1.6.6  配置传输进程 在源端创建数据泵进程 pump1

add extract p _ kafka1, exttrailsource ./dirdat/k1

add rmttrail ./dirdat/k1, extract p _ kafka1, Megabytes 2 56

 

用来将 t rail 文件的存放位置修改在 NAS目录上:

alter pump1 exttrailsource /nasdir/zhzy/ogg/dirdat/k1

编辑 pump1 进程参数设置,配置信息将保存于 ./dirprm/pump1.prm 文件中。

ggsci>   edit params p _ kafka1

  pump1 进程的参数配置如下:

Extract p _ kafka1

Passthru

RmtHost xx.xx.xx.xx, MgrPort 7809

RmtTrail ./dirdat/ k 1

 

obey ./dirprm/tabmap;

  1.6.7  启动抽取和传输进程 在源端启动数据抽取进程和数据泵进程:

ggsci>   start *

 

使用 stats 命令对抽取和传输进行进行检查:

stats ext1

stats pump1

  1.7 在目标端( K afka )的检查与配置 1.7.1  配置并启动 O GG m gr 进程

配置并启动 O GG m anager 进程。

./g gsci create subdirs e dit param mgr   Port 7809 DynamicPortList 7810-7850 -- PurgeOldExtracts ./dirdat/*, UseCheckpoints, MinKeepDays 7 -- AutoRestart ER *, Retries 5, WaitMinutes 10, ResetMinutes 60 LagInfoMinutes 3 LagCriticalMinutes 30 LagReportHours 1   s tart mgr

1.1.2  配置并启动 O GG 复制进程

配置并启动 O GG 的复制进程。

cd $OGG_HOME ggsci add replicat r_kafka1 exttrail ./dirdat/k1   编辑 r_kafka1 进程的参数文件: edit param r_kafka1   Replicat r_kafka1 GetEnv (JAVA_HOME) GetEnv (CLASSPATH) GetEnv (PATH) GetEnv (LD_LIBRARY_PATH) TargetDB LibFile libggjava.so Set property=dirprm/kafka_handler1.props ReportCount Every 1 Minutes, Rate GroupTransOps 10000   Map *.*, Target *.*;

  编辑复制进程 r_kafka1 所使用的 K afka Han d ler 的属性文件:  

# # 将安装文件中自带的示例配置文件拷贝到 d irprm 目录,并在此基础上执行修改

cd $OGG_HOME/AdapterExamples/big-data/kafka

cp kafka.props $OGG_HOME/dirprm/kafka_handler1 .props

cp custom_kafka_producer.properties $OGG_HOME/dirprm/kafka_producer1.props

cd $OGG_HOME/dirprm

v i kafka_handler1.props

gg.handlerlist            = kafkahandler

gg.handler.kafkahandler.type    = kafka

# The following selects the message key using the concatenated primary keys

gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}

gg.handler.kafkahandler.BlockingSend =false

gg.handler.kafkahandler.includeTokens=false

goldengate.userexit.writers=javawriter

javawriter.stats.display=TRUE

javawriter.stats.full=TRUE

gg.log=log4j

gg.log.level=TRACE

gg.report.time=30sec

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

gg.handler.kafkahandler.KafkaProducerConfigFile = kafka_producer1.props

gg.classpath  = dirprm/:/usr/lib/kafka/libs/*

gg.handler.kafkahandler.mode             = op

gg.handler.kafkahandler.topicMappingTemplate    = test_illegal

gg.handler.kafkahandler.format            = json

 

## 根据情况决定是否 启用 trace

gg.log.level = trace

  编辑 K afka Han d ler 所使用的连接 K afka 相关的属性文件 配置文件 custom_kafka_producer.properties 中指定了连接到 K afka 的配置:

c d dirprm

vi kafka1_producer.properties

acks=1

reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

# 100KB per partition

batch.size=16384

linger.ms=0

## Added by ZhangHua

bootstrap.servers   = xxx.xxx.xxx.xxx:9000, xxx.xxx.xxx.xxx:9000, xxx.xxx.xxx.xxx:9000, xxx.xxx.xxx.xxx:9000

max.request.size    = 5024000

send.buffer.bytes   = 5024000

启动复制进程

start r_kafka1   可以使用 info all 检查复制进程的状态是否为 RUNNING ;如果状态不是 RUNNING ,则可以使用以下命令查看报错原因:

view report r_kafka1 再根据错误原因具体分析和处理。 相关学习链接:GGSCI: Unable To Open Credential Store. Error Code 29,231 After Applying RDBMS APR-2021 Patch While Using FIPS.                (Doc ID 2782150.1) Replicat Abends With OGG-15051  Java Or JNI Exception: When Using Coludera Ggclasspath And Error creating bean with name 'userExitDataSource' defined in class path resource (Doc ID 2802307.1)https://www.cnblogs.com/halberd-lee/p/10874452.html

相关推荐