Hello Zoltan Chovan, Alexey Serbin, Gabriella Lotz, Attila Bukor, Kudu Jenkins,
Abhishek Chennaka,
I'd like you to reexamine a change. Please visit
http://gerrit.cloudera.org:8080/23607
to look at the new patch set (#3).
Change subject: KUDU-3662: Fix race condition in Kudu replication
......................................................................
KUDU-3662: Fix race condition in Kudu replication
Fix a data loss race condition in which splits that finish between the
checkpoint snapshot and checkpoint completion are lost, causing their
records to be skipped on job restart.
Race condition scenario (without fix):
1. Split finishes, reader sends SplitFinishedEvent to enumerator
2. Enumerator removes split from pending immediately
3. Checkpoint snapshot is taken (split already removed from state)
4. Job crashes before notifyCheckpointComplete
5. On restore, split is missing from checkpoint -> data loss
Solution:
Wrap KuduSourceEnumerator to defer split removal until checkpoint
completes, following Flink's CheckpointListener buffering contract [1].
When SplitFinishedEvent arrives:
- Delegate processes the event normally
- Re-add finished split back to pending (if removed by delegate)
- Track split ID in splitsPendingRemoval buffer
- Only remove from pending in notifyCheckpointComplete()
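The buffering steps above can be sketched roughly as follows. This is a simplified, hypothetical model in plain Java, not the actual MetricWrappedKuduEnumerator code: the class name DeferredRemovalSketch, the use of String split IDs, and the assign() and snapshotPending() helpers are assumptions made for illustration, and the delegate's immediate removal is simulated inline.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified model of deferred split removal; not the real Kudu or Flink classes. */
final class DeferredRemovalSketch {
  // Splits handed to readers and not yet confirmed by a completed checkpoint.
  private final Set<String> pending = new HashSet<>();
  // Finished splits whose removal is buffered until a checkpoint completes.
  private final Set<String> splitsPendingRemoval = new HashSet<>();

  /** Records that a split was handed to a reader. */
  void assign(String splitId) {
    pending.add(splitId);
  }

  /** Handles a finished split: keep it in pending and only buffer the removal. */
  void onSplitFinished(String splitId) {
    // Stand-in for the delegate, which removes the split immediately.
    boolean removedByDelegate = pending.remove(splitId);
    if (removedByDelegate) {
      // Re-add the split so a snapshot taken now still contains it.
      pending.add(splitId);
      splitsPendingRemoval.add(splitId);
    }
  }

  /** Returns a copy of the pending set, as it would be written into a checkpoint. */
  Set<String> snapshotPending() {
    return new HashSet<>(pending);
  }

  /** Drops the buffered splits once a checkpoint has completed. */
  void notifyCheckpointComplete() {
    pending.removeAll(splitsPendingRemoval);
    splitsPendingRemoval.clear();
  }
}
```

In this model, a snapshot taken after onSplitFinished() but before notifyCheckpointComplete() still contains the finished split, which is the property the fix relies on.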
On restore from checkpoint:
- Re-enqueue all restored pending splits to unassigned for replay
- Clear pending to unblock timestamp advancement
- Guarantees at-least-once processing (splits may be replayed)
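A minimal sketch of the restore path described above, assuming the restored enumerator state can be modeled as two collections of split IDs. The class RestoreSketch and its field and method names are hypothetical and only illustrate the re-enqueue-then-clear ordering.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model of enumerator state after restoring from a checkpoint. */
final class RestoreSketch {
  // Splits waiting to be assigned to a reader.
  final Deque<String> unassigned = new ArrayDeque<>();
  // Splits that were assigned (or finished but buffered) when the checkpoint was taken.
  final List<String> pending = new ArrayList<>();

  RestoreSketch(List<String> restoredUnassigned, List<String> restoredPending) {
    unassigned.addAll(restoredUnassigned);
    pending.addAll(restoredPending);
  }

  /** Moves every restored pending split back to unassigned so it is replayed. */
  void requeuePendingOnRestore() {
    unassigned.addAll(pending);
    // Nothing is pending any more, so timestamp advancement is not blocked.
    pending.clear();
  }
}
```

Because every restored pending split is replayed, a split whose records were already emitted before the crash is read again, which is why the guarantee is at-least-once rather than exactly-once.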
Stateless reader wrapper:
To prevent duplicate split assignment issues on restore, wrap
KuduSourceReader with StatelessKuduReaderWrapper, which returns empty
state on checkpoint. This ensures all split lifecycle management flows
through the enumerator, eliminating potential crashes from duplicate
split IDs in the reader's internal tracking. Since the Kudu connector
does not implement split progress tracking (splits are atomic scan
tokens), there is no efficiency loss from re-reading splits on restore.
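The wrapper idea can be illustrated with the sketch below. It assumes a hypothetical SimpleReader interface that mirrors only two operations of a source reader (accepting splits and snapshotting them). The real StatelessKuduReaderWrapper wraps KuduSourceReader and may differ in detail.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the split-related part of a source reader. */
interface SimpleReader<S> {
  void addSplits(List<S> splits);

  List<S> snapshotState(long checkpointId);
}

/** Forwards split assignment to the delegate but never checkpoints any splits. */
final class StatelessReaderSketch<S> implements SimpleReader<S> {
  private final SimpleReader<S> delegate;

  StatelessReaderSketch(SimpleReader<S> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void addSplits(List<S> splits) {
    delegate.addSplits(splits);
  }

  @Override
  public List<S> snapshotState(long checkpointId) {
    // Report no reader-side state, so only the enumerator's state names splits on restore.
    return Collections.emptyList();
  }
}
```

With the reader reporting no splits of its own, a restore cannot hand the same split ID to a reader twice (once from reader state and once from the enumerator).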
Restored pending splits contain two types:
1. Active splits being read when checkpoint was taken - must be replayed
2. Buffered splits finished by reader but held in pending waiting for
checkpoint to complete - records may be in-flight, must be replayed
The enumerator cannot distinguish between the two types, so it
re-enqueues all of them. With the stateless reader, this creates a
clean lifecycle without duplicates. Clearing pending after the
re-enqueue unblocks timestamp advancement immediately (within seconds,
as splits are reassigned, rather than minutes waiting for processing
to complete).
Additional changes:
- Add StatelessKuduReaderWrapper to prevent duplicate split assignment
- Add TestMetricWrappedKuduEnumerator, which verifies the fix and
includes a canary test to detect upstream resolution of FLINK-38575
- Add checkpoint configuration to ReplicationJobConfig with validation
ensuring checkpointing interval < discovery interval
- Add TestReplicationCheckpoint for end-to-end checkpoint recovery
- Extend ReplicationTestBase with checkpoint test utilities
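The interval validation mentioned in the list above might look roughly like this. The class and method names are hypothetical, and the real check in ReplicationJobConfig may use different types or messages. The only rule taken from the change description is that the checkpointing interval must be strictly less than the discovery interval.

```java
import java.time.Duration;

/** Hypothetical sketch of the checkpointing-versus-discovery interval check. */
final class CheckpointIntervalCheck {
  private CheckpointIntervalCheck() {}

  /** Throws if the checkpointing interval is not strictly shorter than the discovery interval. */
  static void validate(Duration checkpointingInterval, Duration discoveryInterval) {
    if (checkpointingInterval.compareTo(discoveryInterval) >= 0) {
      throw new IllegalArgumentException(
          "checkpointing interval (" + checkpointingInterval
              + ") must be less than discovery interval (" + discoveryInterval + ")");
    }
  }
}
```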
[1]: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/state/CheckpointListener.java
Change-Id: I605a914aaa86b1bdf47537a5b21789de27972add
---
M java/kudu-replication/build.gradle
M java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationConfigParser.java
M java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
M java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
M java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
M java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduSource.java
A java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/StatelessKuduReaderWrapper.java
M java/kudu-replication/src/test/java/org/apache/kudu/replication/ReplicationTestBase.java
A java/kudu-replication/src/test/java/org/apache/kudu/replication/TestMetricWrappedKuduEnumerator.java
A java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationCheckpoint.java
M java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationConfigParser.java
M java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationTableInitializer.java
12 files changed, 1,026 insertions(+), 35 deletions(-)
git pull ssh://gerrit.cloudera.org:29418/kudu refs/changes/07/23607/3
--
To view, visit http://gerrit.cloudera.org:8080/23607
Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I605a914aaa86b1bdf47537a5b21789de27972add
Gerrit-Change-Number: 23607
Gerrit-PatchSet: 3
Gerrit-Owner: Marton Greber <[email protected]>
Gerrit-Reviewer: Abhishek Chennaka <[email protected]>
Gerrit-Reviewer: Alexey Serbin <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Gabriella Lotz <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Reviewer: Zoltan Chovan <[email protected]>