Oracle CDC 变更数据捕获技术

来源:这里教程网 时间:2026-03-03 11:59:46 作者:

一、Oracle CDC技术概要 对于增量数据采集,Oracle推出了两种主要方案,一种是我们熟悉的物化视图(materialized view),另一种就是CDC组件(Change Data Capture 变更数据捕获)。利用CDC,在对源表进行INSERT、UPDATE或DELETE等操作的同时就可以提取数据,并且变化的数据被保存在数据库的变化表中。这样就可以捕获发生变化的数据,然后利用数据库视图以一种可控的方式提供给目标系统。 CDC体系结构基于发布者/订阅者模型。发布者捕捉变化数据并提供给订阅者。订阅者使用从发布者那里获得的变化数据。通常,CDC系统拥有一个发布者和多个订阅者。发布者首先需要识别捕获变化数据所需的源表。然后,它捕捉变化的数据并将其保存在特别创建的变化表中。它还使订阅者能够控制对变化数据的访问。订阅者需要清楚自己感兴趣的是哪些变化数据。一个订阅者可能不会对发布者发布的所有数据都感兴趣。订阅者需要创建一个订阅者视图来访问经发布者授权可以访问的变化数据。 CDC有几个重要的基本术语: 源表(Source Table),业务数据库的需要捕获数据的源表。 变化表(Change Table),保存从源表捕获的变化数据(包括各种DML产生的数据)。 变化集(Change Set),是保证事务一致性的数据集合,一个变化集对应多个变化表。 订阅视图(Subscription View),提供给读取变化表数据的视图。 订阅窗口(Subscription Window),定义了查看变化数据的时间范围,就象一个观察变化数据的滑动窗口,变化数据处理完成后,可以清除订阅窗口。 CDC分同步和异步模式: 同步模式,实时的捕获变化数据并存储到变化表中,发布者与订阅都位于同一数据库中,同步模式实际使用trigger的形式来捕捉变化数据。

 

  异步模式,以Oracle流复制技术为基础,从redo log中读取日志记录来捕捉变化数据。异步模式分为三种: 1)Asynchronous HotLog Configuration(异步在线日志CDC)

  该过程不再使用触发器,而是使用在线日志,不是归档日志。原表与目标表仍然必须是同一个库。该模式相对简单,是在Oracle 10g以上产生的,9i没有该机制。 2)Asynchronous Distributed HotLog Configuration(异步分布式在线日志CDC)

  该模式是对异步在线日志CDC的一种优化,也比较容易理解,就是加入了DB-LINK机制,使原表、目标表不在同一个库,实际和异步在线日志CDC没有什么本质区别。 3)Asynchronous AutoLog Mode(通过异步自动日志复制的CDC) 这个又分两种: 3.1)Asynchronous Autolog Online Change Data Capture Configuration(异步在线日志复制CDC)

  异步在线日志复制CDC模式使用Standby热备数据库日志,将日志写入了热备数据库,目标表就可以建立在热备库上,这对主数据库性能影响就进一步降低。 3.2)Asynchronous AutoLog Archive Change Data Capture Configuration(异步归档日志复制CDC)

 

  该模式几乎可以完全不影响原数据库的性能。需要做异地同步归档,然后在目标端分析归档日志进行变化数据处理。 关于Oracle CDC的更多介绍可参考Oracle官方资料: https://docs.oracle.com/cd/E11882_01/server.112/e25554/cdc.htm#DWHSG016 同步模式以trigger形式来捕捉,不如自主创建trigger灵活,且对性能的影响比从日志捕捉变化数据的影响大,所以这里以异步Hotlog来进行部署。 二、CDC环境搭建 1、确认数据库对CDC(变更数据捕获)的支持 col parameter for a20 col value for a10 select * from v$option where parameter = 'Change Data Capture'; PARAMETER            VALUE -------------------- ---------- Change Data Capture  TRUE 2、确认数据库运行在归档模式 archive log list; 3、修改数据库为强制日志 alter database force logging; 4、添加数据库补充日志 alter database add supplemental log data; 5、确认数据库修改后的状态 col log_mode for a15 col force_logging for a15 col supp_log_min for a15 select log_mode, supplemental_log_data_min supp_log_min, force_logging from v$database; LOG_MODE        SUPP_LOG_MIN    FORCE_LOGGING --------------- --------------- --------------- ARCHIVELOG      YES             YES 6、准备发布数据和用户 -- 创建业务用户表,这里业务用户以Oracle自带的SCOTT用户为例 conn scott/tiger create table t1(a number, b varchar2(20)); -- 创建发布者并授权 conn / as sysdba create user cdc_pubr identified by cdc_pubr default tablespace users; alter user cdc_pubr quota unlimited on system quota unlimited on sysaux; grant connect, resource, dba to cdc_pubr; grant create session to cdc_pubr; grant create table to cdc_pubr; grant create tablespace to cdc_pubr; grant unlimited tablespace to cdc_pubr; grant select_catalog_role to cdc_pubr; grant execute_catalog_role to cdc_pubr; grant create sequence to cdc_pubr; grant select_catalog_role to cdc_pubr; grant execute_catalog_role to cdc_pubr; grant select on change_sets to cdc_pubr; grant execute on sys.dbms_cdc_publish to cdc_pubr; begin     dbms_streams_auth.grant_admin_privilege(grantee => 'cdc_pubr'); end; / -- 添加CDC表的补充日志 alter table scott.t1 add supplemental log group log_group_t1(a number, b varchar2(20)) always; -- 如要捕捉表的所有列,则可用以下语句替代 alter table scott.t1 add supplemental log data(all) columns; 7、源表实例化 conn / as sysdba begin     dbms_capture_adm.prepare_table_instantiation(table_name => 'scott.t1'); end; / 8、创建变更集 conn cdc_pubr/cdc_pubr; begin   dbms_cdc_publish.create_change_set(change_set_name    => 'cdc_t1',                                      description        => 'change set for scott.t1',                                      change_source_name => 'hotlog_source',                                      stop_on_ddl        => 'y'); end; / 9、创建变更表 conn cdc_pubr/cdc_pubr; begin   dbms_cdc_publish.create_change_table(owner             => 'cdc_pubr',                                        change_table_name => 'c_t1',                                        change_set_name   => 'cdc_t1',                                        source_schema     => 'scott',                                        source_table      => 't1',                                        column_type_list  => 'a number, b varchar2(20)',                                        capture_values    => 'both',                                        rs_id             => 'y',                                        row_id            => 'y',                                        user_id           => 'y',                                        timestamp         => 'y',                                        object_id         => 'n',                                        source_colmap     => 'n',                                        target_colmap     => 'y',                                        options_string    => ''); end; / 10、激活变更集 conn cdc_pubr/cdc_pubr; begin   dbms_cdc_publish.alter_change_set(change_set_name => 'cdc_t1',                                     enable_capture  => 'y'); end; / dbms_cdc_publish过程的详细解释可参考官方资料: https://docs.oracle.com/cd/E11882_01/appdev.112/e40758/d_cdcpub.htm#ARPLS023 11、创建订阅者并授权 conn / as sysdba create user cdc_subr1 identified by cdc_subr1 default tablespace users; grant create table to cdc_subr1; grant create session to cdc_subr1; grant create view to cdc_subr1; grant unlimited tablespace to cdc_subr1; conn cdc_pubr/cdc_pubr; grant select on cdc_pubr.c_t1 to cdc_subr1; 12、创建订阅集 conn cdc_subr1/cdc_subr1 begin   dbms_cdc_subscribe.create_subscription(change_set_name   => 'cdc_t1',                                          description       => 'change data for scott.t1',                                          subscription_name => 'sub_t1'); end; / 13、订阅源表及相关字段 conn cdc_subr1/cdc_subr1 begin   dbms_cdc_subscribe.subscribe(subscription_name => 'sub_t1',                                source_schema     => 'scott',                                source_table      => 't1',                                column_list       => 'a, b',                                subscriber_view   => 'v_t1'); end; / 14、激活订阅 conn cdc_subr1/cdc_subr1 begin   dbms_cdc_subscribe.activate_subscription(subscription_name => 'sub_t1'); end; / 15、扩展订阅窗口,设置订阅窗口的高位边界,以便能看到数据的变更 conn cdc_subr1/cdc_subr1 begin   dbms_cdc_subscribe.extend_window(subscription_name => 'sub_t1'); end; / dbms_cdc_subscribe过程的详细解释可参考官方资料: https://docs.oracle.com/cd/B19306_01/appdev.102/b14258/d_cdcsub.htm#CHEFHFJA 三、查看订阅内容 conn cdc_subr1/cdc_subr1 select * from v_t1; 因源表scott.t1尚无数据变更,因此该视图无数据。 源表插入数据 conn scott/tiger insert into t1 values(101, 'liuln'); commit; 刷新订阅数据,注意每次查阅前需要这个刷新动作 conn cdc_subr1/cdc_subr1 begin   dbms_cdc_subscribe.extend_window(subscription_name => 'sub_t1'); end; / 查看订阅视图 col commit_timestamp$ for a20 col timestamp$ for a20 col row_id$ for a20 col username$ for a10 select operation$, to_date(commit_timestamp$, 'yyyy-mm-dd hh24:mi:ss') commit_timestamp$, to_date(timestamp$, 'yyyy-mm-dd hh24:mi:ss') timestamp$, row_id$, rsid$, username$, a, b from v_t1; OPERATION$ COMMIT_TIMESTAMP$    TIMESTAMP$           ROW_ID$                   RSID$ USERNAME$           A B ---------- -------------------- -------------------- -------------------- ---------- ---------- ---------- -------------------- I          2018-09-17 17:07:47  2018-09-17 17:07:44  AAAWxgAAEAAAAWGAAA            1 SCOTT             101 liuln 再插入一条数据 conn scott/tiger insert into t1 values(102, 'yaowei'); commit; 观察结果,变更数据是累积的 OP COMMIT_TIMESTAMP$    TIMESTAMP$           ROW_ID$                   RSID$ USERNAME$           A B -- -------------------- -------------------- -------------------- ---------- ---------- ---------- -------------------- I  2018-09-17 17:07:47  2018-09-17 17:07:44  AAAWxgAAEAAAAWGAAA            1 SCOTT             101 liuln I  2018-09-17 17:37:18  2018-09-17 17:37:15  AAAWxgAAEAAAAWGAAB            2 SCOTT             102 yaowei 更新数据 conn scott/tiger update t1 set b = 'liuqy' where a = '102'; commit; 观察结果,分行显示了更新前后的旧值和新值 OP COMMIT_TIMESTAMP$    TIMESTAMP$           ROW_ID$                   RSID$ USERNAME$           A B -- -------------------- -------------------- -------------------- ---------- ---------- ---------- -------------------- I  2018-09-17 17:07:47  2018-09-17 17:07:44  AAAWxgAAEAAAAWGAAA            1 SCOTT             101 liuln I  2018-09-17 17:37:18  2018-09-17 17:37:15  AAAWxgAAEAAAAWGAAB            2 SCOTT             102 yaowei UO 2018-09-17 17:46:01  2018-09-17 17:46:01  AAAWxgAAEAAAAWGAAB            3 SCOTT             102 yaowei UN 2018-09-17 17:46:01  2018-09-17 17:46:01  AAAWxgAAEAAAAWGAAB            3 SCOTT             102 liuqy 删除数据 conn scott/tiger delete from t1 where a = '101'; commit; 观察结果 OP COMMIT_TIMESTAMP$    TIMESTAMP$           ROW_ID$                   RSID$ USERNAME$           A B -- -------------------- -------------------- -------------------- ---------- ---------- ---------- -------------------- I  2018-09-17 17:07:47  2018-09-17 17:07:44  AAAWxgAAEAAAAWGAAA            1 SCOTT             101 liuln I  2018-09-17 17:37:18  2018-09-17 17:37:15  AAAWxgAAEAAAAWGAAB            2 SCOTT             102 yaowei UO 2018-09-17 17:46:01  2018-09-17 17:46:01  AAAWxgAAEAAAAWGAAB            3 SCOTT             102 yaowei UN 2018-09-17 17:46:01  2018-09-17 17:46:01  AAAWxgAAEAAAAWGAAB            3 SCOTT             102 liuqy D  2018-09-17 17:52:03  2018-09-17 17:52:03  AAAWxgAAEAAAAWGAAA            4 SCOTT             101 liuln 操作中发现,刷新订阅数据的动作每次需要执行两遍才能获取到变更数据。 四、订阅数据清理 变更数据提取后如不再需要,可以清除,以腾出空间。 begin   dbms_cdc_subscribe.purge_window(subscription_name => 'sub_t1'); end; / 五、取消订阅 如果订阅不再需要,可以删除。 begin   dbms_cdc_subscribe.drop_subscription(subscription_name => 'sub_t1'); end; /

相关推荐