This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new fdcf730  [FLINK-22133][core] Add checkpointID to 'SplitEnumerator.snapshotState()'
fdcf730 is described below

commit fdcf730ce106cfe0050593a3ddc068a8b7222e94
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Apr 19 17:32:00 2021 +0200

    [FLINK-22133][core] Add checkpointID to 'SplitEnumerator.snapshotState()'
    
    This closes #15677
---
 .../base/source/reader/mocks/MockSplitEnumerator.java  |  2 +-
 .../file/src/impl/ContinuousFileSplitEnumerator.java   |  3 ++-
 .../file/src/impl/StaticFileSplitEnumerator.java       |  2 +-
 .../src/impl/ContinuousFileSplitEnumeratorTest.java    |  6 +++---
 .../file/src/impl/StaticFileSplitEnumeratorTest.java   |  6 +++---
 .../connectors/hive/ContinuousHiveSplitEnumerator.java |  3 ++-
 .../kafka/source/enumerator/KafkaSourceEnumerator.java |  2 +-
 .../kafka/source/enumerator/KafkaEnumeratorTest.java   |  4 ++--
 .../flink/api/connector/source/SplitEnumerator.java    | 18 ++++++++++++++++--
 .../source/lib/util/IteratorSourceEnumerator.java      |  2 +-
 .../connector/source/mocks/MockSplitEnumerator.java    |  2 +-
 .../runtime/source/coordinator/SourceCoordinator.java  |  9 +++++----
 .../source/coordinator/SourceCoordinatorTest.java      |  4 ++--
 .../source/coordinator/TestingSplitEnumerator.java     |  2 +-
 .../OperatorEventSendingCheckpointITCase.java          |  4 ++--
 .../checkpointing/UnalignedCheckpointTestBase.java     |  2 +-
 16 files changed, 44 insertions(+), 27 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
index 188568c..fe8a675 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
@@ -77,7 +77,7 @@ public class MockSplitEnumerator
     }
 
     @Override
-    public List<MockSourceSplit> snapshotState() {
+    public List<MockSourceSplit> snapshotState(long checkpointId) {
         return splits;
     }
 
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
index 2068445..8102e0d 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java
@@ -124,7 +124,8 @@ public class ContinuousFileSplitEnumerator
     }
 
     @Override
-    public PendingSplitsCheckpoint<FileSourceSplit> snapshotState() throws 
Exception {
+    public PendingSplitsCheckpoint<FileSourceSplit> snapshotState(long 
checkpointId)
+            throws Exception {
         final PendingSplitsCheckpoint<FileSourceSplit> checkpoint =
                 PendingSplitsCheckpoint.fromCollectionSnapshot(
                         splitAssigner.remainingSplits(), 
pathsAlreadyProcessed);
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
index 7dc2489..140f520 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
@@ -118,7 +118,7 @@ public class StaticFileSplitEnumerator
     }
 
     @Override
-    public PendingSplitsCheckpoint<FileSourceSplit> snapshotState() {
+    public PendingSplitsCheckpoint<FileSourceSplit> snapshotState(long 
checkpointId) {
         return 
PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits());
     }
 }
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumeratorTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumeratorTest.java
index 72d7823..1fe4b2d 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumeratorTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumeratorTest.java
@@ -57,7 +57,7 @@ public class ContinuousFileSplitEnumeratorTest {
         fileEnumerator.addSplits(split);
         context.triggerAllActions();
 
-        assertThat(enumerator.snapshotState().getSplits(), contains(split));
+        assertThat(enumerator.snapshotState(1L).getSplits(), contains(split));
     }
 
     @Test
@@ -77,7 +77,7 @@ public class ContinuousFileSplitEnumeratorTest {
         fileEnumerator.addSplits(split);
         context.triggerAllActions();
 
-        assertThat(enumerator.snapshotState().getSplits(), empty());
+        assertThat(enumerator.snapshotState(1L).getSplits(), empty());
         assertThat(context.getSplitAssignments().get(2).getAssignedSplits(), 
contains(split));
     }
 
