[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322268#comment-16322268
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK closed the pull request at:

https://github.com/apache/flink/pull/5275


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) If {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write.
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
> the buffer is retained to be used in parallel somewhere else, it may also no 
> longer be available or may contain corrupt data.
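A minimal sketch of the ownership rule this description implies, under a simplified model that is an assumption and not Flink's actual code. Once a buffer has been handed to the asynchronous writer, only the writer's completion callback recycles it. The caller recycles it only if enqueuing the write fails, for example because the writer is already closed. All names below (`TrackedBuffer`, `AsyncSpillWriter`, `spill`) are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SpillOwnershipSketch {

    /** Hypothetical buffer that counts how often it is recycled. */
    static final class TrackedBuffer {
        final AtomicInteger recycleCount = new AtomicInteger();

        void recycle() {
            if (recycleCount.incrementAndGet() > 1) {
                throw new IllegalStateException("buffer recycled twice");
            }
        }
    }

    /** Hypothetical async writer: takes ownership and recycles after the write completes. */
    static final class AsyncSpillWriter {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        private volatile boolean closed;

        void writeBlock(TrackedBuffer buffer) {
            if (closed) {
                // Request rejected: ownership stays with the caller.
                throw new IllegalStateException("writer is closed");
            }
            executor.submit(() -> {
                // ... the actual file write would happen here ...
                buffer.recycle(); // single release point once the write is done
            });
        }

        void close() {
            closed = true;
            executor.shutdown(); // already submitted writes still run
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Spills a buffer without recycling it a second time on the success path. */
    static void spill(AsyncSpillWriter writer, TrackedBuffer buffer) {
        try {
            writer.writeBlock(buffer);
            // Buggy variant (the pattern described in this issue): also calling
            // buffer.recycle() here would release memory the pending write may still read.
        } catch (RuntimeException e) {
            buffer.recycle(); // the write was never enqueued, so we still own the buffer
            throw e;
        }
    }

    public static void main(String[] args) {
        AsyncSpillWriter writer = new AsyncSpillWriter();
        TrackedBuffer spilled = new TrackedBuffer();
        spill(writer, spilled);
        writer.close(); // waits up to 5 s for the pending write to finish

        TrackedBuffer rejected = new TrackedBuffer();
        try {
            spill(writer, rejected);
        } catch (IllegalStateException expected) {
            // writer was closed; spill() recycled the buffer itself
        }
        System.out.println(spilled.recycleCount.get() + " " + rejected.recycleCount.get());
    }
}
```

Running `main` should print `1 1`: each buffer is recycled exactly once, either by the writer after the write or by the caller when the write request is rejected.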



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2018-01-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16322267#comment-16322267
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5275
  
closed via a316989e5dfbb1dc0d555193425a4d6bd5f42d8d




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2018-01-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16319989#comment-16319989
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5275

[FLINK-7499][io] fix double buffer release in SpillableSubpartitionView

## What is the purpose of the change

This is a rebase of #4581 for the release-1.4 branch. Please refer to the 
original PR for details.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7499-1.4

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5275.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5275








[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2018-01-05 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16313191#comment-16313191
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4581




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-12-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16289215#comment-16289215
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  
...rebased onto latest master




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268467#comment-16268467
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  
Test failures are unrelated this time (the Kafka download for the 
end-to-end tests failed).




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-28 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16268374#comment-16268374
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  
Sorry about that; I tested locally now and it should go through... we'll see.




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-27 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16266940#comment-16266940
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4581
  
Still failing with some checkstyle violations in `ResultPartition.java`




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-25 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265890#comment-16265890
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  
(argh, I guess I should really fix my default file template sometime...)

-> the latest commit fixes the missing license text in the new test file




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265444#comment-16265444
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4581
  
The Apache Rat check did not pass. Could you please fix this, @NicoK?




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265155#comment-16265155
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152944166
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -320,4 +559,40 @@ void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) thro
 			}
 		}
 	}
+
+	/**
+	 * An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its
+	 * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+	 *
+	 * These {@link BufferFileWriter} objects will thus throw an exception when trying to add
+	 * write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}.
+	 */
+	private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync {
+		@Override
+		public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+				throws IOException {
+			BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
+			bufferFileWriter.close();
+			return bufferFileWriter;
+		}
+	}
+
+	/**
+	 * An {@link IOManagerAsync} that creates stalling {@link BufferFileWriter} instances in its
+	 * {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+	 *
+	 * These {@link BufferFileWriter} objects will accept {@link BufferFileWriter#writeBlock(Object)}
+	 * requests but never actually perform any write operation (be sure to clean up the buffers
+	 * manually!).
+	 */
+	private static class IOManagerAsyncWithStallingBufferFileWriter extends IOManagerAsync {
+		@Override
+		public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+				throws IOException {
+			BufferFileWriter bufferFileWriter = spy(super.createBufferFileWriter(channelID));
--- End diff --

I don't have (too) strong feelings about it, so there it is, along with some 
more minor fixes in the tests and one additional test for 
`AsynchronousBufferFileWriter` itself (the change there was not covered yet).




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265044#comment-16265044
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152918616
  

:/ And with every such case we increase the cost of refactoring. I would 
still extract those classes to the correct package.




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265043#comment-16265043
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152918323
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 		assertTrue(buffer.isRecycled());
 	}
 
+	/**
+	 * Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+	 */
+	@Test
+	public void testAddOnFinishedSpillablePartition() throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		partition.finish();
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+			FreeingBufferRecycler.INSTANCE);
+		try {
+			partition.add(buffer);
+		} finally {
--- End diff --

Oh right, sorry, my bad :) For some reason I thought that it would somehow 
swallow the exceptions :)
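The point conceded here is standard Java semantics: a `finally` block runs its cleanup but does not swallow an exception thrown in the `try` block. The original exception is discarded only if the `finally` block itself completes abruptly (throws or returns). A minimal, self-contained illustration follows; `addToFinishedPartition` and the `recycled` flag are hypothetical stand-ins, not the actual test code.

```java
public class FinallyPropagationSketch {

    static boolean recycled = false;

    /** Hypothetical stand-in for an add() call that fails on a finished partition. */
    static void addToFinishedPartition() {
        throw new IllegalStateException("partition already finished");
    }

    static void addWithCleanup() {
        try {
            addToFinishedPartition();
        } finally {
            // Runs on both normal and exceptional exit; the exception keeps propagating.
            recycled = true;
        }
    }

    public static void main(String[] args) {
        try {
            addWithCleanup();
            System.out.println("no exception");
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage() + ", recycled=" + recycled);
        }
    }
}
```

Running it prints `caught: partition already finished, recycled=true`: the cleanup ran and the exception still reached the caller.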




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264832#comment-16264832
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  
ok, fixed




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264825#comment-16264825
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  
OK, now I'm using the buffer incorrectly in 
`SpillableSubpartition#add`... let me rethink it once more.




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264638#comment-16264638
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  
Alright, using fixup commits now for you ;)
FYI: all `[FLINK-7499]` commits belong together and can be squashed.




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264633#comment-16264633
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152850290
  

In theory, you are right. In practice, though (and that's what I tried 
first), `AsynchronousBufferFileWriter` cannot be extended from anywhere outside 
its package because `WriteRequest` is package-private. Since the writer 
implementations I needed are not very generic, I did not want to move them into 
that package (in the test folder, of course), nor did I want to make 
`WriteRequest` public... hence Mockito.


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) if {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
>  the buffer is retained and to be used in parallel somewhere else it may also 
> not be available anymore or contain corrupt data.
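A minimal sketch of one way to avoid the double release described above, under simplifying assumptions: `RecyclableBuffer`, `AsyncSpillWriter` and `SingleReleaseDemo` are hypothetical stand-ins, not Flink's `Buffer` or `BufferFileWriter`. Once a buffer has been handed to the asynchronous writer, only the write-completion path recycles it; the caller does not recycle it again after enqueuing the write.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a pooled network buffer.
class RecyclableBuffer {
    private final AtomicInteger recycleCount = new AtomicInteger();

    void recycle() {
        // A second recycle would return memory to the pool that may still be in use.
        if (recycleCount.incrementAndGet() > 1) {
            throw new IllegalStateException("buffer recycled twice");
        }
    }

    int recycleCount() {
        return recycleCount.get();
    }
}

// Hypothetical asynchronous spill writer: it owns the buffer once writeBlock() returns.
class AsyncSpillWriter implements AutoCloseable {
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();

    void writeBlock(RecyclableBuffer buffer) {
        ioThread.execute(() -> {
            // ... the buffer's contents would be written to disk here ...
            buffer.recycle(); // the single place where the buffer is released
        });
    }

    @Override
    public void close() {
        ioThread.shutdown();
        try {
            ioThread.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class SingleReleaseDemo {
    static int spillAndCountRecycles() {
        RecyclableBuffer buffer = new RecyclableBuffer();
        try (AsyncSpillWriter writer = new AsyncSpillWriter()) {
            writer.writeBlock(buffer);
            // The buggy variant would also call buffer.recycle() right here, releasing
            // the memory a second time, possibly before the write has even run.
        }
        return buffer.recycleCount(); // close() waited for the pending write
    }
}
```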





[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264629#comment-16264629
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152849820
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.finish();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpilledPartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   assertEquals(0, partition.releaseMemory());
+   partition.finish();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition.
+*/
+   @Test
+   public void testAddOnReleasedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.release();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   assertEquals(0, partition.getTotalNumberOfBuffers());
+   assertEquals(0, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled released partition.
+*/
+   @Test
+   public void testAddOnReleasedSpilledPartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.release();
+   assertEquals(0, partition.releaseMemory());
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   assertEquals(0, partition.getTotalNumberOfBuffers());
+   assertEquals(0, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled partition where adding the
+* write request fails with an exception.
+*/
+   @Test
+   public void testAddOnSpilledPartitionWithSlowWriter() 

[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264620#comment-16264620
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152849113
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.finish();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
--- End diff --

What do you mean? What failure / normal path? `partition.add()` should 
always succeed in this case, i.e. it does not throw.
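If `add()` never throws for a finished or released partition, a test can assert on the outcome directly instead of wrapping the call in `try`/`finally`. Note also that JUnit's `Assert.fail()` throws an `AssertionError`, so the `buffer.recycle()` call after it in the quoted tests never runs. A minimal sketch with hypothetical `ToyBuffer`/`ToySubpartition` classes (not Flink's API), assuming `add()` returns `false` and recycles a rejected buffer:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer that only tracks its size and whether it was recycled.
class ToyBuffer {
    private final int size;
    private boolean recycled;

    ToyBuffer(int size) { this.size = size; }

    int size() { return size; }
    boolean isRecycled() { return recycled; }
    void recycle() { recycled = true; }
}

// Hypothetical subpartition: add() takes ownership of the buffer in every case.
class ToySubpartition {
    private final List<ToyBuffer> buffers = new ArrayList<>();
    private boolean finished;
    private int totalBuffers;
    private long totalBytes;

    // Unlike the real finish(), this toy version does not enqueue an end-of-partition event.
    void finish() { finished = true; }

    boolean add(ToyBuffer buffer) {
        if (finished) {
            buffer.recycle(); // rejected: released here, so the caller never has to
            return false;
        }
        buffers.add(buffer);
        totalBuffers++;
        totalBytes += buffer.size();
        return true;
    }

    int getTotalNumberOfBuffers() { return totalBuffers; }
    long getTotalNumberOfBytes() { return totalBytes; }
}

class AddOnFinishedDemo {
    // Same shape as testAddOnFinishedSpillablePartition(), but without try/finally.
    static boolean addAfterFinishIsRejectedAndRecycled() {
        ToySubpartition partition = new ToySubpartition();
        partition.finish();
        ToyBuffer buffer = new ToyBuffer(4096);

        boolean accepted = partition.add(buffer);

        return !accepted && buffer.isRecycled() && partition.getTotalNumberOfBuffers() == 0;
    }
}
```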




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264605#comment-16264605
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152847273
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
--- End diff --

You're right, some tests actually share most of their code. I'll extract a 
common test method to reduce the duplication.
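One possible shape for such a shared helper, sketched with hypothetical `DemoPartition`/`SketchBuffer` classes rather than Flink's: the per-state tests differ mainly in how the partition is prepared (here `finish()` or `release()`), so that step is passed in as a `Consumer` together with the expected buffer count.

```java
import java.util.function.Consumer;

// Hypothetical buffer that only records whether it was recycled.
class SketchBuffer {
    private boolean recycled;

    void recycle() { recycled = true; }
    boolean isRecycled() { return recycled; }
}

// Hypothetical partition model, just detailed enough for the helper below.
class DemoPartition {
    private boolean finished;
    private boolean released;
    int numberOfBuffers;

    void finish() {
        finished = true;
        numberOfBuffers++; // models the end-of-partition event added by finish()
    }

    void release() {
        released = true;
        numberOfBuffers = 0;
    }

    boolean add(SketchBuffer buffer) {
        if (finished || released) {
            buffer.recycle(); // rejected buffers are recycled by the partition itself
            return false;
        }
        numberOfBuffers++;
        return true;
    }
}

class AddOnPartitionTests {
    // Shared body: prepare the partition, add one buffer, and check that it was
    // rejected, recycled, and not counted.
    static void assertAddRejected(Consumer<DemoPartition> setup, int expectedBuffers) {
        DemoPartition partition = new DemoPartition();
        setup.accept(partition);
        SketchBuffer buffer = new SketchBuffer();

        if (partition.add(buffer)) {
            throw new AssertionError("add() should have rejected the buffer");
        }
        if (!buffer.isRecycled()) {
            throw new AssertionError("buffer not recycled");
        }
        if (partition.numberOfBuffers != expectedBuffers) {
            throw new AssertionError("unexpected number of buffers: " + partition.numberOfBuffers);
        }
    }

    static void testAddOnFinishedPartition() {
        assertAddRejected(DemoPartition::finish, 1);
    }

    static void testAddOnReleasedPartition() {
        assertAddRejected(DemoPartition::release, 0);
    }
}
```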




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264593#comment-16264593
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152845922
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ---
@@ -145,6 +165,45 @@ protected void testAddOnReleasedPartition(final ResultPartitionType pipelined)
}
}
 
+   @Test
+   public void testAddOnPipelinedPartition() throws Exception {
+   testAddOnPartition(ResultPartitionType.PIPELINED);
+   }
+
+   @Test
+   public void testAddOnBlockingPartition() throws Exception {
+   testAddOnPartition(ResultPartitionType.BLOCKING);
+   }
+
+   /**
+* Tests {@link ResultPartition#add} on a working partition.
+*
+* @param pipelined the result partition type to set up
+*/
+   protected void testAddOnPartition(final ResultPartitionType pipelined)
+   throws Exception {
+   ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
--- End diff --

Do you think it's better to have a real implementation of that interface, 
spy on it, and then verify the expected method calls? That actually seems like 
more overhead with little or no gain, so I'd prefer to leave it as is for now.
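For comparison, a hand-written recording fake can be quite short. This sketch assumes a hypothetical single-method `ConsumableNotifier` interface and a `NotifyingPartition` that notifies once its first buffer is added (loosely following the `ResultPartition#add` Javadoc quoted in this thread); the real `ResultPartitionConsumableNotifier` signature is not shown here, so all names are placeholders.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-method notifier; a placeholder for the real interface.
interface ConsumableNotifier {
    void notifyPartitionConsumable(String partitionId);
}

// Recording fake: remembers every call so a test can assert on it directly.
class RecordingNotifier implements ConsumableNotifier {
    final List<String> notifiedPartitions = new ArrayList<>();

    @Override
    public void notifyPartitionConsumable(String partitionId) {
        notifiedPartitions.add(partitionId);
    }
}

// Hypothetical partition that notifies once, when the first buffer has been added.
class NotifyingPartition {
    private final String partitionId;
    private final ConsumableNotifier notifier;
    private boolean notified;

    NotifyingPartition(String partitionId, ConsumableNotifier notifier) {
        this.partitionId = partitionId;
        this.notifier = notifier;
    }

    void add(Object buffer) {
        // ... the buffer would be handed to a subpartition here ...
        if (!notified) {
            notified = true;
            notifier.notifyPartitionConsumable(partitionId);
        }
    }
}
```

Unlike a Mockito mock, the fake is plain code that can be stepped through in a debugger, at the cost of a few extra lines per interface.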




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264573#comment-16264573
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152841030
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---
@@ -267,28 +268,29 @@ public ResultPartitionType getPartitionType() {
 * first buffer has been added.
 */
public void add(Buffer buffer, int subpartitionIndex) throws IOException {
-   boolean success = false;
+   checkNotNull(buffer);
 
try {
checkInProduceState();
+   } catch (Throwable t) {
+   buffer.recycle();
--- End diff --

Actually, a sanity check for double recycling comes with #4613, for which I 
also needed this PR. It works differently, though, and only checks that the 
reference counter does not go below 0. I guess this way we put less pressure on 
the garbage collector than we would by creating a new Buffer instance for each 
`retain()`.
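A minimal sketch of a counter-based check, assuming a hypothetical `RefCountedBuffer` (the thread does not show the code from #4613, so this is not Flink's implementation): `retain()` increments a shared counter and returns the same object, and `recycle()` fails once the counter would drop below zero.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted buffer: one shared instance, one shared counter.
class RefCountedBuffer {
    private final AtomicInteger refCount = new AtomicInteger(1); // the creator holds one reference
    private volatile boolean memoryReturned;

    RefCountedBuffer retain() {
        refCount.incrementAndGet();
        return this; // no new object, so retain() creates no garbage
    }

    void recycle() {
        int remaining = refCount.decrementAndGet();
        if (remaining < 0) {
            throw new IllegalStateException("buffer recycled more often than it was retained");
        }
        if (remaining == 0) {
            memoryReturned = true; // here the memory segment would go back to its pool
        }
    }

    boolean isRecycled() {
        return memoryReturned;
    }
}
```

The trade-off: nothing is allocated per `retain()`, but the check only fires once recycles outnumber references in total. If one holder recycles twice while another holder still has an outstanding reference, the memory is released early and the error only surfaces when the second holder recycles.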




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264559#comment-16264559
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152829734
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -320,4 +559,40 @@ void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) thro
}
}
}
+
+   /**
+* An {@link IOManagerAsync} that creates closed {@link BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will thus throw an exception when trying to add
+* write requests, e.g. by calling {@link BufferFileWriter#writeBlock(Object)}.
+*/
+   private static class IOManagerAsyncWithClosedBufferFileWriter extends IOManagerAsync {
+   @Override
+   public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+   throws IOException {
+   BufferFileWriter bufferFileWriter = super.createBufferFileWriter(channelID);
+   bufferFileWriter.close();
+   return bufferFileWriter;
+   }
+   }
+
+   /**
+* An {@link IOManagerAsync} that creates stalling {@link BufferFileWriter} instances in its
+* {@link #createBufferFileWriter(FileIOChannel.ID)} method.
+*
+* These {@link BufferFileWriter} objects will accept {@link BufferFileWriter#writeBlock(Object)}
+* requests but never actually perform any write operation (be sure to clean up the buffers
+* manually!).
+*/
+   private static class IOManagerAsyncWithStallingBufferFileWriter extends IOManagerAsync {
+   @Override
+   public BufferFileWriter createBufferFileWriter(FileIOChannel.ID channelID)
+   throws IOException {
+   BufferFileWriter bufferFileWriter = spy(super.createBufferFileWriter(channelID));
--- End diff --

Again, why Mockito? It's terrible for debugging and breaks completely under 
refactoring (like adding a new overloaded `BlockChannelWriterWithCallback::writeBlock` 
method or changing signatures). Especially since returning an anonymous class 
here that extends `AsynchronousBufferFileWriter` and replaces `writeBlock` with 
an empty method is just as easy :(
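A sketch of the suggested alternative, using a hypothetical `AsyncBlockWriter` in place of `AsynchronousBufferFileWriter` (which, per the reply in this thread, cannot be subclassed outside its package because `WriteRequest` is package-private): the hypothetical factory returns an anonymous subclass whose `writeBlock` does nothing, so write requests are accepted but never performed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical writer standing in for an asynchronous buffer file writer.
class AsyncBlockWriter {
    final List<byte[]> written = new ArrayList<>();

    void writeBlock(byte[] block) {
        written.add(block); // a real writer would enqueue an I/O request here
    }
}

// Hypothetical factory, standing in for IOManagerAsync#createBufferFileWriter.
class StallingWriterFactory {
    AsyncBlockWriter createWriter() {
        // Anonymous subclass: accepts write requests but never performs them.
        return new AsyncBlockWriter() {
            @Override // the compiler checks that this still matches the base method
            void writeBlock(byte[] block) {
                // intentionally empty: the block stays "pending" forever
            }
        };
    }
}
```

As with the Mockito spy, the caller then has to clean up any buffers that were handed to the stalling writer.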




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264557#comment-16264557
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152826521
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.finish();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
--- End diff --

Why does this test cover both the failure and the normal path? What if one 
of them never happens?




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264553#comment-16264553
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152826839
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.finish();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spilled finished partition.
+*/
+   @Test
+   public void testAddOnFinishedSpilledPartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   assertEquals(0, partition.releaseMemory());
+   partition.finish();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
+   if (!buffer.isRecycled()) {
+   Assert.fail("buffer not recycled");
+   buffer.recycle();
+   }
+   // finish adds an EndOfPartitionEvent
+   assertEquals(1, partition.getTotalNumberOfBuffers());
+   assertEquals(4, partition.getTotalNumberOfBytes());
+   }
+   }
+
+   /**
+* Tests {@link SpillableSubpartition#add(Buffer)} with a spillable released partition.
+*/
+   @Test
+   public void testAddOnReleasedSpillablePartition() throws Exception {
+   SpillableSubpartition partition = createSubpartition();
+   partition.release();
+
+   Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+   FreeingBufferRecycler.INSTANCE);
+   try {
+   partition.add(buffer);
+   } finally {
--- End diff --

ditto (and below)




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264552#comment-16264552
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152826013
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java ---
@@ -145,6 +165,45 @@ protected void testAddOnReleasedPartition(final ResultPartitionType pipelined)
}
}
 
+   @Test
+   public void testAddOnPipelinedPartition() throws Exception {
+   testAddOnPartition(ResultPartitionType.PIPELINED);
+   }
+
+   @Test
+   public void testAddOnBlockingPartition() throws Exception {
+   testAddOnPartition(ResultPartitionType.BLOCKING);
+   }
+
+   /**
+* Tests {@link ResultPartition#add} on a working partition.
+*
+* @param pipelined the result partition type to set up
+*/
+   protected void testAddOnPartition(final ResultPartitionType pipelined)
+   throws Exception {
+   ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
--- End diff --

https://imgflip.com/i/1zvsnt :(




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264558#comment-16264558
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152837725
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -300,6 +315,230 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
assertTrue(buffer.isRecycled());
}
 
+   /**
--- End diff --

Could some of those tests be squashed into fewer methods? Or do you think that 
wouldn't be a good idea?




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264556#comment-16264556
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152822778
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---
@@ -95,20 +95,23 @@ public boolean add(Buffer buffer) throws IOException {
return false;
}
 
-   // The number of buffers are needed later when creating
-   // the read views. If you ever remove this line here,
-   // make sure to still count the number of buffers.
-   updateStatistics(buffer);
-
if (spillWriter == null) {
buffers.add(buffer);
+   // The number of buffers are needed later when creating
+   // the read views. If you ever remove this line here,
+   // make sure to still count the number of buffers.
--- End diff --

Is it tested somewhere? 
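A sketch of how this could be checked, assuming a hypothetical `StatsSubpartition` in which statistics are updated only after a buffer has been accepted, either kept in memory or handed to a spill writer. A failing write request (similar to the closed writer from `IOManagerAsyncWithClosedBufferFileWriter`) leaves the counters unchanged. Buffers are modeled as plain sizes, and none of these names are Flink's.

```java
import java.io.IOException;
import java.util.ArrayDeque;

// Hypothetical subpartition that counts a buffer only once it has been accepted.
class StatsSubpartition {
    interface SpillWriter {
        void writeBlock(int bufferSize) throws IOException;
    }

    private final ArrayDeque<Integer> inMemory = new ArrayDeque<>();
    private SpillWriter spillWriter; // null while the data is still held in memory
    int totalBuffers;
    long totalBytes;

    void spillTo(SpillWriter writer) {
        this.spillWriter = writer;
    }

    boolean add(int bufferSize) throws IOException {
        if (spillWriter == null) {
            inMemory.add(bufferSize);
        } else {
            spillWriter.writeBlock(bufferSize); // may throw: then nothing is counted
        }
        updateStatistics(bufferSize); // only reached once the buffer was accepted
        return true;
    }

    private void updateStatistics(int bufferSize) {
        totalBuffers++;
        totalBytes += bufferSize;
    }
}

class StatsDemo {
    // Returns {buffers, bytes} after one in-memory add, one spilled add and one failed spill.
    static long[] run() {
        StatsSubpartition p = new StatsSubpartition();
        try {
            p.add(100);                 // kept in memory: counted
            p.spillTo(size -> { });     // writer that accepts every block
            p.add(200);                 // spilled: counted
            p.spillTo(size -> { throw new IOException("writer closed"); });
            p.add(300);                 // write request fails: not counted
        } catch (IOException expected) {
            // the third add is expected to fail
        }
        return new long[] {p.totalBuffers, p.totalBytes};
    }
}
```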




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264554#comment-16264554
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152825430
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java ---
@@ -267,28 +268,29 @@ public ResultPartitionType getPartitionType() {
 * first buffer has been added.
 */
public void add(Buffer buffer, int subpartitionIndex) throws IOException {
-   boolean success = false;
+   checkNotNull(buffer);
 
try {
checkInProduceState();
+   } catch (Throwable t) {
+   buffer.recycle();
--- End diff --

I wonder if we should add a sanity check that detects double recycling of 
buffers as an illegal state. For example, each buffer could only be recycled 
once (guarded by a private `boolean wasRecycled` field in `Buffer`). Whenever 
you call `retain()`, you would get a new `Buffer` instance pointing to the 
same memory but with a fresh flag, so that the original and the retained 
buffer could be recycled independently, but each only once.
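Here is a minimal sketch of the proposed check. It assumes that memory sharing is tracked by a separate reference count. Each handle carries its own `wasRecycled` flag, and `retain()` returns a new handle on the same memory. `SharedSegment` and `BufferHandle` are hypothetical names, and Flink's actual `Buffer` API may differ.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical shared memory region with a reference count (not Flink's MemorySegment). */
final class SharedSegment {
    final byte[] memory;
    private final AtomicInteger refCount = new AtomicInteger(1);

    SharedSegment(int size) {
        this.memory = new byte[size];
    }

    void retainRef() {
        refCount.incrementAndGet();
    }

    /** Drops one reference; at zero a real pool would take the memory back. */
    void releaseRef() {
        if (refCount.decrementAndGet() < 0) {
            throw new IllegalStateException("segment released too often");
        }
    }

    int refCount() {
        return refCount.get();
    }
}

/** Hypothetical buffer handle: each handle may be recycled exactly once. */
final class BufferHandle {
    private final SharedSegment segment;
    private final AtomicBoolean wasRecycled = new AtomicBoolean(false);

    BufferHandle(SharedSegment segment) {
        this.segment = segment;
    }

    byte[] memory() {
        return segment.memory;
    }

    /** Returns a NEW handle on the same memory, with its own recycled flag. */
    BufferHandle retain() {
        if (wasRecycled.get()) {
            throw new IllegalStateException("cannot retain a recycled buffer handle");
        }
        segment.retainRef();
        return new BufferHandle(segment);
    }

    /** Illegal-state detection: a second recycle() on the same handle throws. */
    void recycle() {
        if (!wasRecycled.compareAndSet(false, true)) {
            throw new IllegalStateException("this buffer handle was already recycled");
        }
        segment.releaseRef();
    }

    boolean isRecycled() {
        return wasRecycled.get();
    }
}
```

The original and the retained handle can each be recycled once. The memory is only released after both have been recycled.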




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264401#comment-16264401
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4581
  
OK, I extracted two hotfix commits that are not directly related to this fix 
(one for the added test checks you found, another for fixing additional test 
cases) and made a follow-up commit with the changes still required to fix 
this completely. I have also greatly extended the test coverage.




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264399#comment-16264399
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152813899
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
 
Buffer read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
// End of partition
read = reader.getNextBuffer();
assertNotNull(read);
assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
+
+   // finally check that the buffer has been freed after a successful (or failed) write
--- End diff --

No, which is why I have now added numerous additional tests :) Thanks for 
pointing this out.




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264398#comment-16264398
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152813796
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
 
Buffer read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
--- End diff --

Alright, I extracted those changes into a separate hotfix.




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264394#comment-16264394
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152813605
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
 
Buffer read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
// End of partition
read = reader.getNextBuffer();
assertNotNull(read);
assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
+
+   // finally check that the buffer has been freed after a successful (or failed) write
+   final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
+   while (!buffer.isRecycled() && System.currentTimeMillis() < deadline) {
+   Thread.sleep(1);
+   }
+   assertTrue(buffer.isRecycled());
--- End diff --

No, it's not recycled in `partition.releaseMemory()` directly (or at least it 
should not be, which is fixed now). The buffer is recycled by the 
asynchronous writer thread once its content has been written to disk, i.e. 
after the `partition.releaseMemory()` call. I included such a check right 
before that call, but it is actually of limited use.
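Because the recycling happens on the asynchronous writer thread, the test can only wait for it with a deadline. Below is a sketch of the polling loop from the diff, factored into a helper. `TestWaits.waitUntil` is a hypothetical name. It uses `System.nanoTime()` rather than `System.currentTimeMillis()`, so wall-clock adjustments cannot shorten or extend the wait.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical test utility for conditions that become true on another thread. */
final class TestWaits {
    private TestWaits() {}

    /**
     * Polls {@code condition} every millisecond until it holds or
     * {@code timeoutMillis} elapses. Returns the final value of the condition,
     * so callers can assert on it directly.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis)
            throws InterruptedException {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (!condition.getAsBoolean()) {
            // overflow-safe comparison of nanoTime values
            if (System.nanoTime() - deadline >= 0) {
                return condition.getAsBoolean(); // one last check at the deadline
            }
            Thread.sleep(1);
        }
        return true;
    }
}
```

In the test above, the loop and the final assertion would collapse to `assertTrue(TestWaits.waitUntil(buffer::isRecycled, 30_000L));`.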




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16264395#comment-16264395
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152813679
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
 
--- End diff --

done




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261077#comment-16261077
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152342531
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
@@ -108,11 +108,7 @@ int releaseMemory() throws IOException {
for (int i = 0; i < numBuffers; i++) {
Buffer buffer = buffers.remove();
spilledBytes += buffer.getSize();
-   try {
-   spillWriter.writeBlock(buffer);
-   } finally {
-   buffer.recycle();
-   }
+   spillWriter.writeBlock(buffer);
--- End diff --

Actually, if I see this correctly, the original code here is wrong, since it 
already recycles a buffer that was added to an asynchronous file write 
operation. This would lead to data corruption if the buffer were reused in 
the meantime, wouldn't it?!
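The corruption scenario can be reproduced deterministically in a toy model. The model assumes a pool that immediately hands recycled memory to the next requester, and an I/O queue that only runs later. `TinyPool` and `EarlyRecycleDemo` are hypothetical, and the `List<Runnable>` stands in for the asynchronous write queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical fixed-size buffer pool: recycled arrays are handed out again. */
final class TinyPool {
    private final Deque<byte[]> free = new ArrayDeque<>();

    TinyPool(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            free.push(new byte[bufferSize]);
        }
    }

    /** Returns a free buffer, or null if the pool is exhausted. */
    byte[] tryRequest() {
        return free.poll();
    }

    void recycle(byte[] buffer) {
        free.push(buffer);
    }
}

final class EarlyRecycleDemo {
    /**
     * Queues a "write" of a buffer filled with 1s, then either recycles the
     * buffer immediately (old, buggy behaviour) or lets the write recycle it
     * (fixed behaviour). Returns the bytes the write actually persisted.
     */
    static byte[] spill(boolean recycleEagerly) {
        TinyPool pool = new TinyPool(1, 4);
        List<Runnable> pendingWrites = new ArrayList<>(); // stands in for the I/O queue
        byte[][] written = new byte[1][];

        byte[] buffer = pool.tryRequest();
        Arrays.fill(buffer, (byte) 1);

        pendingWrites.add(() -> {
            written[0] = buffer.clone(); // "write to disk"
            if (!recycleEagerly) {
                pool.recycle(buffer); // fixed: the writer recycles after writing
            }
        });
        if (recycleEagerly) {
            pool.recycle(buffer); // bug: the write is still pending
        }

        // Meanwhile another producer grabs a buffer (if any) and fills it with 2s.
        byte[] other = pool.tryRequest();
        if (other != null) {
            Arrays.fill(other, (byte) 2);
        }

        pendingWrites.forEach(Runnable::run); // the I/O thread finally runs
        return written[0];
    }
}
```

`spill(true)` returns `[2, 2, 2, 2]`, because the write picks up the other producer's data. `spill(false)` returns `[1, 1, 1, 1]`.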




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16261075#comment-16261075
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user NicoK commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r152342364
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java ---
@@ -31,9 +31,26 @@ protected AsynchronousBufferFileWriter(ID channelID, RequestQueue
super(channelID, requestQueue, CALLBACK, true);
}
 
+   /**
+* Writes the given block asynchronously.
+*
+* @param buffer
+*  the buffer to be written (will be recycled when done)
--- End diff --

Good catch, but actually `SpillableSubpartition` doesn't do any recycling 
itself: in its `finish()` method, it relies on the buffer being on-heap and 
then garbage-collected; for the `add()` function, it relies on the caller, 
i.e. `ResultPartition#add()` (which I also forgot to adapt).
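The following sketch shows the caller-side contract described here, assuming the rule "whoever currently owns the buffer recycles it". `add()` recycles the buffer itself if it fails before the hand-off, mirroring the `catch (Throwable t) { buffer.recycle(); ... }` in the `ResultPartition` diff, and never touches the buffer afterwards. `Buf`, `Subpartition` and `PartitionSketch` are hypothetical stand-ins, and rethrowing the exception after recycling is an assumption.

```java
import java.util.Objects;

/** Hypothetical minimal buffer that only tracks whether it was recycled. */
final class Buf {
    private boolean recycled;

    void recycle() {
        if (recycled) {
            throw new IllegalStateException("buffer recycled twice");
        }
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

/** Hypothetical consumer: once add() returns, it owns the buffer. */
interface Subpartition {
    void add(Buf buffer);
}

final class PartitionSketch {
    private final Subpartition[] subpartitions;
    private boolean finished;

    PartitionSketch(Subpartition... subpartitions) {
        this.subpartitions = subpartitions;
    }

    void finish() {
        finished = true;
    }

    private void checkInProduceState() {
        if (finished) {
            throw new IllegalStateException("partition already finished");
        }
    }

    void add(Buf buffer, int subpartitionIndex) {
        Objects.requireNonNull(buffer);
        try {
            checkInProduceState();
        } catch (Throwable t) {
            buffer.recycle(); // we still own the buffer, so we must release it
            throw t;          // precise rethrow: only unchecked exceptions reach here
        }
        subpartitions[subpartitionIndex].add(buffer); // ownership moves here
    }
}
```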




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235762#comment-16235762
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148534322
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
 
--- End diff --

nit: I'm evil, I know, but could you replace the mock `listener` with `new 
AwaitableBufferAvailablityListener()` if I ask very nicely?
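An awaitable listener counts notifications and lets the test thread block until a target count arrives, instead of verifying a mock. This is a hypothetical sketch: `AvailabilityListener` and `AwaitableCountingListener` are not Flink's classes, and the real `AwaitableBufferAvailablityListener` may be implemented differently. It mirrors the `getNumNotifiedBuffers()` and `awaitNotifications(4, 30_000)` calls used in the test diffs.

```java
/** Hypothetical callback interface, loosely modelled on a buffer-availability listener. */
interface AvailabilityListener {
    void notifyBuffersAvailable(long numBuffers);
}

/** Counts announced buffers and lets a test block until a target count is reached. */
final class AwaitableCountingListener implements AvailabilityListener {
    private long numNotifiedBuffers;

    @Override
    public synchronized void notifyBuffersAvailable(long numBuffers) {
        numNotifiedBuffers += numBuffers;
        notifyAll(); // wake up any thread blocked in awaitNotifications()
    }

    synchronized long getNumNotifiedBuffers() {
        return numNotifiedBuffers;
    }

    /** Waits until at least {@code expected} buffers were announced or the timeout elapses. */
    synchronized void awaitNotifications(long expected, long timeoutMillis)
            throws InterruptedException {
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (numNotifiedBuffers < expected) {
            long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMillis <= 0) {
                return; // timed out: the caller's assertEquals reports the actual count
            }
            wait(remainingMillis); // loop guards against spurious wake-ups
        }
    }
}
```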




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235760#comment-16235760
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148533217
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
 
Buffer read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
// End of partition
read = reader.getNextBuffer();
assertNotNull(read);
assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
+
+   // finally check that the buffer has been freed after a successful (or failed) write
--- End diff --

Is the failed-write case tested anywhere? Could this test have caught the 
previous double-recycling error?




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235764#comment-16235764
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148531416
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
@@ -108,11 +108,7 @@ int releaseMemory() throws IOException {
for (int i = 0; i < numBuffers; i++) {
Buffer buffer = buffers.remove();
spilledBytes += buffer.getSize();
-   try {
-   spillWriter.writeBlock(buffer);
-   } finally {
-   buffer.recycle();
-   }
+   spillWriter.writeBlock(buffer);
--- End diff --

This is where you depend on recycling, which is not guaranteed by 
`spillWriter`'s interface.




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235758#comment-16235758
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148532507
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
 
Buffer read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
// End of partition
read = reader.getNextBuffer();
assertNotNull(read);
assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
+
+   // finally check that the buffer has been freed after a successful (or failed) write
+   final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
+   while (!buffer.isRecycled() && System.currentTimeMillis() < deadline) {
+   Thread.sleep(1);
+   }
+   assertTrue(buffer.isRecycled());
--- End diff --

Where is the last place in this test where you can add 
`assertFalse(buffer.isRecycled())`? I think placing it somewhere would help 
to understand what's going on and would be a nice sanity check.

~~Isn't the `buffer` recycled in the `partition.releaseMemory()` call far 
above?~~ yes it is ;)




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235759#comment-16235759
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148537550
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -231,31 +255,48 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
 
// Initial notification
assertEquals(1, listener.getNumNotifiedBuffers());
+   assertFalse(buffer.isRecycled());
 
Buffer read = reader.getNextBuffer();
-   assertNotNull(read);
+   assertSame(buffer, read);
read.recycle();
assertEquals(2, listener.getNumNotifiedBuffers());
+   assertFalse(buffer.isRecycled());
 
// Spill now
assertEquals(2, partition.releaseMemory());
+   assertFalse(buffer.isRecycled()); // still one in the reader!
 
listener.awaitNotifications(4, 30_000);
assertEquals(4, listener.getNumNotifiedBuffers());
 
read = reader.getNextBuffer();
-   assertNotNull(read);
+   assertSame(buffer, read);
read.recycle();
+   // now the buffer may be freed, depending on the timing of the write operation
+   // -> let's do this check at the end of the test (to save some time)
 
read = reader.getNextBuffer();
assertNotNull(read);
+   assertNotSame(buffer, read);
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
 
// End of partition
read = reader.getNextBuffer();
assertNotNull(read);
assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+   assertFalse(read.isRecycled());
read.recycle();
+   assertTrue(read.isRecycled());
+
+   // finally check that the buffer has been freed after a successful (or failed) write
--- End diff --

Ditto for the failed-write case. Also, does this test have anything to do 
with the fixed bug?




[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235763#comment-16235763
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148531290
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java ---
@@ -31,9 +31,26 @@ protected AsynchronousBufferFileWriter(ID channelID, RequestQueue
super(channelID, requestQueue, CALLBACK, true);
}
 
+   /**
+* Writes the given block asynchronously.
+*
+* @param buffer
+*  the buffer to be written (will be recycled when done)
--- End diff --

Shouldn't this contract be in `BlockChannelWriterWithCallback`? Right now 
it's kind of strange that recycling is implementation-dependent, especially 
since `SpillableSubpartition` uses `BufferFileWriter` and still depends on 
recycling.
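One way to address this is to state the ownership rule once on the interface, so that every implementation and every caller agree on it. The sketch below is hypothetical: `Recyclable`, `OwningBlockWriter` and `InMemoryWriter` are illustrative names, and the real `BlockChannelWriterWithCallback` signature may differ. The implementation recycles on both success and failure, which also covers the failed-write case.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/** Hypothetical recyclable block (not Flink's Buffer). */
interface Recyclable {
    byte[] bytes();

    void recycle();
}

/** Hypothetical writer interface that documents the ownership contract once. */
interface OwningBlockWriter<T extends Recyclable> {
    /**
     * Writes the given block. Ownership passes to the writer: it recycles the
     * block exactly once after the write completed or failed. Callers must not
     * touch or recycle the block after this call, even if it throws.
     */
    void writeBlock(T block) throws IOException;
}

/** Synchronous in-memory implementation honouring the contract. */
final class InMemoryWriter<T extends Recyclable> implements OwningBlockWriter<T> {
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private boolean failNext;

    /** Makes the next writeBlock() call fail, to exercise the error path. */
    void failNextWrite() {
        failNext = true;
    }

    @Override
    public void writeBlock(T block) throws IOException {
        try {
            if (failNext) {
                failNext = false;
                throw new IOException("simulated write failure");
            }
            out.write(block.bytes());
        } finally {
            block.recycle(); // recycled on success and on failure, never by the caller
        }
    }
}
```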


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) If {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write.
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
> the buffer is retained to be used in parallel somewhere else, it may also 
> not be available anymore or contain corrupt data.





[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-11-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16235761#comment-16235761
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4581#discussion_r148532775
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
@@ -192,21 +198,39 @@ public void testConsumeSpilledPartition() throws Exception {
 
         Buffer read = reader.getNextBuffer();
         assertNotNull(read);
+        assertNotSame(buffer, read);
+        assertFalse(read.isRecycled());
         read.recycle();
+        assertTrue(read.isRecycled());
 
         read = reader.getNextBuffer();
         assertNotNull(read);
+        assertNotSame(buffer, read);
+        assertFalse(read.isRecycled());
         read.recycle();
+        assertTrue(read.isRecycled());
--- End diff --

Are those `read.isRecycled()` checks related to this change/fix, or are they 
just an additional side hotfix? If the latter, I would prefer to have them in a 
separate commit because they are confusing me.


> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) If {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write.
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
> the buffer is retained to be used in parallel somewhere else, it may also 
> not be available anymore or contain corrupt data.





[jira] [Commented] (FLINK-7499) double buffer release in SpillableSubpartitionView

2017-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16140073#comment-16140073
 ] 

ASF GitHub Bot commented on FLINK-7499:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/4581

[FLINK-7499][io] also let AsynchronousBufferFileWriter#writeBlock() recycle 
the buffer in case of failures

## What is the purpose of the change

`SpillableSubpartitionView#releaseMemory()` recycled the given buffer 
twice: once asynchronously after the write operation initiated via 
`AsynchronousBufferFileWriter#writeBlock()` and once in 
`SpillableSubpartitionView#releaseMemory()` after adding the write operation to 
the queue. Additionally, other callers of 
`AsynchronousBufferFileWriter#writeBlock()` did not clean up the buffer in an 
error case, and `AsynchronousBufferFileWriter#writeBlock()` did not do so itself 
either.

This PR changes the behaviour of 
`AsynchronousBufferFileWriter#writeBlock()` to always take care of releasing 
the buffer, even if adding the (asynchronous) write operation fails.
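The "release even on failure" behaviour can be sketched as a try/catch around enqueueing the request. This is an illustration under assumptions, not the code from the PR: `PooledBuffer`, `FailSafeWriter` and its closable request queue are hypothetical, and a closed queue is just one example of how enqueueing might fail.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical pooled buffer; recycling it twice is an error. */
class PooledBuffer {
    private boolean recycled;

    boolean isRecycled() {
        return recycled;
    }

    void recycle() {
        if (recycled) {
            throw new IllegalStateException("buffer recycled twice");
        }
        recycled = true;
    }
}

/** Hypothetical writer whose request queue can be closed, making enqueueing fail. */
class FailSafeWriter {
    private final Queue<PooledBuffer> requests = new ArrayDeque<>();
    private boolean closed;

    void close() {
        closed = true;
    }

    private void addRequest(PooledBuffer buffer) throws IOException {
        if (closed) {
            throw new IOException("writer is closed");
        }
        requests.add(buffer);
    }

    /**
     * Takes ownership of the buffer: it is recycled once the write completes,
     * or immediately if queueing the write fails.
     */
    void writeBlock(PooledBuffer buffer) throws IOException {
        try {
            addRequest(buffer);
        } catch (IOException | RuntimeException e) {
            buffer.recycle(); // nobody else will release it, so do it before rethrowing
            throw e;
        }
    }

    /** Stands in for the I/O thread: "writes" each queued buffer, then recycles it. */
    void completePendingWrites() {
        PooledBuffer buffer;
        while ((buffer = requests.poll()) != null) {
            buffer.recycle();
        }
    }
}

class RecycleOnFailureSketch {
    public static void main(String[] args) {
        FailSafeWriter writer = new FailSafeWriter();
        writer.close();

        PooledBuffer buffer = new PooledBuffer();
        try {
            writer.writeBlock(buffer);
        } catch (IOException e) {
            // the caller does not recycle here: writeBlock() already did
            System.out.println("write failed, recycled=" + buffer.isRecycled()); // write failed, recycled=true
        }
    }
}
```

Since the writer recycles on both the success and the failure path, the caller never calls `recycle()` after `writeBlock()`; that single-owner rule is what removes the second recycle from `releaseMemory()`.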

## Brief change log

- let `AsynchronousBufferFileWriter#writeBlock()` take full responsibility for 
recycling the given buffer, even in case of failures
- remove the additional `recycle()` call in 
`SpillableSubpartitionView#releaseMemory()`
- adapt `SpillableSubpartitionTest` to detect the duplicate `recycle()` calls

## Verifying this change

This change adds further checks to `SpillableSubpartitionTest` to verify 
the intended behaviour.
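One way a test can catch both duplicate `recycle()` calls and missing cleanup is to count recycle calls per buffer and require exactly one. The sketch below is a hypothetical helper (`RecycleTracker`, `TrackedBuffer`), not the checks actually added to `SpillableSubpartitionTest`; the review excerpt above shows `isRecycled()` assertions being added there instead.

```java
import java.util.IdentityHashMap;
import java.util.Map;

/** Hypothetical test helper: counts recycle() calls per buffer instance. */
class RecycleTracker {
    private final Map<TrackedBuffer, Integer> counts = new IdentityHashMap<>();

    /** Creates a buffer whose recycle() calls are recorded by this tracker. */
    TrackedBuffer newBuffer() {
        TrackedBuffer buffer = new TrackedBuffer(this);
        counts.put(buffer, 0);
        return buffer;
    }

    void onRecycle(TrackedBuffer buffer) {
        counts.merge(buffer, 1, Integer::sum);
    }

    /** Fails if a buffer was never recycled (leak) or recycled more than once. */
    void assertEachRecycledExactlyOnce() {
        for (int n : counts.values()) {
            if (n != 1) {
                throw new AssertionError("buffer recycled " + n + " times, expected exactly 1");
            }
        }
    }
}

/** Hypothetical buffer that reports every recycle() call to its tracker. */
class TrackedBuffer {
    private final RecycleTracker tracker;

    TrackedBuffer(RecycleTracker tracker) {
        this.tracker = tracker;
    }

    void recycle() {
        tracker.onRecycle(this);
    }
}

class RecycleTrackerSketch {
    public static void main(String[] args) {
        RecycleTracker tracker = new RecycleTracker();

        TrackedBuffer once = tracker.newBuffer();
        once.recycle();
        tracker.assertEachRecycledExactlyOnce(); // passes

        TrackedBuffer twice = tracker.newBuffer();
        twice.recycle();
        twice.recycle(); // a duplicate recycle like the one removed from releaseMemory()
        try {
            tracker.assertEachRecycledExactlyOnce();
        } catch (AssertionError e) {
            System.out.println(e.getMessage()); // buffer recycled 2 times, expected exactly 1
        }
    }
}
```

Counting instead of throwing on the second call lets the same check also report leaks, which matters here because the PR fixes both a double recycle and a missing cleanup on the error path.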

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7499

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4581.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4581


commit a986ebac2da4ac4ad00717e834fdc33f9fe9eb3a
Author: Nico Kruber 
Date:   2017-08-24T10:17:08Z

[FLINK-7499][io] also let AsynchronousBufferFileWriter#writeBlock() recycle 
the buffer in case of failures

This fixes a double recycle in SpillableSubpartitionView and also makes sure
that, even if adding the (asynchronous) write operation fails, the buffer is
properly freed in code that did not perform this cleanup. It avoids duplicating
this cleanup code, and it is also more consistent to take over responsibility
for the given buffer even if an exception is thrown.




> double buffer release in SpillableSubpartitionView
> --
>
> Key: FLINK-7499
> URL: https://issues.apache.org/jira/browse/FLINK-7499
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.2.0, 1.1.4, 1.3.0, 1.1.5, 1.2.1, 1.2.2, 1.3.1, 1.4.0, 
> 1.3.2, 1.3.3
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> {{SpillableSubpartitionView#releaseMemory()}} recycles its buffers twice: 
> once asynchronously after the write operation and once in 
> {{SpillableSubpartitionView#releaseMemory()}} after adding the write 
> operation to the queue.
> 1) If {{SpillableSubpartitionView#releaseMemory()}} hits first and the buffer 
> is recycled, the memory region may already be reused despite the pending write.
> 2) If, for some reason (probably only in tests like 
> {{SpillableSubpartitionTest#testConsumeSpillablePartitionSpilledDuringConsume()}}?),
> the buffer is retained to be used in parallel somewhere else, it may also 
> not be available anymore or contain corrupt data.


