Re: [PR] MINOR: Fix the flaky TBRLMM `testInternalTopicExists` test [kafka]
showuon commented on code in PR #14840:
URL: https://github.com/apache/kafka/pull/14840#discussion_r1407350742


##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########

@@ -70,18 +70,20 @@ public TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
     }
 
     @Test
-    public void testInternalTopicExists() {
+    public void testDoesTopicExist() {
         Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig();
         ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName();
         try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
-            String topic = topicBasedRlmm().config().remoteLogMetadataTopicName();
+            String topic = "test-topic-exist";
+            remoteLogMetadataManagerHarness.createTopic(topic, 1, 1, new Properties(),

Review Comment:
   Yes, you're right, we'll make sure the metadata is propagated to all nodes for both KRaft and ZK modes.
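For reference, a hedged sketch of how such an existence check can be written against the Admin client once the metadata has propagated; the helper and its error handling are illustrative, not the test's actual code, and `allTopicNames()` assumes a recent Admin API:

```java
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

final class TopicExistenceCheck {
    // Hypothetical helper: returns true if the topic is known to the brokers.
    static boolean topicExists(Admin admin, String topic) throws InterruptedException {
        try {
            // describeTopics fails with UnknownTopicOrPartitionException when the topic is absent.
            admin.describeTopics(Collections.singleton(topic)).allTopicNames().get();
            return true;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                return false;
            }
            throw new RuntimeException(e.getCause());
        }
    }
}
```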
Re: [PR] KAFKA-15695: Update the local log start offset of a log after rebuilding the auxiliary state [kafka]
nikramakrishnan commented on code in PR #14649:
URL: https://github.com/apache/kafka/pull/14649#discussion_r1407348678


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########

@@ -1754,6 +1754,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     leaderEpochCache.foreach(_.clearAndFlush())
     producerStateManager.truncateFullyAndStartAt(newOffset)
     logStartOffset = logStartOffsetOpt.getOrElse(newOffset)
+    if (remoteLogEnabled()) maybeIncrementLocalLogStartOffset(newOffset, LogStartOffsetIncrementReason.SegmentDeletion)

Review Comment:
   Thanks @kamalcph. I agree with that. I have updated the PR to update the local log start offset in `truncateFullyAndStartAt` and also in `buildRemoteLogAuxState` with the correct reason.
[PR] MINOR: Ensure that DisplayName is set in all parameterized tests [kafka]
dajac opened a new pull request, #14850:
URL: https://github.com/apache/kafka/pull/14850

   This is a follow-up to https://github.com/apache/kafka/pull/14687, as we found out that some parameterized tests do not include the test method name in their name. For context, the JUnit XML report does not include the method name by default; it relies only on the display name provided.

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
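As a rough illustration of the kind of fix involved (the PR's exact pattern may differ), JUnit 5 lets a parameterized test embed the method's display name in each invocation name so it survives into the XML report:

```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class DisplayNameExample {
    // "{displayName}" carries the method name into every invocation's name,
    // e.g. "testSomething(String).quorum=kraft" instead of just "quorum=kraft".
    @ParameterizedTest(name = "{displayName}.quorum={0}")
    @ValueSource(strings = {"zk", "kraft"})
    void testSomething(String quorum) {
        // test body elided
    }
}
```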
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
showuon commented on PR #14428:
URL: https://github.com/apache/kafka/pull/14428#issuecomment-1829267766

   > Hi @showuon,
   >
   > Thanks for the changes. They look good to me in general. One potential issue with this implementation is that the leader doesn't check that the fetching voters are making progress.
   >
   > Just because the leader returned a successful response to FETCH and FETCH_SNAPSHOT doesn't mean that the followers were able to handle the response correctly.
   >
   > For example, imagine the case where the log end offset (LEO) is at 1000 and all of the followers are continuously fetching at offset 0 without ever increasing their fetch offset. This can happen if the followers encounter an error when processing the FETCH or FETCH_SNAPSHOT response.
   >
   > In this scenario the leader will never be able to increase the HWM. I think that this scenario is specific to KRaft and doesn't exist in Raft, because KRaft is pull-based whereas Raft is push-based.
   >
   > What do you think? Do you agree? If so, should we address this issue in this PR, or create an issue for this and fix it in a future PR?

   Good catch! Yes, that's indeed a potential problem. This PR has been pending for a long time, so let's focus on the current issue in this PR first. I've filed [KAFKA-15911](https://issues.apache.org/jira/browse/KAFKA-15911) for the potential issue.
[jira] [Created] (KAFKA-15911) KRaft quorum leader should make sure the follower fetch is making progress
Luke Chen created KAFKA-15911:
---------------------------------

             Summary: KRaft quorum leader should make sure the follower fetch is making progress
                 Key: KAFKA-15911
                 URL: https://issues.apache.org/jira/browse/KAFKA-15911
             Project: Kafka
          Issue Type: Bug
          Components: kraft
            Reporter: Luke Chen


Just because the leader returned a successful response to FETCH and FETCH_SNAPSHOT doesn't mean that the followers were able to handle the response correctly.

For example, imagine the case where the log end offset (LEO) is at 1000 and all of the followers are continuously fetching at offset 0 without ever increasing their fetch offset. This can happen if the followers encounter an error when processing the FETCH or FETCH_SNAPSHOT response.

In this scenario the leader will never be able to increase the HWM. I think that this scenario is specific to KRaft and doesn't exist in Raft, because KRaft is pull-based whereas Raft is push-based.

https://github.com/apache/kafka/pull/14428#pullrequestreview-1751408695
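One possible shape of the fix, sketched here with hypothetical names (the real logic would live in `LeaderState`), is to count a voter toward the check-quorum majority only when its fetch offset actually advances:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not actual KafkaRaftClient code.
class FetchProgressTracker {
    private final Map<Integer, Long> lastFetchOffsets = new HashMap<>();
    private final Set<Integer> progressedVoters = new HashSet<>();
    private final int majority; // voters excluding the leader, e.g. 2 for a 5-node quorum

    FetchProgressTracker(int majority) {
        this.majority = majority;
    }

    // Returns true when a majority of voters advanced their fetch offset,
    // meaning the check-quorum timer may safely be reset.
    boolean recordFetch(int voterId, long fetchOffset) {
        long previous = lastFetchOffsets.getOrDefault(voterId, -1L);
        if (fetchOffset > previous) {
            lastFetchOffsets.put(voterId, fetchOffset);
            progressedVoters.add(voterId);
        }
        if (progressedVoters.size() >= majority) {
            progressedVoters.clear();
            return true;
        }
        return false;
    }
}
```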
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1407337495


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########

@@ -1990,7 +1995,7 @@ private long pollLeader(long currentTimeMs) {
         LeaderState state = quorum.leaderStateOrThrow();
         maybeFireLeaderChange(state);
 
-        if (shutdown.get() != null || state.isResignRequested()) {
+        if (shutdown.get() != null || state.isResignRequested() || state.hasMajorityFollowerFetchExpired(currentTimeMs)) {

Review Comment:
   Ah, good catch! Updated `hasMajorityFollowerFetchExpired` to `timeUntilCheckQuorumExpires` now.
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1407336829


##########
raft/src/test/java/org/apache/kafka/raft/LeaderStateTest.java:
##########

@@ -447,6 +452,38 @@ public void testDescribeQuorumWithObservers() {
             observerState);
     }
 
+    @Test
+    public void testMajorityFollowerFetchTimeoutExpiration() {
+        int node1 = 1;
+        int node2 = 2;
+        int node3 = 3;
+        int node4 = 4;
+        int observer5 = 5;
+        LeaderState state = newLeaderState(mkSet(localId, node1, node2, node3, node4), 0L);
+        assertFalse(state.hasMajorityFollowerFetchExpired(time.milliseconds()));
+        int resignLeadershipTimeout = (int) (fetchTimeoutMs * 1.5);

Review Comment:
   Agree! If we need to update the `1.5` factor, we can change it in one place. Updated.
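For context, a sketch of what centralizing the factor can look like; the constant and class names here are assumptions, not the PR's final code:

```java
// Hypothetical sketch of centralizing the factor so code and tests share it.
final class CheckQuorumTimeout {
    // Single place to change the tolerance factor.
    static final double CHECK_QUORUM_TIMEOUT_FACTOR = 1.5;

    static int checkQuorumTimeoutMs(int fetchTimeoutMs) {
        // 1.5x the fetch timeout tolerates network transit and IO time.
        return (int) (fetchTimeoutMs * CHECK_QUORUM_TIMEOUT_FACTOR);
    }
}
```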
Re: [PR] KAFKA-15901: Client changes for registering telemetry and API calls (KIP-714) [kafka]
apoorvmittal10 commented on code in PR #14843:
URL: https://github.com/apache/kafka/pull/14843#discussion_r1407336676


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########

@@ -4387,7 +4398,52 @@ public FenceProducersResult fenceProducers(Collection transactionalIds,
 
     @Override
     public Uuid clientInstanceId(Duration timeout) {

Review Comment:
   @AndrewJSchofield Please can you re-review this method change, as I added it after your review. Also, I am not sure the changes in AdminClient are helpful for fetching the `client instance id`; I cannot think of a valid use case where this method in AdminClient will be required.
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1407334210


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########

@@ -1340,6 +1341,9 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest(
         UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize));
 
+        Optional<LeaderState<T>> state = quorum.maybeLeaderState();
+        state.ifPresent(s -> s.maybeResetMajorityFollowerFetchTimer(data.replicaId(), currentTimeMs));
+

Review Comment:
   Good point. Updated.
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1407333526


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########

@@ -79,6 +86,39 @@ protected LeaderState(
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        // use the 1.5x fetch timeout to tolerate some network transition time or other IO time.
+        this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+        this.fetchTimer = time.timer(fetchTimeoutMs);

Review Comment:
   Good suggestion. Updated.
Re: [PR] KAFKA-15618: Kafka metrics collector and supporting classes (KIP-714) [kafka]
apoorvmittal10 commented on PR #14620:
URL: https://github.com/apache/kafka/pull/14620#issuecomment-1829257829

   @xvrl @mjsax Please help close out this PR: do we need to address any remaining concerns, or can we merge? If required, we can have a follow-up PR as well.
Re: [PR] KAFKA-15143: Adding in MockFixedKeyProcessorContext and Test [kafka]
lihaosky commented on code in PR #14605:
URL: https://github.com/apache/kafka/pull/14605#discussion_r1407319712


##########
streams/test-utils/src/test/java/org/apache/kafka/streams/test/MockFixedKeyProcessorContextTest.java:
##########

@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.test;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.processor.api.InternalFixedKeyRecordFactory;
+import org.apache.kafka.streams.processor.api.MockFixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.api.RecordMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+public class MockFixedKeyProcessorContextTest {

Review Comment:
   Right. I think you can parameterize `MockProcessorContextAPITest` so the common tests run for both processor context mocks, and add some tests for `FixedKeyProcessor` only (returning directly for the other processor context).
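A hedged sketch of what that parameterization could look like with JUnit 5; the class and method names below are assumptions, not the PR's actual code, and `MockFixedKeyProcessorContext` is the class this PR introduces:

```java
import java.util.stream.Stream;

import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

class SharedProcessorContextTest {
    // Supplies each mock context under test; both are expected to satisfy
    // the same forwarding/commit contract.
    static Stream<Object> contexts() {
        return Stream.<Object>of(
            new MockProcessorContext<String, String>()
            // , new MockFixedKeyProcessorContext<String, String>()  // added by this PR
        );
    }

    @ParameterizedTest
    @MethodSource("contexts")
    void shouldRunCommonContractTests(Object context) {
        // shared assertions (forwarding, commit, state stores) would go here
    }
}
```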
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1407198957


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java:
##########

@@ -269,60 +275,56 @@ void cleanup() {
      * completed in time.
      */
     // Visible for testing
-    void maybeAutoCommitAndLeaveGroup(final Timer timer) {
+    void maybeAutocommitOnClose(final Timer timer) {
         if (!requestManagers.coordinatorRequestManager.isPresent())
             return;
 
+        if (!requestManagers.commitRequestManager.isPresent()) {
+            log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down.");
+            return;
+        }
+
+        if (!requestManagers.commitRequestManager.get().canAutoCommit()) {
+            return;
+        }
+
         ensureCoordinatorReady(timer);
-        List tasks = closingRequests();
-        networkClientDelegate.addAll(tasks);
+        List autocommitRequest =
+            Collections.singletonList(requestManagers.commitRequestManager.get().commitAllConsumedPositions());
+        networkClientDelegate.addAll(autocommitRequest);
         do {
             long currentTimeMs = timer.currentTimeMs();
             ensureCoordinatorReady(timer);
             networkClientDelegate.poll(timer.remainingMs(), currentTimeMs);
-        } while (timer.notExpired() && !tasks.stream().allMatch(v -> v.future().isDone()));
+        } while (timer.notExpired() && !autocommitRequest.get(0).future().isDone());
+    }
+
+    void maybeLeaveGroup(final Timer timer) {
+        // TODO: Leave group upon closing the consumer
    }
 
     private void ensureCoordinatorReady(final Timer timer) {
-        while (!coordinatorReady()) {
+        while (!coordinatorReady() && timer.notExpired()) {
             findCoordinatorSync(timer);
         }
     }
 
     private boolean coordinatorReady() {
-        CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.get();
+        CoordinatorRequestManager coordinatorRequestManager = requestManagers.coordinatorRequestManager.orElseThrow(
+            () -> new IllegalStateException("CoordinatorRequestManager uninitialized."));

Review Comment:
   Probably unnecessary, but adding this as a safety check. WDYT?
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1407198120


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########

@@ -210,17 +210,19 @@ public CompletableFuture maybeAutoCommitAllConsumed() {
         return maybeAutoCommit(subscriptions.allConsumed());
     }
 
+    boolean canAutoCommit() {

Review Comment:
   Unhappy with the naming, but I can't seem to find a better way to restructure this at the moment. I didn't like it because there are infinite variations of canXxxx().
Re: [PR] MINOR: Fix the flaky TBRLMM `testInternalTopicExists` test [kafka]
kamalcph commented on code in PR #14840:
URL: https://github.com/apache/kafka/pull/14840#discussion_r1407187421


##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########

@@ -70,18 +70,20 @@ public TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
     }
 
     @Test
-    public void testInternalTopicExists() {
+    public void testDoesTopicExist() {
         Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig();
         ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName();
         try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
-            String topic = topicBasedRlmm().config().remoteLogMetadataTopicName();
+            String topic = "test-topic-exist";
+            remoteLogMetadataManagerHarness.createTopic(topic, 1, 1, new Properties(),

Review Comment:
   [TestUtils#createTopicWithAdmin](https://sourcegraph.com/github.com/apache/kafka@fade3d10ea07eea5d6076b8fb1b68e2db5ffec48/-/blob/core/src/test/scala/unit/kafka/utils/TestUtils.scala?L497) waits for the metadata to be propagated to all the brokers before returning, so the test will pass. Also, I tried running the test repeatedly 100 times to ensure that it is not flaky.
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842:
URL: https://github.com/apache/kafka/pull/14842#discussion_r1406787063


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########

@@ -154,7 +154,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
         if (!coordinatorRequestManager.coordinator().isPresent())
             return EMPTY;
 
-        maybeAutoCommitAllConsumed();
+        maybeAutoCommit();

Review Comment:
   We always commit allConsumed(), so there's no point in restating that in the method name.
[PR] KAFKA-15910: New group coordinator needs to generate snapshots while loading [kafka]
jeffkbkim opened a new pull request, #14849:
URL: https://github.com/apache/kafka/pull/14849

   After the new coordinator loads a __consumer_offsets partition, it logs the following exception when making a read operation (fetch/list groups, etc):
   ```
   java.lang.RuntimeException: No in-memory snapshot for epoch 740745. Snapshot epochs are:
       at org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:178)
       at org.apache.kafka.timeline.SnapshottableHashTable.snapshottableIterator(SnapshottableHashTable.java:407)
       at org.apache.kafka.timeline.TimelineHashMap$ValueIterator.<init>(TimelineHashMap.java:283)
       at org.apache.kafka.timeline.TimelineHashMap$Values.iterator(TimelineHashMap.java:271)
   ```

   This happens because we don't have a snapshot at the last updated high watermark after loading. We cannot generate a snapshot at the high watermark after loading all batches because it may contain records that have not yet been committed. We also don't know where the high watermark will advance up to, so we need to generate a snapshot for each offset the loader observes to be greater than the current high watermark. Then, once we add the high watermark listener and update the high watermark, we can delete all of the older snapshots.

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
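A minimal sketch of that loading scheme, using the `SnapshotRegistry` API from `org.apache.kafka.timeline`; the loader hook itself is hypothetical, not the PR's actual code:

```java
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.timeline.SnapshotRegistry;

// Hypothetical loader hook; the real code lives in the group coordinator runtime.
class LoaderSnapshotSketch {
    private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
    private volatile long highWatermark = -1L;

    // Called after replaying each batch during load.
    void onBatchLoaded(long lastOffset) {
        if (lastOffset >= highWatermark) {
            // We don't know where the HWM will land, so snapshot every observed
            // offset beyond it; a read at the eventual HWM then always finds
            // an in-memory snapshot.
            snapshotRegistry.getOrCreateSnapshot(lastOffset + 1);
        }
    }

    // Called from the high watermark listener once it is registered.
    void onHighWatermarkUpdated(long newHighWatermark) {
        highWatermark = newHighWatermark;
        // Snapshots older than the committed offset are no longer needed.
        snapshotRegistry.deleteSnapshotsUpTo(newHighWatermark);
    }
}
```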
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
satishd merged PR #14766:
URL: https://github.com/apache/kafka/pull/14766
Re: [PR] KAFKA-15047: Roll active segment when it breaches the retention policy [kafka]
satishd commented on PR #14766:
URL: https://github.com/apache/kafka/pull/14766#issuecomment-1829038408

   Thanks @kamalcph for pointing out the JIRA that tracks the existing intermittent tiered-storage test failure.
Re: [PR] KAFKA-15224: automating version change [kafka]
github-actions[bot] commented on PR #14229:
URL: https://github.com/apache/kafka/pull/14229#issuecomment-1829016302

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[jira] [Created] (KAFKA-15910) New group coordinator needs to generate snapshots while loading
Jeff Kim created KAFKA-15910:
---------------------------------

             Summary: New group coordinator needs to generate snapshots while loading
                 Key: KAFKA-15910
                 URL: https://issues.apache.org/jira/browse/KAFKA-15910
             Project: Kafka
          Issue Type: Sub-task
            Reporter: Jeff Kim
            Assignee: Jeff Kim


After the new coordinator loads a __consumer_offsets partition, it logs the following exception when making a read operation (fetch/list groups, etc):

{{java.lang.RuntimeException: No in-memory snapshot for epoch 740745. Snapshot epochs are:}}
{{at org.apache.kafka.timeline.SnapshotRegistry.getSnapshot(SnapshotRegistry.java:178)}}
{{at org.apache.kafka.timeline.SnapshottableHashTable.snapshottableIterator(SnapshottableHashTable.java:407)}}
{{at org.apache.kafka.timeline.TimelineHashMap$ValueIterator.<init>(TimelineHashMap.java:283)}}
{{at org.apache.kafka.timeline.TimelineHashMap$Values.iterator(TimelineHashMap.java:271)}}
{{...}}

This happens because we don't have a snapshot at the last updated high watermark after loading. We cannot generate a snapshot at the high watermark after loading all batches because it may contain records that have not yet been committed. We also don't know where the high watermark will advance up to, so we need to generate a snapshot for each offset the loader observes to be greater than the current high watermark. Then, once we add the high watermark listener and update the high watermark, we can delete all of the prior snapshots.
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
jsancio commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1406995366


##########
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##########

@@ -1990,7 +1995,7 @@ private long pollLeader(long currentTimeMs) {
         LeaderState state = quorum.leaderStateOrThrow();
         maybeFireLeaderChange(state);
 
-        if (shutdown.get() != null || state.isResignRequested()) {
+        if (shutdown.get() != null || state.isResignRequested() || state.hasMajorityFollowerFetchExpired(currentTimeMs)) {

Review Comment:
   In line [2014](https://github.com/apache/kafka/pull/14428/files#diff-1da15c51e641ea46ea5c86201ab8f21cfee9e7c575102a39c7bae0d5ffd7de39R2014), KRaft calculates the next time it should poll the leader.
   ```java
   return Math.min(timeUntilFlush, timeUntilSend);
   ```
   If you extend this to something like below, it should wake up the leader when the timer has expired.
   ```java
   return Math.min(timeUntilFlush, Math.min(timeUntilSend, state.timeUntilCheckQuorumExpires(currentTimeMs)));
   ```
   The other option is to extend `hasMajorityFollowerFetchExpired` to include this information by using `Timer::remainingMs` instead of `isExpired`.
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
jsancio commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1406806920


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########

@@ -79,6 +86,39 @@ protected LeaderState(
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        // use the 1.5x fetch timeout to tolerate some network transition time or other IO time.
+        this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+        this.fetchTimer = time.timer(fetchTimeoutMs);
+    }
+
+    // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of
+    // the voters within fetch timeout.
+    public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) {
+        fetchTimer.update(currentTimeMs);
+        boolean isExpired = fetchTimer.isExpired();
+        if (isExpired) {
+            log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.",
+                fetchTimeoutMs, fetchedVoters);
+        }
+        return isExpired;
+    }
+
+    // Reset the fetch timer if we've received fetch/fetchSnapshot requests from the majority of the voters
+    public void maybeResetMajorityFollowerFetchTimer(int id, long currentTimeMs) {
+        updateFetchedVoters(id);
+        // The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1
+        int majority = voterStates.size() / 2;
+        if (fetchedVoters.size() >= majority) {
+            fetchedVoters.clear();
+            fetchTimer.update(currentTimeMs);
+            fetchTimer.reset(fetchTimeoutMs);
+        }
+    }
+
+    private void updateFetchedVoters(int id) {
+        if (isVoter(id)) {

Review Comment:
   We should be defensive against this getting called with the local replica. Let's throw an `IllegalArgumentException` if `id` is equal to the `localId`.
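For illustration, a self-contained sketch of that guard; the field names mirror `LeaderState`'s but the class itself is hypothetical, not the PR's final code:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the suggested defensive check against the local replica id.
class FetchedVotersSketch {
    private final int localId;
    private final Set<Integer> voters;
    private final Set<Integer> fetchedVoters = new HashSet<>();

    FetchedVotersSketch(int localId, Set<Integer> voters) {
        this.localId = localId;
        this.voters = voters;
    }

    void updateFetchedVoters(int id) {
        // The leader should never record a fetch from itself.
        if (id == localId) {
            throw new IllegalArgumentException("Received a FETCH request from the leader itself (id=" + id + ")");
        }
        if (voters.contains(id)) {
            fetchedVoters.add(id);
        }
    }
}
```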
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
jsancio commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1406803592


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########

@@ -79,6 +86,39 @@ protected LeaderState(
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        // use the 1.5x fetch timeout to tolerate some network transition time or other IO time.
+        this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+        this.fetchTimer = time.timer(fetchTimeoutMs);
+    }
+
+    // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of
+    // the voters within fetch timeout.
+    public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) {

Review Comment:
   See my other comment about check quorum and Alyssa's pre-vote KIP. If you agree, maybe we should call this `hasCheckQuorumFailed`.
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
jsancio commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1406771117


##########
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##########

@@ -79,6 +86,39 @@ protected LeaderState(
         this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters));
         this.log = logContext.logger(LeaderState.class);
         this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null");
+        // use the 1.5x fetch timeout to tolerate some network transition time or other IO time.
+        this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5);
+        this.fetchTimer = time.timer(fetchTimeoutMs);
+    }
+
+    // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of
+    // the voters within fetch timeout.

Review Comment:
   Can we make this a Java doc comment? E.g.:
   ```java
   /**
    * Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from
    * the majority of the voters within fetch timeout.
    *
    * @param ...
    * @return ...
    */
   ```
Re: [PR] KAFKA-15670: add "inter.broker.listener.name" config in KRaft controller config [kafka]
showuon commented on PR #14631:
URL: https://github.com/apache/kafka/pull/14631#issuecomment-1828916867

   @mumrah, call for review. Thanks.
[PR] KAFKA-15870: Move new group coordinator metrics from Yammer to Metrics [kafka]
jeffkbkim opened a new pull request, #14848:
URL: https://github.com/apache/kafka/pull/14848

   Move the following metrics from Yammer to Kafka metrics, continuing the migration from Yammer to Kafka metrics. These metrics are newly created and not yet exposed, so there are no compatibility issues.

   * NumConsumerGroups (yammer) -> consumer-groups-size (kafka)
   * NumConsumerGroupsEmpty (yammer) -> empty-consumer-groups-size (kafka)
   * NumConsumerGroupsAssigning (yammer) -> assigning-consumer-groups-size (kafka)
   * NumConsumerGroupsReconciling (yammer) -> reconciling-consumer-groups-size (kafka)
   * NumConsumerGroupsStable (yammer) -> stable-consumer-groups-size (kafka)
   * NumConsumerGroupsDead (yammer) -> dead-consumer-groups-size (kafka)

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
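As a rough sketch of what registering one of these gauges with the Kafka metrics library can look like; only the metric name comes from the list above, while the group name and surrounding plumbing are assumptions:

```java
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;

class GroupCountMetricSketch {
    GroupCountMetricSketch(Metrics metrics, AtomicLong numConsumerGroups) {
        // A Gauge reads the current value on every metrics poll.
        metrics.addMetric(
            metrics.metricName("consumer-groups-size", "group-coordinator-metrics",
                "The total number of consumer groups."),
            (Gauge<Long>) (config, now) -> numConsumerGroups.get());
    }
}
```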
Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
soarez commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1406925061


##########
metadata/src/test/java/org/apache/kafka/metadata/util/MetadataFeatureUtil.java:
##########

@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.util;
+
+import org.apache.kafka.server.common.MetadataVersion;
+import org.mockito.internal.util.MockUtil;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class MetadataFeatureUtil {

Review Comment:
   I agree, it is getting a bit annoying to have to mock MetadataVersion everywhere.
Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
soarez commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1406924620


##########
metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java:
##########

@@ -377,7 +386,7 @@ public ApiMessageAndVersion toRecord(Uuid topicId, int partitionId, ImageWriterO
             record.setDirectories(Uuid.toList(directories));
         } else {
             for (Uuid directory : directories) {
-                if (!DirectoryId.UNASSIGNED.equals(directory)) {
+                if (!DirectoryId.UNASSIGNED.equals(directory) && !DirectoryId.MIGRATING.equals(directory)) {

Review Comment:
   Just trying to be true to the semantics. If we end up downgrading metadata but all we have is `UNASSIGNED`, there isn't really any metadata loss there. I don't feel strongly about this, so let me know if you think this is wrong.
Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
soarez commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1406913694


##########
metadata/src/main/java/org/apache/kafka/metadata/placement/DefaultDirProvider.java:
##########

@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.placement;
+
+import org.apache.kafka.common.Uuid;
+
+/**
+ * Provide the default directory for new partitions in a given broker.
+ */
+@FunctionalInterface
+public interface DefaultDirProvider {
+    Uuid defaultDir(int brokerId);

Review Comment:
   I'll add documentation. But I don't think this should ever return `MIGRATING`; we simply do not call this provider unless the MetadataVersion supports directory assignments. The other result this might return is the single directory of a broker that supports JBOD but is registered with a single directory. Essentially, this is so we can preset the directory assignment to the only registered directory in a broker, for new partitions or reassignments.
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
soarez commented on code in PR #14836:
URL: https://github.com/apache/kafka/pull/14836#discussion_r1406906907


##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########

@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
     while (!future.isDone || context.mockClient.hasInFlightRequests) {
       context.poll()
       manager.eventQueue.wakeup()
-      context.time.sleep(100)
+      context.time.sleep(5)

Review Comment:
   It's true in theory, but in this unit test, it is not. I just tried with 6 heartbeats, and in 1,000 repetitions of the test, one of the runs was missing the last directory. On my laptop, with 7 heartbeats, 10,000 test runs had no failures.

   I think this happens because of how `poll()` works: it's rapidly advancing the clock and notifying three separate systems (BrokerLifecycleManager, MockClient and MockChannelManager), signaling them using `Object.notify()`, which does not guarantee each of those threads will run straightaway; it's still up to the OS to schedule them onto the CPU.

   In practice, outside of unit tests, the delays in scheduling the BrokerLifecycleManager thread should be insignificant compared to the heartbeat interval, so I don't expect failures to be delayed for more than a heartbeat.

   If the test proves to still be flaky even with 10 heartbeats, I think we can just increase the number.
Re: [PR] KAFKA-14438: Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer [kafka]
kirktrue commented on PR #14768:
URL: https://github.com/apache/kafka/pull/14768#issuecomment-1828804792

   @mjsax would you be willing to review this PR? It's small and (hopefully!) straightforward. Thanks!
Re: [PR] KAFKA-15046: Get rid of unnecessary fsyncs inside UnifiedLog.lock to stabilize performance [kafka]
ocadaruma commented on PR #14242:
URL: https://github.com/apache/kafka/pull/14242#issuecomment-1828798534

   @junrao Oh, I misread the result as all green because I only checked the pipeline view; I had to check the tests view. I checked now: none of the failures seem related to this change, and they are due to flakiness, because all failed tests still succeeded on at least some JDK builds.
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
junrao commented on code in PR #14836:
URL: https://github.com/apache/kafka/pull/14836#discussion_r1406880863


##########
core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala:
##########

@@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest {
     while (!future.isDone || context.mockClient.hasInFlightRequests) {
       context.poll()
       manager.eventQueue.wakeup()
-      context.time.sleep(100)
+      context.time.sleep(5)

Review Comment:
   Thanks for the explanation, @soarez. There is a heartbeat request after the initial registration, and each `manager.propagateDirectoryFailure` could trigger a separate heartbeat request. So, is it true that after the 4th heartbeat, each heartbeat request is guaranteed to include all three failed dirs?
[jira] [Updated] (KAFKA-15908) Remove deprecated Consumer API poll(long timeout)
     [ https://issues.apache.org/jira/browse/KAFKA-15908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15908:
------------------------------
    Fix Version/s: 4.0.0

> Remove deprecated Consumer API poll(long timeout)
> -------------------------------------------------
>
>                 Key: KAFKA-15908
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15908
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>             Fix For: 4.0.0
>
>
> Per [KIP-266|https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior], the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0.
> In 3.7, there are two implementations, each with different behavior:
> * The {{LegacyKafkaConsumer}} implementation will continue to work but will log a warning about its removal
> * The {{AsyncKafkaConsumer}} implementation will throw an error.
> In 4.0, the `poll` method that takes a single `long` timeout will be removed altogether.
[jira] [Updated] (KAFKA-14438) Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer
     [ https://issues.apache.org/jira/browse/KAFKA-14438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-14438:
------------------------------
    Fix Version/s: 3.7.0
                       (was: 4.0.0)

> Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14438
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14438
>             Project: Kafka
>          Issue Type: Task
>          Components: consumer
>            Reporter: Philip Nee
>            Assignee: Kirk True
>            Priority: Blocker
>              Labels: kip-848-client-support, kip-848-e2e, kip-848-preview
>             Fix For: 3.7.0
>
>
> Currently, a warning message is logged upon using an empty consumer groupId. In the next major release, we should drop the support of empty ("") consumer groupId.
> cc [~hachikuji]
> See [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer] for more detail.
[jira] [Updated] (KAFKA-15907) Remove previously deprecated Consumer features from 4.0
     [ https://issues.apache.org/jira/browse/KAFKA-15907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15907:
------------------------------
    Fix Version/s: 4.0.0

> Remove previously deprecated Consumer features from 4.0
> --------------------------------------------------------
>
>                 Key: KAFKA-15907
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15907
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>             Fix For: 4.0.0
>
>
> This Jira serves as the main collection of APIs, logic, etc. that were previously marked as "deprecated" by other KIPs. With 4.0, we will be updating the code to remove the deprecated features.
[jira] [Updated] (KAFKA-15909) Throw error when consumer configured with empty/whitespace-only group.id for LegacyKafkaConsumer
     [ https://issues.apache.org/jira/browse/KAFKA-15909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15909:
------------------------------
    Summary: Throw error when consumer configured with empty/whitespace-only group.id for LegacyKafkaConsumer  (was: Remove support for empty "group.id" for "generic" group protocol)

> Throw error when consumer configured with empty/whitespace-only group.id for LegacyKafkaConsumer
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15909
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15909
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>             Fix For: 4.0.0
>
>
> Per [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer], the use of an empty value for {{group.id}} configuration was deprecated back in 2.2.0.
> In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see KAFKA-14438).
> This task is to update the {{LegacyKafkaConsumer}} implementation to throw an error in 4.0.
[jira] [Updated] (KAFKA-14438) Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer
     [ https://issues.apache.org/jira/browse/KAFKA-14438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-14438:
------------------------------
    Summary: Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer  (was: Throw error when consumer configured with empty/whitespace-only group.id)

> Throw error when consumer configured with empty/whitespace-only group.id for AsyncKafkaConsumer
> ------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14438
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14438
>             Project: Kafka
>          Issue Type: Task
>          Components: consumer
>            Reporter: Philip Nee
>            Assignee: Kirk True
>            Priority: Blocker
>              Labels: kip-848-client-support, kip-848-e2e, kip-848-preview
>             Fix For: 4.0.0
>
>
> Currently, a warning message is logged upon using an empty consumer groupId. In the next major release, we should drop the support of empty ("") consumer groupId.
> cc [~hachikuji]
> See [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer] for more detail.
[jira] [Updated] (KAFKA-15909) Remove support for empty "group.id" for "generic" group protocol
     [ https://issues.apache.org/jira/browse/KAFKA-15909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-15909:
------------------------------
    Description: 
Per [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer], the use of an empty value for {{group.id}} configuration was deprecated back in 2.2.0.

In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see KAFKA-14438).

This task is to update the {{LegacyKafkaConsumer}} implementation to throw an error in 4.0.

  was:
Per KIP-289, the use of an empty value for {{group.id}} configuration was deprecated back in 2.2.0.

In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see KAFKA-14438).

This task is to update the {{LegacyKafkaConsumer}} implementation to throw an error in 4.0.


> Remove support for empty "group.id" for "generic" group protocol
> -----------------------------------------------------------------
>
>                 Key: KAFKA-15909
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15909
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>             Fix For: 4.0.0
>
>
> Per [KIP-289|https://cwiki.apache.org/confluence/display/KAFKA/KIP-289%3A+Improve+the+default+group+id+behavior+in+KafkaConsumer], the use of an empty value for {{group.id}} configuration was deprecated back in 2.2.0.
> In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see KAFKA-14438).
> This task is to update the {{LegacyKafkaConsumer}} implementation to throw an error in 4.0.
Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
cmccabe commented on code in PR #14820:
URL: https://github.com/apache/kafka/pull/14820#discussion_r1406862990


##########
metadata/src/test/java/org/apache/kafka/metadata/util/MetadataFeatureUtil.java:
##########

@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.util;
+
+import org.apache.kafka.server.common.MetadataVersion;
+import org.mockito.internal.util.MockUtil;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class MetadataFeatureUtil {

Review Comment:
   I've been thinking about this more, and I think maybe we should just claim a metadata version number. It's time. Maybe even move KIP-966 up a notch, since we may finish before they do. I guess let's discuss offline.
[jira] [Updated] (KAFKA-15909) Remove support for empty "group.id" for "generic" group protocol
[ https://issues.apache.org/jira/browse/KAFKA-15909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15909: -- Description: Per KIP-289, the use of an empty value for {{group.id}} configuration was deprecated back in 2.2.0. In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see KAFKA-14438). This task is to update the {{LegacyKafkaConsumer}} implementation to throw an error in 4.0. was: Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. In 3.7, there are two implementations, each with different behavior: * The {{LegacyKafkaConsumer}} implementation will continue to work but will log a warning about its removal * The {{AsyncKafkaConsumer}} implementation will throw an error. In 4.0, the `poll` method that takes a single `long` timeout will be removed altogether. > Remove support for empty "group.id" for "generic" group protocol > > > Key: KAFKA-15909 > URL: https://issues.apache.org/jira/browse/KAFKA-15909 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Fix For: 4.0.0 > > > Per KIP-289, the use of an empty value for {{group.id}} configuration was > deprecated back in 2.2.0. > In 3.7, the {{AsyncKafkaConsumer}} implementation will throw an error (see > KAFKA-14438). > This task is to update the {{LegacyKafkaConsumer}} implementation to throw an > error in 4.0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
soarez commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1406861254 ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -214,28 +214,20 @@ class BrokerLifecycleManagerTest { ctx.controllerNodeProvider.node.set(controllerNode) val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000))) -val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))) - manager.start(() => ctx.highestMetadataOffset.get(), ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners, Collections.emptyMap(), OptionalLong.empty()) poll(ctx, manager, registration) manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA")) -poll(ctx, manager, heartbeats(0)).data() -val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs() - manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow")) -poll(ctx, manager, heartbeats(2)).data() -val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs() - manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg")) -poll(ctx, manager, heartbeats(4)).data() -val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs() - -assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA").map(Uuid.fromString), dirs1.asScala.toSet) -assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow").map(Uuid.fromString), dirs2.asScala.toSet) -assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow", "1iF76HVNRPqC7Y4r6647eg").map(Uuid.fromString), dirs3.asScala.toSet) +val latestHeartbeat = Seq.fill(10)( + prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())) +).map(poll(ctx, manager, _)).last Review Comment: I need to update the KIP. Last week in a discussion with @cmccabe and @pprovenzano, we realized that because of overload mode for heartbeats, it will be easier to handle failed log directories if the broker always sends the accumulated list. Hence #14770 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
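For readers without the KIP context, a minimal sketch of the accumulate-until-acknowledged behavior soarez describes (class and method names here are hypothetical; the real logic lives in BrokerLifecycleManager):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.Uuid;

class OfflineDirAccumulator {
    // Accumulated set of failed directory UUIDs; never cleared, so every
    // heartbeat carries the full list, as described in the comment above.
    private final Set<Uuid> offlineDirs = ConcurrentHashMap.newKeySet();

    void propagateDirectoryFailure(Uuid dir) {
        offlineDirs.add(dir);
    }

    Set<Uuid> dirsForNextHeartbeat() {
        return Set.copyOf(offlineDirs); // always the accumulated set
    }
}
```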
Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
cmccabe commented on code in PR #14820: URL: https://github.com/apache/kafka/pull/14820#discussion_r1406861113 ## metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java: ## @@ -377,7 +386,7 @@ public ApiMessageAndVersion toRecord(Uuid topicId, int partitionId, ImageWriterO record.setDirectories(Uuid.toList(directories)); } else { for (Uuid directory : directories) { -if (!DirectoryId.UNASSIGNED.equals(directory)) { +if (!DirectoryId.UNASSIGNED.equals(directory) && !DirectoryId.MIGRATING.equals(directory)) { Review Comment: Hmm. Why do we need to special-case UNASSIGNED here? If we're pre-JBOD, we should only have MIGRATING, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
cmccabe commented on code in PR #14820: URL: https://github.com/apache/kafka/pull/14820#discussion_r1406859690 ## metadata/src/main/java/org/apache/kafka/metadata/placement/DefaultDirProvider.java: ## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.placement; + +import org.apache.kafka.common.Uuid; + +/** + * Provide the default directory for new partitions in a given broker. + */ +@FunctionalInterface +public interface DefaultDirProvider { +Uuid defaultDir(int brokerId); Review Comment: So for a broker in JBOD mode (i.e. it has multiple directories), `defaultDir` would return `UNASSIGNED`, right? And when we're using an older MetadataVersion, we'll always get back `MIGRATING`, right? It would be good to document that in the JavaDoc. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
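To make the contract concrete, here is a sketch of an implementation consistent with the behavior cmccabe describes (the inputs are hypothetical; the real provider is wired up inside the controller from broker registrations and the active MetadataVersion):

```java
import java.util.List;
import org.apache.kafka.common.DirectoryId;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.metadata.placement.DefaultDirProvider;

public class DefaultDirProviderSketch {
    // Hypothetical inputs: metadataVersionSupportsDirs and registeredDirs stand in
    // for state the controller already tracks.
    static DefaultDirProvider provider(boolean metadataVersionSupportsDirs, List<Uuid> registeredDirs) {
        return brokerId -> {
            if (!metadataVersionSupportsDirs) {
                return DirectoryId.MIGRATING;   // older MetadataVersion: always MIGRATING
            }
            if (registeredDirs.size() != 1) {
                return DirectoryId.UNASSIGNED;  // JBOD broker: defer the choice of directory
            }
            return registeredDirs.get(0);       // single log directory: assign it directly
        };
    }
}
```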
Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
cmccabe commented on code in PR #14820: URL: https://github.com/apache/kafka/pull/14820#discussion_r1406858333 ## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ## @@ -405,7 +415,7 @@ private void completeReassignmentIfNeeded() { targetAdding = Collections.emptyList(); } -public Optional build() { +public Optional build(DefaultDirProvider defaultDirProvider) { Review Comment: Please don't add arguments to build(). If you want to have a way to set the default directory, then have a `Builder.setDefaultDirProvider` function. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
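For clarity, a sketch of the builder-style alternative cmccabe suggests (the surrounding fields and the build body are illustrative only, not the actual PartitionChangeBuilder code):

```java
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.metadata.placement.DefaultDirProvider;
import org.apache.kafka.server.common.ApiMessageAndVersion;

public class PartitionChangeBuilderSketch {
    private DefaultDirProvider defaultDirProvider;

    // Fluent setter, matching the style of the builder's other setters.
    public PartitionChangeBuilderSketch setDefaultDirProvider(DefaultDirProvider provider) {
        this.defaultDirProvider = Objects.requireNonNull(provider);
        return this;
    }

    // build() keeps its zero-argument signature and reads the provider from builder state.
    public Optional<ApiMessageAndVersion> build() {
        Objects.requireNonNull(defaultDirProvider, "defaultDirProvider must be set before build()");
        return Optional.empty(); // placeholder: the real builder assembles a PartitionChangeRecord here
    }
}
```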
Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
cmccabe commented on code in PR #14820: URL: https://github.com/apache/kafka/pull/14820#discussion_r1406856346 ## metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java: ## @@ -123,6 +127,7 @@ public PartitionChangeBuilder( this.targetElr = Replicas.toList(partition.elr); this.targetLastKnownElr = Replicas.toList(partition.lastKnownElr); this.targetLeaderRecoveryState = partition.leaderRecoveryState; +this.targetDirectories = DirectoryId.createAssignmentMap(partition.replicas, partition.directories); Review Comment: One thing I'm trying to understand is why we should use a map here rather than a list of the same length as the replicas list. This also applies to other uses of `createAssignmentMap` -- is there ever a place where we would be better off with a map rather than just an array of the right size? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
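To illustrate the trade-off being raised, a small self-contained sketch of the two representations (simplified types; this is not the actual `createAssignmentMap`):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.Uuid;

public class AssignmentShapes {
    // Map form: constant-time lookup by replica id, but allocates an entry
    // per replica and loses the ordering of the replicas list.
    static Map<Integer, Uuid> asMap(int[] replicas, Uuid[] directories) {
        Map<Integer, Uuid> assignment = new HashMap<>(replicas.length);
        for (int i = 0; i < replicas.length; i++) {
            assignment.put(replicas[i], directories[i]);
        }
        return assignment;
    }

    // Array form: directories[i] belongs to replicas[i], so ordering is kept
    // and nothing extra is allocated, at the cost of a linear scan per lookup.
    static Uuid dirFor(int replica, int[] replicas, Uuid[] directories) {
        for (int i = 0; i < replicas.length; i++) {
            if (replicas[i] == replica) {
                return directories[i];
            }
        }
        return null; // replica not in the assignment
    }
}
```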
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
soarez commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1406855449 ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest { while (!future.isDone || context.mockClient.hasInFlightRequests) { context.poll() manager.eventQueue.wakeup() - context.time.sleep(100) + context.time.sleep(5) Review Comment: No. This can mitigate it, but it cannot prevent the race condition entirely. AssignmentsManager has its own event loop thread that is batching and sending the accumulated failed directories. It's a bit tricky to predict the content of each request, so instead I opted to only assert after a few heartbeats. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]
AndrewJSchofield commented on code in PR #14835: URL: https://github.com/apache/kafka/pull/14835#discussion_r1406853731 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) { this.heartbeatTimer.updateAndReset(heartbeatIntervalMs); } } + +/** + * Builds the heartbeat requests correctly, ensuring that all information is sent according to + * the protocol, but subsequent requests do not send information which has not changed. This + * is important to ensure that reconciliation completes successfully. + */ +static class HeartbeatState { +private final SubscriptionState subscriptions; +private final MembershipManager membershipManager; +private final int rebalanceTimeoutMs; + +// Fields of ConsumerHeartbeatRequest sent in the most recent request +private String sentInstanceId; +private int sentRebalanceTimeoutMs; +private TreeSet<String> sentSubscribedTopicNames; +// private String sentSubscribedTopicRegex; +private String sentServerAssignor; +private TreeSet<TopicPartition> sentTopicPartitions; Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
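The idea behind those `sent*` fields — include a field in the next heartbeat only when it differs from what was last sent — can be sketched like this (reduced to one field; names are hypothetical):

```java
import java.util.TreeSet;

class ChangeTrackingHeartbeat {
    private TreeSet<String> sentSubscribedTopicNames; // null until the first full request

    // Returns the topics to put in the next request, or null to leave the field unset.
    TreeSet<String> topicsFieldFor(TreeSet<String> current, boolean fullRequest) {
        if (fullRequest || sentSubscribedTopicNames == null || !sentSubscribedTopicNames.equals(current)) {
            sentSubscribedTopicNames = new TreeSet<>(current); // remember what we sent
            return sentSubscribedTopicNames;
        }
        return null; // unchanged since the last heartbeat: omit the field
    }
}
```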
[jira] [Updated] (KAFKA-15909) Remove support for empty "group.id" for "generic" group protocol
[ https://issues.apache.org/jira/browse/KAFKA-15909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15909: -- Summary: Remove support for empty "group.id" for "generic" group protocol (was: Remove support for empty "group.id") > Remove support for empty "group.id" for "generic" group protocol > > > Key: KAFKA-15909 > URL: https://issues.apache.org/jira/browse/KAFKA-15909 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Fix For: 4.0.0 > > > Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back > in 2.0.0. > In 3.7, there are two implementations, each with different behavior: > * The {{LegacyKafkaConsumer}} implementation will continue to work but will > log a warning about its removal > * The {{AsyncKafkaConsumer}} implementation will throw an error. > In 4.0, the `poll` method that takes a single `long` timeout will be removed > altogether. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15909) Remove support for empty "group.id"
[ https://issues.apache.org/jira/browse/KAFKA-15909?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15909: -- Fix Version/s: 4.0.0 > Remove support for empty "group.id" > --- > > Key: KAFKA-15909 > URL: https://issues.apache.org/jira/browse/KAFKA-15909 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Fix For: 4.0.0 > > > Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back > in 2.0.0. > In 3.7, there are two implementations, each with different behavior: > * The {{LegacyKafkaConsumer}} implementation will continue to work but will > log a warning about its removal > * The {{AsyncKafkaConsumer}} implementation will throw an error. > In 4.0, the `poll` method that takes a single `long` timeout will be removed > altogether. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15908) Remove deprecated Consumer API poll(long timeout)
[ https://issues.apache.org/jira/browse/KAFKA-15908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15908: -- Description: Per [KIP-266|https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior], the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. In 3.7, there are two implementations, each with different behavior: * The {{LegacyKafkaConsumer}} implementation will continue to work but will log a warning about its removal * The {{AsyncKafkaConsumer}} implementation will throw an error. In 4.0, the `poll` method that takes a single `long` timeout will be removed altogether. was: Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. In 3.7, there are two implementations, each with different behavior: * The {{LegacyKafkaConsumer}} implementation will continue to work but will log a warning about its removal * The {{AsyncKafkaConsumer}} implementation will throw an error. In 4.0, the `poll` method that takes a single `long` timeout will be removed altogether. > Remove deprecated Consumer API poll(long timeout) > - > > Key: KAFKA-15908 > URL: https://issues.apache.org/jira/browse/KAFKA-15908 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > Per > [KIP-266|https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior], > the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. > In 3.7, there are two implementations, each with different behavior: > * The {{LegacyKafkaConsumer}} implementation will continue to work but will > log a warning about its removal > * The {{AsyncKafkaConsumer}} implementation will throw an error. > In 4.0, the `poll` method that takes a single `long` timeout will be removed > altogether. -- This message was sent by Atlassian Jira (v8.20.10#820010)
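For reference, a sketch of the deprecated call next to its replacement (topic name hypothetical):

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollMigration {
    static void pollOnce(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("test-topic")); // hypothetical topic

        // Deprecated since 2.0.0 (KIP-266): may block indefinitely on metadata updates.
        // ConsumerRecords<String, String> records = consumer.poll(1000L);

        // Replacement: blocking is bounded by the supplied Duration, including metadata fetches.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        records.forEach(record -> System.out.println(record.value()));
    }
}
```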
[jira] [Created] (KAFKA-15909) Remove support for empty "group.id"
Kirk True created KAFKA-15909: - Summary: Remove support for empty "group.id" Key: KAFKA-15909 URL: https://issues.apache.org/jira/browse/KAFKA-15909 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Kirk True Assignee: Kirk True Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. In 3.7, there are two implementations, each with different behavior: * The {{LegacyKafkaConsumer}} implementation will continue to work but will log a warning about its removal * The {{AsyncKafkaConsumer}} implementation will throw an error. In 4.0, the `poll` method that takes a single `long` timeout will be removed altogether. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15908) Remove deprecated Consumer API poll(long timeout)
[ https://issues.apache.org/jira/browse/KAFKA-15908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15908: -- Component/s: clients consumer > Remove deprecated Consumer API poll(long timeout) > - > > Key: KAFKA-15908 > URL: https://issues.apache.org/jira/browse/KAFKA-15908 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back > in 2.0.0. > In 3.7, there are two implementations, each with different behavior: > * The {{LegacyKafkaConsumer}} implementation will continue to work but will > log a warning about its removal > * The {{AsyncKafkaConsumer}} implementation will throw an error. > In 4.0, the `poll` method that takes a single `long` timeout will be removed > altogether. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15908) Remove deprecated Consumer API poll(long timeout)
Kirk True created KAFKA-15908: - Summary: Remove deprecated Consumer API poll(long timeout) Key: KAFKA-15908 URL: https://issues.apache.org/jira/browse/KAFKA-15908 Project: Kafka Issue Type: Sub-task Reporter: Kirk True Assignee: Kirk True Per KIP-266, the {{Consumer.poll(long timeout)}} method was deprecated back in 2.0.0. In 3.7, there are two implementations, each with different behavior: * The {{LegacyKafkaConsumer}} implementation will continue to work but will log a warning about its removal * The {{AsyncKafkaConsumer}} implementation will throw an error. In 4.0, the `poll` method that takes a single `long` timeout will be removed altogether. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15886: Always specify directories for new partition registrations [kafka]
cmccabe commented on PR #14820: URL: https://github.com/apache/kafka/pull/14820#issuecomment-1828735267 @soarez : `BrokerLifecycleManagerTest.testAlwaysSendsAccumulatedOfflineDirs` seems to be failing -- can you update this with the latest from trunk? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15896) Flaky test: shouldQuerySpecificStalePartitionStores() – org.apache.kafka.streams.integration.StoreQueryIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15896: Labels: flaky-test (was: ) > Flaky test: shouldQuerySpecificStalePartitionStores() – > org.apache.kafka.streams.integration.StoreQueryIntegrationTest > -- > > Key: KAFKA-15896 > URL: https://issues.apache.org/jira/browse/KAFKA-15896 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Apoorv Mittal >Priority: Major > Labels: flaky-test > > Flaky test: > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/21/tests/ > > > {code:java} > Error > org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The > specified partition 1 for store source-table does not exist. > > Stacktrace > org.apache.kafka.streams.errors.InvalidStateStorePartitionException: > The specified partition 1 for store source-table does not exist. > at > app//org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:63) > at > app//org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:53) > at > app//org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStores(StoreQueryIntegrationTest.java:347) > at > java.base@21.0.1/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > at > java.base@21.0.1/java.lang.reflect.Method.invoke(Method.java:580) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15896) Flaky test: shouldQuerySpecificStalePartitionStores() – org.apache.kafka.streams.integration.StoreQueryIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15896: Component/s: streams unit tests > Flaky test: shouldQuerySpecificStalePartitionStores() – > org.apache.kafka.streams.integration.StoreQueryIntegrationTest > -- > > Key: KAFKA-15896 > URL: https://issues.apache.org/jira/browse/KAFKA-15896 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Apoorv Mittal >Priority: Major > > Flaky test: > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14699/21/tests/ > > > {code:java} > Error > org.apache.kafka.streams.errors.InvalidStateStorePartitionException: The > specified partition 1 for store source-table does not exist. > > Stacktrace > org.apache.kafka.streams.errors.InvalidStateStorePartitionException: > The specified partition 1 for store source-table does not exist. > at > app//org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:63) > at > app//org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:53) > at > app//org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificStalePartitionStores(StoreQueryIntegrationTest.java:347) > at > java.base@21.0.1/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > at > java.base@21.0.1/java.lang.reflect.Method.invoke(Method.java:580) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
jsancio commented on code in PR #14428: URL: https://github.com/apache/kafka/pull/14428#discussion_r1406768273 ## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ## @@ -79,6 +86,39 @@ protected LeaderState( this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); +// use the 1.5x fetch timeout to tolerate some network transition time or other IO time. +this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5); +this.fetchTimer = time.timer(fetchTimeoutMs); Review Comment: Since this is not set to the fetch timeout, maybe we can call this `checkQuorumTimeoutMs` and `checkQuorumTimer`. I am suggesting these names because @ahuang98 uses "check quorum" in the pre-vote KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-996%3A+Pre-Vote ## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ## @@ -79,6 +86,39 @@ protected LeaderState( this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); +// use the 1.5x fetch timeout to tolerate some network transition time or other IO time. +this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5); +this.fetchTimer = time.timer(fetchTimeoutMs); +} + +// Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of +// the voters within fetch timeout. Review Comment: Can we make this a Javadoc comment? E.g.: ```java /** * Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from * the majority of the voters within fetch timeout. * * @param ... * @return ... */ ``` ## raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java: ## @@ -1340,6 +1341,9 @@ private FetchSnapshotResponseData handleFetchSnapshotRequest( UnalignedRecords records = snapshot.slice(partitionSnapshot.position(), Math.min(data.maxBytes(), maxSnapshotSize)); +Optional<LeaderState<T>> state = quorum.maybeLeaderState(); +state.ifPresent(s -> s.maybeResetMajorityFollowerFetchTimer(data.replicaId(), currentTimeMs)); + Review Comment: Since the check above `leaderValidation.isPresent()` is false, it means that this replica is guaranteed to be the leader at this point in time. I prefer if we use `leaderStateOrThrow` instead of `maybeLeaderState` to make this clear to future readers. ## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ## @@ -79,6 +86,39 @@ protected LeaderState( this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); +// use the 1.5x fetch timeout to tolerate some network transition time or other IO time. +this.fetchTimeoutMs = (int) (fetchTimeoutMs * 1.5); +this.fetchTimer = time.timer(fetchTimeoutMs); +} + +// Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of +// the voters within fetch timeout. +public boolean hasMajorityFollowerFetchExpired(long currentTimeMs) { +fetchTimer.update(currentTimeMs); +boolean isExpired = fetchTimer.isExpired(); +if (isExpired) { +log.info("Did not receive fetch request from the majority of the voters within {}ms. Current fetched voters are {}.", +fetchTimeoutMs, fetchedVoters); +} +return isExpired; +} + +// Reset the fetch timer if we've received fetch/fetchSnapshot requests from the majority of the voters +public void maybeResetMajorityFollowerFetchTimer(int id, long currentTimeMs) { +updateFetchedVoters(id); +// The majority number of the voters excluding the leader. Ex: 3 voters, the value will be 1 +int majority = voterStates.size() / 2; +if (fetchedVoters.size() >= majority) { +fetchedVoters.clear(); +fetchTimer.update(currentTimeMs); +fetchTimer.reset(fetchTimeoutMs); +} +} + +private void updateFetchedVoters(int id) { +if (isVoter(id)) { Review Comment: We should be defensive against this getting called with the local replica. Let's throw an `IllegalArgumentException` if `id` is equal to the `localId`. ## raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java: ## @@ -485,6 +485,49 @@ public void testHandleBeginQuorumEpo
[jira] [Updated] (KAFKA-15907) Remove previously deprecated Consumer features from 4.0
[ https://issues.apache.org/jira/browse/KAFKA-15907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15907: -- Component/s: clients consumer > Remove previously deprecated Consumer features from 4.0 > --- > > Key: KAFKA-15907 > URL: https://issues.apache.org/jira/browse/KAFKA-15907 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > This Jira serves as the main collection of APIs, logic, etc. that were > previously marked as "deprecated" by other KIPs. With 4.0, we will be > updating the code to remove the deprecated features. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15907) Remove previously deprecated Consumer features from 4.0
Kirk True created KAFKA-15907: - Summary: Remove previously deprecated Consumer features from 4.0 Key: KAFKA-15907 URL: https://issues.apache.org/jira/browse/KAFKA-15907 Project: Kafka Issue Type: Task Reporter: Kirk True Assignee: Kirk True This Jira serves as the main collection of APIs, logic, etc. that were previously marked as "deprecated" by other KIPs. With 4.0, we will be updating the code to remove the deprecated features. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15831: KIP-1000 protocol and admin client [kafka]
AndrewJSchofield commented on PR #14811: URL: https://github.com/apache/kafka/pull/14811#issuecomment-1828718746 Build is almost green. A small number of test failures unrelated to this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
junrao commented on code in PR #14836: URL: https://github.com/apache/kafka/pull/14836#discussion_r1406820475 ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -201,7 +201,7 @@ class BrokerLifecycleManagerTest { while (!future.isDone || context.mockClient.hasInFlightRequests) { context.poll() manager.eventQueue.wakeup() - context.time.sleep(100) + context.time.sleep(5) Review Comment: What's causing the following failure before? Does this change fix the issue? ``` org.opentest4j.AssertionFailedError: Expected :Set(h3sC4Yk-Q9-fd0ntJTocCA, ej8Q9_d2Ri6FXNiTxKFiow, 1iF76HVNRPqC7Y4r6647eg) Actual :Set(h3sC4Yk-Q9-fd0ntJTocCA, ej8Q9_d2Ri6FXNiTxKFiow) ``` ## core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala: ## @@ -214,28 +214,20 @@ class BrokerLifecycleManagerTest { ctx.controllerNodeProvider.node.set(controllerNode) val registration = prepareResponse(ctx, new BrokerRegistrationResponse(new BrokerRegistrationResponseData().setBrokerEpoch(1000))) -val heartbeats = Seq.fill(6)(prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData()))) - manager.start(() => ctx.highestMetadataOffset.get(), ctx.mockChannelManager, ctx.clusterId, ctx.advertisedListeners, Collections.emptyMap(), OptionalLong.empty()) poll(ctx, manager, registration) manager.propagateDirectoryFailure(Uuid.fromString("h3sC4Yk-Q9-fd0ntJTocCA")) -poll(ctx, manager, heartbeats(0)).data() -val dirs1 = poll(ctx, manager, heartbeats(1)).data().offlineLogDirs() - manager.propagateDirectoryFailure(Uuid.fromString("ej8Q9_d2Ri6FXNiTxKFiow")) -poll(ctx, manager, heartbeats(2)).data() -val dirs2 = poll(ctx, manager, heartbeats(3)).data().offlineLogDirs() - manager.propagateDirectoryFailure(Uuid.fromString("1iF76HVNRPqC7Y4r6647eg")) -poll(ctx, manager, heartbeats(4)).data() -val dirs3 = poll(ctx, manager, heartbeats(5)).data().offlineLogDirs() - -assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA").map(Uuid.fromString), dirs1.asScala.toSet) -assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow").map(Uuid.fromString), dirs2.asScala.toSet) -assertEquals(Set("h3sC4Yk-Q9-fd0ntJTocCA", "ej8Q9_d2Ri6FXNiTxKFiow", "1iF76HVNRPqC7Y4r6647eg").map(Uuid.fromString), dirs3.asScala.toSet) +val latestHeartbeat = Seq.fill(10)( + prepareResponse[BrokerHeartbeatRequest](ctx, new BrokerHeartbeatResponse(new BrokerHeartbeatResponseData())) +).map(poll(ctx, manager, _)).last Review Comment: Hmm, KIP-858 says "The UUIDs for the newly failed log directories are included in the BrokerHeartbeat request until the broker receives a successful response.". How do we guarantee that only the 10th HeartbeatRequest picks up the failed log dirs? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix flaky BrokerLifecycleManagerTest [kafka]
cmccabe merged PR #14836: URL: https://github.com/apache/kafka/pull/14836 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on PR #14842: URL: https://github.com/apache/kafka/pull/14842#issuecomment-1828686697 Hi @lucasbru - I just opened the PR for you to review. I'm not 100% happy with the way the tests are set up, so I made some changes around optionally disabling autocommit in the network thread. Also, I feel the tests here have become a sort of integration test, which runs against the unit-test philosophy. In summary, the changes are: 1. We will only auto commit if the configuration is enabled (it is by default) and if we've got anything to commit at all 2. We need to enforce finding a coordinator and sending an autocommit regardless of the previous commit state, because we need to make sure to record the progress before closing 3. Quite a few changes to the testing, because autocommit depends on the current progress, so I need to "seek" for some cases to ensure the test sends an autocommit Let me know what you think! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
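Pulling those three points together, the intended close path can be sketched roughly as follows (the interface and method names are hypothetical stand-ins for the PR's internals, not the actual code):

```java
import java.util.concurrent.CompletableFuture;

class CloseSequenceSketch {
    // Hypothetical facade over the commit request manager, used only to
    // illustrate the ordering: coordinator lookup first, then a final commit.
    interface CommitRequestManagerLike {
        boolean canAutoCommit();
        CompletableFuture<Void> ensureCoordinator();          // sends FindCoordinator if needed
        CompletableFuture<Void> commitAllConsumedPositions(); // final commit of all consumed offsets
    }

    CompletableFuture<Void> commitBeforeClose(CommitRequestManagerLike commits) {
        if (!commits.canAutoCommit()) { // autocommit disabled, or nothing consumed yet
            return CompletableFuture.completedFuture(null);
        }
        return commits.ensureCoordinator()
                .thenCompose(v -> commits.commitAllConsumedPositions());
    }
}
```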
Re: [PR] MINOR: Zk to KRaft migration is now production ready [kafka]
cmccabe commented on PR #14546: URL: https://github.com/apache/kafka/pull/14546#issuecomment-1828684914 Thanks for the PR, @ocadaruma . There is a PR we've been working on for a while, https://github.com/apache/kafka/pull/14160/files , which I think includes your fix and much more. I will see if I can get that one in since it corrects a bunch of things which are now out-of-date. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842: URL: https://github.com/apache/kafka/pull/14842#discussion_r1406792003 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java: ## @@ -279,7 +287,7 @@ public ConsumerNetworkThreadTestBuilder(Optional groupInfo) { @Override public void close() { -closeQuietly(consumerNetworkThread, ConsumerNetworkThread.class.getSimpleName()); +consumerNetworkThread.close(); Review Comment: I don't think we should suppress the failures on closing during testing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842: URL: https://github.com/apache/kafka/pull/14842#discussion_r1406789593 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -206,21 +206,21 @@ public CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) -public CompletableFuture<Void> maybeAutoCommitAllConsumed() { +public CompletableFuture<Void> maybeAutoCommit() { return maybeAutoCommit(subscriptions.allConsumed()); } +boolean canAutoCommit() { +return autoCommitState.isPresent() && !subscriptions.allConsumed().isEmpty(); +} + /** - * The consumer needs to send an auto commit during the shutdown if autocommit is enabled. + * Return an OffsetCommitRequest of all assigned topicPartitions and their current positions. */ -Optional<NetworkClientDelegate.UnsentRequest> maybeCreateAutoCommitRequest() { -if (!autoCommitState.isPresent()) { -return Optional.empty(); -} - +NetworkClientDelegate.UnsentRequest commitAllConsumedPositions() { OffsetCommitRequestState request = pendingRequests.createOffsetCommitRequest(subscriptions.allConsumed(), jitter); request.future.whenComplete(autoCommitCallback(subscriptions.allConsumed())); -return Optional.of(request.toUnsentRequest()); +return request.toUnsentRequest(); Review Comment: We should always return a request because I moved that check out of it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]
hachikuji commented on code in PR #14489: URL: https://github.com/apache/kafka/pull/14489#discussion_r1406789222 ## core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala: ## @@ -514,8 +520,14 @@ class TransactionStateManager(brokerId: Int, * When this broker becomes a leader for a transaction log partition, load this partition and populate the transaction * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from * the previous loading / unloading operation. + * + * If the state is already loaded (leader epoch bumps, but we have the same leader), just update the epoch in the + * metadata cache and for all the pending markers. */ - def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = { + def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int, Review Comment: Yeah, the original name seems fine to me. We are still loading the transactions. We just have an optimization when we already had state from a previous epoch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-15860) ControllerRegistration must be written out to the metadata image
[ https://issues.apache.org/jira/browse/KAFKA-15860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-15860. -- Fix Version/s: 3.7.0 Resolution: Fixed > ControllerRegistration must be written out to the metadata image > > > Key: KAFKA-15860 > URL: https://issues.apache.org/jira/browse/KAFKA-15860 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Major > Fix For: 3.7.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842: URL: https://github.com/apache/kafka/pull/14842#discussion_r1406788484 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -184,7 +184,7 @@ private static long findMinTime(final Collection<? extends RequestState> requests * completed future if no request is generated. */ public CompletableFuture<Void> maybeAutoCommit(final Map<TopicPartition, OffsetAndMetadata> offsets) { -if (!autoCommitState.isPresent()) { +if (!canAutoCommit()) { Review Comment: Pretty terrible naming, because it overlaps with the one below; not sure there's a better description for it. It needs to check 1. whether autocommit is enabled and 2. whether there's anything to commit. If either check fails, we don't try to send a commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842: URL: https://github.com/apache/kafka/pull/14842#discussion_r1406787063 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -154,7 +154,7 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { if (!coordinatorRequestManager.coordinator().isPresent()) return EMPTY; -maybeAutoCommitAllConsumed(); +maybeAutoCommit(); Review Comment: We always commit allConsumed(), so there's no point in restating that in the name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]
cmccabe commented on code in PR #14160: URL: https://github.com/apache/kafka/pull/14160#discussion_r1406786870 ## docs/ops.html: ## @@ -3778,6 +3784,14 @@ Migrating brokers to KRaft Each broker is restarted with a KRaft configuration until the entire cluster is running in KRaft mode. + Reverting to ZooKeeper mode During the Migration +While the cluster is still in migration mode, it is possible to revert to ZK mode. In order to do this: + + One by one, take each KRaft broker down. Remove the __cluster_metadata directory on the broker. Then, restart the broker in ZooKeeper mode. Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]
cmccabe commented on code in PR #14160: URL: https://github.com/apache/kafka/pull/14160#discussion_r1406786425 ## docs/ops.html: ## @@ -3603,39 +3603,45 @@ Supporting JBOD configurations with multiple storage directories Modifying certain dynamic configurations on the standalone KRaft controller -Delegation tokens ZooKeeper to KRaft Migration ZooKeeper to KRaft migration is considered an Early Access feature and is not recommended for production clusters. +Please report issues with ZooKeeper to KRaft migration using the +https://issues.apache.org/jira/projects/KAFKA"; target="_blank">project JIRA and the "kraft" component. - The following features are not yet supported for ZK to KRaft migrations: - + Terminology -Downgrading to ZooKeeper mode during or after the migration -Other features not yet supported in KRaft +Brokers that are in ZK mode store their metadata in Apache ZooKeeper. This is the old mode of handling metadata. +Brokers that are in KRaft mode store their metadata in a KRaft quorum. This is the new and improved mode of handling metadata. +Migration is the process of moving cluster metadata from ZooKeeper into a KRaft quorum. - -Please report issues with ZooKeeper to KRaft migration using the -https://issues.apache.org/jira/projects/KAFKA"; target="_blank">project JIRA and the "kraft" component. - + Migration Phases + In general, the migration process passes through several phases. - Terminology - -We use the term "migration" here to refer to the process of changing a Kafka cluster's metadata -system from ZooKeeper to KRaft and migrating existing metadata. An "upgrade" refers to installing a newer version of Kafka. It is not recommended to -upgrade the software at the same time as performing a metadata migration. - + +In the initial phase, all the brokers are in ZK mode, and there is a ZK-based controller. +During the initial metadata load, a KRaft quorum loads the metadata from ZooKeeper. +In the hybrid phase, some brokers are in ZK mode, but there is a KRaft controller. +In the dual-write phase, all brokers are KRaft, but the KRaft controller is continuing to write to ZK. +When the migration has been finalized, we no longer write metadata to ZooKeeper. + - -We also use the term "ZK mode" to refer to Kafka brokers which are using ZooKeeper as their metadata -system. "KRaft mode" refers Kafka brokers which are using a KRaft controller quorum as their metadata system. - + Limitations + +While a cluster is being migrated from ZK mode to KRaft mode, we do not support changing the metadata + version (also known as the inter.broker.protocol version.) Please do not attempt to do this during + a migration, or you may break the cluster. +After the migration has been finalized, it is not possible to revert back to ZooKeeper mode. +As noted above, some features are not fully implemented in KRaft mode. If you are + using one of those features, you will not be able to migrate to KRaft yet. + + Review Comment: removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15311: Fix docs about reverting to ZooKeeper mode during KRaft migration [kafka]
cmccabe commented on code in PR #14160: URL: https://github.com/apache/kafka/pull/14160#discussion_r1406786099 ## docs/ops.html: ## @@ -3603,39 +3603,45 @@ Supporting JBOD configurations with multiple storage directories Modifying certain dynamic configurations on the standalone KRaft controller -Delegation tokens ZooKeeper to KRaft Migration ZooKeeper to KRaft migration is considered an Early Access feature and is not recommended for production clusters. +Please report issues with ZooKeeper to KRaft migration using the +https://issues.apache.org/jira/projects/KAFKA"; target="_blank">project JIRA and the "kraft" component. - The following features are not yet supported for ZK to KRaft migrations: - + Terminology -Downgrading to ZooKeeper mode during or after the migration -Other features not yet supported in KRaft +Brokers that are in ZK mode store their metadata in Apache ZooKeeper. This is the old mode of handling metadata. +Brokers that are in KRaft mode store their metadata in a KRaft quorum. This is the new and improved mode of handling metadata. +Migration is the process of moving cluster metadata from ZooKeeper into a KRaft quorum. - -Please report issues with ZooKeeper to KRaft migration using the -https://issues.apache.org/jira/projects/KAFKA"; target="_blank">project JIRA and the "kraft" component. - + Migration Phases + In general, the migration process passes through several phases. - Terminology - -We use the term "migration" here to refer to the process of changing a Kafka cluster's metadata -system from ZooKeeper to KRaft and migrating existing metadata. An "upgrade" refers to installing a newer version of Kafka. It is not recommended to -upgrade the software at the same time as performing a metadata migration. - + +In the initial phase, all the brokers are in ZK mode, and there is a ZK-based controller. +During the initial metadata load, a KRaft quorum loads the metadata from ZooKeeper. +In the hybrid phase, some brokers are in ZK mode, but there is a KRaft controller. +In the dual-write phase, all brokers are KRaft, but the KRaft controller is continuing to write to ZK. +When the migration has been finalized, we no longer write metadata to ZooKeeper. + Review Comment: I think distinguishing "hybrid phase" from "dual-write phase" is useful, even though they both map to the same `MigrationState` enum. "migration phases" aren't "migration states" (and actually, we don't discuss migration states in this doc) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15819: Fix leaked KafkaRaftManager in ZK mode during migration [kafka]
cmccabe merged PR #14751: URL: https://github.com/apache/kafka/pull/14751 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Rename method sendBrokerHeartbeat [kafka]
cmccabe merged PR #14658: URL: https://github.com/apache/kafka/pull/14658 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15768: StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult [kafka]
hanyuzheng7 closed pull request #14821: KAFKA-15768: StateQueryResult#getOnlyPartitionResult should not throw for FailedQueryResult URL: https://github.com/apache/kafka/pull/14821 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on PR #14570: URL: https://github.com/apache/kafka/pull/14570#issuecomment-1828621306 @mjsax ready for final code review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1406752014 ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java: ## @@ -1584,7 +1646,43 @@ public void shouldHandleKeyQuery( ); final V result1 = queryResult.getResult(); -final Integer integer = valueExtactor.apply(result1); +final Integer integer = (Integer) result1; +assertThat(integer, is(expectedValue)); +assertThat(queryResult.getExecutionInfo(), is(empty())); +assertThat(queryResult.getPosition(), is(POSITION_0)); +} + +public void shouldHandleTimestampedKeyQuery( +final Integer key, +final Integer expectedValue) { Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15629: Proposal to introduce IQv2 Query Types: TimestampedKeyQuery and TimestampedRangeQuery [kafka]
hanyuzheng7 commented on code in PR #14570: URL: https://github.com/apache/kafka/pull/14570#discussion_r1406751704 ## streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java: ## @@ -1632,10 +1729,65 @@ public void shouldHandleRangeQuery( IllegalArgumentException.class, queryResult.get(partition)::getFailureMessage ); - try (final KeyValueIterator iterator = queryResult.get(partition).getResult()) { while (iterator.hasNext()) { - actualValues.add(valueExtactor.apply(iterator.next().value)); +actualValues.add((Integer) iterator.next().value); +} +} +assertThat(queryResult.get(partition).getExecutionInfo(), is(empty())); +} +assertThat("Result:" + result, actualValues, is(expectedValues)); +assertThat("Result:" + result, result.getPosition(), is(INPUT_POSITION)); +} +} + +public void shouldHandleTimestampedRangeQuery( +final Optional lower, +final Optional upper, +final boolean isKeyAscending, +final List expectedValues) { Review Comment: ok -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
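To make the new query types concrete, here is a hedged usage sketch assuming the API shape proposed in KIP-992 (a `withKey` factory and a `ValueAndTimestamp` result); the store name and key are hypothetical, and `TimestampedRangeQuery` would follow the same pattern with a `withRange(lower, upper)` factory.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.query.TimestampedKeyQuery;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class TimestampedQuerySketch {
    // Look up the value and record timestamp for key 1 in a timestamped store.
    static ValueAndTimestamp<Integer> lookup(final KafkaStreams streams) {
        final TimestampedKeyQuery<Integer, Integer> query = TimestampedKeyQuery.withKey(1);
        final StateQueryRequest<ValueAndTimestamp<Integer>> request =
            StateQueryRequest.inStore("my-store").withQuery(query);
        final StateQueryResult<ValueAndTimestamp<Integer>> result = streams.query(request);
        return result.getOnlyPartitionResult().getResult();
    }
}
```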
Re: [PR] KAFKA-15456: Client support for OffsetFetch/OffsetCommit v9 [kafka]
kirktrue commented on PR #14557: URL: https://github.com/apache/kafka/pull/14557#issuecomment-1828563894 Thanks for the PR, @lianetm. This is tricky stuff, to be sure! My feedback is mostly minor. Hopefully I can do another pass in a day or so with more time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15456: Client support for OffsetFetch/OffsetCommit v9 [kafka]
kirktrue commented on code in PR #14557: URL: https://github.com/apache/kafka/pull/14557#discussion_r1406678397 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -161,16 +172,16 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { List requests = pendingRequests.drain(currentTimeMs); // min of the remainingBackoffMs of all the request that are still backing off final long timeUntilNextPoll = Math.min( -findMinTime(unsentOffsetCommitRequests(), currentTimeMs), -findMinTime(unsentOffsetFetchRequests(), currentTimeMs)); +findMinTime(unsentOffsetCommitRequests(), currentTimeMs), +findMinTime(unsentOffsetFetchRequests(), currentTimeMs)); Review Comment: Nit: we can leave the indentation as is. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java: ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.consumer.internals; + +/** + * Listener to register for getting notified when the member state changes, or new member ID or + * epoch are received. + */ +public interface MemberStateListener { + +/** + * Called when the member transitions to a new state. + * + * @param state New state. + */ +void onStateChange(MemberState state); + +/** + * Called when the member receives a new member ID. + * + * @param memberId New member ID. + * @param epoch Latest member epoch received. + */ +void onMemberIdUpdated(String memberId, int epoch); + +/** + * Called when a member receives a new member epoch. + * + * @param epoch New member epoch. + * @param memberId Current member ID. + */ +void onMemberEpochUpdated(int epoch, String memberId); Review Comment: I'm wondering why this interface can't be a single `onUpdate(MemberState state)` and leave it up to the callbacks to determine what's changed?
🤔 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -467,66 +594,153 @@ private void handleFatalError(final Errors error) { future.completeExceptionally(new CommitFailedException()); break; case UNKNOWN_MEMBER_ID: -log.info("OffsetCommit failed due to unknown member id memberId {}: {}", null, error.message()); -future.completeExceptionally(error.exception()); +log.info("OffsetCommit failed due to unknown member id: {}", error.message()); +handleUnknownMemberIdError(this); +break; +case STALE_MEMBER_EPOCH: +log.info("OffsetCommit failed due to stale member epoch: {}", error.message()); +handleStaleMemberEpochError(this); break; default: -future.completeExceptionally(new KafkaException("Unexpected error in commit: " + error.message())); +future.completeExceptionally(new KafkaException("Unexpected error in commit:" + +" " + error.message())); break; } } + +@Override +void abortRetry(String cause) { +future.completeExceptionally(new KafkaException("Offset commit waiting for new member" + +" ID or epoch cannot be retried. " + cause)); +} + +/** + * Reset timers and add request to the list of pending requests, to make sure it is sent + * out on the next poll iteration, without applying any backoff. + */ +@Override +public void retryOnMemberIdOrEpochUpdate(Optional memberId, + Optional memberEpoch) { +this.memberId = memberId; +this.memberEpoch = memberEpoch; +reset(); +pendingRequests.addOffsetCommitRequest(this); +} + +@Override +public String requestName()
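To make the single-callback alternative concrete, a hypothetical sketch follows; `MemberUpdateListener`, `MemberStateSnapshot`, and their fields are invented for illustration and are not part of the PR. The listener receives an immutable snapshot on any change and diffs it against whatever it last cached.

```java
// Hypothetical alternative to the three-method listener discussed above.
interface MemberUpdateListener {
    void onUpdate(MemberStateSnapshot snapshot);
}

// Immutable snapshot published on any change; field names are illustrative.
final class MemberStateSnapshot {
    final String memberId;
    final int memberEpoch;

    MemberStateSnapshot(String memberId, int memberEpoch) {
        this.memberId = memberId;
        this.memberEpoch = memberEpoch;
    }
}
```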
Re: [PR] KAFKA-15887: Ensure FindCoordinatorRequest is sent before closing [kafka]
philipnee commented on code in PR #14842: URL: https://github.com/apache/kafka/pull/14842#discussion_r1406699542 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java: ## @@ -269,22 +270,40 @@ void cleanup() { * completed in time. */ // Visible for testing -void maybeAutoCommitAndLeaveGroup(final Timer timer) { +void maybeAutocommitOnClose(final Timer timer) { if (!requestManagers.coordinatorRequestManager.isPresent()) return; +if (!requestManagers.commitRequestManager.isPresent()) { +log.error("Expecting a CommitRequestManager but the object was never initialized. Shutting down."); +return; +} + +if (!requestManagers.commitRequestManager.get().autoCommitEnabled()) { +return; +} + ensureCoordinatorReady(timer); -List tasks = closingRequests(); -networkClientDelegate.addAll(tasks); +Optional autocommit = requestManagers.commitRequestManager.get().maybeCreateAutoCommitRequest(); +if (!autocommit.isPresent()) { +return; +} + +List autocommitRequest = Collections.singletonList(autocommit.get()); +networkClientDelegate.addAll(autocommitRequest); do { long currentTimeMs = timer.currentTimeMs(); ensureCoordinatorReady(timer); networkClientDelegate.poll(timer.remainingMs(), currentTimeMs); -} while (timer.notExpired() && !tasks.stream().allMatch(v -> v.future().isDone())); +} while (timer.notExpired() && !autocommitRequest.get(0).future().isDone()); +} + +void maybeLeaveGroup(final Timer timer) { +// TODO: Leave group upon closing the consumer Review Comment: Can I follow up with a subsequent ticket to address this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
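The close path above follows a common "submit once, then poll until done or the timer expires" pattern. Below is a stripped-down sketch under the assumption that a `CompletableFuture` and a `Runnable` stand in for the internal unsent-request and network-delegate types; only the Kafka `Timer` utility is used as-is.

```java
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.utils.Timer;

final class CloseTimeFlush {
    // Poll until the in-flight request completes or the close timer expires.
    static void awaitCompletion(CompletableFuture<Void> inflight, Timer timer, Runnable pollOnce) {
        do {
            timer.update();   // refresh the timer's notion of "now"
            pollOnce.run();   // drive one round of network I/O
        } while (timer.notExpired() && !inflight.isDone());
    }
}
```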
Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]
AndrewJSchofield commented on code in PR #14835: URL: https://github.com/apache/kafka/pull/14835#discussion_r1406699697 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) { this.heartbeatTimer.updateAndReset(heartbeatIntervalMs); } } + +/** + * Builds the heartbeat requests correctly, ensuring that all information is sent according to + * the protocol, but subsequent requests do not send information which has not changed. This + * is important to ensure that reconciliation completes successfully. + */ +static class HeartbeatState { Review Comment: Also, spotbugs will object if an inner class is non-static and does not use its `this` reference to the outer class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
jsancio commented on PR #14428: URL: https://github.com/apache/kafka/pull/14428#issuecomment-1828499581 Excuse the delays @showuon . I'll review this today and this week! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15373: fix exception thrown in Admin#describeTopics for unknown ID [kafka]
MikeEdgar commented on PR #14599: URL: https://github.com/apache/kafka/pull/14599#issuecomment-1828509146 Hi @jolshan , please take a look at this PR to modify the exception thrown when describing a topic by an unknown topic ID. The CI failures don't appear related to the change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
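For context, a sketch of the call path the PR touches: describing a topic by ID through the Admin client. The exception surfaced for an unknown ID is exactly what the PR changes, so the handling shown here is illustrative rather than authoritative.

```java
import java.util.Collections;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicCollection;
import org.apache.kafka.common.Uuid;

public class DescribeByIdSketch {
    static void describe(Admin admin, Uuid topicId) throws InterruptedException {
        try {
            TopicDescription description = admin
                .describeTopics(TopicCollection.ofTopicIds(Collections.singleton(topicId)))
                .allTopicIds()
                .get()
                .get(topicId);
            System.out.println(description);
        } catch (ExecutionException e) {
            // The PR adjusts which exception an unknown topic ID maps to.
            System.err.println("Describe failed: " + e.getCause());
        }
    }
}
```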
Re: [PR] KAFKA-15372: Reconfigure dedicated MM2 connectors after leadership change [kafka]
gharris1727 commented on PR #14293: URL: https://github.com/apache/kafka/pull/14293#issuecomment-1828464698 Hi @C0urante @yashmayya I'd like to get this into the 3.7.0 release, and code freeze is currently planned for Dec 20th. PTAL, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]
AndrewJSchofield commented on code in PR #14835: URL: https://github.com/apache/kafka/pull/14835#discussion_r1406686948 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -181,36 +191,19 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request)); } -private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { -// TODO: extract this logic for building the ConsumerGroupHeartbeatRequestData to a -// stateful builder (HeartbeatState), that will keep the last data sent, and determine -// the fields that changed and need to be included in the next HB (ex. check -// subscriptionState changed from last sent to include assignment). It should also -// ensure that all fields are sent on failure. -ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData() -.setGroupId(membershipManager.groupId()) -.setMemberEpoch(membershipManager.memberEpoch()) -.setRebalanceTimeoutMs(rebalanceTimeoutMs); - -if (membershipManager.memberId() != null) { -data.setMemberId(membershipManager.memberId()); -} - -membershipManager.groupInstanceId().ifPresent(data::setInstanceId); - -if (this.subscriptions.hasPatternSubscription()) { -// TODO: Pass the string to the GC if server side regex is used. -} else { -data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); -List topicPartitions = - buildTopicPartitionsList(membershipManager.currentAssignment()); -data.setTopicPartitions(topicPartitions); -} - - this.membershipManager.serverAssignor().ifPresent(data::setServerAssignor); +/** + * Returns the delay before the next network request for this request manager. Used to ensure that + * waiting in the application thread does not delay beyond the point that a result can be returned. + */ +@Override +public long timeUntilNextPoll(long currentTimeMs) { +boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight(); +return heartbeatNow ? 0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs); +} +private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( -new ConsumerGroupHeartbeatRequest.Builder(data), +new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()), Review Comment: That style is used throughout this file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
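The `HeartbeatState` idea (send everything on the first request and after a failure, only deltas otherwise) boils down to caching the last value sent per field. A minimal, hypothetical sketch of that bookkeeping; `SentFieldTracker` is invented for illustration and is not the PR's implementation.

```java
final class SentFieldTracker<T> {
    private T lastSent;

    // Returns the value if it differs from what was last sent, else null (omit from the request).
    T valueIfChanged(T current) {
        if (current == null || current.equals(lastSent)) {
            return null;
        }
        lastSent = current;
        return current;
    }

    // After a failed heartbeat, forget history so every field is re-sent.
    void reset() {
        lastSent = null;
    }
}
```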
Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]
jolshan commented on code in PR #14489: URL: https://github.com/apache/kafka/pull/14489#discussion_r1406686608 ## core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala: ## @@ -514,8 +520,14 @@ class TransactionStateManager(brokerId: Int, * When this broker becomes a leader for a transaction log partition, load this partition and populate the transaction * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from * the previous loading / unloading operation. + * + * If the state is already loaded (leader epoch bumps, but we have the same leader), just update the epoch in the + * metadata cache and for all the pending markers. */ - def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = { + def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int, Review Comment: Ok. 😅 I think I was trying to distinguish the difference between logical and physical loading. But maybe that is too specific. Do you think it should just keep the original name? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]
AndrewJSchofield commented on code in PR #14835: URL: https://github.com/apache/kafka/pull/14835#discussion_r1406683119 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) { this.heartbeatTimer.updateAndReset(heartbeatIntervalMs); } } + +/** + * Builds the heartbeat requests correctly, ensuring that all information is sent according to + * the protocol, but subsequent requests do not send information which has not changed. This + * is important to ensure that reconciliation completes successfully. + */ +static class HeartbeatState { Review Comment: So that you can construct an independent instance of the class. It doesn't need a `this` and it's handy for testing to be able to make instances on demand. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
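A small self-contained illustration of the two points above: a static nested class can be constructed without an enclosing instance, which is handy for tests, while a non-static inner class carries an implicit reference to the outer object (the reference that, if unused, triggers SpotBugs' SIC_INNER_SHOULD_BE_STATIC warning).

```java
public class Outer {
    static class StaticNested {
        int answer() { return 42; }
    }

    class Inner {
        int answer() { return 42; }  // carries an implicit Outer.this reference
    }

    public static void main(String[] args) {
        StaticNested s = new StaticNested();  // no Outer instance needed
        Inner i = new Outer().new Inner();    // requires an enclosing Outer
        System.out.println(s.answer() + " " + i.answer());
    }
}
```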
Re: [PR] KAFKA-15890: Consumer.poll with long timeout unaware of assigned partitions [kafka]
kirktrue commented on code in PR #14835: URL: https://github.com/apache/kafka/pull/14835#discussion_r1406656043 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -159,11 +159,19 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) { return EMPTY; List requests = pendingRequests.drain(currentTimeMs); +return new NetworkClientDelegate.PollResult(timeUntilNextPoll(currentTimeMs), requests); +} + +/** + * Returns the delay before the next network request for this request manager. Used to ensure that + * waiting in the application thread does not delay beyond the point that a result can be returned. + */ +@Override +public long timeUntilNextPoll(long currentTimeMs) { // min of the remainingBackoffMs of all the request that are still backing off -final long timeUntilNextPoll = Math.min( -findMinTime(unsentOffsetCommitRequests(), currentTimeMs), -findMinTime(unsentOffsetFetchRequests(), currentTimeMs)); -return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests); +return Math.min( +findMinTime(unsentOffsetCommitRequests(), currentTimeMs), +findMinTime(unsentOffsetFetchRequests(), currentTimeMs)); Review Comment: Nit: leaving the indentation as is conforms to the existing 'four-spaces per tab' style. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -181,36 +191,19 @@ public NetworkClientDelegate.PollResult poll(long currentTimeMs) { return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request)); } -private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { -// TODO: extract this logic for building the ConsumerGroupHeartbeatRequestData to a -// stateful builder (HeartbeatState), that will keep the last data sent, and determine -// the fields that changed and need to be included in the next HB (ex. check -// subscriptionState changed from last sent to include assignment). It should also -// ensure that all fields are sent on failure. -ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData() -.setGroupId(membershipManager.groupId()) -.setMemberEpoch(membershipManager.memberEpoch()) -.setRebalanceTimeoutMs(rebalanceTimeoutMs); - -if (membershipManager.memberId() != null) { -data.setMemberId(membershipManager.memberId()); -} - -membershipManager.groupInstanceId().ifPresent(data::setInstanceId); - -if (this.subscriptions.hasPatternSubscription()) { -// TODO: Pass the string to the GC if server side regex is used. -} else { -data.setSubscribedTopicNames(new ArrayList<>(this.subscriptions.subscription())); -List topicPartitions = - buildTopicPartitionsList(membershipManager.currentAssignment()); -data.setTopicPartitions(topicPartitions); -} - - this.membershipManager.serverAssignor().ifPresent(data::setServerAssignor); +/** + * Returns the delay before the next network request for this request manager. Used to ensure that + * waiting in the application thread does not delay beyond the point that a result can be returned. + */ +@Override +public long timeUntilNextPoll(long currentTimeMs) { +boolean heartbeatNow = membershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight(); +return heartbeatNow ? 
0L : heartbeatRequestState.nextHeartbeatMs(currentTimeMs); +} +private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest() { NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest( -new ConsumerGroupHeartbeatRequest.Builder(data), +new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()), Review Comment: Nit: we got knocked in other reviews for unnecessarily qualifying with `this`. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -417,4 +394,127 @@ private void updateHeartbeatIntervalMs(final long heartbeatIntervalMs) { this.heartbeatTimer.updateAndReset(heartbeatIntervalMs); } } + +/** + * Builds the heartbeat requests correctly, ensuring that all information is sent according to + * the protocol, but subsequent requests do not send information which has not changed. This + * is important to ensure that reconciliation completes successfully. + */ +static class HeartbeatState { +private final SubscriptionState subscriptions; +private f
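Taken together, per-manager `timeUntilNextPoll` values let the application thread cap its wait at the most urgent request manager. A reduced sketch of that aggregation, with the interface trimmed to the one method under discussion and the default wait an assumed parameter:

```java
import java.util.List;

interface RequestManager {
    long timeUntilNextPoll(long currentTimeMs);
}

final class PollScheduler {
    // Wait no longer than the most urgent manager allows.
    static long maxWaitMs(List<RequestManager> managers, long currentTimeMs, long defaultWaitMs) {
        return managers.stream()
                .mapToLong(m -> m.timeUntilNextPoll(currentTimeMs))
                .min()
                .orElse(defaultWaitMs);
    }
}
```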
Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]
hachikuji commented on code in PR #14489: URL: https://github.com/apache/kafka/pull/14489#discussion_r1406616682 ## core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala: ## @@ -514,8 +520,14 @@ class TransactionStateManager(brokerId: Int, * When this broker becomes a leader for a transaction log partition, load this partition and populate the transaction * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from * the previous loading / unloading operation. + * + * If the state is already loaded (leader epoch bumps, but we have the same leader), just update the epoch in the + * metadata cache and for all the pending markers. */ - def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = { + def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int, Review Comment: nit: The rename seems borderline overkill. I would consider the epoch bump part of transaction loading. ## core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala: ## @@ -514,8 +520,14 @@ class TransactionStateManager(brokerId: Int, * When this broker becomes a leader for a transaction log partition, load this partition and populate the transaction * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from * the previous loading / unloading operation. + * + * If the state is already loaded (leader epoch bumps, but we have the same leader), just update the epoch in the + * metadata cache and for all the pending markers. */ - def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = { + def maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(partitionId: Int, +coordinatorEpoch: Int, +sendTxnMarkers: SendTxnMarkersCallback, + transactionStateLoaded: Boolean): Unit = { Review Comment: As mentioned above, I don't think we should pass this as an argument. On a higher level, I'm trying to figure out the safety of this loading process. Suppose we have two epoch bumps in quick succession. Do we get a strong ordering guarantee given that it is done asynchronously? I think I would expect that we would check for the existence of the partition in `loadingPartitions` when we first acquire the write lock below. If it exists, then we need to ensure the monotonicity of the epoch. If the entry has a higher epoch, then we ignore the call. ## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ## @@ -447,12 +447,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig, info(s"Elected as the txn coordinator for partition $txnTopicPartitionId at epoch $coordinatorEpoch") // The operations performed during immigration must be resilient to any previous errors we saw or partial state we // left off during the unloading phase. Ensure we remove all associated state for this partition before we continue -// loading it. +// loading it. In the case where the state partition is already loaded, we want to remove inflight markers with the +// old epoch. txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId) // Now load the partition. 
-txnManager.loadTransactionsForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch, - txnMarkerChannelManager.addTxnMarkersToSend) + txnManager.maybeLoadTransactionsAndBumpEpochForTxnTopicPartition(txnTopicPartitionId, coordinatorEpoch, + txnMarkerChannelManager.addTxnMarkersToSend, txnManager.txnStateLoaded(txnTopicPartitionId)) Review Comment: It's curious that we need to pass the result of `txnStateLoaded`. Couldn't `txnManager` figure it out on its own? ## core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala: ## @@ -524,23 +536,35 @@ class TransactionStateManager(brokerId: Int, } def loadTransactions(startTimeMs: java.lang.Long): Unit = { - val schedulerTimeMs = time.milliseconds() - startTimeMs - info(s"Loading transaction metadata from $topicPartition at epoch $coordinatorEpoch") - validateTransactionTopicPartitionCountIsStable() - - val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch) - val endTimeMs = time.milliseconds() - val totalLoadingTimeMs = endTimeMs - startTimeMs - partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false) - info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in " + -
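A hypothetical Java rendering of the monotonicity check suggested in the review (the real code is Scala): a load is ignored when an equal or higher coordinator epoch is already pending for the partition, and cleanup only removes the entry if no newer epoch superseded it. Names and types are simplified stand-ins.

```java
import java.util.HashMap;
import java.util.Map;

final class LoadingGuard {
    private final Map<Integer, Integer> loadingPartitions = new HashMap<>();

    // Returns true if loading at this epoch may proceed.
    synchronized boolean tryBeginLoad(int partitionId, int coordinatorEpoch) {
        Integer pendingEpoch = loadingPartitions.get(partitionId);
        if (pendingEpoch != null && pendingEpoch >= coordinatorEpoch) {
            return false;  // a duplicate or newer load is already pending: ignore
        }
        loadingPartitions.put(partitionId, coordinatorEpoch);
        return true;
    }

    // Clear the entry only if this load was not superseded by a higher epoch.
    synchronized void finishLoad(int partitionId, int coordinatorEpoch) {
        loadingPartitions.remove(partitionId, coordinatorEpoch);
    }
}
```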
[jira] [Updated] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently
[ https://issues.apache.org/jira/browse/KAFKA-15372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15372: Fix Version/s: 3.7.0 > MM2 rolling restart can drop configuration changes silently > --- > > Key: KAFKA-15372 > URL: https://issues.apache.org/jira/browse/KAFKA-15372 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Daniel Urban >Assignee: Greg Harris >Priority: Major > Fix For: 3.7.0 > > > When MM2 is restarted, it tries to update the Connector configuration in all > flows. This is a one-time trial, and fails if the Connect worker is not the > leader of the group. > In a distributed setup and with a rolling restart, it is possible that for a > specific flow, the Connect worker of the just restarted MM2 instance is not > the leader, meaning that Connector configurations can get dropped. > For example, assuming 2 MM2 instances, and one flow A->B: > # MM2 instance 1 is restarted, the worker inside MM2 instance 2 becomes the > leader of A->B Connect group. > # MM2 instance 1 tries to update the Connector configurations, but fails > (instance 2 has the leader, not instance 1) > # MM2 instance 2 is restarted, leadership moves to worker in MM2 instance 1 > # MM2 instance 2 tries to update the Connector configurations, but fails > At this point, the configuration changes before the restart are never > applied. Many times, this can also happen silently, without any indication. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15906) Emit offset syncs more often than offset.lag.max for low-throughput/finite partitions
[ https://issues.apache.org/jira/browse/KAFKA-15906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15906: Description: Right now, the offset.lag.max configuration limits the number of offset syncs are emitted by the MirrorSourceTask, along with a fair rate-limiting semaphore. After 100 records have been emitted for a partition, _and_ the semaphore is available, an offset sync can be emitted. For low-volume topics, the `offset.lag.max` default of 100 is much more restrictive than the rate-limiting semaphore. For example, a topic which mirrors at the rate of 1 record/sec may take 100 seconds to receive an offset sync. If the topic is actually finite, the last offset sync will never arrive, and the translation will have a persistent lag. Instead, we can periodically flush the offset syncs for partitions that are under the offset.lag.max limit, but have not received an offset sync recently. This could be a new configuration, be a hard-coded time, or be based on the existing emit.checkpoints.interval.seconds and sync.group.offsets.interval.seconds configurations. Alternatively, we could decrease the default `offset.lag.max` value to 0, and rely on the fair semaphore to limit the number of syncs emitted for high-throughput partitions. The semaphore is not currently configurable, so users wanting lower throughput on the offset-syncs topic will still need an offset.lag.max > 0. was: Right now, the offset.lag.max configuration limits the number of offset syncs are emitted by the MirrorSourceTask, along with a fair rate-limiting semaphore. After 100 records have been emitted for a partition, _and_ the semaphore is available, an offset sync can be emitted. For low-volume topics, the `offset.lag.max` default of 100 is much more restrictive than the rate-limiting semaphore. For example, a topic which mirrors at the rate of 1 record/sec may take 100 seconds to receive an offset sync. If the topic is actually finite, the last offset sync will never arrive, and the translation will have a persistent lag. Instead, we can periodically flush the offset syncs for partitions that are under the offset.lag.max limit, but have not received an offset sync recently. This could be a new configuration, be a hard-coded time, or be based on the existing emit.checkpoints.interval.seconds and sync.group.offsets.interval.seconds configurations. > Emit offset syncs more often than offset.lag.max for low-throughput/finite > partitions > - > > Key: KAFKA-15906 > URL: https://issues.apache.org/jira/browse/KAFKA-15906 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Greg Harris >Priority: Minor > > Right now, the offset.lag.max configuration limits the number of offset syncs > are emitted by the MirrorSourceTask, along with a fair rate-limiting > semaphore. After 100 records have been emitted for a partition, _and_ the > semaphore is available, an offset sync can be emitted. > For low-volume topics, the `offset.lag.max` default of 100 is much more > restrictive than the rate-limiting semaphore. For example, a topic which > mirrors at the rate of 1 record/sec may take 100 seconds to receive an offset > sync. If the topic is actually finite, the last offset sync will never > arrive, and the translation will have a persistent lag. > Instead, we can periodically flush the offset syncs for partitions that are > under the offset.lag.max limit, but have not received an offset sync > recently. 
This could be a new configuration, be a hard-coded time, or be > based on the existing emit.checkpoints.interval.seconds and > sync.group.offsets.interval.seconds configurations. > > Alternatively, we could decrease the default `offset.lag.max` value to 0, and > rely on the fair semaphore to limit the number of syncs emitted for > high-throughput partitions. The semaphore is not currently configurable, so > users wanting lower throughput on the offset-syncs topic will still need an > offset.lag.max > 0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
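For reference, the knobs named in this ticket, assembled as MirrorMaker 2 connector properties with illustrative values. Setting `offset.lag.max` to 0 makes every mirrored record eligible for an offset sync, throttled only by the internal (non-configurable) semaphore.

```java
import java.util.Properties;

public class MirrorSyncTuning {
    // Sketch only: values are illustrative, not recommendations.
    public static Properties lowThroughputFriendlyConfig() {
        Properties props = new Properties();
        props.put("offset.lag.max", "0");
        // Existing interval configs the ticket suggests a periodic flush could reuse:
        props.put("emit.checkpoints.interval.seconds", "60");
        props.put("sync.group.offsets.interval.seconds", "60");
        return props;
    }
}
```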
Re: [PR] MINOR: Fix the flaky TBRLMM `testInternalTopicExists` test [kafka]
junrao commented on code in PR #14840: URL: https://github.com/apache/kafka/pull/14840#discussion_r1406631952 ## storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java: ## @@ -70,18 +70,20 @@ public TopicBasedRemoteLogMetadataManager topicBasedRlmm() { } @Test -public void testInternalTopicExists() { +public void testDoesTopicExist() { Properties adminConfig = remoteLogMetadataManagerHarness.adminClientConfig(); ListenerName listenerName = remoteLogMetadataManagerHarness.listenerName(); try (Admin admin = remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) { -String topic = topicBasedRlmm().config().remoteLogMetadataTopicName(); +String topic = "test-topic-exist"; +remoteLogMetadataManagerHarness.createTopic(topic, 1, 1, new Properties(), Review Comment: `createTopic` only waits for the topic metadata to be committed in the metadata log, but not waiting for the metadata to be propagated to every broker. `topicBasedRlmm().doesTopicExist` calls `describeTopic` and only checks the metadata in a least loaded broker. So, it seems that there is no strong guarantee that `doesTopicExist` is always true? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
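One hedged sketch of how a test could wait out the propagation gap described above: retry `describeTopics` until it succeeds before asserting `doesTopicExist`. Note this only confirms that some broker can serve the metadata, not every broker; the timeout and helper shape are assumptions, not the fix adopted by the PR.

```java
import java.util.Collections;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.test.TestUtils;

public class TopicExistsTestSketch {
    static void awaitTopicPropagation(Admin admin, String topic) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            try {
                admin.describeTopics(Collections.singleton(topic)).allTopicNames().get();
                return true;
            } catch (Exception e) {
                return false;  // metadata not yet visible to the queried broker
            }
        }, 15_000L, "Topic " + topic + " metadata was not propagated in time");
    }
}
```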