Re:回复: Re: 滑动窗口按照处理时间触发的问题

2024-01-02 文章 haishui
比如并行度是4,任务执行图是: Source(p=1) ==reblance=> flatMap和Timestamp/watermrk(p=4) =hash=> window(p=4) window的水位线取上游四个算子水位线的最小值, 你需要写4个数据,才能让四个子任务水位线更新,window的水位线才有一次更新 Best regards, haishui 在 2024-01-03 14:25:48,"ha.fen...@aisino.com" 写道: >设置并行度1确实可以了。env.setParallelism(1); >这

Re:回复: Re: 滑动窗口按照处理时间触发的问题

2024-01-02 文章 haishui
Hi, 应该是并行度的原因,你可以先将并行度设置为1试试。 Best regards, haishui 在 2024-01-03 12:24:20,"ha.fen...@aisino.com" 写道: >帮我看看代码,感觉是代码的问题,使用滚动窗口问题一样,5分钟的滚动,也是输入1704130441000才触发函数的 >public static void main(String[] args) throws Exception { >StreamExecu

事件流和规则流匹配问题

2023-08-24 文章 haishui
Hi all, 对于广播状态[1]文档中提到的事件流和规则流匹配问题,想问问大家有什么好的办法保证规则流应该加载完成后再处理事件流。 我想通过mysql cdc加载规则,在快照消费完之前,已经有事件消费了而且没有匹配到相应的规则,有没有办法能够在快照处理结束后再开始消费kafka里面的事件 [1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/fault-tolerance/broadcast_state/

水位线对齐与空闲问题

2023-03-16 文章 haishui
hi, 我在1.15.x和1.16.1对水位线策略进行测试发现水位线对齐和idleness同时使用会造成变成空闲的source无法再消费kafka数据。这是一个bug吗? 我的水位线策略如下: WatermarkStrategy .forBoundedOutOfOrderness(Duration.ofMillis(0)) .withTimestampAssigner((element, recordTimestamp) -> Long.parseLong(element))

Re:Flink Kafka Sink时间戳异常

2023-02-27 文章 haishui
hi, 这个问题是因为经过窗口算子后StreamRecord中指定的时间时间戳被改成了window.maxTimestamp(),可以查看[1]中WindowOperator或EvictingWindowOperator中的emitWindowContents方法。 如果想要更改时间戳,可以实现一个ProcessFuncton TimestampedCollector collector = (TimestampedCollector) out; collector.setAbsoluteTimestamp( ); collector.collect(value);

FlinkSql如何实现水位线对齐

2023-02-21 文章 haishui
Hi, all 以并行度4读取kafka的topic1和topic2形成两个流,然后IntervalJoin。在kafka堆积大量数据的情况下,我分别用SQL和DataStream API实现了上述功能。 使用SQL实现的作业中IntervalJoin算子的状态会逐渐增大,直到checkpoint失败。原因是在8个Source分区中输出水位线差距很大。 使用API实现的作业,在使用Flink15版本的水位线对齐后可以保证正常读取topic内的所有数据。

Re:Re: 关于Managed Memory的疑问

2022-09-22 文章 haishui
感谢解答 在 2022-09-22 15:00:48,"Shammon FY" 写道: >Hi @haishui > >这里提到的Managed Memory用于排序、哈希表等,一般是在flink批式作业里用到,例如HashJoin。 >流式计算的join算子,使用statebackend存储状态,例如rocksdb。批式计算的join算子跟流式的join算子处理不同,例如批式的HashJoinOperator算子,会创建BinaryHashTable进行分桶并建立hash表,BinaryHashTable会从

关于Managed Memory的疑问

2022-09-22 文章 haishui
官方文档的Configration对于taskmanager.memory.managed.size的介绍说托管内存用于排序、哈希表、缓存中间结果和RocksDB状态后端。 在MemoryConfigration的Memory Tuning Guide中介绍HashMap状态后端时说 如果Job是无状态或使用HashMap状态后端时可以设置托管内存为0。

Re:Re: Re: 关于keyby()如何保留原并行度的问题

2022-09-08 文章 haishui
定时器需要keyedStateBackend,所以必须是KeyedStream才能使用定时器。 如果让上游数据不改变subTask可以考虑DataStreamUtils#reinterpretAsKeyedStream方法,这是一个实验功能,见[1]。需要保证原来的DataStream已经是按key分组过的。 [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/experimental/ 在 2022-09-08

回复: Flink状态过期时是否可以将其输出到日志中

2022-07-08 文章 haishui
有什么方式可以获取过期的状态呢,似乎flink没有提供这方面的接口 发件人: yidan zhao 发送时间: 2022年7月8日 13:46 收件人: user-zh 主题: Re: Flink状态过期时是否可以将其输出到日志中 可以考虑通过sideout方式自己打印,或者补充计算。 haishui 于2022年6月27日周一 14:10写道: > > Hi, > Flink的状态过期是否能像咖啡因缓存那样,在数据过期时调用回调函数将过期的内容打印在日志中。 > > Best Regards!

Flink状态过期时是否可以将其输出到日志中

2022-06-27 文章 haishui
Hi, Flink的状态过期是否能像咖啡因缓存那样,在数据过期时调用回调函数将过期的内容打印在日志中。 Best Regards!