这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v 对
athlon...@gmail.com <athlon...@gmail.com> 于2019年7月25日周四 下午6:40写道: > 其实你可以不用自己删除.使用TTL设置短一些时间,试试 > > > > athlon...@gmail.com > > 发件人: 戴嘉诚 > 发送时间: 2019-07-25 18:24 > 收件人: user-zh > 主题: Re: Flink checkpoint 并发问题 > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了 > > athlon...@gmail.com <athlon...@gmail.com> 于2019年7月25日周四 下午6:20写道: > > > setMaxConcurrentCheckpoints 这个参数你设置过么? > > > > > > > > athlon...@gmail.com > > > > 发件人: 戴嘉诚 > > 发送时间: 2019-07-25 18:07 > > 收件人: user-zh > > 主题: Flink checkpoint 并发问题 > > 大家好: > > > > 我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的 > > > > > > > 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。 > > > > > > > > > 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的? > > > > > > java.lang.Exception: Could not perform checkpoint 550 for operator > > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存 > > (16/20). > > > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595) > > > > at > > > org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396) > > > > at > > > org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292) > > > > at > > > org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200) > > > > at org.apache.flink.streaming.runtime.io > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209) > > > > at > > > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > > > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > > > > at java.lang.Thread.run(Thread.java:748) > > > > Caused by: java.lang.Exception: Could not complete snapshot 550 for > > operator KeyedProcess -> async wait operator -> Flat Map -> Sink: > > 写入redis库存 (16/20). > > > > at > > > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422) > > > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113) > > > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055) > > > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729) > > > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641) > > > > at > > > org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586) > > > > ... 8 more > > > > Caused by: java.util.ConcurrentModificationException > > > > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442) > > > > at java.util.HashMap$EntryIterator.next(HashMap.java:1476) > > > > at java.util.HashMap$EntryIterator.next(HashMap.java:1474) > > > > at > > > com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156) > > > > at > > > com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21) > > > > at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862) > > > > at > > > org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248) > > > > at > > > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105) > > > > at > > > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46) > > > > at > > > org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73) > > > > at > > > org.apache.flink.runtime.state.PartitionableListState.<init>(PartitionableListState.java:68) > > > > at > > > org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80) > > > > at > > > org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88) > > > > at > > > org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261) > > > > at > > > org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402) > > > > ... 13 more > > >