@@ -102,7 +102,7 @@ public class ContinuousFileSplitEnumeratorTest {
         context.triggerAllActions();
 
         assertFalse(context.getSplitAssignments().containsKey(2));
-        assertThat(enumerator.snapshotState().getSplits(), contains(split));
+        assertThat(enumerator.snapshotState(1L).getSplits(), contains(split));
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumeratorTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumeratorTest.java
index f554b9f..71fcf2a 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumeratorTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumeratorTest.java
@@ -52,7 +52,7 @@ public class StaticFileSplitEnumeratorTest {
         final FileSourceSplit split = createRandomSplit();
         final StaticFileSplitEnumerator enumerator = createEnumerator(context, 
split);
 
-        final PendingSplitsCheckpoint<FileSourceSplit> checkpoint = 
enumerator.snapshotState();
+        final PendingSplitsCheckpoint<FileSourceSplit> checkpoint = 
enumerator.snapshotState(1L);
 
         assertThat(checkpoint.getSplits(), contains(split));
     }
@@ -68,7 +68,7 @@ public class StaticFileSplitEnumeratorTest {
         enumerator.addReader(3);
         enumerator.handleSplitRequest(3, "somehost");
 
-        assertThat(enumerator.snapshotState().getSplits(), empty());
+        assertThat(enumerator.snapshotState(1L).getSplits(), empty());
         assertThat(context.getSplitAssignments().get(3).getAssignedSplits(), 
contains(split));
     }
 
@@ -82,7 +82,7 @@ public class StaticFileSplitEnumeratorTest {
         enumerator.handleSplitRequest(3, "somehost");
 
         assertFalse(context.getSplitAssignments().containsKey(3));
-        assertThat(enumerator.snapshotState().getSplits(), contains(split));
+        assertThat(enumerator.snapshotState(1L).getSplits(), contains(split));
     }
 
     @Test
diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
index 1b85fb0..d5884d1 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java
@@ -132,7 +132,8 @@ public class ContinuousHiveSplitEnumerator<T extends 
Comparable<T>>
     }
 
     @Override
