Flink slot 可以跨 job 共享吗?
Flink 一个 job 不同的 operator 可以共享 slot 但能做到不同的 job 共享 slot 吗? [email protected]
FlinkSQL sink 到 kafka partition 规则是怎样的?
直接用 FlinkSQL 实现抽取字段、字段转换的功能。 INSERT INTO kafka_dwd_table SELECT a, b, fun(c) FROM kafka_ods_table kafka_dwd_table topic 里面的 record 是客户端依照特定的 partition 规则发送过去的 经过 上面的 FlinkSQL 操作会怎样做 partition 呢? 谢谢, 王磊 [email protected]
TumblingProcessingTimeWindows 每来一条消息就会 trigger aggregate function
接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中 keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new ListAggregate()).addSink(new TemplateMySQLSink()); ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次 但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregate 的计算并且做入库操作 有什么方式让一个窗口只做一次 aggregate 操作吗? 谢谢, 王磊 [email protected]
Flink 涉及到 state 恢复能从 RocketMQ 直接转到 Kafka 吗?
有一个 flink streaming 的程序,读 RocketMQ,中间有一些复杂度计算逻辑用 RocksDB state 存储. 程序有小的更新直接 cancel -s 取消再 run -s 恢复 现在我们需要用 Kafka 替换掉 RocketMQ,消息内容都是一样的, flink 程序需要改一下改为读 Kafka 我可以直接 cancel -s 后再 run -s 复用之前的 state 吗? [email protected]
Re: flink1.10集成PrometheusPushGateway,日志报错Failed to push metrics to PushGateway with jobName
你 pushgateway 是什么版本?版本降到 0.8.0 试一下 [email protected] Sender: Jim Chen Send Time: 2020-09-02 17:43 Receiver: flink_user_zh Subject: flink1.10集成PrometheusPushGateway,日志报错Failed to push metrics to PushGateway with jobName Hi: 我的环境是flink1.10.1,是基于yarn的per job模式运行的。现在集成了PrometheusPushGateway,但是日志中,一直提示Failed to push metrics to PushGateway with jobName。 具体报错日志为: 2020-09-02 15:11:21.901 application_1598509186865_0129 172.22.64.72 wx11-dsj-flink004 [Flink-MetricRegistry-thread-1] WARN o.a.f.m.p. PrometheusPushGatewayReporter - Failed to push metrics to PushGateway with jobName realtime66b27e0fdfaa3860997abeb0170d84bb, groupingKey {}. java.io.IOException: Response code from http://172.16.24.146:9091/metrics/job/realtime66b27e0fdfaa3860997abeb0170d84bb was 200 at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway .doRequest(PushGateway.java:297) at org.apache.flink.shaded.io.prometheus.client.exporter.PushGateway .push(PushGateway.java:127) at org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter .report(PrometheusPushGatewayReporter.java:109) at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run( MetricRegistryImpl.java:441) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java: 511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask .access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask .run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor .java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor .java:624) at java.lang.Thread.run(Thread.java:748)
ListState 设置 TTL 会在 list 中删除之前的记录吗
考虑下面的场景:
KeyBy userId, 把该 userId 所用的相关记录存起来放在 ListState 中
private transient ListState list;
@Override
public void processElement(Tuple2 value, Context ctx,
Collector out)
throws Exception {
list.add(value.f1);
}
TTL 设为 7 天。
如果这个 userId 超过 7 天没有任何消息,那这个 userId 相应的 ListState 会被删除。
但如果这 userId 一直持续不断的有消息过来,那 7 天之前 被 add 到 list 的记录会不会被删除呢?
谢谢,
王磊
[email protected]
自定义的 MySQLSink 怎么保证一定会写到数据库呢?
source.map(..).addSink(new MySQLSink())
MySQLSink 就是接收前面算子生成的要执行的 SQL 并执行。
@Override
public void invoke(JDBCStatement statement, Context context) throws Exception {
log.info(statement.getSql());
log.info(statement.getParasMap().toString());
try {
namedTemplate.update(statement.getSql(), statement.getParasMap());
} catch (Exception e) {
e.printStackTrace();
}
}
Flink 能保证 namedTemplate.update(statement.getSql(), statement.getParasMap())
一定执行成功吗?
谢谢,
王磊
[email protected]
