Alexey Serbin has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/23607 )

Change subject: KUDU-3662: Fix race condition in Kudu replication
......................................................................


Patch Set 3: Code-Review+1

(7 comments)

Mostly noob-style questions on the code.  I guess it's just the lack of 
context/knowledge from my side, but the underlying logic is solid, but just in 
case.

Thanks!

http://gerrit.cloudera.org:8080/#/c/23607/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/23607/3//COMMIT_MSG@21
PS3, Line 21: to defer split removal until checkpoint
            : completes
Is it possible to get into a situation when taking checkpoints is very slow or 
times out quite often, but the data in the source arriving fast, and this turns 
into a situation when replication is lagging indefinitely behind the source?

If such a situation is possible, is it discoverable with the current state of 
the observability for Flink-based replication for Kudu tables?


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
File 
java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java:

http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java@85
PS3, Line 85:     // Use AT_LEAST_ONCE mode since the replication job uses 
UPSERT semantics which are
            :     // idempotent. This avoids the overhead of checkpoint barrier 
alignment while still
            :     // providing fault tolerance.
            :     
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
Besides the fact that we can use AT_LEAST_ONCE mode here because of UPSERTs, 
why using AT_LEAST_ONCE is better in this context compared with EXACTLY_ONCE?


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java@101
PS3, Line 101:     env.configure(config);
What is the checkpoint timeout?  Is it inherited from some system config and we 
are good with the default setting, or we might need to intervene and use custom 
setting due to some reasons?


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
File 
java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java:

http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@180
PS3, Line 180:         Field finishedSplitsField = 
ReflectionSecurityUtils.getAccessibleField(
             :             sourceEvent, "finishedSplits");
Does it make sense to use a similar approach to 'caching' this field in the 
constructor, similar to this.unassignedField and this.pendingField?


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@182
PS3, Line 182:         List<KuduSourceSplit> finishedSplits = 
ReflectionSecurityUtils.getFieldValue(
             :             finishedSplitsField, sourceEvent);
nit: does it make sense to introduce a wrapper similar to getPendingSplits(), 
but this one parameter?


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@208
PS3, Line 208:         LOG.warn("Failed to intercept SplitFinishedEvent via 
reflection; " +
             :                  "falling back to delegate (race condition fix 
not applied)", e);
It it always atomic in the sense that an exception here might be thrown only if 
the whole thing fails?  It that's not atomic, might be there any inconsistency 
if there is just partial completion of the work under this try-catch, and then 
the control is passed to the delegate?


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@250
PS3, Line 250: removedFromPending || removedFromUnassigned
Can splitId be in the both pending and unassigned?  If not, does it make sense 
to check for the precondition of it being only in one of the two places?

Also, since I'm missing a lot of context here, I'd appreciate if you could add 
a comment into the code (or just explain in this review comment thread if it's 
super-obvious): how do we know that the passed 'checkpointId' corresponds to 
the spitIds that we are purging from splitsPendingRemoval?  I'd think there 
might be is a risk of interlacing calls of notifyCheckpointComplete() and 
handleSourceEvent() with different ids, no?



--
To view, visit http://gerrit.cloudera.org:8080/23607
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I605a914aaa86b1bdf47537a5b21789de27972add
Gerrit-Change-Number: 23607
Gerrit-PatchSet: 3
Gerrit-Owner: Marton Greber <[email protected]>
Gerrit-Reviewer: Abhishek Chennaka <[email protected]>
Gerrit-Reviewer: Alexey Serbin <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Gabriella Lotz <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Reviewer: Marton Greber <[email protected]>
Gerrit-Reviewer: Zoltan Chovan <[email protected]>
Gerrit-Comment-Date: Tue, 11 Nov 2025 06:49:09 +0000
Gerrit-HasComments: Yes

Reply via email to