-    public PendingSplitsCheckpoint<HiveSourceSplit> snapshotState() throws 
Exception {
+    public PendingSplitsCheckpoint<HiveSourceSplit> snapshotState(long 
checkpointId)
+            throws Exception {
         Collection<HiveSourceSplit> remainingSplits =
                 (Collection<HiveSourceSplit>) (Collection<?>) 
splitAssigner.remainingSplits();
         return new ContinuousHivePendingSplitsCheckpoint(
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index b8f2728..9c69f38 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -190,7 +190,7 @@ public class KafkaSourceEnumerator
     }
 
     @Override
-    public KafkaSourceEnumState snapshotState() throws Exception {
+    public KafkaSourceEnumState snapshotState(long checkpointId) throws 
Exception {
         return new KafkaSourceEnumState(assignedPartitions);
     }
 
diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
index 679f732..a46ac2d 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java
@@ -333,7 +333,7 @@ public class KafkaEnumeratorTest {
         enumerator.start();
 
         // No reader is registered, so the state should be empty
-        final KafkaSourceEnumState state1 = enumerator.snapshotState();
+        final KafkaSourceEnumState state1 = enumerator.snapshotState(1L);
         assertTrue(state1.assignedPartitions().isEmpty());
 
         registerReader(context, enumerator, READER0);
@@ -341,7 +341,7 @@ public class KafkaEnumeratorTest {
         context.runNextOneTimeCallable();
 
         // The state should contain splits assigned to READER0 and READER1
-        final KafkaSourceEnumState state2 = enumerator.snapshotState();
+        final KafkaSourceEnumState state2 = enumerator.snapshotState(1L);
         verifySplitAssignmentWithPartitions(
                 getExpectedAssignments(
                         new HashSet<>(Arrays.asList(READER0, READER1)), 
PRE_EXISTING_TOPICS),
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
index 5bc0791..2654671 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
@@ -68,12 +68,26 @@ public interface SplitEnumerator<SplitT extends 
SourceSplit, CheckpointT>
     void addReader(int subtaskId);
 
     /**
-     * Checkpoints the state of this split enumerator.
+     * Creates a snapshot of the state of this split enumerator, to be stored 
in a checkpoint.
      *
+     * <p>The snapshot should contain the latest state of the enumerator: It 
should assume that all
+     * operations that happened before the snapshot have successfully 
completed. For example all
+     * splits assigned to readers via {@link 
SplitEnumeratorContext#assignSplit(SourceSplit, int)}
+     * and {@link SplitEnumeratorContext#assignSplits(SplitsAssignment)} 
don't need to be included
+     * in the snapshot anymore.
+     *
+     * <p>This method takes the ID of the checkpoint for which the state is 
snapshotted. Most
+     * implementations should be able to ignore this parameter, because for 
the contents of the
+     * snapshot, it doesn't matter for which checkpoint it gets created. This 
parameter can be
+     * interesting for source connectors with external systems where those 
systems are themselves
+     * aware of checkpoints; for example in cases where the enumerator 
notifies that system about a
+     * specific checkpoint being triggered.
+     *
+     * @param checkpointId The ID of the checkpoint for which the snapshot is 
created.
      * @return an object containing the state of the split enumerator.
      * @throws Exception when the snapshot cannot be taken.
      */
-    CheckpointT snapshotState() throws Exception;
+    CheckpointT snapshotState(long checkpointId) throws Exception;
 
     /**
      * Called to close the enumerator, in case it holds on to any resources, 
like threads or network
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
index 8d16d15..9c37cca 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
@@ -72,7 +72,7 @@ public class IteratorSourceEnumerator<SplitT extends 
IteratorSourceSplit<?, ?>>
     }
 
     @Override
-    public Collection<SplitT> snapshotState() throws Exception {
+    public Collection<SplitT> snapshotState(long checkpointId) throws 
Exception {
         return remainingSplits;
     }
 
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
index 213a864..2b9dbd3 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
@@ -98,7 +98,7 @@ public class MockSplitEnumerator implements 
SplitEnumerator<MockSourceSplit, Set
     }
 
     @Override
-    public Set<MockSourceSplit> snapshotState() {
+    public Set<MockSourceSplit> snapshotState(long checkpointId) {
         return unassignedSplits;
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 0c9852c..add85bb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -235,7 +235,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
                             checkpointId);
                     try {
                         context.onCheckpoint(checkpointId);
-                        result.complete(toBytes());
+                        result.complete(toBytes(checkpointId));
                     } catch (Throwable e) {
                         ExceptionUtils.rethrowIfFatalErrorOrOOM(e);
                         result.completeExceptionally(
@@ -352,8 +352,9 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
      * @return A byte array containing the serialized state of the source 
coordinator.
      * @throws Exception When something goes wrong in serialization.
      */
-    private byte[] toBytes() throws Exception {
-        return writeCheckpointBytes(enumerator.snapshotState(), 
enumCheckpointSerializer);
+    private byte[] toBytes(long checkpointId) throws Exception {
+        return writeCheckpointBytes(
+                enumerator.snapshotState(checkpointId), 
enumCheckpointSerializer);
     }
 
     static <EnumChkT> byte[] writeCheckpointBytes(
@@ -378,7 +379,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, 
EnumChkT>
     /**
      * Restore the state of this source coordinator from the state bytes.
      *
-     * @param bytes The checkpoint bytes that was returned from {@link 
#toBytes()}
+     * @param bytes The checkpoint bytes that were returned from {@link 
#toBytes(long)}
      * @throws Exception When the deserialization failed.
      */
     private EnumChkT deserializeCheckpoint(byte[] bytes) throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
index 20d65f8..0b62215 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java
@@ -336,7 +336,7 @@ public class SourceCoordinatorTest extends 
SourceCoordinatorTestBase {
         // Build checkpoint data with serde version 0
         final TestingSplitEnumerator<MockSourceSplit> enumerator = 
getEnumerator();
         final Set<MockSourceSplit> splits = new HashSet<>();
-        enumerator.runInEnumThreadAndSync(() -> 
splits.addAll(enumerator.snapshotState()));
+        enumerator.runInEnumThreadAndSync(() -> 
splits.addAll(enumerator.snapshotState(1L)));
 
         final byte[] checkpointDataForV0Serde = 
createCheckpointDataWithSerdeV0(splits);
 
@@ -428,7 +428,7 @@ public class SourceCoordinatorTest extends 
SourceCoordinatorTestBase {
         }
 
         @Override
-        public Set<MockSourceSplit> snapshotState() throws Exception {
+        public Set<MockSourceSplit> snapshotState(long checkpointId) throws 
Exception {
             throw new UnsupportedOperationException();
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java
index eda29a9..8af418b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java
@@ -110,7 +110,7 @@ public class TestingSplitEnumerator<SplitT extends 
SourceSplit>
     }
 
     @Override
-    public Set<SplitT> snapshotState() {
+    public Set<SplitT> snapshotState(long checkpointId) {
         return new HashSet<>(splits);
     }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
index 9ca560c..9c91386 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java
@@ -270,12 +270,12 @@ public class OperatorEventSendingCheckpointITCase extends 
TestLogger {
         }
 
         @Override
-        public Collection<SplitT> snapshotState() throws Exception {
+        public Collection<SplitT> snapshotState(long checkpointId) throws 
Exception {
             // this will be enqueued in the enumerator thread, so it will 
actually run after this
             // method (the snapshot operation) is complete!
             context.runInCoordinatorThread(this::fullFillPendingRequests);
 
-            return super.snapshotState();
+            return super.snapshotState(checkpointId);
         }
 
         private void fullFillPendingRequests() {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
index eeae84b..b2202cc 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java
@@ -549,7 +549,7 @@ public abstract class UnalignedCheckpointTestBase extends 
TestLogger {
             }
 
             @Override
-            public EnumeratorState snapshotState() throws Exception {
+            public EnumeratorState snapshotState(long checkpointId) throws 
Exception {
                 LOG.info("snapshotState {}", state);
                 return state;
             }

Reply via email to