xintongsong commented on code in PR #21209: URL: https://github.com/apache/flink/pull/21209#discussion_r1014058947
########## flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategy.java: ########## @@ -128,28 +128,32 @@ private void checkRelease( return; } - int survivedNum = (int) (poolSize - poolSize * releaseBufferRatio); + int releaseNum = (int) (poolSize * releaseBufferRatio); int numSubpartitions = spillingInfoProvider.getNumSubpartitions(); - int subpartitionSurvivedNum = survivedNum / numSubpartitions; - + int expectedSubpartitionReleaseNum = releaseNum / numSubpartitions; TreeMap<Integer, Deque<BufferIndexAndChannel>> bufferToRelease = new TreeMap<>(); for (int subpartitionId = 0; subpartitionId < numSubpartitions; subpartitionId++) { Deque<BufferIndexAndChannel> buffersInOrder = spillingInfoProvider.getBuffersInOrder( subpartitionId, SpillStatus.SPILL, ConsumeStatusWithId.ALL_ANY); - // if the number of subpartition buffers less than survived buffers, reserved all of - // them. - int releaseNum = Math.max(0, buffersInOrder.size() - subpartitionSurvivedNum); - while (releaseNum-- != 0) { + // if the number of subpartition spilling buffers less than expected release number, + // release all of them. + int subpartitionReleaseNum = + Math.min(buffersInOrder.size(), expectedSubpartitionReleaseNum); + int subpartitionSurvivedNum = buffersInOrder.size() - subpartitionReleaseNum; + while (subpartitionSurvivedNum-- != 0) { buffersInOrder.pollLast(); } bufferToRelease.put(subpartitionId, buffersInOrder); } // collect results in order for (int i = 0; i < numSubpartitions; i++) { - builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new ArrayDeque<>())); + Deque<BufferIndexAndChannel> bufferIndexAndChannels = bufferToRelease.get(i); + if (bufferIndexAndChannels != null && !bufferIndexAndChannels.isEmpty()) { + builder.addBufferToRelease(i, bufferToRelease.getOrDefault(i, new ArrayDeque<>())); + } Review Comment: I don't quite get the differences between calculating the survive vs. release numbers. What's the purpose? 
It seems that, before this change, there was a bug where we were releasing the survive number of buffers rather than the release number. ########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java: ########## @@ -149,9 +147,9 @@ void testDecideActionWithGlobalInfo() { Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new HashMap<>(); expectedReleaseBuffers.put( - subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 2))); + subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 3))); expectedReleaseBuffers.put( - subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 3))); + subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 4))); Review Comment: And please check `createBufferIndexAndChannelsList`. It's creating a memory segment and a network buffer for nothing. ########## flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFullSpillingStrategyTest.java: ########## @@ -149,9 +147,9 @@ void testDecideActionWithGlobalInfo() { Map<Integer, List<BufferIndexAndChannel>> expectedReleaseBuffers = new HashMap<>(); expectedReleaseBuffers.put( - subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 2))); + subpartition1, new ArrayList<>(subpartitionBuffers1.subList(0, 3))); expectedReleaseBuffers.put( - subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 3))); + subpartition2, new ArrayList<>(subpartitionBuffers2.subList(1, 4))); Review Comment: For this test case, I think we no longer need the progresses. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org