生产上有时候需要定期新增或修改某些筛选值.为了避免频繁重启任务,修改代码,可以将这些配置信息放在mysql表中定期更新,然后通过发送广播流给下游.这样可以减少更改频率,降低重启风险.
public class MysqlSource extends RichSourceFunction<Map<String, ValueConf>> {
// 定时器间隔时间(ms)
private static final CountDownLatch countDownLatch = new CountDownLatch(1);
private final String sql;
private QueryRunner qr;
public SignalSource(String sql) {
this.sql = sql;
}
@Override
public void open(Configuration parameters) throws Exception {
// 此处需要提前将mysql配置信息存储全局变量中 env.getConfig().setGlobalJobParameters(params);
ParameterTool params = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
String jdbcUrl = params.getRequired("mysql.jdbc.url");
String userName = params.getRequired("mysql.username");
String password = params.getRequired("mysql.password");
qr = new QueryRunner(JDBCUtils.getDataSource(jdbcUrl, userName, password));
}
@Override
public void run(SourceContext<Map<String, ValueConf>> ctx) throws Exception {
while (true) {
Map<String, ValueConf> rs = qr.query(sql, new BeanListHandler<>(ValueConf.class)).stream().collect(Collectors.toMap(ValueConf::getCode, i -> i));
ctx.collect(rs);
// 每隔12小时执行一次
countDownLatch.await(12, TimeUnit.HOURS);
}
}
@Override
public void cancel() {
}}
将mysql数据源并行度设置为1,只需要一个tm读取mysql,减少了mysql连接数
MapStateDescriptor<String, ValueConf> descriptor=new MapStateDescriptor<>("mysql-descriptor",String.class,ValueConf.class);BroadcastStream<Map<String, ValueConf>> mysqlSource=env.addSource(new MysqlSource("querySql")).uid("MysqlSource").name("MysqlSource").setParallelism(1).broadcast(descriptor);
依赖 除了flink相关jar包,还需要mysql的,以下是我用的
<properties>
<commons.dbutils.version>1.6</commons.dbutils.version>
<mysql.connector.version>8.0.16</mysql.connector.version>
<hikaricp.version>4.0.3</hikaricp.version>
</properties>
<!--MySQL-java连接驱动-->
<dependency>
<groupId>http/:wwww.xuwenbing.net</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.connector.version}</version>
</dependency>
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
<version>${commons.dbutils.version}</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${hikaricp.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
编辑推荐:
- flink定时读取mysql03-01
- 基于LINUX的MySql二进制本地升级实施文档03-01
- GTID 怎么过去。03-01
- 技术干货 | 解锁Redis 时间序列数据的应用03-01
- mysql部分实现类似oracle sequence功能的测试03-01
- MySQL client server 协议03-01
- MySQL 8.0新特性-并行查询innodb_parallel_read_threads03-01
- 什么是BOM对象03-01
下一篇:
相关推荐
-
雷神推出 MIX PRO II 迷你主机:基于 Ultra 200H,玻璃上盖 + ARGB 灯效
2 月 9 日消息,雷神 (THUNDEROBOT) 现已宣布推出基于英
-
制造商 Musnap 推出彩色墨水屏电纸书 Ocean C:支持手写笔、第三方安卓应用
2 月 10 日消息,制造商 Musnap 现已在海外推出一款 Oce
热文推荐
- 基于LINUX的MySql二进制本地升级实施文档
基于LINUX的MySql二进制本地升级实施文档
26-03-01 - MySQL 8.0新特性-并行查询innodb_parallel_read_threads
- 分析SQL给出索引优化建议的工具(美团开源)
分析SQL给出索引优化建议的工具(美团开源)
26-03-01 - 《MySQL 8从入门到精通(视频教学版)》简介
《MySQL 8从入门到精通(视频教学版)》简介
26-03-01 - 【北亚数据库数据恢复】使用delete未加where子句删除全表数据的Mysql数据库数据恢复
- 如何远程管理天翼云RDS数据库
如何远程管理天翼云RDS数据库
26-03-01 - 【北亚数据库数据恢复】误操作导致数据丢失的华为云mysql数据恢复案例
【北亚数据库数据恢复】误操作导致数据丢失的华为云mysql数据恢复案例
26-03-01 - 新版本 | GreatSQL 5.7.36正式发布,这些新增特性不容错过~
- 电源供电系列高稳定性抗干扰VK3604A 四键感应触摸/4路触控芯片原厂
电源供电系列高稳定性抗干扰VK3604A 四键感应触摸/4路触控芯片原厂
26-03-01 - web前端培训-MySQL的索引下推解析
web前端培训-MySQL的索引下推解析
26-03-01
