Using OGG to replicate Oracle incremental data to Kafka in real time (SASL/GSSAPI (Kerberos) authentication)

Source: 这里教程网 | Date: 2026-03-03 23:20:32

The installation and deployment of OGG was covered in an earlier article and is not repeated here. This post only covers how to configure the extract, pump, and replicat processes when the Kafka cluster uses Kerberos authentication; for the remaining installation steps, refer to the earlier documentation.

Extract process configuration (mining on the Data Guard standby)

GGSCI (oracle_sty) 16> view param e1_sd

EXTRACT e1_sd
SETENV ( NLS_LANG = "AMERICAN_AMERICA.ZHS16GBK" )
userid odc@tnsstandby,password odc
EXTTRAIL ./dirdat/sd
dynamicresolution
TRANLOGOPTIONS MINEFROMACTIVEDG
--TRANLOGOPTIONS DBLOGREADER
TRANLOGOPTIONS EXCLUDEUSER odc
--TRANLOGOPTIONS _DISABLESTREAMLINEDDBLOGREADER
--TRANLOGOPTIONS convertucs2clobs
DISCARDFILE ./dirrpt/e1_sd.dsc,APPEND,MEGABYTES 1000
DISCARDROLLOVER AT 8:00
--BR BROFF
GETTRUNCATES
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
numfiles 5000
DDL INCLUDE all
--DDLOPTIONS ADDTRANDATA RETRYOP RETRYDELAY 2 MAXRETRIES 3
DDLOPTIONS REPORT
REPORTROLLOVER AT 12:00
REPORTCOUNT EVERY 1 HOURS,RATE
table oggtest.test1;
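With the parameter file in place, the extract and its local trail still have to be registered with the manager before they can be started. A minimal sketch of the GGSCI commands (the group name e1_sd and trail prefix ./dirdat/sd come from the parameter file above; the TRANLOG/BEGIN NOW start point is an assumption, not from the original post):

```
GGSCI (oracle_sty) 17> ADD EXTRACT e1_sd, TRANLOG, BEGIN NOW
GGSCI (oracle_sty) 18> ADD EXTTRAIL ./dirdat/sd, EXTRACT e1_sd
GGSCI (oracle_sty) 19> START EXTRACT e1_sd
```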

Data pump process configuration

GGSCI (oracle_sty) 19> view param p1_sd

EXTRACT p1_sd
setenv ( NLS_LANG = AMERICAN_AMERICA.ZHS16GBK )
Dynamicresolution
PASSTHRU
RMTHOST 172.23.108.68, MGRPORT 7809, compress
RMTTRAIL ./dirdat/sd
numfiles 5000
TABLE oggtest.*;
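The pump likewise needs to be registered, reading the extract's local trail and writing the remote trail on the target host. A hedged sketch (trail paths and group name taken from the parameter files above; these registration commands are not in the original post):

```
GGSCI (oracle_sty) 20> ADD EXTRACT p1_sd, EXTTRAILSOURCE ./dirdat/sd
GGSCI (oracle_sty) 21> ADD RMTTRAIL ./dirdat/sd, EXTRACT p1_sd
GGSCI (oracle_sty) 22> START EXTRACT p1_sd
```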

The key difference lies in the configuration of the replicat process on the target side, as follows:

Replicat (apply) process configuration

GGSCI (ogg) 16> view param R2KAFKA


REPLICAT  R2KAFKA
ASSUMETARGETDEFS
REPLACEBADCHAR NULL FORCECHECK
GETTRUNCATES
GETUPDATEBEFORES
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka_r2kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
DDL &
INCLUDE MAPPED &
INCLUDE OBJTYPE table &
EXCLUDE OBJTYPE type &
EXCLUDE OBJTYPE snapshot &
EXCLUDE OBJTYPE cluster &
EXCLUDE OBJTYPE comment &
EXCLUDE OBJTYPE trigger &
EXCLUDE OBJTYPE ref_constraint &
EXCLUDE OBJTYPE 'materialized view'
DDLOPTIONS REPORT
MAP oggtest.*,target oggtest.*;
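On the target, the big-data replicat keeps its checkpoint in a file rather than in a database, so it can be added against the remote trail without a checkpoint table. A sketch (trail path from the pump configuration above; these commands are an assumption, not from the original post):

```
GGSCI (ogg) 17> ADD REPLICAT r2kafka, EXTTRAIL ./dirdat/sd
GGSCI (ogg) 18> START REPLICAT r2kafka
```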

kafka_r2kafka.props configuration:

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=kafka_handler/kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=oggtest_${tableName}
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.BlockingSend=false
gg.classpath=dirprm/:/usr/local/TDH-Client/kafka/libs/*:/ogg:/ogg/lib/*
jvm.bootoptions=-Xmx512m -Xms512m -Djava.class.path=./ggjava/ggjava.jar -Dlog4j.configuration=log4j.properties -Djava.security.auth.login.config=/etc/eventstore1/conf/jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf

/etc/eventstore1/conf/jaas.conf configuration:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/eventstore1/conf/kafka.keytab"
  storeKey=true
  useTicketCache=false
  principal="kafka@TDH";
};

kafka_producer.properties configuration:

bootstrap.servers=tdh02:9092,tdh03:9092,tdh04:9092,tdh05:9092,tdh07:9092,tdh08:9092,tdh09:9092,tdh10:9092
acks=-1
compression.type=lz4
buffer.memory=134217728
reconnect.backoff.ms=20000
retries=10
retry.backoff.ms=20000
max.in.flight.requests.per.connection=1
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=524288
linger.ms=1000
request.timeout.ms=60000
send.buffer.bytes=5242880
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
sasl.mechanism=GSSAPI

The settings that require particular attention and configuration in this mode are the Kerberos-related ones: the JAAS and krb5 entries in jvm.bootoptions, the KafkaClient section of jaas.conf, and the security.protocol, sasl.kerberos.service.name, and sasl.mechanism producer properties.
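With format=json, mode=op, and includePrimaryKeys=true as configured above, each DML operation is published as one JSON message to the oggtest_&lt;tableName&gt; topic. A hypothetical insert into oggtest.test1 would produce a record along these lines (the column names and all field values here are illustrative only; the exact field set depends on the formatter options):

```json
{
  "table": "OGGTEST.TEST1",
  "op_type": "I",
  "op_ts": "2026-03-03 23:20:32.000000",
  "current_ts": "2026-03-03T23:20:33.000000",
  "pos": "00000000010000001234",
  "primary_keys": ["ID"],
  "after": {
    "ID": 1,
    "NAME": "test"
  }
}
```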
