This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 517e4d9278 MINOR: Update comment on verifyTaskGenerationAndOwnership method in DistributedHerder
517e4d9278 is described below

commit 517e4d92785d136c4db5aa59af1d4a3eda0c4bfd
Author: Yash Mayya <yash.ma...@gmail.com>
AuthorDate: Fri Jul 29 02:48:35 2022 +0530

    MINOR: Update comment on verifyTaskGenerationAndOwnership method in DistributedHerder

    Reviewers: Chris Egerton <fearthecel...@gmail.com>
---
 .../kafka/connect/runtime/distributed/DistributedHerder.java | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index ded833da59..388bfa4218 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1732,9 +1732,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                             throw ConnectUtils.maybeWrap(cause, "Failed to perform round of zombie fencing");
                         }
                     },
-                    () -> {
-                        verifyTaskGenerationAndOwnership(taskId, taskGeneration);
-                    }
+                    () -> verifyTaskGenerationAndOwnership(taskId, taskGeneration)
             );
         } else {
             return worker.startSourceTask(
@@ -1941,8 +1939,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
-    // Currently unused, but will be invoked by exactly-once source tasks after they have successfully
-    // initialized their transactional producer
+    // Invoked by exactly-once worker source tasks after they have successfully initialized their transactional
+    // producer to ensure that it is still safe to bring up the task
     private void verifyTaskGenerationAndOwnership(ConnectorTaskId id, int initialTaskGen) {
         log.debug("Reading to end of config topic to ensure it is still safe to bring up source task {} with exactly-once support", id);
         if (!refreshConfigSnapshot(Long.MAX_VALUE)) {