[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

2023-04-14 Thread via GitHub


reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1166961006


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##
@@ -766,13 +776,12 @@ private void returnExcessMemorySegments() {
 
 @GuardedBy("availableMemorySegments")
 private boolean hasExcessBuffers() {
-return numberOfRequestedOverdraftMemorySegments > 0
-|| numberOfRequestedMemorySegments > currentPoolSize;
+return numberOfRequestedOverdraftMemorySegments > 0;
 }
 
 @GuardedBy("availableMemorySegments")
 private boolean isRequestedSizeReached() {
-return numberOfRequestedMemorySegments >= currentPoolSize;
+return numberOfRequestedMemorySegments == currentPoolSize;

Review Comment:
   > In my opinion, the rule of thumb for concurrent code looks something like 
this: "If the value physically can be greater than the border value we should 
also compare it with the border as >= even if logically it can not be greater".
   
   This rule really make sense to me and thanks for telling me about this! 
Maybe I was a little too radical before. After all, the world full of 
concurrency is not safe at all. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

2023-04-14 Thread via GitHub


reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1166952794


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##
@@ -766,13 +776,12 @@ private void returnExcessMemorySegments() {
 
 @GuardedBy("availableMemorySegments")
 private boolean hasExcessBuffers() {
-return numberOfRequestedOverdraftMemorySegments > 0
-|| numberOfRequestedMemorySegments > currentPoolSize;
+return numberOfRequestedOverdraftMemorySegments > 0;

Review Comment:
   > we can make this change as soon as we get rid of 
numberOfRequestedOverdraftMemorySegments(if we will do it at all).
   
   Yes, I'm sure we'll be removing `numberOfRequestedOverdraftMemorySegments` 
soon as keeping it would complicate the contract a bit.
   
   I agree that it's easy to introduce some hard-to-find bugs, especially since 
we'll be refactoring it soon. I will revert the change here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

2023-04-13 Thread via GitHub


reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1165117526


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##
@@ -255,9 +253,38 @@ void testRecycleAfterDestroy() {
 void testDecreasePoolSize() throws Exception {
 final int maxMemorySegments = 10;
 final int requiredMemorySegments = 4;
-final int maxOverdraftBuffers = 2;
-final int largePoolSize = 5;
-final int smallPoolSize = 4;
+
+// requested buffers is equal to small pool size.
+testDecreasePoolSizeInternal(
+maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 
0);
+// requested buffers is less than small pool size.
+testDecreasePoolSizeInternal(
+maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 
1);
+// exceed buffers is equal to maxOverdraftBuffers
+testDecreasePoolSizeInternal(
+maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 
0);
+// exceed buffers is greater than maxOverdraftBuffers
+testDecreasePoolSizeInternal(
+maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 
0);
+// exceed buffers is less than maxOverdraftBuffers
+testDecreasePoolSizeInternal(
+maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 
0);
+// decrease pool size with overdraft buffer.
+testDecreasePoolSizeInternal(
+maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 
0);
+}
+
+void testDecreasePoolSizeInternal(
+int maxMemorySegments,
+int requiredMemorySegments,
+int largePoolSize,
+int smallPoolSize,
+int maxOverdraftBuffers,
+int numBuffersToRequest,
+int numOverdraftBuffersAfterDecreasePoolSize,
+int numRequestedBuffersAfterDecreasePoolSize,

Review Comment:
   Good suggestions, I have renamed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

2023-04-13 Thread via GitHub


reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1165111871


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java:
##
@@ -255,9 +253,38 @@ void testRecycleAfterDestroy() {
 void testDecreasePoolSize() throws Exception {
 final int maxMemorySegments = 10;
 final int requiredMemorySegments = 4;
-final int maxOverdraftBuffers = 2;
-final int largePoolSize = 5;
-final int smallPoolSize = 4;
+
+// requested buffers is equal to small pool size.
+testDecreasePoolSizeInternal(
+maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 
0);
+// requested buffers is less than small pool size.
+testDecreasePoolSizeInternal(
+maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 
1);
+// exceed buffers is equal to maxOverdraftBuffers
+testDecreasePoolSizeInternal(
+maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 
0);
+// exceed buffers is greater than maxOverdraftBuffers
+testDecreasePoolSizeInternal(
+maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 
0);
+// exceed buffers is less than maxOverdraftBuffers
+testDecreasePoolSizeInternal(
+maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 
0);
+// decrease pool size with overdraft buffer.
+testDecreasePoolSizeInternal(
+maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 
0);

Review Comment:
   The difference between the two test case is that the latter one already 
holds the `overdraft buffer` before the `poolSize` is changed. IMO, it is 
better to cover this situation in the unit tests. Of course, there is no major 
problem in removing this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

2023-04-13 Thread via GitHub


reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1165105388


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##
@@ -766,13 +776,12 @@ private void returnExcessMemorySegments() {
 
 @GuardedBy("availableMemorySegments")
 private boolean hasExcessBuffers() {
-return numberOfRequestedOverdraftMemorySegments > 0
-|| numberOfRequestedMemorySegments > currentPoolSize;
+return numberOfRequestedOverdraftMemorySegments > 0;

Review Comment:
   > I prefer inviting more experts to review this PR, it will be more reliable.
   
   Sure, It will be better~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

2023-04-12 Thread via GitHub


reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1164941748


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##
@@ -49,7 +49,11 @@
  *
  * The size of this pool can be dynamically changed at runtime ({@link 
#setNumBuffers(int)}. It
  * will then lazily return the required number of buffers to the {@link 
NetworkBufferPool} to match
- * its new size.
+ * its new size. New buffers can be requested only when {@code 
numberOfRequestedMemorySegments +
+ * numberOfRequestedOverdraftMemorySegments < currentPoolSize + 
maxOverdraftBuffersPerGate}. In

Review Comment:
   Yes, good catch! I was stupid when typing the commit message. 藍 
   
   In fact, maybe we can never guarantee that `the total number of requested 
buffers(requested + overdraft) does not exceed poolSize + 
maxOverdraftBuffersPerGate`, because we will not force buffers to return when 
the pool size changes, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

2023-04-12 Thread via GitHub


reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1164959231


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##
@@ -766,13 +776,12 @@ private void returnExcessMemorySegments() {
 
 @GuardedBy("availableMemorySegments")
 private boolean hasExcessBuffers() {
-return numberOfRequestedOverdraftMemorySegments > 0
-|| numberOfRequestedMemorySegments > currentPoolSize;
+return numberOfRequestedOverdraftMemorySegments > 0;

Review Comment:
   IIUC, If we allow all buffers that exceed poolSize to be `overdraft`, then 
`numberOfRequestedMemorySegments > currentPoolSize` will no happen, right? 
@1996fanrui can you help me to double confirm this, thx~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased

2023-04-12 Thread via GitHub


reswqa commented on code in PR #22381:
URL: https://github.com/apache/flink/pull/22381#discussion_r1164944912


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java:
##
@@ -671,13 +675,20 @@ public void setNumBuffers(int numBuffers) {
 
 currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments);
 
-// reset overdraft buffers
+// If pool size increases, try to convert overdraft buffer to 
ordinary buffer.
 while (numberOfRequestedOverdraftMemorySegments > 0
 && numberOfRequestedMemorySegments < currentPoolSize) {
 numberOfRequestedOverdraftMemorySegments--;
 numberOfRequestedMemorySegments++;
 }
 
+// If pool size decreases, try to convert ordinary buffer to 
overdraft buffer.
+while (numberOfRequestedMemorySegments > currentPoolSize
+&& numberOfRequestedOverdraftMemorySegments < 
maxOverdraftBuffersPerGate) {
+numberOfRequestedMemorySegments--;
+numberOfRequestedOverdraftMemorySegments++;
+}

Review Comment:
   > From the discussion of 
[FLINK-31610](https://issues.apache.org/jira/browse/FLINK-31610), this 
restriction(numberOfRequestedOverdraftMemorySegments <= 
maxOverdraftBuffersPerGate) will be broken in 
[FLINK-31764](https://issues.apache.org/jira/browse/FLINK-31764), right?
   
   Yep, you are absolutely right.
   
   > If so, I prefer 
[FLINK-31764](https://issues.apache.org/jira/browse/FLINK-31764) only refactor 
the code, not change the overdraft strategy or behavior, and we change the 
strategy in this PR, WDYT?
   
   Change the semantic/behavior in this PR also make sense to me. Let's update 
FLINK-31764 to only simply remove the redundant field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org