Marton Greber has posted comments on this change. ( 
http://gerrit.cloudera.org:8080/23607 )

Change subject: KUDU-3662: Fix race condition in Kudu replication
......................................................................


Patch Set 4:

(7 comments)

Thank you Alexey for the review, I hope I answered the your questions. Let me 
know if there are other details you would like me to explain etc, I'm happy to 
share more details.

http://gerrit.cloudera.org:8080/#/c/23607/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/23607/3//COMMIT_MSG@21
PS3, Line 21: to defer split removal until checkpoint
            : completes
> Is it possible to get into a situation when taking checkpoints is very slow
Yes this can happen.
If the sink processes data at a much slower rate than the source. I've added a 
new additional metric: pendingRemovalCount (the field that this fix patch 
uses), to have another signal at hand.
Now in such a bottlenecked case the pendingRemovalCount does not follow a 
'sawtooth' like pattern where it resets to 0 periodically (when a checkpoint is 
taken and in the callback signal we clear the pendings).
Moreover as it comes up at multiple places in your questions, the checkpoint 
taking has a timeout config:
"checkpoint timeout: The time after which a checkpoint-in-progress is aborted, 
if it did not complete by then." [1]
There is a tolerable amount of timeouts, so after the configured number of 
attempts the job will fail.
Bottom line is yes, but this will not go unnoticed, and users can tune the job.
After this race fix my plan is to put together docs where I plan to go into 
detail about the administration side of the job.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/checkpointing/


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
File 
java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java:

http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java@85
PS3, Line 85:     // Use AT_LEAST_ONCE mode since the replication job uses 
UPSERT semantics which are
            :     // idempotent. This avoids the overhead of checkpoint barrier 
alignment while still
            :     // providing fault tolerance.
            :     
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
> Besides the fact that we can use AT_LEAST_ONCE mode here because of UPSERTs
Actually we can only use AT_LEAST_ONCE semantics because the Flink Kudu sink is 
not stateful. To be able to use EXACTLY_ONCE we would have to implement two 
phase commit like in this article:
https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/

In our case the durable state is only in the Flink Kudu source(and only in the 
enumerator. This means that we neither track in scan token progress. In case of 
failure these are re-read). The state is basically the lastEndTimestamp for 
diffscan and the unassigned and pending splits(Kudu scan tokens). On 
checkpointing these are persisted. In case of failure we have no information 
about records that have been red by the source. We dont know if they are in 
flight or already written by the sink. This is not optimal but this was a 
design decision with the current Kudu source. (On the other hand this way data 
is rest covered with ease as no records are spilled to disk.)


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java@101
PS3, Line 101:     env.configure(config);
> What is the checkpoint timeout?  Is it inherited from some system config an
By default 10 minutes [1].
These configs can be set on the system/cluster level (conf.yaml in flink config 
directory) or on the job level by providing execution parameters.
As a next step I will work on docs where outline all the necessary invocation 
parameters, and tuning values.
(Flink has a bunch of config values. I've opted not to wrap all of them in our 
code as that is messy. Only the necessary or crucial ones like 
CheckpointingIntervalMillis is incorporated into our code. For the rest the 
documentation will outline the important ones with complete job submission 
command examples)


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/checkpointing/#execution-checkpointing-timeout

[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
File 
java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java:

http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@180
PS3, Line 180:     // Intercept SplitFinishedEvent and defer split removal 
until checkpoint completes
             :     // Check event type by class name to av
> Does it make sense to use a similar approach to 'caching' this field in the
Yes we can do lazy init based 'caching' for this one.
Done.


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@182
PS3, Line 182:     if 
(sourceEvent.getClass().getSimpleName().equals("SplitFinishedEvent")) {
             :       // Perform all reflection operations fir
> nit: does it make sense to introduce a wrapper similar to getPendingSplits(
Yes, done.


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@208
PS3, Line 208:                   splitId, subtaskId);
             :       }
> It it always atomic in the sense that an exception here might be thrown onl
Ah this is a very good catch, thank you Alexey!
I moved the two reflection to the beginning of the function before any state 
mutation. If they throw we fail fast before any state is modified, and we dont 
fallback to using just the delegate as that defeats the purpose of this patch.
Thanks for spotting this!


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@250
PS3, Line 250: removedFromPending) {
> Can splitId be in the both pending and unassigned?  If not, does it make se
Ah yes this is a leftover from the development iterations. Removed the 
unassigned parts here. (Where I did not put in the stateless readers and it 
could happen that on failure recovery the readers have splits(KuduScan tokens). 
But that opens another set of problems. That is why I opted for the stateless 
reader implementation. And I forgot to remove the experimental part from here)
No splits should definitely not be in bot 'states'.

Yes your concern regarding the checkpointId and corresponding split mapping is 
not implemented here.
The consideration here is that we dont want concurrent checkpointing. There is 
a Flink config: execution.checkpointing.max-concurrent-checkpoints which 
determines this, and by default it is set to 1 [1].
Even if the default is what we want I will make sure to document this, not to 
mess with this config.

[1]: 
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/checkpointing/#execution-checkpointing-max-concurrent-checkpoints



--
To view, visit http://gerrit.cloudera.org:8080/23607
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I605a914aaa86b1bdf47537a5b21789de27972add
Gerrit-Change-Number: 23607
Gerrit-PatchSet: 4
Gerrit-Owner: Marton Greber <[email protected]>
Gerrit-Reviewer: Abhishek Chennaka <[email protected]>
Gerrit-Reviewer: Alexey Serbin <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Gabriella Lotz <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Reviewer: Marton Greber <[email protected]>
Gerrit-Reviewer: Zoltan Chovan <[email protected]>
Gerrit-Comment-Date: Mon, 17 Nov 2025 18:14:54 +0000
Gerrit-HasComments: Yes

Reply via email to