[ https://issues.apache.org/jira/browse/FLINK-22424?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Nowojski closed FLINK-22424.
----------------------------------
    Resolution: Fixed

merged to release-1.13 as da26733e484 and 65319b256c8

> Writing to already released buffers potentially causing data corruption during job failover/cancellation
> --------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22424
>                 URL: https://issues.apache.org/jira/browse/FLINK-22424
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.6.4, 1.7.2, 1.8.3, 1.9.3, 1.10.3, 1.11.3, 1.12.2, 1.13.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.11.4, 1.14.0, 1.13.1, 1.12.4
>
>
> I modified the code to not re-use the same memory segments, but to always free up the segment on recycling. What I observed is a problem similar to the one reported in FLINK-21181, but even more severe:
> {noformat}
> Caused by: java.lang.RuntimeException: segment has been freed
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:109)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:93)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:44)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
> 	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
> 	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50)
> 	at org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase$ReEmitAll.process(UnalignedCheckpointStressITCase.java:477)
> 	at org.apache.flink.test.checkpointing.UnalignedCheckpointStressITCase$ReEmitAll.process(UnalignedCheckpointStressITCase.java:468)
> 	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
> 	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
> 	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:577)
> 	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:533)
> 	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1395)
> 	... 11 more
> Caused by: java.lang.IllegalStateException: segment has been freed
> 	at org.apache.flink.core.memory.MemorySegment.put(MemorySegment.java:483)
> 	at org.apache.flink.core.memory.MemorySegment.put(MemorySegment.java:1398)
> 	at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:100)
> 	at org.apache.flink.runtime.io.network.buffer.BufferBuilder.appendAndCommit(BufferBuilder.java:82)
> 	at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForNewRecord(BufferWritingResultPartition.java:250)
> 	at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:142)
> 	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:104)
> 	at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
> 	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> 	... 24 more
> {noformat}
> This is happening also during cancellation/job failover.
> It's failing when trying to write to an already `free`'d buffer. Without my changes, this code would silently write some data to a buffer that has already been recycled/returned to the pool. If someone else then picked up this buffer, it could easily lead to data corruption.
> As far as I can tell, the exact reason is that the buffer the timer attempts to write to has been released from `ResultSubpartition#onConsumedSubpartition`, causing the `BufferConsumer` to be closed (which recycles/frees the underlying memory segment), while the matching `BufferBuilder` is still being used...



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
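The race the description walks through — the producer-side `BufferBuilder` still appending after the consumer side has closed the `BufferConsumer` and recycled the segment — can be sketched with simplified stand-in classes. This is a hypothetical minimal model, not Flink's actual `MemorySegment`/`BufferBuilder` API; it only illustrates why a freed-check turns silent corruption into a fail-fast exception:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a pooled memory segment (not Flink's MemorySegment).
class Segment {
    private final byte[] bytes;
    private final AtomicBoolean freed = new AtomicBoolean(false);

    Segment(int size) {
        bytes = new byte[size];
    }

    // With the freed-check, a late write fails fast instead of silently
    // scribbling over memory that may already belong to another buffer.
    void put(int index, byte b) {
        if (freed.get()) {
            throw new IllegalStateException("segment has been freed");
        }
        bytes[index] = b;
    }

    // Called when the consumer side recycles the buffer back to the pool.
    void free() {
        freed.set(true);
    }
}

public class FreedSegmentSketch {
    // Reproduces the ordering from the report: the writer appends, the
    // consumer closes and recycles the segment during cancellation, then
    // a processing-time timer callback attempts one more append.
    static String demo() {
        Segment segment = new Segment(16);
        segment.put(0, (byte) 1);     // normal append by the builder

        segment.free();               // consumer closed; segment recycled

        try {
            segment.put(1, (byte) 2); // late write from the timer path
            return "silent write";    // pre-fix behavior: corruption risk
        } catch (IllegalStateException e) {
            return e.getMessage();    // post-fix behavior: fail fast
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the checks described in the ticket, the late write surfaces as `IllegalStateException: segment has been freed` rather than corrupting whoever picked up the recycled segment next.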