Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2130312242 cherry-picked on 3.7 git -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2129247940 @gharris1727 please see https://github.com/apache/kafka/pull/16070
gharris1727 commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2127631801 Yeah, if a section like that doesn't exist yet we can start it.
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2127624210 Do you mean a PR adding a notable-changes entry to docs/upgrade.html, i.e. a section like the following?
```
Notable changes in 3.7.1
```
gharris1727 commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2127588361 From my earlier comment:

> I think you can backport this once you have a full release note written that can be backported at the same time.

Please open a PR for the release note first.
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2127581875 @gharris1727 we would like to backport to 3.7 - ok ?
edoardocomar merged PR #15910: URL: https://github.com/apache/kafka/pull/15910
gharris1727 commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2125340147 @edoardocomar Since we're not substantially changing that class, I think it's acceptable to keep the old visibility or add the suppression, rather than fix the this-escape. It's up to you.
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2125147371 Hi @gharris1727 ... more about warnings. There are two Java 21 compiler warnings that result in a compile failure:
```
[2024-05-22T02:09:24.247Z] > Task :connect:mirror:compileJava
[2024-05-22T02:09:24.247Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15910@2/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:78: warning: [this-escape] possible 'this' escape before subclass is fully initialized
[2024-05-22T02:09:24.247Z] store = createBackingStore(config, consumer, admin);
[2024-05-22T02:09:24.247Z] ^
[2024-05-22T02:09:24.247Z] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-15910@2/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:94: warning: [this-escape] previous possible 'this' escape happens here via invocation
[2024-05-22T02:09:24.247Z] (error, record) -> this.handleRecord(record),
[2024-05-22T02:09:24.247Z] ^
[2024-05-22T02:09:24.247Z] error: warnings found and -Werror specified
[2024-05-22T02:09:24.247Z] 1 error
[2024-05-22T02:09:24.247Z] 2 warnings
[2024-05-22T02:09:24.247Z]
[2024-05-22T02:09:24.247Z] > Task :connect:mirror:compileJava FAILED
```
Do you agree that, since we introduced overridable methods called from the constructor for test usage, this time we should suppress these warnings with the annotation `@SuppressWarnings("this-escape")`?
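For context, a minimal standalone sketch of the hazard the `this-escape` lint points at, assuming javac 21's `-Xlint:this-escape`: a constructor calls an overridable hook, so a subclass override runs before the subclass's own fields are initialized. `BaseStore`, `FakeStore` and `backingStoreStart` here are hypothetical stand-ins, not the real `OffsetSyncStore` code; the annotation sits on the constructor to keep the suppression as narrow as possible.

```java
import java.util.ArrayList;
import java.util.List;

public class ThisEscapeDemo {

    // Hypothetical base class whose constructor calls an overridable hook.
    // javac 21 with -Xlint:this-escape may flag this pattern, and -Werror turns
    // that warning into a build failure; the annotation suppresses it for this
    // constructor only.
    public static class BaseStore {
        @SuppressWarnings("this-escape")
        public BaseStore() {
            backingStoreStart(); // 'this' escapes to a method a subclass may override
        }

        void backingStoreStart() {
            // default: nothing to read at startup
        }
    }

    // Hypothetical test fake that overrides the hook.
    public static class FakeStore extends BaseStore {
        private final List<String> syncs = new ArrayList<>(); // assigned only after super() returns
        public boolean syncsWasNullDuringHook; // no initializer, so the value set by the hook is kept

        @Override
        void backingStoreStart() {
            // Runs inside BaseStore's constructor, before FakeStore's field initializers.
            syncsWasNullDuringHook = (syncs == null);
        }
    }

    public static void main(String[] args) {
        FakeStore store = new FakeStore();
        System.out.println("syncs was null during hook: " + store.syncsWasNullDuringHook);
    }
}
```

Running `main` prints `syncs was null during hook: true`, which is the partially-initialized state the lint warns about. Suppressing it is a judgment call that fits when, as here, the overridable call exists only to give tests a hook.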
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2124241780 Thanks @gharris1727
gharris1727 commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1609062958
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ##
```
@@ -105,12 +106,19 @@ private KafkaBasedLog createBackingStore(MirrorCheckpointConfig
     /**
      * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage.
      */
-    public void start() {
-        backingStore.start();
+    public void start(boolean initializationMustReadToEnd) {
+        this.initializationMustReadToEnd = initializationMustReadToEnd;
+        log.debug("OffsetSyncStore starting - must read to OffsetSync end = ", initializationMustReadToEnd);
```
Review Comment: Noticed one typo here: the log message doesn't actually print the boolean, because the format string has no `{}` placeholder.
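SLF4J loggers only substitute arguments into `{}` placeholders, so the fix is presumably `log.debug("OffsetSyncStore starting - must read to OffsetSync end = {}", initializationMustReadToEnd);`. The sketch below is a hypothetical, simplified imitation of that substitution rule (not SLF4J's actual implementation, which also handles escaping and trailing throwables), showing why the argument silently disappears without a placeholder.

```java
public class PlaceholderDemo {

    // Hypothetical, simplified stand-in for SLF4J-style "{}" substitution:
    // each "{}" is replaced by the next argument; arguments without a matching
    // placeholder are silently ignored.
    public static String format(String pattern, Object... args) {
        StringBuilder out = new StringBuilder();
        int argIndex = 0;
        int from = 0;
        int at;
        while ((at = pattern.indexOf("{}", from)) >= 0 && argIndex < args.length) {
            out.append(pattern, from, at).append(args[argIndex++]);
            from = at + 2; // skip past the "{}" just filled
        }
        out.append(pattern.substring(from));
        return out.toString();
    }

    public static void main(String[] args) {
        boolean initializationMustReadToEnd = true;
        // As written in the PR: no placeholder, so the boolean never appears.
        System.out.println(format("OffsetSyncStore starting - must read to OffsetSync end = ", initializationMustReadToEnd));
        // With a placeholder, the value is rendered.
        System.out.println(format("OffsetSyncStore starting - must read to OffsetSync end = {}", initializationMustReadToEnd));
    }
}
```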
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2123528226 @gharris1727 I gave up and used the ugly try-with-resources. That warning does not occur in every test, but I went all the way in `OffsetSyncStoreTest` as I prefer consistency to beauty. I also removed a couple of warnings in `MirrorCheckpointTaskTest` in the second commit.
gharris1727 commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1608965614
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ##
```
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
     @Test
     public void testNoTranslationIfStoreNotStarted() {
-        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-            // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
+        FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
```
Review Comment:

> if you approve I'd also backport the fix to 3.7

I'm on the fence about that, leaning towards yes. I regret backporting KAFKA-12468 so far and introducing this issue, and I didn't communicate it properly to users. I think you can backport this once you have a full release note written that can be backported at the same time.
gharris1727 commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1608959573
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ##
```
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
     @Test
     public void testNoTranslationIfStoreNotStarted() {
-        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-            // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
+        FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
```
Review Comment: I have somewhat strong feelings, but I wouldn't call them very strong. If someone noticed this IDE warning and created a ticket and a PR to fix it, I would review that. Am I going to make the build enforce this warning? No, but I have seen other situations where the warning did point out real resource leaks... I just wanted to save the effort required to go and rework this later, and prevent this PR from introducing an easily avoidable warning.

I agree with you about suppressing warnings; I don't think that is a healthy practice.

I just tried making this a try-with-resources and the indentation turned out fine. The body of backingStoreStart is at exactly the same indentation as it is currently:
```
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
    @Override
    void backingStoreStart() {
        // read a sync during startup
        sync(tp, 100, 200);
        assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 0));
        assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 100));
        assertEquals(OptionalLong.empty(), translateDownstream(null, tp, 200));
    }
}) {
    // no offsets exist and store is not started
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
    // After the store is started all offsets are visible
    store.start(true);
    assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
    assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
    assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
}
```
Here's the Consumer alternative I thought about, which uses one fewer indentation level at the cost of a variable, a field, and two constructors:
```
Consumer<FakeOffsetSyncStore> init = store -> {
    // read a sync during startup
    store.sync(tp, 100, 200);
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
};
try (FakeOffsetSyncStore store = new FakeOffsetSyncStore(init)) {
    // no offsets exist and store is not started
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
    assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
    // After the store is started all offsets are visible
    store.start(true);
    assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
    assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
    assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
}
```
Either of these is preferable to having the warning or suppressing it.
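A minimal self-contained sketch of the Consumer-based shape, assuming a simplified hypothetical `FakeStore` rather than the real `FakeOffsetSyncStore`: the test-specific startup behaviour is passed in as a `Consumer` and invoked from `start()`, so nothing overridable runs in the constructor. Because the fake is `AutoCloseable`, try-with-resources guarantees `close()` runs even if an assertion inside the block throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class InitHookDemo {

    // Hypothetical fake store: the startup behaviour comes in as a Consumer and
    // is invoked from start(), not from the constructor.
    public static class FakeStore implements AutoCloseable {
        public final List<String> events = new ArrayList<>();
        private final Consumer<FakeStore> init;

        public FakeStore() {
            this(store -> { }); // no-op init for tests that don't need one
        }

        public FakeStore(Consumer<FakeStore> init) {
            this.init = init;
        }

        public void sync(long upstream, long downstream) {
            events.add("sync " + upstream + "->" + downstream);
        }

        public void start() {
            init.accept(this); // stands in for reading the backing log at startup
            events.add("started");
        }

        @Override
        public void close() {
            events.add("closed");
        }
    }

    public static List<String> runScenario() {
        Consumer<FakeStore> init = s -> s.sync(100, 200); // "read a sync during startup"
        FakeStore observed = null;
        try (FakeStore store = new FakeStore(init)) {
            observed = store;
            store.start();
        } // close() runs here, even if the block above throws
        return observed.events;
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // [sync 100->200, started, closed]
    }
}
```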
edoardocomar commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1608907673
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ##
```
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
     @Test
     public void testNoTranslationIfStoreNotStarted() {
-        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-            // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
+        FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
```
Review Comment: On another note, if you approve I'd also backport the fix to 3.7.
edoardocomar commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1608896699
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ##
```
@@ -82,77 +86,131 @@ public void testOffsetTranslation() {
     @Test
     public void testNoTranslationIfStoreNotStarted() {
-        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
-            // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
+        FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
```
Review Comment: Hi @gharris1727, I use IntelliJ too and saw the warning. I could have used a `@SuppressWarnings` annotation, but I am very reluctant to make code less readable because of the limited insight of linters, and equally reluctant to make the fake store more complex. Using try-with-resources with an anonymous class results in horrible indentation, as you said. I don't share a strong worry about future leaks in tests; it seems speculative to me. In this instance, unless you have very strong feelings, I'd really leave the test as-is.
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2122938849 @gharris1727 please review, thanks
edoardocomar commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1608522984
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ##
```
@@ -155,6 +154,58 @@ public void testPastOffsetTranslation() {
         }
     }
+    // this test has been wriiten knowing the exact offsets syncs stored
+    @Test
+    public void testPastOffsetTranslationWithoutInitializationReadToEnd() {
+        final int maxOffsetLag = 10;
+
+        FakeOffsetSyncStore store = new FakeOffsetSyncStore() {
+            @Override
+            void backingStoreStart() {
+                for (int offset = 0; offset <= 1000; offset += maxOffsetLag) {
+                    sync(tp, offset, offset);
+                    assertSparseSyncInvariant(this, tp);
+                }
+            }
+        };
+
+        store.start(false);
+
+        // After starting but before seeing new offsets
+        assertTranslationsNearby(store, 400, 480, 0);
+        assertTranslationsNearby(store, 500, 720, 480);
+        assertTranslationsNearby(store, 1000, 1000, 990);
+
+        for (int offset = 1000; offset <= 1; offset += maxOffsetLag) {
+            store.sync(tp, offset, offset);
+            assertSparseSyncInvariant(store, tp);
+        }
+
+        // After seeing new offsets, 1000 was kicked out of the store, so
+        // 1000 can only be traslated to 1, only previously stored offset is 0
+        assertTranslationsNearby(store, 1000, 3840, 0);
+
+        // We can translate offsets between the latest startup offset and the latest offset with variable precision
+        // Older offsets are less precise and translation ends up farther apart
+        assertTranslationsNearby(store, 3840, 3840, 0);
+        assertTranslationsNearby(store, 7680, 7680, 3840);
+        assertTranslationsNearby(store, 8640, 8640, 7680);
+        assertTranslationsNearby(store, 9120, 9120, 8640);
+        assertTranslationsNearby(store, 9600, 9600, 9120);
+        assertTranslationsNearby(store, 9840, 9840, 9600);
+        assertTranslationsNearby(store, 9900, 9900, 9840);
+        assertTranslationsNearby(store, 9960, 9960, 9900);
+        assertTranslationsNearby(store, 9990, 9990, 9960);
+        assertTranslationsNearby(store, 1, 1, 9990);
```
Review Comment: brilliant suggestion!
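To make the translation rule behind these assertions concrete, here is a deliberately simplified, hypothetical model that is consistent with the expectations in `testNoTranslationIfStoreNotStarted` (after `sync(tp, 100, 200)` and `start(true)`: 0 → -1, 100 → 200, 200 → 201). Nothing is translated before start, an exact match returns the sync's downstream offset, a later offset returns downstream + 1, and an offset older than every known sync returns -1. Unlike the real `OffsetSyncStore`, it keeps every sync in a `TreeMap`; as the test comments suggest, the real store keeps only a sparse, bounded set of syncs, which is why older offsets translate less precisely.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

public class TranslationDemo {

    // Hypothetical, simplified model of upstream -> downstream offset translation
    // for a single partition. It keeps every sync, unlike the real sparse store.
    public static class SimpleSyncStore {
        private final TreeMap<Long, Long> syncs = new TreeMap<>(); // upstream -> downstream
        private boolean started = false;

        public void sync(long upstream, long downstream) {
            syncs.put(upstream, downstream);
        }

        public void start() {
            started = true;
        }

        public OptionalLong translateDownstream(long upstreamOffset) {
            if (!started) {
                return OptionalLong.empty(); // no translation until startup has finished
            }
            Map.Entry<Long, Long> floor = syncs.floorEntry(upstreamOffset);
            if (floor == null) {
                return OptionalLong.of(-1L); // older than every known sync
            }
            // Exact match: the synced downstream offset. Otherwise the record is only
            // known to lie after the sync, so translate to downstream + 1.
            long adjustment = (floor.getKey() == upstreamOffset) ? 0 : 1;
            return OptionalLong.of(floor.getValue() + adjustment);
        }
    }

    public static void main(String[] args) {
        SimpleSyncStore store = new SimpleSyncStore();
        store.sync(100, 200); // a sync read during startup
        System.out.println(store.translateDownstream(100)); // OptionalLong.empty
        store.start();
        System.out.println(store.translateDownstream(0));   // OptionalLong[-1]
        System.out.println(store.translateDownstream(100)); // OptionalLong[200]
        System.out.println(store.translateDownstream(200)); // OptionalLong[201]
    }
}
```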
edoardocomar commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1608507355
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ##
```
@@ -119,7 +118,7 @@ public void testPastOffsetTranslation() {
             store.sync(tp, offset, offset);
             assertSparseSyncInvariant(store, tp);
```
Review Comment: OK, we can assert that sync is only called after start.
edoardocomar commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1608471528
## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java: ##
```
@@ -271,4 +284,102 @@ private Map assertCheckpointForTopic(
         assertEquals(truth, checkpoints.containsKey(remoteTp), "should" + (truth ? "" : " not") + " emit offset sync");
         return checkpoints;
     }
+
+    @Test
+    public void testCheckpointsTaskRestartUsesExistingCheckpoints() {
```
Review Comment: Thanks, we fixed the reassignments. We already load the OffsetSyncStore with different OffsetSyncs, but we think the CheckpointsStore at restart of the task should contain the exact last checkpoint emitted by the previous instance of the task.
edoardocomar commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1608024060
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java: ##
```
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthorizationException;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE;
+
+/**
+ * Reads once the Kafka log for checkpoints and populates a map of
+ * checkpoints per consumer group.
+ *
+ * The Kafka log is closed after the initial load and only the in memory map is
+ * used after start.
+ */
+class CheckpointsStore implements AutoCloseable {
```
Review Comment: Thanks for the feedback. We know the general practice, but we made this class package-local because the existing OffsetSyncStore is package-local too. We will change CheckpointsStore to public and also make OffsetSyncStore public for consistency.
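As a rough illustration of the Javadoc above, here is a hypothetical sketch of the in-memory side of such a store: checkpoint records are replayed once, the latest checkpoint per consumer group and partition wins (so a restarted task sees the last checkpoint the previous instance emitted), and later lookups only touch the map. `SimpleCheckpoint`, the string partition key and the method names are assumptions for illustration, not the API of the real `CheckpointsStore` or of MirrorMaker's `Checkpoint` class.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CheckpointsMapDemo {

    // Hypothetical checkpoint record: consumer group, partition key (e.g. "topic-0"),
    // and the translated downstream offset.
    public static final class SimpleCheckpoint {
        public final String group;
        public final String partition;
        public final long downstreamOffset;

        public SimpleCheckpoint(String group, String partition, long downstreamOffset) {
            this.group = group;
            this.partition = partition;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // Replays the checkpoint log once; a later record for the same group and
    // partition replaces an earlier one, so the map holds the last one emitted.
    public static Map<String, Map<String, SimpleCheckpoint>> load(List<SimpleCheckpoint> log) {
        Map<String, Map<String, SimpleCheckpoint>> byGroup = new HashMap<>();
        for (SimpleCheckpoint cp : log) {
            byGroup.computeIfAbsent(cp.group, g -> new HashMap<>()).put(cp.partition, cp);
        }
        return byGroup;
    }

    // After loading, lookups use only the in-memory map.
    public static Map<String, SimpleCheckpoint> checkpointsFor(
            Map<String, Map<String, SimpleCheckpoint>> store, String group) {
        return store.getOrDefault(group, Collections.emptyMap());
    }

    public static void main(String[] args) {
        Map<String, Map<String, SimpleCheckpoint>> store = load(List.of(
                new SimpleCheckpoint("group1", "topic-0", 10),
                new SimpleCheckpoint("group1", "topic-0", 20),
                new SimpleCheckpoint("group2", "topic-0", 5)));
        System.out.println(checkpointsFor(store, "group1").get("topic-0").downstreamOffset); // 20
        System.out.println(checkpointsFor(store, "group3").isEmpty()); // true
    }
}
```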
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
gharris1727 commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1605384701 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java: ## @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE; + +/** + * Reads once the Kafka log for checkpoints and populates a map of + * checkpoints per consumer group. + * + * The Kafka log is closed after the initial load and only the in memory map is + * used after start. + */ +class CheckpointsStore implements AutoCloseable { Review Comment: optional nit: This class can be public, along with the methods that are intended to be used by MirrorCheckpointTask, because this isn't a publicly documented package (like clients, or connect-api, etc.). Outside of those publicly documented packages, the general practice is to make things public for external callers, even if the current callers are in the same package. We only use package-local for things that would be protected/private but need to be accessed in tests (and so come with the visibility comment). ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java: ## @@ -271,4 +284,102 @@ private Map assertCheckpointForTopic( assertEquals(truth, checkpoints.containsKey(remoteTp), "should" + (truth ?
"" : " not") + " emit offset sync"); return checkpoints; } + +@Test +public void testCheckpointsTaskRestartUsesExistingCheckpoints() { Review Comment: I think using "real checkpoints" generated by the first MirrorCheckpointTask to test the second MirrorCheckpointTask is not necessary, and you can use simulated checkpoints instead. Reassigning variables and copy-pasting sections in tests is typo-prone and I think we can avoid it here. ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ## @@ -155,6 +154,58 @@ public void testPastOffsetTranslation() { } } +// this test has been wriiten knowing the exact offsets syncs stored +@Test +public void testPastOffsetTranslationWithoutInitializationReadToEnd() { +final int maxOffsetLag = 10; + +FakeOffsetSyncStore store = new FakeOffsetSyncStore() { +@Override +void backingStoreStart() { +for (int offset = 0; offset <= 1000; offset += maxOffsetLag) { +sync(tp, offset, offset); +assertSparseSyncInvariant(this, tp); +} +} +}; + +store.start(false); + +// After starting but before seeing new offsets +assertTranslationsNearby(store, 400, 480, 0); +assertTranslationsNearby(store, 500, 720, 480); +assertTranslationsNearby(store, 1000, 1000, 990); + +for (int offset = 1000; offset <= 1; offset += maxOffsetLag) { +store.sync(tp, offset, offset); +assertSparseSyncInvariant(store, tp); +} + +// After seeing new offsets, 1000 was kicked out of the store, so +// 1000 can only be traslated to 1, only previously stored offset is 0 +assertTranslationsNearby(store, 1000, 3840, 0); + +// We can translate offsets between the latest startup offset and the latest offset
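The test comment above says that once the sync for 1000 has been evicted, 1000 "can only be translated to 1" because the only earlier stored sync is at 0. Below is a minimal sketch of the translation rule as we understand it. It assumes a hypothetical `SparseOffsetTranslator` class, with a `TreeMap` standing in for whatever structure the real `OffsetSyncStore` uses. The rule finds the latest sync at or before the upstream offset, uses that sync's downstream offset on an exact match, and adds one otherwise.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

/** Hypothetical, simplified model of translating offsets through sparse offset syncs. */
class SparseOffsetTranslator {
    // Syncs for a single partition: upstream offset -> downstream offset.
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    void sync(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
    }

    /** Simulates a sync being kicked out of the store. */
    void evict(long upstreamOffset) {
        syncs.remove(upstreamOffset);
    }

    OptionalLong translate(long upstreamOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamOffset);
        if (sync == null) {
            return OptionalLong.empty(); // no sync at or before this offset: cannot translate
        }
        // A committed offset u means records < u were consumed. If u is past the sync,
        // the record at the sync's upstream offset was consumed, so resume just after its
        // downstream copy. On an exact match, resume at the downstream copy itself.
        long step = sync.getKey() == upstreamOffset ? 0 : 1;
        return OptionalLong.of(sync.getValue() + step);
    }
}
```

With syncs every 10 offsets, 995 translates to 991. After every sync except the one at 0 is evicted, 1000 translates to 1, which matches the comment in the test.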
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2120792114 Hi @gharris1727, if you have the time, could you please have another look? Thanks.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
prestona commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2118027385 Hi @gharris1727, hopefully the latest commits address your review comments. Once again, we really appreciate all your feedback and suggestions.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1604720566 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -105,7 +106,10 @@ private KafkaBasedLog createBackingStore(MirrorCheckpointConfig /** * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage. */ -public void start() { +public void start(boolean initializationMustReadToEnd) { +this.initializationMustReadToEnd = initializationMustReadToEnd; +log.info("OffsetSyncStore initializationMustReadToEnd:{}{}", initializationMustReadToEnd, +initializationMustReadToEnd ? " - fewer checkpoints may be emitted" : ""); Review Comment: Thanks, we reworded the message where we catch the error. We didn't include the offsets in it, though, as there may be many groups handled by a task.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
prestona commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1603914389 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java: ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE; + +/** + * Reads once the Kafka log for checkpoints and populates a map of + * checkpoints per consumer group + */ +public class CheckpointsStore implements AutoCloseable { + +private static final Logger log = LoggerFactory.getLogger(CheckpointsStore.class); + +private final MirrorCheckpointTaskConfig config; +private final Set consumerGroups; + +private TopicAdmin cpAdmin = null; +private KafkaBasedLog backingStore = null; +private Map> checkpointsPerConsumerGroup; + +private volatile boolean loadSuccess = false; +private volatile boolean isInitialized = false; + +public CheckpointsStore(MirrorCheckpointTaskConfig config, Set consumerGroups) { +this.config = config; +this.consumerGroups = new HashSet<>(consumerGroups); +} + +// for testing +CheckpointsStore(Map> checkpointsPerConsumerGroup) { +this.config = null; +this.consumerGroups = null; +this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup; +isInitialized = true; +loadSuccess = true; +} + +// potentially long running +public void start() { +checkpointsPerConsumerGroup = readCheckpoints(); +isInitialized = true; +log.trace("Checkpoints store content : {}", checkpointsPerConsumerGroup); +} + +public boolean loadSuccess() { 
+return loadSuccess; +} + +public boolean isInitialized() { +return isInitialized; +} + + +// return a mutable map - it is expected to be mutated by the Task +public Map> contents() { Review Comment: Appreciate the feedback. We went back and forth on whether to break encapsulation or risk `CheckpointStore` increasingly implementing all the methods of `Map`.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
prestona commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1603914389 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java: ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.connect.mirror.MirrorCheckpointConfig.CHECKPOINTS_TARGET_CONSUMER_ROLE; + +/** + * Reads once the Kafka log for checkpoints and populates a map of + * checkpoints per consumer group + */ +public class CheckpointsStore implements AutoCloseable { + +private static final Logger log = LoggerFactory.getLogger(CheckpointsStore.class); + +private final MirrorCheckpointTaskConfig config; +private final Set consumerGroups; + +private TopicAdmin cpAdmin = null; +private KafkaBasedLog backingStore = null; +private Map> checkpointsPerConsumerGroup; + +private volatile boolean loadSuccess = false; +private volatile boolean isInitialized = false; + +public CheckpointsStore(MirrorCheckpointTaskConfig config, Set consumerGroups) { +this.config = config; +this.consumerGroups = new HashSet<>(consumerGroups); +} + +// for testing +CheckpointsStore(Map> checkpointsPerConsumerGroup) { +this.config = null; +this.consumerGroups = null; +this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup; +isInitialized = true; +loadSuccess = true; +} + +// potentially long running +public void start() { +checkpointsPerConsumerGroup = readCheckpoints(); +isInitialized = true; +log.trace("Checkpoints store content : {}", checkpointsPerConsumerGroup); +} + +public boolean loadSuccess() { 
+return loadSuccess; +} + +public boolean isInitialized() { +return isInitialized; +} + + +// return a mutable map - it is expected to be mutated by the Task +public Map> contents() { Review Comment: Appreciate the feedback. We went back and forth on whether to break encapsulation or to preserve it (at the risk of `CheckpointStore` increasingly implementing all the methods of `Map`).
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
gharris1727 commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1603861223 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -395,7 +401,7 @@ void syncGroupOffset(String consumerGroupId, Map> getConvertedUpstreamOffset() { Map> result = new HashMap<>(); -for (Entry> entry : checkpointsPerConsumerGroup.entrySet()) { +for (Entry> entry : checkpointsStore.contents().entrySet()) { String consumerId = entry.getKey(); Review Comment: this getConvertedUpstreamOffset could be moved to CheckpointStore since it only depends on the checkpoint store state. ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -69,20 +69,22 @@ public class MirrorCheckpointTask extends SourceTask { private MirrorCheckpointMetrics metrics; private Scheduler scheduler; private Map> idleConsumerGroupsOffset; -private Map> checkpointsPerConsumerGroup; +private CheckpointsStore checkpointsStore; + public MirrorCheckpointTask() {} // for testing MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias, -ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore, -Map> idleConsumerGroupsOffset, -Map> checkpointsPerConsumerGroup) { + ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore, Set consumerGroups, + Map> idleConsumerGroupsOffset, + CheckpointsStore checkpointsStore) { Review Comment: nit: indenting ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -105,7 +106,10 @@ private KafkaBasedLog createBackingStore(MirrorCheckpointConfig /** * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage. 
*/ -public void start() { +public void start(boolean initializationMustReadToEnd) { +this.initializationMustReadToEnd = initializationMustReadToEnd; +log.info("OffsetSyncStore initializationMustReadToEnd:{}{}", initializationMustReadToEnd, +initializationMustReadToEnd ? " - fewer checkpoints may be emitted" : ""); Review Comment: nit: Make this more verbose and user-oriented. They don't care that the variable is called initializationMustReadToEnd, and "Must read to end" is a very technical description of what is happening here. Specify more precisely which checkpoints aren't being emitted. Fewer could mean every other one, but it's actually offsets which were mirrored before the task started. Actually, this message works better if you put it after the backingStore.start() call: You can print out the oldest offset sync to say that translation is starting there, and whether this is limited by the initialization setting. ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ## @@ -39,10 +39,12 @@ static class FakeOffsetSyncStore extends OffsetSyncStore { super(); } -@Override -public void start() { Review Comment: bump on this comment, now that we're converging on the final design. 
## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -172,7 +178,7 @@ private List sourceRecordsForGroup(String group) throws Interrupte long timestamp = System.currentTimeMillis(); Map upstreamGroupOffsets = listConsumerGroupOffsets(group); Map newCheckpoints = checkpointsForGroup(upstreamGroupOffsets, group); -Map oldCheckpoints = checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>()); +Map oldCheckpoints = checkpointsStore.contents().computeIfAbsent(group, ignored -> new HashMap<>()); oldCheckpoints.putAll(newCheckpoints); Review Comment: Move this to a new `CheckpointsStore#emitCheckpoints(Map)` method ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -195,7 +201,7 @@ Map checkpointsForGroup(Map checkpoints = checkpointsPerConsumerGroup.get(checkpoint.consumerGroupId()); +Map checkpoints = checkpointsStore.contents().get(checkpoint.consumerGroupId()); if (checkpoints == null) { log.trace("Emitting {} (first for this group)", checkpoint); return true; Review Comment: This can be moved to a new `CheckpointStore#get(String, TopicPartition)` method. ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/CheckpointsStore.java: ## @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + *
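The suggestions above move the map manipulation (`emitCheckpoints`, `get`) and `getConvertedUpstreamOffset` into the store. Together they could look roughly like the hypothetical sketch below. `CheckpointsStoreSketch` and its nested `Checkpoint` are simplified stand-ins: partitions are plain strings, a checkpoint keeps only its two offsets, and the converted map holds each checkpoint's downstream offset. Because the task never receives the mutable per-group map, the store does not need to implement `Map` either.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical store that keeps its per-group map private and exposes narrow operations. */
class CheckpointsStoreSketch {

    /** Simplified checkpoint: only the offsets are kept. */
    static final class Checkpoint {
        final long upstreamOffset;
        final long downstreamOffset;

        Checkpoint(long upstreamOffset, long downstreamOffset) {
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // consumer group -> (partition -> latest checkpoint); never handed out directly
    private final Map<String, Map<String, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();

    /** Last checkpoint for the group and partition, or null if none was emitted yet. */
    Checkpoint get(String group, String partition) {
        Map<String, Checkpoint> checkpoints = checkpointsPerConsumerGroup.get(group);
        return checkpoints == null ? null : checkpoints.get(partition);
    }

    /** Records newly emitted checkpoints, replacing older ones for the same partitions. */
    void emitCheckpoints(String group, Map<String, Checkpoint> newCheckpoints) {
        checkpointsPerConsumerGroup
                .computeIfAbsent(group, ignored -> new HashMap<>())
                .putAll(newCheckpoints);
    }

    /** Fresh, read-only copy of each group's translated offsets; callers cannot mutate the store. */
    Map<String, Map<String, Long>> getConvertedUpstreamOffset() {
        Map<String, Map<String, Long>> result = new HashMap<>();
        for (Map.Entry<String, Map<String, Checkpoint>> group : checkpointsPerConsumerGroup.entrySet()) {
            Map<String, Long> offsets = new HashMap<>();
            for (Map.Entry<String, Checkpoint> cp : group.getValue().entrySet()) {
                offsets.put(cp.getKey(), cp.getValue().downstreamOffset);
            }
            result.put(group.getKey(), Collections.unmodifiableMap(offsets));
        }
        return Collections.unmodifiableMap(result);
    }
}
```

Returning copies costs an allocation per call, but it keeps the store as the only writer of its state.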
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2115715035 Hi @gharris1727, we worked out the asynchronous loading using a wrapper around the checkpointsPerGroupMap. However, when testing with different levels of authorization to see the fallback behaviour, the simplest approach was to have the callback rethrow. It's all encapsulated, so it's not spoiling the task, IMHO!
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1602265328 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -105,7 +106,13 @@ private KafkaBasedLog createBackingStore(MirrorCheckpointConfig /** * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage. */ -public void start() { +public void start(boolean initializationMustReadToEnd) { +this.initializationMustReadToEnd = initializationMustReadToEnd; +if (initializationMustReadToEnd) { +log.warn("OffsetSyncStore initializationMustReadToEnd = {}", initializationMustReadToEnd); Review Comment: I think that getting the degraded fallback behavior is not the normal case, and therefore I strongly prefer a warn here. However, here we do know the reason why we are getting this, so we should explain it in the record handler, as suggested.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
prestona commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1602244281 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -103,10 +113,11 @@ public void start(Map props) { targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin")); metrics = config.metrics(); idleConsumerGroupsOffset = new HashMap<>(); -checkpointsPerConsumerGroup = new HashMap<>(); +Optional>> checkpoints = readCheckpoints(config); Review Comment: Thanks for the explanation and the suggestion. We'll take another look at reading the checkpoint topic asynchronously from the start() method.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
prestona commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1602234146 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -116,6 +127,73 @@ public void start(Map props) { consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups); } +// read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task +// the callback may only handle errors thrown by consumer.poll in KafkaBasedLog +// e.g. unauthorized to read from topic (non-retriable) +// if any are encountered, treat the loading of Checkpoints as failed. +Optional>> readCheckpoints(MirrorCheckpointTaskConfig config) { +AtomicBoolean successful = new AtomicBoolean(true); +Map> checkpoints = new HashMap<>(); +Callback> consumedCallback = new Callback>() { +@Override +public void onCompletion(Throwable error, ConsumerRecord cpRecord) { +if (error != null && successful.getAndSet(false)) { +log.error("Error loading Checkpoint topic", error); Review Comment: If we special-case "not authorized" (as above), then the main reasons for hitting this are (hopefully?) transient problems, for example all brokers being down when the connector is first started. I agree that this should be a warning with a better explanation of the impact.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
prestona commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1602207815 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -105,7 +106,13 @@ private KafkaBasedLog createBackingStore(MirrorCheckpointConfig /** * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage. */ -public void start() { +public void start(boolean initializationMustReadToEnd) { +this.initializationMustReadToEnd = initializationMustReadToEnd; +if (initializationMustReadToEnd) { +log.warn("OffsetSyncStore initializationMustReadToEnd = {}", initializationMustReadToEnd); Review Comment: Our intention was to alert the Kafka admin that they are getting degraded behavior. However, reflecting on this, I wonder if it would be better to special-case "not authorized" such that: 1. For "not authorized", we emit a warning (with a better-worded explanation), because the Kafka admin can choose to take an action that improves the frequency of checkpoints. 2. For other cases, we log at debug level.
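The logging policy proposed above warns only when the failure is an authorization problem an admin can fix, and logs at debug level otherwise. It could be expressed as a small classifier. This is a hypothetical sketch: `NotAuthorizedException` stands in for Kafka's `AuthorizationException` so that the example has no dependencies, and the message wording is illustrative only.

```java
/** Hypothetical classifier for failures while loading the checkpoints topic. */
class CheckpointLoadFailurePolicy {

    enum Level { WARN, DEBUG }

    /** Stand-in for Kafka's AuthorizationException (assumption, not the real type). */
    static class NotAuthorizedException extends RuntimeException {
        NotAuthorizedException(String message) {
            super(message);
        }
    }

    /** WARN if an authorization failure appears anywhere in the cause chain, else DEBUG. */
    static Level levelFor(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof NotAuthorizedException) {
                return Level.WARN; // actionable: an admin can grant access to the topic
            }
        }
        return Level.DEBUG; // likely transient, e.g. brokers unavailable at startup
    }

    /** Illustrative user-oriented wording; not the message used in the PR. */
    static String messageFor(Throwable error, String checkpointsTopic) {
        if (levelFor(error) == Level.WARN) {
            return "Not authorized to read checkpoints topic " + checkpointsTopic
                    + "; starting without previously emitted checkpoints, so fewer checkpoints"
                    + " may be emitted. Grant READ access on this topic to avoid this.";
        }
        return "Could not load checkpoints topic " + checkpointsTopic + " (" + error + ")";
    }
}
```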
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
gharris1727 commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1602156155 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -103,10 +113,11 @@ public void start(Map props) { targetAdminClient = config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin")); metrics = config.metrics(); idleConsumerGroupsOffset = new HashMap<>(); -checkpointsPerConsumerGroup = new HashMap<>(); +Optional>> checkpoints = readCheckpoints(config); Review Comment: This is a potentially long blocking operation, and those should be avoided in start() methods because while the task is starting, it can't be stopped, and if the task can't be stopped within `task.shutdown.graceful.timeout.ms` it is aggressively cancelled. Since the main thread needs the result from readCheckpoints, I think it would be fine to check if it's been loaded and if not, just return an empty poll(). ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -116,6 +127,73 @@ public void start(Map props) { consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups); } +// read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task +// the callback may only handle errors thrown by consumer.poll in KafkaBasedLog +// e.g. unauthorized to read from topic (non-retriable) +// if any are encountered, treat the loading of Checkpoints as failed. +Optional>> readCheckpoints(MirrorCheckpointTaskConfig config) { +AtomicBoolean successful = new AtomicBoolean(true); +Map> checkpoints = new HashMap<>(); +Callback> consumedCallback = new Callback>() { +@Override +public void onCompletion(Throwable error, ConsumerRecord cpRecord) { +if (error != null && successful.getAndSet(false)) { +log.error("Error loading Checkpoint topic", error); Review Comment: I'm on the fence whether this should be error or warn. 
Is it something that the user _must_ address? I'm not so sure. I do think that this should have an actionable recommendation, or an explanation that the task is gracefully degrading because of this. ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -105,7 +106,13 @@ private KafkaBasedLog createBackingStore(MirrorCheckpointConfig /** * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage. */ -public void start() { +public void start(boolean initializationMustReadToEnd) { +this.initializationMustReadToEnd = initializationMustReadToEnd; +if (initializationMustReadToEnd) { +log.warn("OffsetSyncStore initializationMustReadToEnd = {}", initializationMustReadToEnd); Review Comment: Debug level; this is not worth warning about. :+1: for the variable name.
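The suggestion to avoid blocking in `start()` and return an empty `poll()` until the checkpoints have loaded can be sketched with a `CompletableFuture`. This is hypothetical: `AsyncLoadingTaskSketch` is not the Connect `SourceTask` API, the loader is passed in as a `Supplier`, and `poll()` simply returns the loaded state where a real task would compute checkpoint records from it.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

/** Hypothetical task skeleton that loads its initial state without blocking start(). */
class AsyncLoadingTaskSketch {
    // Daemon thread, so a forgotten stop() cannot keep the JVM alive.
    private final ExecutorService loader = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "checkpoints-loader");
        thread.setDaemon(true);
        return thread;
    });
    private volatile CompletableFuture<List<String>> checkpoints;

    /** Returns immediately; the potentially long read runs on the loader thread. */
    void start(Supplier<List<String>> readCheckpoints) {
        checkpoints = CompletableFuture.supplyAsync(readCheckpoints, loader);
    }

    /** Emits nothing until loading has finished; afterwards returns the loaded state. */
    List<String> poll() {
        CompletableFuture<List<String>> loading = checkpoints;
        if (loading == null || !loading.isDone()) {
            return Collections.emptyList(); // still loading: empty poll this round
        }
        if (loading.isCompletedExceptionally()) {
            // Hypothetical: a real task would switch to its degraded fallback mode here.
            return Collections.emptyList();
        }
        return loading.join(); // already complete, so join() does not block
    }

    /** Interrupts an in-progress load instead of waiting for it to finish. */
    void stop() {
        loader.shutdownNow();
    }
}
```

Because the read runs on its own thread, a stop request is never stuck behind a long-running `start()`.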
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2112731563 Hi @gharris1727, commit e33edd2a72 hopefully addresses most of your comments. Thanks for the quick feedback. We also noticed that the loading of the checkpoints must complete before the task's start method completes. This avoids the checkpointsPerConsumerGroup map being accessed by the task's active polls before it is completely initialized, so we moved the loading before scheduler.execute.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
gharris1727 commented on code in PR #15910: URL: https://github.com/apache/kafka/pull/15910#discussion_r1600452206 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -116,6 +125,71 @@ public void start(Map props) { consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups); } +// read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task +private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) { + +class CheckpointRecordHandler { +private volatile KafkaException lastLoggedErrorReadingCheckpoints = null; + +void handle(Throwable error, ConsumerRecord cpRecord) { +// See KafkaBasedLog.poll : only KafkaException can be passed as error +if (error instanceof KafkaException) { +// only log once +if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) { +log.error("Error loading Checkpoint topic", error); +lastLoggedErrorReadingCheckpoints = (KafkaException) error; +} + +if (error instanceof RetriableException) { +return; +} else { +throw (KafkaException) error; +} +} else { // error is null +lastLoggedErrorReadingCheckpoints = null; +Checkpoint cp = Checkpoint.deserializeRecord(cpRecord); Review Comment: deserialization can fail due to bad data in the topic ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -116,6 +125,71 @@ public void start(Map props) { consumerGroups.size(), sourceClusterAlias, config.targetClusterAlias(), consumerGroups); } +// read the checkpoints topic to initialize the checkpointsPerConsumerGroup state of this task +private boolean initializeCheckpoints(MirrorCheckpointTaskConfig config) { + +class CheckpointRecordHandler { +private volatile KafkaException lastLoggedErrorReadingCheckpoints = null; + +void handle(Throwable error, ConsumerRecord cpRecord) { +// See KafkaBasedLog.poll : only KafkaException can be 
passed as error +if (error instanceof KafkaException) { +// only log once +if (lastLoggedErrorReadingCheckpoints == null || !lastLoggedErrorReadingCheckpoints.getClass().equals(error.getClass())) { +log.error("Error loading Checkpoint topic", error); +lastLoggedErrorReadingCheckpoints = (KafkaException) error; +} + +if (error instanceof RetriableException) { +return; +} else { +throw (KafkaException) error; +} +} else { // error is null +lastLoggedErrorReadingCheckpoints = null; +Checkpoint cp = Checkpoint.deserializeRecord(cpRecord); +if (consumerGroups.contains(cp.consumerGroupId())) { +Map cps = checkpointsPerConsumerGroup.computeIfAbsent(cp.consumerGroupId(), ignored -> new HashMap<>()); +cps.put(cp.topicPartition(), cp); +} +} +} +} + +CheckpointRecordHandler handler = new CheckpointRecordHandler(); +TopicAdmin cpAdmin = null; +KafkaBasedLog previousCheckpoints = null; + +try { +cpAdmin = new TopicAdmin( +config.targetAdminConfig("checkpoint-target-admin"), + config.forwardingAdmin(config.targetAdminConfig("checkpoint-target-admin"))); + +previousCheckpoints = KafkaBasedLog.withExistingClients( +config.checkpointsTopic(), + MirrorUtils.newConsumer(config.targetConsumerConfig(CHECKPOINTS_TARGET_CONSUMER_ROLE)), +null, +cpAdmin, +(error, cpRecord) -> handler.handle(error, cpRecord), +Time.SYSTEM, +ignored -> { }, +topicPartition -> topicPartition.partition() == 0); + +log.info("Starting loading Checkpoint topic : {}", config.checkpointsTopic()); +previousCheckpoints.start(true); +previousCheckpoints.stop(); +log.info("Finished loading Checkpoint topic : {}", config.checkpointsTopic()); +log.debug("Initial checkpointsPerConsumerGroup : {}", checkpointsPerConsumerGroup); +return true; +} catch (KafkaException kexc)
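The quoted record-handler logic logs once per error class, skips retriable errors, and rethrows the rest. Combined with the review point that deserialization can fail on bad data in the topic, it can be sketched as follows. All names are hypothetical: `RetriableError` stands in for Kafka's `RetriableException`, a record is reduced to a group key and a byte array, and the injected deserializer plays the role of `Checkpoint.deserializeRecord`. The later `CheckpointsStore` diff imports `SchemaException`, which suggests the real code catches that type. This sketch catches any `RuntimeException` from the deserializer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.logging.Logger;

/** Hypothetical handler for records (or errors) read from the checkpoints topic. */
class CheckpointRecordHandlerSketch {
    private static final Logger LOG = Logger.getLogger(CheckpointRecordHandlerSketch.class.getName());

    /** Stand-in for Kafka's RetriableException (assumption, not the real type). */
    static class RetriableError extends RuntimeException {
        RetriableError(String message) {
            super(message);
        }
    }

    private final Function<byte[], Long> deserializer; // hypothetical: record value -> offset
    private final Map<String, Long> latestPerGroup = new HashMap<>();
    private Class<?> lastLoggedErrorClass = null;
    private int skippedRecords = 0;

    CheckpointRecordHandlerSketch(Function<byte[], Long> deserializer) {
        this.deserializer = deserializer;
    }

    void handle(Throwable error, String group, byte[] value) {
        if (error != null) {
            // Log only when the kind of error changes, to avoid flooding the log on every poll.
            if (!error.getClass().equals(lastLoggedErrorClass)) {
                LOG.warning("Error loading checkpoints topic: " + error);
                lastLoggedErrorClass = error.getClass();
            }
            if (error instanceof RetriableError) {
                return; // the read loop will try again
            }
            throw error instanceof RuntimeException ? (RuntimeException) error : new RuntimeException(error);
        }
        lastLoggedErrorClass = null;
        try {
            latestPerGroup.put(group, deserializer.apply(value));
        } catch (RuntimeException e) {
            // Bad data in the topic: skip this record rather than failing the whole load.
            skippedRecords++;
            LOG.warning("Skipping checkpoint record that could not be deserialized: " + e);
        }
    }

    Long latest(String group) {
        return latestPerGroup.get(group);
    }

    int skippedRecords() {
        return skippedRecords;
    }
}
```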
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2110419872 Looks like we should rename this PR, as it mainly addresses https://issues.apache.org/jira/browse/KAFKA-16622 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2110417331

Testing results in the following scenario:

- produce 1 records
- consume them 250 at a time
- stop mm2
- produce 1 records
- restart consuming
- restart mm2

Emitted Checkpoints:

```
% bin/kafka-console-consumer.sh --bootstrap-server localhost:9192 \
    --topic source.checkpoints.internal \
    --formatter org.apache.kafka.connect.mirror.formatters.CheckpointFormatter \
    --from-beginning
```

NEW implementation, with the checkpoints successfully read by the Checkpoint task:

```
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=500, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=6250, downstreamOffset=5820, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=7250, downstreamOffset=7228, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=8750, downstreamOffset=8658, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=9500, downstreamOffset=9362, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=9750, downstreamOffset=9714, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=12250, downstreamOffset=11519, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=14500, downstreamOffset=14335, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=17500, downstreamOffset=17195, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18750, downstreamOffset=18603, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19500, downstreamOffset=19307, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19750, downstreamOffset=19659, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}
```

NEW implementation, when the Checkpoint task FAILED to read the checkpoints:

```
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=11250, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16000, downstreamOffset=15919, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=17500, downstreamOffset=17349, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19000, downstreamOffset=18757, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19500, downstreamOffset=19461, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}
```

Original implementation (prior to this PR):

```
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=1, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=11250, downstreamOffset=1, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=16250, downstreamOffset=16249, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=18000, downstreamOffset=17987, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19000, downstreamOffset=18768, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=19750, downstreamOffset=19747, metadata=}
Checkpoint{consumerGroupId=mygroup1, topicPartition=source.mytopic-0, upstreamOffset=2, downstreamOffset=2, metadata=}
```
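To compare runs like the ones above, each formatted line can be reduced to its pair of offsets. A minimal sketch, assuming every line has the `upstreamOffset=N, downstreamOffset=M` fields exactly as `CheckpointFormatter` printed them above; `CheckpointLine` is a hypothetical helper, not part of Kafka:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class CheckpointLine {
    // Matches the two offset fields in the formatter output shown above.
    private static final Pattern OFFSETS =
            Pattern.compile("upstreamOffset=(\\d+), downstreamOffset=(\\d+)");

    final long upstream;
    final long downstream;

    private CheckpointLine(long upstream, long downstream) {
        this.upstream = upstream;
        this.downstream = downstream;
    }

    static CheckpointLine parse(String line) {
        Matcher m = OFFSETS.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("not a checkpoint line: " + line);
        }
        return new CheckpointLine(Long.parseLong(m.group(1)), Long.parseLong(m.group(2)));
    }

    // Distance between the upstream offset and its translated downstream offset.
    long gap() {
        return upstream - downstream;
    }
}
```

In a setup like this one, where the downstream topic is a mirror of the upstream one, the gap is only a rough indicator of how conservative each translation was; it says nothing about when the first checkpoint was emitted, which has to be read from the sequence itself.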
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2109896449 @gharris1727 thanks for your feedback. We've added another commit that falls back to the old OffsetSyncs load behavior when the task cannot read the checkpoints.
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
gharris1727 commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2104915418 @edoardocomar In general, connectors do eventually have to add a configuration like this, because users have different tolerances for errors. Some users want errors to cause the connector to become FAILED, so that they can see the exception in the REST API and retry it explicitly. Other users want the connector to retry internally indefinitely, and not fail for any reason. MM2 has a _lot_ of operations that can fail, and virtually none of them cause the connector to fail. The reason for this is that MM2 has a dedicated mode, where there isn't a REST API to surface errors or perform external retries, so external retries are very expensive. It is definitely something that could be fixed eventually with something like a "strict mode" configuration. We've also considered ways to address this from the framework side, with retry policies and automatic restarts, but none of that has been fully designed or implemented yet. I think we should not block this fix on solving that more general problem. If there is a permissions error loading the checkpoints, MM2 should log it and then degrade gracefully to the current behavior. We can have a KIP that adds a "strict mode" to make this failure surface, making this new permission required. In practical terms, without a configuration and with the graceful-degradation implementation, we can get this into 3.8. If you're interested in the configuration, that will delay this feature until 4.0. I'm fine with either, but I think the current behavior has caused such considerable friction in the community that we should prefer a 3.8 release.
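The two error-tolerance policies described here (fail visibly versus degrade gracefully) can be pictured as one switch around the checkpoint load. A minimal sketch only: the `strict` flag is hypothetical, matching the "strict mode" the comment floats as possible KIP material rather than any existing MM2 configuration, and the `Runnable` stands in for whatever actually reads the checkpoints topic:

```java
class CheckpointInitPolicy {
    // Hypothetical "strict mode" switch; not an existing MM2 configuration.
    private final boolean strict;

    CheckpointInitPolicy(boolean strict) {
        this.strict = strict;
    }

    // Runs the checkpoint load and reports whether it succeeded.
    // strict == true: any failure propagates, so the task fails visibly.
    // strict == false: the failure is logged and false is returned, so the
    // caller can degrade to the previous behavior.
    boolean initialize(Runnable load) {
        try {
            load.run();
            return true;
        } catch (RuntimeException e) {
            if (strict) {
                throw e;
            }
            System.err.println("Loading checkpoints failed, degrading gracefully: " + e);
            return false;
        }
    }
}
```

With the lenient default, a missing permission costs only the improved behavior; opting into strict mode would turn the same condition into a task failure that an operator must fix.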
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2104835965 Hi @gharris1727, we're now handling errors when loading the Checkpoints topic (we still have to add unit tests). Specifically, we tested the not-authorized-to-read case, which the existing KafkaBasedLog was not handling well. At this stage, task start would fail, which seems an improvement to us because it is detectable and actionable (we expect the change to be noted in the release notes). This looks like better behavior to us than reverting to the old one in case of failure, as maintaining and testing two modes of operation seems too complex. Do you still think we need a KIP to introduce yet another config to choose between the old behavior (default) and the new one (arguably better in the eyes of this PR's authors...)?
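The handler in the diff earlier in the thread logs each error class only once, keeps polling on retriable errors, and rethrows anything else, which is why a not-authorized error makes task start fail (in Kafka, authorization failures such as `TopicAuthorizationException` are not `RetriableException`s). A minimal sketch of that control flow, assuming hypothetical `LoadException` and `RetriableLoadException` classes in place of Kafka's `KafkaException` and `RetriableException`:

```java
class LoadException extends RuntimeException {
    LoadException(String message) {
        super(message);
    }
}

class RetriableLoadException extends LoadException {
    RetriableLoadException(String message) {
        super(message);
    }
}

class ErrorHandler {
    private LoadException lastLogged = null;
    int logCount = 0;

    // Returns normally for retriable errors (the reader keeps polling) and
    // rethrows anything else, so that the load, and thus task start, fails.
    void onError(LoadException error) {
        // Log only when the error class changes, to avoid flooding the log on retries.
        if (lastLogged == null || !lastLogged.getClass().equals(error.getClass())) {
            System.err.println("Error loading checkpoints topic: " + error.getMessage());
            logCount++;
            lastLogged = error;
        }
        if (!(error instanceof RetriableLoadException)) {
            throw error;
        }
    }

    // A successfully handled record resets the state, as in the handler under review.
    void onSuccess() {
        lastLogged = null;
    }
}
```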
Re: [PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
gharris1727 commented on PR #15910: URL: https://github.com/apache/kafka/pull/15910#issuecomment-2103188026 Hey @edoardocomar and @prestona, thanks for the PR! One of the reasons I thought this might require a KIP is that it requires additional permissions that the current MM2 doesn't need: if an operator has already configured ACLs such that MM2 has write permission on the checkpoints topic but not read permission, it could be operating today and then fail after an upgrade with this change. I don't know if that is a common configuration or even a recommended one, but it does seem possible in the wild. Perhaps this can be configuration-less and backwards-compatible if we fall back to the old behavior when reading the checkpoints fails for any reason, including insufficient permissions.
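The configuration-less fallback proposed here could look roughly like this: try to rebuild per-group checkpoint state from the checkpoints topic and, if that fails for any reason (including a missing read permission), continue with empty checkpoint state, roughly as the task behaved before this change. A minimal sketch; `CheckpointState` and its nested maps are hypothetical simplifications of `checkpointsPerConsumerGroup`, and the sketch leaves out the switch back to the old OffsetSyncs load behavior that the PR also makes:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

class CheckpointState {
    // group -> (partition -> downstream offset): simplified stand-in for
    // MirrorCheckpointTask's checkpointsPerConsumerGroup.
    final Map<String, Map<String, Long>> checkpointsPerGroup = new HashMap<>();
    boolean restoredFromTopic = false;

    // Tries to restore state from the checkpoints topic; on any failure,
    // continues with empty state instead of failing the task.
    void initialize(Supplier<Map<String, Map<String, Long>>> readCheckpointsTopic) {
        try {
            checkpointsPerGroup.putAll(readCheckpointsTopic.get());
            restoredFromTopic = true;
        } catch (RuntimeException e) {
            checkpointsPerGroup.clear();
            restoredFromTopic = false;
            System.err.println("Could not read checkpoints topic, falling back: " + e);
        }
    }
}
```

The `restoredFromTopic` flag plays the role of the `boolean` returned by `initializeCheckpoints` in the diff: callers can branch on it to pick the old or new start-up path.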
[PR] KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently i… [kafka]
edoardocomar opened a new pull request, #15910: URL: https://github.com/apache/kafka/pull/15910 KAFKA-15905 Restarts of MirrorCheckpointTask should not permanently interrupt offset translation. MirrorCheckpointTask reloads the last checkpoint at start; OffsetSyncStore stores OffsetSyncs before reading to the end. Adds a test case simulating a restarted task where the store is reinitialized with later OffsetSyncs, and checks that emitted Checkpoints do not rewind. Also addresses KAFKA-16622 (Mirromaker2 first Checkpoint not emitted until consumer group fully catches up once), because the OffsetSyncStore is populated before reading to log end. Co-Authored-By: Adrian Preston
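The property the new test checks, that emitted Checkpoints do not rewind after a restart, can be read in simplified form as: for one consumer group and partition, the sequence of emitted downstream offsets never decreases. A minimal sketch of such a check; `CheckpointAssertions.assertNoRewind` is a hypothetical helper and not the test added in the PR:

```java
import java.util.List;

class CheckpointAssertions {
    // Throws if any downstream offset is lower than one emitted before it for the
    // same consumer group and partition, i.e. if translation went backwards.
    static void assertNoRewind(List<Long> downstreamOffsets) {
        long highest = Long.MIN_VALUE;
        for (int i = 0; i < downstreamOffsets.size(); i++) {
            long offset = downstreamOffsets.get(i);
            if (offset < highest) {
                throw new AssertionError(
                        "checkpoint " + i + " rewound from " + highest + " to " + offset);
            }
            highest = offset;
        }
    }
}
```

Equal consecutive offsets are allowed here, since re-emitting the same checkpoint is not a rewind.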