比如并行度是4,任务执行图是: Source(p=1) ==reblance=> flatMap和Timestamp/watermrk(p=4)
=hash=> window(p=4)
window的水位线取上游四个算子水位线的最小值, 你需要写4个数据,才能让四个子任务水位线更新,window的水位线才有一次更新
Best regards,
haishui
在 2024-01-03 14:25:48,"ha.fen...@aisino.com" 写道:
>设置并行度1确实可以了。env.setParallelism(1);
>这
Hi,
应该是并行度的原因,你可以先将并行度设置为1试试。
Best regards,
haishui
在 2024-01-03 12:24:20,"ha.fen...@aisino.com" 写道:
>帮我看看代码,感觉是代码的问题,使用滚动窗口问题一样,5分钟的滚动,也是输入1704130441000才触发函数的
>public static void main(String[] args) throws Exception {
>StreamExecu
Hi all,
对于广播状态[1]文档中提到的事件流和规则流匹配问题,想问问大家有什么好的办法保证规则流应该加载完成后再处理事件流。
我想通过mysql cdc加载规则,在快照消费完之前,已经有事件消费了而且没有匹配到相应的规则,有没有办法能够在快照处理结束后再开始消费kafka里面的事件
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/broadcast_state/
hi,
我在1.15.x和1.16.1对水位线策略进行测试发现水位线对齐和idleness同时使用会造成变成空闲的source无法再消费kafka数据。这是一个bug吗?
我的水位线策略如下:
WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMillis(0))
.withTimestampAssigner((element, recordTimestamp) ->
Long.parseLong(element))
hi,
这个问题是因为经过窗口算子后StreamRecord中指定的时间时间戳被改成了window.maxTimestamp(),可以查看[1]中WindowOperator或EvictingWindowOperator中的emitWindowContents方法。
如果想要更改时间戳,可以实现一个ProcessFuncton
TimestampedCollector collector = (TimestampedCollector) out;
collector.setAbsoluteTimestamp( );
collector.collect(value);
Hi, all
以并行度4读取kafka的topic1和topic2形成两个流,然后IntervalJoin。在kafka堆积大量数据的情况下,我分别用SQL和DataStream
API实现了上述功能。
使用SQL实现的作业中IntervalJoin算子的状态会逐渐增大,直到checkpoint失败。原因是在8个Source分区中输出水位线差距很大。
使用API实现的作业,在使用Flink15版本的水位线对齐后可以保证正常读取topic内的所有数据。
感谢解答
在 2022-09-22 15:00:48,"Shammon FY" 写道:
>Hi @haishui
>
>这里提到的Managed Memory用于排序、哈希表等,一般是在flink批式作业里用到,例如HashJoin。
>流式计算的join算子,使用statebackend存储状态,例如rocksdb。批式计算的join算子跟流式的join算子处理不同,例如批式的HashJoinOperator算子,会创建BinaryHashTable进行分桶并建立hash表,BinaryHashTable会从
官方文档的Configration对于taskmanager.memory.managed.size的介绍说托管内存用于排序、哈希表、缓存中间结果和RocksDB状态后端。
在MemoryConfigration的Memory Tuning Guide中介绍HashMap状态后端时说
如果Job是无状态或使用HashMap状态后端时可以设置托管内存为0。
定时器需要keyedStateBackend,所以必须是KeyedStream才能使用定时器。
如果让上游数据不改变subTask可以考虑DataStreamUtils#reinterpretAsKeyedStream方法,这是一个实验功能,见[1]。需要保证原来的DataStream已经是按key分组过的。
[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/experimental/
在 2022-09-08
有什么方式可以获取过期的状态呢,似乎flink没有提供这方面的接口
发件人: yidan zhao
发送时间: 2022年7月8日 13:46
收件人: user-zh
主题: Re: Flink状态过期时是否可以将其输出到日志中
可以考虑通过sideout方式自己打印,或者补充计算。
haishui 于2022年6月27日周一 14:10写道:
>
> Hi,
> Flink的状态过期是否能像咖啡因缓存那样,在数据过期时调用回调函数将过期的内容打印在日志中。
>
> Best Regards!
Hi,
Flink的状态过期是否能像咖啡因缓存那样,在数据过期时调用回调函数将过期的内容打印在日志中。
Best Regards!
11 matches
Mail list logo