来源:大数据杂货铺

当 Flink 作业启动时,Connector 会自动检查源数据库和 Apache Doris 之间的数据等效性。如果数据源包含 Doris 中不存在的表,Connector 会自动在 Doris 中创建相同的表,并利用 Flink 的侧输出来方便一次摄取多个表;如果源中发生架构更改,它将自动获取 DDL 语句并在 Doris 中进行相同的架构更改。
下载 JAR 文件:https://github.com/apache/doris-flink-connector/releases/tag/1.4.0
行家:
<dependency><groupId>org.apache.doris</groupId><artifactId>flink-doris-connector-1.15</artifactId><!--artifactId>flink-doris-connector-1.16</artifactId--><!--artifactId>flink-doris-connector-1.17</artifactId--><version>1.4.0</version></dependency>
<FLINK_HOME>/bin/flink run \-Dexecution.checkpointing.interval=10s \-Dparallelism.default=1 \-c org.apache.doris.flink.tools.cdc.CdcTools \lib/flink-doris-connector-1.16-1.4.0.jar \mysql-sync-database \--database test_db \--mysql-conf hostname=127.0.0.1 \--mysql-conf username=root \--mysql-conf password=123456 \--mysql-conf database-name=mysql_db \--including-tables "tbl|test.*" \--sink-conf fenodes=127.0.0.1:8030 \--sink-conf username=root \--sink-conf password=123456 \--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \--sink-conf sink.label-prefix=label1 \--table-conf replication_num=1



根据早期采用者的反馈,该Connector在生产环境中的万表数据库同步中也提供了高性能和系统稳定性。这证明Apache Doris和Flink CDC的结合能够高效可靠地进行大规模数据同步。
2、节俭 SDK
3、按需流加载
4、后端节点轮询
5、支持更多数据类型
CREATE TABLE flink_doris_source (name STRING,age INT,score DECIMAL(5,2))WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','table.identifier' = 'database.table','username' = 'root','password' = 'password','doris.filter.query' = 'age=18');SELECT * FROM flink_doris_source;
连接维度表和事实表:
CREATE TABLE fact_table (`id` BIGINT,`name` STRING,`city` STRING,`process_time` as proctime()) WITH ('connector' = 'kafka',...);create table dim_city(`city` STRING,`level` INT ,`province` STRING,`country` STRING) WITH ('connector' = 'doris','fenodes' = '127.0.0.1:8030','jdbc-url' = 'jdbc:mysql://127.0.0.1:9030','lookup.jdbc.async' = 'true','table.identifier' = 'dim.dim_city','username' = 'root','password' = '');SELECT a.id, a.name, a.city, c.province, c.country,c.levelFROM fact_table aLEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS cON a.city = c.city
写入Apache Doris:
CREATE TABLE doris_sink ( name STRING, age INT, score DECIMAL(5,2) ) WITH ( 'connector' = 'doris', 'fenodes' = '127.0.0.1:8030', 'table.identifier' = 'database.table', 'username' = 'root', 'password' = '', 'sink.label-prefix' = 'doris_label', //json write in 'sink.properties.format' = 'json', 'sink.properties.read_json_by_line' = 'true');
原文作者:ApacheDoris
