This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ea4b391784df08661efc0474c9f0b18e17647fba
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sat Apr 10 00:43:37 2021 +0200

    [FLINK-21996][refactor] Make IteratorSourceReader work with multiple split requests.
    
    Previously, the IteratorSourceReader was designed to only ever send a single split request.
    A second request could not be handled; multiple splits could only be handled when they were
    restored from a checkpoint.
    
    That design is both fragile and hard to extend: it doesn't support cases where more splits than
    subtasks are generated, and its contract can be broken by certain combinations of deployments,
    checkpoints, rescaling, and restores.
    A variant that simply handles multiple splits when necessary is only minimally more complex in the
    code, and a lot more flexible and robust.
---
 .../source/lib/util/IteratorSourceReader.java      | 80 ++++++++++++++--------
 .../source/lib/NumberSequenceSourceTest.java       |  6 +-
 2 files changed, 56 insertions(+), 30 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
index 4a5701e..801b87b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
@@ -34,7 +34,6 @@ import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A {@link SourceReader} that returns the values of an iterator, supplied via an {@link
@@ -57,7 +56,7 @@ public class IteratorSourceReader<
     private final SourceReaderContext context;
 
     /** The availability future. This reader is available as soon as a split is assigned. */
-    private final CompletableFuture<Void> availability;
+    private CompletableFuture<Void> availability;
 
     /**
      * The iterator producing data. Non-null after a split has been assigned. This field is null or
@@ -71,40 +70,67 @@ public class IteratorSourceReader<
      */
     @Nullable private SplitT currentSplit;
 
-    /** The remaining splits. Null means no splits have yet been assigned. */
-    @Nullable private Queue<SplitT> remainingSplits;
+    /** The remaining splits that were assigned but not yet processed. */
+    private final Queue<SplitT> remainingSplits;
+
+    private boolean noMoreSplits;
 
     public IteratorSourceReader(SourceReaderContext context) {
         this.context = checkNotNull(context);
         this.availability = new CompletableFuture<>();
+        this.remainingSplits = new ArrayDeque<>();
     }
 
     // ------------------------------------------------------------------------
 
     @Override
     public void start() {
-        // request a split only if we did not get one during restore
-        if (remainingSplits == null) {
+        // request a split if we don't have one
+        if (remainingSplits.isEmpty()) {
             context.sendSplitRequest();
         }
     }
 
     @Override
     public InputStatus pollNext(ReaderOutput<E> output) {
-        if (iterator != null && iterator.hasNext()) {
-            output.collect(iterator.next());
+        if (iterator != null) {
+            if (iterator.hasNext()) {
+                output.collect(iterator.next());
+                return InputStatus.MORE_AVAILABLE;
+            } else {
+                finishSplit();
+            }
+        }
+
+        return tryMoveToNextSplit();
+    }
+
+    private void finishSplit() {
+        iterator = null;
+        currentSplit = null;
+
+        // request another split if no other is left
+        // we do this only here in the finishSplit part to avoid requesting a split
+        // whenever the reader is polled and doesn't currently have a split
+        if (remainingSplits.isEmpty() && !noMoreSplits) {
+            context.sendSplitRequest();
+        }
+    }
+
+    private InputStatus tryMoveToNextSplit() {
+        currentSplit = remainingSplits.poll();
+        if (currentSplit != null) {
+            iterator = currentSplit.getIterator();
             return InputStatus.MORE_AVAILABLE;
-        } else if (remainingSplits == null) {
-            // nothing assigned yet, need to wait and come back when splits have been assigned
-            return InputStatus.NOTHING_AVAILABLE;
+        } else if (noMoreSplits) {
+            return InputStatus.END_OF_INPUT;
         } else {
-            currentSplit = remainingSplits.poll();
-            if (currentSplit != null) {
-                iterator = currentSplit.getIterator();
-                return pollNext(output);
-            } else {
-                return InputStatus.END_OF_INPUT;
+            // ensure we are not called in a loop by resetting the availability future
+            if (availability.isDone()) {
+                availability = new CompletableFuture<>();
             }
+
+            return InputStatus.NOTHING_AVAILABLE;
         }
     }
 
@@ -115,32 +141,28 @@ public class IteratorSourceReader<
 
     @Override
     public void addSplits(List<SplitT> splits) {
-        checkState(remainingSplits == null, "Cannot accept more than one split 
assignment");
-        remainingSplits = new ArrayDeque<>(splits);
-        availability.complete(null); // from now on we are always available
+        remainingSplits.addAll(splits);
+        // set availability so that pollNext is actually called
+        availability.complete(null);
     }
 
     @Override
     public void notifyNoMoreSplits() {
-        // if we get this after we already had a split, we must have requested more than
-        // one split, which is not expected here.
-        checkState(remainingSplits == null, "Unexpected response, requested 
more than one split.");
-
-        // non-null queue signals splits were assigned, in this case no splits
-        remainingSplits = new ArrayDeque<>();
-
+        noMoreSplits = true;
         // set availability so that pollNext is actually called
         availability.complete(null);
     }
 
     @Override
     public List<SplitT> snapshotState(long checkpointId) {
-        if (remainingSplits == null) {
-            // no assignment yet
+        if (currentSplit == null && remainingSplits.isEmpty()) {
             return Collections.emptyList();
         }
+
         final ArrayList<SplitT> allSplits = new ArrayList<>(1 + 
remainingSplits.size());
         if (iterator != null && iterator.hasNext()) {
+            assert currentSplit != null;
+
             @SuppressWarnings("unchecked")
             final SplitT inProgressSplit =
                     (SplitT) currentSplit.getUpdatedSplitForIterator(iterator);
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
index f497150..02e2a7f 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceTest.java
@@ -66,7 +66,11 @@ public class NumberSequenceSourceTest {
 
                 // re-create and restore
                 reader = createReader();
-                reader.addSplits(splits);
+                if (splits.isEmpty()) {
+                    reader.notifyNoMoreSplits();
+                } else {
+                    reader.addSplits(splits);
+                }
             }
         }
 

Reply via email to