This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push: new fdcf730 [FLINK-22133][core] Add checkpointID to 'SplitEnumerator.snapshotState()' fdcf730 is described below commit fdcf730ce106cfe0050593a3ddc068a8b7222e94 Author: Stephan Ewen <se...@apache.org> AuthorDate: Mon Apr 19 17:32:00 2021 +0200 [FLINK-22133][core] Add checkpointID to 'SplitEnumerator.snapshotState()' This closes #15677 --- .../base/source/reader/mocks/MockSplitEnumerator.java | 2 +- .../file/src/impl/ContinuousFileSplitEnumerator.java | 3 ++- .../file/src/impl/StaticFileSplitEnumerator.java | 2 +- .../src/impl/ContinuousFileSplitEnumeratorTest.java | 6 +++--- .../file/src/impl/StaticFileSplitEnumeratorTest.java | 6 +++--- .../connectors/hive/ContinuousHiveSplitEnumerator.java | 3 ++- .../kafka/source/enumerator/KafkaSourceEnumerator.java | 2 +- .../kafka/source/enumerator/KafkaEnumeratorTest.java | 4 ++-- .../flink/api/connector/source/SplitEnumerator.java | 18 ++++++++++++++++-- .../source/lib/util/IteratorSourceEnumerator.java | 2 +- .../connector/source/mocks/MockSplitEnumerator.java | 2 +- .../runtime/source/coordinator/SourceCoordinator.java | 9 +++++---- .../source/coordinator/SourceCoordinatorTest.java | 4 ++-- .../source/coordinator/TestingSplitEnumerator.java | 2 +- .../OperatorEventSendingCheckpointITCase.java | 4 ++-- .../checkpointing/UnalignedCheckpointTestBase.java | 2 +- 16 files changed, 44 insertions(+), 27 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java index 188568c..fe8a675 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java +++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java @@ -77,7 +77,7 @@ public class MockSplitEnumerator } @Override - public List<MockSourceSplit> snapshotState() { + public List<MockSourceSplit> snapshotState(long checkpointId) { return splits; } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java index 2068445..8102e0d 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java @@ -124,7 +124,8 @@ public class ContinuousFileSplitEnumerator } @Override - public PendingSplitsCheckpoint<FileSourceSplit> snapshotState() throws Exception { + public PendingSplitsCheckpoint<FileSourceSplit> snapshotState(long checkpointId) + throws Exception { final PendingSplitsCheckpoint<FileSourceSplit> checkpoint = PendingSplitsCheckpoint.fromCollectionSnapshot( splitAssigner.remainingSplits(), pathsAlreadyProcessed); diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java index 7dc2489..140f520 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java @@ -118,7 +118,7 @@ public class StaticFileSplitEnumerator } @Override - public 
PendingSplitsCheckpoint<FileSourceSplit> snapshotState() { + public PendingSplitsCheckpoint<FileSourceSplit> snapshotState(long checkpointId) { return PendingSplitsCheckpoint.fromCollectionSnapshot(splitAssigner.remainingSplits()); } } diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumeratorTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumeratorTest.java index 72d7823..1fe4b2d 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumeratorTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumeratorTest.java @@ -57,7 +57,7 @@ public class ContinuousFileSplitEnumeratorTest { fileEnumerator.addSplits(split); context.triggerAllActions(); - assertThat(enumerator.snapshotState().getSplits(), contains(split)); + assertThat(enumerator.snapshotState(1L).getSplits(), contains(split)); } @Test @@ -77,7 +77,7 @@ public class ContinuousFileSplitEnumeratorTest { fileEnumerator.addSplits(split); context.triggerAllActions(); - assertThat(enumerator.snapshotState().getSplits(), empty()); + assertThat(enumerator.snapshotState(1L).getSplits(), empty()); assertThat(context.getSplitAssignments().get(2).getAssignedSplits(), contains(split)); } @@ -102,7 +102,7 @@ public class ContinuousFileSplitEnumeratorTest { context.triggerAllActions(); assertFalse(context.getSplitAssignments().containsKey(2)); - assertThat(enumerator.snapshotState().getSplits(), contains(split)); + assertThat(enumerator.snapshotState(1L).getSplits(), contains(split)); } // ------------------------------------------------------------------------ diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumeratorTest.java 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumeratorTest.java index f554b9f..71fcf2a 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumeratorTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumeratorTest.java @@ -52,7 +52,7 @@ public class StaticFileSplitEnumeratorTest { final FileSourceSplit split = createRandomSplit(); final StaticFileSplitEnumerator enumerator = createEnumerator(context, split); - final PendingSplitsCheckpoint<FileSourceSplit> checkpoint = enumerator.snapshotState(); + final PendingSplitsCheckpoint<FileSourceSplit> checkpoint = enumerator.snapshotState(1L); assertThat(checkpoint.getSplits(), contains(split)); } @@ -68,7 +68,7 @@ public class StaticFileSplitEnumeratorTest { enumerator.addReader(3); enumerator.handleSplitRequest(3, "somehost"); - assertThat(enumerator.snapshotState().getSplits(), empty()); + assertThat(enumerator.snapshotState(1L).getSplits(), empty()); assertThat(context.getSplitAssignments().get(3).getAssignedSplits(), contains(split)); } @@ -82,7 +82,7 @@ public class StaticFileSplitEnumeratorTest { enumerator.handleSplitRequest(3, "somehost"); assertFalse(context.getSplitAssignments().containsKey(3)); - assertThat(enumerator.snapshotState().getSplits(), contains(split)); + assertThat(enumerator.snapshotState(1L).getSplits(), contains(split)); } @Test diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java index 1b85fb0..d5884d1 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java +++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/ContinuousHiveSplitEnumerator.java @@ -132,7 +132,8 @@ public class ContinuousHiveSplitEnumerator<T extends Comparable<T>> } @Override - public PendingSplitsCheckpoint<HiveSourceSplit> snapshotState() throws Exception { + public PendingSplitsCheckpoint<HiveSourceSplit> snapshotState(long checkpointId) + throws Exception { Collection<HiveSourceSplit> remainingSplits = (Collection<HiveSourceSplit>) (Collection<?>) splitAssigner.remainingSplits(); return new ContinuousHivePendingSplitsCheckpoint( diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java index b8f2728..9c69f38 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java @@ -190,7 +190,7 @@ public class KafkaSourceEnumerator } @Override - public KafkaSourceEnumState snapshotState() throws Exception { + public KafkaSourceEnumState snapshotState(long checkpointId) throws Exception { return new KafkaSourceEnumState(assignedPartitions); } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java index 679f732..a46ac2d 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java +++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/enumerator/KafkaEnumeratorTest.java @@ -333,7 +333,7 @@ public class KafkaEnumeratorTest { enumerator.start(); // No reader is registered, so the state should be empty - final KafkaSourceEnumState state1 = enumerator.snapshotState(); + final KafkaSourceEnumState state1 = enumerator.snapshotState(1L); assertTrue(state1.assignedPartitions().isEmpty()); registerReader(context, enumerator, READER0); @@ -341,7 +341,7 @@ public class KafkaEnumeratorTest { context.runNextOneTimeCallable(); // The state should contain splits assigned to READER0 and READER1 - final KafkaSourceEnumState state2 = enumerator.snapshotState(); + final KafkaSourceEnumState state2 = enumerator.snapshotState(1L); verifySplitAssignmentWithPartitions( getExpectedAssignments( new HashSet<>(Arrays.asList(READER0, READER1)), PRE_EXISTING_TOPICS), diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java index 5bc0791..2654671 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java @@ -68,12 +68,26 @@ public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> void addReader(int subtaskId); /** - * Checkpoints the state of this split enumerator. + * Creates a snapshot of the state of this split enumerator, to be stored in a checkpoint. * + * <p>The snapshot should contain the latest state of the enumerator: It should assume that all + * operations that happened before the snapshot have successfully completed. For example, all + * splits assigned to readers via {@link SplitEnumeratorContext#assignSplit(SourceSplit, int)} + * and {@link SplitEnumeratorContext#assignSplits(SplitsAssignment)} don't need to be included + * in the snapshot anymore.
+ * + * <p>This method takes the ID of the checkpoint for which the state is snapshotted. Most + * implementations should be able to ignore this parameter, because for the contents of the + * snapshot, it doesn't matter for which checkpoint it gets created. This parameter can be + * useful for source connectors that interact with external systems which are themselves + * aware of checkpoints, for example when the enumerator notifies such a system that a + * specific checkpoint has been triggered. + * + * @param checkpointId The ID of the checkpoint for which the snapshot is created. * @return an object containing the state of the split enumerator. * @throws Exception when the snapshot cannot be taken. */ - CheckpointT snapshotState() throws Exception; + CheckpointT snapshotState(long checkpointId) throws Exception; /** * Called to close the enumerator, in case it holds on to any resources, like threads or network diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java index 8d16d15..9c37cca 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java @@ -72,7 +72,7 @@ public class IteratorSourceEnumerator<SplitT extends IteratorSourceSplit<?, ?>> } @Override - public Collection<SplitT> snapshotState() throws Exception { + public Collection<SplitT> snapshotState(long checkpointId) throws Exception { return remainingSplits; } diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java index 213a864..2b9dbd3 100644 --- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java @@ -98,7 +98,7 @@ public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Set } @Override - public Set<MockSourceSplit> snapshotState() { + public Set<MockSourceSplit> snapshotState(long checkpointId) { return unassignedSplits; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index 0c9852c..add85bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -235,7 +235,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> checkpointId); try { context.onCheckpoint(checkpointId); - result.complete(toBytes()); + result.complete(toBytes(checkpointId)); } catch (Throwable e) { ExceptionUtils.rethrowIfFatalErrorOrOOM(e); result.completeExceptionally( @@ -352,8 +352,9 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> * @return A byte array containing the serialized state of the source coordinator. * @throws Exception When something goes wrong in serialization. */ - private byte[] toBytes() throws Exception { - return writeCheckpointBytes(enumerator.snapshotState(), enumCheckpointSerializer); + private byte[] toBytes(long checkpointId) throws Exception { + return writeCheckpointBytes( + enumerator.snapshotState(checkpointId), enumCheckpointSerializer); } static <EnumChkT> byte[] writeCheckpointBytes( @@ -378,7 +379,7 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> /** * Restore the state of this source coordinator from the state bytes. 
* - * @param bytes The checkpoint bytes that was returned from {@link #toBytes()} + * @param bytes The checkpoint bytes that were returned from {@link #toBytes(long)} * @throws Exception When the deserialization failed. */ private EnumChkT deserializeCheckpoint(byte[] bytes) throws Exception { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java index 20d65f8..0b62215 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java @@ -336,7 +336,7 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { // Build checkpoint data with serde version 0 final TestingSplitEnumerator<MockSourceSplit> enumerator = getEnumerator(); final Set<MockSourceSplit> splits = new HashSet<>(); - enumerator.runInEnumThreadAndSync(() -> splits.addAll(enumerator.snapshotState())); + enumerator.runInEnumThreadAndSync(() -> splits.addAll(enumerator.snapshotState(1L))); final byte[] checkpointDataForV0Serde = createCheckpointDataWithSerdeV0(splits); @@ -428,7 +428,7 @@ public class SourceCoordinatorTest extends SourceCoordinatorTestBase { } @Override - public Set<MockSourceSplit> snapshotState() throws Exception { + public Set<MockSourceSplit> snapshotState(long checkpointId) throws Exception { throw new UnsupportedOperationException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java index eda29a9..8af418b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/TestingSplitEnumerator.java @@ -110,7 +110,7 @@ public class TestingSplitEnumerator<SplitT extends SourceSplit> } @Override - public Set<SplitT> snapshotState() { + public Set<SplitT> snapshotState(long checkpointId) { return new HashSet<>(splits); } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java index 9ca560c..9c91386 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorEventSendingCheckpointITCase.java @@ -270,12 +270,12 @@ public class OperatorEventSendingCheckpointITCase extends TestLogger { } @Override - public Collection<SplitT> snapshotState() throws Exception { + public Collection<SplitT> snapshotState(long checkpointId) throws Exception { // this will be enqueued in the enumerator thread, so it will actually run after this // method (the snapshot operation) is complete! 
context.runInCoordinatorThread(this::fullFillPendingRequests); - return super.snapshotState(); + return super.snapshotState(checkpointId); } private void fullFillPendingRequests() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java index eeae84b..b2202cc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointTestBase.java @@ -549,7 +549,7 @@ public abstract class UnalignedCheckpointTestBase extends TestLogger { } @Override - public EnumeratorState snapshotState() throws Exception { + public EnumeratorState snapshotState(long checkpointId) throws Exception { LOG.info("snapshotState {}", state); return state; }