[jira] [Updated] (KAFKA-12889) log clean group consider empty log segment to avoid empty log left
[ https://issues.apache.org/jira/browse/KAFKA-12889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

qiang Liu updated KAFKA-12889:
------------------------------
    Affects Version/s: 3.1.0
                       2.8.0

> log clean group consider empty log segment to avoid empty log left
> ------------------------------------------------------------------
>
>                 Key: KAFKA-12889
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12889
>             Project: Kafka
>          Issue Type: Bug
>          Components: log cleaner
>    Affects Versions: 0.10.1.1, 2.8.0, 3.1.0
>            Reporter: qiang Liu
>            Priority: Trivial
>
> To avoid overflowing the 4-byte relative offsets in the log index, the log cleaner checks segment offsets when grouping, to make sure a group's offset range does not exceed Int.MaxValue. This check currently does not consider whether the next log segment is empty, so an empty log file is left behind roughly every 2^31 messages.
> The leftover empty logs are reprocessed on every clean cycle, which rewrites them with the same empty content and causes a little unnecessary I/O.
> For the __consumer_offsets topic, we can normally set cleanup.policy to compact,delete to get rid of this.
> My cluster is on 0.10.1.1, but after analyzing the trunk code, it should have the same problem too.
>
> Some of my leftover empty logs (run ls -l):
> -rw-r- 1 u g 0 Dec 16 2017 .index
> -rw-r- 1 u g 0 Dec 16 2017 .log
> -rw-r- 1 u g 0 Dec 16 2017 .timeindex
> -rw-r- 1 u g 0 Jan 15 2018 002148249632.index
> -rw-r- 1 u g 0 Jan 15 2018 002148249632.log
> -rw-r- 1 u g 0 Jan 15 2018 002148249632.timeindex
> -rw-r- 1 u g 0 Jan 27 2018 004295766494.index
> -rw-r- 1 u g 0 Jan 27 2018 004295766494.log
> -rw-r- 1 u g 0 Jan 27 2018 004295766494.timeindex

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
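The grouping rule at issue can be sketched as follows. This is a simplified model, not Kafka's actual LogCleaner code: the `Segment` fields and the `group` method are illustrative assumptions. The key idea of the proposed fix is that an empty segment contributes no index entries, so it never needs to start a group of its own and can always be folded into the current group.

```java
import java.util.ArrayList;
import java.util.List;

public class CleanerGrouping {
    // Simplified stand-in for a log segment: its base offset, the base offset
    // of the following segment (so the group's offset span can be computed),
    // and its size in bytes (0 for an empty segment).
    static class Segment {
        final long baseOffset;
        final long nextOffset;
        final long sizeBytes;
        Segment(long baseOffset, long nextOffset, long sizeBytes) {
            this.baseOffset = baseOffset;
            this.nextOffset = nextOffset;
            this.sizeBytes = sizeBytes;
        }
    }

    // Group consecutive segments so that each group's relative offsets fit in
    // a 4-byte index entry, but (the proposed fix) never let an empty segment
    // start a new group: it has no messages to index, so it rides along with
    // the current group instead of becoming a lone empty log file.
    static List<List<Segment>> group(List<Segment> segments) {
        final List<List<Segment>> groups = new ArrayList<>();
        List<Segment> current = new ArrayList<>();
        long groupBase = -1L;
        for (final Segment s : segments) {
            final boolean fits = current.isEmpty()
                || s.sizeBytes == 0                                    // empty: nothing to index
                || s.nextOffset - 1 - groupBase <= Integer.MAX_VALUE;  // span fits in 4 bytes
            if (!fits) {
                groups.add(current);
                current = new ArrayList<>();
            }
            if (current.isEmpty()) {
                groupBase = s.baseOffset;
            }
            current.add(s);
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }
}
```

Without the `sizeBytes == 0` clause, an empty segment whose offset lies more than Int.MaxValue past the group base would be split into its own group and survive cleaning as an empty file, which is exactly the symptom in the `ls -l` listing above.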
[jira] [Commented] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
[ https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361374#comment-17361374 ]

A. Sophie Blee-Goldman commented on KAFKA-12935:
------------------------------------------------

Filed https://issues.apache.org/jira/browse/KAFKA-12936

> Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
> ----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12935
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12935
>             Project: Kafka
>          Issue Type: Test
>          Components: streams, unit tests
>            Reporter: Matthias J. Sax
>            Priority: Critical
>              Labels: flaky-test
>
> {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L>
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
> at org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374)
> {quote}
[jira] [Created] (KAFKA-12936) In-memory stores are always restored from scratch after dropping out of the group
A. Sophie Blee-Goldman created KAFKA-12936: -- Summary: In-memory stores are always restored from scratch after dropping out of the group Key: KAFKA-12936 URL: https://issues.apache.org/jira/browse/KAFKA-12936 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman Whenever an in-memory store is closed, the actual store contents are garbage collected and the state will need to be restored from scratch if the task is reassigned and re-initialized. We introduced the recycling feature to prevent this from occurring when a task is transitioned from standby to active (or vice versa), but it's still possible for the in-memory state to be unnecessarily wiped out in the case the member has dropped out of the group. In this case, the onPartitionsLost callback is invoked, which will close all active tasks as dirty before the member rejoins the group. This means that all these tasks will need to be restored from scratch if they are reassigned back to this consumer. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
[ https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361367#comment-17361367 ]

A. Sophie Blee-Goldman commented on KAFKA-12935:
------------------------------------------------

Hm, actually, I guess that could be considered a bug in itself, or at least a flaw in the recycling feature: for persistent stores with ALOS, dropping out of the group only causes tasks to be closed dirty; it doesn't force them to be wiped out and restored from the changelog from scratch. But with in-memory stores, simply closing them is akin to physically wiping out the state directory for that task. Avoiding that was the basis for this recycling feature in the first place.

This does kind of suck, but at least it should be a relatively rare event. I'm a bit worried about how much complexity it would introduce to the code to fix this "bug", but I'll at least file a ticket for it now and we can go from there.
[jira] [Commented] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
[ https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361359#comment-17361359 ]

A. Sophie Blee-Goldman commented on KAFKA-12935:
------------------------------------------------

Hm, yeah, I think I've seen this fail once or twice before. I did look into it a bit a while back, and just could not figure out whether it was a possible bug or an issue with the test itself. My money's definitely on the latter, but it might be worth taking another look sometime if we have the chance.

If there is a bug that this is uncovering, at least it would not be a correctness bug, only an annoyance in restoring when it's not necessary. I think it's more likely that the test is flaky because, for example, the consumer dropped out of the group and invoked onPartitionsLost, which would close the task as dirty and require restoring from the changelog.
[jira] [Commented] (KAFKA-12851) Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
[ https://issues.apache.org/jira/browse/KAFKA-12851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361312#comment-17361312 ]

Matthias J. Sax commented on KAFKA-12851:
-----------------------------------------

Failed again.

> Flaky Test RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-12851
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12851
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>            Reporter: A. Sophie Blee-Goldman
>            Priority: Blocker
>             Fix For: 3.0.0
>
> Failed twice on a [PR build|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10755/6/testReport/]
> h3. Stacktrace
> org.opentest4j.AssertionFailedError: expected: but was:
> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
> at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35)
> at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162)
> at org.apache.kafka.raft.RaftEventSimulationTest.canMakeProgressIfMajorityIsReachable(RaftEventSimulationTest.java:263)
[jira] [Resolved] (KAFKA-12934) Move some controller classes to the metadata package
[ https://issues.apache.org/jira/browse/KAFKA-12934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-12934. -- Fix Version/s: 3.0.0 Reviewer: Jason Gustafson Resolution: Fixed > Move some controller classes to the metadata package > > > Key: KAFKA-12934 > URL: https://issues.apache.org/jira/browse/KAFKA-12934 > Project: Kafka > Issue Type: Improvement > Components: controller >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Minor > Labels: kip-500 > Fix For: 3.0.0 > > > Move some controller classes to the metadata package so that they can be used > with broker snapshots. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
Matthias J. Sax created KAFKA-12935: --- Summary: Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore Key: KAFKA-12935 URL: https://issues.apache.org/jira/browse/KAFKA-12935 Project: Kafka Issue Type: Test Components: streams, unit tests Reporter: Matthias J. Sax {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374) {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cmccabe opened a new pull request #10865: KAFKA-12934: Move some controller classes to the metadata package
cmccabe opened a new pull request #10865: URL: https://github.com/apache/kafka/pull/10865 Move some controller classes to the metadata package so that they can be used with broker snapshots. Rename ControllerTestUtils to RecordTestUtils. Move PartitionInfo to PartitionRegistration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12934) Move some controller classes to the metadata package
Colin McCabe created KAFKA-12934: Summary: Move some controller classes to the metadata package Key: KAFKA-12934 URL: https://issues.apache.org/jira/browse/KAFKA-12934 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe Assignee: Colin McCabe Move some controller classes to the metadata package so that they can be used with broker snapshots. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12934) Move some controller classes to the metadata package
[ https://issues.apache.org/jira/browse/KAFKA-12934?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe updated KAFKA-12934: - Component/s: controller Labels: kip-500 (was: ) > Move some controller classes to the metadata package > > > Key: KAFKA-12934 > URL: https://issues.apache.org/jira/browse/KAFKA-12934 > Project: Kafka > Issue Type: Improvement > Components: controller >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Minor > Labels: kip-500 > > Move some controller classes to the metadata package so that they can be used > with broker snapshots. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kpatelatwork edited a comment on pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork edited a comment on pull request #10822: URL: https://github.com/apache/kafka/pull/10822#issuecomment-859064545 Passing system tests results ![image](https://user-images.githubusercontent.com/29556518/121596558-32270f80-ca05-11eb-843d-9a23824b98fd.png) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kpatelatwork commented on pull request #10822: KAFKA-4793: Connect API to restart connector and tasks (KIP-745)
kpatelatwork commented on pull request #10822: URL: https://github.com/apache/kafka/pull/10822#issuecomment-859064545 Passing system tests ![image](https://user-images.githubusercontent.com/29556518/121596558-32270f80-ca05-11eb-843d-9a23824b98fd.png) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #10736: KAFKA-9295: revert session timeout to default value
ableegoldman merged pull request #10736: URL: https://github.com/apache/kafka/pull/10736 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10862: KAFKA-12928: Add a check whether the Task's statestore is actually a directory
ableegoldman commented on a change in pull request #10862:
URL: https://github.com/apache/kafka/pull/10862#discussion_r649497973


## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java

@@ -312,7 +319,7 @@ private boolean taskDirIsEmpty(final File taskDir) {
      */
     File globalStateDir() {
         final File dir = new File(stateDir, "global");
-        if (hasPersistentStores && !dir.exists() && !dir.mkdir()) {
+        if (hasPersistentStores && ((dir.exists() && !dir.isDirectory()) || (!dir.exists() && !dir.mkdir()))) {
             throw new ProcessorStateException(
                 String.format("global state directory [%s] doesn't exist and couldn't be created", dir.getPath()));

Review comment: ditto here, please add a separate check and exception


@@ -126,7 +126,7 @@ public StateDirectory(final StreamsConfig config, final Time time, final boolean
             throw new ProcessorStateException(
                 String.format("base state directory [%s] doesn't exist and couldn't be created", stateDirName));
         }
-        if (!stateDir.exists() && !stateDir.mkdir()) {
+        if ((stateDir.exists() && !stateDir.isDirectory()) || (!stateDir.exists() && !stateDir.mkdir())) {

Review comment: Please split this up into a separate check for `if (stateDir.exists() && !stateDir.isDirectory())` and then throw an accurate exception, e.g. `state directory could not be created as there is an existing file with the same name`


@@ -230,18 +230,25 @@ public UUID initializeProcessId() {
     public File getOrCreateDirectoryForTask(final TaskId taskId) {
         final File taskParentDir = getTaskDirectoryParentName(taskId);
         final File taskDir = new File(taskParentDir, StateManagerUtil.toTaskDirString(taskId));
-        if (hasPersistentStores && !taskDir.exists()) {
-            synchronized (taskDirCreationLock) {
-                // to avoid a race condition, we need to check again if the directory does not exist:
-                // otherwise, two threads might pass the outer `if` (and enter the `then` block),
-                // one blocks on `synchronized` while the other creates the directory,
-                // and the blocking one fails when trying to create it after it's unblocked
-                if (!taskParentDir.exists() && !taskParentDir.mkdir()) {
-                    throw new ProcessorStateException(
+        if (hasPersistentStores) {
+            if (!taskDir.exists()) {
+                synchronized (taskDirCreationLock) {
+                    // to avoid a race condition, we need to check again if the directory does not exist:
+                    // otherwise, two threads might pass the outer `if` (and enter the `then` block),
+                    // one blocks on `synchronized` while the other creates the directory,
+                    // and the blocking one fails when trying to create it after it's unblocked
+                    if (!taskParentDir.exists() && !taskParentDir.mkdir()) {
+                        throw new ProcessorStateException(
                             String.format("Parent [%s] of task directory [%s] doesn't exist and couldn't be created",
-                                taskParentDir.getPath(), taskDir.getPath()));
+                                taskParentDir.getPath(), taskDir.getPath()));
+                    }
+                    if (!taskDir.exists() && !taskDir.mkdir()) {
+                        throw new ProcessorStateException(
+                            String.format("task directory [%s] doesn't exist and couldn't be created", taskDir.getPath()));
+                    }
                 }
-                if (!taskDir.exists() && !taskDir.mkdir()) {
+            } else {
+                if (!taskDir.isDirectory()) {

Review comment: Same here, this exception message does not apply to the case this is trying to catch
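The split the reviewer is asking for can be sketched as a pair of checks with distinct, accurate messages. This is a hypothetical, self-contained helper, not the actual StateDirectory code; `IllegalStateException` stands in here for Kafka Streams' `ProcessorStateException`:

```java
import java.io.File;

public class StateDirCheck {
    // Hypothetical stand-in for StateDirectory's directory-creation logic,
    // with the two failure cases separated so each gets an accurate message.
    static File getOrCreateStateDir(final File dir) {
        // Case 1: a regular file squats on the directory's name. Reporting
        // "doesn't exist and couldn't be created" here would be misleading.
        if (dir.exists() && !dir.isDirectory()) {
            throw new IllegalStateException(String.format(
                "state directory [%s] could not be created as there is an existing file with the same name",
                dir.getPath()));
        }
        // Case 2: the directory is genuinely missing and mkdir() failed
        // (permissions, disk full, etc.).
        if (!dir.exists() && !dir.mkdir()) {
            throw new IllegalStateException(String.format(
                "state directory [%s] doesn't exist and couldn't be created", dir.getPath()));
        }
        return dir;
    }
}
```

Collapsing both cases into one condition, as the PR originally did, works functionally but leaves the operator with a message that names the wrong cause in case 1.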
[GitHub] [kafka] ableegoldman commented on pull request #9671: KAFKA-10793: move handling of FindCoordinatorFuture to fix race condition
ableegoldman commented on pull request #9671: URL: https://github.com/apache/kafka/pull/9671#issuecomment-859044497 It should be sufficient to upgrade just the consumers, this is a client-side fix only -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #10856: MINOR: Small optimizations and removal of unused code in Streams
vvcephei commented on a change in pull request #10856: URL: https://github.com/apache/kafka/pull/10856#discussion_r649509926 ## File path: streams/src/main/java/org/apache/kafka/streams/StoreQueryParameters.java ## @@ -25,8 +25,8 @@ */ public class StoreQueryParameters { -private Integer partition; -private boolean staleStores; +private final Integer partition; +private final boolean staleStores; Review comment: Yeah, it's a limitation of checkstyle, which makes me a bit sad. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #10855: MINOR: clean up unneeded `@SuppressWarnings` on Streams module
jlprat commented on pull request #10855: URL: https://github.com/apache/kafka/pull/10855#issuecomment-859028893 Unless the CI is still broken, of course -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams (metrics)
vvcephei commented on pull request #10850: URL: https://github.com/apache/kafka/pull/10850#issuecomment-859023848 Oh, right, I meant to say that the core integration tests are broken right now. I've just run the Streams tests on my laptop, and we also have passing tests for the ARM build. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams (metrics)
vvcephei commented on pull request #10850: URL: https://github.com/apache/kafka/pull/10850#issuecomment-859022980 Thanks for the updates, @wycc ! Merged. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei merged pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams (metrics)
vvcephei merged pull request #10850: URL: https://github.com/apache/kafka/pull/10850 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #10850: KAFKA-12924 Replace EasyMock and PowerMock with Mockito in streams (metrics)
vvcephei commented on a change in pull request #10850:
URL: https://github.com/apache/kafka/pull/10850#discussion_r649499312


## File path: streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java

@@ -135,45 +124,38 @@ public void shouldGetFailedStreamThreadsSensor() {
             false,
             description
         );
-        replay(StreamsMetricsImpl.class, streamsMetrics);
         final Sensor sensor = ClientMetrics.failedStreamThreadSensor(streamsMetrics);
-
-        verify(StreamsMetricsImpl.class, streamsMetrics);

Review comment: I see. I thought it also verified that the mocked methods actually got used. Looking at the test closer, though, I think that additional verification isn't really needed here.
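The distinction under discussion (EasyMock's up-front expect/replay/verify cycle, where an unfulfilled expectation fails the test automatically, versus Mockito's after-the-fact verification of chosen interactions) can be illustrated with a hand-rolled recording stub. This sketch uses no mocking library, and `Metrics` / `registerFailedThreadSensor` are made-up names, not the real ClientMetrics API:

```java
import java.util.ArrayList;
import java.util.List;

public class InteractionCheck {
    interface Metrics {
        void addSensor(String name);
    }

    // Recording stub: it just remembers calls, and the test decides afterwards
    // which interactions to assert on (the style Mockito's verify() offers).
    // EasyMock instead declares expectations before replay() and fails in
    // verify() if any expected call never happened.
    static class RecordingMetrics implements Metrics {
        final List<String> calls = new ArrayList<>();
        @Override
        public void addSensor(final String name) {
            calls.add(name);
        }
    }

    // Code under test (illustrative only).
    static void registerFailedThreadSensor(final Metrics metrics) {
        metrics.addSensor("failed-stream-threads");
    }
}
```

If, as in the comment above, the test only cares about the returned sensor and not about the exact interactions, the after-the-fact verification can simply be omitted, which is what makes the Mockito migration simpler here.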
[GitHub] [kafka] vvcephei merged pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup
vvcephei merged pull request #9414: URL: https://github.com/apache/kafka/pull/9414 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #10855: MINOR: clean up unneeded `@SuppressWarnings` on Streams module
jlprat commented on pull request #10855: URL: https://github.com/apache/kafka/pull/10855#issuecomment-858998167 @mjsax Could you restart the build? Last time it failed with the `1` exit code -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361198#comment-17361198 ]

Josep Prat commented on KAFKA-8940:
-----------------------------------

Hi [~ableegoldman], I read your comment about a month ago, but as I didn't see the test failing again, I thought it might have been fixed (maybe by some refactoring). But I agree with your judgement. I vote for disabling this test and creating a new ticket to code it properly.

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -----------------------------------------------------------------
>
>                 Key: KAFKA-8940
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8940
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>            Reporter: Guozhang Wang
>            Priority: Major
>              Labels: flaky-test, newbie++
>
> The test does not properly account for windowing. See this comment for full details.
> We can patch this test by fixing the timestamps of the input data to avoid crossing over a window boundary, or account for this when verifying the output. Since we have access to the input data it should be possible to compute whether/when we do cross a window boundary, and adjust the expected output accordingly.
[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup
vvcephei commented on pull request #9414: URL: https://github.com/apache/kafka/pull/9414#issuecomment-858990902 It looks like the core integration tests have gotten into bad shape. They've been failing on trunk as well. I just ran the Streams integration tests on my machine, and they passed, so I'll go ahead and merge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning
mumrah commented on a change in pull request #10864: URL: https://github.com/apache/kafka/pull/10864#discussion_r649482956 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -294,19 +298,14 @@ final class KafkaMetadataLog private ( } } - override def deleteBeforeSnapshot(logStartSnapshotId: OffsetAndEpoch): Boolean = { + override def deleteBeforeSnapshot(deleteBeforeSnapshotId: OffsetAndEpoch): Boolean = { Review comment: Since this is no longer used to increase the Log Start Offset, I think this method can go away. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah opened a new pull request #10864: KAFKA-12155 Metadata log and snapshot cleaning
mumrah opened a new pull request #10864: URL: https://github.com/apache/kafka/pull/10864 This PR includes changes to KafkaRaftClient and KafkaMetadataLog to support periodic cleaning of old log segments and snapshots. TODO the rest of the description -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361196#comment-17361196 ]

A. Sophie Blee-Goldman commented on KAFKA-8940:
-----------------------------------------------

Hey [~mjsax] [~josep.prat] (and others), if you read my last comment on this ticket it explains exactly why this test is failing. Luckily it has to do with only the test itself, as it's a bug in the assumptions for the generated input. It's just not necessarily a quick fix. Maybe we should @Ignore it for now, and then file a separate ticket to circle back and correct the assumptions in this test.
[jira] [Comment Edited] (KAFKA-12468) Initial offsets are copied from source to target cluster
[ https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361084#comment-17361084 ] Amber Liu edited comment on KAFKA-12468 at 6/10/21, 7:41 PM: - I was using standalone mode with active-passive setup and saw negative offsets in the past as well. One of the reasons I found was due to consumer request timeout, e.g. org.apache.kafka.common.errors.DisconnectException. I increased the request timeout and tasks.max and the offsets are synced correctly now. {code:java} # consumer, need to set higher timeout source.admin.request.timeout.ms = 18 source.consumer.request.timeout.ms = 18{code} was (Author: aaamber): I was using standalone mode with active-passive setup and saw negative offsets in the past as well. One of the reasons I found was due to consumer request timeout, e.g. org.apache.kafka.common.errors.DisconnectException. I increased the request timeout and tasks.max and the offsets are synced correctly now. {code:java} # consumer, need to set higher timeout source.admin.request.timeout.ms = 18 source.consumer.session.timeout.ms = 18 source.consumer.request.timeout.ms = 18{code} > Initial offsets are copied from source to target cluster > > > Key: KAFKA-12468 > URL: https://issues.apache.org/jira/browse/KAFKA-12468 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.0 >Reporter: Bart De Neuter >Priority: Major > > We have an active-passive setup where the 3 connectors from mirror maker 2 > (heartbeat, checkpoint and source) are running on a dedicated Kafka connect > cluster on the target cluster. > Offset syncing is enabled as specified by KIP-545. But when activated, it > seems the offsets from the source cluster are initially copied to the target > cluster without translation. This causes a negative lag for all synced > consumer groups. 
Only when we reset the offsets for each topic/partition on > the target cluster and produce a record on the topic/partition in the source, > the sync starts working correctly. > I would expect that the consumer groups are synced but that the current > offsets of the source cluster are not copied to the target cluster. > This is the configuration we are currently using: > Heartbeat connector > > {code:xml} > { > "name": "mm2-mirror-heartbeat", > "config": { > "name": "mm2-mirror-heartbeat", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", > "source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "", > "target.cluster.bootstrap.servers": "", > "topics": ".*", > "groups": ".*", > "tasks.max": "1", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > Checkpoint connector: > {code:xml} > { > "name": "mm2-mirror-checkpoint", > "config": { > "name": "mm2-mirror-checkpoint", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", > "source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "", > "target.cluster.bootstrap.servers": "", > "topics": ".*", > "groups": ".*", > "tasks.max": "40", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > 
org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > Source connector: > {code:xml} > { > "name": "mm2-mirror-source", > "config": { > "name": "mm2-mirror-source", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorSourceConnector", > "source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "", > "target.cluster.bootstrap.servers": "", > "topics": ".*", > "groups": ".*", > "tasks.max": "40", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", >
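[Editor's note] A one-method sketch (offsets invented for illustration; not MirrorMaker code) shows why copying committed offsets verbatim, without translation, produces the negative lag described in the report above: the copied offset can exceed the target partition's log end offset.

```java
// Illustrative arithmetic only; the offsets are invented for the example.
class LagSketch {
    static long lag(long targetLogEndOffset, long copiedCommittedOffset) {
        // Consumer lag on the target cluster: log end offset minus committed offset.
        // When the committed offset is copied verbatim from the source cluster, it
        // can exceed the target's log end offset, making the computed lag negative.
        return targetLogEndOffset - copiedCommittedOffset;
    }
}
```

KIP-545-style offset translation avoids this by mapping source offsets to target offsets through the offset-syncs topic before committing them downstream.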
[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
jolshan commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r649472712 ## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ## @@ -92,6 +93,13 @@ public MetadataRequest build(short version) { if (!data.allowAutoTopicCreation() && version < 4) throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + "allowAutoTopicCreation field"); +if (data.topics() != null) { Review comment: I think this one is needed, as we can't remove the `()` in Java files. I tried to remove all of the ones in the Scala files. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
jolshan commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r649460759 ## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ## @@ -92,6 +93,13 @@ public MetadataRequest build(short version) { if (!data.allowAutoTopicCreation() && version < 4) throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + "allowAutoTopicCreation field"); +if (data.topics() != null) { Review comment: will do -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12870) RecordAccumulator stuck in a flushing state
[ https://issues.apache.org/jira/browse/KAFKA-12870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-12870: --- Assignee: Jason Gustafson > RecordAccumulator stuck in a flushing state > --- > > Key: KAFKA-12870 > URL: https://issues.apache.org/jira/browse/KAFKA-12870 > Project: Kafka > Issue Type: Bug > Components: producer , streams >Affects Versions: 2.5.1, 2.8.0, 2.7.1, 2.6.2 >Reporter: Niclas Lockner >Assignee: Jason Gustafson >Priority: Major > Fix For: 3.0.0 > > Attachments: RecordAccumulator.log, full.log > > > After a Kafka Stream with exactly once enabled has performed its first > commit, the RecordAccumulator within the stream's internal producer gets > stuck in a state where all subsequent ProducerBatches that get allocated are > immediately flushed instead of being held in memory until they expire, > regardless of the stream's linger or batch size config. > This is reproduced in the example code found at > [https://github.com/niclaslockner/kafka-12870] which can be run with > ./gradlew run --args= > The example has a producer that sends 1 record/sec to one topic, and a Kafka > stream with EOS enabled that forwards the records from that topic to another > topic with the configuration linger = 5 sec, commit interval = 10 sec. > > The expected behavior when running the example is that the stream's > ProducerBatches will expire (or get flushed because of the commit) every 5th > second, and that the stream's producer will send a ProduceRequest every 5th > second with an expired ProducerBatch that contains 5 records. > The actual behavior is that the ProducerBatch is made immediately available > for the Sender, and the Sender sends one ProduceRequest for each record. 
> > The example code contains a copy of the RecordAccumulator class (copied from > kafka-clients 2.8.0) with some additional logging added to > * RecordAccumulator#ready(Cluster, long) > * RecordAccumulator#beginFlush() > * RecordAccumulator#awaitFlushCompletion() > These log entries show (see the attached RecordsAccumulator.log) > * that the batches are considered sendable because a flush is in progress > * that Sender.maybeSendAndPollTransactionalRequest() calls > RecordAccumulator's beginFlush() without also calling awaitFlushCompletion(), > and that this makes RecordAccumulator's flushesInProgress jump between 1-2 > instead of the expected 0-1. > > This issue is not reproducible in version 2.3.1 or 2.4.1. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
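[Editor's note] The counter imbalance described in the report can be modeled with a minimal sketch. The class below is hypothetical — method names mirror `RecordAccumulator`'s, but this is an illustration, not the Kafka code: each `beginFlush()` without a matching `awaitFlushCompletion()` leaves the counter above zero, so `flushInProgress()` stays true and every batch is treated as immediately sendable.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the behavior in the report; not RecordAccumulator itself.
class FlushCounterSketch {
    final AtomicInteger flushesInProgress = new AtomicInteger(0);

    void beginFlush() { flushesInProgress.getAndIncrement(); }

    void awaitFlushCompletion() { flushesInProgress.decrementAndGet(); }

    // Batches are considered sendable whenever a flush is in progress.
    boolean flushInProgress() { return flushesInProgress.get() > 0; }
}
```

With balanced calls the counter returns to 0 between commits; with the reported pattern (a `beginFlush()` per commit, never paired with `awaitFlushCompletion()`) it oscillates between 1 and 2 and never reaches 0, matching the attached logs.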
[GitHub] [kafka] jsancio commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
jsancio commented on pull request #10593: URL: https://github.com/apache/kafka/pull/10593#issuecomment-858925958 > @jsancio Can you share more details about the possible concurrency scenario with createSnapshot ? BTW, will moving the validation to onSnapshotFrozen imply that before creating the snapshot, there's no validation? I think maybe we can keep the validation here and add some additional check before freeze() which makes the snapshot visible? @feyman2016, I think it is reasonable to do both. Validate when `createSnapshot` is called and validate again in `onSnapshotFrozen`. In both cases this validation should be optional. Validate if it is created through `RaftClient.createSnapshot`. Don't validate if `KafkaRaftClient` creates the snapshot internally because of a `FetchResponse` from the leader. I have been working on a PR related to this if you want to take a look: https://github.com/apache/kafka/pull/10786. It would be nice to get your PR merged before my PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
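[Editor's note] The "validate in both places, but optionally" idea above can be sketched as follows. All names and the validation rule here are illustrative, not the raft module's API: a flag distinguishes snapshots created through `RaftClient.createSnapshot` (validated) from snapshots the raft client materializes from a leader's `FetchResponse` (not validated).

```java
// Hypothetical sketch of optional snapshot-id validation. For illustration, a
// snapshot id is taken to be valid when its end offset is at or below the
// high watermark; the real criteria live in KafkaRaftClient.
class SnapshotIdValidationSketch {
    static void createSnapshot(long snapshotEndOffset, long highWatermark, boolean userCreated) {
        // Validate only when the state machine asked for the snapshot; skip when
        // the raft client itself downloads a snapshot from the leader.
        if (userCreated && snapshotEndOffset > highWatermark) {
            throw new IllegalArgumentException(
                "Snapshot end offset " + snapshotEndOffset
                    + " is beyond the high watermark " + highWatermark);
        }
        // ... write the snapshot, re-running the same check from an
        // onSnapshotFrozen-style hook before freeze() makes it visible ...
    }
}
```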
[jira] [Commented] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
[ https://issues.apache.org/jira/browse/KAFKA-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361180#comment-17361180 ] Matthias J. Sax commented on KAFKA-9897: Different test, but same symptom: {quote}java.lang.AssertionError: Expected: a string containing "Cannot get state store source-table because the stream thread is PARTITIONS_ASSIGNED, not RUNNING" but: was "The state store, source-table, may have migrated to another instance." at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.lambda$shouldQueryOnlyActivePartitionStoresByDefault$2(StoreQueryIntegrationTest.java:151) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.until(StoreQueryIntegrationTest.java:420) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryOnlyActivePartitionStoresByDefault(StoreQueryIntegrationTest.java:131){quote} > Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores > - > > Key: KAFKA-9897 > URL: https://issues.apache.org/jira/browse/KAFKA-9897 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.6.0 >Reporter: Matthias J. 
Sax >Priority: Critical > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/] > {quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get > state store source-table because the stream thread is PARTITIONS_ASSIGNED, > not RUNNING at > org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85) > at > org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61) > at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183) at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12377) Flaky Test SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener
[ https://issues.apache.org/jira/browse/KAFKA-12377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361177#comment-17361177 ] Matthias J. Sax commented on KAFKA-12377: - Failed again. > Flaky Test SaslAuthenticatorTest#testSslClientAuthRequiredForSaslSslListener > > > Key: KAFKA-12377 > URL: https://issues.apache.org/jira/browse/KAFKA-12377 > Project: Kafka > Issue Type: Test > Components: core, security, unit tests >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > {quote}org.opentest4j.AssertionFailedError: expected: > but was: at > org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62) at > org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at > org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177) at > org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1124) at > org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:111) > at > org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckClientConnectionFailure(SaslAuthenticatorTest.java:2187) > at > org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.createAndCheckSslAuthenticationFailure(SaslAuthenticatorTest.java:2210) > at > org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.verifySslClientAuthForSaslSslListener(SaslAuthenticatorTest.java:1846) > at > org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest.testSslClientAuthRequiredForSaslSslListener(SaslAuthenticatorTest.java:1800){quote} > STDOUT > {quote}[2021-02-26 07:18:57,220] ERROR Extensions provided in login context > without a token > (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule:318) > java.io.IOException: Extensions provided in login context without a token at > 
org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:165) > at > org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.identifyToken(OAuthBearerLoginModule.java:316) > at > org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.login(OAuthBearerLoginModule.java:301) > [...] > Caused by: > org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerConfigException: > Extensions provided in login context without a token at > org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handleTokenCallback(OAuthBearerUnsecuredLoginCallbackHandler.java:192) > at > org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler.handle(OAuthBearerUnsecuredLoginCallbackHandler.java:163) > ... 116 more{quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8940) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-8940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361176#comment-17361176 ] Matthias J. Sax commented on KAFKA-8940: Failed again. > Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance > - > > Key: KAFKA-8940 > URL: https://issues.apache.org/jira/browse/KAFKA-8940 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Major > Labels: flaky-test, newbie++ > > The test does not properly account for windowing. See this comment for full > details. > We can patch this test by fixing the timestamps of the input data to avoid > crossing over a window boundary, or account for this when verifying the > output. Since we have access to the input data it should be possible to > compute whether/when we do cross a window boundary, and adjust the expected > output accordingly -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12933) Flaky test ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled
[ https://issues.apache.org/jira/browse/KAFKA-12933?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361174#comment-17361174 ] Matthias J. Sax commented on KAFKA-12933: - Failed a second time. > Flaky test > ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled > - > > Key: KAFKA-12933 > URL: https://issues.apache.org/jira/browse/KAFKA-12933 > Project: Kafka > Issue Type: Test > Components: admin >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > {quote}org.opentest4j.AssertionFailedError: expected: but was: > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at > org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at > org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at > kafka.admin.ReassignPartitionsIntegrationTest.executeAndVerifyReassignment(ReassignPartitionsIntegrationTest.scala:130) > at > kafka.admin.ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled(ReassignPartitionsIntegrationTest.scala:74){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12933) Flaky test ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled
Matthias J. Sax created KAFKA-12933: --- Summary: Flaky test ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled Key: KAFKA-12933 URL: https://issues.apache.org/jira/browse/KAFKA-12933 Project: Kafka Issue Type: Test Components: admin Reporter: Matthias J. Sax {quote}org.opentest4j.AssertionFailedError: expected: but was: at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) at kafka.admin.ReassignPartitionsIntegrationTest.executeAndVerifyReassignment(ReassignPartitionsIntegrationTest.scala:130) at kafka.admin.ReassignPartitionsIntegrationTest.testReassignmentWithAlterIsrDisabled(ReassignPartitionsIntegrationTest.scala:74){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
ijuma commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r649435657 ## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ## @@ -92,6 +93,13 @@ public MetadataRequest build(short version) { if (!data.allowAutoTopicCreation() && version < 4) throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + "allowAutoTopicCreation field"); +if (data.topics() != null) { Review comment: Can we remove the unnecessary `()` from the various files? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
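[Editor's note] The guard being reviewed above can be modeled with a small sketch (hypothetical classes; `UnsupportedOperationException` stands in for Kafka's `UnsupportedVersionException`, and the version cutoff is illustrative): in `MetadataRequest` a null topic list means "all topics", so per-topic validation must be skipped when the list is null rather than iterating it and hitting an NPE.

```java
import java.util.List;

// Hypothetical model of the null guard in MetadataRequest.Builder#build; not the Kafka code.
class MetadataRequestSketch {
    static class Topic {
        final String name; // may be null when the request carries only a topic ID
        Topic(String name) { this.name = name; }
    }

    static void validate(List<Topic> topics, short version) {
        if (topics != null) { // null => "all topics": nothing per-topic to validate
            for (Topic topic : topics) {
                if (topic.name == null && version < 10) // illustrative cutoff
                    throw new UnsupportedOperationException(
                        "MetadataRequest versions older than 10 don't support topic IDs");
            }
        }
    }
}
```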
[GitHub] [kafka] jsancio commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException
jsancio commented on a change in pull request #10749: URL: https://github.com/apache/kafka/pull/10749#discussion_r649429301 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -372,27 +371,23 @@ private void maybeFireLeaderChange() { @Override public void initialize() { -try { -quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); +quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); -long currentTimeMs = time.milliseconds(); -if (quorum.isLeader()) { -throw new IllegalStateException("Voter cannot initialize as a Leader"); -} else if (quorum.isCandidate()) { -onBecomeCandidate(currentTimeMs); -} else if (quorum.isFollower()) { -onBecomeFollower(currentTimeMs); -} +long currentTimeMs = time.milliseconds(); +if (quorum.isLeader()) { +throw new IllegalStateException("Voter cannot initialize as a Leader"); +} else if (quorum.isCandidate()) { +onBecomeCandidate(currentTimeMs); +} else if (quorum.isFollower()) { +onBecomeFollower(currentTimeMs); +} -// When there is only a single voter, become candidate immediately -if (quorum.isVoter() -&& quorum.remoteVoters().isEmpty() -&& !quorum.isCandidate()) { +// When there is only a single voter, become candidate immediately +if (quorum.isVoter() +&& quorum.remoteVoters().isEmpty() +&& !quorum.isCandidate()) { -transitionToCandidate(currentTimeMs); -} -} catch (IOException e) { -throw new RuntimeException(e); +transitionToCandidate(currentTimeMs); Review comment: Leaving this note for future readers. My comments above are not accurate. I misread the diff generated by GitHub. When I wrote the comment, I was under the impression that the old code was handling, wrapping and re-throwing the `IOException`. Instead the old code wrapped and re-threw the `IOException`; it was not handling the exception. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
jolshan commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r649426864 ## File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala ## @@ -234,6 +235,32 @@ class MetadataRequestTest extends AbstractMetadataRequestTest { } } + @Test + def testInvalidMetadataRequestReturnsError(): Unit = { Review comment: Ah wait. I think I figured out a possible way to do this after all. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
jolshan commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r649425805 ## File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala ## @@ -234,6 +235,32 @@ class MetadataRequestTest extends AbstractMetadataRequestTest { } } + @Test + def testInvalidMetadataRequestReturnsError(): Unit = { Review comment: I must have missed this comment earlier. I moved the test to KafkaApisTest. There we don't check if the response is built since that is part of the request/response handling code. From what I saw the other invalid tests were very focused on one thing. I think the biggest question is whether we care to test the null name is correctly being set to the empty string. I agree it is good not to start and stop another kafka cluster. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on pull request #10813: KAFKA-9559: Change default serde to be `null`
lct45 commented on pull request #10813: URL: https://github.com/apache/kafka/pull/10813#issuecomment-858849165 Okay I did some digging -> `defaultKeySerde` and `defaultValueSerde` are only called from the `init` of `AbstractProcessorContext`. I checked all the places that we call `AbstractProcessorContext#keySerde()` and `AbstractProcessorContext#valueSerde()` to make sure we're catching all the potential NPEs and I am fairly confident that we're ok. I did some streamlining so now we throw the `ConfigException` right after we access `AbstractProcessorContext#keySerde()` / `valueSerde()` so we aren't passing nulls around and there's some tracking between throwing errors and calling a certain method. The one place this wasn't possible was with creating state stores. Right now, we pass around `context.keySerde()` and `context.valueSerde()` rather than just the `context` in `MeteredKeyValueStore`, `MeteredSessionStore`, and `MeteredWindowStore`. The tricky part with moving to passing around context is that we need to accept two types of context, a `ProcessorContext` and a `StateStoreContext`. I'm open to either leaving these calls as less streamlined than everything else, or duplicating code in `WrappingNullableUtils` to accept both types of context. Thoughts @mjsax @ableegoldman ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
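[Editor's note] The streamlining described above — throwing a config error right where the default serde is fetched, instead of passing nulls around — can be sketched like this. The helper and message are hypothetical, and `IllegalStateException` stands in for Kafka's `ConfigException`:

```java
// Hypothetical helper, not the actual Streams code: fail fast at the point
// where a possibly-unset default serde is accessed, so a missing
// default.key.serde / default.value.serde surfaces as a clear config error
// rather than as an NPE later in the topology.
class SerdeAccessSketch {
    static <T> T requireSerde(T serde, String configName) {
        if (serde == null) {
            // Stands in for org.apache.kafka.common.config.ConfigException
            throw new IllegalStateException(
                "No serde set: configure '" + configName + "' or pass a serde in explicitly");
        }
        return serde;
    }
}
```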
[GitHub] [kafka] jsancio commented on pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client
jsancio commented on pull request #10786: URL: https://github.com/apache/kafka/pull/10786#issuecomment-858848250 Created a Jira for renaming the types SnapshotWriter and SnapshotReader, and to instead add interface with the same name. https://issues.apache.org/jira/browse/KAFKA-12932 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12932) Interfaces for SnapshotReader and SnapshotWriter
Jose Armando Garcia Sancio created KAFKA-12932: -- Summary: Interfaces for SnapshotReader and SnapshotWriter Key: KAFKA-12932 URL: https://issues.apache.org/jira/browse/KAFKA-12932 Project: Kafka Issue Type: Sub-task Reporter: Jose Armando Garcia Sancio Change the snapshot API so that SnapshotWriter and SnapshotReader are interfaces. Change the existing types SnapshotWriter and SnapshotReader to use a different name and to implement the interfaces introduced by this issue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12931) KIP-746: Revise KRaft Metadata Records
[ https://issues.apache.org/jira/browse/KAFKA-12931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-12931: --- Fix Version/s: 3.0.0 > KIP-746: Revise KRaft Metadata Records > -- > > Key: KAFKA-12931 > URL: https://issues.apache.org/jira/browse/KAFKA-12931 > Project: Kafka > Issue Type: Improvement > Components: controller, kraft >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dchristle edited a comment on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1
dchristle edited a comment on pull request #10847: URL: https://github.com/apache/kafka/pull/10847#issuecomment-858843145 > The current version is 1.4.9, so I'm a bit confused why we're mentioning anything besides 1.5.0. Woops - I'm getting my wires crossed on a different zstd 1.5.0 related PR I have with a larger upgrade. You are right -- this is just from `1.4.9-1` to `1.5.0-1`. Sorry for my confusion. I updated the PR description to reflect this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dchristle commented on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1
dchristle commented on pull request #10847: URL: https://github.com/apache/kafka/pull/10847#issuecomment-858843145 > The current version is 1.4.9, so I'm a bit confused why we're mentioning anything besides 1.5.0. Woops - I'm getting my wires crossed on a different zstd 1.5.0 related PR I have with a larger upgrade. You are right -- this is just from `1.4.9-1` to `1.5.0-1`. Sorry for my confusion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12931) KIP-746: Revise KRaft Metadata Records
Colin McCabe created KAFKA-12931: Summary: KIP-746: Revise KRaft Metadata Records Key: KAFKA-12931 URL: https://issues.apache.org/jira/browse/KAFKA-12931 Project: Kafka Issue Type: Improvement Components: kraft, controller Reporter: Colin McCabe Assignee: Colin McCabe -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dchristle edited a comment on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1
dchristle edited a comment on pull request #10847: URL: https://github.com/apache/kafka/pull/10847#issuecomment-858804912 @ijuma > This is a good change, but can we please quality the perf improvements claim? My understanding is that only applies to certain compression levels and Kafka currently always picks a specific one. @dongjinleekr is working on making that configurable via a separate KIP. It is true that the most recent performance improvements I quoted (for `1.5.0`) appear only in mid-range compression levels. > Also, why are we listing versions in the PR description that are not relevant to this upgrade? I tried to follow a previous `zstd-jni` PR's convention here: https://github.com/apache/kafka/pull/10285 . I think it gives context on the magnitude of the upgrade, but I can change the commit message/PR title to remove the existing version reference if you like. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma edited a comment on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1
ijuma edited a comment on pull request #10847: URL: https://github.com/apache/kafka/pull/10847#issuecomment-858829828 The current version is 1.4.9, so I'm a bit confused why we're mentioning anything besides 1.5.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1
ijuma commented on pull request #10847: URL: https://github.com/apache/kafka/pull/10847#issuecomment-858829828 The current version is 1.4.9, so I'm a bit confused why we're mentioning anything besides 1.5.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r649390677 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.time.Duration; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME; + +/** + * This class is responsible for consuming messages from remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}) + * partitions and maintain the state of the remote log segment metadata. It gives an API to add or remove + * for what topic partition's metadata should be consumed by this instance using + * {{@link #addAssignmentsForPartitions(Set)}} and {@link #removeAssignmentsForPartitions(Set)} respectively. + * + * When a broker is started, controller sends topic partitions that this broker is leader or follower for and the + * partitions to be deleted. 
This class receives those notifications with + * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} assigns consumer for the + * respective remote log metadata partitions by using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}. + * Any leadership changes later are called through the same API. We will remove the partitions that are deleted from + * this broker which are received through {@link #removeAssignmentsForPartitions(Set)}. + * + * After receiving these events it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)}, + * which maintains in-memory representation of the state of {@link RemoteLogSegmentMetadata}. + */ +class ConsumerTask implements Runnable, Closeable { +private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class); + +private static final long POLL_INTERVAL_MS = 30L; + +private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); +private final KafkaConsumer consumer; +private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler; +private final RemoteLogMetadataTopicPartitioner topicPartitioner; + +private volatile boolean close = false; Review comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r649390516 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.time.Duration; + +/** + * This class is responsible for publishing messages into the remote log metadata topic partitions. 
+ */ +public class ProducerManager implements Closeable { +private static final Logger log = LoggerFactory.getLogger(ProducerManager.class); + +private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde(); +private final KafkaProducer producer; +private final RemoteLogMetadataTopicPartitioner topicPartitioner; +private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig; + +private volatile boolean close = false; Review comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r649390143 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; +import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.time.Duration; + +/** + * This class is responsible for publishing messages into the remote log metadata topic partitions. 
+ */
+public class ProducerManager implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ProducerManager.class);
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaProducer producer;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+    private final TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
+
+    private volatile boolean close = false;
+
+    public ProducerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
+                           RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner) {
+        this.rlmmConfig = rlmmConfig;
+        this.producer = new KafkaProducer<>(rlmmConfig.producerProperties());
+        topicPartitioner = rlmmTopicPartitioner;
+    }
+
+    public RecordMetadata publishMessage(TopicIdPartition topicIdPartition,
+                                         RemoteLogMetadata remoteLogMetadata) throws KafkaException {
+        ensureNotClosed();
+
+        int metadataPartitionNo = topicPartitioner.metadataPartition(topicIdPartition);
+        log.debug("Publishing metadata message of partition:[{}] into metadata topic partition:[{}] with payload: [{}]",
+                topicIdPartition, metadataPartitionNo, remoteLogMetadata);
+
+        ProducerCallback callback = new ProducerCallback();
+        try {
+            if (metadataPartitionNo >= rlmmConfig.metadataTopicPartitionsCount()) {
+                // This should never occur as long as metadata partitions always remain the same.
+                throw new KafkaException("Chosen partition no " + metadataPartitionNo +
+                        " is more than the partition count: " + rlmmConfig.metadataTopicPartitionsCount());
+            }
+            producer.send(new ProducerRecord<>(rlmmConfig.remoteLogMetadataTopicName(), metadataPartitionNo, null,
+                    serde.serialize(remoteLogMetadata)), callback).get();
+        } catch (KafkaException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new KafkaException("Exception occurred while publishing message for topicIdPartition: " + topicIdPartition, e);
+        }
+
+        if (callback.exception() != null) {
+            Exception ex = callback.exception();
+            if (ex instanceof KafkaException) {
+                throw (KafkaException) ex;
+            } else {
+                throw new KafkaException(ex);
+            }
+        } else {
+            return callback.recordMetadata();
+        }
+    }
+
+    private void ensureNotClosed() {
+        if (close) {
+            throw new IllegalStateException("This instance is already set in close state.");

Review comment: Done

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ProducerManager.java ##

@@ -0,0 +1,125 @@

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding
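The error-handling convention in `publishMessage` above (rethrow a `KafkaException` unchanged, wrap anything else so callers see a single exception type) can be isolated into a small self-contained sketch. The nested `KafkaException` below is a stand-in for `org.apache.kafka.common.KafkaException`, not the real class.

```java
// Sketch of the exception-normalization pattern used by publishMessage:
// KafkaException instances pass through as-is, other failures become causes.
class ExceptionNormalizationSketch {
    // Stand-in for org.apache.kafka.common.KafkaException.
    static class KafkaException extends RuntimeException {
        KafkaException(String msg) { super(msg); }
        KafkaException(Throwable cause) { super(cause); }
    }

    static KafkaException normalize(Exception e) {
        // Preserve a KafkaException unchanged; wrap everything else.
        return (e instanceof KafkaException) ? (KafkaException) e : new KafkaException(e);
    }

    public static void main(String[] args) {
        KafkaException direct = normalize(new KafkaException("broker unavailable"));
        KafkaException wrapped = normalize(new java.util.concurrent.TimeoutException("send timed out"));
        // The original KafkaException keeps its message; other exceptions become causes.
        if (!"broker unavailable".equals(direct.getMessage())) throw new AssertionError();
        if (!(wrapped.getCause() instanceof java.util.concurrent.TimeoutException)) throw new AssertionError();
        System.out.println("ok");
    }
}
```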
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r649388017 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from the remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintaining the state of the remote log segment metadata. It provides APIs to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ *
+ * When a broker is started, the controller sends the topic partitions that this broker is a leader or follower for, along with the
+ * partitions to be deleted. This class receives those notifications through
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)}, and assigns the consumer to the
+ * respective remote log metadata partitions using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Later leadership changes arrive through the same APIs. Partitions that are deleted from this broker are removed when
+ * received through {@link #removeAssignmentsForPartitions(Set)}.
+ *
+ * After receiving these events, it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains an in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer consumer;
+    private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    private volatile boolean close = false;
+    private volatile boolean assignPartitions = false;
+
+    private final Object assignPartitionsLock = new Object();
+
+    // Remote log metadata topic partitions that consumer is assigned to.
+    private volatile Set assignedMetaPartitions = Collections.emptySet();
+
+    // User topic partitions that this broker is a leader/follower for.
+    private Set assignedTopicPartitions = Collections.emptySet();
+
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>();
+
+    public ConsumerTask(KafkaConsumer consumer,
+                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
+        this.consumer = consumer;
+
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r649387744 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Type.INT; +import static org.apache.kafka.common.config.ConfigDef.Type.LONG; + +public final class TopicBasedRemoteLogMetadataManagerConfig { +private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfig.class.getName()); + +public static final String REMOTE_LOG_METADATA_TOPIC_NAME = "__remote_log_metadata"; + +public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP = "remote.log.metadata.topic.replication.factor"; +public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP = "remote.log.metadata.topic.num.partitions"; +public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS_PROP = "remote.log.metadata.topic.retention.ms"; +public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP = "remote.log.metadata.publish.wait.ms"; + +public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_PARTITIONS = 50; +public static final long DEFAULT_REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS = -1L; +public static final int DEFAULT_REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR = 3; +public static final long 
DEFAULT_REMOTE_LOG_METADATA_CONSUME_WAIT_MS = 60 * 1000L;
+
+    public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_DOC = "Replication factor of remote log metadata Topic.";
+    public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_DOC = "The number of partitions for remote log metadata Topic.";
+    public static final String REMOTE_LOG_METADATA_TOPIC_RETENTION_MILLIS_DOC = "Remote log metadata topic log retention in milliseconds. " +
+            "Default: -1, that means unlimited. Users can configure this value based on their use cases. " +
+            "To avoid any data loss, this value should be more than the maximum retention period of any topic enabled with " +
+            "tiered storage in the cluster.";
+    public static final String REMOTE_LOG_METADATA_CONSUME_WAIT_MS_DOC = "The amount of time in milliseconds to wait for the local consumer to " +

Review comment: We do not want this to be completely blocked, as we want to release the remote log thread after a specific timeout in case of any intermittent issues, so that tiering of other partitions can proceed.

## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java ##

@@ -0,0 +1,197 @@

+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
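The review comment above argues for a bounded wait (`remote.log.metadata.publish.wait.ms`) rather than blocking indefinitely on the local consumer catching up. That behaviour can be sketched with a plain `CountDownLatch`; this is illustrative only, with hypothetical names, and is not the actual implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BoundedWaitSketch {
    // Waits up to waitMs for the consumer-caught-up signal, then gives up
    // (returns false) instead of blocking forever, so other partitions'
    // tiering work can proceed even when one partition is stuck.
    static boolean waitForConsumerCatchUp(CountDownLatch caughtUp, long waitMs) throws InterruptedException {
        return caughtUp.await(waitMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // Nobody counts the latch down, so this times out after 50 ms.
        boolean done = waitForConsumerCatchUp(latch, 50);
        if (done) throw new AssertionError("expected timeout");
        latch.countDown();
        // Once the signal has fired, the wait returns promptly with success.
        if (!waitForConsumerCatchUp(latch, 50)) throw new AssertionError("expected success");
        System.out.println("timed out first, succeeded after countDown");
    }
}
```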
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r649387317 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java ## @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.metadata.storage; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Type.INT; +import static org.apache.kafka.common.config.ConfigDef.Type.LONG; + +public final class TopicBasedRemoteLogMetadataManagerConfig { +private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerConfig.class.getName()); + +public static final String REMOTE_LOG_METADATA_TOPIC_NAME = "__remote_log_metadata"; Review comment: I plan to add this once RLMM is called from remote log layer classes. I wanted this change to be self contained for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dchristle commented on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1
dchristle commented on pull request #10847: URL: https://github.com/apache/kafka/pull/10847#issuecomment-858804912

@ijuma

> This is a good change, but can we please qualify the perf improvements claim? My understanding is that only applies to certain compression levels and Kafka currently always picks a specific one. @dongjinleekr is working on making that configurable via a separate KIP.

It is true that the most recent performance improvements I quoted (for `1.5.0`) appear only in mid-range compression levels. I did not highlight it in my description, but besides bug fixes, the earlier releases quote consistent perf improvements:

- `1.4.4`: ~10% decompression bump, no level-dependence quoted
- `1.4.5`: 5-10% decompression improvement on `x86_64` architectures, +15-50% on various `arm` processors
- `1.4.7`: Improved `--long` mode compression ratio at high levels, 5-30% decompression improvement for blocks < 32kB
- `1.4.9`: 2x faster `--long` mode compression speed

> Also, why are we listing versions in the PR description that are not relevant to this upgrade?

I tried to follow a previous `zstd-jni` PR's convention here: https://github.com/apache/kafka/pull/10285 . I think it gives context on the magnitude of the upgrade, but I can change the commit message/PR title to remove the existing version reference if you like.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10733: KAFKA-12816 Added tiered storage related configs including remote log manager configs.
junrao commented on a change in pull request #10733: URL: https://github.com/apache/kafka/pull/10733#discussion_r649337876 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java ## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; +import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; +import static org.apache.kafka.common.config.ConfigDef.Type.INT; +import static org.apache.kafka.common.config.ConfigDef.Type.LONG; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; + +public final class RemoteLogManagerConfig { + +/** + * Prefix used for properties to be passed to {@link RemoteStorageManager} implementation. 
Remote log subsystem collects all the properties having + * this prefix and passed to {@code RemoteStorageManager} using {@link RemoteStorageManager#configure(Map)}. Review comment: passed => passes ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java ## @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.remote.storage; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; +import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; +import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; +import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; +import static org.apache.kafka.common.config.ConfigDef.Type.INT; +import static org.apache.kafka.common.config.ConfigDef.Type.LONG; +import static org.apache.kafka.common.config.ConfigDef.Type.STRING; + +public final class RemoteLogManagerConfig { + +/** + * Prefix used for properties to be passed to {@link RemoteStorageManager} implementation. Remote log subsystem collects all the properties having + * this prefix and passed to {@code RemoteStorageManager} using {@link RemoteStorageManager#configure(Map)}. + */ +public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP = "remote.log.storage.manager.impl.prefix"; +public static final String REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC = "Prefix used for properties to be passed to RemoteStorageManager " + +"implementation. For example this value can be `rsm.s3.`."; + +/** + * Prefix used for properties to be passed to {@link RemoteLogMetadataManager} implementation. Remote log subsystem collects all the properties having + * this prefix and passed to {@code RemoteLogMetadataManager} using {@link RemoteLogMetadataManager#configure(Map)}. + */ +public static final String REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP =
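The `*.impl.prefix` mechanism described in the javadoc above (collect every property that shares the configured prefix, strip the prefix, and hand the resulting map to the plug-in's `configure(Map)`) can be illustrated without Kafka's classes. Kafka's `AbstractConfig` provides a similar helper, but the stand-alone version below is only a sketch with illustrative property names.

```java
import java.util.HashMap;
import java.util.Map;

class PrefixConfigSketch {
    // Collects the properties starting with `prefix`, with the prefix stripped,
    // mirroring how prefixed configs would be passed to RemoteStorageManager#configure.
    static Map<String, Object> withPrefixStripped(Map<String, Object> props, String prefix) {
        Map<String, Object> out = new HashMap<>();
        for (Map.Entry<String, Object> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                out.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("rsm.s3.bucket", "my-bucket");     // picked up: has the prefix
        props.put("rsm.s3.region", "us-east-1");     // picked up: has the prefix
        props.put("log.retention.ms", "604800000");  // ignored: no prefix
        Map<String, Object> rsmProps = withPrefixStripped(props, "rsm.s3.");
        if (rsmProps.size() != 2 || !"my-bucket".equals(rsmProps.get("bucket"))) {
            throw new AssertionError("unexpected prefixed config: " + rsmProps);
        }
        System.out.println(rsmProps);
    }
}
```

The `rsm.s3.` prefix here echoes the example value given in `REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_DOC` above.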
[GitHub] [kafka] ijuma commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
ijuma commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r649369319 ## File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala ## @@ -234,6 +235,32 @@ class MetadataRequestTest extends AbstractMetadataRequestTest { } } + @Test + def testInvalidMetadataRequestReturnsError(): Unit = { Review comment: if you think it's valuable, we can keep it. But can we piggy back on another test that tests invalid things? Then we'd save starting and stopping a kafka cluster. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
ijuma commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r649368540 ## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ## @@ -92,6 +93,15 @@ public MetadataRequest build(short version) { if (!data.allowAutoTopicCreation() && version < 4) throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + "allowAutoTopicCreation field"); +if (version >= 10) { +if (data.topics() != null) { +data.topics().forEach(topic -> { +if (topic.name() == null || topic.topicId() != Uuid.ZERO_UUID) +throw new UnsupportedVersionException("MetadataRequest version " + version + +" does not support null topic names or topic IDs."); Review comment: Yeah, makes sense. ## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ## @@ -92,6 +93,15 @@ public MetadataRequest build(short version) { if (!data.allowAutoTopicCreation() && version < 4) throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + "allowAutoTopicCreation field"); +if (version >= 10) { Review comment: +1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
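The build-time check discussed above gates fields by API version: per the snippet under review, a `MetadataRequest` built at version 10 or newer must carry a non-null topic name and must not set a topic ID. A minimal stand-alone sketch of that gate follows; class and method names are hypothetical, and `topicId == null` models `Uuid.ZERO_UUID` (no ID set).

```java
class VersionGateSketch {
    // Stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
    static class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(String msg) { super(msg); }
    }

    // Mirrors the quoted condition: at v10+, reject a null topic name or a set topic ID.
    static void validate(short version, String topicName, String topicId) {
        if (version >= 10 && (topicName == null || topicId != null)) {
            throw new UnsupportedVersionException("MetadataRequest version " + version
                    + " does not support null topic names or topic IDs.");
        }
    }

    public static void main(String[] args) {
        validate((short) 9, null, null);       // older versions: this gate does not apply
        validate((short) 10, "orders", null);  // fine: named topic, no topic ID
        boolean threw = false;
        try {
            validate((short) 10, null, null);  // null name rejected at v10+
        } catch (UnsupportedVersionException e) {
            threw = true;
        }
        if (!threw) throw new AssertionError("expected UnsupportedVersionException");
        System.out.println("validation behaves as expected");
    }
}
```

Validating at `build(short version)` time, as the PR does, surfaces the error in the client before any bytes hit the wire, which is what makes a cheap unit test of the builder sufficient here.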
[jira] [Commented] (KAFKA-12468) Initial offsets are copied from source to target cluster
[ https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361084#comment-17361084 ] Amber Liu commented on KAFKA-12468: --- I was using standalone mode with active-passive setup and saw negative offsets in the past as well. One of the reasons I found was due to consumer request timeout, e.g. org.apache.kafka.common.errors.DisconnectException. I increased the request timeout and tasks.max and the offsets are synced correctly now. {code:java} # consumer, need to set higher timeout source.admin.request.timeout.ms = 18 source.consumer.session.timeout.ms = 18 source.consumer.request.timeout.ms = 18{code} > Initial offsets are copied from source to target cluster > > > Key: KAFKA-12468 > URL: https://issues.apache.org/jira/browse/KAFKA-12468 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.0 >Reporter: Bart De Neuter >Priority: Major > > We have an active-passive setup where the 3 connectors from mirror maker 2 > (heartbeat, checkpoint and source) are running on a dedicated Kafka connect > cluster on the target cluster. > Offset syncing is enabled as specified by KIP-545. But when activated, it > seems the offsets from the source cluster are initially copied to the target > cluster without translation. This causes a negative lag for all synced > consumer groups. Only when we reset the offsets for each topic/partition on > the target cluster and produce a record on the topic/partition in the source, > the sync starts working correctly. > I would expect that the consumer groups are synced but that the current > offsets of the source cluster are not copied to the target cluster. 
> This is the configuration we are currently using: > Heartbeat connector > > {code:xml} > { > "name": "mm2-mirror-heartbeat", > "config": { > "name": "mm2-mirror-heartbeat", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", > "source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "", > "target.cluster.bootstrap.servers": "", > "topics": ".*", > "groups": ".*", > "tasks.max": "1", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > Checkpoint connector: > {code:xml} > { > "name": "mm2-mirror-checkpoint", > "config": { > "name": "mm2-mirror-checkpoint", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", > "source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "", > "target.cluster.bootstrap.servers": "", > "topics": ".*", > "groups": ".*", > "tasks.max": "40", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > Source connector: > {code:xml} > { > "name": "mm2-mirror-source", > "config": { > "name": "mm2-mirror-source", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorSourceConnector", > 
"source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "", > "target.cluster.bootstrap.servers": "", > "topics": ".*", > "groups": ".*", > "tasks.max": "40", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter" > } > } > {code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
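As a side note on the workaround in the first comment above: the `source.admin.*` and `source.consumer.*` prefixes override the underlying admin and consumer client settings per connector. A minimal sketch of those overrides as a config map follows (the millisecond values are illustrative only, not the reporter's exact values, which are truncated in the comment above):

```java
import java.util.Map;

// Hedged sketch of the per-connector client overrides described above.
// The property names are MirrorMaker 2 source.* client override keys;
// the values below are illustrative placeholders.
public class Mm2TimeoutOverrides {
    public static Map<String, String> overrides() {
        return Map.of(
            "source.admin.request.timeout.ms", "180000",
            "source.consumer.session.timeout.ms", "180000",
            "source.consumer.request.timeout.ms", "180000");
    }

    public static void main(String[] args) {
        // These entries would be merged into the connector "config" object
        // alongside the settings shown in the JSON blocks above.
        overrides().forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```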
[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
jolshan commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r649360987 ## File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala ## @@ -234,6 +235,32 @@ class MetadataRequestTest extends AbstractMetadataRequestTest { } } + @Test + def testInvalidMetadataRequestReturnsError(): Unit = { Review comment: I guess the only other reason I tested the whole path was to make sure the response could be sent back (if the name was null, it could not have), but it should suffice to also have a non-null check. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison merged pull request #10653: MINOR: Add missing parameter description from AdminZkClient
mimaison merged pull request #10653: URL: https://github.com/apache/kafka/pull/10653 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
jolshan commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r649353683 ## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ## @@ -92,6 +93,15 @@ public MetadataRequest build(short version) { if (!data.allowAutoTopicCreation() && version < 4) throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + "allowAutoTopicCreation field"); +if (version >= 10) { +if (data.topics() != null) { +data.topics().forEach(topic -> { +if (topic.name() == null || topic.topicId() != Uuid.ZERO_UUID) +throw new UnsupportedVersionException("MetadataRequest version " + version + +" does not support null topic names or topic IDs."); Review comment: Would it make sense to say non-zero topic IDs? Since the null ID is represented with all zeros? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
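The validation being discussed in the diff above amounts to a per-topic version gate. A hedged sketch with stand-in types (not Kafka's actual classes; Kafka throws UnsupportedVersionException, and its Uuid sentinel for "no ID" is the all-zero UUID):

```java
import java.util.UUID;

// Hedged sketch of the check under review: request versions that identify
// topics by name must reject entries carrying a null name or a non-zero
// topic ID, since the "no ID" sentinel is the all-zero UUID.
public class MetadataTopicValidation {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    static void validateTopic(short version, String name, UUID topicId) {
        if (version >= 10 && (name == null || !ZERO_UUID.equals(topicId))) {
            throw new IllegalArgumentException(
                "MetadataRequest version " + version
                    + " does not support null topic names or non-zero topic IDs.");
        }
    }

    public static void main(String[] args) {
        validateTopic((short) 10, "orders", ZERO_UUID); // accepted
        try {
            validateTopic((short) 10, "orders", UUID.randomUUID()); // rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```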
[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
jolshan commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r649352825 ## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java ## @@ -92,6 +93,15 @@ public MetadataRequest build(short version) { if (!data.allowAutoTopicCreation() && version < 4) throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " + "allowAutoTopicCreation field"); +if (version >= 10) { Review comment: We will need to check the version when this is fixed, but I can remove the version check for now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #10584: KAFKA-12701: NPE in MetadataRequest when using topic IDs
jolshan commented on a change in pull request #10584: URL: https://github.com/apache/kafka/pull/10584#discussion_r649352099 ## File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala ## @@ -234,6 +235,32 @@ class MetadataRequestTest extends AbstractMetadataRequestTest { } } + @Test + def testInvalidMetadataRequestReturnsError(): Unit = { Review comment: This was one way to test the KafkaApis code, but I suppose I could move this to a unit test that only tests the method itself (and not the whole request path) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #10805: KAFKA-12436 KIP-720 Deprecate MirrorMaker v1
mimaison commented on pull request #10805: URL: https://github.com/apache/kafka/pull/10805#issuecomment-858766776 This KIP was adopted on the basis of having an IdentityReplicationPolicy which is in this PR: https://github.com/apache/kafka/pull/10652 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12925) prefixScan missing from intermediate interfaces
[ https://issues.apache.org/jira/browse/KAFKA-12925?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361062#comment-17361062 ] Michael Viamari commented on KAFKA-12925: - I can flesh out a larger example if needed, but the basic usage for me was getting a reference to the state store using {{context.getStateStore()}} inside {{Transformer#init}}, and then when attempting to use {{TimestampedKeyValueStore#prefixScan}}, the exception was thrown. {code:java} public class TransformerPrefixScan implements Transformer> { private ProcessorContext context; private TimestampedKeyValueStore lookupStore; public TransformerPrefixScan() {} @Override @SuppressWarnings("unchecked") public void init(ProcessorContext context) { this.context = context; lookupStore = context.getStateStore(lookupStoreName); } @Override public KeyValue transform(K key, V value) { String keyPrefix = extractPrefix(key); try (KeyValueIterator> lookupIterator = lookupStore.prefixScan(keyPrefix, Serdes.String())) { //handle results } return null; } @Override public void close() { } } {code} > prefixScan missing from intermediate interfaces > --- > > Key: KAFKA-12925 > URL: https://issues.apache.org/jira/browse/KAFKA-12925 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Michael Viamari >Assignee: Sagar Rao >Priority: Major > Fix For: 3.0.0, 2.8.1 > > > [KIP-614|https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores] > and [KAFKA-10648|https://issues.apache.org/jira/browse/KAFKA-10648] > introduced support for {{prefixScan}} to StateStores. > It seems that many of the intermediate {{StateStore}} interfaces are missing > a definition for {{prefixScan}}, and as such is not accessible in all cases. 
> For example, when accessing the state stores through a the processor context, > the {{KeyValueStoreReadWriteDecorator}} and associated interfaces do not > define {{prefixScan}} and it falls back to the default implementation in > {{KeyValueStore}}, which throws {{UnsupportedOperationException}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
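The fallback described in the report above can be illustrated in isolation: when a decorator implements an interface but does not override a method, calls resolve to the interface's default implementation even though the wrapped store supports the operation. A minimal sketch with hypothetical names (not Kafka's actual classes):

```java
// Hedged sketch of the reported mechanism: a default interface method that
// throws, an inner store that supports the operation, and a decorator that
// fails to forward the call.
interface Store {
    default String prefixScan(String prefix) {
        throw new UnsupportedOperationException("prefixScan not supported");
    }
}

class RocksDbStore implements Store {
    @Override
    public String prefixScan(String prefix) {
        return "results for " + prefix;
    }
}

// The decorator does not override prefixScan, so calls through it hit the
// throwing default even though the inner store implements the method.
class ReadWriteDecorator implements Store {
    private final Store inner;

    ReadWriteDecorator(Store inner) {
        this.inner = inner;
    }
}

public class PrefixScanFallbackDemo {
    public static void main(String[] args) {
        Store wrapped = new ReadWriteDecorator(new RocksDbStore());
        try {
            wrapped.prefixScan("key-");
        } catch (UnsupportedOperationException e) {
            System.out.println("fallback hit: " + e.getMessage());
        }
    }
}
```

The fix described in the ticket is to declare (and forward) `prefixScan` on the intermediate interfaces and decorators so the call reaches the underlying store.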
[GitHub] [kafka] mimaison merged pull request #10849: KAFKA-12922: MirrorCheckpointTask should close topic filter
mimaison merged pull request #10849: URL: https://github.com/apache/kafka/pull/10849 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] edoardocomar commented on pull request #10649: KAFKA-12762: Use connection timeout when polling the network for new …
edoardocomar commented on pull request #10649: URL: https://github.com/apache/kafka/pull/10649#issuecomment-858752590 This last commit (thanks @tombentley ) allows the integration test to leave the Admin interface unchanged; the expanded factory method is only part of the test classes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client
jsancio commented on pull request #10786: URL: https://github.com/apache/kafka/pull/10786#issuecomment-858750625 @hachikuji thanks for the review. Updated the PR to address your comments. cc @cmccabe -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client
jsancio commented on a change in pull request #10786: URL: https://github.com/apache/kafka/pull/10786#discussion_r649273990 ## File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala ## @@ -233,18 +233,40 @@ final class KafkaMetadataLog private ( log.topicId.get } - override def createSnapshot(snapshotId: OffsetAndEpoch): RawSnapshotWriter = { -// Do not let the state machine create snapshots older than the latest snapshot -latestSnapshotId().ifPresent { latest => - if (latest.epoch > snapshotId.epoch || latest.offset > snapshotId.offset) { -// Since snapshots are less than the high-watermark absolute offset comparison is okay. -throw new IllegalArgumentException( - s"Attempting to create a snapshot ($snapshotId) that is not greater than the latest snapshot ($latest)" -) - } + override def createSnapshot(snapshotId: OffsetAndEpoch): Optional[RawSnapshotWriter] = { +if (snapshots.contains(snapshotId)) { + Optional.empty() +} else { + Optional.of(FileRawSnapshotWriter.create(log.dir.toPath, snapshotId, Optional.of(this))) +} + } + + override def createSnapshotFromEndOffset(endOffset: Long): Optional[RawSnapshotWriter] = { +val highWatermarkOffset = highWatermark.offset +if (endOffset > highWatermarkOffset) { + throw new IllegalArgumentException( +s"Cannot create a snapshot for an end offset ($endOffset) greater than the high-watermark ($highWatermarkOffset)" + ) +} + +if (endOffset < startOffset) { + throw new IllegalArgumentException( +s"Cannot create a snapshot for an end offset ($endOffset) less than the log start offset ($startOffset)" + ) +} + +val epoch = log.leaderEpochCache.flatMap(_.findEpochEntryByEndOffset(endOffset)) match { + case Some(epochEntry) => +epochEntry.epoch + case None => +// Assume that the end offset falls in the current epoch since based on the check above: Review comment: I removed this code. 
To avoid scanning the leader epoch cache, I reverted the snapshot creation API so that both the offset and the epoch are passed to `createSnapshot`. The new code just validates that the given offset and epoch are valid according to the record batches in the log and leader epoch cache. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client
jsancio commented on a change in pull request #10786: URL: https://github.com/apache/kafka/pull/10786#discussion_r649320552 ## File path: metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java ## @@ -74,56 +74,49 @@ String name() { this.batch = null; this.section = null; this.numRecords = 0; -this.numWriteTries = 0; } /** * Returns the epoch of the snapshot that we are generating. */ long epoch() { -return writer.epoch(); +return writer.lastOffset(); Review comment: Yes but the names are not great. Updated the names of `SnapshotGenerator.epoch` and `SnapshotWriter.lastOffset` to `lastOffsetFromLog`. This should make it clear that the offset of the batches in the snapshots are independent of the last offset from the log that is included in the snapshot. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] socutes commented on pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException
socutes commented on pull request #10749: URL: https://github.com/apache/kafka/pull/10749#issuecomment-858743167 @hachikuji Please review the changes again! Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] socutes commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException
socutes commented on a change in pull request #10749: URL: https://github.com/apache/kafka/pull/10749#discussion_r649316231 ## File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java ## @@ -68,15 +68,22 @@ public static Path snapshotPath(Path logDir, OffsetAndEpoch snapshotId) { return snapshotDir(logDir).resolve(filenameFromSnapshotId(snapshotId) + SUFFIX); } -public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) throws IOException { +public static Path createTempFile(Path logDir, OffsetAndEpoch snapshotId) { Path dir = snapshotDir(logDir); +Path tempFile; -// Create the snapshot directory if it doesn't exists -Files.createDirectories(dir); - -String prefix = String.format("%s-", filenameFromSnapshotId(snapshotId)); +try { +// Create the snapshot directory if it doesn't exists +Files.createDirectories(dir); -return Files.createTempFile(dir, prefix, PARTIAL_SUFFIX); +String prefix = String.format("%s-", filenameFromSnapshotId(snapshotId)); +tempFile = Files.createTempFile(dir, prefix, PARTIAL_SUFFIX); Review comment: You're right! Thanks ## File path: raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java ## @@ -78,7 +85,11 @@ public void append(MemoryRecords records) { checkIfFrozen("Append"); Utils.writeFully(channel, records.buffer()); } catch (IOException e) { -throw new RuntimeException(e); +throw new UncheckedIOException( +String.format("Error writing file snapshot," + Review comment: Fixed! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12892) InvalidACLException thrown in tests caused jenkins build unstable
[ https://issues.apache.org/jira/browse/KAFKA-12892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17361026#comment-17361026 ] Igor Soarez commented on KAFKA-12892: - Yes it was - by applying the ACL changes to a unique child znode instead of to the root, there shouldn't be any interference with other tests. I'm not sure if this is the new test that's still a problem or if there's any lingering state in zookeeper across builds. It is strange that only some test runs are affected. Disabling the test will let us know. > InvalidACLException thrown in tests caused jenkins build unstable > - > > Key: KAFKA-12892 > URL: https://issues.apache.org/jira/browse/KAFKA-12892 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Igor Soarez >Priority: Major > Attachments: image-2021-06-04-21-05-57-222.png > > > In KAFKA-12866, we fixed the issue that Kafka required ZK root access even > when using a chroot. But after the PR merged (build #183), trunk build keeps > failing at least one test group (mostly, JDK 15 and Scala 2.13). The build > result will said nothing useful: > {code:java} > > Task :core:integrationTest FAILED > [2021-06-04T03:19:18.974Z] > [2021-06-04T03:19:18.974Z] FAILURE: Build failed with an exception. > [2021-06-04T03:19:18.974Z] > [2021-06-04T03:19:18.974Z] * What went wrong: > [2021-06-04T03:19:18.974Z] Execution failed for task ':core:integrationTest'. > [2021-06-04T03:19:18.974Z] > Process 'Gradle Test Executor 128' finished with > non-zero exit value 1 > [2021-06-04T03:19:18.974Z] This problem might be caused by incorrect test > process configuration. 
> [2021-06-04T03:19:18.974Z] Please refer to the test execution section in > the User Manual at > https://docs.gradle.org/7.0.2/userguide/java_testing.html#sec:test_execution > {code} > > After investigation, I found the failed tests is because there are many > `InvalidACLException` thrown during the tests, ex: > > {code:java} > GssapiAuthenticationTest > testServerNotFoundInKerberosDatabase() FAILED > [2021-06-04T02:25:45.419Z] > org.apache.zookeeper.KeeperException$InvalidACLException: KeeperErrorCode = > InvalidACL for /config/topics/__consumer_offsets > [2021-06-04T02:25:45.419Z] at > org.apache.zookeeper.KeeperException.create(KeeperException.java:128) > [2021-06-04T02:25:45.419Z] at > org.apache.zookeeper.KeeperException.create(KeeperException.java:54) > [2021-06-04T02:25:45.419Z] at > kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583) > [2021-06-04T02:25:45.419Z] at > kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729) > [2021-06-04T02:25:45.419Z] at > kafka.zk.KafkaZkClient.createOrSet$1(KafkaZkClient.scala:366) > [2021-06-04T02:25:45.419Z] at > kafka.zk.KafkaZkClient.setOrCreateEntityConfigs(KafkaZkClient.scala:376) > [2021-06-04T02:25:45.419Z] at > kafka.zk.AdminZkClient.createTopicWithAssignment(AdminZkClient.scala:109) > [2021-06-04T02:25:45.419Z] at > kafka.zk.AdminZkClient.createTopic(AdminZkClient.scala:60) > [2021-06-04T02:25:45.419Z] at > kafka.utils.TestUtils$.$anonfun$createTopic$1(TestUtils.scala:357) > [2021-06-04T02:25:45.419Z] at > kafka.utils.TestUtils$.createTopic(TestUtils.scala:848) > [2021-06-04T02:25:45.419Z] at > kafka.utils.TestUtils$.createOffsetsTopic(TestUtils.scala:428) > [2021-06-04T02:25:45.419Z] at > kafka.api.IntegrationTestHarness.doSetup(IntegrationTestHarness.scala:109) > [2021-06-04T02:25:45.419Z] at > kafka.api.IntegrationTestHarness.setUp(IntegrationTestHarness.scala:84) > [2021-06-04T02:25:45.419Z] at > kafka.server.GssapiAuthenticationTest.setUp(GssapiAuthenticationTest.scala:68) > {code} 
> > Log can be found > [here|[https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka/branches/trunk/runs/195/nodes/14/steps/145/log/?start=0]] > After tracing back, I found it could because we add a test in the KAFKA-12866 > to lock root access in zookeeper, but somehow it didn't unlock after the test > in testChrootExistsAndRootIsLocked. Also, while all the InvalidACLException > failed tests happened right after testChrootExistsAndRootIsLocked not long. > Ex: below testChrootExistsAndRootIsLocked completed at 02:24:30, and the > above failed test is at 02:25:45 (and following more than 10 tests with the > same InvalidACLException. > {code:java} > [2021-06-04T02:24:29.370Z] ZkClientAclTest > > testChrootExistsAndRootIsLocked() STARTED > [2021-06-04T02:24:30.321Z] > [2021-06-04T02:24:30.321Z] ZkClientAclTest > > testChrootExistsAndRootIsLocked() PASSED{code} > > !image-2021-06-04-21-05-57-222.png|width=489,height=! > We should have further
[GitHub] [kafka] jsancio commented on a change in pull request #10786: KAFKA-12787: Integrate controller snapshoting with raft client
jsancio commented on a change in pull request #10786: URL: https://github.com/apache/kafka/pull/10786#discussion_r649281362 ## File path: metadata/src/main/java/org/apache/kafka/controller/QuorumController.java ## @@ -1009,7 +999,7 @@ private QuorumController(LogContext logContext, snapshotRegistry, sessionTimeoutNs, replicaPlacer); this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry); this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry); -this.snapshotGeneratorManager = new SnapshotGeneratorManager(snapshotWriterBuilder); +this.snapshotGeneratorManager = new SnapshotGeneratorManager(raftClient::createSnapshot); Review comment: Fair enough. Removing the `BiFunction` from the constructor. `SnapshotGeneratorManager` is an inner class so it should have access to the `raftClient`. > Was this done for testing or something? I am not sure why this was added. It is not used on tests. I think the previous code didn't have access to the `raftClient` because this code was merged before reversing the dependency between the `metadata` project and the `raft` project. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] IgnacioAcunaF edited a comment on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh
IgnacioAcunaF edited a comment on pull request #10858: URL: https://github.com/apache/kafka/pull/10858#issuecomment-858703098 PING @hachikuji @apovzner (as I saw you on [KAFKA-9507](https://github.com/apache/kafka/pull/8057)) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] IgnacioAcunaF commented on pull request #10858: KAFKA-12926: ConsumerGroupCommand's java.lang.NullPointerException at negative offsets while running kafka-consumer-groups.sh
IgnacioAcunaF commented on pull request #10858: URL: https://github.com/apache/kafka/pull/10858#issuecomment-858703098 PING @hachikuji @apovzner as I saw you on [KAFKA-9507](https://github.com/apache/kafka/pull/8057) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a change in pull request #10652: KAFKA-9726 IdentityReplicationPolicy
mimaison commented on a change in pull request #10652: URL: https://github.com/apache/kafka/pull/10652#discussion_r649253273 ## File path: connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java ## @@ -159,4 +191,12 @@ public void remoteTopicsSeparatorTest() throws InterruptedException { assertTrue(remoteTopics.contains("source3__source4__source5__topic6")); } +public void testIdentityReplicationTopicSource() { Review comment: Missing `@Test` annotation ## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java ## @@ -492,7 +492,12 @@ boolean isCycle(String topic) { } else if (source.equals(sourceAndTarget.target())) { return true; } else { -return isCycle(replicationPolicy.upstreamTopic(topic)); +String upstreamTopic = replicationPolicy.upstreamTopic(topic); +if (upstreamTopic.equals(topic)) { Review comment: Can we cover this new branch with a test in `MirrorSourceConnectorTest`? ## File path: connect/mirror-client/src/test/java/org/apache/kafka/connect/mirror/MirrorClientTest.java ## @@ -159,4 +191,12 @@ public void remoteTopicsSeparatorTest() throws InterruptedException { assertTrue(remoteTopics.contains("source3__source4__source5__topic6")); } +public void testIdentityReplicationTopicSource() { +MirrorClient client = new FakeMirrorClient( +new IdentityReplicationPolicy("primary"), Arrays.asList()); +assertEquals("topic1", client.replicationPolicy() +.formatRemoteTopic("primary", "topic1")); Review comment: Should we also try `formatRemoteTopic()` with a heartbeat topic? ## File path: connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/MirrorClient.java ## @@ -60,7 +60,7 @@ private ReplicationPolicy replicationPolicy; private Map consumerConfig; -public MirrorClient(Map props) { +public MirrorClient(Map props) { Review comment: Is this actually needed? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #10856: MINOR: Small optimizations and removal of unused code in Streams
jlprat commented on pull request #10856: URL: https://github.com/apache/kafka/pull/10856#issuecomment-858694138 Thanks both for the reviews -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna merged pull request #10856: MINOR: Small optimizations and removal of unused code in Streams
cadonna merged pull request #10856: URL: https://github.com/apache/kafka/pull/10856 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #10856: MINOR: Small optimizations and removal of unused code in Streams
cadonna commented on pull request #10856: URL: https://github.com/apache/kafka/pull/10856#issuecomment-858689473 JDK 11 and ARM passed. Failed tests are unrelated and the issue is known. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #10848: MINOR Updated transaction index as optional in LogSegmentData.
satishd commented on pull request #10848: URL: https://github.com/apache/kafka/pull/10848#issuecomment-858686600 Thanks @junrao @ijuma for the review. Addressed the review comments with the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12892) InvalidACLException thrown in tests caused jenkins build unstable
[ https://issues.apache.org/jira/browse/KAFKA-12892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17360983#comment-17360983 ]

Bruno Cadonna commented on KAFKA-12892:
---------------------------------------

Is PR #10821 supposed to solve the issue? I still see a lot of

{code:java}
MultipleListenersWithAdditionalJaasContextTest > testProduceConsume() FAILED
[2021-06-10T11:11:52.209Z] org.apache.zookeeper.KeeperException$InvalidACLException: KeeperErrorCode = InvalidACL for /brokers/ids
[2021-06-10T11:11:52.209Z]     at org.apache.zookeeper.KeeperException.create(KeeperException.java:128)
[2021-06-10T11:11:52.209Z]     at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
[2021-06-10T11:11:52.209Z]     at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583)
[2021-06-10T11:11:52.209Z]     at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729)
[2021-06-10T11:11:52.209Z]     at kafka.zk.KafkaZkClient.makeSurePersistentPathExists(KafkaZkClient.scala:1627)
[2021-06-10T11:11:52.209Z]     at kafka.zk.KafkaZkClient.$anonfun$createTopLevelPaths$1(KafkaZkClient.scala:1619)
[2021-06-10T11:11:52.209Z]     at kafka.zk.KafkaZkClient.$anonfun$createTopLevelPaths$1$adapted(KafkaZkClient.scala:1619)
[2021-06-10T11:11:52.209Z]     at scala.collection.immutable.List.foreach(List.scala:333)
[2021-06-10T11:11:52.209Z]     at kafka.zk.KafkaZkClient.createTopLevelPaths(KafkaZkClient.scala:1619)
[2021-06-10T11:11:52.209Z]     at kafka.server.KafkaServer.initZkClient(KafkaServer.scala:454)
[2021-06-10T11:11:52.209Z]     at kafka.server.KafkaServer.startup(KafkaServer.scala:192)
[2021-06-10T11:11:52.209Z]     at kafka.utils.TestUtils$.createServer(TestUtils.scala:166)
[2021-06-10T11:11:52.209Z]     at kafka.server.MultipleListenersWithSameSecurityProtocolBaseTest.$anonfun$setUp$1(MultipleListenersWithSameSecurityProtocolBaseTest.scala:103)
[2021-06-10T11:11:52.210Z]     at kafka.server.MultipleListenersWithSameSecurityProtocolBaseTest.$anonfun$setUp$1$adapted(MultipleListenersWithSameSecurityProtocolBaseTest.scala:76)
[2021-06-10T11:11:52.210Z]     at scala.collection.immutable.Range.foreach(Range.scala:190)
{code}

This also happens on PRs that contain PR #10821, for example https://ci-builds.apache.org/blue/rest/organizations/jenkins/pipelines/Kafka/pipelines/kafka-pr/branches/PR-10856/runs/3/nodes/14/steps/121/log/?start=0

> InvalidACLException thrown in tests caused jenkins build unstable
> -----------------------------------------------------------------
>
>                 Key: KAFKA-12892
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12892
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Luke Chen
>            Assignee: Igor Soarez
>            Priority: Major
>         Attachments: image-2021-06-04-21-05-57-222.png
>
> In KAFKA-12866, we fixed the issue that Kafka required ZK root access even
> when using a chroot. But after the PR merged (build #183), the trunk build keeps
> failing in at least one test group (mostly JDK 15 and Scala 2.13). The build
> result says nothing useful:
> {code:java}
> > Task :core:integrationTest FAILED
> [2021-06-04T03:19:18.974Z]
> [2021-06-04T03:19:18.974Z] FAILURE: Build failed with an exception.
> [2021-06-04T03:19:18.974Z]
> [2021-06-04T03:19:18.974Z] * What went wrong:
> [2021-06-04T03:19:18.974Z] Execution failed for task ':core:integrationTest'.
> [2021-06-04T03:19:18.974Z] > Process 'Gradle Test Executor 128' finished with non-zero exit value 1
> [2021-06-04T03:19:18.974Z] This problem might be caused by incorrect test process configuration.
> [2021-06-04T03:19:18.974Z] Please refer to the test execution section in the User Manual at
> https://docs.gradle.org/7.0.2/userguide/java_testing.html#sec:test_execution
> {code}
>
> After investigation, I found the tests failed because many
> `InvalidACLException`s are thrown during the tests, e.g.:
>
> {code:java}
> GssapiAuthenticationTest > testServerNotFoundInKerberosDatabase() FAILED
> [2021-06-04T02:25:45.419Z] org.apache.zookeeper.KeeperException$InvalidACLException: KeeperErrorCode = InvalidACL for /config/topics/__consumer_offsets
> [2021-06-04T02:25:45.419Z]     at org.apache.zookeeper.KeeperException.create(KeeperException.java:128)
> [2021-06-04T02:25:45.419Z]     at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
> [2021-06-04T02:25:45.419Z]     at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:583)
> [2021-06-04T02:25:45.419Z]     at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1729)
> [2021-06-04T02:25:45.419Z]     at kafka.zk.KafkaZkClient.createOrSet$1(KafkaZkClient.scala:366)
> [2021-06-04T02:25:45.419Z]     at kafka.zk.KafkaZkClient.setOrCreateEntityConfigs(KafkaZkClient.scala:376)
>
[GitHub] [kafka] mimaison commented on a change in pull request #9878: KAFKA-6987: Add KafkaFuture.toCompletionStage()
mimaison commented on a change in pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#discussion_r649241897

##
File path: clients/src/test/java/org/apache/kafka/common/KafkaFutureTest.java
##
@@ -17,68 +17,261 @@
 package org.apache.kafka.common;

 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.Java;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;

+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CompletionStage;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;

+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;

 /**
  * A unit test for KafkaFuture.
  */
 @Timeout(120)
 public class KafkaFutureTest {

+    /** Asserts that the given future is done, didn't fail and wasn't cancelled. */
+    private void assertIsSuccessful(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertFalse(future.isCompletedExceptionally());
+        assertFalse(future.isCancelled());
+    }
+
+    /** Asserts that the given future is done, failed and wasn't cancelled. */
+    private void assertIsFailed(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertFalse(future.isCancelled());
+        assertTrue(future.isCompletedExceptionally());
+    }
+
+    /** Asserts that the given future is done, didn't fail and was cancelled. */
+    private void assertIsCancelled(KafkaFuture<?> future) {
+        assertTrue(future.isDone());
+        assertTrue(future.isCancelled());
+        assertTrue(future.isCompletedExceptionally());
+    }
+
+    private <T> void awaitAndAssertResult(KafkaFuture<T> future,
+                                          T expectedResult,
+                                          T alternativeValue) {
+        assertNotEquals(expectedResult, alternativeValue);
+        try {
+            assertEquals(expectedResult, future.get(5, TimeUnit.MINUTES));
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            assertEquals(expectedResult, future.get());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            assertEquals(expectedResult, future.getNow(alternativeValue));
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+    }
+
+    private void awaitAndAssertFailure(KafkaFuture<?> future,
+                                       Class<? extends Throwable> expectedException,
+                                       String expectedMessage) {
+        try {
+            future.get(5, TimeUnit.MINUTES);
+            fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(expectedException, e.getCause().getClass());
+            assertEquals(expectedMessage, e.getCause().getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            future.get();
+            fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(expectedException, e.getCause().getClass());
+            assertEquals(expectedMessage, e.getCause().getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+        try {
+            future.getNow(null);
+            fail("Expected an exception");
+        } catch (ExecutionException e) {
+            assertEquals(expectedException, e.getCause().getClass());
+            assertEquals(expectedMessage, e.getCause().getMessage());
+        } catch (Exception e) {
+            throw new AssertionError("Unexpected exception", e);
+        }
+    }
+
+    private void awaitAndAssertCancelled(KafkaFuture<?> future, String expectedMessage) {
+        try {
+            future.get(5, TimeUnit.MINUTES);
+            fail("Expected an exception");
+        } catch (CancellationException e) {
+            assertEquals(CancellationException.class, e.getClass());
+            assertEquals(expectedMessage, e.getMessage());
+        } catch (Exception e) {
+            throw new
[jira] [Commented] (KAFKA-12870) RecordAccumulator stuck in a flushing state
[ https://issues.apache.org/jira/browse/KAFKA-12870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17360937#comment-17360937 ] Ismael Juma commented on KAFKA-12870: - I think the claim is that there's a bug in the `Sender` when exactly-once is used. > RecordAccumulator stuck in a flushing state > --- > > Key: KAFKA-12870 > URL: https://issues.apache.org/jira/browse/KAFKA-12870 > Project: Kafka > Issue Type: Bug > Components: producer , streams >Affects Versions: 2.5.1, 2.8.0, 2.7.1, 2.6.2 >Reporter: Niclas Lockner >Priority: Major > Fix For: 3.0.0 > > Attachments: RecordAccumulator.log, full.log > > > After a Kafka Stream with exactly once enabled has performed its first > commit, the RecordAccumulator within the stream's internal producer gets > stuck in a state where all subsequent ProducerBatches that get allocated are > immediately flushed instead of being held in memory until they expire, > regardless of the stream's linger or batch size config. > This is reproduced in the example code found at > [https://github.com/niclaslockner/kafka-12870] which can be run with > ./gradlew run --args= > The example has a producer that sends 1 record/sec to one topic, and a Kafka > stream with EOS enabled that forwards the records from that topic to another > topic with the configuration linger = 5 sec, commit interval = 10 sec. > > The expected behavior when running the example is that the stream's > ProducerBatches will expire (or get flushed because of the commit) every 5th > second, and that the stream's producer will send a ProduceRequest every 5th > second with an expired ProducerBatch that contains 5 records. > The actual behavior is that the ProducerBatch is made immediately available > for the Sender, and the Sender sends one ProduceRequest for each record. 
> > The example code contains a copy of the RecordAccumulator class (copied from > kafka-clients 2.8.0) with some additional logging added to > * RecordAccumulator#ready(Cluster, long) > * RecordAccumulator#beginFlush() > * RecordAccumulator#awaitFlushCompletion() > These log entries show (see the attached RecordsAccumulator.log) > * that the batches are considered sendable because a flush is in progress > * that Sender.maybeSendAndPollTransactionalRequest() calls > RecordAccumulator's beginFlush() without also calling awaitFlushCompletion(), > and that this makes RecordAccumulator's flushesInProgress jump between 1-2 > instead of the expected 0-1. > > This issue is not reproducible in version 2.3.1 or 2.4.1. > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
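The flush-counter imbalance described in the report above can be sketched as a toy model. This is an illustration only, not Kafka's actual code: `FlushCounterModel` and `batchSendable` are made-up names standing in for `RecordAccumulator`'s `flushesInProgress` counter and its `ready()` check, under the assumption (stated in the report) that a batch is considered sendable whenever a flush is in progress.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model (not Kafka's RecordAccumulator) of the flushes-in-progress
// counter: batches become "sendable" whenever the counter is positive.
class FlushCounterModel {
    private final AtomicInteger flushesInProgress = new AtomicInteger(0);

    void beginFlush() {
        flushesInProgress.incrementAndGet();
    }

    // The decrement lives here; calling beginFlush() without a matching
    // awaitFlushCompletion() leaves the counter permanently >= 1.
    void awaitFlushCompletion() {
        flushesInProgress.decrementAndGet();
    }

    boolean flushInProgress() {
        return flushesInProgress.get() > 0;
    }

    // Stand-in for ready(): a batch is sendable when full, expired,
    // or a flush is in progress (linger is then effectively ignored).
    boolean batchSendable(boolean full, boolean expired) {
        return full || expired || flushInProgress();
    }
}

public class Kafka12870Sketch {
    public static void main(String[] args) {
        FlushCounterModel acc = new FlushCounterModel();

        // Balanced usage: counter goes 0 -> 1 -> 0, batches honor linger.
        acc.beginFlush();
        acc.awaitFlushCompletion();
        System.out.println(acc.batchSendable(false, false)); // false

        // Unbalanced usage as reported: beginFlush() per commit with no
        // awaitFlushCompletion(), so every fresh batch is immediately sendable.
        acc.beginFlush();
        acc.beginFlush();
        System.out.println(acc.batchSendable(false, false)); // true
    }
}
```

With balanced calls the counter oscillates 0-1; the report's observation is that `Sender.maybeSendAndPollTransactionalRequest()` only ever increments, so the counter oscillates 1-2 and every `ProducerBatch` bypasses the linger/batch-size wait.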
[jira] [Updated] (KAFKA-12870) RecordAccumulator stuck in a flushing state
[ https://issues.apache.org/jira/browse/KAFKA-12870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-12870: Fix Version/s: 3.0.0 (The quoted issue description is identical to the previous message and is omitted here.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dongjinleekr commented on pull request #10847: KAFKA-12921: Upgrade ZSTD JNI from 1.4.9-1 to 1.5.0-1
dongjinleekr commented on pull request #10847: URL: https://github.com/apache/kafka/pull/10847#issuecomment-858653169 @ijuma @dchristle Since we have more time for [KIP-390](https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Allow+fine-grained+configuration+for+compression), I will run the benchmark with this zstd binding. Stay tuned! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 merged pull request #10860: MINOR: fix client_compatibility_features_test.py - DescribeAcls is al…
chia7712 merged pull request #10860: URL: https://github.com/apache/kafka/pull/10860 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a change in pull request #10743: KIP-699: Update FindCoordinator to resolve multiple Coordinators at a time
mimaison commented on a change in pull request #10743:
URL: https://github.com/apache/kafka/pull/10743#discussion_r649213934

##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -858,6 +885,12 @@ public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
         public void onFailure(RuntimeException e, RequestFuture<Void> future) {
             log.debug("FindCoordinator request failed due to {}", e.toString());
+            if (e instanceof UnsupportedBatchLookupException) {

Review comment:
   I've only taken a very brief look, and I think this approach would work well for Connect, Producer and Consumer; however, it's a bit more complicated with Admin. In Admin, requests are built by lookup strategies. Lookups can be sent to any broker, so knowing the max version for a specific call is not completely trivial. That said, it's not impossible either, so if there's consensus that it would be preferable, I can give that a try.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #10860: MINOR: fix client_compatibility_features_test.py - DescribeAcls is al…
chia7712 commented on pull request #10860:
URL: https://github.com/apache/kafka/pull/10860#issuecomment-858649375

> Do the system tests pass with this change?

Yep. This system test shows the following error message without this patch:
```
java.lang.RuntimeException: Did not expect describeAclsSupported to be supported, but it was.
    at org.apache.kafka.tools.ClientCompatibilityTest.tryFeature(ClientCompatibilityTest.java:525)
    at org.apache.kafka.tools.ClientCompatibilityTest.tryFeature(ClientCompatibilityTest.java:509)
    at org.apache.kafka.tools.ClientCompatibilityTest.testAdminClient(ClientCompatibilityTest.java:301)
    at org.apache.kafka.tools.ClientCompatibilityTest.run(ClientCompatibilityTest.java:238)
    at org.apache.kafka.tools.ClientCompatibilityTest.main(ClientCompatibilityTest.java:191)
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12929) KIP-750: Deprecate support for Java 8 in Kafka 3.0
[ https://issues.apache.org/jira/browse/KAFKA-12929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-12929: Summary: KIP-750: Deprecate support for Java 8 in Kafka 3.0 (was: KIP-750: Deprecate Java 8 in Kafka 3.0) > KIP-750: Deprecate support for Java 8 in Kafka 3.0 > -- > > Key: KAFKA-12929 > URL: https://issues.apache.org/jira/browse/KAFKA-12929 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12930) KIP-751: Deprecate support for Scala 2.12 in Kafka 3.0
[ https://issues.apache.org/jira/browse/KAFKA-12930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-12930: Summary: KIP-751: Deprecate support for Scala 2.12 in Kafka 3.0 (was: Deprecate support for Scala 2.12 in Kafka 3.0) > KIP-751: Deprecate support for Scala 2.12 in Kafka 3.0 > -- > > Key: KAFKA-12930 > URL: https://issues.apache.org/jira/browse/KAFKA-12930 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 3.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12930) Deprecate support for Scala 2.12 in Kafka 3.0
Ismael Juma created KAFKA-12930: --- Summary: Deprecate support for Scala 2.12 in Kafka 3.0 Key: KAFKA-12930 URL: https://issues.apache.org/jira/browse/KAFKA-12930 Project: Kafka Issue Type: Sub-task Reporter: Ismael Juma Assignee: Ismael Juma Fix For: 3.0.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12929) KIP-750: Deprecate Java 8 in Kafka 3.0
Ismael Juma created KAFKA-12929: --- Summary: KIP-750: Deprecate Java 8 in Kafka 3.0 Key: KAFKA-12929 URL: https://issues.apache.org/jira/browse/KAFKA-12929 Project: Kafka Issue Type: Sub-task Reporter: Ismael Juma Assignee: Ismael Juma Fix For: 3.0.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)