Marton Greber has uploaded this change for review. ( http://gerrit.cloudera.org:8080/23607 )

Change subject: KUDU-3662: Fix race condition in Kudu replication
......................................................................

KUDU-3662: Fix race condition in Kudu replication

Implement fix for data loss race condition where splits finishing
between checkpoint snapshot and checkpoint completion are lost,
causing records to be skipped on job restart.

Race condition scenario (without fix):
1. Split finishes, reader sends SplitFinishedEvent to enumerator
2. Enumerator removes split from pending immediately
3. Checkpoint snapshot is taken (split already removed from state)
4. Job crashes before notifyCheckpointComplete
5. On restore, split is missing from checkpoint -> data loss

Solution:
Wrap KuduSourceEnumerator to defer split removal until checkpoint
completes, following Flink's CheckpointListener buffering contract [1].

When SplitFinishedEvent arrives:
- Delegate processes the event normally
- Re-add finished split back to pending (if removed by delegate)
- Track split ID in splitsPendingRemoval buffer
- Only remove from pending in notifyCheckpointComplete()

On restore from checkpoint:
- Re-enqueue all restored pending splits to unassigned for replay
- Clear pending to unblock timestamp advancement
- Guarantees at-least-once processing (splits may be replayed)

Stateless reader wrapper:
To prevent duplicate split assignment issues on restore, wrap
KuduSourceReader with StatelessKuduReaderWrapper that returns empty
state on checkpoint. This ensures all split lifecycle management flows
through the enumerator, eliminating potential crashes from duplicate
split IDs in reader's internal tracking. Since the Kudu connector does
not implement split progress tracking (splits are atomic scan tokens),
there is no efficiency loss from re-reading splits on restore.

Restored pending splits contain two types:
1. Active splits being read when checkpoint was taken - must be
   replayed
2. Buffered splits finished by reader but held in pending waiting for
   checkpoint to complete - records may be in-flight, must be replayed

Cannot distinguish between types, so re-enqueue all. With stateless
reader, this creates clean lifecycle without duplicates. Clearing
pending after re-enqueue unblocks timestamp advancement immediately
(within seconds as splits are reassigned, not minutes waiting for
processing to complete).

Additional changes:
- Add StatelessKuduReaderWrapper to prevent duplicate split assignment
- Add TestMetricWrappedKuduEnumerator verifying the fix and including
  canary test to detect upstream resolution of FLINK-38575
- Add checkpoint configuration to ReplicationJobConfig with validation
  ensuring checkpointing interval < discovery interval
- Add TestReplicationCheckpoint for end-to-end checkpoint recovery
- Extend ReplicationTestBase with checkpoint test utilities

[1]: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/state/CheckpointListener.java

Change-Id: I605a914aaa86b1bdf47537a5b21789de27972add
---
M java/kudu-replication/build.gradle
M java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationConfigParser.java
M java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
M java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationJobConfig.java
M java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
M java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduSource.java
A java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/StatelessKuduReaderWrapper.java
M java/kudu-replication/src/test/java/org/apache/kudu/replication/ReplicationTestBase.java
A java/kudu-replication/src/test/java/org/apache/kudu/replication/TestMetricWrappedKuduEnumerator.java
A java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationCheckpoint.java
M java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationConfigParser.java
M java/kudu-replication/src/test/java/org/apache/kudu/replication/TestReplicationTableInitializer.java
12 files changed, 1,006 insertions(+), 29 deletions(-)

  git pull ssh://gerrit.cloudera.org:29418/kudu refs/changes/07/23607/1
--
To view, visit http://gerrit.cloudera.org:8080/23607
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: I605a914aaa86b1bdf47537a5b21789de27972add
Gerrit-Change-Number: 23607
Gerrit-PatchSet: 1
Gerrit-Owner: Marton Greber <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
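
Illustrative sketch (not part of the change): the deferred-removal
behaviour described in the commit message can be modelled without any
Flink dependency roughly as follows. This is a simplified assumption of
how the bookkeeping might look, not the actual MetricWrappedKuduEnumerator
code. The class name DeferredSplitRemovalSketch and all of its methods are
hypothetical, and splits are reduced to plain string IDs.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Flink-free model of "defer split removal until the checkpoint completes".
// All names are hypothetical; splits are represented by their IDs only.
class DeferredSplitRemovalSketch {
  // Splits assigned to readers that must still appear in checkpoints.
  private final Set<String> pending = new LinkedHashSet<>();
  // Finished splits whose removal is buffered until a checkpoint completes.
  private final Set<String> splitsPendingRemoval = new LinkedHashSet<>();
  // Splits waiting to be assigned (or re-assigned after a restore).
  private final List<String> unassigned = new ArrayList<>();

  void assign(String splitId) {
    pending.add(splitId);
  }

  // A reader reported the split as finished: keep it in pending and only
  // remember that it may be dropped once a checkpoint has completed.
  void onSplitFinished(String splitId) {
    if (pending.contains(splitId)) {
      splitsPendingRemoval.add(splitId);
    }
  }

  // State written into a checkpoint; finished-but-buffered splits are included.
  Set<String> snapshotPending() {
    return new LinkedHashSet<>(pending);
  }

  // Only now is it safe to forget the buffered splits.
  void notifyCheckpointComplete() {
    pending.removeAll(splitsPendingRemoval);
    splitsPendingRemoval.clear();
  }

  // On restore every restored pending split is re-enqueued for replay and
  // pending starts out empty (at-least-once: splits may be read twice).
  static DeferredSplitRemovalSketch restore(Set<String> restoredPending) {
    DeferredSplitRemovalSketch sketch = new DeferredSplitRemovalSketch();
    sketch.unassigned.addAll(restoredPending);
    return sketch;
  }

  List<String> unassigned() {
    return new ArrayList<>(unassigned);
  }

  public static void main(String[] args) {
    DeferredSplitRemovalSketch enumerator = new DeferredSplitRemovalSketch();
    enumerator.assign("split-1");
    enumerator.onSplitFinished("split-1");
    // The finished split is still part of the snapshot taken here.
    Set<String> snapshot = enumerator.snapshotPending();
    // Simulate a crash before notifyCheckpointComplete and a restore.
    DeferredSplitRemovalSketch restored = restore(snapshot);
    System.out.println("re-enqueued after restore: " + restored.unassigned());
  }
}
```

In this model, a crash between the snapshot and notifyCheckpointComplete
leaves the finished split in the checkpointed state, so a restore replays
it instead of silently dropping it.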
