[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased
reswqa commented on code in PR #22381: URL: https://github.com/apache/flink/pull/22381#discussion_r1166961006 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java: ## @@ -766,13 +776,12 @@ private void returnExcessMemorySegments() { @GuardedBy("availableMemorySegments") private boolean hasExcessBuffers() { -return numberOfRequestedOverdraftMemorySegments > 0 -|| numberOfRequestedMemorySegments > currentPoolSize; +return numberOfRequestedOverdraftMemorySegments > 0; } @GuardedBy("availableMemorySegments") private boolean isRequestedSizeReached() { -return numberOfRequestedMemorySegments >= currentPoolSize; +return numberOfRequestedMemorySegments == currentPoolSize; Review Comment: > In my opinion, the rule of thumb for concurrent code looks something like this: "If the value physically can be greater than the border value we should also compare it with the border as >= even if logically it can not be greater". This rule really make sense to me and thanks for telling me about this! Maybe I was a little too radical before. After all, the world full of concurrency is not safe at all. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased
reswqa commented on code in PR #22381: URL: https://github.com/apache/flink/pull/22381#discussion_r1166952794 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java: ## @@ -766,13 +776,12 @@ private void returnExcessMemorySegments() { @GuardedBy("availableMemorySegments") private boolean hasExcessBuffers() { -return numberOfRequestedOverdraftMemorySegments > 0 -|| numberOfRequestedMemorySegments > currentPoolSize; +return numberOfRequestedOverdraftMemorySegments > 0; Review Comment: > we can make this change as soon as we get rid of numberOfRequestedOverdraftMemorySegments(if we will do it at all). Yes, I'm sure we'll be removing `numberOfRequestedOverdraftMemorySegments` soon as keeping it would complicate the contract a bit. I agree that it's easy to introduce some hard-to-find bugs, especially since we'll be refactoring it soon. I will revert the change here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased
reswqa commented on code in PR #22381: URL: https://github.com/apache/flink/pull/22381#discussion_r1165117526 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java: ## @@ -255,9 +253,38 @@ void testRecycleAfterDestroy() { void testDecreasePoolSize() throws Exception { final int maxMemorySegments = 10; final int requiredMemorySegments = 4; -final int maxOverdraftBuffers = 2; -final int largePoolSize = 5; -final int smallPoolSize = 4; + +// requested buffers is equal to small pool size. +testDecreasePoolSizeInternal( +maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 0); +// requested buffers is less than small pool size. +testDecreasePoolSizeInternal( +maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 1); +// exceed buffers is equal to maxOverdraftBuffers +testDecreasePoolSizeInternal( +maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 0); +// exceed buffers is greater than maxOverdraftBuffers +testDecreasePoolSizeInternal( +maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 0); +// exceed buffers is less than maxOverdraftBuffers +testDecreasePoolSizeInternal( +maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 0); +// decrease pool size with overdraft buffer. +testDecreasePoolSizeInternal( +maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 0); +} + +void testDecreasePoolSizeInternal( +int maxMemorySegments, +int requiredMemorySegments, +int largePoolSize, +int smallPoolSize, +int maxOverdraftBuffers, +int numBuffersToRequest, +int numOverdraftBuffersAfterDecreasePoolSize, +int numRequestedBuffersAfterDecreasePoolSize, Review Comment: Good suggestions, I have renamed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased
reswqa commented on code in PR #22381: URL: https://github.com/apache/flink/pull/22381#discussion_r1165111871 ## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java: ## @@ -255,9 +253,38 @@ void testRecycleAfterDestroy() { void testDecreasePoolSize() throws Exception { final int maxMemorySegments = 10; final int requiredMemorySegments = 4; -final int maxOverdraftBuffers = 2; -final int largePoolSize = 5; -final int smallPoolSize = 4; + +// requested buffers is equal to small pool size. +testDecreasePoolSizeInternal( +maxMemorySegments, requiredMemorySegments, 7, 5, 2, 5, 0, 5, 0); +// requested buffers is less than small pool size. +testDecreasePoolSizeInternal( +maxMemorySegments, requiredMemorySegments, 6, 4, 2, 2, 0, 3, 1); +// exceed buffers is equal to maxOverdraftBuffers +testDecreasePoolSizeInternal( +maxMemorySegments, requiredMemorySegments, 7, 5, 2, 7, 2, 5, 0); +// exceed buffers is greater than maxOverdraftBuffers +testDecreasePoolSizeInternal( +maxMemorySegments, requiredMemorySegments, 9, 5, 3, 9, 4, 5, 0); +// exceed buffers is less than maxOverdraftBuffers +testDecreasePoolSizeInternal( +maxMemorySegments, requiredMemorySegments, 7, 5, 4, 7, 2, 5, 0); +// decrease pool size with overdraft buffer. +testDecreasePoolSizeInternal( +maxMemorySegments, requiredMemorySegments, 7, 5, 6, 9, 4, 5, 0); Review Comment: The difference between the two test case is that the latter one already holds the `overdraft buffer` before the `poolSize` is changed. IMO, it is better to cover this situation in the unit tests. Of course, there is no major problem in removing this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased
reswqa commented on code in PR #22381: URL: https://github.com/apache/flink/pull/22381#discussion_r1165105388 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java: ## @@ -766,13 +776,12 @@ private void returnExcessMemorySegments() { @GuardedBy("availableMemorySegments") private boolean hasExcessBuffers() { -return numberOfRequestedOverdraftMemorySegments > 0 -|| numberOfRequestedMemorySegments > currentPoolSize; +return numberOfRequestedOverdraftMemorySegments > 0; Review Comment: > I prefer inviting more experts to review this PR, it will be more reliable. Sure, It will be better~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased
reswqa commented on code in PR #22381: URL: https://github.com/apache/flink/pull/22381#discussion_r1164941748 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java: ## @@ -49,7 +49,11 @@ * * The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to match - * its new size. + * its new size. New buffers can be requested only when {@code numberOfRequestedMemorySegments + + * numberOfRequestedOverdraftMemorySegments < currentPoolSize + maxOverdraftBuffersPerGate}. In Review Comment: Yes, good catch! I was stupid when typing the commit message. 藍 In fact, maybe we can never guarantee that `the total number of requested buffers(requested + overdraft) does not exceed poolSize + maxOverdraftBuffersPerGate`, because we will not force buffers to return when the pool size changes, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased
reswqa commented on code in PR #22381: URL: https://github.com/apache/flink/pull/22381#discussion_r1164959231 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java: ## @@ -766,13 +776,12 @@ private void returnExcessMemorySegments() { @GuardedBy("availableMemorySegments") private boolean hasExcessBuffers() { -return numberOfRequestedOverdraftMemorySegments > 0 -|| numberOfRequestedMemorySegments > currentPoolSize; +return numberOfRequestedOverdraftMemorySegments > 0; Review Comment: IIUC, If we allow all buffers that exceed poolSize to be `overdraft`, then `numberOfRequestedMemorySegments > currentPoolSize` will no happen, right? @1996fanrui can you help me to double confirm this, thx~ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink] reswqa commented on a diff in pull request #22381: [FLINK-31763][runtime] Convert requested buffers to overdraft buffers when pool size is decreased
reswqa commented on code in PR #22381: URL: https://github.com/apache/flink/pull/22381#discussion_r1164944912 ## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java: ## @@ -671,13 +675,20 @@ public void setNumBuffers(int numBuffers) { currentPoolSize = Math.min(numBuffers, maxNumberOfMemorySegments); -// reset overdraft buffers +// If pool size increases, try to convert overdraft buffer to ordinary buffer. while (numberOfRequestedOverdraftMemorySegments > 0 && numberOfRequestedMemorySegments < currentPoolSize) { numberOfRequestedOverdraftMemorySegments--; numberOfRequestedMemorySegments++; } +// If pool size decreases, try to convert ordinary buffer to overdraft buffer. +while (numberOfRequestedMemorySegments > currentPoolSize +&& numberOfRequestedOverdraftMemorySegments < maxOverdraftBuffersPerGate) { +numberOfRequestedMemorySegments--; +numberOfRequestedOverdraftMemorySegments++; +} Review Comment: > From the discussion of [FLINK-31610](https://issues.apache.org/jira/browse/FLINK-31610), this restriction(numberOfRequestedOverdraftMemorySegments <= maxOverdraftBuffersPerGate) will be broken in [FLINK-31764](https://issues.apache.org/jira/browse/FLINK-31764), right? Yep, you are absolutely right. > If so, I prefer [FLINK-31764](https://issues.apache.org/jira/browse/FLINK-31764) only refactor the code, not change the overdraft strategy or behavior, and we change the strategy in this PR, WDYT? Change the semantic/behavior in this PR also make sense to me. Let's update FLINK-31764 to only simply remove the redundant field. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org