Alexey Serbin has posted comments on this change. ( http://gerrit.cloudera.org:8080/23607 )
Change subject: KUDU-3662: Fix race condition in Kudu replication
......................................................................


Patch Set 3: Code-Review+1

(7 comments)

Mostly noob-style questions on the code. They probably stem from my lack of context: the underlying logic looks solid, but I'm asking just in case. Thanks!

http://gerrit.cloudera.org:8080/#/c/23607/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/23607/3//COMMIT_MSG@21
PS3, Line 21: to defer split removal until checkpoint
            : completes
Is it possible to get into a situation where taking checkpoints is very slow or times out quite often while data in the source arrives fast, so that replication lags indefinitely behind the source? If such a situation is possible, is it discoverable with the current state of observability for Flink-based replication of Kudu tables?


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
File java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java:

http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java@85
PS3, Line 85:     // Use AT_LEAST_ONCE mode since the replication job uses UPSERT semantics which are
            :     // idempotent. This avoids the overhead of checkpoint barrier alignment while still
            :     // providing fault tolerance.
            :     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
Besides the fact that we can use AT_LEAST_ONCE mode here because of UPSERTs, why is AT_LEAST_ONCE better in this context than EXACTLY_ONCE?


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java@101
PS3, Line 101:     env.configure(config);
What is the checkpoint timeout?
Is it inherited from some system config, and are we fine with the default setting, or might we need to intervene and use a custom setting for some reason?


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
File java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java:

http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@180
PS3, Line 180:         Field finishedSplitsField = ReflectionSecurityUtils.getAccessibleField(
             :             sourceEvent, "finishedSplits");
Does it make sense to cache this field in the constructor, similar to this.unassignedField and this.pendingField?


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@182
PS3, Line 182:         List<KuduSourceSplit> finishedSplits = ReflectionSecurityUtils.getFieldValue(
             :             finishedSplitsField, sourceEvent);
nit: does it make sense to introduce a wrapper similar to getPendingSplits(), but taking one parameter?


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@208
PS3, Line 208:         LOG.warn("Failed to intercept SplitFinishedEvent via reflection; " +
             :                  "falling back to delegate (race condition fix not applied)", e);
Is it always atomic, in the sense that an exception here can be thrown only if the whole thing fails? If it's not atomic, could there be an inconsistency when the work under this try-catch completes only partially and control is then passed to the delegate?
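
To illustrate the atomicity concern above, here is a minimal sketch of one way to keep the fallback safe: do all reflective reads first, and touch our own state only after they have succeeded. Every name in it (FakeSplitFinishedEvent, DeferredRemovalSketch, tryIntercept) is hypothetical and only stands in for the real classes. It uses plain java.lang.reflect rather than ReflectionSecurityUtils, and it is not meant to mirror what the patch actually does.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the connector's split-finished event.
class FakeSplitFinishedEvent {
  private final List<String> finishedSplits;

  FakeSplitFinishedEvent(List<String> finishedSplits) {
    this.finishedSplits = finishedSplits;
  }
}

// Hypothetical enumerator wrapper that defers split removal.
class DeferredRemovalSketch {
  // Split ids waiting for the next completed checkpoint (assumed state).
  final Set<String> splitsPendingRemoval = new HashSet<>();

  // Returns true if the event was intercepted. A false result means none of
  // our state was modified, so falling back to the delegate is safe.
  boolean tryIntercept(Object sourceEvent) {
    final List<String> finished;
    try {
      // Phase 1: reflection only, no mutation of our own state.
      Field f = sourceEvent.getClass().getDeclaredField("finishedSplits");
      f.setAccessible(true);
      @SuppressWarnings("unchecked")
      List<String> value = (List<String>) f.get(sourceEvent);
      finished = new ArrayList<>(value);
    } catch (ReflectiveOperationException | RuntimeException e) {
      return false;
    }
    // Phase 2: plain collection update, reached only if phase 1 succeeded.
    splitsPendingRemoval.addAll(finished);
    return true;
  }
}
```

With this shape, a failure in the reflective part leaves splitsPendingRemoval untouched, so the delegate never sees a half-applied update. Whether the real code can be split into two phases like this depends on details I don't have.
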


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@250
PS3, Line 250: removedFromPending || removedFromUnassigned
Can splitId be in both pending and unassigned? If not, does it make sense to check the precondition that it is in only one of the two places?

Also, since I'm missing a lot of context here, I'd appreciate it if you could add a comment to the code (or just explain in this review thread if it's super-obvious): how do we know that the passed 'checkpointId' corresponds to the splitIds that we are purging from splitsPendingRemoval? I'd think there might be a risk of interleaved calls to notifyCheckpointComplete() and handleSourceEvent() with different ids, no?


-- 
To view, visit http://gerrit.cloudera.org:8080/23607
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I605a914aaa86b1bdf47537a5b21789de27972add
Gerrit-Change-Number: 23607
Gerrit-PatchSet: 3
Gerrit-Owner: Marton Greber <[email protected]>
Gerrit-Reviewer: Abhishek Chennaka <[email protected]>
Gerrit-Reviewer: Alexey Serbin <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Gabriella Lotz <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Reviewer: Marton Greber <[email protected]>
Gerrit-Reviewer: Zoltan Chovan <[email protected]>
Gerrit-Comment-Date: Tue, 11 Nov 2025 06:49:09 +0000
Gerrit-HasComments: Yes
