Cai Liuyang created FLINK-25664:
-----------------------------------

             Summary: Notify will be not triggered for PipelinedSubpartition if more than one buffer is added during isBlocked == true
                 Key: FLINK-25664
                 URL: https://issues.apache.org/jira/browse/FLINK-25664
             Project: Flink
          Issue Type: Bug
            Reporter: Cai Liuyang

Currently, the following sequence can occur:

# The PipelinedSubpartition holds only one buffer, an aligned checkpoint barrier buffer (isBlocked == false).
# CreditBasedSequenceNumberingViewReader polls this buffer and the PipelinedSubpartition becomes blocked (isBlocked == true).
# Before the downstream task calls resumeConsumption, two finished buffers are added to this PipelinedSubpartition (there is no limit on adding buffers to a blocked PipelinedSubpartition).
## Adding the first finished buffer does not trigger notifyDataAvailable because isBlocked == true.
## Adding the second finished buffer does not trigger notifyDataAvailable either, because isBlocked == true and finishedBuffer > 1.
# The downstream task calls resumeConsumption and the PipelinedSubpartition is unblocked (isBlocked == false).
# OutputFlusher flushes the PipelinedSubpartition, but this does not trigger notifyDataAvailable because finishedBuffer > 1.

In conclusion, there are three cases in which notifyDataAvailable should be triggered:

case1: there is only one finished buffer (handled by add)
case2: there is only one unfinished buffer (handled by flush)
case3: there is more than one finished buffer, added while the PipelinedSubpartition was blocked (not handled)

{code:java}
// Test code for this case.
// Add this test to org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionWithReadViewTest.
@Test
public void testBlockedByCheckpointAndAddTwoDataBufferBeforeResumeConsumption() throws Exception {
    // The reader polls the barrier buffer, which blocks the subpartition.
    blockSubpartitionByCheckpoint(1);

    // Two finished buffers are added while blocked; neither triggers a notification.
    subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
    subpartition.add(createFilledFinishedBufferConsumer(BUFFER_SIZE));
    assertEquals(1, availablityListener.getNumNotifications());

    // After unblocking, the queued buffers should cause a second notification.
    readView.resumeConsumption();
    subpartition.flush();
    assertEquals(2, availablityListener.getNumNotifications());
}
{code}
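
The sequence above can also be traced in a small standalone toy model. This is only a sketch of the notification rules as described in this issue, not Flink's actual implementation: the class ToySubpartition, the method pollBarrier, and the notifyOnResume flag are hypothetical names introduced for illustration, and notifying inside resumeConsumption is just one possible way to cover case3.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of the notification rules described in this issue; not Flink's real classes. */
public class BlockedNotifySketch {

    /** Hypothetical stand-in for PipelinedSubpartition. */
    static class ToySubpartition {
        // Each entry is one queued buffer; true means the buffer is finished.
        private final Deque<Boolean> buffers = new ArrayDeque<>();
        // Hypothetical toggle: notify on resumeConsumption if buffers are queued.
        private final boolean notifyOnResume;
        private boolean blocked;
        int notifications;

        ToySubpartition(boolean notifyOnResume) {
            this.notifyOnResume = notifyOnResume;
        }

        private long finishedCount() {
            return buffers.stream().filter(finished -> finished).count();
        }

        void add(boolean finished) {
            buffers.add(finished);
            // case1: notify only if not blocked and this is the single finished buffer.
            if (!blocked && finished && finishedCount() == 1) {
                notifications++;
            }
        }

        void flush() {
            // case2 (simplified): notify only if not blocked and exactly one buffer is queued.
            if (!blocked && buffers.size() == 1) {
                notifications++;
            }
        }

        /** The reader polls the barrier buffer, which blocks the subpartition. */
        void pollBarrier() {
            buffers.poll();
            blocked = true;
        }

        void resumeConsumption() {
            blocked = false;
            // case3 (assumed fix): buffers queued while blocked need a notification.
            if (notifyOnResume && !buffers.isEmpty()) {
                notifications++;
            }
        }
    }

    /** Replays the sequence from this issue and returns the notification count. */
    static int run(boolean notifyOnResume) {
        ToySubpartition sp = new ToySubpartition(notifyOnResume);
        sp.add(true);            // barrier buffer, triggers the first notification
        sp.pollBarrier();        // subpartition becomes blocked
        sp.add(true);            // no notification: blocked
        sp.add(true);            // no notification: blocked
        sp.resumeConsumption();  // unblocked
        sp.flush();              // no notification: more than one buffer queued
        return sp.notifications;
    }

    public static void main(String[] args) {
        System.out.println("without case3 handling: " + run(false));
        System.out.println("with case3 handling: " + run(true));
    }
}
```

In this model, run(false) ends with a single notification, so the two queued buffers are never announced to the reader, while run(true) ends with two, which matches the count the test above expects.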