xintongsong commented on code in PR #21209:
URL: https://github.com/apache/flink/pull/21209#discussion_r1014058947


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java:
##########
@@ -128,28 +128,32 @@ private void checkRelease(
             return;
         }
 
-        int survivedNum = (int) (poolSize - poolSize * releaseBufferRatio);
+        int releaseNum = (int) (poolSize * releaseBufferRatio);
         int numSubpartitions = spillingInfoProvider.getNumSubpartitions();
-        int subpartitionSurvivedNum = survivedNum / numSubpartitions;
-
+        int expectedSubpartitionReleaseNum = releaseNum / numSubpartitions;
         TreeMap<Integer, Deque<BufferIndexAndChannel>> bufferToRelease = new 
TreeMap<>();
 
         for (int subpartitionId = 0; subpartitionId < numSubpartitions; 
subpartitionId++) {
             Deque<BufferIndexAndChannel> buffersInOrder =
                     spillingInfoProvider.getBuffersInOrder(
                             subpartitionId, SpillStatus.SPILL, 
ConsumeStatusWithId.ALL_ANY);
-            // if the number of subpartition buffers less than survived 
buffers, reserved all of
-            // them.
-            int releaseNum = Math.max(0, buffersInOrder.size() - 
subpartitionSurvivedNum);
-            while (releaseNum-- != 0) {
+            // if the number of subpartition spilling buffers less than 
expected release number,
+            // release all of  them.
+            int subpartitionReleaseNum =
+                    Math.min(buffersInOrder.size(), 
expectedSubpartitionReleaseNum);
+            int subpartitionSurvivedNum = buffersInOrder.size() - 
subpartitionReleaseNum;
+            while (subpartitionSurvivedNum-- != 0) {
                 buffersInOrder.pollLast();
             }
             bufferToRelease.put(subpartitionId, buffersInOrder);
         }
 
         // collect results in order
         for (int i = 0; i < numSubpartitions; i++) {
-            builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new 
ArrayDeque<>()));
+            Deque<BufferIndexAndChannel> bufferIndexAndChannels = 
bufferToRelease.get(i);
+            if (bufferIndexAndChannels != null && 
!bufferIndexAndChannels.isEmpty()) {
+                builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, 
new ArrayDeque<>()));
+            }

Review Comment:
   I don't quite get the differences between calculating the survive vs. 
release numbers. What's the purpose?
   
   It seems before this change, there's a bug that we are releasing number of 
survive buffers.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java:
##########
@@ -149,9 +147,9 @@ void testDecideActionWithGlobalInfo() {
 
         Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new 
HashMap<>();
         expectedReleaseBuffers.put(
-                subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 
2)));
+                subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 
3)));
         expectedReleaseBuffers.put(
-                subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 
3)));
+                subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 
4)));

Review Comment:
   And please check `createBufferIndexAndChannelsList`. It's creating memory 
segment and network buffer for nothing.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java:
##########
@@ -149,9 +147,9 @@ void testDecideActionWithGlobalInfo() {
 
         Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new 
HashMap<>();
         expectedReleaseBuffers.put(
-                subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 
2)));
+                subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 
3)));
         expectedReleaseBuffers.put(
-                subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 
3)));
+                subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 
4)));

Review Comment:
   For this test case, I think we no longer need the progresses.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to