[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322268#comment-16322268 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK closed the pull request at:

https://github.com/apache/flink/pull/5275

> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
> Issue Type: Sub-task
> Components: Network
> Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 1.3.2, 1.3.3
> Reporter: Nico Kruber
> Assignee: Nico Kruber
> Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice:
> once asynchronously after the write operation and once in
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write
> operation to the queue.
> 1) If {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer
> is recycled, the memory region may already be reused despite the pending write.
> 2) If, for some reason (probably only in tests like
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
> the buffer is retained to be used in parallel somewhere else, it may also
> no longer be available or may contain corrupt data.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
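A minimal, self-contained sketch of the double recycle described in the issue above. `PooledBuffer`, `AsyncWriter` and `SpillSketch` are hypothetical stand-ins, not Flink classes, for a pooled network buffer, the asynchronous spill writer and `releaseMemory()`. The sketch assumes the writer takes ownership of every buffer passed to `writeBlock` and recycles it once the write completes, so the caller must not recycle it as well. The buggy variant frees the memory while the write is still queued (problem 1) and then triggers a second recycle when the write finishes.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a pooled buffer: recycling hands the memory back
// to the pool, so recycling the same buffer twice is a bug.
final class PooledBuffer {
    private final AtomicInteger refCount = new AtomicInteger(1);

    void recycle() {
        if (refCount.decrementAndGet() < 0) {
            throw new IllegalStateException("buffer recycled twice");
        }
    }

    boolean isRecycled() {
        return refCount.get() <= 0;
    }
}

// Hypothetical asynchronous writer: it owns every buffer it accepts and
// recycles it once the (simulated) write has completed.
final class AsyncWriter {
    private final Queue<PooledBuffer> pending = new ArrayDeque<>();

    void writeBlock(PooledBuffer buffer) {
        pending.add(buffer);
    }

    // Simulates the I/O thread finishing all queued writes.
    void completePendingWrites() {
        PooledBuffer buffer;
        while ((buffer = pending.poll()) != null) {
            buffer.recycle();
        }
    }
}

final class SpillSketch {
    // Buggy: recycles right after enqueuing, so the memory may be reused while
    // the write is pending, and the writer recycles it a second time later.
    static void releaseMemoryBuggy(AsyncWriter writer, PooledBuffer buffer) {
        writer.writeBlock(buffer);
        buffer.recycle();
    }

    // Fixed: ownership moves to the writer, which recycles exactly once.
    static void releaseMemoryFixed(AsyncWriter writer, PooledBuffer buffer) {
        writer.writeBlock(buffer);
    }
}
```

Making the writer the single owner is one way to remove the second recycle; the sketch does not claim to reproduce the change that was eventually merged.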
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322267#comment-16322267 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5275

closed via a316989e5dfbb1dc0d555193425a4d6bd5f42d8d
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319989#comment-16319989 ]

ASF GitHub Bot commented on FLINK-7499:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5275

[FLINK-7499][io] fix double buffer release in SpillableSubpartitionView

## What is the purpose of the change

This is a rebase of #4581 for the release-1.4 branch. Please refer to the original PR for details

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7499-1.4

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5275.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5275
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313191#comment-16313191 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4581
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289215#comment-16289215 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581

...rebased onto latest master
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268467#comment-16268467 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581

test failures are unrelated this time (failing kafka download for the end-to-end tests)
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268374#comment-16268374 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581

sorry about that - tested locally now and it should go through...we'll see
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266940#comment-16266940 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4581

Still failing with some checkstyle violations in `ResultPartition.java`
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265890#comment-16265890 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581

(arg - I guess I should really fix my default file template some time...)
-> the latest commit fixes the missing license text in the new test file
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265444#comment-16265444 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4581

Rat check did not pass. Could you please fix this @NicoK?
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265155#comment-16265155 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152944166

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -320,4 +559,40 @@ void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) thro
             }
         }
     }
+    /**
+     * An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its
+     * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+     *
+     * These {@link BufferFileWriter} objects will thus throw an exception when trying to add
+     * write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}.
+     */
+    private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync {
+        @Override
+        public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+                throws IOException {
+            BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
+            bufferFileWriter.close();
+            return bufferFileWriter;
+        }
+    }
+
+    /**
+     * An {@link IOManagerAsync} that creates stalling {@link BufferFileWriter} instances in its
+     * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+     *
+     * These {@link BufferFileWriter} objects will accept {@link BufferFileWriter#writeBlock(Object)}
+     * requests but never actually perform any write operation (be sure to clean up the buffers
+     * manually!).
+     */
+    private static class IOManagerAsyncWithStallingBufferFileWriter extends IOManagerAsync {
+        @Override
+        public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+                throws IOException {
+            BufferFileWriter bufferFileWriter = spy(super.createBufferFileWriter(channelID));
--- End diff --

I don't have (too) hard feelings about it, so there it is - along with some more minor fixes in the tests and one additional test for `AsynchronousBufferFileWriter` itself (the change there was not covered yet)
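The closed-writer helper quoted above exercises the path where adding a write request fails. A hedged sketch of the ownership rule such a test can check, assuming (this is not confirmed by the thread) that a writer which cannot enqueue a request recycles the buffer itself before rethrowing, so the caller never recycles it a second time. `TestBuffer`, `BlockWriter` and `ClosableWriter` are hypothetical names, not Flink's API.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer that counts recycle() calls so a test can check them.
final class TestBuffer {
    int recycleCount = 0;

    void recycle() {
        recycleCount++;
    }
}

// Hypothetical writer interface standing in for a buffer file writer.
interface BlockWriter {
    void writeBlock(TestBuffer buffer) throws IOException;

    void close();
}

// Owns every buffer passed to writeBlock(): if the request cannot be
// enqueued (here, because the writer is closed), the buffer is recycled
// before the exception is rethrown, so it is neither leaked nor freed twice.
final class ClosableWriter implements BlockWriter {
    private final List<TestBuffer> accepted = new ArrayList<>();
    private boolean closed = false;

    private void addRequest(TestBuffer buffer) throws IOException {
        if (closed) {
            throw new IOException("writer is closed");
        }
        accepted.add(buffer);
    }

    @Override
    public void writeBlock(TestBuffer buffer) throws IOException {
        try {
            addRequest(buffer);
        } catch (IOException e) {
            buffer.recycle(); // not enqueued: recycle here, exactly once
            throw e;
        }
    }

    @Override
    public void close() {
        closed = true;
    }
}
```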
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265044#comment-16265044 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152918616

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -320,4 +559,40 @@ void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) thro
             }
         }
     }
+    /**
+     * An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its
+     * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+     *
+     * These {@link BufferFileWriter} objects will thus throw an exception when trying to add
+     * write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}.
+     */
+    private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync {
+        @Override
+        public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+                throws IOException {
+            BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
+            bufferFileWriter.close();
+            return bufferFileWriter;
+        }
+    }
+
+    /**
+     * An {@link IOManagerAsync} that creates stalling {@link BufferFileWriter} instances in its
+     * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+     *
+     * These {@link BufferFileWriter} objects will accept {@link BufferFileWriter#writeBlock(Object)}
+     * requests but never actually perform any write operation (be sure to clean up the buffers
+     * manually!).
+     */
+    private static class IOManagerAsyncWithStallingBufferFileWriter extends IOManagerAsync {
+        @Override
+        public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+                throws IOException {
+            BufferFileWriter bufferFileWriter = spy(super.createBufferFileWriter(channelID));
--- End diff --

:/ and with every such case we increase the costs of refactors.
I would still extract those classes to the correct package.
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265043#comment-16265043 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152918323

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
         assertTrue(buffer.isRecycled());
     }
+    /**
+     * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+     */
+    @Test
+    public void testAddOnFinishedSpillablePartition() throws Exception {
+        SpillableSubpartition partition = createSubpartition();
+        partition.finish();
+
+        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+            FreeingBufferRecycler.INSTANCE);
+        try {
+            partition.add(buffer);
+        } finally {
--- End diff --

Oh right, sorry my bad :) I don't know, I thought that it would somehow swallow the exceptions :)
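For reference on the point above: in Java, a `finally` block does not swallow an exception thrown in the `try` block; the exception continues to propagate once `finally` completes normally. It is only lost if `finally` itself completes abruptly, for example when an assertion such as `Assert.fail(...)` throws inside it, in which case the new exception replaces the original one. The self-contained class below illustrates both cases; the name `FinallySemantics` is made up for this sketch.

```java
// Illustrates try/finally propagation rules (JLS 14.20.2).
final class FinallySemantics {

    // The exception from the try block still propagates after finally runs.
    static String propagates() {
        StringBuilder log = new StringBuilder();
        try {
            try {
                log.append("try;");
                throw new IllegalStateException("from try");
            } finally {
                log.append("finally;");
            }
        } catch (IllegalStateException e) {
            log.append("caught:").append(e.getMessage());
        }
        return log.toString();
    }

    // If the finally block throws (e.g. a failing assertion), that exception
    // replaces the original one, which is silently discarded.
    static String replaced() {
        try {
            try {
                throw new IllegalStateException("from try");
            } finally {
                throw new AssertionError("from finally");
            }
        } catch (IllegalStateException e) {
            return "original: " + e.getMessage();
        } catch (AssertionError e) {
            return "replaced: " + e.getMessage();
        }
    }
}
```

Here `propagates()` returns `"try;finally;caught:from try"` and `replaced()` returns `"replaced: from finally"`.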
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264832#comment-16264832 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581

ok, fixed
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264825#comment-16264825 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581

ok, now I'm using the buffer incorrectly in `SpillableSubpartition#add`...let me re-think it once more
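The `add` tests in this pull request check that a buffer handed to `SpillableSubpartition#add` is taken care of on every path, including when the partition is already finished or released. A minimal sketch of that ownership contract, assuming a simplified, non-spilling subpartition; `OwnedBuffer` and `SketchSubpartition` are hypothetical and do not reproduce Flink's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical buffer that counts how often it was recycled.
final class OwnedBuffer {
    private int recycleCount = 0;

    void recycle() {
        recycleCount++;
    }

    int recycleCount() {
        return recycleCount;
    }
}

// Hypothetical, simplified subpartition: add() takes ownership of the buffer
// on every path, so buffers it refuses must be recycled right away.
final class SketchSubpartition {
    private final Queue<OwnedBuffer> buffers = new ArrayDeque<>();
    private boolean finished;
    private boolean released;

    boolean add(OwnedBuffer buffer) {
        if (finished || released) {
            buffer.recycle(); // refused: recycle here so the caller need not
            return false;
        }
        buffers.add(buffer);
        return true;
    }

    void finish() {
        finished = true;
    }

    // Releasing recycles every queued buffer exactly once.
    void release() {
        released = true;
        OwnedBuffer buffer;
        while ((buffer = buffers.poll()) != null) {
            buffer.recycle();
        }
    }

    int queuedBuffers() {
        return buffers.size();
    }
}
```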
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264638#comment-16264638 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581

alright - using fixup commits now for you ;)
FYI: all `[FLINK-7499]` commits belong together and can be squashed
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264633#comment-16264633 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152850290

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -320,4 +559,40 @@ void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) thro
             }
         }
     }
+    /**
+     * An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its
+     * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+     *
+     * These {@link BufferFileWriter} objects will thus throw an exception when trying to add
+     * write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}.
+     */
+    private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync {
+        @Override
+        public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+                throws IOException {
+            BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
+            bufferFileWriter.close();
+            return bufferFileWriter;
+        }
+    }
+
+    /**
+     * An {@link IOManagerAsync} that creates stalling {@link BufferFileWriter} instances in its
+     * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+     *
+     * These {@link BufferFileWriter} objects will accept {@link BufferFileWriter#writeBlock(Object)}
+     * requests but never actually perform any write operation (be sure to clean up the buffers
+     * manually!).
+     */
+    private static class IOManagerAsyncWithStallingBufferFileWriter extends IOManagerAsync {
+        @Override
+        public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+                throws IOException {
+            BufferFileWriter bufferFileWriter = spy(super.createBufferFileWriter(channelID));
--- End diff --

In theory, you are right - in practice though (and that's what I tried first), `AsynchronousBufferFileWriter` cannot be extended from anywhere outside its package due to `WriteRequest` being package-private. Since the writer implementations I needed are not too generic, I did not want to promote them to this package (in the tests folder, of course) nor did I want to make `WriteRequest` public...hence Mockito.
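If the writer were reachable through a type that can be implemented from any package (an assumption; the sketch below uses its own hypothetical `SpillWriter` interface rather than Flink's types), a stalling writer could be written by hand instead of spying on a real `AsynchronousBufferFileWriter`. As the quoted Javadoc warns, such a writer never completes its requests, so the test itself has to recycle the buffers. `StallBuffer`, `SpillWriter` and `StallingWriter` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer that refuses to be recycled twice.
final class StallBuffer {
    private boolean recycled;

    void recycle() {
        if (recycled) {
            throw new IllegalStateException("already recycled");
        }
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

// Hypothetical writer interface: implementing it does not require access to
// package-private internals of a concrete writer class.
interface SpillWriter {
    void writeBlock(StallBuffer buffer);
}

// Accepts write requests but never performs them; the test owns cleanup.
final class StallingWriter implements SpillWriter {
    private final List<StallBuffer> stalled = new ArrayList<>();

    @Override
    public void writeBlock(StallBuffer buffer) {
        stalled.add(buffer); // the buffer stays "in flight" indefinitely
    }

    int stalledCount() {
        return stalled.size();
    }

    // Must be called by the test, otherwise the stalled buffers leak.
    void recycleStalledBuffers() {
        for (StallBuffer buffer : stalled) {
            buffer.recycle();
        }
        stalled.clear();
    }
}
```

A hand-written double like this is easier to carry through refactorings than a spy, which is the concern raised in the review; the trade-off is that it needs an extensible or implementable type visible from the test's package.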
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264629#comment-16264629 ]
ASF GitHub Bot commented on FLINK-7499:
---
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152849820

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
         assertTrue(buffer.isRecycled());
     }
 
+    /**
+     * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+     */
+    @Test
+    public void testAddOnFinishedSpillablePartition() throws Exception {
+        SpillableSubpartition partition = createSubpartition();
+        partition.finish();
+
+        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+            FreeingBufferRecycler.INSTANCE);
+        try {
+            partition.add(buffer);
+        } finally {
+            if (!buffer.isRecycled()) {
+                Assert.fail("buffer not recycled");
+                buffer.recycle();
+            }
+            // finish adds an EndOfPartitionEvent
+            assertEquals(1, partition.getTotalNumberOfBuffers());
+            assertEquals(4, partition.getTotalNumberOfBytes());
+        }
+    }
+
+    /**
+     * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
+     */
+    @Test
+    public void testAddOnFinishedSpilledPartition() throws Exception {
+        SpillableSubpartition partition = createSubpartition();
+        assertEquals(0, partition.releaseMemory());
+        partition.finish();
+
+        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+            FreeingBufferRecycler.INSTANCE);
+        try {
+            partition.add(buffer);
+        } finally {
+            if (!buffer.isRecycled()) {
+                Assert.fail("buffer not recycled");
+                buffer.recycle();
+            }
+            // finish adds an EndOfPartitionEvent
+            assertEquals(1, partition.getTotalNumberOfBuffers());
+            assertEquals(4, partition.getTotalNumberOfBytes());
+        }
+    }
+
+    /**
+     * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition.
+     */
+    @Test
+    public void testAddOnReleasedSpillablePartition() throws Exception {
+        SpillableSubpartition partition = createSubpartition();
+        partition.release();
+
+        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+            FreeingBufferRecycler.INSTANCE);
+        try {
+            partition.add(buffer);
+        } finally {
+            if (!buffer.isRecycled()) {
+                Assert.fail("buffer not recycled");
+                buffer.recycle();
+            }
+            assertEquals(0, partition.getTotalNumberOfBuffers());
+            assertEquals(0, partition.getTotalNumberOfBytes());
+        }
+    }
+
+    /**
+     * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled released partition.
+     */
+    @Test
+    public void testAddOnReleasedSpilledPartition() throws Exception {
+        SpillableSubpartition partition = createSubpartition();
+        partition.release();
+        assertEquals(0, partition.releaseMemory());
+
+        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+            FreeingBufferRecycler.INSTANCE);
+        try {
+            partition.add(buffer);
+        } finally {
+            if (!buffer.isRecycled()) {
+                Assert.fail("buffer not recycled");
+                buffer.recycle();
+            }
+            assertEquals(0, partition.getTotalNumberOfBuffers());
+            assertEquals(0, partition.getTotalNumberOfBytes());
+        }
+    }
+
+    /**
+     * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled partition where adding the
+     * write request fails with an exception.
+     */
+    @Test
+    public void testAddOnSpilledPartitionWithSlowWriter()
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264620#comment-16264620 ]
ASF GitHub Bot commented on FLINK-7499:
---
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152849113

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
         assertTrue(buffer.isRecycled());
     }
 
+    /**
+     * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+     */
+    @Test
+    public void testAddOnFinishedSpillablePartition() throws Exception {
+        SpillableSubpartition partition = createSubpartition();
+        partition.finish();
+
+        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+            FreeingBufferRecycler.INSTANCE);
+        try {
+            partition.add(buffer);
+        } finally {
--- End diff --

What do you mean? What failure / normal path? `partition.add()` should always succeed in this case, i.e. it does not throw.
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264605#comment-16264605 ]
ASF GitHub Bot commented on FLINK-7499:
---
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152847273

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
         assertTrue(buffer.isRecycled());
     }
 
+    /**
--- End diff --

You're right - some tests actually share most of their code. I'll extract a common test method to reduce the duplication.
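Here is one possible shape for the shared test method. It is sketched against a hypothetical `FakeSubpartition` that models only what the four `testAddOn*Partition` tests check. In this model, `finish()` enqueues one 4-byte end-of-partition event, and `add()` on a finished or released partition recycles the buffer and returns `false`. Plain `AssertionError`s stand in for JUnit so the snippet runs on its own. Since `add()` does not throw here, the checks run after the call rather than in a `finally` block. Note also that in the original tests `Assert.fail()` throws, so the `buffer.recycle()` that follows it can never execute.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical buffer with a simple recycled flag (not Flink's Buffer).
class SimpleBuffer {
    final int size;
    private boolean recycled;

    SimpleBuffer(int size) {
        this.size = size;
    }

    void recycle() {
        if (recycled) {
            throw new IllegalStateException("double recycle");
        }
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

// Hypothetical model of the subpartition behaviour the tests check.
class FakeSubpartition {
    private final List<SimpleBuffer> buffers = new ArrayList<>();
    private boolean finished;
    private boolean released;
    private int totalBuffers;
    private long totalBytes;

    boolean add(SimpleBuffer buffer) {
        if (finished || released) {
            buffer.recycle(); // a rejected buffer is still released by the callee
            return false;
        }
        buffers.add(buffer);
        totalBuffers++;
        totalBytes += buffer.size;
        return true;
    }

    void finish() {
        add(new SimpleBuffer(4)); // models the 4-byte end-of-partition event
        finished = true;
    }

    void release() {
        released = true;
        for (SimpleBuffer buffer : buffers) {
            buffer.recycle();
        }
        buffers.clear();
    }

    int getTotalNumberOfBuffers() {
        return totalBuffers;
    }

    long getTotalNumberOfBytes() {
        return totalBytes;
    }
}

class AddOnClosedPartitionCheck {
    // Shared body of the per-state tests: bring the partition into a state,
    // add one buffer, and check that it was rejected, recycled, and not counted.
    static void check(Consumer<FakeSubpartition> setup, int expectedBuffers, long expectedBytes) {
        FakeSubpartition partition = new FakeSubpartition();
        setup.accept(partition);

        SimpleBuffer buffer = new SimpleBuffer(4096);
        boolean accepted = partition.add(buffer);

        if (accepted) {
            throw new AssertionError("buffer should have been rejected");
        }
        if (!buffer.isRecycled()) {
            throw new AssertionError("buffer not recycled");
        }
        if (partition.getTotalNumberOfBuffers() != expectedBuffers
                || partition.getTotalNumberOfBytes() != expectedBytes) {
            throw new AssertionError("unexpected statistics");
        }
    }
}
```

The spilled variants would differ only in the setup step (for example, calling `releaseMemory()` before `finish()` or `release()`), so they could reuse the same helper.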
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264593#comment-16264593 ]
ASF GitHub Bot commented on FLINK-7499:
---
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152845922

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ---
@@ -145,6 +165,45 @@ protected void testAddOnReleasedPartition(final ResultPartitionType pipelined)
         }
     }
 
+    @Test
+    public void testAddOnPipelinedPartition() throws Exception {
+        testAddOnPartition(ResultPartitionType.PIPELINED);
+    }
+
+    @Test
+    public void testAddOnBlockingPartition() throws Exception {
+        testAddOnPartition(ResultPartitionType.BLOCKING);
+    }
+
+    /**
+     * Tests {@link ResultPartition#add} on a working partition.
+     *
+     * @param pipelined the result partition type to set up
+     */
+    protected void testAddOnPartition(final ResultPartitionType pipelined)
+        throws Exception {
+        ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
--- End diff --

Do you think it's better to have a real implementation of that interface, spy on it, and then verify the expected method calls? That actually seems like more overhead with little or no gain. I'd prefer to leave it as is for now.
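For comparison, a hand-written recording fake is one alternative to both `mock()` and a spied real implementation. The sketch below assumes a simplified, hypothetical `ConsumableNotifier` interface (the real `ResultPartitionConsumableNotifier` has a different signature). It also uses a toy partition that, in this model, notifies once after the first buffer is added to a pipelined partition and never on add for a blocking one.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical notifier interface; the real
// ResultPartitionConsumableNotifier takes different arguments.
interface ConsumableNotifier {
    void notifyPartitionConsumable(String partitionId);
}

// Hand-written fake: records every call so a test can assert on the list
// directly instead of using mock() and verify().
class RecordingNotifier implements ConsumableNotifier {
    final List<String> notified = new ArrayList<>();

    @Override
    public void notifyPartitionConsumable(String partitionId) {
        notified.add(partitionId);
    }
}

enum PartitionType { PIPELINED, BLOCKING }

// Toy partition: in this model, a pipelined partition notifies its consumers
// once, after the first buffer has been added; a blocking one does not notify on add.
class NotifyingPartition {
    private final String id;
    private final PartitionType type;
    private final ConsumableNotifier notifier;
    private boolean consumersNotified;
    private int buffersAdded;

    NotifyingPartition(String id, PartitionType type, ConsumableNotifier notifier) {
        this.id = id;
        this.type = type;
        this.notifier = notifier;
    }

    void add(Object buffer) {
        buffersAdded++;
        if (type == PartitionType.PIPELINED && !consumersNotified) {
            consumersNotified = true;
            notifier.notifyPartitionConsumable(id);
        }
    }

    int getBuffersAdded() {
        return buffersAdded;
    }
}
```

The fake costs a few more lines than `mock()`. In exchange, a signature change in the interface becomes a compile error in the fake, and the assertions read as plain list checks rather than `verify(...)` calls.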
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264573#comment-16264573 ]
ASF GitHub Bot commented on FLINK-7499:
---
Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152841030

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---
@@ -267,28 +268,29 @@ public ResultPartitionType getPartitionType() {
      * first buffer has been added.
      */
     public void add(Buffer buffer, int subpartitionIndex) throws IOException {
-        boolean success = false;
+        checkNotNull(buffer);
         try {
             checkInProduceState();
+        } catch (Throwable t) {
+            buffer.recycle();
--- End diff --

Actually, a sanity check for double recycling comes with #4613, for which I also needed this PR. It works differently, however: it only checks that the reference counter does not go below 0. I guess this way we put less pressure on the garbage collector than by creating a new Buffer instance for each `retain()`.
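The counter-based check described above can be sketched as follows. `retain()` increments a counter on the same object, `recycle()` decrements it, and an attempt to go below zero is reported as an illegal state, so no extra object is allocated per `retain()`. `GuardedPartition#add` mirrors the ownership rule from the `ResultPartition#add` diff: if the produce-state check fails, the callee recycles the buffer before rethrowing. All names are hypothetical, and this is not the implementation from #4613.

```java
import java.util.Objects;

// Hypothetical reference-counted buffer: retain() and recycle() act on one
// counter in the same object, so the check allocates nothing per retain().
class CountedBuffer {
    private int refCount = 1;
    private int timesReturnedToPool;

    CountedBuffer retain() {
        if (refCount <= 0) {
            throw new IllegalStateException("retain() on a recycled buffer");
        }
        refCount++;
        return this;
    }

    void recycle() {
        if (refCount <= 0) {
            throw new IllegalStateException("reference count would drop below 0");
        }
        refCount--;
        if (refCount == 0) {
            timesReturnedToPool++; // last reference gone: memory goes back to the pool
        }
    }

    boolean isRecycled() {
        return refCount == 0;
    }

    int getTimesReturnedToPool() {
        return timesReturnedToPool;
    }
}

// Hypothetical partition whose add() owns the buffer from the moment it is
// called, including when the produce-state check fails.
class GuardedPartition {
    private boolean finished;
    private int queued;

    void finish() {
        finished = true;
    }

    void add(CountedBuffer buffer) {
        Objects.requireNonNull(buffer, "buffer");
        try {
            checkInProduceState();
        } catch (Throwable t) {
            buffer.recycle(); // release exactly once, then report the failure
            throw t;
        }
        queued++; // a real partition would enqueue the buffer for its consumer here
    }

    int getQueued() {
        return queued;
    }

    private void checkInProduceState() {
        if (finished) {
            throw new IllegalStateException("partition already finished");
        }
    }
}
```

This check has one limitation. With a shared counter, recycling the same reference twice while another holder still has a retained reference goes unnoticed until the count would drop below zero.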
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264559#comment-16264559 ]
ASF GitHub Bot commented on FLINK-7499:
---
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152829734

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -320,4 +559,40 @@ void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) thro
             }
         }
     }
+
+    /**
+     * An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its
+     * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+     *
+     * These {@link BufferFileWriter} objects will thus throw an exception when trying to add
+     * write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}.
+     */
+    private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync {
+        @Override
+        public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+                throws IOException {
+            BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
+            bufferFileWriter.close();
+            return bufferFileWriter;
+        }
+    }
+
+    /**
+     * An {@link IOManagerAsync} that creates stalling {@link BufferFileWriter} instances in its
+     * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+     *
+     * These {@link BufferFileWriter} objects will accept {@link BufferFileWriter#writeBlock(Object)}
+     * requests but never actually perform any write operation (be sure to clean up the buffers
+     * manually!).
+     */
+    private static class IOManagerAsyncWithStallingBufferFileWriter extends IOManagerAsync {
+        @Override
+        public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+                throws IOException {
+            BufferFileWriter bufferFileWriter = spy(super.createBufferFileWriter(channelID));
--- End diff --

Again, why Mockito? It's terrible for debugging and breaks completely under refactoring (like adding a new overloaded method `BlockChannelWriterWithCallback::writeBlock` or changing signatures). Especially since returning an anonymous subclass of `AsynchronousBufferFileWriter` here, one that replaces `writeBlock` with an empty method, is just as easy :(
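The anonymous-subclass alternative could look like the sketch below. `AsyncWriter`, `Buf`, `SpillingSubpartition` and `Writers` are hypothetical. The package-private `WriteRequest` type that blocks this approach for `AsynchronousBufferFileWriter` has no counterpart here, so the sketch only illustrates the shape of the test doubles. It also assumes that a `writeBlock()` call that throws has not taken ownership of the buffer, so the caller recycles it.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer with a recycled flag.
class Buf {
    private boolean recycled;

    void recycle() {
        if (recycled) {
            throw new IllegalStateException("double recycle");
        }
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

// Hypothetical writer with an overridable writeBlock(). A real asynchronous
// writer would enqueue an I/O request; here the "write" completes at once and
// recycles the buffer, as a completion callback would.
class AsyncWriter {
    final List<Buf> written = new ArrayList<>();

    public void writeBlock(Buf buffer) throws IOException {
        written.add(buffer);
        buffer.recycle();
    }
}

class SpillingSubpartition {
    private final AsyncWriter writer;

    SpillingSubpartition(AsyncWriter writer) {
        this.writer = writer;
    }

    // Assumption: a writeBlock() that throws has not taken ownership,
    // so the subpartition recycles the buffer itself before rethrowing.
    void add(Buf buffer) throws IOException {
        try {
            writer.writeBlock(buffer);
        } catch (IOException e) {
            buffer.recycle();
            throw e;
        }
    }
}

class Writers {
    // Test double as an anonymous subclass instead of a Mockito spy:
    // behaves like a writer that has already been closed.
    static AsyncWriter closedWriter() {
        return new AsyncWriter() {
            @Override
            public void writeBlock(Buf buffer) throws IOException {
                throw new IOException("writer already closed");
            }
        };
    }

    // Stalling variant: accepts the request but never performs (or recycles) it.
    static AsyncWriter stallingWriter(List<Buf> pending) {
        return new AsyncWriter() {
            @Override
            public void writeBlock(Buf buffer) {
                pending.add(buffer);
            }
        };
    }
}
```

The overrides carry `@Override`, so renaming or changing the signature of `writeBlock()` in the base class makes these test doubles fail to compile. A newly added overload would still have to be overridden explicitly.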
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264557#comment-16264557 ]
ASF GitHub Bot commented on FLINK-7499:
---
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152826521

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
         assertTrue(buffer.isRecycled());
     }
 
+    /**
+     * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+     */
+    @Test
+    public void testAddOnFinishedSpillablePartition() throws Exception {
+        SpillableSubpartition partition = createSubpartition();
+        partition.finish();
+
+        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+            FreeingBufferRecycler.INSTANCE);
+        try {
+            partition.add(buffer);
+        } finally {
--- End diff --

Why does this test cover both the failure and the normal path? What if one of them never happens?
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264553#comment-16264553 ]
ASF GitHub Bot commented on FLINK-7499:
---
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152826839

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
         assertTrue(buffer.isRecycled());
     }
 
+    /**
+     * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+     */
+    @Test
+    public void testAddOnFinishedSpillablePartition() throws Exception {
+        SpillableSubpartition partition = createSubpartition();
+        partition.finish();
+
+        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+            FreeingBufferRecycler.INSTANCE);
+        try {
+            partition.add(buffer);
+        } finally {
+            if (!buffer.isRecycled()) {
+                Assert.fail("buffer not recycled");
+                buffer.recycle();
+            }
+            // finish adds an EndOfPartitionEvent
+            assertEquals(1, partition.getTotalNumberOfBuffers());
+            assertEquals(4, partition.getTotalNumberOfBytes());
+        }
+    }
+
+    /**
+     * Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
+     */
+    @Test
+    public void testAddOnFinishedSpilledPartition() throws Exception {
+        SpillableSubpartition partition = createSubpartition();
+        assertEquals(0, partition.releaseMemory());
+        partition.finish();
+
+        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+            FreeingBufferRecycler.INSTANCE);
+        try {
+            partition.add(buffer);
+        } finally {
+            if (!buffer.isRecycled()) {
+                Assert.fail("buffer not recycled");
+                buffer.recycle();
+            }
+            // finish adds an EndOfPartitionEvent
+            assertEquals(1, partition.getTotalNumberOfBuffers());
+            assertEquals(4, partition.getTotalNumberOfBytes());
+        }
+    }
+
+    /**
+     * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition.
+     */
+    @Test
+    public void testAddOnReleasedSpillablePartition() throws Exception {
+        SpillableSubpartition partition = createSubpartition();
+        partition.release();
+
+        Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+            FreeingBufferRecycler.INSTANCE);
+        try {
+            partition.add(buffer);
+        } finally {
--- End diff --

ditto (and below)
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264552#comment-16264552 ]
ASF GitHub Bot commented on FLINK-7499:
---
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152826013

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ---
@@ -145,6 +165,45 @@ protected void testAddOnReleasedPartition(final ResultPartitionType pipelined)
         }
     }
 
+    @Test
+    public void testAddOnPipelinedPartition() throws Exception {
+        testAddOnPartition(ResultPartitionType.PIPELINED);
+    }
+
+    @Test
+    public void testAddOnBlockingPartition() throws Exception {
+        testAddOnPartition(ResultPartitionType.BLOCKING);
+    }
+
+    /**
+     * Tests {@link ResultPartition#add} on a working partition.
+     *
+     * @param pipelined the result partition type to set up
+     */
+    protected void testAddOnPartition(final ResultPartitionType pipelined)
+        throws Exception {
+        ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
--- End diff --

https://imgflip.com/i/1zvsnt :(
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264558#comment-16264558 ]
ASF GitHub Bot commented on FLINK-7499:
---
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152837725

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
         assertTrue(buffer.isRecycled());
     }
 
+    /**
--- End diff --

Could some of those tests be squashed into fewer methods? Or do you think that wouldn't be a good idea?
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264556#comment-16264556 ]
ASF GitHub Bot commented on FLINK-7499:
---
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152822778

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---
@@ -95,20 +95,23 @@ public boolean add(Buffer buffer) throws IOException {
             return false;
         }
-        // The number of buffers are needed later when creating
-        // the read views. If you ever remove this line here,
-        // make sure to still count the number of buffers.
-        updateStatistics(buffer);
-
         if (spillWriter == null) {
             buffers.add(buffer);
+            // The number of buffers are needed later when creating
+            // the read views. If you ever remove this line here,
+            // make sure to still count the number of buffers.
--- End diff --

Is it tested somewhere?
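The comment moved in this diff protects an invariant that a test can pin down directly. Every buffer the subpartition accepts, whether kept in memory or handed to the spill writer, is counted exactly once, and a buffer whose write request fails is not counted. Below is a minimal sketch, assuming a hypothetical `CountingSubpartition` that only loosely mirrors `SpillableSubpartition`. The spilled branch is an assumption, since the diff excerpt ends inside the in-memory branch. Buffer recycling is left out.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer that only carries its size.
class StatBuffer {
    final int size;

    StatBuffer(int size) {
        this.size = size;
    }
}

// Hypothetical spill writer; writeBlock() may fail with an IOException.
interface SpillWriter {
    void writeBlock(StatBuffer buffer) throws IOException;
}

class CountingSubpartition {
    final List<StatBuffer> buffers = new ArrayList<>();
    SpillWriter spillWriter; // null while the data is kept in memory
    int totalNumberOfBuffers;
    long totalNumberOfBytes;

    // The statistics are needed later (e.g. to create read views), so every
    // accepted buffer is counted exactly once, in whichever branch accepts it.
    boolean add(StatBuffer buffer) throws IOException {
        if (spillWriter == null) {
            buffers.add(buffer);
            updateStatistics(buffer);
        } else {
            spillWriter.writeBlock(buffer); // if this throws, nothing is counted
            updateStatistics(buffer);
        }
        return true;
    }

    private void updateStatistics(StatBuffer buffer) {
        totalNumberOfBuffers++;
        totalNumberOfBytes += buffer.size;
    }
}
```

A test in this spirit adds one buffer before switching to a spill writer and one after. It then compares `totalNumberOfBuffers` and `totalNumberOfBytes` with the expected sums, and checks that a failing writer leaves both unchanged.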
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264554#comment-16264554 ]
ASF GitHub Bot commented on FLINK-7499:
---
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/4581#discussion_r152825430

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---
@@ -267,28 +268,29 @@ public ResultPartitionType getPartitionType() {
      * first buffer has been added.
      */
     public void add(Buffer buffer, int subpartitionIndex) throws IOException {
-        boolean success = false;
+        checkNotNull(buffer);
         try {
             checkInProduceState();
+        } catch (Throwable t) {
+            buffer.recycle();
--- End diff --

I wonder if we should have some sanity check that detects the illegal state of a buffer being recycled twice. For example, each buffer could be recycled only once (guarded by a private field `boolean wasRecycled` in `Buffer`). Whenever you call `retain()`, you would get a new `Buffer` instance pointing to the same memory but with a fresh flag, so that both the original and the retained buffer could be recycled independently, but each of them only once.
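The proposal above can be sketched with a shared, reference-counted segment and lightweight handles. Each handle carries its own `wasRecycled` flag, and `retain()` returns a new handle on the same segment. `Segment` and `BufferHandle` are hypothetical names. The per-handle flag is not synchronized, so the sketch assumes each handle has a single owner.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical memory region shared by all handles created via retain().
class Segment {
    private final AtomicInteger references = new AtomicInteger(1);
    private volatile boolean returnedToPool;

    void acquire() {
        references.incrementAndGet();
    }

    void release() {
        if (references.decrementAndGet() == 0) {
            returnedToPool = true; // the last handle gave the memory back
        }
    }

    boolean isReturnedToPool() {
        return returnedToPool;
    }
}

// Hypothetical handle following the proposal: each handle may be recycled
// at most once, and retain() returns a new handle with a fresh flag.
class BufferHandle {
    private final Segment segment;
    private boolean wasRecycled; // per handle, not synchronized

    private BufferHandle(Segment segment) {
        this.segment = segment;
    }

    static BufferHandle allocate() {
        return new BufferHandle(new Segment());
    }

    BufferHandle retain() {
        if (wasRecycled) {
            throw new IllegalStateException("retain() after recycle()");
        }
        segment.acquire();
        return new BufferHandle(segment); // one extra object per retain()
    }

    void recycle() {
        if (wasRecycled) {
            throw new IllegalStateException("this handle was already recycled");
        }
        wasRecycled = true;
        segment.release();
    }

    boolean isRecycled() {
        return wasRecycled;
    }

    boolean isMemoryReturned() {
        return segment.isReturnedToPool();
    }
}
```

Compared with a counter-only check, this detects a handle being recycled twice even while other references keep the memory alive. The cost is one extra allocation per `retain()`.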
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264401#comment-16264401 ]
ASF GitHub Bot commented on FLINK-7499:
---
Github user NicoK commented on the issue:
https://github.com/apache/flink/pull/4581

OK, I extracted two hotfix commits not directly related to this fix (one for the added test checks you found, another for fixing additional test cases) and made a follow-up commit with the changes still required to fix this completely. I have also extended the test coverage considerably.
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264399#comment-16264399 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152813899

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
  Buffer read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  // End of partition
  read = reader.getNextBuffer();
  assertNotNull(read);
  assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
+
+ // finally check that the buffer has been freed after a successful (or failed) write
--- End diff --

No, which is why I have now added numerous more tests :) Thanks for pointing this out.
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264398#comment-16264398 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152813796

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
  Buffer read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
--- End diff --

Alright, I extracted those changes into a separate hotfix.
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264394#comment-16264394 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152813605

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
  Buffer read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  // End of partition
  read = reader.getNextBuffer();
  assertNotNull(read);
  assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
+
+ // finally check that the buffer has been freed after a successful (or failed) write
+ final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
+ while (!buffer.isRecycled() && System.currentTimeMillis() < deadline) {
+ Thread.sleep(1);
+ }
+ assertTrue(buffer.isRecycled());
--- End diff --

No, it's not recycled in `partition.releaseMemory()` directly (or at least it should not be, which is fixed now). The buffer is recycled by the asynchronous writer thread once its content has been written to disk, i.e. after the `partition.releaseMemory()` call. I included such a check right before that call, but it is actually of limited use.
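Because the recycling happens on the writer thread at some point after `partition.releaseMemory()`, the test has to poll with a deadline. That loop can be factored into a small helper. The sketch below assumes that the only thing observed is a boolean such as `buffer.isRecycled()`. `PollingWait` and `waitUntil` are hypothetical names, not Flink APIs. The helper uses `System.nanoTime()` because, unlike `System.currentTimeMillis()`, it is not affected by wall-clock adjustments.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical test helper (not part of Flink): bounded polling for a condition set by another thread. */
final class PollingWait {

    private PollingWait() {}

    /**
     * Polls {@code condition} roughly every millisecond until it holds or {@code timeoutMillis} elapses.
     *
     * @return whether the condition held when polling stopped
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis) {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return condition.getAsBoolean(); // one last check right at the deadline
            }
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return condition.getAsBoolean();
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulates the asynchronous writer thread recycling the spilled buffer after a short write.
        AtomicBoolean recycled = new AtomicBoolean(false);
        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            recycled.set(true);
        });
        writer.start();
        System.out.println("recycled in time: " + waitUntil(recycled::get, 30_000L));
        writer.join();
    }
}
```

In a test, the last four lines of the diff above would then reduce to asserting the result of `waitUntil(buffer::isRecycled, 30_000L)`.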
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264395#comment-16264395 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152813679

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
--- End diff --

Done.
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261077#comment-16261077 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152342531

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
@@ -108,11 +108,7 @@ int releaseMemory() throws IOException {
  for (int i = 0; i < numBuffers; i++) {
  Buffer buffer = buffers.remove();
  spilledBytes += buffer.getSize();
- try {
- spillWriter.writeBlock(buffer);
- } finally {
- buffer.recycle();
- }
+ spillWriter.writeBlock(buffer);
--- End diff --

Actually, if I see this correctly, the original code here is wrong because it recycles a buffer that was already handed to an asynchronous file write operation. This would lead to data corruption if the buffer is reused in the meantime, wouldn't it?!
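The following deterministic, single-threaded model makes the corruption scenario concrete. `SegmentPool`, `DeferredWriter` and `SpillRaceDemo` are hypothetical stand-ins, not Flink classes. The "asynchronous" write is simulated by a queue that is drained later, and the pool is assumed to hand out the most recently recycled segment first. With the old extra `recycle()`, the segment returns to the pool before the write has run, so the next producer overwrites it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical fixed-size segment pool, standing in for a network buffer pool. */
final class SegmentPool {
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();

    SegmentPool(int numSegments, int segmentSize) {
        for (int i = 0; i < numSegments; i++) {
            free.addLast(new byte[segmentSize]);
        }
    }

    /** Returns a free segment, or null if the pool is exhausted. */
    byte[] request() {
        return free.pollFirst();
    }

    /** LIFO recycling: the most recently returned segment is handed out next. */
    void recycle(byte[] segment) {
        free.addFirst(segment);
    }
}

/** Defers writes to a queue that is drained later, modelling the asynchronous I/O thread. */
final class DeferredWriter {
    private final ArrayDeque<Runnable> pending = new ArrayDeque<>();
    final List<String> disk = new ArrayList<>();

    /** Enqueues a write; after it has run, the writer recycles the segment because it owns it. */
    void writeBlock(byte[] segment, SegmentPool pool) {
        pending.addLast(() -> {
            disk.add(new String(segment, StandardCharsets.US_ASCII));
            pool.recycle(segment);
        });
    }

    void drain() {
        Runnable task;
        while ((task = pending.pollFirst()) != null) {
            task.run();
        }
    }
}

final class SpillRaceDemo {

    static void fill(byte[] segment, String text) {
        byte[] src = text.getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(src, 0, segment, 0, Math.min(src.length, segment.length));
    }

    /** Spills one segment and returns what actually reached "disk". */
    static String spill(boolean callerAlsoRecycles) {
        SegmentPool pool = new SegmentPool(2, 4);
        DeferredWriter writer = new DeferredWriter();

        byte[] spilled = pool.request();
        fill(spilled, "AAAA");
        writer.writeBlock(spilled, pool);   // ownership passes to the writer
        if (callerAlsoRecycles) {
            pool.recycle(spilled);          // old try/finally: recycles before the write has run
        }

        byte[] next = pool.request();       // another producer requests a segment
        fill(next, "BBBB");

        writer.drain();                     // the asynchronous write finally executes
        return writer.disk.get(0);
    }

    public static void main(String[] args) {
        System.out.println("writer-only recycle: " + spill(false)); // AAAA
        System.out.println("double recycle:      " + spill(true));  // BBBB (corrupted)
    }
}
```

In the double-recycle run, the writer also recycles the segment that `next` still holds, so the pool's free list ends up aliasing live memory as well.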
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261075#comment-16261075 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152342364

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java ---
@@ -31,9 +31,26 @@ protected AsynchronousBufferFileWriter(ID channelID, RequestQueue
  super(channelID, requestQueue, CALLBACK, true);
  }
+ /**
+ * Writes the given block asynchronously.
+ *
+ * @param buffer
+ * the buffer to be written (will be recycled when done)
--- End diff --

Good catch, but actually `SpillableSubpartition` doesn't do any recycling itself. In its `finish()` method, it relies on the buffer being on-heap and therefore garbage-collected. For `add()`, it relies on the caller, i.e. `ResultPartition#add()` (which I also forgot to adapt).
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235762#comment-16235762 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148534322

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
--- End diff --

nit: I know I'm evil, but could you replace the mock `listener` with `new AwaitableBufferAvailablityListener()` if I ask very nicely?
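A listener of the requested kind could look roughly like the sketch below. It is hypothetical and is not Flink's `AwaitableBufferAvailablityListener`. The method names `getNumNotifiedBuffers()` and `awaitNotifications(expected, timeout)` are borrowed from how `testConsumeSpillablePartitionSpilledDuringConsume()` uses its listener in this review. The callback name `notifyBuffersAvailable(long)` is an assumption. Unlike a plain mock, such a listener lets the test block until asynchronous notifications have arrived.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical test double: counts buffer-availability notifications and lets a
 * test thread block until enough of them have arrived.
 */
final class CountingAwaitableListener {
    private long numNotifiedBuffers; // guarded by "this"

    /** Called from the producer side, possibly on another thread. */
    synchronized void notifyBuffersAvailable(long numBuffers) {
        numNotifiedBuffers += numBuffers;
        notifyAll(); // wake up any thread blocked in awaitNotifications()
    }

    synchronized long getNumNotifiedBuffers() {
        return numNotifiedBuffers;
    }

    /**
     * Blocks until at least {@code expected} buffers were notified or the timeout elapses.
     *
     * @return true if the expected count was reached, false on timeout
     */
    synchronized boolean awaitNotifications(long expected, long timeoutMillis) throws InterruptedException {
        final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (numNotifiedBuffers < expected) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            // Releases the monitor while waiting; the loop also absorbs spurious wake-ups.
            TimeUnit.NANOSECONDS.timedWait(this, remaining);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        CountingAwaitableListener listener = new CountingAwaitableListener();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 4; i++) {
                listener.notifyBuffersAvailable(1);
            }
        });
        producer.start();
        System.out.println("reached 4: " + listener.awaitNotifications(4, 30_000L));
        producer.join();
    }
}
```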
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235760#comment-16235760 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148533217

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
  Buffer read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  // End of partition
  read = reader.getNextBuffer();
  assertNotNull(read);
  assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
+
+ // finally check that the buffer has been freed after a successful (or failed) write
--- End diff --

Is this testing a failed write anywhere? Could this test catch the previous double-recycling error?
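A test can catch double recycling directly, instead of inferring it from corrupted data, if it uses a buffer whose reference count refuses to drop below zero. The sketch below is hypothetical. `CheckedBuffer` is not a Flink class, and it models only the counting, not the memory segment or its recycler.

```java
/**
 * Hypothetical reference-counted buffer: it fails loudly on an extra recycle()
 * instead of silently returning the same memory to its pool twice.
 */
final class CheckedBuffer {
    private int refCount = 1; // the creator holds the initial reference

    /** Adds a reference, e.g. for a reader that still holds the buffer while it is being spilled. */
    synchronized void retain() {
        if (refCount <= 0) {
            throw new IllegalStateException("retain() on an already recycled buffer");
        }
        refCount++;
    }

    /** Drops one reference; the memory would go back to its pool when the count reaches 0. */
    synchronized void recycle() {
        if (refCount <= 0) {
            throw new IllegalStateException("buffer recycled more often than it was retained");
        }
        refCount--;
    }

    synchronized boolean isRecycled() {
        return refCount == 0;
    }

    public static void main(String[] args) {
        CheckedBuffer buffer = new CheckedBuffer();
        buffer.recycle(); // e.g. the asynchronous writer, after the write completed
        try {
            buffer.recycle(); // a second recycle, like the old finally block in releaseMemory()
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

The `retain()` path corresponds to the situation in `testConsumeSpillablePartitionSpilledDuringConsume()`, where the buffer must not be recycled yet because a reference is "still one in the reader".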
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235764#comment-16235764 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148531416

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
@@ -108,11 +108,7 @@ int releaseMemory() throws IOException {
  for (int i = 0; i < numBuffers; i++) {
  Buffer buffer = buffers.remove();
  spilledBytes += buffer.getSize();
- try {
- spillWriter.writeBlock(buffer);
- } finally {
- buffer.recycle();
- }
+ spillWriter.writeBlock(buffer);
--- End diff --

This is where you depend on recycling, which is not guaranteed by `spillWriter`'s interface.
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235758#comment-16235758 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148532507

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
  Buffer read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  // End of partition
  read = reader.getNextBuffer();
  assertNotNull(read);
  assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
+
+ // finally check that the buffer has been freed after a successful (or failed) write
+ final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
+ while (!buffer.isRecycled() && System.currentTimeMillis() < deadline) {
+ Thread.sleep(1);
+ }
+ assertTrue(buffer.isRecycled());
--- End diff --

Where is the last place in this test where you can add `assertFalse(buffer.isRecycled())`? I think placing it somewhere would help to understand what's going on and would be a nice sanity check.

~~Isn't the `buffer` recycled in `partition.releaseMemory()` call far above?~~ Yes, it is ;)
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235759#comment-16235759 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148537550

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -231,31 +255,48 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
  // Initial notification
  assertEquals(1, listener.getNumNotifiedBuffers());
+ assertFalse(buffer.isRecycled());
  Buffer read = reader.getNextBuffer();
- assertNotNull(read);
+ assertSame(buffer, read);
  read.recycle();
  assertEquals(2, listener.getNumNotifiedBuffers());
+ assertFalse(buffer.isRecycled());
  // Spill now
  assertEquals(2, partition.releaseMemory());
+ assertFalse(buffer.isRecycled()); // still one in the reader!
  listener.awaitNotifications(4, 30_000);
  assertEquals(4, listener.getNumNotifiedBuffers());
  read = reader.getNextBuffer();
- assertNotNull(read);
+ assertSame(buffer, read);
  read.recycle();
+ // now the buffer may be freed, depending on the timing of the write operation
+ // -> let's do this check at the end of the test (to save some time)
  read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  // End of partition
  read = reader.getNextBuffer();
  assertNotNull(read);
  assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
+
+ // finally check that the buffer has been freed after a successful (or failed) write
--- End diff --

Ditto for the failed-write case. Also, does this test have anything to do with the fixed bug?
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235763#comment-16235763 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148531290

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java ---
@@ -31,9 +31,26 @@ protected AsynchronousBufferFileWriter(ID channelID, RequestQueue
  super(channelID, requestQueue, CALLBACK, true);
  }
+ /**
+ * Writes the given block asynchronously.
+ *
+ * @param buffer
+ * the buffer to be written (will be recycled when done)
--- End diff --

Shouldn't this contract be in `BlockChannelWriterWithCallback`? Right now it's somewhat strange that recycling is implementation-dependent, especially since `SpillableSubpartition` uses `BufferFileWriter` and still depends on recycling.
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235761#comment-16235761 ]

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148532775

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
  Buffer read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
  read = reader.getNextBuffer();
  assertNotNull(read);
+ assertNotSame(buffer, read);
+ assertFalse(read.isRecycled());
  read.recycle();
+ assertTrue(read.isRecycled());
--- End diff --

Are those `read.isRecycled()` checks related to this change/fix, or are they just an additional side hotfix? If the latter, I would prefer to have them in a separate commit because they are confusing me.
[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView
[ https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140073#comment-16140073 ] ASF GitHub Bot commented on FLINK-7499: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/4581 [FLINK-7499][io] also let AsynchronousBufferFileWriter#writeBlock() recycle the buffer in case of failures ## What is the purpose of the change `SpillableSubpartitionView#releaseMemory()` recycled the given buffer twice: once asynchronously after the write operation initiated via `AsynchronousBufferFileWriter#writeBlock()`and once in SpillableSubpartitionView#releaseMemory() after adding the write operation to the queue. Additionally, other uses of `AsynchronousBufferFileWriter#writeBlock()` did not cleanup in an error case which was also not done by `AsynchronousBufferFileWriter#writeBlock()` itself. This PR changes the behaviour of `AsynchronousBufferFileWriter#writeBlock()` to always take care of releasing the buffer, even adding the (asynchronous) write operation failed. ## Brief change log - let `AsynchronousBufferFileWriter#writeBlock()` take full recycling responsibility of the given buffer even in case of failures - remove the additional `recycle` call in `SpillableSubpartitionView#releaseMemory()` - adapt `SpillableSubpartitionTest` to find the duplicate `recycle()` calls ## Verifying this change This change added further checks to `SpillableSubpartitionTest` to verify the intended behaviour ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? 
(no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-7499 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4581.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4581 commit a986ebac2da4ac4ad00717e834fdc33f9fe9eb3a Author: Nico Kruber Date: 2017-08-24T10:17:08Z [FLINK-7499][io] also let AsynchronousBufferFileWriter#writeBlock() recycle the buffer in case of failures This fixes a double-recycle in SpillableSubpartitionView and also makes sure that even if adding the (asynchronous) write operation fails, the buffer is properly freed in code that did not perform this cleanup. It avoids code duplication of this cleanup and it is also more consistent to take over responsibility of the given buffer even if an exception is thrown. > double buffer release in SpillableSubpartitionView > -- > > Key: FLINK-7499 > URL: https://issues.apache.org/jira/browse/FLINK-7499 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, > 1.3.2, 1.3.3 >Reporter: Nico Kruber >Assignee: Nico Kruber > > {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: > once asynchronously after the write operation and once in > {{SpillableSubpartitionView#releaseMemory()}} after adding the write > operation to the queue. 
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer > is recycled, the memory region may already be reused despite the pending write > 2) If, for some reason (probably only in tests like > {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?), > the buffer is retained and to be used in parallel somewhere else it may also > not be available anymore or contain corrupt data.
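The pull request introduces an ownership rule for `AsynchronousBufferFileWriter#writeBlock()`. Once the writer has been handed a buffer, the writer alone recycles it, both when the asynchronous write completes and when enqueueing the write fails. This rule can be illustrated with a small, self-contained Java sketch. The sketch does not use Flink's classes. `OwnershipSketch`, `SketchBuffer` and `SketchWriter` are hypothetical stand-ins, and a single-thread `ExecutorService` plays the role of the I/O thread. As an assumption, the recycle counter mimics the kind of duplicate-`recycle()` check the PR adds to `SpillableSubpartitionTest`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OwnershipSketch {

    /** Hypothetical buffer (not Flink's Buffer) that fails loudly if recycled more than once. */
    static final class SketchBuffer {
        private final AtomicInteger recycleCount = new AtomicInteger();

        void recycle() {
            if (recycleCount.incrementAndGet() > 1) {
                throw new IllegalStateException("buffer recycled twice");
            }
        }

        int recycleCount() {
            return recycleCount.get();
        }
    }

    /** Hypothetical writer: writeBlock() takes ownership of the buffer in every case. */
    static final class SketchWriter {
        private final ExecutorService ioThread;

        SketchWriter(ExecutorService ioThread) {
            this.ioThread = ioThread;
        }

        void writeBlock(SketchBuffer buffer) {
            try {
                // The simulated asynchronous write recycles the buffer once it is done.
                ioThread.execute(buffer::recycle);
            } catch (RuntimeException e) {
                // Enqueueing failed: the writer still owns the buffer, so it frees it here
                // before propagating the error. Callers must not recycle it again.
                buffer.recycle();
                throw e;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService io = Executors.newSingleThreadExecutor();
        SketchWriter writer = new SketchWriter(io);

        // Normal path: the caller hands the buffer over and drops its reference.
        SketchBuffer ok = new SketchBuffer();
        writer.writeBlock(ok);
        io.shutdown();
        io.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("recycled after write: " + ok.recycleCount());

        // The pre-fix pattern (caller recycles as well) is caught by the counter.
        try {
            ok.recycle();
        } catch (IllegalStateException e) {
            System.out.println("detected: " + e.getMessage());
        }

        // Failure path: the executor is shut down, so execute() throws,
        // yet the buffer is still recycled exactly once by the writer.
        SketchBuffer failed = new SketchBuffer();
        try {
            writer.writeBlock(failed);
        } catch (RejectedExecutionException e) {
            System.out.println("recycled after failed enqueue: " + failed.recycleCount());
        }
    }
}
```

Under this contract the caller calls `writeBlock()` and then never touches the buffer again, which is the role `SpillableSubpartitionView#releaseMemory()` plays after the fix. If a caller also caught the exception and recycled the buffer itself, it would trip the double-recycle check.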