Using OGG to Replicate Oracle Incremental Data to Kafka in Real Time (SASL/SCRAM Authentication)

Source: 这里教程网  Date: 2026-03-03 23:21:00

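Earlier articles already covered installing and deploying OGG, so that is not repeated here. This article covers only how to configure the extract, data pump, and replicat processes when the Kafka cluster uses SASL/SCRAM authentication. For the other installation steps, see the earlier articles.

Extract process configuration (mining on the DG standby side)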

GGSCI (oracle_sty) 16> view param e1_sd

 

EXTRACT e1_sd
SETENV ( NLS_LANG = "AMERICAN_AMERICA.ZHS16GBK" )
userid odc@tnsstandby,password odc
EXTTRAIL ./dirdat/sd
dynamicresolution
TRANLOGOPTIONS MINEFROMACTIVEDG
--TRANLOGOPTIONS DBLOGREADER
TRANLOGOPTIONS EXCLUDEUSER odc
--TRANLOGOPTIONS _DISABLESTREAMLINEDDBLOGREADER
--TRANLOGOPTIONS convertucs2clobs
DISCARDFILE ./dirrpt/e1_sd.dsc,APPEND,MEGABYTES 1000
DISCARDROLLOVER AT 8:00
--BR BROFF
GETTRUNCATES
GETUPDATEBEFORES
NOCOMPRESSDELETES
NOCOMPRESSUPDATES
numfiles 5000
DDL INCLUDE all
--DDLOPTIONS ADDTRANDATA RETRYOP RETRYDELAY 2 MAXRETRIES 3
DDLOPTIONS REPORT
REPORTROLLOVER AT 12:00
REPORTCOUNT EVERY 1 HOURS,RATE

table oggtest.test1;

Data pump process configuration

GGSCI (oracle_sty) 19> view param p1_sd

 

EXTRACT p1_sd
setenv ( NLS_LANG = AMERICAN_AMERICA.ZHS16GBK )
Dynamicresolution
PASSTHRU
RMTHOST 172.23.108.68, MGRPORT 7809, compress
RMTTRAIL ./dirdat/sd
numfiles 5000
TABLE oggtest.*;

The key difference is in the configuration of the replicat process on the target side, shown below:

Replicat process
GGSCI (ogg) 16> view param R2KAFKA

 

REPLICAT R2KAFKA
ASSUMETARGETDEFS
REPLACEBADCHAR NULL FORCECHECK
GETTRUNCATES
GETUPDATEBEFORES
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka_r2kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
DDL &
INCLUDE MAPPED &
INCLUDE OBJTYPE table &
EXCLUDE OBJTYPE type &
EXCLUDE OBJTYPE snapshot &
EXCLUDE OBJTYPE cluster &
EXCLUDE OBJTYPE comment &
EXCLUDE OBJTYPE trigger &
EXCLUDE OBJTYPE ref_constraint &
EXCLUDE OBJTYPE 'materialized view'
DDLOPTIONS REPORT
MAP oggtest.*,target oggtest.*;

kafka_r2kafka.props configuration:

gg.handlerlist=kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=/ogg/dirprm/kafka_producer.properties
gg.handler.kafkahandler.topicMappingTemplate=oggtest_${tableName}
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.format.includePrimaryKeys=true
gg.handler.kafkahandler.mode=op
gg.handler.kafkahandler.BlockingSend=false
gg.classpath=/ogg/dirprm/:/opt/kafka_2.12-3.4.0/libs/*:/ogg:/ogg/lib/*
jvm.bootoptions=-Xmx64m -Xms64m -Djava.class.path=ggjava/ggjava.jar -Djava.security.auth.login.config=/ogg/dirprm/jaas.conf
gg.log=log4j
gg.log.level=info
gg.report.time=30sec
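
To make the two mapping templates above concrete, here is a minimal Python sketch of how they would resolve for a single change record. It assumes that `${tableName}` resolves to the short table name and that `${primaryKeys}` resolves to the primary key values joined by underscores, which is how the handler's template keywords are documented to behave. The table name `TEST1`, the key values, and the `resolve` helper are made up for illustration and are not part of OGG.

```python
from string import Template

# Templates copied from kafka_r2kafka.props.
TOPIC_TEMPLATE = "oggtest_${tableName}"
KEY_TEMPLATE = "${primaryKeys}"


def resolve(template: str, table_name: str, pk_values: list[str]) -> str:
    """Hypothetical helper: mimic template keyword substitution for one record."""
    keywords = {
        "tableName": table_name,             # short table name, without the schema
        "primaryKeys": "_".join(pk_values),  # assumed: key values joined by "_"
    }
    return Template(template).substitute(keywords)


# Sample record for OGGTEST.TEST1 with a two-column primary key (made-up values).
topic = resolve(TOPIC_TEMPLATE, "TEST1", ["1001", "A"])
key = resolve(KEY_TEMPLATE, "TEST1", ["1001", "A"])
print(topic, key)
```

Under these assumptions each source table gets its own topic, so if ACLs are enabled on the cluster, the SCRAM user probably needs write permission on every resolved topic name.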

/ogg/dirprm/jaas.conf configuration:

KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="kafka-XXX-oggtest"  
    password="BokjkcVCdCigXXXXXXXNiuKF4EUXYNo7XEYY14u:udXhShHl6j:";
};
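
The JAAS file has two separate terminators: the semicolon after the last option ends the login module entry, and the `};` ends the `KafkaClient` section. As a rough sketch, the file could be generated instead of hand-edited so that both are always present. The `render_jaas` helper and the credentials below are hypothetical placeholders, and the escaping rule (backslash before `"` and `\`) is an assumption about how quoted values are parsed.

```python
def render_jaas(username: str, password: str) -> str:
    """Render a KafkaClient JAAS section for SCRAM login (illustrative helper)."""

    def quote(value: str) -> str:
        # Escape backslashes and double quotes so the value stays one quoted string.
        return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'

    return (
        "KafkaClient {\n"
        "    org.apache.kafka.common.security.scram.ScramLoginModule required\n"
        f"    username={quote(username)}\n"
        f"    password={quote(password)};\n"  # ";" ends the login module entry
        "};\n"                                # "};" ends the KafkaClient section
    )


# Placeholder credentials for illustration only.
jaas_text = render_jaas("example-user", 'p"ss:word')
print(jaas_text)
```

The printed text can then be saved to the path given by `-Djava.security.auth.login.config`, ideally with file permissions that keep the password readable only by the OGG operating system user.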

kafka_producer.properties configuration:

bootstrap.servers = 192.168.238.141:1529
security.protocol = SASL_PLAINTEXT
sasl.mechanism = SCRAM-SHA-512
acks=all
compression.type=lz4
buffer.memory=134217728
reconnect.backoff.ms=20000
retries=10
retry.backoff.ms=20000
max.in.flight.requests.per.connection=1
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=524288
linger.ms=1000
request.timeout.ms=60000
send.buffer.bytes=5242880
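
A mistyped key in this file is typically not rejected outright. The client simply falls back to its default (`PLAINTEXT` for `security.protocol`) and the connection then fails in a less obvious way. The following Python sketch shows one way to sanity-check the SASL/SCRAM settings before starting the replicat. The parser is deliberately simplified and does not implement the full `java.util.Properties` rules, and both helper functions are hypothetical.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Simplified parser: 'key = value' lines only, not full java.util.Properties rules."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith(("#", "!")):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_sasl_scram(props: dict[str, str]) -> list[str]:
    """Return a list of problems; an empty list means the checks passed."""
    problems = []
    for key in props:
        if any(ch.isspace() for ch in key):
            problems.append(f"key contains whitespace: {key!r}")
    if props.get("security.protocol") not in ("SASL_PLAINTEXT", "SASL_SSL"):
        problems.append("security.protocol must be SASL_PLAINTEXT or SASL_SSL")
    if props.get("sasl.mechanism") not in ("SCRAM-SHA-256", "SCRAM-SHA-512"):
        problems.append("sasl.mechanism must be SCRAM-SHA-256 or SCRAM-SHA-512")
    return problems


good = "security.protocol = SASL_PLAINTEXT\nsasl.mechanism = SCRAM-SHA-512\n"
bad = "s ecurity.protocol = SASL_PLAINTEXT\nsasl.mechanism = SCRAM-SHA-512\n"
print(check_sasl_scram(parse_properties(good)))
print(check_sasl_scram(parse_properties(bad)))
```

In practice the text would be read from `/ogg/dirprm/kafka_producer.properties` rather than from inline strings.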

The parts marked in red are the settings that need particular attention and configuration in this mode.
