Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-09 Thread via GitHub


iit2009060 commented on code in PR #14482:
URL: https://github.com/apache/kafka/pull/14482#discussion_r1351514017


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -540,6 +545,8 @@ class RemoteIndexCacheTest {
       "Failed to mark cache entry for cleanup after resizing cache.")
     TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
       "Failed to cleanup cache entry after resizing cache.")
+    TestUtils.waitUntilTrue(() => cacheEntry.isCleanFinished,
+      "Failed to finish cleanup cache entry after resizing cache.")
 
     // verify no index files on remote cache dir
     TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,

Review Comment:
   @hudeqi Can you resolve the above comment?






Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-09 Thread via GitHub


showuon commented on code in PR #14482:
URL: https://github.com/apache/kafka/pull/14482#discussion_r1351504132


##
core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala:
##
@@ -540,6 +545,8 @@ class RemoteIndexCacheTest {
       "Failed to mark cache entry for cleanup after resizing cache.")
     TestUtils.waitUntilTrue(() => cacheEntry.isCleanStarted,
       "Failed to cleanup cache entry after resizing cache.")
+    TestUtils.waitUntilTrue(() => cacheEntry.isCleanFinished,
+      "Failed to finish cleanup cache entry after resizing cache.")
 
     // verify no index files on remote cache dir
     TestUtils.waitUntilTrue(() => !getIndexFileFromRemoteCacheDir(LogFileUtils.INDEX_FILE_SUFFIX).isPresent,

Review Comment:
   @iit2009060 @hudeqi, adding an `isCleanFinished` flag just for tests is not 
a good solution. In this case, could we catch the exception in 
`getIndexFileFromRemoteCacheDir` instead?
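
A minimal sketch of the suggested approach, assuming the test helper walks the
cache directory with `java.nio.file.Files.walk`; the helper body and the
`cache.cacheDir()` accessor are illustrative, not the actual PR code:

```scala
import java.io.UncheckedIOException
import java.nio.file.{Files, NoSuchFileException, Path}
import java.util.Optional

// Swallow the exceptions thrown when the cleaner thread deletes files
// concurrently with the directory walk, so waitUntilTrue can simply retry.
private def getIndexFileFromRemoteCacheDir(suffix: String): Optional[Path] = {
  try {
    Files.walk(cache.cacheDir().toPath)
      .filter(path => path.getFileName.toString.endsWith(suffix))
      .findAny()
  } catch {
    case _: NoSuchFileException | _: UncheckedIOException => Optional.empty()
  }
}
```

This would let the assertion retry inside `TestUtils.waitUntilTrue` without the
production class exposing an `isCleanFinished` flag purely for tests.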






[jira] [Assigned] (KAFKA-15514) Controller-side replica management changes

2023-10-09 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez reassigned KAFKA-15514:
---

Assignee: Igor Soarez

> Controller-side replica management changes
> --
>
> Key: KAFKA-15514
> URL: https://issues.apache.org/jira/browse/KAFKA-15514
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
>
> The new "Assignments" field replaces the "Replicas" field in PartitionRecord 
> and PartitionChangeRecord.
> Any changes to partitions need to consider both fields.
>  * ISR updates
>  * Partition reassignments & reverts
>  * Partition creation





Re: [PR] [TEST] Run builds for PR #14364 [kafka]

2023-10-09 Thread via GitHub


philipnee closed pull request #14509: [TEST] Run builds for PR #14364
URL: https://github.com/apache/kafka/pull/14509





Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-09 Thread via GitHub


iit2009060 commented on PR #14482:
URL: https://github.com/apache/kafka/pull/14482#issuecomment-1754314250

   @showuon Can we merge these changes?





[PR] MINOR: Rename log dir UUIDs [kafka]

2023-10-09 Thread via GitHub


soarez opened a new pull request, #14517:
URL: https://github.com/apache/kafka/pull/14517

   After a late discussion in the voting thread for KIP-858, we decided to 
improve the names for the designated reserved log directory UUID values.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-14879: Update system tests to use latest versions [kafka]

2023-10-09 Thread via GitHub


github-actions[bot] commented on PR #13528:
URL: https://github.com/apache/kafka/pull/13528#issuecomment-1754300677

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





Re: [PR] KAFKA-10733: Clean up producer exceptions [kafka]

2023-10-09 Thread via GitHub


github-actions[bot] commented on PR #13876:
URL: https://github.com/apache/kafka/pull/13876#issuecomment-1754300502

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has merge conflicts, please update it with the latest from 
trunk (or the appropriate release branch). If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.





[PR] KAFKA-15514: Metadata records Replicas->Assignment [kafka]

2023-10-09 Thread via GitHub


soarez opened a new pull request, #14516:
URL: https://github.com/apache/kafka/pull/14516

   The new "Assignments" field replaces the "Replicas" field in PartitionRecord 
and PartitionChangeRecord.
   
   Depends on #14290 - 
[KAFKA-15355](https://issues.apache.org/jira/browse/KAFKA-15355)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-09 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1351064502


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java:
##
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.GroupRebalanceConfig;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEventProcessor;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import java.io.Closeable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
+import static org.apache.kafka.common.utils.Utils.closeQuietly;
+import static org.mockito.Mockito.spy;
+
+public class ConsumerTestBuilder implements Closeable {
+
+    static final long RETRY_BACKOFF_MS = 80;
+    static final long RETRY_BACKOFF_MAX_MS = 1000;
+    static final int REQUEST_TIMEOUT_MS = 500;
+
+    final LogContext logContext = new LogContext();
+    final Time time = new MockTime(0);
+    public final BlockingQueue<ApplicationEvent> applicationEventQueue;
+    public final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    final ConsumerConfig config;
+    final long retryBackoffMs;
+    final SubscriptionState subscriptions;
+    final ConsumerMetadata metadata;
+    final FetchConfig fetchConfig;
+    final Metrics metrics;
+    final FetchMetricsManager metricsManager;
+    final NetworkClientDelegate networkClientDelegate;
+    final OffsetsRequestManager offsetsRequestManager;
+    final CoordinatorRequestManager coordinatorRequestManager;
+    final CommitRequestManager commitRequestManager;
+    final TopicMetadataRequestManager topicMetadataRequestManager;
+    final FetchRequestManager fetchRequestManager;
+    final RequestManagers requestManagers;
+    public final ApplicationEventProcessor applicationEventProcessor;
+    public final BackgroundEventProcessor backgroundEventProcessor;
+    public final BackgroundEventHandler backgroundEventHandler;
+    final MockClient client;
+
+    public ConsumerTestBuilder() {
+        this.applicationEventQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventQueue = new LinkedBlockingQueue<>();
+        this.backgroundEventHandler = new BackgroundEventHandler(logContext, backgroundEventQueue);
+        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
+            100,
+            100,
+            100,
+            "group_id",

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-09 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1351063405


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.FetchEvent;
+import org.apache.kafka.clients.consumer.internals.events.ListOffsetsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
+import org.apache.kafka.clients.consumer.internals.events.ResetPositionsApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.TopicMetadataApplicationEvent;
+import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsApplicationEvent;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("ClassDataAbstractionCoupling")
+public class ConsumerNetworkThreadTest {
+
+    private ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder testBuilder;
+    private Time time;
+    private ConsumerMetadata metadata;
+    private NetworkClientDelegate networkClient;
+    private BlockingQueue<ApplicationEvent> applicationEventsQueue;
+    private ApplicationEventProcessor applicationEventProcessor;
+    private CoordinatorRequestManager coordinatorManager;
+    private OffsetsRequestManager offsetsRequestManager;
+    private CommitRequestManager commitManager;
+    private ConsumerNetworkThread consumerNetworkThread;
+    private MockClient client;
+
+    @BeforeEach
+    public void setup() {
+        testBuilder = new ConsumerTestBuilder.ConsumerNetworkThreadTestBuilder();
+        time = testBuilder.time;
+        metadata = testBuilder.metadata;
+        networkClient = testBuilder.networkClientDelegate;
+        client = testBuilder.client;
+        applicationEventsQueue = testBuilder.applicationEventQueue;
+        applicationEventProcessor = testBuilder.applicationEventProcessor;
+        coordinatorManager = testBuilder.coordinatorRequestManager;
+        commitManager = testBuilder.commitRequestManager;
+        offsetsRequestManager = testBuilder.offsetsRequestManager;
+        consumerNetworkThread = 

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-09 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1351054656


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-09 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1351042141


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##

Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-09 Thread via GitHub


kirktrue commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1351011656


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java:
##
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.BiConsumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.FetchSessionHandler;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
+import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.UnsentRequest;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * {@code FetchRequestManager} is responsible for generating {@link FetchRequest} that represent the
+ * {@link SubscriptionState#fetchablePartitions(Predicate)} based on the user's topic subscription/partition
+ * assignment.
+ */
+public class FetchRequestManager extends AbstractFetch implements RequestManager {
+
+    private final NetworkClientDelegate networkClientDelegate;
+    private final List<CompletableFuture<Queue<CompletedFetch>>> futures;
+
+    FetchRequestManager(final LogContext logContext,
+                        final Time time,
+                        final ConsumerMetadata metadata,
+                        final SubscriptionState subscriptions,
+                        final FetchConfig fetchConfig,
+                        final FetchMetricsManager metricsManager,
+                        final NetworkClientDelegate networkClientDelegate) {
+        super(logContext, metadata, subscriptions, fetchConfig, metricsManager, time);
+        this.networkClientDelegate = networkClientDelegate;
+        this.futures = new ArrayList<>();
+    }
+
+    @Override
+    protected boolean isUnavailable(Node node) {
+        return networkClientDelegate.isUnavailable(node);
+    }
+
+    @Override
+    protected void maybeThrowAuthFailure(Node node) {
+        networkClientDelegate.maybeThrowAuthFailure(node);
+    }
+
+    /**
+     * Adds a new {@link Future future} to the list of futures awaiting results. Per the comments on
+     * {@link #handleFetchResponse(Node, FetchSessionHandler.FetchRequestData, ClientResponse)}}, there is no
+     * guarantee that this particular future will be provided with a non-empty result, but it is guaranteed
+     * to be completed with a result, assuming that it does not time out.
+     *
+     * @param future Future that will be {@link CompletableFuture#complete(Object) completed} if not timed out
+     */
+    public void requestFetch(CompletableFuture<Queue<CompletedFetch>> future) {
+        futures.add(future);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public PollResult poll(long currentTimeMs) {
+        return pollInternal(
+            prepareFetchRequests(),
+            this::handleFetchResponse,
+            this::handleFetchResponse
+        );
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public PollResult pollOnClose() {
+        return pollInternal(
+            prepareCloseFetchSessionRequests(),
+            this::handleCloseFetchSessionResponse,
+            this::handleCloseFetchSessionResponse
+        );
+    }
+
+    /**
+     * Creates the {@link PollResult poll result} that contains a list of zero or more
+     * {@link FetchRequest.Builder fetch requests}  fetch request},
+     * {@link NetworkClient#send(ClientRequest, long) enqueues/sends it, and adds the {@link RequestFuture callback}

Review Comment:
   I removed the latter phrases to reduce confusion.
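
The cleaned-up javadoc would presumably read along these lines (an illustrative
reconstruction, not the committed text):

```java
/**
 * Creates the {@link PollResult poll result} that contains a list of zero or more
 * {@link FetchRequest.Builder fetch requests} to be enqueued and sent by the network client.
 */
```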




Re: [PR] KAFKA-15221; Fix the race between fetch requests from a rebooted follower. [kafka]

2023-10-09 Thread via GitHub


hachikuji commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1351005643


##
core/src/main/scala/kafka/cluster/Replica.scala:
##
@@ -98,14 +105,22 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
    * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at
    * high frequency.
    */
-  def updateFetchState(
+  def updateFetchStateOrThrow(
     followerFetchOffsetMetadata: LogOffsetMetadata,
     followerStartOffset: Long,
     followerFetchTimeMs: Long,
     leaderEndOffset: Long,
     brokerEpoch: Long
   ): Unit = {
     replicaState.updateAndGet { currentReplicaState =>
+      val cachedBrokerEpoch = if (metadataCache.isInstanceOf[KRaftMetadataCache])
+        metadataCache.asInstanceOf[KRaftMetadataCache].getAliveBrokerEpoch(brokerId) else Option(-1L)
+      // Fence the update if it provides a stale broker epoch.
+      if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {

Review Comment:
   I was actually debating it. Do we create a race on the leader for a 
restarted broker? A restarted broker will typically not be in the ISR, so 
perhaps a delay for propagation of the registration state would not have any 
adverse effects. What do you think?






Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]

2023-10-09 Thread via GitHub


rbaddam commented on code in PR #14491:
URL: https://github.com/apache/kafka/pull/14491#discussion_r1350996498


##
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java:
##
@@ -240,12 +242,19 @@ public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                            KerberosShortNamer kerberosShortNamer,
                                                            SslPrincipalMapper sslPrincipalMapper) {
     Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
-    final KafkaPrincipalBuilder builder;
+    KafkaPrincipalBuilder builder;
 
     if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
         builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
     } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
-        builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
+        try {
+            Constructor<?> constructor = principalBuilderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class);
+            builder = (KafkaPrincipalBuilder) constructor.newInstance(kerberosShortNamer, sslPrincipalMapper);

Review Comment:
   @plazma-prizma made the changes as per your inputs; please review when you 
get a chance. Thanks!






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350953160


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws InterruptedException {
         assertEquals(1, cnt.get());
         assertEquals(0, ctx.timer.size());
     }
+
+    @Test
+    public void testStateChanges() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+        GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
+
+        CoordinatorRuntime runtime =
+            new CoordinatorRuntime.Builder()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withLoader(loader)
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(runtimeMetrics)
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+        CompletableFuture future = new CompletableFuture<>();
+        when(loader.load(TP, coordinator)).thenReturn(future);
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 0);
+
+        // Getting the context succeeds and the coordinator should be in loading.
+        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(LOADING, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING);
+
+        // When the loading fails, the coordinator transitions to failed.
+        future.completeExceptionally(new Exception("failure"));
+        assertEquals(FAILED, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED);
+
+        // Start loading a new topic partition.
+        TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
+        future = new CompletableFuture<>();
+        when(loader.load(tp, coordinator)).thenReturn(future);
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(tp, 0);
+        // Getting the context succeeds and the coordinator should be in loading.
+        ctx = runtime.contextOrThrow(tp);
+        assertEquals(LOADING, ctx.state);
+        verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING);
+
+        // When the loading completes, the coordinator transitions to active.
+        future.complete(null);
+        assertEquals(ACTIVE, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE);
+
+        runtime.close();
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED);

Review Comment:
   Oops. I see. Thanks for clarifying. 






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1350945079


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -338,7 +338,7 @@ public boolean isControllerRegistrationSupported() {
 
 public short fetchRequestVersion() {
     if (this.isAtLeast(IBP_3_5_IV1)) {
-        return 15;
+        return 16;

Review Comment:
   Hmm. Is this correct? In the upgrade scenario we will send request version 
16 to brokers that may not have that version yet. I know we just ignore tagged 
fields, but I'm not sure I recall if we can handle version bumps.
   
   If this is always just the latest version, should it be hardcoded?
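
A sketch of the hardcoding alternative raised here, assuming
`ApiKeys.FETCH.latestVersion()` from `org.apache.kafka.common.protocol`; the
lower-IBP fallback below is a placeholder, not the real branch structure:

```java
public short fetchRequestVersion() {
    if (this.isAtLeast(IBP_3_5_IV1)) {
        // Tracks the newest fetch schema automatically instead of bumping a literal.
        return ApiKeys.FETCH.latestVersion();
    }
    return 13; // placeholder for the existing lower-IBP branches
}
```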






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1350936703


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+    val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+    var leaderId = -1
+    var leaderEpoch = -1
+    partitionInfoOrError match {
+      case Right(x) =>
+        leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+        leaderEpoch = x.getLeaderEpoch
+      case Left(x) =>
+        debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+        val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition)
+        partitionInfo.foreach { info =>

Review Comment:
   
   ```
   val partitionInfo = partitionInfoOrError match {
     case Right(partitionInfo) =>
       partitionInfo
     case Left(error) =>
       debug(s"Unable to retrieve local leaderId and Epoch with error $error, falling back to metadata cache")
       metadataCache.getPartitionInfo(tp.topic, tp.partition) match {
         case Some(partitionInfo) => partitionInfo
         case None => // handle the case where we don't have the partition
       }
   }
   ```
   
   






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1350934417


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+    val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+    var leaderId = -1
+    var leaderEpoch = -1
+    partitionInfoOrError match {
+      case Right(x) =>
+        leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+        leaderEpoch = x.getLeaderEpoch
+      case Left(x) =>
+        debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+        val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition)
+        partitionInfo.foreach { info =>

Review Comment:
   We also don't need to set vars here. We could have a statement where we 
return a tuple or even just the partitionInfo.
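
A sketch of the tuple-returning variant; `info.leader()` and
`info.leaderEpoch()` assume the `UpdateMetadataPartitionState` accessors
returned by `metadataCache.getPartitionInfo`:

```scala
val (leaderId, leaderEpoch) = replicaManager.getPartitionOrError(tp) match {
  case Right(partition) =>
    (partition.leaderReplicaIdOpt.getOrElse(-1), partition.getLeaderEpoch)
  case Left(error) =>
    debug(s"Unable to retrieve local leaderId and Epoch with error $error, falling back to metadata cache")
    metadataCache.getPartitionInfo(tp.topic, tp.partition)
      .map(info => (info.leader(), info.leaderEpoch()))
      .getOrElse((-1, -1))
}
```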









Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1350933806


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+    val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+    var leaderId = -1
+    var leaderEpoch = -1
+    partitionInfoOrError match {
+      case Right(x) =>
+        leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+        leaderEpoch = x.getLeaderEpoch
+      case Left(x) =>
+        debug(s"Unable to retrieve local leaderId and Epoch with error $x, falling back to metadata cache")
+        val partitionInfo = metadataCache.getPartitionInfo(tp.topic, tp.partition)
+        partitionInfo.foreach { info =>

Review Comment:
   getPartitionInfo returns an Option. If the value exists, foreach will access 
it; if it doesn't, foreach does nothing. This is a common pattern in Scala. Are 
we considering the case when the partition is not present?
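
A standalone illustration of the `Option` behavior being described:

```scala
val present: Option[Int] = Some(1)
val missing: Option[Int] = None

present.foreach(v => println(v)) // prints 1
missing.foreach(v => println(v)) // does nothing; the absent case is silently skipped

// A match forces the absent case to be handled explicitly:
missing match {
  case Some(v) => println(v)
  case None    => println("partition not present")
}
```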






Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1350932166


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -67,20 +69,31 @@ public ProduceResponse(ProduceResponseData produceResponseData) {
      */
     @Deprecated
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses) {
-        this(responses, DEFAULT_THROTTLE_TIME);
+        this(responses, DEFAULT_THROTTLE_TIME, Collections.emptyList());
     }
 
     /**
-     * Constructor for the latest version
+     * Constructor for versions <= 9
      * @param responses Produced data grouped by topic-partition
      * @param throttleTimeMs Time in milliseconds the response was throttled
      */
     @Deprecated
     public ProduceResponse(Map<TopicPartition, PartitionResponse> responses, int throttleTimeMs) {
-        this(toData(responses, throttleTimeMs));
+        this(toData(responses, throttleTimeMs, Collections.emptyList()));
+    }
+
+    /**
+     * Constructor for the latest version
+     * @param responses Produced data grouped by topic-partition
+     * @param throttleTimeMs Time in milliseconds the response was throttled
+     * @param nodeEndpoints List of node endpoints
+     */
+    @Deprecated

Review Comment:
   KAFKA-10730 is a pretty dormant JIRA. I do agree that there is some level of 
conversion. I wonder if folks have a strong opinion about this conversion 
still. 
   
   Looking into this further, I see the change would need to be made to 
appendRecords and the ProducePartitionStatus. It doesn't look too crazy, but 
it's also understandable that this is not in scope for this PR.
   
   I wonder if KAFKA-9682 was premature in deprecating the constructor. I guess 
our options are leaving it deprecated and adding a deprecated method, or 
removing the deprecation until KAFKA-10730 is completed. (I almost just want to 
fix it so this doesn't happen in the future.)









Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]

2023-10-09 Thread via GitHub


rbaddam commented on code in PR #14491:
URL: https://github.com/apache/kafka/pull/14491#discussion_r1350920227


##
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java:
##
@@ -240,12 +242,19 @@ public static KafkaPrincipalBuilder createPrincipalBuilder(Map<String, ?> configs,
                                                            KerberosShortNamer kerberosShortNamer,
                                                            SslPrincipalMapper sslPrincipalMapper) {
     Class<?> principalBuilderClass = (Class<?>) configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
-    final KafkaPrincipalBuilder builder;
+    KafkaPrincipalBuilder builder;
 
     if (principalBuilderClass == null || principalBuilderClass == DefaultKafkaPrincipalBuilder.class) {
         builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
     } else if (KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
-        builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
+        try {
+            Constructor<?> constructor = principalBuilderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class);
+            builder = (KafkaPrincipalBuilder) constructor.newInstance(kerberosShortNamer, sslPrincipalMapper);

Review Comment:
   @plazma-prizma, I appreciate your suggestion. To address your concern about 
the specificity of the solution, I will create interfaces/subclasses of 
KerberosPrincipalBuilder and SSLPrincipalBuilder to handle different types of 
principals and their configurations.
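
For reference, a minimal sketch of the constructor-lookup-with-fallback pattern
under discussion; the catch branches are an assumption about how a missing
two-arg constructor could be handled, not the PR's final code:

```java
try {
    // Prefer a (KerberosShortNamer, SslPrincipalMapper) constructor when the
    // custom builder declares one.
    Constructor<?> constructor =
        principalBuilderClass.getConstructor(KerberosShortNamer.class, SslPrincipalMapper.class);
    builder = (KafkaPrincipalBuilder) constructor.newInstance(kerberosShortNamer, sslPrincipalMapper);
} catch (NoSuchMethodException e) {
    // Assumed fallback: keep the historical no-arg instantiation path.
    builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
} catch (ReflectiveOperationException e) {
    throw new KafkaException("Failed to instantiate principal builder " + principalBuilderClass.getName(), e);
}
```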






Re: [PR] freeze requirements for system-tests running with python2 [kafka]

2023-10-09 Thread via GitHub


imcdo closed pull request #11392: freeze requirements for system-tests running 
with python2
URL: https://github.com/apache/kafka/pull/11392





Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #14489:
URL: https://github.com/apache/kafka/pull/14489#discussion_r1350911184


##
metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
##
@@ -26,19 +26,22 @@
 
 public final class LocalReplicaChanges {
     private final Set<TopicPartition> deletes;
-    private final Map<TopicPartition, PartitionInfo> leaders;
+    private final Map<TopicPartition, PartitionInfo> electedLeaders;
+    private final Map<TopicPartition, PartitionInfo> updatedLeaders;

Review Comment:
   I pushed changes for the simpler fixes and will continue to think on this 
one.






Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350909602


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws InterruptedException {
         assertEquals(1, cnt.get());
         assertEquals(0, ctx.timer.size());
     }
+
+    @Test
+    public void testStateChanges() throws Exception {
+        MockTimer timer = new MockTimer();
+        MockPartitionWriter writer = mock(MockPartitionWriter.class);
+        MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+        MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class);
+        MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class);
+        MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+        GroupCoordinatorRuntimeMetrics runtimeMetrics = mock(GroupCoordinatorRuntimeMetrics.class);
+
+        CoordinatorRuntime runtime =
+            new CoordinatorRuntime.Builder()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withLoader(loader)
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(supplier)
+                .withCoordinatorRuntimeMetrics(runtimeMetrics)
+                .build();
+
+        when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+        when(builder.withLogContext(any())).thenReturn(builder);
+        when(builder.withTime(any())).thenReturn(builder);
+        when(builder.withTimer(any())).thenReturn(builder);
+        when(builder.withTopicPartition(any())).thenReturn(builder);
+        when(builder.build()).thenReturn(coordinator);
+        when(supplier.get()).thenReturn(builder);
+        CompletableFuture future = new CompletableFuture<>();
+        when(loader.load(TP, coordinator)).thenReturn(future);
+
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(TP, 0);
+
+        // Getting the context succeeds and the coordinator should be in loading.
+        CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP);
+        assertEquals(LOADING, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, LOADING);
+
+        // When the loading fails, the coordinator transitions to failed.
+        future.completeExceptionally(new Exception("failure"));
+        assertEquals(FAILED, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, FAILED);
+
+        // Start loading a new topic partition.
+        TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
+        future = new CompletableFuture<>();
+        when(loader.load(tp, coordinator)).thenReturn(future);
+        // Schedule the loading.
+        runtime.scheduleLoadOperation(tp, 0);
+        // Getting the context succeeds and the coordinator should be in loading.
+        ctx = runtime.contextOrThrow(tp);
+        assertEquals(LOADING, ctx.state);
+        verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, LOADING);
+
+        // When the loading completes, the coordinator transitions to active.
+        future.complete(null);
+        assertEquals(ACTIVE, ctx.state);
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, ACTIVE);
+
+        runtime.close();
+        verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, CLOSED);

Review Comment:
   We actually don't have the closed metric (and initial metric) anymore. These 
were removed in https://github.com/apache/kafka/pull/14417#discussion_r1334548382



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350893216


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite 
holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void onPartitionStateChange(CoordinatorState oldState, CoordinatorState 
newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Get the counter for partitions in Loading state.
+ * Only used for testing.
+ */
+long numPartitionsLoading();
+
+/**
+ * Get the counter for partitions in Active state.
+ * Only used for testing.
+ */
+long numPartitionsActive();
+
+/**
+ * Get the counter for partitions in Failed state.
+ * Only used for testing.
+ */
+long numPartitionsFailed();
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void updateEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void updateEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the failed event.
+ */
+void recordFailedEvent();
+
+/**
+ * Record the successful event.
+ */
+void recordSuccessfulEvent();
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);

Review Comment:
   Right, we won't be able to differentiate the idle ratio for each individual 
thread. Is this something we should have?
   
   > So we have an average and min for idle ratio. Is the minimum value just the 
lowest recorded at a given time frame?
   
   Yes, that's right.
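   
   For context, a minimal sketch of how such an avg/min pair can be wired up with 
the Kafka `Metrics` library; the sensor, group, and metric names below are 
placeholders, not the ones this PR registers:
   
   ```java
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.Avg;
   import org.apache.kafka.common.metrics.stats.Min;
   
   public class IdleRatioMetricsSketch {
       private final Sensor threadIdleRatioSensor;
   
       public IdleRatioMetricsSketch(Metrics metrics) {
           // One sensor, two stats: every record() feeds both the average and
           // the minimum observed over the metrics sample window.
           threadIdleRatioSensor = metrics.sensor("thread-idle-ratio");
           threadIdleRatioSensor.add(
               metrics.metricName("thread-idle-ratio-avg", "coordinator-metrics-sketch"),
               new Avg());
           threadIdleRatioSensor.add(
               metrics.metricName("thread-idle-ratio-min", "coordinator-metrics-sketch"),
               new Min());
       }
   
       public void recordThreadIdleRatio(double ratio) {
           // Records from all runtime threads land in the same sensor, which is
           // why the per-thread breakdown discussed above is lost.
           threadIdleRatioSensor.record(ratio);
       }
   }
   ```
   
   Because every thread records into the one sensor, `Min` reflects the lowest 
ratio any thread reported in the sample window, which matches the reading above.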



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jeffkbkim commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350888321


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite 
holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void recordPartitionStateChange(CoordinatorState oldState, 
CoordinatorState newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void recordEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void recordEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);
+
+/**
+ * Register the event queue size gauge.
+ *
+ * @param sizeSupplier The size supplier.
+ */
+void registerEventQueueSizeGauge(Supplier sizeSupplier);

Review Comment:
   Ah yes. Registering should only happen once.
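   
   As a minimal sketch of that idempotency, assuming the standard Kafka `Metrics` 
registry and a hypothetical guard flag (neither is taken from this PR):
   
   ```java
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.function.Supplier;
   import org.apache.kafka.common.MetricName;
   import org.apache.kafka.common.metrics.Gauge;
   import org.apache.kafka.common.metrics.Metrics;
   
   public class EventQueueSizeGaugeSketch {
       private final Metrics metrics;
       private final AtomicBoolean registered = new AtomicBoolean(false);
   
       public EventQueueSizeGaugeSketch(Metrics metrics) {
           this.metrics = metrics;
       }
   
       public void registerEventQueueSizeGauge(Supplier<Integer> sizeSupplier) {
           // compareAndSet guarantees the gauge is added to the registry once,
           // even if several callers race during start-up.
           if (registered.compareAndSet(false, true)) {
               MetricName name = metrics.metricName("event-queue-size", "coordinator-metrics-sketch");
               metrics.addMetric(name, (Gauge<Integer>) (config, now) -> sizeSupplier.get());
           }
       }
   }
   ```
   
   An alternative is for the runtime to call the registration exactly once during 
construction, which avoids the flag entirely.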



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #14489:
URL: https://github.com/apache/kafka/pull/14489#discussion_r1350877639


##
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##
@@ -515,7 +521,10 @@ class TransactionStateManager(brokerId: Int,
* metadata cache with the transactional ids. This operation must be 
resilient to any partial state left off from
* the previous loading / unloading operation.
*/
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: 
Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+  def loadTransactionsForTxnTopicPartition(partitionId: Int,
+   coordinatorEpoch: Int,
+   sendTxnMarkers: 
SendTxnMarkersCallback,
+   hadTransactionStateAlreadyLoaded: 
Boolean): Unit = {

Review Comment:
   I opted to go for the simpler name and chose `transactionStateLoaded`. I 
updated the method to say the same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-09 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1350863763


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ * @param offsetsRetentionMs The offset retention in milliseconds.
+ *
+ * @return The group id if the group no longer has any offsets remaining, 
empty otherwise.
+ */
+public Optional cleanupExpiredOffsets(String groupId, List 
records, long offsetsRetentionMs) {
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return Optional.of(groupId);
+}
+try {
+Group group = groupMetadataManager.group(groupId);
+ExpirationCondition expirationCondition = 
group.expirationCondition();
+Set expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!expirationCondition.subscribedTopics.contains(topic)) {

Review Comment:
   > In this case, isn't subscribedTopics going to be set to an optional 
containing an empty list?
   
   ah.. i misread and thought subscribedTopics becomes an empty optional, not 
an empty set. So we would return `false` which is the correct behavior.
   
   > Did you do it because it is correct or by mistake?
   
   which change, to add another argument? I added that so we can rely on using 
`isSubscribedToTopic` instead of passing in the subscribed topics set.
   
   i will update with your suggestion, it makes sense to me.






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-10-09 Thread via GitHub


ahuang98 commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1350814062


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -79,6 +86,38 @@ protected LeaderState(
 this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
 this.log = logContext.logger(LeaderState.class);
 this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+this.fetchTimeoutMs = fetchTimeoutMs;
+this.fetchTimer = time.timer(fetchTimeoutMs);
+}
+
+// Check if the fetchTimer is expired because we didn't receive 
fetch/fetchSnapshot request from the majority of

Review Comment:
   nit: 
   ```suggestion
   // Check if the fetchTimer is expired because we didn't receive a valid 
fetch/fetchSnapshot request from the majority of
   ```
   same for the next method's description
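   
   For anyone following along, a sketch of the timeout mechanics under discussion, 
using `org.apache.kafka.common.utils.Timer`; the class and method names below are 
illustrative, not the PR's:
   
   ```java
   import org.apache.kafka.common.utils.Time;
   import org.apache.kafka.common.utils.Timer;
   
   final class MajorityFetchTimeoutSketch {
       private final long fetchTimeoutMs;
       private final Timer fetchTimer;
   
       MajorityFetchTimeoutSketch(Time time, long fetchTimeoutMs) {
           this.fetchTimeoutMs = fetchTimeoutMs;
           this.fetchTimer = time.timer(fetchTimeoutMs);
       }
   
       // Called when a valid fetch/fetchSnapshot has been received from a
       // majority of voters: push the resignation deadline out again.
       void onValidMajorityFetch(long currentTimeMs) {
           fetchTimer.update(currentTimeMs);
           fetchTimer.reset(fetchTimeoutMs);
       }
   
       // If the timer expires, the leader has not heard from a majority within
       // fetchTimeoutMs and should consider resigning.
       boolean shouldResignLeadership(long currentTimeMs) {
           fetchTimer.update(currentTimeMs);
           return fetchTimer.isExpired();
       }
   }
   ```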



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15221; Fix the race between fetch requests from a rebooted follower. [kafka]

2023-10-09 Thread via GitHub


CalvinConfluent commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1350811556


##
core/src/main/scala/kafka/cluster/Replica.scala:
##
@@ -98,14 +105,22 @@ class Replica(val brokerId: Int, val topicPartition: 
TopicPartition) extends Log
* fetch request is always smaller than the leader's LEO, which can happen 
if small produce requests are received at
* high frequency.
*/
-  def updateFetchState(
+  def updateFetchStateOrThrow(
 followerFetchOffsetMetadata: LogOffsetMetadata,
 followerStartOffset: Long,
 followerFetchTimeMs: Long,
 leaderEndOffset: Long,
 brokerEpoch: Long
   ): Unit = {
 replicaState.updateAndGet { currentReplicaState =>
+  val cachedBrokerEpoch = if 
(metadataCache.isInstanceOf[KRaftMetadataCache])
+
metadataCache.asInstanceOf[KRaftMetadataCache].getAliveBrokerEpoch(brokerId) 
else Option(-1L)
+  // Fence the update if it provides a stale broker epoch.
+  if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {

Review Comment:
   Makes sense. A fetch request with a higher epoch should also be fenced.
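   
   To make that rule concrete, a small sketch of the fencing predicate in Java 
(the surrounding `Replica.scala` code is Scala; the names here are illustrative):
   
   ```java
   import java.util.OptionalLong;
   
   final class BrokerEpochFencingSketch {
       // -1 means the fetcher did not supply a broker epoch (older request versions).
       private static final long NO_EPOCH = -1L;
   
       // Fence whenever the request carries an epoch that differs from the one in
       // the metadata cache; both stale and higher epochs are rejected.
       static boolean shouldFence(long requestBrokerEpoch, OptionalLong cachedBrokerEpoch) {
           return requestBrokerEpoch != NO_EPOCH
               && cachedBrokerEpoch.isPresent()
               && cachedBrokerEpoch.getAsLong() != requestBrokerEpoch;
       }
   }
   ```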



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15221; Fix the race between fetch requests from a rebooted follower. [kafka]

2023-10-09 Thread via GitHub


CalvinConfluent commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1350811307


##
core/src/main/scala/kafka/cluster/Replica.scala:
##
@@ -98,14 +105,22 @@ class Replica(val brokerId: Int, val topicPartition: 
TopicPartition) extends Log
* fetch request is always smaller than the leader's LEO, which can happen 
if small produce requests are received at
* high frequency.
*/
-  def updateFetchState(
+  def updateFetchStateOrThrow(
 followerFetchOffsetMetadata: LogOffsetMetadata,
 followerStartOffset: Long,
 followerFetchTimeMs: Long,
 leaderEndOffset: Long,
 brokerEpoch: Long
   ): Unit = {
 replicaState.updateAndGet { currentReplicaState =>
+  val cachedBrokerEpoch = if 
(metadataCache.isInstanceOf[KRaftMetadataCache])

Review Comment:
   Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-09 Thread via GitHub


jeffkbkim commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1350811243


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+/**
+ * An offset is considered expired based on different factors, such as the 
state of the group
+ * and/or the GroupMetadata record version (for generic groups). This class is 
used to check
+ * how offsets for the group should be expired.
+ */
+public interface OffsetExpirationCondition {
+
+/**
+ * Given an offset metadata and offsets retention, return whether the 
offset is expired or not.
+ *
+ * @param offset   The offset metadata.
+ * @param currentTimestamp The current timestamp.
+ * @param offsetsRetentionMs   The offset retention.
+ *
+ * @return Whether the offset is considered expired or not.
+ */
+boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestamp, 
long offsetsRetentionMs);
+}

Review Comment:
   misunderstood. will fix
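   
   Since the interface has a single abstract method, an implementation can be a 
plain lambda. A sketch, assuming the interface above is on the classpath and that 
`OffsetAndMetadata` exposes a commit timestamp (the accessor name below is 
hypothetical):
   
   ```java
   // Expire an offset once its commit timestamp falls outside the retention
   // window. `commitTimestampMs` is an assumed accessor, not necessarily the
   // real field name.
   OffsetExpirationCondition byCommitTimestamp =
       (offset, currentTimestamp, offsetsRetentionMs) ->
           currentTimestamp - offset.commitTimestampMs > offsetsRetentionMs;
   ```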



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jolshan commented on PR #14417:
URL: https://github.com/apache/kafka/pull/14417#issuecomment-1753860433

   Let's also look at checkstyle in the build. Something seems to be off in the 
group coordinator module.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-09 Thread via GitHub


junrao commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1350697908


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java:
##
@@ -621,56 +825,163 @@ public void assign(Collection 
partitions) {
 throw new IllegalArgumentException("Topic partitions to assign 
to cannot have null or empty topic");
 }
 
-// TODO: implementation of refactored Fetcher will be included in 
forthcoming commits.
-// fetcher.clearBufferedDataForUnassignedPartitions(partitions);
+// Clear the buffered data which are not a part of newly assigned 
topics
+final Set currentTopicPartitions = new HashSet<>();
+
+for (TopicPartition tp : subscriptions.assignedPartitions()) {
+if (partitions.contains(tp))
+currentTopicPartitions.add(tp);
+}
+
+fetchBuffer.retainAll(currentTopicPartitions);
 
 // assignment change event will trigger autocommit if it is configured 
and the group id is specified. This is
 // to make sure offsets of topic partitions the consumer is 
unsubscribing from are committed since there will
-// be no following rebalance
-eventHandler.add(new 
AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), 
time.milliseconds()));
+// be no following rebalance.
+//
+// See the ApplicationEventProcessor.process() method that handles 
this event for more detail.
+applicationEventHandler.add(new 
AssignmentChangeApplicationEvent(subscriptions.allConsumed(), 
time.milliseconds()));
 
 log.info("Assigned to partition(s): {}", join(partitions, ", "));
-if (this.subscriptions.assignFromUser(new HashSet<>(partitions)))
-eventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
+if (subscriptions.assignFromUser(new HashSet<>(partitions)))
+applicationEventHandler.add(new 
NewTopicsMetadataUpdateRequestEvent());
 }
 
 @Override
-public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) 
{
-throw new KafkaException("method not implemented");
+public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
{
+maybeThrowInvalidGroupIdException();
+if (pattern == null || pattern.toString().isEmpty())
+throw new IllegalArgumentException("Topic pattern to subscribe to 
cannot be " + (pattern == null ?
+"null" : "empty"));
+
+throwIfNoAssignorsConfigured();
+log.info("Subscribed to pattern: '{}'", pattern);
+subscriptions.subscribe(pattern, listener);
+updatePatternSubscription(metadata.fetch());
+metadata.requestUpdateForNewTopics();
+}
+
+/**
+ * TODO: remove this when we implement the KIP-848 protocol.
+ *
+ * 
+ * The contents of this method are shamelessly stolen from
+ * {@link ConsumerCoordinator#updatePatternSubscription(Cluster)} and are 
used here because we won't have access
+ * to a {@link ConsumerCoordinator} in this code. Perhaps it could be 
moved to a ConsumerUtils class?
+ *
+ * @param cluster Cluster from which we get the topics
+ */
+private void updatePatternSubscription(Cluster cluster) {
+final Set topicsToSubscribe = cluster.topics().stream()
+.filter(subscriptions::matchesSubscribedPattern)
+.collect(Collectors.toSet());
+if (subscriptions.subscribeFromPattern(topicsToSubscribe))
+metadata.requestUpdateForNewTopics();
 }
 
 @Override
 public void subscribe(Pattern pattern) {
-throw new KafkaException("method not implemented");
+subscribe(pattern, new NoOpConsumerRebalanceListener());
 }
 
 @Override
 public void unsubscribe() {
-throw new KafkaException("method not implemented");
+fetchBuffer.retainAll(Collections.emptySet());
+subscriptions.unsubscribe();
 }
 
 @Override
 @Deprecated
-public ConsumerRecords poll(long timeout) {
-throw new KafkaException("method not implemented");
+public ConsumerRecords poll(final long timeoutMs) {
+return poll(Duration.ofMillis(timeoutMs));
 }
 
 // Visible for testing
 WakeupTrigger wakeupTrigger() {
 return wakeupTrigger;
 }
 
-private static  ClusterResourceListeners 
configureClusterResourceListeners(
-final Deserializer keyDeserializer,
-final Deserializer valueDeserializer,
-final List... candidateLists) {
-ClusterResourceListeners clusterResourceListeners = new 
ClusterResourceListeners();
-for (List candidateList: candidateLists)
-clusterResourceListeners.maybeAddAll(candidateList);
+/**
+ * Send the requests for fetch data to the {@link ConsumerNetworkThread 
network thread} and set up to
+ * collect 

Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]

2023-10-09 Thread via GitHub


nizhikov commented on code in PR #13247:
URL: https://github.com/apache/kafka/pull/13247#discussion_r1350743924


##
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java:
##
@@ -769,22 +762,12 @@ public void testPropagateInvalidJsonError() {
 }
 
 @SuppressWarnings("unchecked")
-private static  scala.collection.immutable.Set set(final T... set) {
-return mutableSet(set).toSet();
-}
-
-@SuppressWarnings({"deprecation", "unchecked"})
-private static  scala.collection.mutable.Set mutableSet(final 
T...set) {
-return JavaConverters.asScalaSet(new HashSet<>(Arrays.asList(set)));
+private static  Set set(final T... set) {

Review Comment:
   Fixed. Yes, we are able to remove the `set` method. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #13247:
URL: https://github.com/apache/kafka/pull/13247#discussion_r1350759368


##
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java:
##
@@ -857,68 +864,18 @@ public void close() {
 }
 }
 
-private ReassignPartitionsCommand.VerifyAssignmentResult 
asScala(VerifyAssignmentResult res) {
-Map partStates = new 
HashMap<>();
-res.partStates.forEach((tp, state) -> partStates.put(tp, 
asScala(state)));
-
-Map 
moveStates = new HashMap<>();
-res.moveStates.forEach((tpr, state) -> moveStates.put(tpr, 
asScala(state)));
-
-return new 
ReassignPartitionsCommand.VerifyAssignmentResult(asScala(partStates), 
res.partsOngoing, asScala(moveStates), res.movesOngoing);
-}
-
-@SuppressWarnings({"unchecked"})
-private ReassignPartitionsCommand.PartitionReassignmentState 
asScala(PartitionReassignmentState state) {
-return new ReassignPartitionsCommand.PartitionReassignmentState(
-seq((List) state.currentReplicas),
-seq((List) state.targetReplicas),
-state.done
-);
-}
-
-private ReassignPartitionsCommand.LogDirMoveState asScala(LogDirMoveState 
state) {
-if (state instanceof ActiveMoveState) {
-ActiveMoveState s = (ActiveMoveState) state;
-return new 
ReassignPartitionsCommand.ActiveMoveState(s.currentLogDir, s.targetLogDir, 
s.futureLogDir);
-} else if (state instanceof CancelledMoveState) {
-CancelledMoveState s = (CancelledMoveState) state;
-return new 
ReassignPartitionsCommand.CancelledMoveState(s.currentLogDir, s.targetLogDir);
-} else if (state instanceof CompletedMoveState) {
-CompletedMoveState s = (CompletedMoveState) state;
-return new 
ReassignPartitionsCommand.CompletedMoveState(s.targetLogDir);
-} else if (state instanceof MissingLogDirMoveState) {
-MissingLogDirMoveState s = (MissingLogDirMoveState) state;
-return new 
ReassignPartitionsCommand.MissingLogDirMoveState(s.targetLogDir);
-} else if (state instanceof MissingReplicaMoveState) {
-MissingReplicaMoveState s = (MissingReplicaMoveState) state;
-return new 
ReassignPartitionsCommand.MissingReplicaMoveState(s.targetLogDir);
-}
-
-throw new IllegalArgumentException("Unknown state " + state);
-}
-
 @SuppressWarnings("unchecked")
-static  scala.collection.immutable.Set set(final T... set) {
-return mutableSet(set).toSet();
+static  Set set(final T... set) {
+return new HashSet<>(Arrays.asList(set));

Review Comment:
   Got it. Thanks for clarifying. (That's going to be a fun one to migrate too.)



[jira] [Updated] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest

2023-10-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15569:

Issue Type: Test  (was: Improvement)

> Update test and add test cases in IQv2StoreIntegrationTest
> --
>
> Key: KAFKA-15569
> URL: https://issues.apache.org/jira/browse/KAFKA-15569
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> Update test and add test cases in IQv2StoreIntegrationTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest

2023-10-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15569:

Component/s: streams
 unit tests

> Update test and add test cases in IQv2StoreIntegrationTest
> --
>
> Key: KAFKA-15569
> URL: https://issues.apache.org/jira/browse/KAFKA-15569
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> Update test and add test cases in IQv2StoreIntegrationTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]

2023-10-09 Thread via GitHub


nizhikov commented on code in PR #13247:
URL: https://github.com/apache/kafka/pull/13247#discussion_r1350744820


##
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java:
##
@@ -857,68 +864,18 @@ public void close() {
 }
 }
 
-private ReassignPartitionsCommand.VerifyAssignmentResult 
asScala(VerifyAssignmentResult res) {
-Map partStates = new 
HashMap<>();
-res.partStates.forEach((tp, state) -> partStates.put(tp, 
asScala(state)));
-
-Map 
moveStates = new HashMap<>();
-res.moveStates.forEach((tpr, state) -> moveStates.put(tpr, 
asScala(state)));
-
-return new 
ReassignPartitionsCommand.VerifyAssignmentResult(asScala(partStates), 
res.partsOngoing, asScala(moveStates), res.movesOngoing);
-}
-
-@SuppressWarnings({"unchecked"})
-private ReassignPartitionsCommand.PartitionReassignmentState 
asScala(PartitionReassignmentState state) {
-return new ReassignPartitionsCommand.PartitionReassignmentState(
-seq((List) state.currentReplicas),
-seq((List) state.targetReplicas),
-state.done
-);
-}
-
-private ReassignPartitionsCommand.LogDirMoveState asScala(LogDirMoveState 
state) {
-if (state instanceof ActiveMoveState) {
-ActiveMoveState s = (ActiveMoveState) state;
-return new 
ReassignPartitionsCommand.ActiveMoveState(s.currentLogDir, s.targetLogDir, 
s.futureLogDir);
-} else if (state instanceof CancelledMoveState) {
-CancelledMoveState s = (CancelledMoveState) state;
-return new 
ReassignPartitionsCommand.CancelledMoveState(s.currentLogDir, s.targetLogDir);
-} else if (state instanceof CompletedMoveState) {
-CompletedMoveState s = (CompletedMoveState) state;
-return new 
ReassignPartitionsCommand.CompletedMoveState(s.targetLogDir);
-} else if (state instanceof MissingLogDirMoveState) {
-MissingLogDirMoveState s = (MissingLogDirMoveState) state;
-return new 
ReassignPartitionsCommand.MissingLogDirMoveState(s.targetLogDir);
-} else if (state instanceof MissingReplicaMoveState) {
-MissingReplicaMoveState s = (MissingReplicaMoveState) state;
-return new 
ReassignPartitionsCommand.MissingReplicaMoveState(s.targetLogDir);
-}
-
-throw new IllegalArgumentException("Unknown state " + state);
-}
-
 @SuppressWarnings("unchecked")
-static  scala.collection.immutable.Set set(final T... set) {
-return mutableSet(set).toSet();
+static  Set set(final T... set) {
+return new HashSet<>(Arrays.asList(set));

Review Comment:
   We are able to remove the `set` method.
   But `mutableSet` and `seq` have to stay, because we use `QuorumTestHarness` 
as a parent and `TestUtils`, which are written in Scala.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org






Re: [PR] KAFKA-15221; Fix the race between fetch requests from a rebooted follower. [kafka]

2023-10-09 Thread via GitHub


hachikuji commented on code in PR #14053:
URL: https://github.com/apache/kafka/pull/14053#discussion_r1350736079


##
core/src/main/scala/kafka/cluster/Replica.scala:
##
@@ -98,14 +105,22 @@ class Replica(val brokerId: Int, val topicPartition: 
TopicPartition) extends Log
* fetch request is always smaller than the leader's LEO, which can happen 
if small produce requests are received at
* high frequency.
*/
-  def updateFetchState(
+  def updateFetchStateOrThrow(
 followerFetchOffsetMetadata: LogOffsetMetadata,
 followerStartOffset: Long,
 followerFetchTimeMs: Long,
 leaderEndOffset: Long,
 brokerEpoch: Long
   ): Unit = {
 replicaState.updateAndGet { currentReplicaState =>
+  val cachedBrokerEpoch = if 
(metadataCache.isInstanceOf[KRaftMetadataCache])

Review Comment:
   nit: in Scala, it's usually cleaner to use a match instead of `isInstanceOf`.



##
core/src/main/scala/kafka/cluster/Replica.scala:
##
@@ -98,14 +105,22 @@ class Replica(val brokerId: Int, val topicPartition: 
TopicPartition) extends Log
* fetch request is always smaller than the leader's LEO, which can happen 
if small produce requests are received at
* high frequency.
*/
-  def updateFetchState(
+  def updateFetchStateOrThrow(
 followerFetchOffsetMetadata: LogOffsetMetadata,
 followerStartOffset: Long,
 followerFetchTimeMs: Long,
 leaderEndOffset: Long,
 brokerEpoch: Long
   ): Unit = {
 replicaState.updateAndGet { currentReplicaState =>
+  val cachedBrokerEpoch = if 
(metadataCache.isInstanceOf[KRaftMetadataCache])
+
metadataCache.asInstanceOf[KRaftMetadataCache].getAliveBrokerEpoch(brokerId) 
else Option(-1L)
+  // Fence the update if it provides a stale broker epoch.
+  if (brokerEpoch != -1 && cachedBrokerEpoch.exists(_ > brokerEpoch)) {

Review Comment:
   Should we check for equality? I guess the basic question is whether we allow 
fetches from a higher epoch than what is in the cache?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest

2023-10-09 Thread Hanyu Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15569?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hanyu Zheng reassigned KAFKA-15569:
---

Assignee: Hanyu Zheng

> Update test and add test cases in IQv2StoreIntegrationTest
> --
>
> Key: KAFKA-15569
> URL: https://issues.apache.org/jira/browse/KAFKA-15569
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> Update test and add test cases in IQv2StoreIntegrationTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms

2023-10-09 Thread Sankalp Bhatia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773463#comment-17773463
 ] 

Sankalp Bhatia edited comment on KAFKA-15565 at 10/9/23 7:34 PM:
-

Thanks. The reason I say it is a bug is because the overriding you mentioned in 
[https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L571]
 takes the min of

1. default api timeout (60s)

2. metadataTimeout: This I believe is derived from the default request timeout 
if the metadata request is pending.(which is hardcoded to 1hr)

3. default request timeout (hardcoded to 1hr). (But as per contract should be 
derived from client config)

Now consider a case where the client is unable to create a socket connection. 
Ideally, such a case should be handled in line 
[584|https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L584],
 but since the selector enters a long poll of 60s (this can happen when no 
other selection key is ready), it only gets to know about the timed out 
connection after 60s, and by that time the Client Call needs to be dropped 
without any retries. Had the adminClient honored the request timeout, the poll 
would have been shorter and the request could have been retried.


was (Author: sankalpbhatia):
Thanks. The reason I say it is a bug is because the overriding you mentioned in 
[https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L571]
 takes the min of

1. default api timeout (60s)

2. metadataTimeout: This I believe is derived from the default request timeout 
if the metadata request is pending.(which is hardcoded to 1hr)

3. default request timeout (hardcoded to 1hr). (But as per contract should be 
derived from client config)

Now consider a case where the client is unable to create a socket connection. 
Ideally, such a case should be handled in line 
[584|https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L584],
 but since the selector enters a long poll of 60s, it only gets to know about 
the timed out connection after 60s, and by that time the Client Call needs to 
be dropped without any retries. Had the adminClient honored the request 
timeout, the poll would have been shorter and the request could have been 
retried.

> KafkaAdminClient does not honor request timeout ms 
> ---
>
> Key: KAFKA-15565
> URL: https://issues.apache.org/jira/browse/KAFKA-15565
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sankalp Bhatia
>Assignee: Sankalp Bhatia
>Priority: Minor
>
> It seems to me there is a bug in this line in the KafkaAdminClient. For the 
> constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
> hardcoded value of 1 hour.  Ideally, this should be derived from the client 
> config     "request.timeout.ms"  from the AdminClientConfig[2]. 
> References
> [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]
> [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15569) Update test and add test cases in IQv2StoreIntegrationTest

2023-10-09 Thread Hanyu Zheng (Jira)
Hanyu Zheng created KAFKA-15569:
---

 Summary: Update test and add test cases in IQv2StoreIntegrationTest
 Key: KAFKA-15569
 URL: https://issues.apache.org/jira/browse/KAFKA-15569
 Project: Kafka
  Issue Type: Improvement
Reporter: Hanyu Zheng


Update test and add test cases in IQv2StoreIntegrationTest



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms

2023-10-09 Thread Sankalp Bhatia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773463#comment-17773463
 ] 

Sankalp Bhatia edited comment on KAFKA-15565 at 10/9/23 7:25 PM:
-

Thanks. The reason I say it is a bug is because the overriding you mentioned in 
[https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L571]
 takes the min of

1. default api timeout (60s)

2. metadataTimeout: This I believe is derived from the default request timeout 
if the metadata request is pending.(which is hardcoded to 1hr)

3. default request timeout (hardcoded to 1hr). (But as per contract should be 
derived from client config)

Now consider a case where the client is unable to create a socket connection. 
Ideally, such a case should be handled in line 
[584|https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L584],
 but since the selector enters a long poll of 60s, it only gets to know about 
the timed out connection after 60s, and by that time the Client Call needs to 
be dropped without any retries. Had the adminClient honored the request 
timeout, the poll would have been shorter and the request could have been 
retried.


was (Author: sankalpbhatia):
Thanks. The reason I say it is a bug is because the overriding you mentioned in 
[https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L571]
 takes the min of # default api timeout (60s)
 # metadataTimeout: This I believe is derived from the default request timeout 
if the metadata request is pending.(which is hardcoded to 1hr)
 # default request timeout (hardcoded to 1hr). (But as per contract should be 
derived from client config)

Now consider a case where the client is unable to create a socket connection. 
Ideally, such a case should be handled in line 
[584|https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L584],
 but since the selector enters a long poll of 60s, it only gets to know about 
the timed out connection after 60s, and by that time the Client Call needs to 
be dropped without any retries. Had the adminClient honored the request 
timeout, the poll would have been shorter and the request could have been 
retried.

> KafkaAdminClient does not honor request timeout ms 
> ---
>
> Key: KAFKA-15565
> URL: https://issues.apache.org/jira/browse/KAFKA-15565
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sankalp Bhatia
>Assignee: Sankalp Bhatia
>Priority: Minor
>
> It seems to me there is a bug in this line in the KafkaAdminClient. For the 
> constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
> hardcoded value of 1 hour.  Ideally, this should be derived from the client 
> config     "request.timeout.ms"  from the AdminClientConfig[2]. 
> References
> [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]
> [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15565) KafkaAdminClient does not honor request timeout ms

2023-10-09 Thread Sankalp Bhatia (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15565?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773463#comment-17773463
 ] 

Sankalp Bhatia commented on KAFKA-15565:


Thanks. The reason I say it is a bug is because the overriding you mentioned in 
[https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L571]
 takes the min of # default api timeout (60s)
 # metadataTimeout: This I believe is derived from the default request timeout 
if the metadata request is pending.(which is hardcoded to 1hr)
 # default request timeout (hardcoded to 1hr). (But as per contract should be 
derived from client config)

Now consider a case where the client is unable to create a socket connection. 
Ideally, such a case should be handled in line 
[584|https://github.com/apache/kafka/blob/bf51a50a564ee43d3515c82fc706f17325c4602f/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L584],
 but since the selector enters a long poll of 60s, it only gets to know about 
the timed out connection after 60s, and by that time the Client Call needs to 
be dropped without any retries. Had the adminClient honored the request 
timeout, the poll would have been shorter and the request could have been 
retried.
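
For reference, a minimal sketch of setting the client config in question; the 
bootstrap address and timeout value are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class AdminTimeoutSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The config that, per this report, should back NetworkClient's
        // defaultRequestTimeoutMs instead of the hardcoded one hour.
        props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
        try (Admin admin = Admin.create(props)) {
            admin.describeCluster();
        }
    }
}
```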

> KafkaAdminClient does not honor request timeout ms 
> ---
>
> Key: KAFKA-15565
> URL: https://issues.apache.org/jira/browse/KAFKA-15565
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sankalp Bhatia
>Assignee: Sankalp Bhatia
>Priority: Minor
>
> It seems to me there is a bug in this line in the KafkaAdminClient. For the 
> constructor arg defaultRequestTimeoutMs of NetworkClient [1], it uses a 
> hardcoded value of 1 hour.  Ideally, this should be derived from the client 
> config     "request.timeout.ms"  from the AdminClientConfig[2]. 
> References
> [1][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java#L521]
> [2][https://github.com/apache/kafka/blob/1c3eb4395a15cf4f45b6dc0d39effb3dc087f5a4/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java#L98]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Kafka 12317: Relax non-null key requirement in Kafka Streams [kafka]

2023-10-09 Thread via GitHub


wcarlson5 commented on code in PR #14174:
URL: https://github.com/apache/kafka/pull/14174#discussion_r1347987570


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoinWindow.java:
##
@@ -51,8 +51,8 @@ public void init(final ProcessorContext context) {
 public void process(final Record record) {
 // if the key is null, we do not need to put the record into 
window store
 // since it will never be considered for join operations
+context().forward(record);
 if (record.key() != null) {
-context().forward(record);
 // Every record basically starts a new window. We're using a 
window store mostly for the retention.
 window.put(record.key(), record.value(), record.timestamp());

Review Comment:
   why should null keys not enter the window?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/StreamStreamJoinUtil.java:
##
@@ -39,22 +39,26 @@ public static  boolean skipRecord(
 // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
 // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
 if (record.key() == null || record.value() == null) {
-if (context.recordMetadata().isPresent()) {
-final RecordMetadata recordMetadata = 
context.recordMetadata().get();
-logger.warn(
-"Skipping record due to null key or value. "
-+ "topic=[{}] partition=[{}] offset=[{}]",
-recordMetadata.topic(), recordMetadata.partition(), 
recordMetadata.offset()
-);
-} else {
-logger.warn(
-"Skipping record due to null key or value. Topic, 
partition, and offset not known."
-);
-}
-droppedRecordsSensor.record();
+dropRecord(logger, droppedRecordsSensor, context);
 return true;
 } else {
 return false;
 }
 }
+
+public static  void dropRecord(final Logger logger, final 
Sensor droppedRecordsSensor, final ProcessorContext context) {

Review Comment:
   I'm not a huge fan of splitting this out to a separate public method. I 
think you can just reuse the logic in skip record.



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) 
{
 @SuppressWarnings("unchecked")
 @Override
 public void process(final Record record) {
-if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
-return;
-}
-boolean needOuterJoin = outer;
-
 final long inputRecordTimestamp = record.timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
-
 sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+if (outer && record.key() == null && record.value() != null) {

Review Comment:
   what about inner left joins? Those values go into the window? Why?



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##
@@ -351,8 +353,23 @@ private void optimizeTopology(final Properties props) {
 LOG.debug("Optimizing the Kafka Streams graph for self-joins");
 rewriteSingleStoreSelfJoin(root, new IdentityHashMap<>());
 }
+LOG.debug("Optimizing the Kafka Streams graph for null-key records");
+rewriteRepartitionNodes();
 }
 
+private void rewriteRepartitionNodes() {

Review Comment:
   This is to prevent null keys from going into repartition topics, right? Will 
that affect results if a manual repartition is added?
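   
   For illustration, a sketch of the kind of null-key guard in question, written 
against the public DSL (the topic name and topology are made up):
   
   ```java
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.KStream;
   
   public class NullKeyRepartitionSketch {
       public static void main(String[] args) {
           StreamsBuilder builder = new StreamsBuilder();
           KStream<String, String> stream = builder.stream("input-topic");
           // Records with a null key cannot be partitioned meaningfully, so a
           // guard like this keeps them out of the repartition topic. The open
           // question above is whether an automatic rewrite behaves the same
           // way when a user adds a manual repartition() step.
           stream.filter((key, value) -> key != null)
                 .groupByKey()
                 .count();
           builder.build();
       }
   }
   ```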



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##
@@ -124,17 +124,20 @@ public void init(final ProcessorContext context) 
{
 @SuppressWarnings("unchecked")
 @Override
 public void process(final Record record) {
-if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
-return;
-}
-boolean needOuterJoin = outer;
-
 final long inputRecordTimestamp = record.timestamp();
 final long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinBeforeMs);
 final long timeTo = Math.max(0L, inputRecordTimestamp + 
joinAfterMs);
-
 sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
 
+if (outer && record.key() == null && record.value() != null) {
+

Re: [PR] MINOR: Only commit running active and standby tasks when tasks corrupted [kafka]

2023-10-09 Thread via GitHub


mjsax commented on code in PR #14508:
URL: https://github.com/apache/kafka/pull/14508#discussion_r1350625946


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -223,10 +223,7 @@ boolean handleCorruption(final Set corruptedTasks) 
{
 final Collection tasksToCommit = allTasks()
 .values()
 .stream()
-// TODO: once we remove state restoration from the stream 
thread, we can also remove
-//  the RESTORING state here, since there will not be any 
restoring tasks managed
-//  by the stream thread anymore.
-.filter(t -> t.state() == Task.State.RUNNING || t.state() == 
Task.State.RESTORING)
+.filter(t -> t.state() == Task.State.RUNNING)

Review Comment:
   I was just reading the TODO without thinking much about it... -- I guess we 
might still want to flush restoring tasks and write the checkpoint file (which 
is part of a commit) -- so should we execute `preCommit()` and `postCommit()` 
for those? -- I agree that we won't have input topic offsets to be committed 
(and there should not be any TX).
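   
   Roughly what that could look like (sketch only; helper and method names 
approximate the surrounding TaskManager code):
   
   ```java
   final Map<Task, Map<TopicPartition, OffsetAndMetadata>> offsetsPerTask = new HashMap<>();
   for (final Task task : tasksToCommit) {
       // prepareCommit() flushes state stores for RUNNING and RESTORING tasks alike.
       final Map<TopicPartition, OffsetAndMetadata> offsets = task.prepareCommit();
       if (task.state() == Task.State.RUNNING && !offsets.isEmpty()) {
           offsetsPerTask.put(task, offsets); // only running tasks contribute offsets
       }
   }
   commitOffsetsOrTransaction(offsetsPerTask); // no input-topic offsets for restoring tasks
   for (final Task task : tasksToCommit) {
       task.postCommit(true); // writes the checkpoint file
   }
   ```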



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #13247:
URL: https://github.com/apache/kafka/pull/13247#discussion_r1350623608


##
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java:
##
@@ -769,22 +762,12 @@ public void testPropagateInvalidJsonError() {
 }
 
 @SuppressWarnings("unchecked")
-private static  scala.collection.immutable.Set set(final T... set) {
-return mutableSet(set).toSet();
-}
-
-@SuppressWarnings({"deprecation", "unchecked"})
-private static  scala.collection.mutable.Set mutableSet(final 
T...set) {
-return JavaConverters.asScalaSet(new HashSet<>(Arrays.asList(set)));
+private static  Set set(final T... set) {

Review Comment:
   Were we also unable to remove these warnings?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #13247:
URL: https://github.com/apache/kafka/pull/13247#discussion_r1350622281


##
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsIntegrationTest.java:
##
@@ -857,68 +864,18 @@ public void close() {
 }
 }
 
-private ReassignPartitionsCommand.VerifyAssignmentResult 
asScala(VerifyAssignmentResult res) {
-Map partStates = new 
HashMap<>();
-res.partStates.forEach((tp, state) -> partStates.put(tp, 
asScala(state)));
-
-Map 
moveStates = new HashMap<>();
-res.moveStates.forEach((tpr, state) -> moveStates.put(tpr, 
asScala(state)));
-
-return new 
ReassignPartitionsCommand.VerifyAssignmentResult(asScala(partStates), 
res.partsOngoing, asScala(moveStates), res.movesOngoing);
-}
-
-@SuppressWarnings({"unchecked"})
-private ReassignPartitionsCommand.PartitionReassignmentState 
asScala(PartitionReassignmentState state) {
-return new ReassignPartitionsCommand.PartitionReassignmentState(
-seq((List) state.currentReplicas),
-seq((List) state.targetReplicas),
-state.done
-);
-}
-
-private ReassignPartitionsCommand.LogDirMoveState asScala(LogDirMoveState 
state) {
-if (state instanceof ActiveMoveState) {
-ActiveMoveState s = (ActiveMoveState) state;
-return new 
ReassignPartitionsCommand.ActiveMoveState(s.currentLogDir, s.targetLogDir, 
s.futureLogDir);
-} else if (state instanceof CancelledMoveState) {
-CancelledMoveState s = (CancelledMoveState) state;
-return new 
ReassignPartitionsCommand.CancelledMoveState(s.currentLogDir, s.targetLogDir);
-} else if (state instanceof CompletedMoveState) {
-CompletedMoveState s = (CompletedMoveState) state;
-return new 
ReassignPartitionsCommand.CompletedMoveState(s.targetLogDir);
-} else if (state instanceof MissingLogDirMoveState) {
-MissingLogDirMoveState s = (MissingLogDirMoveState) state;
-return new 
ReassignPartitionsCommand.MissingLogDirMoveState(s.targetLogDir);
-} else if (state instanceof MissingReplicaMoveState) {
-MissingReplicaMoveState s = (MissingReplicaMoveState) state;
-return new 
ReassignPartitionsCommand.MissingReplicaMoveState(s.targetLogDir);
-}
-
-throw new IllegalArgumentException("Unknown state " + state);
-}
-
 @SuppressWarnings("unchecked")
-static  scala.collection.immutable.Set set(final T... set) {
-return mutableSet(set).toSet();
+static  Set set(final T... set) {
+return new HashSet<>(Arrays.asList(set));

Review Comment:
   Were we not able to remove the deprecated method below?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350610255


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##
@@ -1340,4 +1377,119 @@ public void testNonRetryableTimer() throws 
InterruptedException {
 assertEquals(1, cnt.get());
 assertEquals(0, ctx.timer.size());
 }
+
+@Test
+public void testStateChanges() throws Exception {
+MockTimer timer = new MockTimer();
+MockPartitionWriter writer = mock(MockPartitionWriter.class);
+MockCoordinatorLoader loader = mock(MockCoordinatorLoader.class);
+MockCoordinatorShardBuilderSupplier supplier = 
mock(MockCoordinatorShardBuilderSupplier.class);
+MockCoordinatorShardBuilder builder = 
mock(MockCoordinatorShardBuilder.class);
+MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+GroupCoordinatorRuntimeMetrics runtimeMetrics = 
mock(GroupCoordinatorRuntimeMetrics.class);
+
+CoordinatorRuntime runtime =
+new CoordinatorRuntime.Builder()
+.withTime(timer.time())
+.withTimer(timer)
+.withLoader(loader)
+.withEventProcessor(new DirectEventProcessor())
+.withPartitionWriter(writer)
+.withCoordinatorShardBuilderSupplier(supplier)
+.withCoordinatorRuntimeMetrics(runtimeMetrics)
+.build();
+
+when(builder.withSnapshotRegistry(any())).thenReturn(builder);
+when(builder.withLogContext(any())).thenReturn(builder);
+when(builder.withTime(any())).thenReturn(builder);
+when(builder.withTimer(any())).thenReturn(builder);
+when(builder.withTopicPartition(any())).thenReturn(builder);
+when(builder.build()).thenReturn(coordinator);
+when(supplier.get()).thenReturn(builder);
+CompletableFuture future = new 
CompletableFuture<>();
+when(loader.load(TP, coordinator)).thenReturn(future);
+
+// Schedule the loading.
+runtime.scheduleLoadOperation(TP, 0);
+
+// Getting the context succeeds and the coordinator should be in 
loading.
+CoordinatorRuntime.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+assertEquals(LOADING, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(INITIAL, 
LOADING);
+
+// When the loading fails, the coordinator transitions to failed.
+future.completeExceptionally(new Exception("failure"));
+assertEquals(FAILED, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, 
FAILED);
+
+// Start loading a new topic partition.
+TopicPartition tp = new TopicPartition("__consumer_offsets", 1);
+future = new CompletableFuture<>();
+when(loader.load(tp, coordinator)).thenReturn(future);
+// Schedule the loading.
+runtime.scheduleLoadOperation(tp, 0);
+// Getting the context succeeds and the coordinator should be in 
loading.
+ctx = runtime.contextOrThrow(tp);
+assertEquals(LOADING, ctx.state);
+verify(runtimeMetrics, times(2)).recordPartitionStateChange(INITIAL, 
LOADING);
+
+// When the loading completes, the coordinator transitions to active.
+future.complete(null);
+assertEquals(ACTIVE, ctx.state);
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(LOADING, 
ACTIVE);
+
+runtime.close();
+verify(runtimeMetrics, times(1)).recordPartitionStateChange(FAILED, 
CLOSED);

Review Comment:
   So we should see this closed metric increment when we resign leadership as 
well? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350608637


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite 
holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void recordPartitionStateChange(CoordinatorState oldState, 
CoordinatorState newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void recordEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void recordEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);
+
+/**
+ * Register the event queue size gauge.
+ *
+ * @param sizeSupplier The size supplier.
+ */
+void registerEventQueueSizeGauge(Supplier sizeSupplier);

Review Comment:
   Hmm -- it should just be when we construct the metrics object, right? 
(referring to the register method)
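   
   e.g. wired once when the runtime is built (sketch; the queue-size accessor 
name is an assumption):
   
   ```java
   // Registered at construction time, not per event:
   runtimeMetrics.registerEventQueueSizeGauge(() -> eventProcessor.size());
   ```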



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14519; [1/N] Implement coordinator runtime metrics [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #14417:
URL: https://github.com/apache/kafka/pull/14417#discussion_r1350606817


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/CoordinatorRuntimeMetrics.java:
##
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.metrics;
+
+import 
org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime.CoordinatorState;
+
+import java.util.function.Supplier;
+
+/**
+ * Used by the group and transaction coordinator runtimes, the metrics suite 
holds partition state gauges and sensors.
+ */
+public interface CoordinatorRuntimeMetrics extends AutoCloseable {
+/**
+ * Returns the metrics group.
+ */
+String metricsGroup();
+
+/**
+ * Called when the partition state changes.
+ * @param oldState The old state.
+ * @param newState The new state to transition to.
+ */
+void onPartitionStateChange(CoordinatorState oldState, CoordinatorState 
newState);
+
+/**
+ * Record the partition load metric.
+ * @param startTimeMs The partition load start time.
+ * @param endTimeMs   The partition load end time.
+ */
+void recordPartitionLoadSensor(long startTimeMs, long endTimeMs);
+
+/**
+ * Get the counter for partitions in Loading state.
+ * Only used for testing.
+ */
+long numPartitionsLoading();
+
+/**
+ * Get the counter for partitions in Active state.
+ * Only used for testing.
+ */
+long numPartitionsActive();
+
+/**
+ * Get the counter for partitions in Failed state.
+ * Only used for testing.
+ */
+long numPartitionsFailed();
+
+/**
+ * Update the event queue time.
+ *
+ * @param durationMs The queue time.
+ */
+void updateEventQueueTime(long durationMs);
+
+/**
+ * Update the event queue processing time.
+ *
+ * @param durationMs The event processing time.
+ */
+void updateEventQueueProcessingTime(long durationMs);
+
+/**
+ * Record the failed event.
+ */
+void recordFailedEvent();
+
+/**
+ * Record the successful event.
+ */
+void recordSuccessfulEvent();
+
+/**
+ * Record the thread idle ratio.
+ * @param ratio The idle ratio.
+ */
+void recordThreadIdleRatio(double ratio);

Review Comment:
   So we have an average and a min for idle ratio. Is the minimum value just 
the lowest recorded in a given time frame? (And we don't have a way to 
identify which thread is the one with the lowest value.)
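   
   For reference, the usual shape of an avg/min pair on one shared sensor 
(sketch; the metric names and `METRICS_GROUP` constant are assumptions):
   
   ```java
   // All threads record into the same sensor; Avg and Min aggregate across
   // every sample in the window, so the min is the lowest sample seen and
   // is not attributable to a specific thread.
   final Sensor idleRatioSensor = metrics.sensor("thread-idle-ratio");
   idleRatioSensor.add(metrics.metricName("thread-idle-ratio-avg", METRICS_GROUP), new Avg());
   idleRatioSensor.add(metrics.metricName("thread-idle-ratio-min", METRICS_GROUP), new Min());
   ```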




[jira] [Resolved] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC

2023-10-09 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-15278.

Resolution: Fixed

> Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
> 
>
> Key: KAFKA-15278
> URL: https://issues.apache.org/jira/browse/KAFKA-15278
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support, kip-848-e2e, kip-848-preview
>
> The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
> and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. 
> It is assumed that the scaffolding for the other two will come along in time.
>  * Implement {{ConsumerGroupRequestManager}}
>  * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts 
> so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
> interval regardless of other {{RequestManager}} instance activity
>  * Ensure error is handled correctly
>  * Ensure MembershipStateManager is updated on both success and failure 
> cases, and the state machine is transitioned to the correct state.
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC

2023-10-09 Thread Philip Nee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773429#comment-17773429
 ] 

Philip Nee commented on KAFKA-15278:


The basic heartbeat is implemented.  We are missing the assignment 
reconciliation and heartbeat on close logic.

> Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
> 
>
> Key: KAFKA-15278
> URL: https://issues.apache.org/jira/browse/KAFKA-15278
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support, kip-848-e2e, kip-848-preview
>
> The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
> and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. 
> It is assumed that the scaffolding for the other two will come along in time.
>  * Implement {{ConsumerGroupRequestManager}}
>  * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts 
> so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
> interval regardless of other {{RequestManager}} instance activity
>  * Ensure error is handled correctly
>  * Ensure MembershipStateManager is updated on both success and failure 
> cases, and the state machine is transitioned to the correct state.
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC

2023-10-09 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15278:
---
Description: 
The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. It 
is assumed that the scaffolding for the other two will come along in time.
 * Implement {{ConsumerGroupRequestManager}}
 * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts so 
that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
interval regardless of other {{RequestManager}} instance activity
 * Ensure error is handled correctly
 * Ensure MembershipStateManager is updated on both success and failure 
cases, and the state machine is transitioned to the correct state.

This task is part of the work to implement support for the new KIP-848 consumer 
group protocol.

  was:
The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. It 
is assumed that the scaffolding for the other two will come along in time.
 * Implement {{ConsumerGroupRequestManager}}
 * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts so 
that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
interval regardless of other {{RequestManager}} instance activity
 * Ensure error is handled correctly

This task is part of the work to implement support for the new KIP-848 consumer 
group protocol.


> Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
> 
>
> Key: KAFKA-15278
> URL: https://issues.apache.org/jira/browse/KAFKA-15278
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support, kip-848-e2e, kip-848-preview
>
> The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
> and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. 
> It is assumed that the scaffolding for the other two will come along in time.
>  * Implement {{ConsumerGroupRequestManager}}
>  * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts 
> so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
> interval regardless of other {{RequestManager}} instance activity
>  * Ensure error is handled correctly
>  * Ensure MembershipStateManager is updated on both success and failure 
> cases, and the state machine is transitioned to the correct state.
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC

2023-10-09 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee reassigned KAFKA-15278:
--

Assignee: Philip Nee

> Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
> 
>
> Key: KAFKA-15278
> URL: https://issues.apache.org/jira/browse/KAFKA-15278
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Philip Nee
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support, kip-848-e2e, kip-848-preview
>
> The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
> and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. 
> It is assumed that the scaffolding for the other two will come along in time.
>  * Implement {{ConsumerGroupRequestManager}}
>  * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts 
> so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
> interval regardless of other {{RequestManager}} instance activity
>  * Ensure error is handled correctly
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC

2023-10-09 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15278:
---
Description: 
The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. It 
is assumed that the scaffolding for the other two will come along in time.
 * Implement {{ConsumerGroupRequestManager}}
 * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts so 
that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
interval regardless of other {{RequestManager}} instance activity
 * Ensure error is handled correctly

This task is part of the work to implement support for the new KIP-848 consumer 
group protocol.

  was:
The protocol introduces three new RPCs that the client uses to communicate with 
the broker:
 # 
[ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI]

The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. It 
is assumed that the scaffolding for the other two will come along in time.
 * Implement {{ConsumerGroupRequestManager}}
 * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts so 
that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
interval regardless of other {{RequestManager}} instance activity
 * Ensure error is handled correctly

This task is part of the work to implement support for the new KIP-848 consumer 
group protocol.


> Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
> 
>
> Key: KAFKA-15278
> URL: https://issues.apache.org/jira/browse/KAFKA-15278
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support, kip-848-e2e, kip-848-preview
>
> The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
> and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. 
> It is assumed that the scaffolding for the other two will come along in time.
>  * Implement {{ConsumerGroupRequestManager}}
>  * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts 
> so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
> interval regardless of other {{RequestManager}} instance activity
>  * Ensure error is handled correctly
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15278) Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC

2023-10-09 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15278:
---
Description: 
The protocol introduces three new RPCs that the client uses to communicate with 
the broker:
 # 
[ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI]

The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. It 
is assumed that the scaffolding for the other two will come along in time.
 * Implement {{ConsumerGroupRequestManager}}
 * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts so 
that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
interval regardless of other {{RequestManager}} instance activity
 * Ensure error is handled correctly

This task is part of the work to implement support for the new KIP-848 consumer 
group protocol.

  was:
The protocol introduces three new RPCs that the client uses to communicate with 
the broker:
 # 
[ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI]

 # 
[ConsumerGroupPrepareAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupPrepareAssignmentAPI]

 # 
[ConsumerGroupInstallAssignment|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupInstallAssignmentAPI]

The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. It 
is assumed that the scaffolding for the other two will come along in time.
 * Implement {{ConsumerGroupRequestManager}}
 * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts so 
that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
interval regardless of other {{RequestManager}} instance activity

This task is part of the work to implement support for the new KIP-848 consumer 
group protocol.


> Implement client support for KIP-848 ConsumerGroupHeartbeat protocol RPC
> 
>
> Key: KAFKA-15278
> URL: https://issues.apache.org/jira/browse/KAFKA-15278
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support, kip-848-e2e, kip-848-preview
>
> The protocol introduces three new RPCs that the client uses to communicate 
> with the broker:
>  # 
> [ConsumerGroupHeartbeat|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-ConsumerGroupHeartbeatAPI]
> The necessary Java code that represents the {{ConsumerGroupHeartbeatRequest}} 
> and {{ConsumerGroupHeartbeatResponse}} are already present in the codebase. 
> It is assumed that the scaffolding for the other two will come along in time.
>  * Implement {{ConsumerGroupRequestManager}}
>  * Ensure that {{DefaultBackgroundThread}} correctly calculates I/O timeouts 
> so that the heartbeat occurs within the {{group.consumer.session.timeout.ms}} 
> interval regardless of other {{RequestManager}} instance activity
>  * Ensure error is handled correctly
> This task is part of the work to implement support for the new KIP-848 
> consumer group protocol.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]

2023-10-09 Thread via GitHub


rbaddam commented on code in PR #14491:
URL: https://github.com/apache/kafka/pull/14491#discussion_r1350588084


##
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java:
##
@@ -240,12 +242,19 @@ public static KafkaPrincipalBuilder 
createPrincipalBuilder(Map config

KerberosShortNamer kerberosShortNamer,

SslPrincipalMapper sslPrincipalMapper) {
 Class principalBuilderClass = (Class) 
configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
-final KafkaPrincipalBuilder builder;
+KafkaPrincipalBuilder builder;
 
 if (principalBuilderClass == null || principalBuilderClass == 
DefaultKafkaPrincipalBuilder.class) {
 builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, 
sslPrincipalMapper);
 } else if 
(KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
-builder = (KafkaPrincipalBuilder) 
Utils.newInstance(principalBuilderClass);
+try {
+Constructor constructor = 
principalBuilderClass.getConstructor(KerberosShortNamer.class, 
SslPrincipalMapper.class);
+builder = (KafkaPrincipalBuilder) 
constructor.newInstance(kerberosShortNamer, sslPrincipalMapper);

Review Comment:
   Thanks @plazma-prizma for the feedback,
   
   The above logic will be used whenever a user wants to configure a custom 
KafkaPrincipalBuilder by implementing the existing KafkaPrincipalBuilder 
interface.
   
   The goal is to pass the already created kerberosShortNamer and 
sslPrincipalMapper so that users don't miss any functionality like 
SSL_PRINCIPAL_MAPPING_RULES_CONFIG when they use their own custom 
KafkaPrincipalBuilder.
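   
   For example, a custom builder could then look like this (class name and 
decoration are hypothetical; delegating keeps the mapping rules working):
   
   ```java
   public class MyPrincipalBuilder implements KafkaPrincipalBuilder {
       private final KafkaPrincipalBuilder delegate;
   
       public MyPrincipalBuilder(KerberosShortNamer kerberosShortNamer,
                                 SslPrincipalMapper sslPrincipalMapper) {
           // Reuse the default builder so ssl.principal.mapping.rules still applies.
           this.delegate = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, sslPrincipalMapper);
       }
   
       @Override
       public KafkaPrincipal build(AuthenticationContext context) {
           KafkaPrincipal principal = delegate.build(context);
           // ... apply custom logic on top of the mapped principal ...
           return principal;
       }
   }
   ```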



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15546) Transactions tool duration field confusing for completed transactions

2023-10-09 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773425#comment-17773425
 ] 

Justine Olshan commented on KAFKA-15546:


Hey Giovanni. The API does not currently support sending two timestamps, so 
this change would require a KIP to update the API. I don't think we store a txn 
commit time in the metadata either. I can look into whether the last updated 
time maps to this, but if it does not, I'm not sure if it makes sense to add 
another field in the state for that.

See current API: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/DescribeTransactionsResponse.json

> Transactions tool duration field confusing for completed transactions
> -
>
> Key: KAFKA-15546
> URL: https://issues.apache.org/jira/browse/KAFKA-15546
> Project: Kafka
>  Issue Type: Task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> When using the transactions tool to describe transactions, if the transaction 
> is completed, its duration will still increase based on when it started. This 
> value is not correct. Instead, we can leave the duration field blank (since 
> we don't have the data for the completed transaction in the describe 
> response).
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #14489:
URL: https://github.com/apache/kafka/pull/14489#discussion_r1350579091


##
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##
@@ -515,7 +521,10 @@ class TransactionStateManager(brokerId: Int,
* metadata cache with the transactional ids. This operation must be 
resilient to any partial state left off from
* the previous loading / unloading operation.
*/
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: 
Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+  def loadTransactionsForTxnTopicPartition(partitionId: Int,
+   coordinatorEpoch: Int,
+   sendTxnMarkers: 
SendTxnMarkersCallback,
+   hadTransactionStateAlreadyLoaded: 
Boolean): Unit = {

Review Comment:
   yeah. Something like that. The key is that it is currently loaded when we 
make the call, because it's OK if it was loaded in the past and then unloaded. 
We want to make sure we clarify that it is loaded and running. Maybe even 
something like transactionCoordinatorAlreadyRunning.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14596: Move TopicCommand to tools [kafka]

2023-10-09 Thread via GitHub


mimaison commented on PR #13201:
URL: https://github.com/apache/kafka/pull/13201#issuecomment-1753360551

   @OmniaGM Thanks for the update. I've not had time to take a look yet but 
noticed there's a compilation failure:
   ```
   > Task :tools:compileTestJava
   
   
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13201/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java:78:
 error: variable TEST_WITH_PARAMETERIZED_QUORUM_NAME is already defined in 
class ToolsTestUtils
   
   public static final String TEST_WITH_PARAMETERIZED_QUORUM_NAME = 
"{displayName}.quorum={0}";
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2023-10-09 Thread via GitHub


clolov commented on code in PR #14489:
URL: https://github.com/apache/kafka/pull/14489#discussion_r1350537983


##
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##
@@ -515,7 +521,10 @@ class TransactionStateManager(brokerId: Int,
* metadata cache with the transactional ids. This operation must be 
resilient to any partial state left off from
* the previous loading / unloading operation.
*/
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: 
Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+  def loadTransactionsForTxnTopicPartition(partitionId: Int,
+   coordinatorEpoch: Int,
+   sendTxnMarkers: 
SendTxnMarkersCallback,
+   hadTransactionStateAlreadyLoaded: 
Boolean): Unit = {

Review Comment:
   Ohhh, I see, maybe hasTransactionStateBeenLoadedBefore?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15521: Refactor build.gradle to align gradle swagger plugin with swagger dependencies [kafka]

2023-10-09 Thread via GitHub


ijuma commented on code in PR #14473:
URL: https://github.com/apache/kafka/pull/14473#discussion_r1350536831


##
gradle.properties:
##
@@ -25,6 +25,9 @@ group=org.apache.kafka
 #  - streams/quickstart/java/pom.xml
 version=3.7.0-SNAPSHOT
 scalaVersion=2.13.12
+# Adding swaggerVersion in gradle.properties to have a single version in place 
for swagger
+# New version of Swagger 2.2.14 requires minimum JDK 11.
+swaggerVersion=2.2.8

Review Comment:
   Why didn't we keep this in `dependencies.gradle`? It's confusing to have 
versions split across multiple files (outside of the Scala version, which is 
special in a few ways).
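   
   For example, the usual pattern would keep it next to the other entries 
(sketch; assuming the existing `versions` map in `dependencies.gradle`):
   
   ```groovy
   // dependencies.gradle (illustrative)
   versions += [
     swagger: "2.2.8",
   ]
   ```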



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15521: Refactor build.gradle to align gradle swagger plugin with swagger dependencies [kafka]

2023-10-09 Thread via GitHub


ijuma commented on code in PR #14473:
URL: https://github.com/apache/kafka/pull/14473#discussion_r1350536831


##
gradle.properties:
##
@@ -25,6 +25,9 @@ group=org.apache.kafka
 #  - streams/quickstart/java/pom.xml
 version=3.7.0-SNAPSHOT
 scalaVersion=2.13.12
+# Adding swaggerVersion in gradle.properties to have a single version in place 
for swagger
+# New version of Swagger 2.2.14 requires minimum JDK 11.
+swaggerVersion=2.2.8

Review Comment:
   Why didn't we keep this in `dependencies.gradle`? It's confusing to have a 
separate file for versions (outside of the Scala version, which is special in a 
few ways).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1350511364


##
clients/src/main/resources/common/message/ProduceResponse.json:
##
@@ -32,7 +32,9 @@
   // records that cause the whole batch to be dropped.  See KIP-467 for 
details.
   //
   // Version 9 enables flexible versions.
-  "validVersions": "0-9",
+  //
+  // Version 10 adds 'CurrentLeader' and 'NodeEndpoints' as tagged fields

Review Comment:
   Are we in agreement then that we should always bump the version when adding 
a tagged field? Unless we explicitly plan to backport the changes (i.e. what 
Jeff did for the consumer group changes)?
   
   I had some folks ask me what the correct protocol is, and it would be nice 
to give consistent answers.
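   
   For reference, a tagged field in the message spec has roughly this shape 
(illustrative only, not the exact KIP-951 schema):
   
   ```json
   { "name": "NodeEndpoints", "type": "[]NodeEndpoint", "versions": "10+",
     "taggedVersions": "10+", "tag": 0,
     "about": "Endpoints for all current leaders referenced in this response." }
   ```
   
   Since tagged fields are optional and ignorable for older readers, the open 
question is exactly whether the version bump should still be mandatory.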






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15325) Integrate topicId in OffsetFetch and OffsetCommit async consumer calls

2023-10-09 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-15325:
---
Labels: kip-848 kip-848-client-support  (was: kip-848 
kip-848-client-support kip-848-preview)

> Integrate topicId in OffsetFetch and OffsetCommit async consumer calls
> --
>
> Key: KAFKA-15325
> URL: https://issues.apache.org/jira/browse/KAFKA-15325
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lianet Magrans
>Priority: Major
>  Labels: kip-848, kip-848-client-support
>
> KIP-848 introduces support for topicIds in the OffsetFetch and OffsetCommit 
> APIs. The consumer calls to those APIs should be updated to include topicIds 
> when available.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #14489:
URL: https://github.com/apache/kafka/pull/14489#discussion_r1350505733


##
metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
##
@@ -26,19 +26,22 @@
 
 public final class LocalReplicaChanges {
 private final Set deletes;
-private final Map leaders;
+private final Map electedLeaders;
+private final Map updatedLeaders;

Review Comment:
   I struggled with naming this quite a bit. I was also wondering if I 
should make it clearer about leader epoch changes vs partition epoch changes. 
The thing that is tricky is that the map only contains the leaders that 
experienced the changes (not followers) so I also wanted to make that clear. I 
will also think on that some more.






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2023-10-09 Thread via GitHub


jolshan commented on code in PR #14489:
URL: https://github.com/apache/kafka/pull/14489#discussion_r1350503797


##
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##
@@ -515,7 +521,10 @@ class TransactionStateManager(brokerId: Int,
* metadata cache with the transactional ids. This operation must be 
resilient to any partial state left off from
* the previous loading / unloading operation.
*/
-  def loadTransactionsForTxnTopicPartition(partitionId: Int, coordinatorEpoch: 
Int, sendTxnMarkers: SendTxnMarkersCallback): Unit = {
+  def loadTransactionsForTxnTopicPartition(partitionId: Int,
+   coordinatorEpoch: Int,
+   sendTxnMarkers: 
SendTxnMarkersCallback,
+   hadTransactionStateAlreadyLoaded: 
Boolean): Unit = {

Review Comment:
   I actually had it this way before, but we load in this method (and check 
afterwards) so I thought "had" conveyed that this happened before the method. I 
will think a bit more about this because there may be a better way to phrase 
it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add javadoc to all ConfigDef.Types values [kafka]

2023-10-09 Thread via GitHub


mimaison commented on code in PR #14515:
URL: https://github.com/apache/kafka/pull/14515#discussion_r1350502430


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -802,11 +802,63 @@ else if (value instanceof Class)
 }
 
 /**
- * The config types
+ * The type for a configuration value
  */
 public enum Type {
-BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD;
+/**
+ * Used for boolean values. Values can be provided as a Boolean object 
or as a String with values
+ * true or false (this is not 
case-sensitive), otherwise a {@link ConfigException} is
+ * thrown.
+ */
+BOOLEAN,
+/**
+ * Used for string values. Values must be provided as a String object, 
otherwise a {@link ConfigException} is
+ * thrown.
+ */
+STRING,
+/**
+ * Used for numerical values within the Java Integer range. Values 
must be provided as a Integer object or as
+ * a String being a valid Integer value, otherwise a {@link 
ConfigException} is thrown.
+ */
+INT,
+/**
+ * Used for numerical values within the Java Short range. Values must 
be provided as a Short object or as
+ * a String being a valid Short value, otherwise a {@link 
ConfigException} is thrown.
+ */
+SHORT,
+/**
+ * Used for numerical values within the Java Long range. Values must 
be provided as a Long object, as an Integer
+ * object or as a String being a valid Long value, otherwise a {@link 
ConfigException} is thrown.
+ */
+LONG,
+/**
+ * Used for numerical values within the Java Double range. Values must 
be provided as a Number object, as a
+ * Double object or as a String being a valid Double value, otherwise 
a {@link ConfigException} is thrown.
+ */
+DOUBLE,
+/**
+ * Used for list values. Values must be provided as a List object, as 
a String object, otherwise a
+ * {@link ConfigException} is thrown. When the value is provided as a 
String it must use commas to separate the
+ * different entries (for example: first-entry, 
second-entry) and an empty String maps to an empty List.
+ */
+LIST,
+/**
+ * Used for values that implement a Kafka interface. Values must be 
provided as a Class object or as a
+ * String object, otherwise a {@link ConfigException} is thrown. When 
the value is provided as a String it must
+ * be the binary name of the Class.
+ */
+CLASS,
+/**
+ * Used for string values containing sensitive data such as a password 
or key. The values of configurations with
+ * of this type are not included in logs and instead replaced with 
"[hidden]". Values must be provided as a
+ * String object, otherwise a {@link ConfigException} is thrown.

Review Comment:
   Technically you can also provide a 
`org.apache.kafka.common.config.types.Password` object but `Password` is not 
part of the public API so I omit that option.
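   
   For example (minimal sketch):
   
   ```java
   ConfigDef def = new ConfigDef()
       .define("my.secret", ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "A secret.");
   Map<String, Object> parsed = def.parse(Collections.singletonMap("my.secret", "s3cr3t"));
   System.out.println(parsed.get("my.secret")); // prints [hidden]
   ```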



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Add javadoc to all ConfigDef.Types values [kafka]

2023-10-09 Thread via GitHub


mimaison commented on code in PR #14515:
URL: https://github.com/apache/kafka/pull/14515#discussion_r1350501171


##
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##
@@ -57,7 +57,7 @@
  * defs.define(config_with_default, Type.STRING, default 
string value, Importance.High, Configuration with default 
value.);
  * // check {@link #define(String, Type, Object, Validator, Importance, 
String)} for more details.
  * defs.define(config_with_validator, Type.INT, 42, 
Range.atLeast(0), Importance.High, Configuration with user provided 
validator.);
- * // check {@link #define(String, Type, Importance, String, String, int, 
Width, String, List)} for more details.
+ * // check {@link #define(String, Type, Importance, String, String, int, 
Width, String, List) define(String, Type, Importance, String, String, int, 
Width, String, ListString)} for more details.

Review Comment:
   This fixes warnings when generating javadoc on this file
   ```
   ConfigDef.java:81: warning - Tag @link:illegal character: "60" in 
"#define(String, Type, Importance, String, String, int, Width, String, 
List)"
   ConfigDef.java:81: warning - Tag @link:illegal character: "62" in 
"#define(String, Type, Importance, String, String, int, Width, String, 
List)"
   ConfigDef.java:81: warning - Tag @link: can't find define(String, Type, 
Importance, String, String, int, Width, String, List) in 
org.apache.kafka.common.config.ConfigDef
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Add javadoc to all ConfigDef.Types values [kafka]

2023-10-09 Thread via GitHub


mimaison opened a new pull request, #14515:
URL: https://github.com/apache/kafka/pull/14515

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15568) Use incrementalAlterConfigs to update the dynamic config of broker in ConfigCommand tool

2023-10-09 Thread Aman Singh (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15568?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aman Singh updated KAFKA-15568:
---
Description: 
As part of 
[this](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API)
 
 `incrementalAlterConfigs` API was introduced to change any config dynamically.

`kafka-configs.sh (ConfigCommand)` still uses `alterConfigs` to update the 
config. It has the issue below:
- 
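
For reference, a minimal sketch of the difference with the AdminClient API 
(the `admin` instance and config values here are illustrative):
{code:java}
ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");

// alterConfigs (deprecated) replaces the whole config set for the resource,
// while incrementalAlterConfigs applies individual SET/DELETE/APPEND/SUBTRACT ops.
AlterConfigOp op = new AlterConfigOp(
    new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET);
admin.incrementalAlterConfigs(
    Collections.singletonMap(broker, Collections.singletonList(op))).all().get();
{code}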


> Use incrementalAlterConfigs to update the dynamic config of broker in 
> ConfigCommand tool
> 
>
> Key: KAFKA-15568
> URL: https://issues.apache.org/jira/browse/KAFKA-15568
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Aman Singh
>Priority: Major
>
> As part of 
> [this](https://cwiki.apache.org/confluence/display/KAFKA/KIP-339%3A+Create+a+new+IncrementalAlterConfigs+API)
>  
>  `incrementalAlterConfigs` API was introduced to change any config 
> dynamically.
> `kafka-configs.sh (ConfigCommand)` still uses `alterConfigs` to update the 
> config. It has the issue below:
> - 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

2023-10-09 Thread via GitHub


electrical commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1753259175

   @C0urante the other PR is **https://github.com/apache/kafka/pull/12358**
   So far it seems it was reviewed and approved but no movement from that point 
on.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-09 Thread via GitHub


dajac merged PR #14364:
URL: https://github.com/apache/kafka/pull/14364


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-09 Thread via GitHub


dajac commented on PR #14364:
URL: https://github.com/apache/kafka/pull/14364#issuecomment-1753238153

   If we combine the last two builds, I am confident that the changes are good 
so I will merge it to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15568) Use incrementalAlterConfigs to update the dynamic config of broker in ConfigCommand tool

2023-10-09 Thread Aman Singh (Jira)
Aman Singh created KAFKA-15568:
--

 Summary: Use incrementalAlterConfigs to update the dynamic config 
of broker in ConfigCommand tool
 Key: KAFKA-15568
 URL: https://issues.apache.org/jira/browse/KAFKA-15568
 Project: Kafka
  Issue Type: Improvement
Reporter: Aman Singh






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14927) Dynamic configs not validated when using kafka-configs and --add-config-file

2023-10-09 Thread Aman Singh (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17773365#comment-17773365
 ] 

Aman Singh commented on KAFKA-14927:


Raised the PR to add validation https://github.com/apache/kafka/pull/14514 as 
suggested by [~cmccabe]

> Dynamic configs not validated when using kafka-configs and --add-config-file
> 
>
> Key: KAFKA-14927
> URL: https://issues.apache.org/jira/browse/KAFKA-14927
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 3.3.2
>Reporter: Justin Daines
>Assignee: José Armando García Sancio
>Priority: Minor
>  Labels: 4.0-blocker
>
> Using {{kafka-configs}} should validate dynamic configurations before 
> applying them. Currently it is possible to send a file with invalid 
> configurations, for example a file containing the following:
> {code:java}
> {
>   "routes": {
>     "crn:///kafka=*": {
>       "management": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "describe": {
>         "allowed": "",
>         "denied": "confluent-audit-log-events-denied"
>       },
>       "authentication": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authn"
>       },
>       "authorize": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events-denied-authz"
>       },
>       "interbroker": {
>         "allowed": "",
>         "denied": ""
>       }
>     },
>     "crn:///kafka=*/group=*": {
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     },
>     "crn:///kafka=*/topic=*": {
>       "produce": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       },
>       "consume": {
>         "allowed": "confluent-audit-log-events_audit",
>         "denied": "confluent-audit-log-events"
>       }
>     }
>   },
>   "destinations": {
>     "topics": {
>       "confluent-audit-log-events": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authn": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events-denied-authz": {
>         "retention_ms": 777600
>       },
>       "confluent-audit-log-events_audit": {
>         "retention_ms": 777600
>       }
>     }
>   },
>   "default_topics": {
>     "allowed": "confluent-audit-log-events_audit",
>     "denied": "confluent-audit-log-events"
>   },
>   "excluded_principals": [
>     "User:schemaregistryUser",
>     "User:ANONYMOUS",
>     "User:appSA",
>     "User:admin",
>     "User:connectAdmin",
>     "User:connectorSubmitter",
>     "User:connectorSA",
>     "User:schemaregistryUser",
>     "User:ksqlDBAdmin",
>     "User:ksqlDBUser",
>     "User:controlCenterAndKsqlDBServer",
>     "User:controlcenterAdmin",
>     "User:restAdmin",
>     "User:appSA",
>     "User:clientListen",
>     "User:superUser"
>   ]
> } {code}
> {code:java}
> kafka-configs --bootstrap-server $KAFKA_BOOTSTRAP --entity-type brokers 
> --entity-default --alter --add-config-file audit-log.json {code}
> Yields the following dynamic configs:
> {code:java}
> Default configs for brokers in the cluster are:
>   "destinations"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"destinations"=null}
>   "confluent-audit-log-events-denied-authn"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events-denied-authn"=null}
>   "routes"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"routes"=null}
>   "User=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"User=null}
>   },=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:},=null}
>   "excluded_principals"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"excluded_principals"=null}
>   "confluent-audit-log-events_audit"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"confluent-audit-log-events_audit"=null}
>   "authorize"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"authorize"=null}
>   "default_topics"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"default_topics"=null}
>   "topics"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"topics"=null}
>   ]=null sensitive=true synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:]=null}
>   "interbroker"=null sensitive=true 
> synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:"interbroker"=null}
>   "produce"=null sensitive=true 
> 

[PR] KAFKA-14927: Add validation for config keys in ConfigCommand tool [kafka]

2023-10-09 Thread via GitHub


singhnama opened a new pull request, #14514:
URL: https://github.com/apache/kafka/pull/14514

   Added validation in the ConfigCommand tool to only allow characters matching 
'([a-z][A-Z][0-9][._-])*' in config keys.
   Jira: https://issues.apache.org/jira/browse/KAFKA-14927 (Contains more 
details about why it's required)
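   
   For illustration, a rough sketch of the kind of check described; the 
`validateConfigKey` helper name is hypothetical, and the pattern below is a 
plain-Java-regex reading of the character set above, which may differ from the 
actual patch:
   
   ```java
   import java.util.regex.Pattern;
   
   public class ConfigKeyValidation {
       // Assumed translation of the allowed character set into a Java regex.
       private static final Pattern VALID_CONFIG_KEY = Pattern.compile("[a-zA-Z0-9._-]+");
   
       // Hypothetical helper; the real patch may hook validation in elsewhere.
       static void validateConfigKey(String key) {
           if (!VALID_CONFIG_KEY.matcher(key).matches()) {
               throw new IllegalArgumentException("Invalid config key '" + key
                   + "': only letters, digits, '.', '_' and '-' are allowed");
           }
       }
   
       public static void main(String[] args) {
           validateConfigKey("log.retention.ms"); // passes
           validateConfigKey("\"routes\"");       // throws: quotes are not allowed
       }
   }
   ```
   
   A check like this would reject keys such as `"routes"` from the JSON file in 
the linked issue instead of storing them as garbage dynamic configs.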


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15567 Fix benchmark code to work [kafka]

2023-10-09 Thread via GitHub


ocadaruma opened a new pull request, #14513:
URL: https://github.com/apache/kafka/pull/14513

   ## Summary
   
   - Currently, ReplicaFetcherThreadBenchmark doesn't run, for two reasons:
 * NPE that is introduced by 
https://github.com/apache/kafka/commit/755e04a41dfd00dd4587b0fe0980befd0ae5c433
 - ZkMetadataCache instantiation causes an NPE because we pass `null` for 
`kraftControllerNodes` when it should be `Seq.empty`
 * NotLeaderOrFollowerException that is introduced by 
https://github.com/apache/kafka/commit/3467036e017adc3ac0919bbc0c067b1bb1b621f3
 - Even if we address the above NPE issue, we still get 
NotLeaderOrFollowerException
 - ```
   org.apache.kafka.common.errors.NotLeaderOrFollowerException: Error 
while fetching partition state for topic-30
   ```
 - This is because replicaManager doesn't host partitions when 
RemoteLeaderEndPoint tries to build fetches
   - This PR addresses these problems
   - The NPE on ZkMetadataCache instantiation also occurs in several other 
benchmarks. This PR fixes those benchmarks as well.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

2023-10-09 Thread via GitHub


C0urante commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1753124395

   @electrical can you link to the second PR? I've reviewed this one and am 
waiting on the author. If a second author is more responsive, we may be able to 
fix this issue sooner.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

2023-10-09 Thread via GitHub


electrical commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1753118374

   Hi,
   
   It seems there are now two PRs open to solve the same issue, but neither is 
moving forward, which is blocking a lot of people from migrating to MM2. 
   I really hope someone will push this over the finish line.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15514) Controller-side replica management changes

2023-10-09 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-15514:

Description: 
The new "Assignments" field replaces the "Replicas" field in PartitionRecord 
and PartitionChangeRecord.

Any changes to partitions need to consider both fields.
 * ISR updates
 * Partition reassignments & reverts
 * Partition creation

  was:
The new "Assignments" field replaces the "Replicas" field in PartitionRecord 
and PartitionChangeRecord.

On the controller side, any changes to partitions need to consider both fields.
 * ISR updates
 * Partition reassignments & reverts
 * Partition creation


> Controller-side replica management changes
> --
>
> Key: KAFKA-15514
> URL: https://issues.apache.org/jira/browse/KAFKA-15514
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Priority: Major
>
> The new "Assignments" field replaces the "Replicas" field in PartitionRecord 
> and PartitionChangeRecord.
> Any changes to partitions need to consider both fields.
>  * ISR updates
>  * Partition reassignments & reverts
>  * Partition creation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15514) Replica management changes

2023-10-09 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-15514:

Summary: Replica management changes  (was: Controller-side replica 
management changes)

> Replica management changes
> --
>
> Key: KAFKA-15514
> URL: https://issues.apache.org/jira/browse/KAFKA-15514
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Priority: Major
>
> The new "Assignments" field replaces the "Replicas" field in PartitionRecord 
> and PartitionChangeRecord.
> Any changes to partitions need to consider both fields.
>  * ISR updates
>  * Partition reassignments & reverts
>  * Partition creation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15514) Controller-side replica management changes

2023-10-09 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-15514:

Summary: Controller-side replica management changes  (was: Replica 
management changes)

> Controller-side replica management changes
> --
>
> Key: KAFKA-15514
> URL: https://issues.apache.org/jira/browse/KAFKA-15514
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Priority: Major
>
> The new "Assignments" field replaces the "Replicas" field in PartitionRecord 
> and PartitionChangeRecord.
> Any changes to partitions need to consider both fields.
>  * ISR updates
>  * Partition reassignments & reverts
>  * Partition creation



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15567) ReplicaFetcherThreadBenchmark is not working

2023-10-09 Thread Haruki Okada (Jira)
Haruki Okada created KAFKA-15567:


 Summary: ReplicaFetcherThreadBenchmark is not working
 Key: KAFKA-15567
 URL: https://issues.apache.org/jira/browse/KAFKA-15567
 Project: Kafka
  Issue Type: Improvement
Reporter: Haruki Okada
Assignee: Haruki Okada


* ReplicaFetcherThreadBenchmark is not working as of current trunk 
(https://github.com/apache/kafka/tree/c223a9c3761f796468ccfdae9e177e764ab6a965)

 
{code:java}
% jmh-benchmarks/jmh.sh ReplicaFetcherThreadBenchmark
(snip)
java.lang.NullPointerException
    at kafka.server.metadata.ZkMetadataCache.<init>(ZkMetadataCache.scala:89)
    at kafka.server.MetadataCache.zkMetadataCache(MetadataCache.scala:120)
    at 
org.apache.kafka.jmh.fetcher.ReplicaFetcherThreadBenchmark.setup(ReplicaFetcherThreadBenchmark.java:220)
    at 
org.apache.kafka.jmh.fetcher.jmh_generated.ReplicaFetcherThreadBenchmark_testFetcher_jmhTest._jmh_tryInit_f_replicafetcherthreadbenchmark0_G(ReplicaFetcherThreadBenchmark_testFetcher_jmhTest.java:448)
    at 
org.apache.kafka.jmh.fetcher.jmh_generated.ReplicaFetcherThreadBenchmark_testFetcher_jmhTest.testFetcher_AverageTime(ReplicaFetcherThreadBenchmark_testFetcher_jmhTest.java:164)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
    at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at 
org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:527)
    at 
org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:504)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829) {code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15468: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2023-10-09 Thread via GitHub


clolov commented on code in PR #14489:
URL: https://github.com/apache/kafka/pull/14489#discussion_r1350305894


##
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##
@@ -392,6 +392,12 @@ class TransactionStateManager(brokerId: Int,
 }
   }
 
+  private[transaction] def hasTxnStateLoaded(partitionId: Int): Boolean = {
+inReadLock(stateLock) {
+  !transactionMetadataCache.get(partitionId).isEmpty

Review Comment:
   ```suggestion
 transactionMetadataCache.contains(partitionId)
   ```



##
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##
@@ -524,23 +533,35 @@ class TransactionStateManager(brokerId: Int,
 }
 
 def loadTransactions(startTimeMs: java.lang.Long): Unit = {
-  val schedulerTimeMs = time.milliseconds() - startTimeMs
-  info(s"Loading transaction metadata from $topicPartition at epoch 
$coordinatorEpoch")
-  validateTransactionTopicPartitionCountIsStable()
-
-  val loadedTransactions = loadTransactionMetadata(topicPartition, 
coordinatorEpoch)
-  val endTimeMs = time.milliseconds()
-  val totalLoadingTimeMs = endTimeMs - startTimeMs
-  partitionLoadSensor.record(totalLoadingTimeMs.toDouble, endTimeMs, false)
-  info(s"Finished loading ${loadedTransactions.size} transaction metadata 
from $topicPartition in " +
-s"$totalLoadingTimeMs milliseconds, of which $schedulerTimeMs 
milliseconds was spent in the scheduler.")
+  val maybeLoadedTransactions =
+if (!hadTransactionStateAlreadyLoaded) {

Review Comment:
   ```suggestion
   if (!hasTransactionStateAlreadyLoaded) {
   ```



##
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala:
##
@@ -875,18 +875,35 @@ class TransactionStateManagerTest {
 val startOffset = 0L
 val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, 
txnRecords.toArray: _*)
 
+val transactionsWithPendingMarkers = new ConcurrentHashMap[String, 
PendingCompleteTxn]
+def sendMarkers(coordinatorEpoch: Int,
+txnResult: TransactionResult,
+txnMetadata: TransactionMetadata,
+newMetadata: TxnTransitMetadata): Unit = {
+  val transactionalId = txnMetadata.transactionalId
+  val pendingCompleteTxn = PendingCompleteTxn(
+transactionalId,
+coordinatorEpoch,
+txnMetadata,
+newMetadata)
+
+  transactionsWithPendingMarkers.put(transactionalId, pendingCompleteTxn)
+}
+
 prepareTxnLog(topicPartition, 0, records)
 
 // immigrate partition at epoch 0
-transactionManager.loadTransactionsForTxnTopicPartition(partitionId, 
coordinatorEpoch = 0, (_, _, _, _) => ())
+transactionManager.loadTransactionsForTxnTopicPartition(partitionId, 
coordinatorEpoch = 0, sendMarkers, false)

Review Comment:
   Since this is a Scala test class can you use named arguments to improve 
readability similar to the argument `coordinatorEpoch = 0`?



##
metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java:
##
@@ -218,7 +218,11 @@ public void testBasicLocalChanges() {
 );
 assertEquals(
 new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))),
-changes.leaders().keySet()
+changes.electedLeaders().keySet()
+);
+assertEquals(
+new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))),

Review Comment:
   ```suggestion
   new HashSet<>(Arrays.asList(new TopicPartition("baz", 0))),
   ```



##
metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java:
##
@@ -269,14 +273,49 @@ public void testDeleteAfterChanges() {
 
 LocalReplicaChanges changes = delta.localChanges(localId);
 assertEquals(new HashSet<>(Arrays.asList(new TopicPartition("zoo", 
0))), changes.deletes());
-assertEquals(Collections.emptyMap(), changes.leaders());
+assertEquals(Collections.emptyMap(), changes.electedLeaders());
+assertEquals(Collections.emptyMap(), changes.updatedLeaders());
 assertEquals(Collections.emptyMap(), changes.followers());
 
 TopicsImage finalImage = delta.apply();
List<ApiMessageAndVersion> imageRecords = getImageRecords(image);
 imageRecords.addAll(topicRecords);
 testToImage(finalImage, Optional.of(imageRecords));
 }
+@Test

Review Comment:
   ```suggestion
   
   @Test
   ```



##
metadata/src/main/java/org/apache/kafka/image/LocalReplicaChanges.java:
##
@@ -26,19 +26,22 @@
 
 public final class LocalReplicaChanges {
private final Set<TopicPartition> deletes;
-private final Map<TopicPartition, PartitionInfo> leaders;
+private final Map<TopicPartition, PartitionInfo> electedLeaders;
+private final Map<TopicPartition, PartitionInfo> updatedLeaders;

Review Comment:
   Does it make sense to change these names to tpToPartitionEpochs and 
tpToLeaderEpochs? I can anticipate this naming being 

Re: [PR] KAFKA-15507: Make AdminClient throw non-retriable exception for a new call while closing [kafka]

2023-10-09 Thread via GitHub


tinaselenge commented on PR #14455:
URL: https://github.com/apache/kafka/pull/14455#issuecomment-1753035679

   Thanks everyone for reviewing the PR. 
   @showuon, I have updated the failing tests. Two other tests failed in the 
last build, but they seem unrelated and passed when run locally. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-09 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1350279208


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationCondition.java:
##
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+/**
+ * An offset is considered expired based on different factors, such as the 
state of the group
+ * and/or the GroupMetadata record version (for generic groups). This class is 
used to check
+ * how offsets for the group should be expired.
+ */
+public interface OffsetExpirationCondition {
+
+/**
+ * Given an offset metadata and offsets retention, return whether the 
offset is expired or not.
+ *
+ * @param offset   The offset metadata.
+ * @param currentTimestamp The current timestamp.
+ * @param offsetsRetentionMs   The offset retention.
+ *
+ * @return Whether the offset is considered expired or not.
+ */
+boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestamp, 
long offsetsRetentionMs);
+}

Review Comment:
   This was closed but not addressed.
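   
   For illustration, since the interface above has a single abstract method, an 
expiration rule can be expressed as a lambda. A minimal sketch; the 
`commitTimestampMs` accessor is an assumption about `OffsetAndMetadata`'s 
shape, not a confirmed field name:
   
   ```java
   // Hypothetical "simple" rule: an offset expires once its commit timestamp
   // falls outside the retention window. `commitTimestampMs` is assumed here.
   OffsetExpirationCondition condition =
       (offset, currentTimestamp, offsetsRetentionMs) ->
           currentTimestamp - offset.commitTimestampMs > offsetsRetentionMs;
   ```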



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Only commit running active and standby tasks when tasks corrupted [kafka]

2023-10-09 Thread via GitHub


cadonna commented on code in PR #14508:
URL: https://github.com/apache/kafka/pull/14508#discussion_r1350271668


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -223,10 +223,7 @@ boolean handleCorruption(final Set<TaskId> corruptedTasks) 
{
 final Collection<Task> tasksToCommit = allTasks()
 .values()
 .stream()
-// TODO: once we remove state restoration from the stream 
thread, we can also remove
-//  the RESTORING state here, since there will not be any 
restoring tasks managed
-//  by the stream thread anymore.
-.filter(t -> t.state() == Task.State.RUNNING || t.state() == 
Task.State.RESTORING)
+.filter(t -> t.state() == Task.State.RUNNING)

Review Comment:
   I do not think that this is needed. Don't you agree that restoring active 
tasks do not need to be committed, with or without the state updater?  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-09 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1350261943


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ * @param offsetsRetentionMs The offset retention in milliseconds.
+ *
+ * @return The group id if the group no longer has any offsets remaining, 
empty otherwise.
+ */
+public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> 
records, long offsetsRetentionMs) {
+TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return Optional.of(groupId);
+}
+try {
+Group group = groupMetadataManager.group(groupId);
+ExpirationCondition expirationCondition = 
group.expirationCondition();
+Set<TopicPartition> expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!expirationCondition.subscribedTopics.contains(topic)) {

Review Comment:
   I am also confused that you made this change. Did you make it because it is 
correct, or by mistake?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14987; Implement Group/Offset expiration in the new coordinator [kafka]

2023-10-09 Thread via GitHub


dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1350261109


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup 
fetchAllOffsets(
 .setTopics(topicResponses);
 }
 
+/**
+ * Remove expired offsets for group.
+ *
+ * @param groupId The group id.
+ * @param records The list of records to populate with offset commit 
tombstone records.
+ * @param offsetsRetentionMs The offset retention in milliseconds.
+ *
+ * @return The group id if the group no longer has any offsets remaining, 
empty otherwise.
+ */
+public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> 
records, long offsetsRetentionMs) {
+TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
offsetsByTopic = offsetsByGroup.get(groupId);
+if (offsetsByTopic == null) {
+return Optional.of(groupId);
+}
+try {
+Group group = groupMetadataManager.group(groupId);
+ExpirationCondition expirationCondition = 
group.expirationCondition();
+Set<TopicPartition> expiredPartitions = new HashSet<>();
+long currentTimestamp = time.milliseconds();
+AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true);
+offsetsByTopic.forEach((topic, partitions) -> {
+if (!expirationCondition.subscribedTopics.contains(topic)) {

Review Comment:
   Hmm, I am not sure I follow. In this case, isn't `subscribedTopics` going 
to be set to an optional containing an empty list?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15452: Access SslPrincipalMapper and kerberosShortNamer in Custom KafkaPrincipalBuilder(KIP-982) [kafka]

2023-10-09 Thread via GitHub


plazma-prizma commented on code in PR #14491:
URL: https://github.com/apache/kafka/pull/14491#discussion_r1350253419


##
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java:
##
@@ -240,12 +242,19 @@ public static KafkaPrincipalBuilder 
createPrincipalBuilder(Map config

KerberosShortNamer kerberosShortNamer,

SslPrincipalMapper sslPrincipalMapper) {
 Class principalBuilderClass = (Class) 
configs.get(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG);
-final KafkaPrincipalBuilder builder;
+KafkaPrincipalBuilder builder;
 
 if (principalBuilderClass == null || principalBuilderClass == 
DefaultKafkaPrincipalBuilder.class) {
 builder = new DefaultKafkaPrincipalBuilder(kerberosShortNamer, 
sslPrincipalMapper);
 } else if 
(KafkaPrincipalBuilder.class.isAssignableFrom(principalBuilderClass)) {
-builder = (KafkaPrincipalBuilder) 
Utils.newInstance(principalBuilderClass);
+try {
+Constructor<?> constructor = 
principalBuilderClass.getConstructor(KerberosShortNamer.class, 
SslPrincipalMapper.class);
+builder = (KafkaPrincipalBuilder) 
constructor.newInstance(kerberosShortNamer, sslPrincipalMapper);

Review Comment:
   This looks as if it is solving a problem for a specific principal type. Not 
all principals have to use KerberosShortNamer; it is only relevant when the 
credentials are provided by Kerberos authentication.
   
   Why not define a subclass of KerberosPrincipalBuilder instead and provide 
these details only to that one?
   
   Moreover, you could define an SSLPrincipalBuilder as well.
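   
   For reference, a sketch of the constructor-lookup-with-fallback pattern the 
diff appears to implement, assuming the surrounding variables from 
`createPrincipalBuilder` are in scope; the catch clause is an assumption, 
since the original fallback is not shown in the excerpt:
   
   ```java
   // Sketch only: prefer a (KerberosShortNamer, SslPrincipalMapper) constructor
   // if the custom builder declares one, otherwise fall back to the default path.
   KafkaPrincipalBuilder builder;
   try {
       Constructor<?> ctor = principalBuilderClass.getConstructor(
           KerberosShortNamer.class, SslPrincipalMapper.class);
       builder = (KafkaPrincipalBuilder) ctor.newInstance(kerberosShortNamer, sslPrincipalMapper);
   } catch (ReflectiveOperationException e) {
       // No two-argument constructor (or instantiation failed): use the
       // no-argument constructor, as the pre-existing code does.
       builder = (KafkaPrincipalBuilder) Utils.newInstance(principalBuilderClass);
   }
   ```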
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] wip: investigate Flaky tests [kafka]

2023-10-09 Thread via GitHub


dengziming opened a new pull request, #14512:
URL: https://github.com/apache/kafka/pull/14512

   *More detailed description of your change*
   
   *Summary of testing strategy (including rationale)*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15169: Added TestCase in RemoteIndexCache [kafka]

2023-10-09 Thread via GitHub


iit2009060 commented on PR #14482:
URL: https://github.com/apache/kafka/pull/14482#issuecomment-1752910354

   Unrelated test failures 
   
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14482/14/tests
   @showuon @divijvaidya can we merge these changes?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


