Marton Greber has posted comments on this change. ( http://gerrit.cloudera.org:8080/23607 )
Change subject: KUDU-3662: Fix race condition in Kudu replication
......................................................................


Patch Set 4:

(7 comments)

Thank you Alexey for the review. I hope I answered your questions. Let me know if there are other details you would like me to explain; I'm happy to share more.

http://gerrit.cloudera.org:8080/#/c/23607/3//COMMIT_MSG
Commit Message:

http://gerrit.cloudera.org:8080/#/c/23607/3//COMMIT_MSG@21
PS3, Line 21: to defer split removal until checkpoint
            : completes
> Is it possible to get into a situation when taking checkpoints is very slow

Yes, this can happen if the sink processes data at a much slower rate than the source. I've added a new metric, pendingRemovalCount (backed by the field this patch uses), to have another signal at hand. Normally pendingRemovalCount follows a 'sawtooth' pattern: it periodically resets to 0 when a checkpoint completes and the completion callback clears the pending removals. In such a bottlenecked case it no longer follows this pattern.

Moreover, since this comes up in several of your questions: checkpointing has a timeout config: "checkpoint timeout: The time after which a checkpoint-in-progress is aborted, if it did not complete by then." [1] There is also a configurable number of tolerable checkpoint failures, so after that many failed attempts the job fails.

Bottom line: yes, this can happen, but it will not go unnoticed, and users can tune the job. After this race fix, I plan to put together docs that go into detail about the administration side of the job.
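To make the deferred removal and the 'sawtooth' behavior of pendingRemovalCount concrete, here is a minimal, hypothetical Java model. It is not the MetricWrappedKuduEnumerator code: the class name DeferredSplitRemovalSketch and its methods are made up for illustration, splits are plain string IDs, and notifyCheckpointComplete clears every pending removal, which assumes at most one checkpoint is in flight (the max-concurrent-checkpoints setting discussed for Line 250).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of deferring split removal until a checkpoint completes.
final class DeferredSplitRemovalSketch {
    // Splits handed to readers; they stay here (and in snapshots) until confirmed.
    private final Set<String> pendingSplits = new LinkedHashSet<>();
    // Splits reported as finished; dropped from pendingSplits only on checkpoint completion.
    private final Set<String> pendingRemoval = new LinkedHashSet<>();

    void assign(String splitId) {
        pendingSplits.add(splitId);
    }

    // Called when a reader reports a finished split: defer instead of removing immediately.
    void onSplitFinished(String splitId) {
        if (pendingSplits.contains(splitId)) {
            pendingRemoval.add(splitId);
        }
    }

    // State a checkpoint would persist; finished-but-unconfirmed splits are still included.
    List<String> snapshotState() {
        return new ArrayList<>(pendingSplits);
    }

    // checkpointId is ignored: there is no per-checkpoint mapping, so clearing everything
    // is only safe if at most one checkpoint is in flight at a time.
    void notifyCheckpointComplete(long checkpointId) {
        pendingSplits.removeAll(pendingRemoval);
        pendingRemoval.clear();
    }

    // Value a pendingRemovalCount-style gauge would report.
    int pendingRemovalCount() {
        return pendingRemoval.size();
    }

    public static void main(String[] args) {
        DeferredSplitRemovalSketch enumerator = new DeferredSplitRemovalSketch();
        enumerator.assign("s1");
        enumerator.assign("s2");
        enumerator.onSplitFinished("s1");
        System.out.println(enumerator.snapshotState()
                + " pendingRemovalCount=" + enumerator.pendingRemovalCount());
        enumerator.notifyCheckpointComplete(1L);
        System.out.println(enumerator.snapshotState()
                + " pendingRemovalCount=" + enumerator.pendingRemovalCount());
    }
}
```

While checkpoints complete regularly, the gauge keeps dropping back to 0; if checkpoints stall (for example behind a slow sink), onSplitFinished keeps adding entries and the count only grows.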
[1] https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/checkpointing/


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java
File java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java:

http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java@85
PS3, Line 85: // Use AT_LEAST_ONCE mode since the replication job uses UPSERT semantics which are
            : // idempotent. This avoids the overhead of checkpoint barrier alignment while still
            : // providing fault tolerance.
            : env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
> Besides the fact that we can use AT_LEAST_ONCE mode here because of UPSERTs

Actually, we can only use AT_LEAST_ONCE semantics because the Flink Kudu sink is not stateful. To be able to use EXACTLY_ONCE, we would have to implement a two-phase commit, as described in this article: https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/

In our case the durable state lives only in the Flink Kudu source, and only in its enumerator. We don't track progress within a scan token either; in case of failure, the scan tokens are re-read. The state is basically the lastEndTimestamp for the diff scan plus the unassigned and pending splits (Kudu scan tokens), and these are persisted on checkpointing. In case of failure we have no information about records that have already been read by the source: we don't know whether they are in flight or already written by the sink. This is not optimal, but it was a design decision for the current Kudu source. (On the other hand, this way data at rest is covered with ease, as no records are spilled to disk.)
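A minimal, hypothetical Java model of why AT_LEAST_ONCE replay is harmless here: rows the sink already wrote before a failure are written again once the source re-reads the split, and because the sink applies UPSERTs keyed by primary key, the final table matches a failure-free run. UpsertReplaySketch, Row and applyUpserts are illustrative names, the "table" is just a map, and the model assumes the replay re-applies rows in their original order; it does not reflect Kudu or Flink internals.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: replaying upserts after a failure converges to the same table.
final class UpsertReplaySketch {
    // A change record: primary key plus row value.
    record Row(int key, String value) {}

    // Applies rows as upserts: insert if the key is absent, overwrite if it is present.
    static Map<Integer, String> applyUpserts(Map<Integer, String> table, List<Row> rows) {
        for (Row r : rows) {
            table.put(r.key(), r.value());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Row> split = List.of(new Row(1, "a"), new Row(2, "b"), new Row(1, "c"));

        // Failure-free run: every row reaches the sink exactly once.
        Map<Integer, String> clean = applyUpserts(new LinkedHashMap<>(), split);

        // Failure after the sink wrote the first two rows: on restore the whole split is
        // re-read (no in-token progress is tracked), so those rows are written twice.
        Map<Integer, String> replayed = new LinkedHashMap<>();
        applyUpserts(replayed, split.subList(0, 2));
        applyUpserts(replayed, split);

        System.out.println(clean + " " + replayed + " equal=" + clean.equals(replayed));
    }
}
```

With plain INSERTs the second pass would hit duplicate keys instead, which is why the idempotent UPSERT semantics matter for this mode.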
http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/ReplicationEnvProvider.java@101
PS3, Line 101: env.configure(config);
> What is the checkpoint timeout? Is it inherited from some system config an

By default it is 10 minutes [1]. These configs can be set at the system/cluster level (conf.yaml in the Flink config directory) [2] or at the job level by providing execution parameters. As a next step, I will work on docs that outline all the necessary invocation parameters and tuning values. (Flink has a lot of config values. I've opted not to wrap all of them in our code, as that gets messy. Only the necessary or crucial ones, like CheckpointingIntervalMillis, are incorporated into our code. For the rest, the documentation will outline the important ones, with complete job submission command examples.)

[1] https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/checkpointing/#execution-checkpointing-timeout
[2] https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/deployment/config/


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java
File java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java:

http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@180
PS3, Line 180: // Intercept SplitFinishedEvent and defer split removal until checkpoint completes
             : // Check event type by class name to av
> Does it make sense to use a similar approach to 'caching' this field in the

Yes, we can do lazy-init-based 'caching' for this one. Done.
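A hypothetical Java sketch of lazy-init 'caching' of a reflective handle, combined with the ordering discussed for Line 208 (resolve and read via reflection first, mutate state afterwards). ThirdPartyEnumerator stands in for the wrapped enumerator; the field name pendingSplits and the methods getPendingSplits() and onSplitFinished(...) are assumptions for illustration, not the actual patch code.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical wrapper: caches a reflective Field lazily and reads it before mutating state.
final class ReflectiveWrapperSketch {
    private final ThirdPartyEnumerator delegate;
    private final Set<String> pendingRemoval = new HashSet<>();
    // Resolved on first use and reused afterwards, so the reflective lookup runs once.
    private Field pendingSplitsField;

    ReflectiveWrapperSketch(ThirdPartyEnumerator delegate) {
        this.delegate = delegate;
    }

    private Field resolvePendingSplitsField() {
        if (pendingSplitsField == null) {
            try {
                Field f = ThirdPartyEnumerator.class.getDeclaredField("pendingSplits");
                f.setAccessible(true);
                pendingSplitsField = f;
            } catch (NoSuchFieldException e) {
                throw new IllegalStateException("delegate layout changed", e);
            }
        }
        return pendingSplitsField;
    }

    // Wrapper around the reflective read (hypothetical getPendingSplits() helper).
    @SuppressWarnings("unchecked")
    List<String> getPendingSplits() {
        try {
            return (List<String>) resolvePendingSplitsField().get(delegate);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("cannot read pendingSplits", e);
        }
    }

    // Reflection first: if it throws, nothing below has run, so no state was modified
    // and there is no silent fallback to the plain delegate behavior.
    void onSplitFinished(String splitId) {
        List<String> pending = getPendingSplits();
        if (pending.contains(splitId)) {
            pendingRemoval.add(splitId);
        }
    }

    int pendingRemovalCount() {
        return pendingRemoval.size();
    }

    public static void main(String[] args) {
        ReflectiveWrapperSketch w = new ReflectiveWrapperSketch(new ThirdPartyEnumerator());
        w.onSplitFinished("s1");
        w.onSplitFinished("unknown"); // not pending, so it is ignored
        System.out.println(w.getPendingSplits() + " pendingRemovalCount=" + w.pendingRemovalCount());
    }
}

// Stand-in for a wrapped class whose state is only reachable via reflection.
final class ThirdPartyEnumerator {
    private final List<String> pendingSplits = new ArrayList<>(List.of("s1", "s2"));
}
```

Caching the Field avoids repeating getDeclaredField/setAccessible on every event, and failing fast keeps the wrapper and the delegate from drifting apart.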
http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@182
PS3, Line 182: if (sourceEvent.getClass().getSimpleName().equals("SplitFinishedEvent")) {
             : // Perform all reflection operations fir
> nit: does it make sense to introduce a wrapper similar to getPendingSplits(

Yes, done.


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@208
PS3, Line 208: splitId, subtaskId);
             : }
> It it always atomic in the sense that an exception here might be thrown onl

Ah, this is a very good catch, thank you Alexey! I moved the two reflection calls to the beginning of the function, before any state mutation. If they throw, we fail fast before any state is modified. We don't fall back to using just the delegate, as that would defeat the purpose of this patch. Thanks for spotting this!


http://gerrit.cloudera.org:8080/#/c/23607/3/java/kudu-replication/src/main/java/org/apache/kudu/replication/wrappedsource/MetricWrappedKuduEnumerator.java@250
PS3, Line 250: removedFromPending) {
> Can splitId be in the both pending and unassigned? If not, does it make se

Ah yes, this is a leftover from the development iterations; I removed the unassigned parts here. (In an earlier iteration I had not yet introduced the stateless readers, so on failure recovery the readers could still hold splits (Kudu scan tokens). That opens another set of problems, which is why I opted for the stateless reader implementation, and I forgot to remove the experimental part from here.) No, splits should definitely not be in both 'states'.

As for your concern regarding the mapping between checkpointId and the corresponding splits: yes, it is not implemented here. The consideration is that we don't want concurrent checkpointing. There is a Flink config, execution.checkpointing.max-concurrent-checkpoints, that determines this, and by default it is set to 1 [1].
Even though the default is what we want, I will make sure to document that this config should not be changed.

[1]: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/checkpointing/#execution-checkpointing-max-concurrent-checkpoints

--
To view, visit http://gerrit.cloudera.org:8080/23607
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: kudu
Gerrit-Branch: master
Gerrit-MessageType: comment
Gerrit-Change-Id: I605a914aaa86b1bdf47537a5b21789de27972add
Gerrit-Change-Number: 23607
Gerrit-PatchSet: 4
Gerrit-Owner: Marton Greber <[email protected]>
Gerrit-Reviewer: Abhishek Chennaka <[email protected]>
Gerrit-Reviewer: Alexey Serbin <[email protected]>
Gerrit-Reviewer: Attila Bukor <[email protected]>
Gerrit-Reviewer: Gabriella Lotz <[email protected]>
Gerrit-Reviewer: Kudu Jenkins (120)
Gerrit-Reviewer: Marton Greber <[email protected]>
Gerrit-Reviewer: Zoltan Chovan <[email protected]>
Gerrit-Comment-Date: Mon, 17 Nov 2025 18:14:54 +0000
Gerrit-HasComments: Yes
