自定义的 MySQLSink 怎么保证一定会写到数据库呢?

2020-11-10 文章 wangl...@geekplus.com
(statement.getSql(), statement.getParasMap()); } catch (Exception e) { e.printStackTrace(); } } Flink 能保证 namedTemplate.update(statement.getSql(), statement.getParasMap()) 一定执行成功吗? 谢谢, 王磊 [email protected]

ListState 设置 TTL 会在 list 中删除之前的记录吗

2020-09-17 文章 wangl...@geekplus.com
userId 一直持续不断的有消息过来,那 7 天之前 被 add 到 list 的记录会不会被删除呢? 谢谢, 王磊 [email protected]

Re: flink1.10集成PrometheusPushGateway,日志报错Failed to push metrics to PushGateway with jobName

2020-09-02 文章 wangl...@geekplus.com
你 pushgateway 是什么版本?版本降到 0.8.0 试一下 [email protected] Sender: Jim Chen Send Time: 2020-09-02 17:43 Receiver: flink_user_zh Subject: flink1.10集成PrometheusPushGateway,日志报错Failed to push metrics to PushGateway with jobName Hi: 我的环境是flink1.10.1,是基于yarn的per job模式运行的。现在集成了

Flink 涉及到 state 恢复能从 RocketMQ 直接转到 Kafka 吗?

2020-09-02 文章 wangl...@geekplus.com
有一个 flink streaming 的程序,读 RocketMQ,中间有一些复杂度计算逻辑用 RocksDB state 存储. 程序有小的更新直接 cancel -s 取消再 run -s 恢复 现在我们需要用 Kafka 替换掉 RocketMQ,消息内容都是一样的, flink 程序需要改一下改为读 Kafka 我可以直接 cancel -s 后再 run -s 复用之前的 state 吗? [email protected]

TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function

2020-08-19 文章 wangl...@geekplus.com
接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new ListAggregate()).addSink(new TemplateMySQLSink()); ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregat

FlinkSQL sink 到 kafka partition 规则是怎样的?

2020-08-18 文章 wangl...@geekplus.com
直接用 FlinkSQL 实现抽取字段、字段转换的功能。 INSERT INTO kafka_dwd_table SELECT a, b, fun(c) FROM kafka_ods_table kafka_dwd_table topic 里面的 record 是客户端依照特定的 partition 规则发送过去的 经过 上面的 FlinkSQL 操作会怎样做 partition 呢? 谢谢, 王磊 [email protected]

Flink slot 可以跨 job 共享吗?

2020-08-10 文章 wangl...@geekplus.com
Flink 一个 job 不同的 operator 可以共享 slot 但能做到不同的 job 共享 slot 吗? [email protected]