Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
iit2009060 commented on code in PR #14482: URL: https://github.com/apache/kafka/pull/14482#discussion_r1351514017 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -540,6 +545,8 @@ class RemoteIndexCacheTest { "Failed to mark cache entry for cleanup after resizing cache.") TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted, "Failed to cleanup cache entry after resizing cache.") +TestUtils.waitUntilTrue(() => cacheEntry.isCleanFinished, + "Failed to finish cleanup cache entry after resizing cache.") // verify no index files on remote cache dir TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, Review Comment: @hudeqi Can you resolve the above comment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
showuon commented on code in PR #14482: URL: https://github.com/apache/kafka/pull/14482#discussion_r1351504132 ## core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala: ## @@ -540,6 +545,8 @@ class RemoteIndexCacheTest { "Failed to mark cache entry for cleanup after resizing cache.") TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted, "Failed to cleanup cache entry after resizing cache.") +TestUtils.waitUntilTrue(() => cacheEntry.isCleanFinished, + "Failed to finish cleanup cache entry after resizing cache.") // verify no index files on remote cache dir TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent, Review Comment: @iit2009060 @hudeqi, adding an `isCleanFinished` flag just for a test is not a good solution. In this case, could we catch the exception in `getIndexFileFromRemoteCacheDir`?
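The alternative showuon suggests can be sketched outside of Kafka: when a background cleaner may delete index files while the test scans the cache directory, the scan itself can treat file-not-found conditions as "no matching file", so the cache never needs to expose an `isCleanFinished` flag just for the test. The class and method names below are illustrative stand-ins, not the actual Kafka test helpers.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.stream.Stream;

public class CacheDirScan {
    // Hypothetical stand-in for getIndexFileFromRemoteCacheDir: returns the first
    // file with the given suffix, tolerating files (or the directory itself)
    // being deleted by a concurrent cleaner thread mid-scan.
    public static Optional<Path> findFileWithSuffix(Path dir, String suffix) {
        try (Stream<Path> files = Files.walk(dir)) {
            return files
                .filter(p -> p.getFileName().toString().endsWith(suffix))
                .findFirst();
        } catch (NoSuchFileException | UncheckedIOException e) {
            // The cleaner removed the file/directory concurrently; for the test's
            // purposes this is the same as "no matching file present".
            return Optional.empty();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("remote-cache");
        Files.createFile(dir.resolve("00000000000000000000.index"));
        System.out.println(findFileWithSuffix(dir, ".index").isPresent());    // true
        System.out.println(findFileWithSuffix(dir, ".txnindex").isPresent()); // false
    }
}
```

With this shape, the test's `waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(...).isPresent, ...)` polling remains correct even while cleanup is still in flight.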
[jira] [Assigned] (KAFKA-15514) Controller-side replica management changes
[ https://issues.apache.org/jira/browse/KAFKA-15514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reassigned KAFKA-15514: --- Assignee: Igor Soarez > Controller-side replica management changes > -- > > Key: KAFKA-15514 > URL: https://issues.apache.org/jira/browse/KAFKA-15514 > Project: Kafka > Issue Type: Sub-task >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > > The new "Assignments" field replaces the "Replicas" field in PartitionRecord > and PartitionChangeRecord. > Any changes to partitions need to consider both fields. > * ISR updates > * Partition reassignments & reverts > * Partition creation -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [TEST] Run builds for PR #14364 [kafka]
philipnee closed pull request #14509: [TEST] Run builds for PR #14364 URL: https://github.com/apache/kafka/pull/14509
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
iit2009060 commented on PR #14482: URL: https://github.com/apache/kafka/pull/14482#issuecomment-1754314250 @showuon Can we merge these changes?
[PR] MINOR: Rename log dir UUIDs [kafka]
soarez opened a new pull request, #14517: URL: https://github.com/apache/kafka/pull/14517 After a late discussion in the voting thread for KIP-858 we decided to improve the names for the designated reserved log directory UUID values. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-14879: Update system tests to use latest versions [kafka]
github-actions[bot] commented on PR #13528: URL: https://github.com/apache/kafka/pull/13528#issuecomment-1754300677 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-10733: Clean up producer exceptions [kafka]
github-actions[bot] commented on PR #13876: URL: https://github.com/apache/kafka/pull/13876#issuecomment-1754300502 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
[PR] KAFKA-15514: Metadata records Replicas->Assignment [kafka]
soarez opened a new pull request, #14516: URL: https://github.com/apache/kafka/pull/14516 The new "Assignments" field replaces the "Replicas" field in PartitionRecord and PartitionChangeRecord. Depends on #14290 - [KAFKA-15355](https://issues.apache.org/jira/browse/KAFKA-15355) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1351064502 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java: ## @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; +import org.apache.kafka.clients.consumer.internals.events.BackgroundEventProcessor; +import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; + +import java.io.Closeable; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager; +import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics; +import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState; +import static org.apache.kafka.common.utils.Utils.closeQuietly; +import static org.mockito.Mockito.spy; + +public class ConsumerTestBuilder implements Closeable { + +static final long RETRY_BACKOFF_MS = 80; +static final long RETRY_BACKOFF_MAX_MS = 1000; +static final int REQUEST_TIMEOUT_MS = 500; + +final LogContext logContext = new LogContext(); +final Time time = new MockTime(0); +public final BlockingQueue applicationEventQueue; +public final BlockingQueue backgroundEventQueue; +final ConsumerConfig config; +final long retryBackoffMs; +final SubscriptionState subscriptions; +final ConsumerMetadata metadata; +final FetchConfig fetchConfig; +final Metrics metrics; +final FetchMetricsManager metricsManager; +final NetworkClientDelegate networkClientDelegate; +final OffsetsRequestManager offsetsRequestManager; +final CoordinatorRequestManager coordinatorRequestManager; +final CommitRequestManager commitRequestManager; +final TopicMetadataRequestManager topicMetadataRequestManager; +final FetchRequestManager fetchRequestManager; +final RequestManagers requestManagers; +public final ApplicationEventProcessor applicationEventProcessor; +public final BackgroundEventProcessor backgroundEventProcessor; +public final BackgroundEventHandler backgroundEventHandler; +final MockClient client; + +public ConsumerTestBuilder() { +this.applicationEventQueue = new LinkedBlockingQueue<>(); +this.backgroundEventQueue = new LinkedBlockingQueue<>(); +this.backgroundEventHandler = new BackgroundEventHandler(logContext, backgroundEventQueue); +GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( +100, +100, +100, +"group_id", +
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1351063405 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.consumer.LogTruncationException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchEvent; +import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; +import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; + +import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings("ClassDataAbstractionCoupling") +public class ConsumerNetworkThreadTest { + +private ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder testBuilder; +private Time time; +private ConsumerMetadata metadata; +private NetworkClientDelegate networkClient; +private BlockingQueue applicationEventsQueue; +private ApplicationEventProcessor applicationEventProcessor; +private CoordinatorRequestManager coordinatorManager; +private OffsetsRequestManager offsetsRequestManager; +private CommitRequestManager commitManager; +private ConsumerNetworkThread consumerNetworkThread; +private MockClient client; + +@BeforeEach +public void setup() { +testBuilder = new ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder(); +time = testBuilder.time; +metadata = testBuilder.metadata; +networkClient = testBuilder.networkClientDelegate; +client = testBuilder.client; +applicationEventsQueue = testBuilder.applicationEventQueue; +applicationEventProcessor = testBuilder.applicationEventProcessor; +coordinatorManager = testBuilder.coordinatorRequestManager; +commitManager = testBuilder.commitRequestManager; +offsetsRequestManager = testBuilder.offsetsRequestManager; +consumerNetworkThread =
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1351054656 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.consumer.LogTruncationException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchEvent; +import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; +import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; + +import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings("ClassDataAbstractionCoupling") +public class ConsumerNetworkThreadTest { + +private ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder testBuilder; +private Time time; +private ConsumerMetadata metadata; +private NetworkClientDelegate networkClient; +private BlockingQueue applicationEventsQueue; +private ApplicationEventProcessor applicationEventProcessor; +private CoordinatorRequestManager coordinatorManager; +private OffsetsRequestManager offsetsRequestManager; +private CommitRequestManager commitManager; +private ConsumerNetworkThread consumerNetworkThread; +private MockClient client; + +@BeforeEach +public void setup() { +testBuilder = new ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder(); +time = testBuilder.time; +metadata = testBuilder.metadata; +networkClient = testBuilder.networkClientDelegate; +client = testBuilder.client; +applicationEventsQueue = testBuilder.applicationEventQueue; +applicationEventProcessor = testBuilder.applicationEventProcessor; +coordinatorManager = testBuilder.coordinatorRequestManager; +commitManager = testBuilder.commitRequestManager; +offsetsRequestManager = testBuilder.offsetsRequestManager; +consumerNetworkThread =
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1351042141 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java: ## @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.consumer.LogTruncationException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; +import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.FetchEvent; +import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent; +import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent; +import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.FindCoordinatorRequestData; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.RequestTestUtils; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.test.TestUtils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; + +import static 
org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings("ClassDataAbstractionCoupling") +public class ConsumerNetworkThreadTest { + +private ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder testBuilder; +private Time time; +private ConsumerMetadata metadata; +private NetworkClientDelegate networkClient; +private BlockingQueue applicationEventsQueue; +private ApplicationEventProcessor applicationEventProcessor; +private CoordinatorRequestManager coordinatorManager; +private OffsetsRequestManager offsetsRequestManager; +private CommitRequestManager commitManager; +private ConsumerNetworkThread consumerNetworkThread; +private MockClient client; + +@BeforeEach +public void setup() { +testBuilder = new ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder(); +time = testBuilder.time; +metadata = testBuilder.metadata; +networkClient = testBuilder.networkClientDelegate; +client = testBuilder.client; +applicationEventsQueue = testBuilder.applicationEventQueue; +applicationEventProcessor = testBuilder.applicationEventProcessor; +coordinatorManager = testBuilder.coordinatorRequestManager; +commitManager = testBuilder.commitRequestManager; +offsetsRequestManager = testBuilder.offsetsRequestManager; +consumerNetworkThread =
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
kirktrue commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1351011656 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java: ## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.function.BiConsumer; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.ClientRequest; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.FetchSessionHandler; +import org.apache.kafka.clients.NetworkClient; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult; +import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; + +/** + * {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the + * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition + * assignment. 
+ */ +public class FetchRequestManager extends AbstractFetch implements RequestManager { + +private final NetworkClientDelegate networkClientDelegate; +private final List>> futures; + +FetchRequestManager(final LogContext logContext, +final Time time, +final ConsumerMetadata metadata, +final SubscriptionState subscriptions, +final FetchConfig fetchConfig, +final FetchMetricsManager metricsManager, +final NetworkClientDelegate networkClientDelegate) { +super(logContext, metadata, subscriptions, fetchConfig, metricsManager, time); +this.networkClientDelegate = networkClientDelegate; +this.futures = new ArrayList<>(); +} + +@Override +protected boolean isUnavailable(Node node) { +return networkClientDelegate.isUnavailable(node); +} + +@Override +protected void maybeThrowAuthFailure(Node node) { +networkClientDelegate.maybeThrowAuthFailure(node); +} + +/** + * Adds a new {@link Future future} to the list of futures awaiting results. Per the comments on + * {@link #handleFetchResponse(Node, FetchSessionHandler.FetchRequestData, ClientResponse)}}, there is no + * guarantee that this particular future will be provided with a non-empty result, but it is guaranteed + * to be completed with a result, assuming that it does not time out. 
+ * + * @param future Future that will be {@link CompletableFuture#complete(Object) completed} if not timed out + */ +public void requestFetch(CompletableFuture> future) { +futures.add(future); +} + +/** + * {@inheritDoc} + */ +@Override +public PollResult poll(long currentTimeMs) { +return pollInternal( +prepareFetchRequests(), +this::handleFetchResponse, +this::handleFetchResponse +); +} + +/** + * {@inheritDoc} + */ +@Override +public PollResult pollOnClose() { +return pollInternal( +prepareCloseFetchSessionRequests(), +this::handleCloseFetchSessionResponse, +this::handleCloseFetchSessionResponse +); +} + +/** + * Creates the {@link PollResult poll result} that contains a list of zero or more + * {@link FetchRequest.Builder fetch requests} fetch request}, + * {@link NetworkClient#send(ClientRequest, long) enqueues/sends it, and adds the {@link RequestFuture callback} Review Comment: I removed the latter phrases to reduce confusion. ##
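The Javadoc quoted above describes a handoff where callers register futures that the manager's poll path is guaranteed to complete, though not necessarily with a non-empty result. A minimal stand-alone sketch of that contract, using illustrative names rather than the actual Kafka classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class PendingFetches {
    private final List<CompletableFuture<List<String>>> futures = new ArrayList<>();

    // Mirrors requestFetch(future): a caller on the application thread registers
    // a future and later blocks on it for fetched data.
    public CompletableFuture<List<String>> requestFetch() {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        futures.add(future);
        return future;
    }

    // Mirrors the poll path on the network thread: every registered future is
    // completed, possibly with an empty list. Completion is guaranteed; a
    // non-empty result is not.
    public void completeAll(List<String> records) {
        futures.forEach(f -> f.complete(records));
        futures.clear();
    }

    public static void main(String[] args) {
        PendingFetches manager = new PendingFetches();
        CompletableFuture<List<String>> fetch = manager.requestFetch();
        manager.completeAll(List.of("record-1"));
        System.out.println(fetch.join()); // prints [record-1]
    }
}
```

The design keeps blocking confined to the caller's side of the future, so the polling thread never waits on consumers of the results.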
Re: [PR] KAFKA-15221; Fix the race between fetch requests from a rebooted follower. [kafka]
hachikuji commented on code in PR #14053: URL: https://github.com/apache/kafka/pull/14053#discussion_r1351005643 ## core/src/main/scala/kafka/cluster/Replica.scala: ## @@ -98,14 +105,22 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at * high frequency. */ - def updateFetchState( + def updateFetchStateOrThrow( followerFetchOffsetMetadata: LogOffsetMetadata, followerStartOffset: Long, followerFetchTimeMs: Long, leaderEndOffset: Long, brokerEpoch: Long ): Unit = { replicaState.updateAndGet { currentReplicaState => + val cachedBrokerEpoch = if (metadataCache.isInstanceOf[KRaftMetadataCache]) + metadataCache.asInstanceOf[KRaftMetadataCache].getAliveBrokerEpoch(brokerId) else Option(-1L) + // Fence the update if it provides a stale broker epoch. + if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) { Review Comment: I was actually debating it. Do we create a race on the leader for a restarted broker? A restarted broker will typically not be in the ISR, so perhaps a delay for propagation of the registration state would not have any adverse effects. What do you think?
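The fencing condition in the diff above can be illustrated in isolation: the update is rejected only when the request carries an epoch (i.e. not the `-1` sentinel sent by followers without one) and the leader's cached registration epoch for that broker is strictly newer. The names below are hypothetical; the real check lives in `Replica.updateFetchStateOrThrow`.

```java
import java.util.OptionalLong;

public class EpochFence {
    /**
     * Returns true when a fetch-state update carrying requestEpoch should be
     * fenced. -1 means the follower sent no epoch (accepted for compatibility),
     * and an absent cached epoch means the leader has no registration to
     * compare against, so the update is accepted.
     */
    public static boolean isStale(long requestEpoch, OptionalLong cachedEpoch) {
        return requestEpoch != -1
            && cachedEpoch.isPresent()
            && cachedEpoch.getAsLong() > requestEpoch;
    }

    public static void main(String[] args) {
        System.out.println(isStale(5, OptionalLong.of(7)));  // true: leader saw a newer registration
        System.out.println(isStale(7, OptionalLong.of(7)));  // false: current epoch accepted
        System.out.println(isStale(-1, OptionalLong.of(7))); // false: no epoch in request
        System.out.println(isStale(5, OptionalLong.empty())); // false: nothing cached to compare
    }
}
```

Note the strict `>` comparison: it is what makes the check sensitive to the propagation-delay race discussed in the comment, since a leader whose cache lags behind a restarted broker's new epoch will not fence that broker's requests.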
Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]
rbaddam commented on code in PR #14491: URL: https://github.com/apache/kafka/pull/14491#discussion_r1350996498 ## clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java: ## @@ -240,12 +242,19 @@ public static KafkaPrincipalBuilder createPrincipalBuilder(Map config KerberosShortNamer kerberosShortNamer, SslPrincipalMapper sslPrincipalMapper) { Class principalBuilderClass = (Class) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG); -final KafkaPrincipalBuilder builder; +KafkaPrincipalBuilder builder; if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) { builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper); } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) { -builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass); +try { +Constructor constructor = principalBuilderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class); +builder = (KafkaPrincipalBuilder) constructor.newInstance(kerberosShortNamer, sslPrincipalMapper); Review Comment: @plazma-prizma made the changes as per your inputs, please review when you get a chance, Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
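The reflective-construction pattern under discussion can be sketched as follows. This is a simplified stand-in, not the PR's code: `ShortNamer`, `Mapper`, and `Builder` are placeholder types for `KerberosShortNamer`, `SslPrincipalMapper`, and `KafkaPrincipalBuilder`, and a no-arg fallback is added so existing custom builders keep working:

```java
import java.lang.reflect.Constructor;

public class PrincipalBuilderFactory {
    public interface Builder {}
    public static class ShortNamer {}
    public static class Mapper {}

    public static Builder create(Class<?> clazz, ShortNamer namer, Mapper mapper) {
        try {
            // Custom builders that want the mappers declare this two-arg constructor.
            Constructor<?> ctor = clazz.getConstructor(ShortNamer.class, Mapper.class);
            return (Builder) ctor.newInstance(namer, mapper);
        } catch (NoSuchMethodException e) {
            try {
                // Backwards-compatible path: plain no-arg builder.
                return (Builder) clazz.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e2) {
                throw new RuntimeException(e2);
            }
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    // Two example builders exercising both paths.
    public static class TwoArg implements Builder {
        public TwoArg(ShortNamer n, Mapper m) {}
    }
    public static class NoArg implements Builder {}
}
```

Catching `NoSuchMethodException` rather than requiring the two-arg constructor is what keeps the change non-breaking for builders written before KIP-982.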
Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]
jolshan commented on code in PR #14417: URL: https://github.com/apache/kafka/pull/14417#discussion_r1350953160 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws InterruptedException { assertEquals(1, cnt.get()); assertEquals(0, ctx.timer.size()); } + +@Test +public void testStateChanges() throws Exception { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = mock(MockPartitionWriter.class); +MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); +MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); +MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); +MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); +GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withLoader(loader) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(supplier) +.withCoordinatorRuntimeMetrics(runtimeMetrics) +.build(); + +when(builder.withSnapshotRegistry(any())).thenReturn(builder); +when(builder.withLogContext(any())).thenReturn(builder); +when(builder.withTime(any())).thenReturn(builder); +when(builder.withTimer(any())).thenReturn(builder); +when(builder.withTopicPartition(any())).thenReturn(builder); +when(builder.build()).thenReturn(coordinator); +when(supplier.get()).thenReturn(builder); +CompletableFuture future = new CompletableFuture<>(); +when(loader.load(TP, coordinator)).thenReturn(future); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 0); + +// Getting the context succeeds and the coordinator should be in loading. 
+CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(LOADING, ctx.state); +verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING); + +// When the loading fails, the coordinator transitions to failed. +future.completeExceptionally(new Exception("failure")); +assertEquals(FAILED, ctx.state); +verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED); + +// Start loading a new topic partition. +TopicPartition tp = new TopicPartition("__consumer_offsets", 1); +future = new CompletableFuture<>(); +when(loader.load(tp, coordinator)).thenReturn(future); +// Schedule the loading. +runtime.scheduleLoadOperation(tp, 0); +// Getting the context succeeds and the coordinator should be in loading. +ctx = runtime.contextOrThrow(tp); +assertEquals(LOADING, ctx.state); +verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING); + +// When the loading completes, the coordinator transitions to active. +future.complete(null); +assertEquals(ACTIVE, ctx.state); +verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE); + +runtime.close(); +verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED); Review Comment: Oops. I see. Thanks for clarifying. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
jolshan commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1350945079 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() { public short fetchRequestVersion() { if (this.isAtLeast(IBP_3_5_IV1)) { -return 15; +return 16; Review Comment: Hmm. Is this correct? In the upgrade scenario we will send request version 16 to brokers that may not have that version yet. I know we just ignore tagged fields, but I'm not sure I recall if we can handle version bumps. If this is always just the latest version, should it be hardcoded? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
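The "should it be hardcoded?" question can be illustrated with a minimal sketch. The enum below is a simplified stand-in for Kafka's `MetadataVersion`, and the constant stands in for deriving the ceiling from the protocol definition (in Kafka that would be something like `ApiKeys.FETCH.latestVersion()`), so a future version bump cannot be silently missed:

```java
public class FetchVersions {
    public enum MetadataVersion {
        IBP_3_4_IV0, IBP_3_5_IV1, IBP_3_6_IV0;
        public boolean isAtLeast(MetadataVersion other) { return ordinal() >= other.ordinal(); }
    }

    // Stand-in for deriving the newest fetch version from the protocol schema.
    public static final short LATEST_FETCH_VERSION = 16;

    public static short fetchRequestVersion(MetadataVersion mv) {
        // Brokers at or above IBP_3_5_IV1 speak the newest fetch version.
        if (mv.isAtLeast(MetadataVersion.IBP_3_5_IV1)) return LATEST_FETCH_VERSION;
        // Older IBPs fall back to an earlier pinned version (illustrative value).
        return 13;
    }
}
```

The upgrade concern in the comment remains either way: during a rolling upgrade, a broker on the new IBP may send the newest version to peers that only ignore unknown tagged fields, not unknown versions.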
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
jolshan commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1350936703 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel, } } + case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node) + + private def getCurrentLeader(tp: TopicPartition): LeaderNode = { +val partitionInfoOrError = replicaManager.getPartitionOrError(tp) +var leaderId = -1 +var leaderEpoch = -1 +partitionInfoOrError match { + case Right(x) => + leaderId = x.leaderReplicaIdOpt.getOrElse(-1) + leaderEpoch = x.getLeaderEpoch + case Left(x) => +debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache") +val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition) +partitionInfo.foreach { info => Review Comment: ``` partitionInfo = partitionInfoOrError match { case Right(partitionInfo) => partitionInfo case Left(error) => debug(s"Unable to retrieve local leaderId and Epoch with error $error, falling back to metadata cache") metadataCache.getPartitionInfo(tp.topic, tp.partition) match { case Some(partitionInfo) => partitionInfo case None => handle case where we don't have the partition } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
jolshan commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1350934417 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel, } } + case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node) + + private def getCurrentLeader(tp: TopicPartition): LeaderNode = { +val partitionInfoOrError = replicaManager.getPartitionOrError(tp) +var leaderId = -1 +var leaderEpoch = -1 +partitionInfoOrError match { + case Right(x) => + leaderId = x.leaderReplicaIdOpt.getOrElse(-1) + leaderEpoch = x.getLeaderEpoch + case Left(x) => +debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache") +val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition) +partitionInfo.foreach { info => Review Comment: We also don't need to set vars here. We could have a statement where we return a tuple or even just the partitionInfo. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
jolshan commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1350933806 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel, } } + case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node) + + private def getCurrentLeader(tp: TopicPartition): LeaderNode = { +val partitionInfoOrError = replicaManager.getPartitionOrError(tp) +var leaderId = -1 +var leaderEpoch = -1 +partitionInfoOrError match { + case Right(x) => + leaderId = x.leaderReplicaIdOpt.getOrElse(-1) + leaderEpoch = x.getLeaderEpoch + case Left(x) => +debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache") +val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition) +partitionInfo.foreach { info => Review Comment: getPartitionInfo returns an option. If it exists, foreach will access it. If it doesn't foreach does nothing. This is a common pattern in scala. Are we considering the case when the partition is not present? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
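The restructuring jolshan suggests, resolving the leader as a single expression instead of mutating `var`s, and making the missing-partition case explicit, can be sketched in Java with `Optional` (the original is Scala; names here are illustrative):

```java
import java.util.Optional;

public class CurrentLeader {
    public static class LeaderNode {
        public final int leaderId;
        public final int leaderEpoch;
        public LeaderNode(int leaderId, int leaderEpoch) {
            this.leaderId = leaderId;
            this.leaderEpoch = leaderEpoch;
        }
    }

    // fromReplicaManager: leader info if this broker hosts the partition locally.
    // fromMetadataCache: leader info if the partition is known to the metadata cache.
    public static LeaderNode currentLeader(Optional<LeaderNode> fromReplicaManager,
                                           Optional<LeaderNode> fromMetadataCache) {
        return fromReplicaManager
            .or(() -> fromMetadataCache)       // fall back to the metadata cache
            .orElse(new LeaderNode(-1, -1));   // partition unknown everywhere: explicit default
    }
}
```

This keeps the `-1` defaults, but they now appear exactly once, in the branch that handles the partition-not-found case the review comment asks about.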
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
jolshan commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1350932166 ## clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java: ## @@ -67,20 +69,31 @@ public ProduceResponse(ProduceResponseData produceResponseData) { */ @Deprecated public ProduceResponse(Map responses) { -this(responses, DEFAULT_THROTTLE_TIME); +this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList()); } /** - * Constructor for the latest version + * Constructor for versions <= 9 * @param responses Produced data grouped by topic-partition * @param throttleTimeMs Time in milliseconds the response was throttled */ @Deprecated public ProduceResponse(Map responses, int throttleTimeMs) { -this(toData(responses, throttleTimeMs)); +this(toData(responses, throttleTimeMs, Collections.emptyList())); +} + +/** + * Constructor for the latest version + * @param responses Produced data grouped by topic-partition + * @param throttleTimeMs Time in milliseconds the response was throttled + * @param nodeEndpoints List of node endpoints + */ +@Deprecated Review Comment: KAFKA-10730 is a pretty dormant JIRA. I do agree that there is some level of conversion. I wonder if folks have a strong opinion about this conversion still. Looking into this further, I see the change would need to be made to appendRecords and the ProducePartitionStatus. It doesn't look too crazy, but also understandable this is not the scope for this PR. I wonder if KAFKA-9682 was premature in deprecating the constructor. I guess our options are leaving it deprecated and adding a deprecated method or removing the deprecation until KAFKA-10730 is completed. 
(I almost just want to fix it so this doesn't happen in the future) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]
rbaddam commented on code in PR #14491: URL: https://github.com/apache/kafka/pull/14491#discussion_r1350920227 ## clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java: ## @@ -240,12 +242,19 @@ public static KafkaPrincipalBuilder createPrincipalBuilder(Map config KerberosShortNamer kerberosShortNamer, SslPrincipalMapper sslPrincipalMapper) { Class principalBuilderClass = (Class) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG); -final KafkaPrincipalBuilder builder; +KafkaPrincipalBuilder builder; if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) { builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper); } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) { -builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass); +try { +Constructor constructor = principalBuilderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class); +builder = (KafkaPrincipalBuilder) constructor.newInstance(kerberosShortNamer, sslPrincipalMapper); Review Comment: @plazma-prizma, I appreciate your suggestion. To address your concern about the specificity of the solution, I will create Interfaces/subclasses of KerberosPrincipalBuilder and SSLPrincipalBuilder to handle different types of principals and their configurations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] freeze requirements for system-tests running with python2 [kafka]
imcdo closed pull request #11392: freeze requirements for system-tests running with python2 URL: https://github.com/apache/kafka/pull/11392 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]
jolshan commented on code in PR #14489: URL: https://github.com/apache/kafka/pull/14489#discussion_r1350911184 ## metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java: ## @@ -26,19 +26,22 @@ public final class LocalReplicaChanges { private final Set deletes; -private final Map leaders; +private final Map electedLeaders; +private final Map updatedLeaders; Review Comment: I pushed changes for the simpler fixes and will continue to think on this one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]
jeffkbkim commented on code in PR #14417: URL: https://github.com/apache/kafka/pull/14417#discussion_r1350909602 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws InterruptedException { assertEquals(1, cnt.get()); assertEquals(0, ctx.timer.size()); } + +@Test +public void testStateChanges() throws Exception { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = mock(MockPartitionWriter.class); +MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); +MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); +MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); +MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); +GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withLoader(loader) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(supplier) +.withCoordinatorRuntimeMetrics(runtimeMetrics) +.build(); + +when(builder.withSnapshotRegistry(any())).thenReturn(builder); +when(builder.withLogContext(any())).thenReturn(builder); +when(builder.withTime(any())).thenReturn(builder); +when(builder.withTimer(any())).thenReturn(builder); +when(builder.withTopicPartition(any())).thenReturn(builder); +when(builder.build()).thenReturn(coordinator); +when(supplier.get()).thenReturn(builder); +CompletableFuture future = new CompletableFuture<>(); +when(loader.load(TP, coordinator)).thenReturn(future); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 0); + +// Getting the context succeeds and the coordinator should be in loading. 
+CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(LOADING, ctx.state); +verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING); + +// When the loading fails, the coordinator transitions to failed. +future.completeExceptionally(new Exception("failure")); +assertEquals(FAILED, ctx.state); +verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED); + +// Start loading a new topic partition. +TopicPartition tp = new TopicPartition("__consumer_offsets", 1); +future = new CompletableFuture<>(); +when(loader.load(tp, coordinator)).thenReturn(future); +// Schedule the loading. +runtime.scheduleLoadOperation(tp, 0); +// Getting the context succeeds and the coordinator should be in loading. +ctx = runtime.contextOrThrow(tp); +assertEquals(LOADING, ctx.state); +verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING); + +// When the loading completes, the coordinator transitions to active. +future.complete(null); +assertEquals(ACTIVE, ctx.state); +verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE); + +runtime.close(); +verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED); Review Comment: We actually don't have the closed metric (and initial metric) anymore. This was removed in https://github.com/apache/kafka/pull/14417#discussion_r1334548382 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]
jeffkbkim commented on code in PR #14417: URL: https://github.com/apache/kafka/pull/14417#discussion_r1350893216 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java: ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState; + +import java.util.function.Supplier; + +/** + * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors. + */ +public interface CoordinatorRuntimeMetrics extends AutoCloseable { +/** + * Returns the metrics group. + */ +String metricsGroup(); + +/** + * Called when the partition state changes. + * @param oldState The old state. + * @param newState The new state to transition to. + */ +void onPartitionStateChange(CoordinatorState oldState, CoordinatorState newState); + +/** + * Record the partition load metric. + * @param startTimeMs The partition load start time. + * @param endTimeMs The partition load end time. 
+ */ +void recordPartitionLoadSensor(long startTimeMs, long endTimeMs); + +/** + * Get the counter for partitions in Loading state. + * Only used for testing. + */ +long numPartitionsLoading(); + +/** + * Get the counter for partitions in Active state. + * Only used for testing. + */ +long numPartitionsActive(); + +/** + * Get the counter for partitions in Failed state. + * Only used for testing. + */ +long numPartitionsFailed(); + +/** + * Update the event queue time. + * + * @param durationMs The queue time. + */ +void updateEventQueueTime(long durationMs); + +/** + * Update the event queue processing time. + * + * @param durationMs The event processing time. + */ +void updateEventQueueProcessingTime(long durationMs); + +/** + * Record the failed event. + */ +void recordFailedEvent(); + +/** + * Record the successful event. + */ +void recordSuccessfulEvent(); + +/** + * Record the thread idle ratio. + * @param ratio The idle ratio. + */ +void recordThreadIdleRatio(double ratio); Review Comment: Right, we won't be able to differentiate the idle ratio for each individual thread. Is this something we should have? > So we have an average an min for idle ratio. Is the minimum value just the lowest recorded at a given time frame? Yes, that's right -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
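The idle-ratio discussion above (an average plus a minimum over the recording window, with no per-thread breakdown) can be illustrated with a self-contained aggregator. In Kafka itself this would be a `Sensor` with `Avg` and `Min` stats; this sketch only shows the aggregation semantics:

```java
public class IdleRatioStat {
    private double sum = 0.0;
    private double min = Double.NaN;
    private long count = 0;

    // Each runtime thread records its idle ratio for its last poll interval;
    // recordings from different threads are merged into one stat.
    public synchronized void record(double ratio) {
        min = (count == 0) ? ratio : Math.min(min, ratio);
        sum += ratio;
        count++;
    }

    public synchronized double avg() { return count == 0 ? Double.NaN : sum / count; }

    // The minimum is just the lowest ratio recorded in the window,
    // i.e. the busiest thread's idle ratio.
    public synchronized double min() { return min; }
}
```

This also makes the trade-off in the thread concrete: once ratios are merged into one stat, the identity of the least-idle thread is lost; only the min value survives.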
Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]
jeffkbkim commented on code in PR #14417: URL: https://github.com/apache/kafka/pull/14417#discussion_r1350888321 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java: ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState; + +import java.util.function.Supplier; + +/** + * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors. + */ +public interface CoordinatorRuntimeMetrics extends AutoCloseable { + +/** + * Called when the partition state changes. + * @param oldState The old state. + * @param newState The new state to transition to. + */ +void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState); + +/** + * Record the partition load metric. + * @param startTimeMs The partition load start time. + * @param endTimeMs The partition load end time. + */ +void recordPartitionLoadSensor(long startTimeMs, long endTimeMs); + +/** + * Update the event queue time. + * + * @param durationMs The queue time. 
+ */ +void recordEventQueueTime(long durationMs); + +/** + * Update the event queue processing time. + * + * @param durationMs The event processing time. + */ +void recordEventQueueProcessingTime(long durationMs); + +/** + * Record the thread idle ratio. + * @param ratio The idle ratio. + */ +void recordThreadIdleRatio(double ratio); + +/** + * Register the event queue size gauge. + * + * @param sizeSupplier The size supplier. + */ +void registerEventQueueSizeGauge(Supplier sizeSupplier); Review Comment: Ah yes. Registering should only happen once -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
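The "registering should only happen once" point can be sketched with an atomic guard around gauge registration. This is an illustrative pattern, not the PR's implementation; names are hypothetical:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class QueueSizeGauge {
    private final AtomicBoolean registered = new AtomicBoolean(false);
    private volatile Supplier<Integer> sizeSupplier;

    // Returns true only on the first successful registration; later calls are no-ops.
    public boolean register(Supplier<Integer> supplier) {
        if (!registered.compareAndSet(false, true)) return false; // already registered
        this.sizeSupplier = supplier;
        return true;
    }

    // Gauge read: delegates to the supplier, defaulting to 0 before registration.
    public int value() {
        Supplier<Integer> s = sizeSupplier;
        return s == null ? 0 : s.get();
    }
}
```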
Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]
jolshan commented on code in PR #14489: URL: https://github.com/apache/kafka/pull/14489#discussion_r1350877639 ## core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala: ## @@ -515,7 +521,10 @@ class TransactionStateManager(brokerId: Int, * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from * the previous loading / unloading operation. */ - def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = { + def loadTransactionsForTxnTopicPartition(partitionId: Int, + coordinatorEpoch: Int, + sendTxnMarkers: SendTxnMarkersCallback, + hadTransactionStateAlreadyLoaded: Boolean): Unit = { Review Comment: I opted to go for simpler and chose `transactionStateLoaded`. I updated the method to say the same.
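[Editor's note] A rough sketch of what the new `transactionStateLoaded` parameter buys (the class and counter here are hypothetical; the real method replays the `__transaction_state` partition rather than incrementing a counter):

```java
// Illustrative model of KAFKA-15468: skip the expensive load when the
// caller reports that this partition's transaction state is already loaded.
public class TxnLoadGuard {
    private int loadCount = 0;

    // transactionStateLoaded mirrors the PR's new parameter: the caller
    // tells the manager whether the state is already in memory.
    public void loadTransactionsForTxnTopicPartition(int partitionId,
                                                     int coordinatorEpoch,
                                                     boolean transactionStateLoaded) {
        if (transactionStateLoaded) {
            return; // leader already loaded; avoid a redundant reload
        }
        loadCount++; // stand-in for replaying the transaction log partition
    }

    public int loadCount() {
        return loadCount;
    }
}
```

With the flag, a leader that is re-elected without an intervening unload performs the load exactly once.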
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
jeffkbkim commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1350863763 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } +/** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * @param offsetsRetentionMs The offset retention in milliseconds. + * + * @return The group id if the group no longer has any offsets remaining, empty otherwise. + */ +public Optional cleanupExpiredOffsets(String groupId, List records, long offsetsRetentionMs) { +TimelineHashMap> offsetsByTopic = offsetsByGroup.get(groupId); +if (offsetsByTopic == null) { +return Optional.of(groupId); +} +try { +Group group = groupMetadataManager.group(groupId); +ExpirationCondition expirationCondition = group.expirationCondition(); +Set expiredPartitions = new HashSet<>(); +long currentTimestamp = time.milliseconds(); +AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); +offsetsByTopic.forEach((topic, partitions) -> { +if (!expirationCondition.subscribedTopics.contains(topic)) { Review Comment: > In this case, isn't subscribedTopics going to be set to an optional containing an empty list? ah.. i misread and thought subscribedTopics becomes an empty optional, not an empty set. So we would return `false` which is the correct behavior. > Did you do it because it is correct or by mistake? which change, to add another argument? I added that so we can rely on using `isSubscribedToTopic` instead of passing in the subscribed topics set. i will update with your suggestion, it makes sense to me. 
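[Editor's note] The expiration walk discussed above can be reduced to a toy model. The `OffsetExpirer` class and its one-offset-per-topic map are simplifying assumptions, not Kafka's actual data structures:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hedged sketch of cleanupExpiredOffsets: offsets for topics the group no
// longer subscribes to expire once they exceed the retention window.
public class OffsetExpirer {
    // topic -> commit timestamp of its (single, for simplicity) committed offset
    private final Map<String, Long> commitTimestampByTopic = new HashMap<>();

    public void commit(String topic, long timestampMs) {
        commitTimestampByTopic.put(topic, timestampMs);
    }

    // Returns true if the group has no offsets remaining after cleanup,
    // mirroring the "group id if no offsets remain" return value above.
    public boolean cleanupExpiredOffsets(Set<String> subscribedTopics, long nowMs, long retentionMs) {
        commitTimestampByTopic.entrySet().removeIf(e ->
            !subscribedTopics.contains(e.getKey()) && nowMs - e.getValue() >= retentionMs);
        return commitTimestampByTopic.isEmpty();
    }
}
```

This also shows why an empty subscription set is the interesting edge case in the thread: with no subscribed topics, every sufficiently old offset is eligible for expiration.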
Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]
ahuang98 commented on code in PR #14428: URL: https://github.com/apache/kafka/pull/14428#discussion_r1350814062 ## raft/src/main/java/org/apache/kafka/raft/LeaderState.java: ## @@ -79,6 +86,38 @@ protected LeaderState( this.grantingVoters = Collections.unmodifiableSet(new HashSet<>(grantingVoters)); this.log = logContext.logger(LeaderState.class); this.accumulator = Objects.requireNonNull(accumulator, "accumulator must be non-null"); +this.fetchTimeoutMs = fetchTimeoutMs; +this.fetchTimer = time.timer(fetchTimeoutMs); +} + +// Check if the fetchTimer is expired because we didn't receive fetch/fetchSnapshot request from the majority of Review Comment: nit: ```suggestion // Check if the fetchTimer is expired because we didn't receive a valid fetch/fetchSnapshot request from the majority of ``` same for the next method's description
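[Editor's note] The fetch-timeout idea in KAFKA-15489 can be modeled in a few lines; `LeaderFetchTimeout` and its method names are illustrative, not the PR's API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative model of the check: the leader resigns if it has not seen a
// fetch/fetchSnapshot from a majority of voters within fetchTimeoutMs.
public class LeaderFetchTimeout {
    private final long fetchTimeoutMs;
    private final Map<Integer, Long> lastFetchTimeMs = new HashMap<>();

    public LeaderFetchTimeout(long fetchTimeoutMs) {
        this.fetchTimeoutMs = fetchTimeoutMs;
    }

    public void recordFetch(int voterId, long nowMs) {
        lastFetchTimeMs.put(voterId, nowMs);
    }

    // The leader counts toward its own majority, so with 3 voters one
    // recent follower fetch is enough to keep leadership.
    public boolean shouldResign(Set<Integer> voters, int leaderId, long nowMs) {
        long recent = voters.stream()
            .filter(v -> v == leaderId
                || (lastFetchTimeMs.containsKey(v)
                    && nowMs - lastFetchTimeMs.get(v) < fetchTimeoutMs))
            .count();
        return recent < voters.size() / 2 + 1;
    }
}
```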
Re: [PR] KAFKA-15221; Fix the race between fetch requests from a rebooted follower. [kafka]
CalvinConfluent commented on code in PR #14053: URL: https://github.com/apache/kafka/pull/14053#discussion_r1350811556 ## core/src/main/scala/kafka/cluster/Replica.scala: ## @@ -98,14 +105,22 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at * high frequency. */ - def updateFetchState( + def updateFetchStateOrThrow( followerFetchOffsetMetadata: LogOffsetMetadata, followerStartOffset: Long, followerFetchTimeMs: Long, leaderEndOffset: Long, brokerEpoch: Long ): Unit = { replicaState.updateAndGet { currentReplicaState => + val cachedBrokerEpoch = if (metadataCache.isInstanceOf[KRaftMetadataCache]) + metadataCache.asInstanceOf[KRaftMetadataCache].getAliveBrokerEpoch(brokerId) else Option(-1L) + // Fence the update if it provides a stale broker epoch. + if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) { Review Comment: Makes sense. A fetch request with a higher epoch should also be fenced.
Re: [PR] KAFKA-15221; Fix the race between fetch requests from a rebooted follower. [kafka]
CalvinConfluent commented on code in PR #14053: URL: https://github.com/apache/kafka/pull/14053#discussion_r1350811307 ## core/src/main/scala/kafka/cluster/Replica.scala: ## @@ -98,14 +105,22 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at * high frequency. */ - def updateFetchState( + def updateFetchStateOrThrow( followerFetchOffsetMetadata: LogOffsetMetadata, followerStartOffset: Long, followerFetchTimeMs: Long, leaderEndOffset: Long, brokerEpoch: Long ): Unit = { replicaState.updateAndGet { currentReplicaState => + val cachedBrokerEpoch = if (metadataCache.isInstanceOf[KRaftMetadataCache]) Review Comment: Thanks!
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
jeffkbkim commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1350811243 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +/** + * An offset is considered expired based on different factors, such as the state of the group + * and/or the GroupMetadata record version (for generic groups). This class is used to check + * how offsets for the group should be expired. + */ +public interface OffsetExpirationCondition { + +/** + * Given an offset metadata and offsets retention, return whether the offset is expired or not. + * + * @param offset The offset metadata. + * @param currentTimestamp The current timestamp. + * @param offsetsRetentionMs The offset retention. + * + * @return Whether the offset is considered expired or not. + */ +boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestamp, long offsetsRetentionMs); +} Review Comment: misunderstood. will fix
Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]
jolshan commented on PR #14417: URL: https://github.com/apache/kafka/pull/14417#issuecomment-1753860433 Let's also run checkstyle in the build. Something seems to be off in the group coordinator module.
Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]
junrao commented on code in PR #14406: URL: https://github.com/apache/kafka/pull/14406#discussion_r1350697908 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java: ## @@ -621,56 +825,163 @@ public void assign(Collection partitions) { throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic"); } -// TODO: implementation of refactored Fetcher will be included in forthcoming commits. -// fetcher.clearBufferedDataForUnassignedPartitions(partitions); +// Clear the buffered data which are not a part of newly assigned topics +final Set currentTopicPartitions = new HashSet<>(); + +for (TopicPartition tp : subscriptions.assignedPartitions()) { +if (partitions.contains(tp)) +currentTopicPartitions.add(tp); +} + +fetchBuffer.retainAll(currentTopicPartitions); // assignment change event will trigger autocommit if it is configured and the group id is specified. This is // to make sure offsets of topic partitions the consumer is unsubscribing from are committed since there will -// be no following rebalance -eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), time.milliseconds())); +// be no following rebalance. +// +// See the ApplicationEventProcessor.process() method that handles this event for more detail. 
+applicationEventHandler.add(new AssignmentChangeApplicationEvent(subscriptions.allConsumed(), time.milliseconds())); log.info("Assigned to partition(s): {}", join(partitions, ", ")); -if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) -eventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); +if (subscriptions.assignFromUser(new HashSet<>(partitions))) +applicationEventHandler.add(new NewTopicsMetadataUpdateRequestEvent()); } @Override -public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) { -throw new KafkaException("method not implemented"); +public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { +maybeThrowInvalidGroupIdException(); +if (pattern == null || pattern.toString().isEmpty()) +throw new IllegalArgumentException("Topic pattern to subscribe to cannot be " + (pattern == null ? +"null" : "empty")); + +throwIfNoAssignorsConfigured(); +log.info("Subscribed to pattern: '{}'", pattern); +subscriptions.subscribe(pattern, listener); +updatePatternSubscription(metadata.fetch()); +metadata.requestUpdateForNewTopics(); +} + +/** + * TODO: remove this when we implement the KIP-848 protocol. + * + * + * The contents of this method are shamelessly stolen from + * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are used here because we won't have access + * to a {@link ConsumerCoordinator} in this code. Perhaps it could be moved to a ConsumerUtils class? 
+ * + * @param cluster Cluster from which we get the topics + */ +private void updatePatternSubscription(Cluster cluster) { +final Set topicsToSubscribe = cluster.topics().stream() +.filter(subscriptions::matchesSubscribedPattern) +.collect(Collectors.toSet()); +if (subscriptions.subscribeFromPattern(topicsToSubscribe)) +metadata.requestUpdateForNewTopics(); } @Override public void subscribe(Pattern pattern) { -throw new KafkaException("method not implemented"); +subscribe(pattern, new NoOpConsumerRebalanceListener()); } @Override public void unsubscribe() { -throw new KafkaException("method not implemented"); +fetchBuffer.retainAll(Collections.emptySet()); +subscriptions.unsubscribe(); } @Override @Deprecated -public ConsumerRecords poll(long timeout) { -throw new KafkaException("method not implemented"); +public ConsumerRecords poll(final long timeoutMs) { +return poll(Duration.ofMillis(timeoutMs)); } // Visible for testing WakeupTrigger wakeupTrigger() { return wakeupTrigger; } -private static ClusterResourceListeners configureClusterResourceListeners( -final Deserializer keyDeserializer, -final Deserializer valueDeserializer, -final List... candidateLists) { -ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners(); -for (List candidateList: candidateLists) -clusterResourceListeners.maybeAddAll(candidateList); +/** + * Send the requests for fetch data to the {@link ConsumerNetworkThread network thread} and set up to + * collect
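[Editor's note] The `updatePatternSubscription` logic quoted above boils down to filtering the cluster's topics through the subscribed regex and requesting a metadata update only when the matched set changed. A self-contained approximation (the `PatternSubscription` class is invented for illustration):

```java
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Minimal sketch of the pattern-subscription refresh: on each metadata
// update, recompute the matched topic set from the broker's topic list.
public class PatternSubscription {
    private final Pattern pattern;
    private Set<String> subscribedTopics = Set.of();

    public PatternSubscription(Pattern pattern) {
        this.pattern = pattern;
    }

    // Returns true when the matched set changed, i.e. when the real consumer
    // would call metadata.requestUpdateForNewTopics().
    public boolean updatePatternSubscription(Set<String> clusterTopics) {
        Set<String> matched = clusterTopics.stream()
            .filter(t -> pattern.matcher(t).matches())
            .collect(Collectors.toSet());
        boolean changed = !matched.equals(subscribedTopics);
        subscribedTopics = matched;
        return changed;
    }

    public Set<String> subscribedTopics() {
        return subscribedTopics;
    }
}
```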
Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]
nizhikov commented on code in PR #13247: URL: https://github.com/apache/kafka/pull/13247#discussion_r1350743924 ## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java: ## @@ -769,22 +762,12 @@ public void testPropagateInvalidJsonError() { } @SuppressWarnings("unchecked") -private static <T> scala.collection.immutable.Set<T> set(final T... set) { -return mutableSet(set).toSet(); -} - -@SuppressWarnings({"deprecation", "unchecked"}) -private static <T> scala.collection.mutable.Set<T> mutableSet(final T... set) { -return JavaConverters.asScalaSet(new HashSet<>(Arrays.asList(set))); +private static <T> Set<T> set(final T... set) { Review Comment: Fixed. Yes, we were able to remove the `set` method.
Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]
jolshan commented on code in PR #13247: URL: https://github.com/apache/kafka/pull/13247#discussion_r1350759368 ## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java: ## @@ -857,68 +864,18 @@ public void close() { } } -private ReassignPartitionsCommand.VerifyAssignmentResult asScala(VerifyAssignmentResult res) { -Map partStates = new HashMap<>(); -res.partStates.forEach((tp, state) -> partStates.put(tp, asScala(state))); - -Map moveStates = new HashMap<>(); -res.moveStates.forEach((tpr, state) -> moveStates.put(tpr, asScala(state))); - -return new ReassignPartitionsCommand.VerifyAssignmentResult(asScala(partStates), res.partsOngoing, asScala(moveStates), res.movesOngoing); -} - -@SuppressWarnings({"unchecked"}) -private ReassignPartitionsCommand.PartitionReassignmentState asScala(PartitionReassignmentState state) { -return new ReassignPartitionsCommand.PartitionReassignmentState( -seq((List) state.currentReplicas), -seq((List) state.targetReplicas), -state.done -); -} - -private ReassignPartitionsCommand.LogDirMoveState asScala(LogDirMoveState state) { -if (state instanceof ActiveMoveState) { -ActiveMoveState s = (ActiveMoveState) state; -return new ReassignPartitionsCommand.ActiveMoveState(s.currentLogDir, s.targetLogDir, s.futureLogDir); -} else if (state instanceof CancelledMoveState) { -CancelledMoveState s = (CancelledMoveState) state; -return new ReassignPartitionsCommand.CancelledMoveState(s.currentLogDir, s.targetLogDir); -} else if (state instanceof CompletedMoveState) { -CompletedMoveState s = (CompletedMoveState) state; -return new ReassignPartitionsCommand.CompletedMoveState(s.targetLogDir); -} else if (state instanceof MissingLogDirMoveState) { -MissingLogDirMoveState s = (MissingLogDirMoveState) state; -return new ReassignPartitionsCommand.MissingLogDirMoveState(s.targetLogDir); -} else if (state instanceof MissingReplicaMoveState) { -MissingReplicaMoveState s = (MissingReplicaMoveState) state; -return new 
ReassignPartitionsCommand.MissingReplicaMoveState(s.targetLogDir); -} - -throw new IllegalArgumentException("Unknown state " + state); -} - @SuppressWarnings("unchecked") -static scala.collection.immutable.Set set(final T... set) { -return mutableSet(set).toSet(); +static Set set(final T... set) { +return new HashSet<>(Arrays.asList(set)); Review Comment: Got it. Thanks for clarifying. (That's going to be a fun one to migrate too ) ## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java: ## @@ -857,68 +864,18 @@ public void close() { } } -private ReassignPartitionsCommand.VerifyAssignmentResult asScala(VerifyAssignmentResult res) { -Map partStates = new HashMap<>(); -res.partStates.forEach((tp, state) -> partStates.put(tp, asScala(state))); - -Map moveStates = new HashMap<>(); -res.moveStates.forEach((tpr, state) -> moveStates.put(tpr, asScala(state))); - -return new ReassignPartitionsCommand.VerifyAssignmentResult(asScala(partStates), res.partsOngoing, asScala(moveStates), res.movesOngoing); -} - -@SuppressWarnings({"unchecked"}) -private ReassignPartitionsCommand.PartitionReassignmentState asScala(PartitionReassignmentState state) { -return new ReassignPartitionsCommand.PartitionReassignmentState( -seq((List) state.currentReplicas), -seq((List) state.targetReplicas), -state.done -); -} - -private ReassignPartitionsCommand.LogDirMoveState asScala(LogDirMoveState state) { -if (state instanceof ActiveMoveState) { -ActiveMoveState s = (ActiveMoveState) state; -return new ReassignPartitionsCommand.ActiveMoveState(s.currentLogDir, s.targetLogDir, s.futureLogDir); -} else if (state instanceof CancelledMoveState) { -CancelledMoveState s = (CancelledMoveState) state; -return new ReassignPartitionsCommand.CancelledMoveState(s.currentLogDir, s.targetLogDir); -} else if (state instanceof CompletedMoveState) { -CompletedMoveState s = (CompletedMoveState) state; -return new ReassignPartitionsCommand.CompletedMoveState(s.targetLogDir); -} 
else if (state instanceof MissingLogDirMoveState) { -MissingLogDirMoveState s = (MissingLogDirMoveState) state; -return new ReassignPartitionsCommand.MissingLogDirMoveState(s.targetLogDir); -} else if (state instanceof MissingReplicaMoveState) { -MissingReplicaMoveState s = (MissingReplicaMoveState) state; -
[jira] [Updated] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15569: Issue Type: Test (was: Improvement) > Update test and add test cases in IQv2StoreIntegrationTest > -- > > Key: KAFKA-15569 > URL: https://issues.apache.org/jira/browse/KAFKA-15569 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Hanyu Zheng >Assignee: Hanyu Zheng >Priority: Major > > Update test and add test cases in IQv2StoreIntegrationTest -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-15569: Component/s: streams unit tests > Update test and add test cases in IQv2StoreIntegrationTest > -- > > Key: KAFKA-15569 > URL: https://issues.apache.org/jira/browse/KAFKA-15569 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Hanyu Zheng >Assignee: Hanyu Zheng >Priority: Major > > Update test and add test cases in IQv2StoreIntegrationTest
Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]
nizhikov commented on code in PR #13247: URL: https://github.com/apache/kafka/pull/13247#discussion_r1350744820 ## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java: ## @@ -857,68 +864,18 @@ public void close() { } } -private ReassignPartitionsCommand.VerifyAssignmentResult asScala(VerifyAssignmentResult res) { -Map partStates = new HashMap<>(); -res.partStates.forEach((tp, state) -> partStates.put(tp, asScala(state))); - -Map moveStates = new HashMap<>(); -res.moveStates.forEach((tpr, state) -> moveStates.put(tpr, asScala(state))); - -return new ReassignPartitionsCommand.VerifyAssignmentResult(asScala(partStates), res.partsOngoing, asScala(moveStates), res.movesOngoing); -} - -@SuppressWarnings({"unchecked"}) -private ReassignPartitionsCommand.PartitionReassignmentState asScala(PartitionReassignmentState state) { -return new ReassignPartitionsCommand.PartitionReassignmentState( -seq((List) state.currentReplicas), -seq((List) state.targetReplicas), -state.done -); -} - -private ReassignPartitionsCommand.LogDirMoveState asScala(LogDirMoveState state) { -if (state instanceof ActiveMoveState) { -ActiveMoveState s = (ActiveMoveState) state; -return new ReassignPartitionsCommand.ActiveMoveState(s.currentLogDir, s.targetLogDir, s.futureLogDir); -} else if (state instanceof CancelledMoveState) { -CancelledMoveState s = (CancelledMoveState) state; -return new ReassignPartitionsCommand.CancelledMoveState(s.currentLogDir, s.targetLogDir); -} else if (state instanceof CompletedMoveState) { -CompletedMoveState s = (CompletedMoveState) state; -return new ReassignPartitionsCommand.CompletedMoveState(s.targetLogDir); -} else if (state instanceof MissingLogDirMoveState) { -MissingLogDirMoveState s = (MissingLogDirMoveState) state; -return new ReassignPartitionsCommand.MissingLogDirMoveState(s.targetLogDir); -} else if (state instanceof MissingReplicaMoveState) { -MissingReplicaMoveState s = (MissingReplicaMoveState) state; -return new 
ReassignPartitionsCommand.MissingReplicaMoveState(s.targetLogDir); -} - -throw new IllegalArgumentException("Unknown state " + state); -} - @SuppressWarnings("unchecked") -static <T> scala.collection.immutable.Set<T> set(final T... set) { -return mutableSet(set).toSet(); +static <T> Set<T> set(final T... set) { +return new HashSet<>(Arrays.asList(set)); Review Comment: We are able to remove the `set` method, but `mutableSet` and `seq` have to stay because we use `QuorumTestHarness` as a parent along with `TestUtils`, which are written in Scala.
Re: [PR] KAFKA-15221; Fix the race between fetch requests from a rebooted follower. [kafka]
hachikuji commented on code in PR #14053: URL: https://github.com/apache/kafka/pull/14053#discussion_r1350736079 ## core/src/main/scala/kafka/cluster/Replica.scala: ## @@ -98,14 +105,22 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at * high frequency. */ - def updateFetchState( + def updateFetchStateOrThrow( followerFetchOffsetMetadata: LogOffsetMetadata, followerStartOffset: Long, followerFetchTimeMs: Long, leaderEndOffset: Long, brokerEpoch: Long ): Unit = { replicaState.updateAndGet { currentReplicaState => + val cachedBrokerEpoch = if (metadataCache.isInstanceOf[KRaftMetadataCache]) Review Comment: nit: in scala, it's usually cleaner to use a match instead of `isInstanceOf`. ## core/src/main/scala/kafka/cluster/Replica.scala: ## @@ -98,14 +105,22 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at * high frequency. */ - def updateFetchState( + def updateFetchStateOrThrow( followerFetchOffsetMetadata: LogOffsetMetadata, followerStartOffset: Long, followerFetchTimeMs: Long, leaderEndOffset: Long, brokerEpoch: Long ): Unit = { replicaState.updateAndGet { currentReplicaState => + val cachedBrokerEpoch = if (metadataCache.isInstanceOf[KRaftMetadataCache]) + metadataCache.asInstanceOf[KRaftMetadataCache].getAliveBrokerEpoch(brokerId) else Option(-1L) + // Fence the update if it provides a stale broker epoch. + if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) { Review Comment: Should we check for equality? I guess the basic question is whether we allow fetches from a higher epoch than what is in the cache?
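[Editor's note] hachikuji's equality question, together with the reply earlier in the thread that a fetch with a higher epoch should also be fenced, suggests rejecting any mismatch in either direction. A hedged sketch of that rule (not the PR's final code; `BrokerEpochFence` is invented):

```java
import java.util.Optional;

// Illustrative fencing rule: a follower fetch carries a broker epoch, and
// the leader rejects the update when it does not match the epoch cached
// from the metadata log. A lower epoch means a stale request from before a
// reboot; a higher one means the leader's cache is behind.
public class BrokerEpochFence {
    public static boolean shouldFence(long requestBrokerEpoch, Optional<Long> cachedBrokerEpoch) {
        if (requestBrokerEpoch == -1) {
            return false; // -1 means the follower did not supply an epoch
        }
        // No cached epoch (e.g. non-KRaft metadata cache): do not fence.
        return cachedBrokerEpoch.map(cached -> cached != requestBrokerEpoch).orElse(false);
    }
}
```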
[jira] [Assigned] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hanyu Zheng reassigned KAFKA-15569: --- Assignee: Hanyu Zheng > Update test and add test cases in IQv2StoreIntegrationTest > -- > > Key: KAFKA-15569 > URL: https://issues.apache.org/jira/browse/KAFKA-15569 > Project: Kafka > Issue Type: Improvement >Reporter: Hanyu Zheng >Assignee: Hanyu Zheng >Priority: Major > > Update test and add test cases in IQv2StoreIntegrationTest
[jira] [Comment Edited] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms
[ https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773463#comment-17773463 ] Sankalp Bhatia edited comment on KAFKA-15565 at 10/9/23 7:34 PM: - Thanks. The reason I say it is a bug is because the overriding you mentioned in [https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L571] takes the min of 1. default api timeout (60s) 2. metadataTimeout: This I believe is derived from the default request timeout if the metadata request is pending.(which is hardcoded to 1hr) 3. default request timeout (hardcoded to 1hr). (But as per contract should be derived from client config) Now consider a case where the client is unable to create a socket connection. Ideally, such a case should be handled in line [584|https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L584], but since the selector enters a long poll of 60s (this can happen when no other selection key is ready), it only gets to know about the timed out connection after 60s, and by that time the Client Call needs to be dropped without any retries. Had the adminClient honored the request timeout, the poll would have been shorter and the request could have been retried. was (Author: sankalpbhatia): Thanks. The reason I say it is a bug is because the overriding you mentioned in [https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L571] takes the min of 1. default api timeout (60s) 2. metadataTimeout: This I believe is derived from the default request timeout if the metadata request is pending.(which is hardcoded to 1hr) 3. default request timeout (hardcoded to 1hr). 
(But as per contract should be derived from client config) Now consider a case where the client is unable to create a socket connection. Ideally, such a case should be handled in line [584|https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L584], but since the selector enters a long poll of 60s, it only gets to know about the timed out connection after 60s, and by that time the Client Call needs to be dropped without any retries. Had the adminClient honored the request timeout, the poll would have been shorter and the request could have been retried. > KafkaAdminClient does not honor request timeout ms > --- > > Key: KAFKA-15565 > URL: https://issues.apache.org/jira/browse/KAFKA-15565 > Project: Kafka > Issue Type: Bug >Reporter: Sankalp Bhatia >Assignee: Sankalp Bhatia >Priority: Minor > > It seems to me there is a bug in this line in the KafkaAdminClient. For the > constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a > hardcoded value of 1 hour. Ideally, this should be derived from the client > config "request.timeout.ms" from the AdminClientConfig[2]. > References > [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] > [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]
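[Editor's note] The timeout interaction described in the comment is a three-way minimum: with the two longer values hardcoded to an hour, the 60s default api timeout wins and bounds the selector's poll. A simplified model (names invented, values illustrative of the comment, not the NetworkClient's exact code):

```java
// Simplified model of the poll-timeout computation described above: the
// client polls for the minimum of the pending-call timeout, the metadata
// timeout, and the default request timeout. If the latter two are
// hardcoded to 1 hour, a short request.timeout.ms never shortens the poll.
public class PollTimeout {
    public static long effectivePollTimeoutMs(long defaultApiTimeoutMs,
                                              long metadataTimeoutMs,
                                              long defaultRequestTimeoutMs) {
        return Math.min(defaultApiTimeoutMs, Math.min(metadataTimeoutMs, defaultRequestTimeoutMs));
    }
}
```

This is why the reporter argues the poll blocks for the full 60s before noticing the dead connection, leaving no time to retry.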
[jira] [Created] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest
Hanyu Zheng created KAFKA-15569: --- Summary: Update test and add test cases in IQv2StoreIntegrationTest Key: KAFKA-15569 URL: https://issues.apache.org/jira/browse/KAFKA-15569 Project: Kafka Issue Type: Improvement Reporter: Hanyu Zheng Update test and add test cases in IQv2StoreIntegrationTest -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms
[ https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773463#comment-17773463 ] Sankalp Bhatia edited comment on KAFKA-15565 at 10/9/23 7:25 PM: - Thanks. The reason I say it is a bug is that the override you mentioned in [https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L571] takes the min of:

1. the default API timeout (60s)
2. metadataTimeout: this, I believe, is derived from the default request timeout (which is hardcoded to 1hr) if the metadata request is pending
3. the default request timeout (hardcoded to 1hr, although per the contract it should be derived from the client config)

Now consider a case where the client is unable to create a socket connection. Ideally, such a case should be handled in line [584|https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L584], but since the selector enters a long poll of 60s, it only learns about the timed-out connection after 60s, and by that time the client call has to be dropped without any retries. Had the AdminClient honored the request timeout, the poll would have been shorter and the request could have been retried.

was (Author: sankalpbhatia): Thanks. The reason I say it is a bug is that the override you mentioned in [https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L571] takes the min of:

1. the default API timeout (60s)
2. metadataTimeout: this, I believe, is derived from the default request timeout (which is hardcoded to 1hr) if the metadata request is pending
3. the default request timeout (hardcoded to 1hr, although per the contract it should be derived from the client config)

Now consider a case where the client is unable to create a socket connection. Ideally, such a case should be handled in line [584|https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L584], but since the selector enters a long poll of 60s, it only learns about the timed-out connection after 60s, and by that time the client call has to be dropped without any retries. Had the AdminClient honored the request timeout, the poll would have been shorter and the request could have been retried. > KafkaAdminClient does not honor request timeout ms > --- > > Key: KAFKA-15565 > URL: https://issues.apache.org/jira/browse/KAFKA-15565 > Project: Kafka > Issue Type: Bug >Reporter: Sankalp Bhatia >Assignee: Sankalp Bhatia >Priority: Minor > > It seems to me there is a bug in this line in the KafkaAdminClient. For the > constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a > hardcoded value of 1 hour. Ideally, this should be derived from the client > config "request.timeout.ms" from the AdminClientConfig[2]. > References > [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] > [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms
[ https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773463#comment-17773463 ] Sankalp Bhatia commented on KAFKA-15565: Thanks. The reason I say it is a bug is that the override you mentioned in [https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L571] takes the min of:

1. the default API timeout (60s)
2. metadataTimeout: this, I believe, is derived from the default request timeout (which is hardcoded to 1hr) if the metadata request is pending
3. the default request timeout (hardcoded to 1hr, although per the contract it should be derived from the client config)

Now consider a case where the client is unable to create a socket connection. Ideally, such a case should be handled in line [584|https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L584], but since the selector enters a long poll of 60s, it only learns about the timed-out connection after 60s, and by that time the client call has to be dropped without any retries. Had the AdminClient honored the request timeout, the poll would have been shorter and the request could have been retried. > KafkaAdminClient does not honor request timeout ms > --- > > Key: KAFKA-15565 > URL: https://issues.apache.org/jira/browse/KAFKA-15565 > Project: Kafka > Issue Type: Bug >Reporter: Sankalp Bhatia >Assignee: Sankalp Bhatia >Priority: Minor > > It seems to me there is a bug in this line in the KafkaAdminClient. For the > constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a > hardcoded value of 1 hour. Ideally, this should be derived from the client > config "request.timeout.ms" from the AdminClientConfig[2].
> References > [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521] > [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]
wcarlson5 commented on code in PR #14174: URL: https://github.com/apache/kafka/pull/14174#discussion_r1347987570 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java: ## @@ -51,8 +51,8 @@ public void init(final ProcessorContext context) { public void process(final Record record) { // if the key is null, we do not need to put the record into window store // since it will never be considered for join operations +context().forward(record); if (record.key() != null) { -context().forward(record); // Every record basically starts a new window. We're using a window store mostly for the retention. window.put(record.key(), record.value(), record.timestamp()); Review Comment: why should null keys not enter the window? ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java: ## @@ -39,22 +39,26 @@ public static boolean skipRecord( // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored if (record.key() == null || record.value() == null) { -if (context.recordMetadata().isPresent()) { -final RecordMetadata recordMetadata = context.recordMetadata().get(); -logger.warn( -"Skipping record due to null key or value. " -+ "topic=[{}] partition=[{}] offset=[{}]", -recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset() -); -} else { -logger.warn( -"Skipping record due to null key or value. Topic, partition, and offset not known." -); -} -droppedRecordsSensor.record(); +dropRecord(logger, droppedRecordsSensor, context); return true; } else { return false; } } + +public static void dropRecord(final Logger logger, final Sensor droppedRecordsSensor, final ProcessorContext context) { Review Comment: I'm not a huge fan of splitting this out to a separate public method. I think you can just reuse the logic in skip record. 
## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -124,17 +124,20 @@ public void init(final ProcessorContext context) { @SuppressWarnings("unchecked") @Override public void process(final Record record) { -if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { -return; -} -boolean needOuterJoin = outer; - final long inputRecordTimestamp = record.timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); - sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); +if (outer && record.key() == null && record.value() != null) { Review Comment: what about inner left joins? Those values go into the window? Why? ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java: ## @@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) { LOG.debug("Optimizing the Kafka Streams graph for self-joins"); rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>()); } +LOG.debug("Optimizing the Kafka Streams graph for null-key records"); +rewriteRepartitionNodes(); } +private void rewriteRepartitionNodes() { Review Comment: This is to prevent null keys from going into repartition topics, right? Will that affect results if a manual repartition is added?
## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java: ## @@ -124,17 +124,20 @@ public void init(final ProcessorContext context) { @SuppressWarnings("unchecked") @Override public void process(final Record record) { -if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) { -return; -} -boolean needOuterJoin = outer; - final long inputRecordTimestamp = record.timestamp(); final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs); final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs); - sharedTimeTracker.advanceStreamTime(inputRecordTimestamp); +if (outer && record.key() == null && record.value() != null) { +
Re: [PR] MINOR: Only commit running active and standby tasks when tasks corrupted [kafka]
mjsax commented on code in PR #14508: URL: https://github.com/apache/kafka/pull/14508#discussion_r1350625946 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -223,10 +223,7 @@ boolean handleCorruption(final Set corruptedTasks) { final Collection tasksToCommit = allTasks() .values() .stream() -// TODO: once we remove state restoration from the stream thread, we can also remove -// the RESTORING state here, since there will not be any restoring tasks managed -// by the stream thread anymore. -.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING) +.filter(t -> t.state() == Task.State.RUNNING) Review Comment: I was just reading the TODO without much thinking about it... -- I guess we might still want to flush restoring tasks and write the checkpoint file (which is part of a commit) -- so should we execute `preCommit()` and `postCommit()` for those? -- I agree that we won't have input topic offsets to commit (and there should not be any TX). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]
jolshan commented on code in PR #13247: URL: https://github.com/apache/kafka/pull/13247#discussion_r1350623608 ## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java: ## @@ -769,22 +762,12 @@ public void testPropagateInvalidJsonError() { } @SuppressWarnings("unchecked") -private static scala.collection.immutable.Set set(final T... set) { -return mutableSet(set).toSet(); -} - -@SuppressWarnings({"deprecation", "unchecked"}) -private static scala.collection.mutable.Set mutableSet(final T...set) { -return JavaConverters.asScalaSet(new HashSet<>(Arrays.asList(set))); +private static Set set(final T... set) { Review Comment: Were we also unable to remove these warnings? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]
jolshan commented on code in PR #13247: URL: https://github.com/apache/kafka/pull/13247#discussion_r1350622281 ## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java: ## @@ -857,68 +864,18 @@ public void close() { } } -private ReassignPartitionsCommand.VerifyAssignmentResult asScala(VerifyAssignmentResult res) { -Map partStates = new HashMap<>(); -res.partStates.forEach((tp, state) -> partStates.put(tp, asScala(state))); - -Map moveStates = new HashMap<>(); -res.moveStates.forEach((tpr, state) -> moveStates.put(tpr, asScala(state))); - -return new ReassignPartitionsCommand.VerifyAssignmentResult(asScala(partStates), res.partsOngoing, asScala(moveStates), res.movesOngoing); -} - -@SuppressWarnings({"unchecked"}) -private ReassignPartitionsCommand.PartitionReassignmentState asScala(PartitionReassignmentState state) { -return new ReassignPartitionsCommand.PartitionReassignmentState( -seq((List) state.currentReplicas), -seq((List) state.targetReplicas), -state.done -); -} - -private ReassignPartitionsCommand.LogDirMoveState asScala(LogDirMoveState state) { -if (state instanceof ActiveMoveState) { -ActiveMoveState s = (ActiveMoveState) state; -return new ReassignPartitionsCommand.ActiveMoveState(s.currentLogDir, s.targetLogDir, s.futureLogDir); -} else if (state instanceof CancelledMoveState) { -CancelledMoveState s = (CancelledMoveState) state; -return new ReassignPartitionsCommand.CancelledMoveState(s.currentLogDir, s.targetLogDir); -} else if (state instanceof CompletedMoveState) { -CompletedMoveState s = (CompletedMoveState) state; -return new ReassignPartitionsCommand.CompletedMoveState(s.targetLogDir); -} else if (state instanceof MissingLogDirMoveState) { -MissingLogDirMoveState s = (MissingLogDirMoveState) state; -return new ReassignPartitionsCommand.MissingLogDirMoveState(s.targetLogDir); -} else if (state instanceof MissingReplicaMoveState) { -MissingReplicaMoveState s = (MissingReplicaMoveState) state; -return new 
ReassignPartitionsCommand.MissingReplicaMoveState(s.targetLogDir); -} - -throw new IllegalArgumentException("Unknown state " + state); -} - @SuppressWarnings("unchecked") -static scala.collection.immutable.Set set(final T... set) { -return mutableSet(set).toSet(); +static Set set(final T... set) { +return new HashSet<>(Arrays.asList(set)); Review Comment: Were we not able to remove the deprecated method below? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
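The Java replacement shown in the diff can be exercised on its own. The sketch below is illustrative only (`SetHelperSketch` is not a class from the PR); it also notes why `new HashSet<>(Arrays.asList(...))` is often preferred over Java 9's `Set.of(...)` in test helpers.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public final class SetHelperSketch {

    // Mirrors the varargs helper from the diff above (illustrative copy).
    @SafeVarargs
    static <T> Set<T> set(final T... elements) {
        return new HashSet<>(Arrays.asList(elements));
    }

    public static void main(String[] args) {
        // Unlike Set.of(...), which is immutable and throws on duplicate
        // arguments, this helper is mutable and silently deduplicates --
        // handy in test code that builds expected sets from literals.
        Set<Integer> s = set(1, 2, 2, 3);
        System.out.println(s.size()); // 3
    }
}
```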
Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]
jolshan commented on code in PR #14417: URL: https://github.com/apache/kafka/pull/14417#discussion_r1350610255 ## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java: ## @@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws InterruptedException { assertEquals(1, cnt.get()); assertEquals(0, ctx.timer.size()); } + +@Test +public void testStateChanges() throws Exception { +MockTimer timer = new MockTimer(); +MockPartitionWriter writer = mock(MockPartitionWriter.class); +MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class); +MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); +MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); +MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); +GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class); + +CoordinatorRuntime runtime = +new CoordinatorRuntime.Builder() +.withTime(timer.time()) +.withTimer(timer) +.withLoader(loader) +.withEventProcessor(new DirectEventProcessor()) +.withPartitionWriter(writer) +.withCoordinatorShardBuilderSupplier(supplier) +.withCoordinatorRuntimeMetrics(runtimeMetrics) +.build(); + +when(builder.withSnapshotRegistry(any())).thenReturn(builder); +when(builder.withLogContext(any())).thenReturn(builder); +when(builder.withTime(any())).thenReturn(builder); +when(builder.withTimer(any())).thenReturn(builder); +when(builder.withTopicPartition(any())).thenReturn(builder); +when(builder.build()).thenReturn(coordinator); +when(supplier.get()).thenReturn(builder); +CompletableFuture future = new CompletableFuture<>(); +when(loader.load(TP, coordinator)).thenReturn(future); + +// Schedule the loading. +runtime.scheduleLoadOperation(TP, 0); + +// Getting the context succeeds and the coordinator should be in loading. 
+CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); +assertEquals(LOADING, ctx.state); +verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING); + +// When the loading fails, the coordinator transitions to failed. +future.completeExceptionally(new Exception("failure")); +assertEquals(FAILED, ctx.state); +verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED); + +// Start loading a new topic partition. +TopicPartition tp = new TopicPartition("__consumer_offsets", 1); +future = new CompletableFuture<>(); +when(loader.load(tp, coordinator)).thenReturn(future); +// Schedule the loading. +runtime.scheduleLoadOperation(tp, 0); +// Getting the context succeeds and the coordinator should be in loading. +ctx = runtime.contextOrThrow(tp); +assertEquals(LOADING, ctx.state); +verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING); + +// When the loading completes, the coordinator transitions to active. +future.complete(null); +assertEquals(ACTIVE, ctx.state); +verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE); + +runtime.close(); +verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED); Review Comment: So we should see this closed metric increment when we resign leadership as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]
jolshan commented on code in PR #14417: URL: https://github.com/apache/kafka/pull/14417#discussion_r1350608637 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java: ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState; + +import java.util.function.Supplier; + +/** + * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors. + */ +public interface CoordinatorRuntimeMetrics extends AutoCloseable { + +/** + * Called when the partition state changes. + * @param oldState The old state. + * @param newState The new state to transition to. + */ +void recordPartitionStateChange(CoordinatorState oldState, CoordinatorState newState); + +/** + * Record the partition load metric. + * @param startTimeMs The partition load start time. + * @param endTimeMs The partition load end time. + */ +void recordPartitionLoadSensor(long startTimeMs, long endTimeMs); + +/** + * Update the event queue time. + * + * @param durationMs The queue time. 
+ */ +void recordEventQueueTime(long durationMs); + +/** + * Update the event queue processing time. + * + * @param durationMs The event processing time. + */ +void recordEventQueueProcessingTime(long durationMs); + +/** + * Record the thread idle ratio. + * @param ratio The idle ratio. + */ +void recordThreadIdleRatio(double ratio); + +/** + * Register the event queue size gauge. + * + * @param sizeSupplier The size supplier. + */ +void registerEventQueueSizeGauge(Supplier sizeSupplier); Review Comment: Hmm -- it should just be when we construct the metrics object right? (referring to the register method) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]
jolshan commented on code in PR #14417: URL: https://github.com/apache/kafka/pull/14417#discussion_r1350606817 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java: ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState; + +import java.util.function.Supplier; + +/** + * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors. + */ +public interface CoordinatorRuntimeMetrics extends AutoCloseable { +/** + * Returns the metrics group. + */ +String metricsGroup(); + +/** + * Called when the partition state changes. + * @param oldState The old state. + * @param newState The new state to transition to. + */ +void onPartitionStateChange(CoordinatorState oldState, CoordinatorState newState); + +/** + * Record the partition load metric. + * @param startTimeMs The partition load start time. + * @param endTimeMs The partition load end time. 
+ */ +void recordPartitionLoadSensor(long startTimeMs, long endTimeMs); + +/** + * Get the counter for partitions in Loading state. + * Only used for testing. + */ +long numPartitionsLoading(); + +/** + * Get the counter for partitions in Active state. + * Only used for testing. + */ +long numPartitionsActive(); + +/** + * Get the counter for partitions in Failed state. + * Only used for testing. + */ +long numPartitionsFailed(); + +/** + * Update the event queue time. + * + * @param durationMs The queue time. + */ +void updateEventQueueTime(long durationMs); + +/** + * Update the event queue processing time. + * + * @param durationMs The event processing time. + */ +void updateEventQueueProcessingTime(long durationMs); + +/** + * Record the failed event. + */ +void recordFailedEvent(); + +/** + * Record the successful event. + */ +void recordSuccessfulEvent(); + +/** + * Record the thread idle ratio. + * @param ratio The idle ratio. + */ +void recordThreadIdleRatio(double ratio); Review Comment: So we have an average and a min for the idle ratio. Is the minimum value just the lowest recorded in a given time frame? (And we don't have a way to identify which thread has the lowest value.) ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java: ## @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState; + +import java.util.function.Supplier; + +/** + * Used by the group and transaction coordinator runtimes, the metrics suite holds partition state gauges and sensors. + */ +public interface CoordinatorRuntimeMetrics extends AutoCloseable { +/** + * Returns the metrics group. + */ +String metricsGroup(); + +/** + * Called when the partition state changes. + * @param oldState The old state. + * @param newState The new state to transition to. + */ +void onPartitionStateChange(CoordinatorState
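The interface in the diff only declares the hooks. As a rough illustration of the per-state gauge bookkeeping the reviewers discuss (each partition counted in exactly one of INITIAL/LOADING/ACTIVE/FAILED/CLOSED at a time), here is a minimal sketch using plain atomic counters. The class and method names (`StateGaugeSketch`, `registerPartition`, `value`) are hypothetical, not the PR's actual `GroupCoordinatorRuntimeMetrics` implementation, which uses Kafka's metrics classes.

```java
import java.util.EnumMap;
import java.util.concurrent.atomic.AtomicLong;

// States mirror the CoordinatorState values referenced in the test above.
enum CoordinatorState { INITIAL, LOADING, ACTIVE, FAILED, CLOSED }

public class StateGaugeSketch {

    private final EnumMap<CoordinatorState, AtomicLong> counts =
            new EnumMap<>(CoordinatorState.class);

    public StateGaugeSketch() {
        for (CoordinatorState s : CoordinatorState.values()) {
            counts.put(s, new AtomicLong());
        }
    }

    // Every partition starts in INITIAL before any transition is recorded.
    public void registerPartition() {
        counts.get(CoordinatorState.INITIAL).incrementAndGet();
    }

    // Decrement the old state's gauge and increment the new one, so each
    // partition is counted in exactly one state at any moment.
    public void recordPartitionStateChange(CoordinatorState oldState,
                                           CoordinatorState newState) {
        counts.get(oldState).decrementAndGet();
        counts.get(newState).incrementAndGet();
    }

    public long value(CoordinatorState s) {
        return counts.get(s).get();
    }

    public static void main(String[] args) {
        StateGaugeSketch metrics = new StateGaugeSketch();
        metrics.registerPartition();
        // Transitions from the test scenario: INITIAL -> LOADING -> ACTIVE.
        metrics.recordPartitionStateChange(CoordinatorState.INITIAL, CoordinatorState.LOADING);
        metrics.recordPartitionStateChange(CoordinatorState.LOADING, CoordinatorState.ACTIVE);
        System.out.println(metrics.value(CoordinatorState.ACTIVE));  // 1
        System.out.println(metrics.value(CoordinatorState.LOADING)); // 0
    }
}
```

Under this bookkeeping, the `times(2)` and `times(1)` verifications in the test correspond to distinct (oldState, newState) pairs, while the gauges themselves only ever reflect the current count per state.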
[jira] [Resolved] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
[ https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee resolved KAFKA-15278. Resolution: Fixed > Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC > > > Key: KAFKA-15278 > URL: https://issues.apache.org/jira/browse/KAFKA-15278 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support, kip-848-e2e, kip-848-preview > > The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} > and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. > It is assumed that the scaffolding for the other two will come along in time. > * Implement {{ConsumerGroupRequestManager}} > * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts > so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} > interval regardless of other {{RequestManager}} instance activity > * Ensure errors are handled correctly > * Ensure MembershipStateManager is updated in both success and failure > cases, and the state machine is transitioned to the correct state. > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
[ https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773429#comment-17773429 ] Philip Nee commented on KAFKA-15278: The basic heartbeat is implemented. We are still missing the assignment reconciliation and the heartbeat-on-close logic. > Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC > > > Key: KAFKA-15278 > URL: https://issues.apache.org/jira/browse/KAFKA-15278 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support, kip-848-e2e, kip-848-preview > > The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} > and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. > It is assumed that the scaffolding for the other two will come along in time. > * Implement {{ConsumerGroupRequestManager}} > * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts > so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} > interval regardless of other {{RequestManager}} instance activity > * Ensure errors are handled correctly > * Ensure MembershipStateManager is updated in both success and failure > cases, and the state machine is transitioned to the correct state. > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
[ https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15278: --- Description: The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. It is assumed that the scaffolding for the other two will come along in time. * Implement {{ConsumerGroupRequestManager}} * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} interval regardless of other {{RequestManager}} instance activity * Ensure errors are handled correctly * Ensure MembershipStateManager is updated in both success and failure cases, and the state machine is transitioned to the correct state. This task is part of the work to implement support for the new KIP-848 consumer group protocol. was: The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. It is assumed that the scaffolding for the other two will come along in time. * Implement {{ConsumerGroupRequestManager}} * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} interval regardless of other {{RequestManager}} instance activity * Ensure errors are handled correctly This task is part of the work to implement support for the new KIP-848 consumer group protocol.
> Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC > > > Key: KAFKA-15278 > URL: https://issues.apache.org/jira/browse/KAFKA-15278 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support, kip-848-e2e, kip-848-preview > > The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} > and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. > It is assumed that the scaffolding for the other two will come along in time. > * Implement {{ConsumerGroupRequestManager}} > * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts > so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} > interval regardless of other {{RequestManager}} instance activity > * Ensure errors are handled correctly > * Ensure MembershipStateManager is updated in both success and failure > cases, and the state machine is transitioned to the correct state. > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
[ https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee reassigned KAFKA-15278: -- Assignee: Philip Nee > Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC > > > Key: KAFKA-15278 > URL: https://issues.apache.org/jira/browse/KAFKA-15278 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Philip Nee >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support, kip-848-e2e, kip-848-preview > > The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} > and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. > It is assumed that the scaffolding for the other two will come along in time. > * Implement {{ConsumerGroupRequestManager}} > * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts > so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} > interval regardless of other {{RequestManager}} instance activity > * Ensure error is handled correctly > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
[ https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15278: --- Description: The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. It is assumed that the scaffolding for the other two will come along in time. * Implement {{ConsumerGroupRequestManager}} * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} interval regardless of other {{RequestManager}} instance activity * Ensure error is handled correctly This task is part of the work to implement support for the new KIP-848 consumer group protocol. was: The protocol introduces three new RPCs that the client uses to communicate with the broker: # [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI] The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. It is assumed that the scaffolding for the other two will come along in time. * Implement {{ConsumerGroupRequestManager}} * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} interval regardless of other {{RequestManager}} instance activity * Ensure error is handled correctly This task is part of the work to implement support for the new KIP-848 consumer group protocol. 
> Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC > > > Key: KAFKA-15278 > URL: https://issues.apache.org/jira/browse/KAFKA-15278 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support, kip-848-e2e, kip-848-preview > > The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} > and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. > It is assumed that the scaffolding for the other two will come along in time. > * Implement {{ConsumerGroupRequestManager}} > * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts > so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} > interval regardless of other {{RequestManager}} instance activity > * Ensure error is handled correctly > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
[ https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15278: --- Description: The protocol introduces three new RPCs that the client uses to communicate with the broker: # [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI] The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. It is assumed that the scaffolding for the other two will come along in time. * Implement {{ConsumerGroupRequestManager}} * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} interval regardless of other {{RequestManager}} instance activity * Ensure error is handled correctly This task is part of the work to implement support for the new KIP-848 consumer group protocol. 
was: The protocol introduces three new RPCs that the client uses to communicate with the broker: # [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI] # [ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI] # [ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI] The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. It is assumed that the scaffolding for the other two will come along in time. * Implement {{ConsumerGroupRequestManager}} * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} interval regardless of other {{RequestManager}} instance activity This task is part of the work to implement support for the new KIP-848 consumer group protocol. 
> Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC > > > Key: KAFKA-15278 > URL: https://issues.apache.org/jira/browse/KAFKA-15278 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support, kip-848-e2e, kip-848-preview > > The protocol introduces three new RPCs that the client uses to communicate > with the broker: > # > [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI] > The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} > and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. > It is assumed that the scaffolding for the other two will come along in time. > * Implement {{ConsumerGroupRequestManager}} > * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts > so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} > interval regardless of other {{RequestManager}} instance activity > * Ensure error is handled correctly > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]
rbaddam commented on code in PR #14491: URL: https://github.com/apache/kafka/pull/14491#discussion_r1350588084 ## clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java: ## @@ -240,12 +242,19 @@ public static KafkaPrincipalBuilder createPrincipalBuilder(Map config KerberosShortNamer kerberosShortNamer, SslPrincipalMapper sslPrincipalMapper) { Class principalBuilderClass = (Class) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG); -final KafkaPrincipalBuilder builder; +KafkaPrincipalBuilder builder; if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) { builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper); } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) { -builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass); +try { +Constructor constructor = principalBuilderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class); +builder = (KafkaPrincipalBuilder) constructor.newInstance(kerberosShortNamer, sslPrincipalMapper); Review Comment: Thanks @plazma-prizma for the feedback. The above logic is used whenever a user wants to configure a custom KafkaPrincipalBuilder by extending the existing KafkaPrincipalBuilder. The goal is to pass the already created kerberosShortNamer and sslPrincipalMapper so that users don't miss any functionality like SSL_PRINCIPAL_MAPPING_RULES_CONFIG when they use their own custom KafkaPrincipalBuilder. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
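The constructor-lookup pattern in the diff above can be sketched in isolation. This is a self-contained sketch with stand-in types (`PrincipalBuilder`, `ShortNamer`, `PrincipalMapper` substitute for Kafka's `KafkaPrincipalBuilder`, `KerberosShortNamer`, and `SslPrincipalMapper`): try the two-argument constructor first, and fall back to the no-argument constructor so existing custom builders keep working.

```java
import java.lang.reflect.Constructor;

// Sketch of the reflection-based constructor fallback discussed in the PR,
// using hypothetical stand-in types rather than Kafka's real classes.
public class BuilderFactorySketch {
    interface PrincipalBuilder { }
    static class ShortNamer { }
    static class PrincipalMapper { }

    // An older custom builder that only has the historical no-arg constructor.
    static class LegacyBuilder implements PrincipalBuilder {
        public LegacyBuilder() { }
    }

    // A newer builder that accepts the already-built mapping helpers.
    static class MapperAwareBuilder implements PrincipalBuilder {
        final ShortNamer namer;
        final PrincipalMapper mapper;
        public MapperAwareBuilder(ShortNamer namer, PrincipalMapper mapper) {
            this.namer = namer;
            this.mapper = mapper;
        }
    }

    static PrincipalBuilder create(Class<?> cls, ShortNamer namer, PrincipalMapper mapper) throws Exception {
        try {
            // Prefer the richer constructor so custom builders receive the helpers.
            Constructor<?> ctor = cls.getConstructor(ShortNamer.class, PrincipalMapper.class);
            return (PrincipalBuilder) ctor.newInstance(namer, mapper);
        } catch (NoSuchMethodException e) {
            // Fall back to the historical no-arg contract.
            return (PrincipalBuilder) cls.getDeclaredConstructor().newInstance();
        }
    }

    public static void main(String[] args) throws Exception {
        ShortNamer namer = new ShortNamer();
        PrincipalMapper mapper = new PrincipalMapper();
        System.out.println(create(MapperAwareBuilder.class, namer, mapper).getClass().getSimpleName()); // MapperAwareBuilder
        System.out.println(create(LegacyBuilder.class, namer, mapper).getClass().getSimpleName());      // LegacyBuilder
    }
}
```

The fallback branch is what keeps the change backward compatible: builders written before the new constructor existed are still instantiated through their no-arg constructor.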
[jira] [Commented] (KAFKA-15546) Transactions tool duration field confusing for completed transactions
[ https://issues.apache.org/jira/browse/KAFKA-15546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773425#comment-17773425 ] Justine Olshan commented on KAFKA-15546: Hey Giovanni. The API does not currently support sending two timestamps, so this change would require a KIP to update the API. I don't think we store a txn commit time in the metadata either. I can look into whether the last updated time maps to this, but if it does not, I'm not sure if it makes sense to add another field in the state for that. See current API: https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/DescribeTransactionsResponse.json > Transactions tool duration field confusing for completed transactions > - > > Key: KAFKA-15546 > URL: https://issues.apache.org/jira/browse/KAFKA-15546 > Project: Kafka > Issue Type: Task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > When using the transactions tool to describe transactions, if the transaction > is completed, its duration will still increase based on when it started. This > value is not correct. Instead, we can leave the duration field blank (since > we don't have the data for the completed transaction in the describe > response). > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]
jolshan commented on code in PR #14489: URL: https://github.com/apache/kafka/pull/14489#discussion_r1350579091 ## core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala: ## @@ -515,7 +521,10 @@ class TransactionStateManager(brokerId: Int, * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from * the previous loading / unloading operation. */ - def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = { + def loadTransactionsForTxnTopicPartition(partitionId: Int, + coordinatorEpoch: Int, + sendTxnMarkers: SendTxnMarkersCallback, + hadTransactionStateAlreadyLoaded: Boolean): Unit = { Review Comment: yeah. Something like that. The key is that it is currently loaded when we make the call, because it's ok to have loaded and unloaded in the past. We want to make sure we clarify that it is loaded and running. Maybe even something like transactionCoordinatorAlreadyRunning. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14596: Move TopicCommand to tools [kafka]
mimaison commented on PR #13201: URL: https://github.com/apache/kafka/pull/13201#issuecomment-1753360551 @OmniaGM Thanks for the update. I've not had time to take a look yet but noticed there's a compilation failure: ``` > Task :tools:compileTestJava /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13201/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java:78: error: variable TEST_WITH_PARAMETERIZED_QUORUM_NAME is already defined in class ToolsTestUtils public static final String TEST_WITH_PARAMETERIZED_QUORUM_NAME = "{displayName}.quorum={0}"; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]
clolov commented on code in PR #14489: URL: https://github.com/apache/kafka/pull/14489#discussion_r1350537983 ## core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala: ## @@ -515,7 +521,10 @@ class TransactionStateManager(brokerId: Int, * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from * the previous loading / unloading operation. */ - def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = { + def loadTransactionsForTxnTopicPartition(partitionId: Int, + coordinatorEpoch: Int, + sendTxnMarkers: SendTxnMarkersCallback, + hadTransactionStateAlreadyLoaded: Boolean): Unit = { Review Comment: Ohhh, I see, maybe hasTransactionStateBeenLoadedBefore? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15521: Refactor build.gradle to align gradle swagger plugin with swagger dependencies [kafka]
ijuma commented on code in PR #14473: URL: https://github.com/apache/kafka/pull/14473#discussion_r1350536831 ## gradle.properties: ## @@ -25,6 +25,9 @@ group=org.apache.kafka # - streams/quickstart/java/pom.xml version=3.7.0-SNAPSHOT scalaVersion=2.13.12 +# Adding swaggerVersion in gradle.properties to have a single version in place for swagger +# New version of Swagger 2.2.14 requires minimum JDK 11. +swaggerVersion=2.2.8 Review Comment: Why didn't we keep this in `dependencies.gradle`? It's confusing to have versions split across multiple files (outside of the Scala version, which is special in a few ways). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15521: Refactor build.gradle to align gradle swagger plugin with swagger dependencies [kafka]
ijuma commented on code in PR #14473: URL: https://github.com/apache/kafka/pull/14473#discussion_r1350536831 ## gradle.properties: ## @@ -25,6 +25,9 @@ group=org.apache.kafka # - streams/quickstart/java/pom.xml version=3.7.0-SNAPSHOT scalaVersion=2.13.12 +# Adding swaggerVersion in gradle.properties to have a single version in place for swagger +# New version of Swagger 2.2.14 requires minimum JDK 11. +swaggerVersion=2.2.8 Review Comment: Why didn't we keep this in `dependencies.gradle`? It's confusing to have a separate file for versions (outside of the Scala version, which is special in a few ways). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]
jolshan commented on code in PR #1: URL: https://github.com/apache/kafka/pull/1#discussion_r1350511364 ## clients/src/main/resources/common/message/ProduceResponse.json: ## @@ -32,7 +32,9 @@ // records that cause the whole batch to be dropped. See KIP-467 for details. // // Version 9 enables flexible versions. - "validVersions": "0-9", + // + // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields Review Comment: Are we in agreement then that we should always bump the version when adding a tagged field? Unless we explicitly plan to backport the changes (i.e. what Jeff did for the consumer group changes)? I had some folks ask me what the correct protocol is, and it would be nice to give consistent answers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15325) Integrate topicId in OffsetFetch and OffsetCommit async consumer calls
[ https://issues.apache.org/jira/browse/KAFKA-15325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-15325: --- Labels: kip-848 kip-848-client-support (was: kip-848 kip-848-client-support kip-848-preview) > Integrate topicId in OffsetFetch and OffsetCommit async consumer calls > -- > > Key: KAFKA-15325 > URL: https://issues.apache.org/jira/browse/KAFKA-15325 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support > > KIP-848 introduces support for topicIds in the OffsetFetch and OffsetCommit > APIs. The consumer calls to those APIs should be updated to include topicIds > when available. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]
jolshan commented on code in PR #14489: URL: https://github.com/apache/kafka/pull/14489#discussion_r1350505733 ## metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java: ## @@ -26,19 +26,22 @@ public final class LocalReplicaChanges { private final Set deletes; -private final Map leaders; +private final Map electedLeaders; +private final Map updatedLeaders; Review Comment: I struggled with naming this quite a bit. I was also wondering if I should make it clearer about leader epoch changes vs partition epoch changes. The thing that is tricky is that the map only contains the leaders that experienced the changes (not followers) so I also wanted to make that clear. I will also think on that some more. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]
jolshan commented on code in PR #14489: URL: https://github.com/apache/kafka/pull/14489#discussion_r1350503797 ## core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala: ## @@ -515,7 +521,10 @@ class TransactionStateManager(brokerId: Int, * metadata cache with the transactional ids. This operation must be resilient to any partial state left off from * the previous loading / unloading operation. */ - def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = { + def loadTransactionsForTxnTopicPartition(partitionId: Int, + coordinatorEpoch: Int, + sendTxnMarkers: SendTxnMarkersCallback, + hadTransactionStateAlreadyLoaded: Boolean): Unit = { Review Comment: I actually had it this way before, but we load in this method (and check afterwards) so I thought "had" conveyed that this happened before the method. I will think a bit more about this because there may be a better way to phrase. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Add javadoc to all ConfigDef.Types values [kafka]
mimaison commented on code in PR #14515: URL: https://github.com/apache/kafka/pull/14515#discussion_r1350502430 ## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ## @@ -802,11 +802,63 @@ else if (value instanceof Class) } /** - * The config types + * The type for a configuration value */ public enum Type { -BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD; +/** + * Used for boolean values. Values can be provided as a Boolean object or as a String with values + * true or false (this is not case-sensitive), otherwise a {@link ConfigException} is + * thrown. + */ +BOOLEAN, +/** + * Used for string values. Values must be provided as a String object, otherwise a {@link ConfigException} is + * thrown. + */ +STRING, +/** + * Used for numerical values within the Java Integer range. Values must be provided as an Integer object or as + * a String being a valid Integer value, otherwise a {@link ConfigException} is thrown. + */ +INT, +/** + * Used for numerical values within the Java Short range. Values must be provided as a Short object or as + * a String being a valid Short value, otherwise a {@link ConfigException} is thrown. + */ +SHORT, +/** + * Used for numerical values within the Java Long range. Values must be provided as a Long object, as an Integer + * object or as a String being a valid Long value, otherwise a {@link ConfigException} is thrown. + */ +LONG, +/** + * Used for numerical values within the Java Double range. Values must be provided as a Number object, as a + * Double object or as a String being a valid Double value, otherwise a {@link ConfigException} is thrown. + */ +DOUBLE, +/** + * Used for list values. Values must be provided as a List object or as a String object, otherwise a + * {@link ConfigException} is thrown. When the value is provided as a String it must use commas to separate the + * different entries (for example: first-entry, second-entry) and an empty String maps to an empty List. 
+ */ +LIST, +/** + * Used for values that implement a Kafka interface. Values must be provided as a Class object or as a + * String object, otherwise a {@link ConfigException} is thrown. When the value is provided as a String it must + * be the binary name of the Class. + */ +CLASS, +/** + * Used for string values containing sensitive data such as a password or key. The values of configurations with + * of this type are not included in logs and instead replaced with "[hidden]". Values must be provided as a + * String object, otherwise a {@link ConfigException} is thrown. Review Comment: Technically you can also provide a `org.apache.kafka.common.config.types.Password` object but `Password` is not part of the public API so I omit that option. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
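The BOOLEAN semantics described in the javadoc above can be shown with a small standalone parser. This is a self-contained sketch, not Kafka's actual `ConfigDef.parseType` implementation: it only models the documented contract that a Boolean object or a case-insensitive "true"/"false" String is accepted and anything else raises a config exception.

```java
// Standalone model of the documented BOOLEAN parsing contract; the real
// logic lives in org.apache.kafka.common.config.ConfigDef.
public class BooleanParseSketch {
    static class ConfigException extends RuntimeException {
        ConfigException(String message) { super(message); }
    }

    static boolean parseBoolean(Object value) {
        if (value instanceof Boolean)
            return (Boolean) value;
        if (value instanceof String) {
            String s = ((String) value).trim();
            // The comparison is not case-sensitive, per the javadoc.
            if (s.equalsIgnoreCase("true")) return true;
            if (s.equalsIgnoreCase("false")) return false;
        }
        // Any other value is rejected.
        throw new ConfigException("Expected value to be either true or false, got: " + value);
    }

    public static void main(String[] args) {
        System.out.println(parseBoolean("TRUE"));        // true
        System.out.println(parseBoolean(Boolean.FALSE)); // false
    }
}
```

Passing a value like `"yes"` or an Integer throws the exception rather than being coerced, which is the behavior the new javadoc spells out.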
Re: [PR] MINOR: Add javadoc to all ConfigDef.Types values [kafka]
mimaison commented on code in PR #14515: URL: https://github.com/apache/kafka/pull/14515#discussion_r1350501171 ## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ## @@ -57,7 +57,7 @@ * defs.define(config_with_default, Type.STRING, default string value, Importance.High, Configuration with default value.); * // check {@link #define(String, Type, Object, Validator, Importance, String)} for more details. * defs.define(config_with_validator, Type.INT, 42, Range.atLeast(0), Importance.High, Configuration with user provided validator.); - * // check {@link #define(String, Type, Importance, String, String, int, Width, String, List)} for more details. + * // check {@link #define(String, Type, Importance, String, String, int, Width, String, List) define(String, Type, Importance, String, String, int, Width, String, ListString)} for more details. Review Comment: This fixes warnings when generating javadoc on this file ``` ConfigDef.java:81: warning - Tag @link:illegal character: "60" in "#define(String, Type, Importance, String, String, int, Width, String, List)" ConfigDef.java:81: warning - Tag @link:illegal character: "62" in "#define(String, Type, Importance, String, String, int, Width, String, List)" ConfigDef.java:81: warning - Tag @link: can't find define(String, Type, Importance, String, String, int, Width, String, List) in org.apache.kafka.common.config.ConfigDef ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Add javadoc to all ConfigDef.Types values [kafka]
mimaison opened a new pull request, #14515: URL: https://github.com/apache/kafka/pull/14515 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-15568) Use incrementalAlterConfigs to update the dynamic config of broker in ConfigCommand tool
[ https://issues.apache.org/jira/browse/KAFKA-15568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aman Singh updated KAFKA-15568: --- Description: As part of [this](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API) the `incrementalAlterConfigs` API was introduced to change any config dynamically. `kafka-configs.sh (ConfigCommand)` still uses `alterConfigs` to update the config. It has the issue below - > Use incrementalAlterConfigs to update the dynamic config of broker in > ConfigCommand tool > > > Key: KAFKA-15568 > URL: https://issues.apache.org/jira/browse/KAFKA-15568 > Project: Kafka > Issue Type: Improvement >Reporter: Aman Singh >Priority: Major > > As part of > [this](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API) > > the `incrementalAlterConfigs` API was introduced to change any config > dynamically. > `kafka-configs.sh (ConfigCommand)` still uses `alterConfigs` to update the > config. It has the issue below > - -- This message was sent by Atlassian Jira (v8.20.10#820010)
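The practical difference between the two APIs referenced in this ticket can be modeled without a broker. This is a toy model of the server-side semantics, not the Admin client API: legacy `alterConfigs` replaces the entire dynamic config set (keys you omit are implicitly reset), while `incrementalAlterConfigs` applies per-key operations and leaves everything else untouched.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model contrasting full-replace vs incremental config updates.
// Method names mirror the Admin API for readability only.
public class AlterConfigsSketch {
    // Legacy semantics: the submitted map becomes the whole dynamic config,
    // so any previously set key that is not resubmitted is lost.
    static Map<String, String> alterConfigs(Map<String, String> current, Map<String, String> submitted) {
        return new HashMap<>(submitted);
    }

    // Incremental semantics: only the named keys change; a null value models
    // a DELETE operation, everything else is a SET.
    static Map<String, String> incrementalAlterConfigs(Map<String, String> current, Map<String, String> ops) {
        Map<String, String> result = new HashMap<>(current);
        ops.forEach((k, v) -> { if (v == null) result.remove(k); else result.put(k, v); });
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> current = new HashMap<>();
        current.put("log.cleaner.threads", "2");
        current.put("message.max.bytes", "1048576");

        Map<String, String> change = Map.of("message.max.bytes", "2097152");

        // The legacy call silently drops log.cleaner.threads:
        System.out.println(alterConfigs(current, change).containsKey("log.cleaner.threads"));            // false
        // The incremental call preserves it:
        System.out.println(incrementalAlterConfigs(current, change).containsKey("log.cleaner.threads")); // true
    }
}
```

This lost-unrelated-keys behavior is exactly why moving `ConfigCommand` off `alterConfigs` is worthwhile: an operator changing one broker config should not clobber the others.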
Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]
electrical commented on PR #13905: URL: https://github.com/apache/kafka/pull/13905#issuecomment-1753259175 @C0urante the other PR is **https://github.com/apache/kafka/pull/12358** So far it seems it was reviewed and approved, but there has been no movement since then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
dajac merged PR #14364: URL: https://github.com/apache/kafka/pull/14364 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]
dajac commented on PR #14364: URL: https://github.com/apache/kafka/pull/14364#issuecomment-1753238153 If we combine the last two builds, I am confident that the changes are good so I will merge it to trunk. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-15568) Use incrementalAlterConfigs to update the dynamic config of broker in ConfigCommand tool
Aman Singh created KAFKA-15568: -- Summary: Use incrementalAlterConfigs to update the dynamic config of broker in ConfigCommand tool Key: KAFKA-15568 URL: https://issues.apache.org/jira/browse/KAFKA-15568 Project: Kafka Issue Type: Improvement Reporter: Aman Singh -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14927) Dynamic configs not validated when using kafka-configs and --add-config-file
[ https://issues.apache.org/jira/browse/KAFKA-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773365#comment-17773365 ] Aman Singh commented on KAFKA-14927: Raised the PR to add validation https://github.com/apache/kafka/pull/14514 as suggested by [~cmccabe] > Dynamic configs not validated when using kafka-configs and --add-config-file > > > Key: KAFKA-14927 > URL: https://issues.apache.org/jira/browse/KAFKA-14927 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 3.3.2 >Reporter: Justin Daines >Assignee: José Armando García Sancio >Priority: Minor > Labels: 4.0-blocker > > Using {{kafka-configs}} should validate dynamic configurations before > applying. It is possible to send a file with invalid configurations. > For example a file containing the following: > {code:java} > { > "routes": { > "crn:///kafka=*": { > "management": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied" > }, > "describe": { > "allowed": "", > "denied": "confluent-audit-log-events-denied" > }, > "authentication": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied-authn" > }, > "authorize": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events-denied-authz" > }, > "interbroker": { > "allowed": "", > "denied": "" > } > }, > "crn:///kafka=*/group=*": { > "consume": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > } > }, > "crn:///kafka=*/topic=*": { > "produce": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > }, > "consume": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > } > } > }, > "destinations": { > "topics": { > "confluent-audit-log-events": { > "retention_ms": 777600 > }, > "confluent-audit-log-events-denied": { > "retention_ms": 777600 > }, > 
"confluent-audit-log-events-denied-authn": { > "retention_ms": 777600 > }, > "confluent-audit-log-events-denied-authz": { > "retention_ms": 777600 > }, > "confluent-audit-log-events_audit": { > "retention_ms": 777600 > } > } > }, > "default_topics": { > "allowed": "confluent-audit-log-events_audit", > "denied": "confluent-audit-log-events" > }, > "excluded_principals": [ > "User:schemaregistryUser", > "User:ANONYMOUS", > "User:appSA", > "User:admin", > "User:connectAdmin", > "User:connectorSubmitter", > "User:connectorSA", > "User:schemaregistryUser", > "User:ksqlDBAdmin", > "User:ksqlDBUser", > "User:controlCenterAndKsqlDBServer", > "User:controlcenterAdmin", > "User:restAdmin", > "User:appSA", > "User:clientListen", > "User:superUser" > ] > } {code} > {code:java} > kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers > --entity-default --alter --add-config-file audit-log.json {code} > Yields the following dynamic configs: > {code:java} > Default configs for brokers in the cluster are: > "destinations"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null} > "confluent-audit-log-events-denied-authn"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null} > "routes"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null} > "User=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null} > },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null} > "excluded_principals"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null} > "confluent-audit-log-events_audit"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null} > "authorize"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"authorize"=null} > "default_topics"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"default_topics"=null} > "topics"=null 
sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"topics"=null} > ]=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:]=null} > "interbroker"=null sensitive=true > synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"interbroker"=null} > "produce"=null sensitive=true >
[PR] KAFKA-14927: Add validation for config keys in ConfigCommand tool [kafka]
singhnama opened a new pull request, #14514: URL: https://github.com/apache/kafka/pull/14514 Added validation to the ConfigCommand tool so that only config keys matching '([a-z][A-Z][0-9][._-])*' are allowed. Jira: https://issues.apache.org/jira/browse/KAFKA-14927 (contains more details about why it's required)
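The proposed check boils down to a small regex-based validator. The exact pattern the final PR uses may differ; this sketch assumes the intent of '([a-z][A-Z][0-9][._-])*' is to allow ASCII letters, digits, dots, underscores, and hyphens:

```java
import java.util.regex.Pattern;

public class ConfigKeyValidator {
    // Assumed character class: ASCII letters, digits, '.', '_', '-'.
    private static final Pattern VALID_KEY = Pattern.compile("[a-zA-Z0-9._-]+");

    // Returns true only if the key consists entirely of allowed characters.
    public static boolean isValidKey(String key) {
        return key != null && VALID_KEY.matcher(key).matches();
    }
}
```

With a check like this, malformed keys from a JSON file (e.g. `"routes"` with literal quotes, or `},`) would be rejected before being applied as dynamic broker configs, which is the failure mode described in KAFKA-14927.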
[PR] KAFKA-15567 Fix benchmark code to work [kafka]
ocadaruma opened a new pull request, #14513: URL: https://github.com/apache/kafka/pull/14513 ## Summary - Currently, ReplicaFetcherThreadBenchmark doesn't run for two reasons: * An NPE introduced by https://github.com/apache/kafka/commit/755e04a41dfd00dd4587b0fe0980befd0ae5c433 - ZkMetadataCache instantiation throws an NPE because we pass `null` for `kraftControllerNodes` when it should be `Seq.empty` * A NotLeaderOrFollowerException introduced by https://github.com/apache/kafka/commit/3467036e017adc3ac0919bbc0c067b1bb1b621f3 - Even after addressing the NPE above, we still get NotLeaderOrFollowerException - ``` org.apache.kafka.common.errors.NotLeaderOrFollowerException: Error while fetching partition state for topic-30 ``` - This happens because replicaManager doesn't host the partitions when RemoteLeaderEndPoint tries to build fetches - This PR addresses these problems - The NPE on ZkMetadataCache instantiation also occurs in several other benchmarks; this PR fixes those as well. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
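The first fix comes down to passing an empty collection instead of `null` to a constructor that iterates its argument (the real code passes Scala's `Seq.empty` for `kraftControllerNodes`). A minimal Java analogue of the failure mode:

```java
import java.util.Collections;
import java.util.List;

public class CacheLikeExample {
    // Iterating the argument throws NullPointerException if callers pass null;
    // an empty list is the safe way to express "no controller nodes".
    static int countControllers(List<String> controllerNodes) {
        int n = 0;
        for (String node : controllerNodes) {
            n++;
        }
        return n;
    }
}
```

`countControllers(Collections.emptyList())` returns 0, while `countControllers(null)` fails with the same kind of NPE seen in the benchmark setup.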
Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]
C0urante commented on PR #13905: URL: https://github.com/apache/kafka/pull/13905#issuecomment-1753124395 @electrical can you link to the second PR? I've reviewed this one and am waiting on the author. If a second author is more responsive, we may be able to fix this issue sooner.
Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]
electrical commented on PR #13905: URL: https://github.com/apache/kafka/pull/13905#issuecomment-1753118374 Hi, it seems there are now two PRs open to solve the same issue, but neither is moving forward, which is blocking a lot of people from migrating to MM2. I really hope someone will push this over the finish line.
[jira] [Updated] (KAFKA-15514) Controller-side replica management changes
[ https://issues.apache.org/jira/browse/KAFKA-15514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-15514: Description: The new "Assignments" field replaces the "Replicas" field in PartitionRecord and PartitionChangeRecord. Any changes to partitions need to consider both fields. * ISR updates * Partition reassignments & reverts * Partition creation was: The new "Assignments" field replaces the "Replicas" field in PartitionRecord and PartitionChangeRecord. On the controller side, any changes to partitions need to consider both fields. * ISR updates * Partition reassignments & reverts * Partition creation > Controller-side replica management changes > -- > > Key: KAFKA-15514 > URL: https://issues.apache.org/jira/browse/KAFKA-15514 > Project: Kafka > Issue Type: Sub-task >Reporter: Igor Soarez >Priority: Major > > The new "Assignments" field replaces the "Replicas" field in PartitionRecord > and PartitionChangeRecord. > Any changes to partitions need to consider both fields. > * ISR updates > * Partition reassignments & reverts > * Partition creation
[jira] [Updated] (KAFKA-15514) Replica management changes
[ https://issues.apache.org/jira/browse/KAFKA-15514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-15514: Summary: Replica management changes (was: Controller-side replica management changes) > Replica management changes > -- > > Key: KAFKA-15514 > URL: https://issues.apache.org/jira/browse/KAFKA-15514 > Project: Kafka > Issue Type: Sub-task >Reporter: Igor Soarez >Priority: Major > > The new "Assignments" field replaces the "Replicas" field in PartitionRecord > and PartitionChangeRecord. > Any changes to partitions need to consider both fields. > * ISR updates > * Partition reassignments & reverts > * Partition creation
[jira] [Updated] (KAFKA-15514) Controller-side replica management changes
[ https://issues.apache.org/jira/browse/KAFKA-15514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-15514: Summary: Controller-side replica management changes (was: Replica management changes) > Controller-side replica management changes > -- > > Key: KAFKA-15514 > URL: https://issues.apache.org/jira/browse/KAFKA-15514 > Project: Kafka > Issue Type: Sub-task >Reporter: Igor Soarez >Priority: Major > > The new "Assignments" field replaces the "Replicas" field in PartitionRecord > and PartitionChangeRecord. > Any changes to partitions need to consider both fields. > * ISR updates > * Partition reassignments & reverts > * Partition creation
[jira] [Created] (KAFKA-15567) ReplicaFetcherThreadBenchmark is not working
Haruki Okada created KAFKA-15567: Summary: ReplicaFetcherThreadBenchmark is not working Key: KAFKA-15567 URL: https://issues.apache.org/jira/browse/KAFKA-15567 Project: Kafka Issue Type: Improvement Reporter: Haruki Okada Assignee: Haruki Okada * ReplicaFetcherThreadBenchmark is not working as of current trunk (https://github.com/apache/kafka/tree/c223a9c3761f796468ccfdae9e177e764ab6a965) {code:java} % jmh-benchmarks/jmh.sh ReplicaFetcherThreadBenchmark (snip) java.lang.NullPointerException at kafka.server.metadata.ZkMetadataCache.(ZkMetadataCache.scala:89) at kafka.server.MetadataCache.zkMetadataCache(MetadataCache.scala:120) at org.apache.kafka.jmh.fetcher.ReplicaFetcherThreadBenchmark.setup(ReplicaFetcherThreadBenchmark.java:220) at org.apache.kafka.jmh.fetcher.jmh_generated.ReplicaFetcherThreadBenchmark_testFetcher_jmhTest._jmh_tryInit_f_replicafetcherthreadbenchmark0_G(ReplicaFetcherThreadBenchmark_testFetcher_jmhTest.java:448) at org.apache.kafka.jmh.fetcher.jmh_generated.ReplicaFetcherThreadBenchmark_testFetcher_jmhTest.testFetcher_AverageTime(ReplicaFetcherThreadBenchmark_testFetcher_jmhTest.java:164) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:527) at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:504) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) {code}
Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]
clolov commented on code in PR #14489: URL: https://github.com/apache/kafka/pull/14489#discussion_r1350305894 ## core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala: ## @@ -392,6 +392,12 @@ class TransactionStateManager(brokerId: Int, } } + private[transaction] def hasTxnStateLoaded(partitionId: Int): Boolean = { +inReadLock(stateLock) { + !transactionMetadataCache.get(partitionId).isEmpty Review Comment: ```suggestion transactionMetadataCache.contains(partitionId) ``` ## core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala: ## @@ -524,23 +533,35 @@ class TransactionStateManager(brokerId: Int, } def loadTransactions(startTimeMs: java.lang.Long): Unit = { - val schedulerTimeMs = time.milliseconds() - startTimeMs - info(s"Loading transaction metadata from $topicPartition at epoch $coordinatorEpoch") - validateTransactionTopicPartitionCountIsStable() - - val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch) - val endTimeMs = time.milliseconds() - val totalLoadingTimeMs = endTimeMs - startTimeMs - partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false) - info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in " + -s"$totalLoadingTimeMs milliseconds, of which $schedulerTimeMs milliseconds was spent in the scheduler.") + val maybeLoadedTransactions = +if (!hadTransactionStateAlreadyLoaded) { Review Comment: ```suggestion if (!hasTransactionStateAlreadyLoaded) { ``` ## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala: ## @@ -875,18 +875,35 @@ class TransactionStateManagerTest { val startOffset = 0L val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords.toArray: _*) +val transactionsWithPendingMarkers = new ConcurrentHashMap[String, PendingCompleteTxn] +def sendMarkers(coordinatorEpoch: Int, +txnResult: TransactionResult, +txnMetadata: TransactionMetadata, 
+newMetadata: TxnTransitMetadata): Unit = { + val transactionalId = txnMetadata.transactionalId + val pendingCompleteTxn = PendingCompleteTxn( +transactionalId, +coordinatorEpoch, +txnMetadata, +newMetadata) + + transactionsWithPendingMarkers.put(transactionalId, pendingCompleteTxn) +} + prepareTxnLog(topicPartition, 0, records) // immigrate partition at epoch 0 -transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, (_, _, _, _) => ()) +transactionManager.loadTransactionsForTxnTopicPartition(partitionId, coordinatorEpoch = 0, sendMarkers, false) Review Comment: Since this is a Scala test class can you use named arguments to improve readability similar to the argument `coordinatorEpoch = 0`? ## metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java: ## @@ -218,7 +218,11 @@ public void testBasicLocalChanges() { ); assertEquals( new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))), -changes.leaders().keySet() +changes.electedLeaders().keySet() +); +assertEquals( +new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))), Review Comment: ```suggestion new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))), ``` ## metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java: ## @@ -269,14 +273,49 @@ public void testDeleteAfterChanges() { LocalReplicaChanges changes = delta.localChanges(localId); assertEquals(new HashSet<>(Arrays.asList(new TopicPartition("zoo", 0))), changes.deletes()); -assertEquals(Collections.emptyMap(), changes.leaders()); +assertEquals(Collections.emptyMap(), changes.electedLeaders()); +assertEquals(Collections.emptyMap(), changes.updatedLeaders()); assertEquals(Collections.emptyMap(), changes.followers()); TopicsImage finalImage = delta.apply(); List imageRecords = getImageRecords(image); imageRecords.addAll(topicRecords); testToImage(finalImage, Optional.of(imageRecords)); } +@Test Review Comment: ```suggestion @Test ``` ## 
metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java: ## @@ -26,19 +26,22 @@ public final class LocalReplicaChanges { private final Set deletes; -private final Map leaders; +private final Map electedLeaders; +private final Map updatedLeaders; Review Comment: Does it make sense to change these names to tpToPartitionEpochs and tpToLeaderEpochs? I can anticipate this naming being
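The first suggestion in the review above replaces `!transactionMetadataCache.get(partitionId).isEmpty` with the equivalent but more direct `contains`. The same readability point can be made in Java, where a key-presence check reads better than wrapping a lookup in an `Optional` (names here are illustrative, not Kafka's actual types):

```java
import java.util.Map;
import java.util.Optional;

public class PresenceCheck {
    // The roundabout version: look up the value, wrap it, ask if it is present.
    static boolean hasTxnStateLoadedVerbose(Map<Integer, String> cache, int partitionId) {
        return Optional.ofNullable(cache.get(partitionId)).isPresent();
    }

    // The direct version the review suggests: ask the map whether the key exists.
    static boolean hasTxnStateLoaded(Map<Integer, String> cache, int partitionId) {
        return cache.containsKey(partitionId);
    }
}
```

For maps that never store null values the two are equivalent; `containsKey` also states the intent ("is this partition loaded?") rather than the mechanics.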
Re: [PR] KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing [kafka]
tinaselenge commented on PR #14455: URL: https://github.com/apache/kafka/pull/14455#issuecomment-1753035679 Thanks everyone for reviewing the PR. @showuon, I have updated the failing tests. Two other tests failed in the last build, but they seem unrelated and passed when run locally.
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
dajac commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1350279208 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java: ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +/** + * An offset is considered expired based on different factors, such as the state of the group + * and/or the GroupMetadata record version (for generic groups). This class is used to check + * how offsets for the group should be expired. + */ +public interface OffsetExpirationCondition { + +/** + * Given an offset metadata and offsets retention, return whether the offset is expired or not. + * + * @param offset The offset metadata. + * @param currentTimestamp The current timestamp. + * @param offsetsRetentionMs The offset retention. + * + * @return Whether the offset is considered expired or not. + */ +boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestamp, long offsetsRetentionMs); +} Review Comment: This was closed but not addressed.
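To make the quoted interface concrete, here is a minimal sketch of one possible condition. `OffsetAndMetadata` is reduced to a stand-in holding only a commit timestamp; the real class and the conditions the coordinator actually uses differ:

```java
public class ExpirationSketch {
    // Stand-in for Kafka's OffsetAndMetadata; only the commit timestamp matters here.
    record OffsetAndMetadata(long commitTimestampMs) {}

    interface OffsetExpirationCondition {
        boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestamp, long offsetsRetentionMs);
    }

    // Hypothetical condition: an offset expires once its commit timestamp
    // falls outside the retention window.
    static final OffsetExpirationCondition RETENTION_BASED =
        (offset, currentTimestamp, offsetsRetentionMs) ->
            currentTimestamp - offset.commitTimestampMs() > offsetsRetentionMs;
}
```

Because the interface has a single abstract method, each group type can supply its own condition as a lambda, which is the flexibility the javadoc describes.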
Re: [PR] MINOR: Only commit running active and standby tasks when tasks corrupted [kafka]
cadonna commented on code in PR #14508: URL: https://github.com/apache/kafka/pull/14508#discussion_r1350271668 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -223,10 +223,7 @@ boolean handleCorruption(final Set corruptedTasks) { final Collection tasksToCommit = allTasks() .values() .stream() -// TODO: once we remove state restoration from the stream thread, we can also remove -// the RESTORING state here, since there will not be any restoring tasks managed -// by the stream thread anymore. -.filter(t -> t.state() == Task.State.RUNNING || t.state() == Task.State.RESTORING) +.filter(t -> t.state() == Task.State.RUNNING) Review Comment: I do not think that this is needed. Don't you agree that restoring active tasks do not need to be committed, with or without the state updater?
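The diff under discussion narrows a stream filter from two task states to one. A self-contained version of that pattern, with a stand-in `Task` type rather than the Streams class:

```java
import java.util.List;
import java.util.stream.Collectors;

public class TaskFilter {
    enum State { CREATED, RESTORING, RUNNING, CLOSED }

    record Task(String id, State state) {}

    // Keep only tasks that are actively running, mirroring the narrowed filter
    // that drops RESTORING tasks from the commit set.
    static List<Task> tasksToCommit(List<Task> allTasks) {
        return allTasks.stream()
            .filter(t -> t.state() == State.RUNNING)
            .collect(Collectors.toList());
    }
}
```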
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
dajac commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1350261943 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } +/** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * @param offsetsRetentionMs The offset retention in milliseconds. + * + * @return The group id if the group no longer has any offsets remaining, empty otherwise. + */ +public Optional cleanupExpiredOffsets(String groupId, List records, long offsetsRetentionMs) { +TimelineHashMap> offsetsByTopic = offsetsByGroup.get(groupId); +if (offsetsByTopic == null) { +return Optional.of(groupId); +} +try { +Group group = groupMetadataManager.group(groupId); +ExpirationCondition expirationCondition = group.expirationCondition(); +Set expiredPartitions = new HashSet<>(); +long currentTimestamp = time.milliseconds(); +AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); +offsetsByTopic.forEach((topic, partitions) -> { +if (!expirationCondition.subscribedTopics.contains(topic)) { Review Comment: I am also confused that you made this change. Did you make it because it is correct, or by mistake?
Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]
dajac commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1350261109 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ## @@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } +/** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * @param offsetsRetentionMs The offset retention in milliseconds. + * + * @return The group id if the group no longer has any offsets remaining, empty otherwise. + */ +public Optional cleanupExpiredOffsets(String groupId, List records, long offsetsRetentionMs) { +TimelineHashMap> offsetsByTopic = offsetsByGroup.get(groupId); +if (offsetsByTopic == null) { +return Optional.of(groupId); +} +try { +Group group = groupMetadataManager.group(groupId); +ExpirationCondition expirationCondition = group.expirationCondition(); +Set expiredPartitions = new HashSet<>(); +long currentTimestamp = time.milliseconds(); +AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); +offsetsByTopic.forEach((topic, partitions) -> { +if (!expirationCondition.subscribedTopics.contains(topic)) { Review Comment: Hmm, I am not sure I follow. In this case, isn't `subscribedTopics` going to be set to an optional containing an empty list?
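The `subscribedTopics.contains(topic)` guard debated above gates expiration on whether the group still subscribes to a topic. A simplified sketch of that selection step (names are illustrative, not the coordinator's actual API):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ExpirableTopics {
    // Offsets for topics the group no longer subscribes to are candidates
    // for expiration; offsets for subscribed topics are kept.
    static List<String> expirableTopics(Map<String, Long> committedOffsetsByTopic,
                                        Set<String> subscribedTopics) {
        return committedOffsetsByTopic.keySet().stream()
            .filter(topic -> !subscribedTopics.contains(topic))
            .sorted()
            .collect(Collectors.toList());
    }
}
```

Note the edge case the reviewers circle around: if the subscription set is empty (rather than absent), every committed topic becomes a candidate, which may or may not be the intended semantics.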
Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]
plazma-prizma commented on code in PR #14491: URL: https://github.com/apache/kafka/pull/14491#discussion_r1350253419 ## clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java: ## @@ -240,12 +242,19 @@ public static KafkaPrincipalBuilder createPrincipalBuilder(Map config KerberosShortNamer kerberosShortNamer, SslPrincipalMapper sslPrincipalMapper) { Class principalBuilderClass = (Class) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG); -final KafkaPrincipalBuilder builder; +KafkaPrincipalBuilder builder; if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) { builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper); } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) { -builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass); +try { +Constructor constructor = principalBuilderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class); +builder = (KafkaPrincipalBuilder) constructor.newInstance(kerberosShortNamer, sslPrincipalMapper); Review Comment: This looks as if it is solving a problem for one particular principal type. Not all principals have to use KerberosShortNamer; it only applies when the credentials are provided by Kerberos authentication. Why not define a subclass such as KerberosPrincipalBuilder and provide these details only to that one? You could even define an SSLPrincipalBuilder as well.
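The diff above looks up a two-argument constructor via reflection. A defensive variant of that idea would fall back to the no-arg constructor when the richer one is absent, so builders that do not want the Kerberos/SSL details keep working. This sketch uses stand-in types; the real code uses KafkaPrincipalBuilder, KerberosShortNamer, and SslPrincipalMapper:

```java
import java.lang.reflect.Constructor;

public class BuilderFactory {
    interface PrincipalBuilder {}

    public static class MapperAwareBuilder implements PrincipalBuilder {
        final Object shortNamer;
        final Object mapper;
        public MapperAwareBuilder(Object shortNamer, Object mapper) {
            this.shortNamer = shortNamer;
            this.mapper = mapper;
        }
    }

    public static class PlainBuilder implements PrincipalBuilder {
        public PlainBuilder() {}
    }

    // Prefer a (shortNamer, mapper) constructor; fall back to the no-arg one.
    static PrincipalBuilder create(Class<? extends PrincipalBuilder> cls,
                                   Object shortNamer, Object mapper) {
        try {
            Constructor<? extends PrincipalBuilder> ctor = cls.getConstructor(Object.class, Object.class);
            return ctor.newInstance(shortNamer, mapper);
        } catch (NoSuchMethodException e) {
            try {
                return cls.getConstructor().newInstance();
            } catch (ReflectiveOperationException e2) {
                throw new RuntimeException(e2);
            }
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}
```

This keeps backward compatibility with existing custom builders, which is the concern raised in the review thread.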
[PR] wip: investigate Flaky tests [kafka]
dengziming opened a new pull request, #14512: URL: https://github.com/apache/kafka/pull/14512 *More detailed description of your change* *Summary of testing strategy (including rationale)* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]
iit2009060 commented on PR #14482: URL: https://github.com/apache/kafka/pull/14482#issuecomment-1752910354 Unrelated test failures: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14482/14/tests @showuon @divijvaidya, can we merge these changes?