Flink 一个 job 不同的 operator 可以共享 slot
但能做到不同的 job 共享 slot 吗?
[email protected]
直接用 FlinkSQL 实现抽取字段、字段转换的功能。
INSERT INTO kafka_dwd_table SELECT a, b, fun(c) FROM kafka_ods_table
kafka_dwd_table topic 里面的 record 是客户端依照特定的 partition 规则发送过去的
经过 上面的 FlinkSQL 操作会怎样做 partition 呢?
谢谢,
王磊
[email protected]
接收 kakka 消息,按消息中代表 tableName 的字段做 keyBy, 然后写到 数据库中
keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(1))).aggregate(new
ListAggregate()).addSink(new TemplateMySQLSink());
ListAggregate 只是做简单的合并,我这样做的目的是把一个表一秒中的所有消息 merge 成一个 List, 后续只操作数据库一次
但我测试发现一秒内来多条消息时候,每一条消息都会触发 aggregat
有一个 flink streaming 的程序,读 RocketMQ,中间有一些复杂度计算逻辑用 RocksDB state 存储.
程序有小的更新直接 cancel -s 取消再 run -s 恢复
现在我们需要用 Kafka 替换掉 RocketMQ,消息内容都是一样的, flink 程序需要改一下改为读 Kafka
我可以直接 cancel -s 后再 run -s 复用之前的 state 吗?
[email protected]
你 pushgateway 是什么版本?版本降到 0.8.0 试一下
[email protected]
Sender: Jim Chen
Send Time: 2020-09-02 17:43
Receiver: flink_user_zh
Subject: flink1.10集成PrometheusPushGateway,日志报错Failed to push metrics to
PushGateway with jobName
Hi:
我的环境是flink1.10.1,是基于yarn的per
job模式运行的。现在集成了
userId 一直持续不断的有消息过来,那 7 天之前 被 add 到 list 的记录会不会被删除呢?
谢谢,
王磊
[email protected]
(statement.getSql(), statement.getParasMap());
} catch (Exception e) {
e.printStackTrace();
}
}
Flink 能保证 namedTemplate.update(statement.getSql(), statement.getParasMap())
一定执行成功吗?
谢谢,
王磊
[email protected]