[jira] [Commented] (KAFKA-10140) Incremental config api excludes plugin config changes

2024-01-23 Thread Deng Ziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810243#comment-17810243
 ] 

Deng Ziming commented on KAFKA-10140:
-

This problem only affects append/subtract operations and does not affect 
add/delete operations; the following command works fine:
kafka-configs.sh --bootstrap-server localhost:9092 --alter --broker-defaults 
--add-config metrics.jmx.blacklist=kafka.controller
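For context, the error quoted below comes from the incremental API with an APPEND/SUBTRACT operation on a plugin-defined config. A minimal AdminClient sketch of that path (illustrative only, not the exact reproduction from the report):
{code:java}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class AppendPluginConfigExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Cluster-wide broker defaults use an empty resource name.
            ConfigResource brokerDefaults = new ConfigResource(ConfigResource.Type.BROKER, "");
            // APPEND on a config defined by a plugin (JmxReporter), not by KafkaConfig.
            AlterConfigOp append = new AlterConfigOp(
                new ConfigEntry("metrics.jmx.blacklist", "kafka.controller"),
                AlterConfigOp.OpType.APPEND);
            // On affected brokers this is the call that surfaces the NoSuchElementException.
            admin.incrementalAlterConfigs(
                Collections.singletonMap(brokerDefaults, Collections.singletonList(append))
            ).all().get();
        }
    }
}
{code}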
 

> Incremental config api excludes plugin config changes
> -
>
> Key: KAFKA-10140
> URL: https://issues.apache.org/jira/browse/KAFKA-10140
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Critical
>
> I was trying to alter the jmx metric filters using the incremental alter 
> config api and hit this error:
> {code:java}
> java.util.NoSuchElementException: key not found: metrics.jmx.blacklist
> at scala.collection.MapLike.default(MapLike.scala:235)
> at scala.collection.MapLike.default$(MapLike.scala:234)
> at scala.collection.AbstractMap.default(Map.scala:65)
> at scala.collection.MapLike.apply(MapLike.scala:144)
> at scala.collection.MapLike.apply$(MapLike.scala:143)
> at scala.collection.AbstractMap.apply(Map.scala:65)
> at kafka.server.AdminManager.listType$1(AdminManager.scala:681)
> at 
> kafka.server.AdminManager.$anonfun$prepareIncrementalConfigs$1(AdminManager.scala:693)
> at kafka.server.AdminManager.prepareIncrementalConfigs(AdminManager.scala:687)
> at 
> kafka.server.AdminManager.$anonfun$incrementalAlterConfigs$1(AdminManager.scala:618)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
> at scala.collection.immutable.Map$Map1.foreach(Map.scala:154)
> at scala.collection.TraversableLike.map(TraversableLike.scala:273)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> at kafka.server.AdminManager.incrementalAlterConfigs(AdminManager.scala:589)
> at 
> kafka.server.KafkaApis.handleIncrementalAlterConfigsRequest(KafkaApis.scala:2698)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:188)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:78)
> at java.base/java.lang.Thread.run(Thread.java:834) {code}
> It looks like we are only allowing changes to the keys defined in 
> `KafkaConfig` through this API. This excludes config changes to any plugin 
> components such as `JmxReporter`.
> Note that I was able to use the regular `alterConfig` API to change this 
> config.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Delete unused classes [kafka]

2024-01-23 Thread via GitHub


ijuma merged PR #14797:
URL: https://github.com/apache/kafka/pull/14797


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15717: KRaft support in LeaderEpochIntegrationTest [kafka]

2024-01-23 Thread via GitHub


appchemist commented on PR #14815:
URL: https://github.com/apache/kafka/pull/14815#issuecomment-1907375242

   this is 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15717: KRaft support in LeaderEpochIntegrationTest [kafka]

2024-01-23 Thread via GitHub


appchemist closed pull request #14815: KAFKA-15717: KRaft support in 
LeaderEpochIntegrationTest
URL: https://github.com/apache/kafka/pull/14815


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR Remove unreachable if-else block in ReplicaManager.scala [kafka]

2024-01-23 Thread via GitHub


drawxy commented on PR #15220:
URL: https://github.com/apache/kafka/pull/15220#issuecomment-1907259848

   > @drawxy can you please fix the failing compilation. Otherwise looks good.
   
   Already fixed by rebasing onto the trunk branch. @divijvaidya, could you please 
help merge this PR? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [8/8] Update offset delete paths [kafka]

2024-01-23 Thread via GitHub


rreddy-22 commented on code in PR #15221:
URL: https://github.com/apache/kafka/pull/15221#discussion_r1464202116


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -604,22 +602,23 @@ public CoordinatorResult deleteOffsets(
 )
 );
 } else {
-final TimelineHashMap 
offsetsByPartition = offsetsByTopic == null ?
-null : offsetsByTopic.get(topic.name());
-if (offsetsByPartition != null) {
-topic.partitions().forEach(partition -> {
-if 
(offsetsByPartition.containsKey(partition.partitionIndex())) {
-responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
-.setPartitionIndex(partition.partitionIndex())
-);
-
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
-request.groupId(),
-topic.name(),
-partition.partitionIndex()
-));
-}
-});
-}
+topic.partitions().forEach(partition -> {
+// We always add the partition to the response.
+responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+.setPartitionIndex(partition.partitionIndex())
+);
+
+// A tombstone is written if an offset in present is the 
main storage or

Review Comment:
   nit: offset is* present in* the main storage



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [8/8] Update offset delete paths [kafka]

2024-01-23 Thread via GitHub


jolshan commented on PR #15221:
URL: https://github.com/apache/kafka/pull/15221#issuecomment-1907178903

   Took a first pass. I will look again tomorrow. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [8/8] Update offset delete paths [kafka]

2024-01-23 Thread via GitHub


jolshan commented on code in PR #15221:
URL: https://github.com/apache/kafka/pull/15221#discussion_r1464178480


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -501,13 +521,11 @@ public void testOffsetDeleteWith(
 
 final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
expectedResponsePartitionCollection =
 new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
-if (hasOffset(groupId, topic, partition)) {

Review Comment:
   Which one is the test where we verified we return a response even if the 
partition had no offset?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14505; [8/8] Update offset delete paths [kafka]

2024-01-23 Thread via GitHub


jolshan commented on code in PR #15221:
URL: https://github.com/apache/kafka/pull/15221#discussion_r1464175850


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -2342,26 +2375,24 @@ public void testConsumerGroupOffsetDeleteWithErrors() {
 context.testOffsetDeleteWith("foo", "bar", 0, 
Errors.GROUP_SUBSCRIBED_TO_TOPIC);
 }
 
+@Test
+public void testConsumerGroupOffsetDeleteWithPendingTransactionalOffsets() 
{
+OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder().build();
+ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(
+"foo",
+true
+);
+context.commitOffset(10L, "foo", "bar", 0, 100L, 0, 
context.time.milliseconds());
+assertFalse(group.isSubscribedToTopic("bar"));
+context.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE);

Review Comment:
   do we want to test if the offset is deleted here like for the generic group?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig.Defaults to server module [kafka]

2024-01-23 Thread via GitHub


OmniaGM commented on code in PR #15158:
URL: https://github.com/apache/kafka/pull/15158#discussion_r1464137161


##
server/src/main/java/org/apache/kafka/server/config/Defaults.java:
##
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.coordinator.group.Group;
+import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslClientAuth;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder;
+import org.apache.kafka.coordinator.group.OffsetConfig;
+import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
+import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class Defaults {

Review Comment:
   I think the pattern of keeping defaults and properties in the class's companion 
object has been like this in some of the Scala code as well. KafkaConfig has been 
something of an anti-pattern for a while, as its defaults are defined in a separate 
companion object. I will raise another PR soon to move this to the Java pattern. 
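   For illustration, the constants-holder pattern being discussed looks roughly like this in Java (field names and values below are examples, not the actual contents of the new Defaults class):
{code:java}
public final class Defaults {
    // Example default values only; the real class carries the broker-side defaults.
    public static final int NUM_IO_THREADS = 8;
    public static final String COMPRESSION_TYPE = "producer";

    private Defaults() {
        // Static holder; never instantiated.
    }
}
{code}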



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [WIP - DON'T MERGE] KAFKA-15974 Enforce that CompletableApplicationEvent has a timeout that is respected [kafka]

2024-01-23 Thread via GitHub


kirktrue opened a new pull request, #15250:
URL: https://github.com/apache/kafka/pull/15250

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15460: Add group type filter to List Groups API [kafka]

2024-01-23 Thread via GitHub


rreddy-22 commented on code in PR #15152:
URL: https://github.com/apache/kafka/pull/15152#discussion_r1464109057


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -452,21 +453,39 @@ public Group group(String groupId, long committedOffset) 
throws GroupIdNotFoundE
 /**
  * Get the Group List.
  *
- * @param statesFilter The states of the groups we want to list.
- * If empty all groups are returned with their state.
- * @param committedOffset A specified committed offset corresponding to 
this shard
+ * @param statesFilter  The states of the groups we want to list.
+ *  If empty, all groups are returned with their 
state.
+ * @param typesFilter   The types of the groups we want to list.
+ *  If empty, all groups are returned with their 
type.
+ * @param committedOffset   A specified committed offset corresponding to 
this shard.
  *
  * @return A list containing the ListGroupsResponseData.ListedGroup
  */
+public List listGroups(
+Set statesFilter,
+Set typesFilter,
+long committedOffset
+) {
+// Convert typesFilter to lowercase to make the filter 
case-insensitive.
+Set lowerCaseTypesFilter = typesFilter.stream()
+.map(String::toLowerCase)
+.collect(Collectors.toCollection(HashSet::new));
+
+Predicate combinedFilter = group -> {
+boolean stateCheck = statesFilter.isEmpty() || 
statesFilter.contains(group.stateAsString(committedOffset));
+boolean typeCheck = lowerCaseTypesFilter.isEmpty() ||
+
lowerCaseTypesFilter.contains(group.type().toString().toLowerCase());

Review Comment:
   I think I understand what you're saying. I'll add a way to parse any string 
by normalizing its case so it can be matched against the group type enum. 
However, since some of these changes are on the client side, can we do it in the 
second PR? That way I can test it as well. I could remove the lower-case 
conversion here and handle it directly there, or we could change it there 
directly as an optimization just to be safe. What do you think?
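   A minimal sketch of the kind of case-insensitive parsing being discussed (the enum constants and helper below are illustrative, not the actual upstream names):
{code:java}
import java.util.Locale;
import java.util.Optional;

public class GroupTypeParseExample {
    // Illustrative stand-in for the group type enum.
    enum GroupType { CONSUMER, CLASSIC }

    // Accepts "consumer", "Consumer", "CONSUMER", etc.; returns empty for unknown values.
    static Optional<GroupType> parse(String name) {
        try {
            return Optional.of(GroupType.valueOf(name.toUpperCase(Locale.ROOT)));
        } catch (IllegalArgumentException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("consumer")); // Optional[CONSUMER]
        System.out.println(parse("unknown"));  // Optional.empty
    }
}
{code}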



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15813) Improve implementation of client instance cache

2024-01-23 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-15813.
-
Fix Version/s: 3.8.0
   Resolution: Fixed

Merged the PR to trunk.

> Improve implementation of client instance cache
> ---
>
> Key: KAFKA-15813
> URL: https://issues.apache.org/jira/browse/KAFKA-15813
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.8.0
>
>
> In the current implementation the ClientMetricsManager uses an LRU cache, but we 
> should also support expiring stale clients, i.e. clients which haven't reported 
> metrics for a while.
>  
> The KIP mentions: This client instance specific state is maintained in broker 
> memory up to MAX(60*1000, PushIntervalMs * 3) milliseconds and is used to 
> enforce the push interval rate-limiting. There is no persistence of client 
> instance metrics state across broker restarts or between brokers 
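For reference, the retention bound quoted from the KIP works out as follows (a small sketch with an illustrative push interval only):
{code:java}
public class ClientStateRetentionExample {
    public static void main(String[] args) {
        // MAX(60*1000, PushIntervalMs * 3) from the KIP text quoted above.
        long pushIntervalMs = 30_000L; // example push interval
        long retentionMs = Math.max(60 * 1000L, pushIntervalMs * 3);
        System.out.println(retentionMs); // 90000 for this example interval
    }
}
{code}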



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]

2024-01-23 Thread via GitHub


junrao merged PR #15234:
URL: https://github.com/apache/kafka/pull/15234


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15468 [1/2]: Prevent transaction coordinator reloads on already loaded leaders [kafka]

2024-01-23 Thread via GitHub


jolshan merged PR #15139:
URL: https://github.com/apache/kafka/pull/15139


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15343) Fix MirrorConnectIntegrationTests causing ci build failures.

2024-01-23 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-15343:
---

Assignee: Greg Harris

> Fix MirrorConnectIntegrationTests causing ci build failures.
> 
>
> Key: KAFKA-15343
> URL: https://issues.apache.org/jira/browse/KAFKA-15343
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 3.6.0
>Reporter: Prasanth Kumar
>Assignee: Greg Harris
>Priority: Major
>
> There are several instances of tests interacting badly with gradle daemon(s) 
> running on ports that the kafka broker previously used. After going through 
> the debug logs, we observed a few retrying kafka clients trying to connect to a 
> broker which had been shut down, where the gradle worker had chosen the same 
> port on which the broker had been running. Later in the build, the gradle daemon 
> attempted to connect to the worker and could not, triggering a failure. Ideally 
> gradle would not exit when connected to by an invalid client - in testing with 
> netcat, it would often handle these without dying. However, there appear to be 
> some cases where the daemon dies completely. Both the broker code and the 
> gradle workers bind to port 0, resulting in the OS assigning an unused 
> port. This does avoid conflicts, but it does not ensure that long-lived clients 
> do not attempt to connect to these ports afterwards. It's possible that 
> closing the client in between may be enough to work around this issue. Until 
> then we will disable the test so it does not block CI from testing the code 
> changes.
> *MirrorConnectorsIntegrationBaseTest and extending Tests*
> {code:java}
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
> [TestEventLogger] 
> MirrorConnectorsWithCustomForwardingAdminIntegrationTest > 
> testReplicateSourceDefault() STANDARD_OUT
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
> [TestEventLogger] [2023-07-04 11:47:46,799]
>  INFO primary REST service: http://localhost:43809/connectors 
> (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:224)
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
> [TestEventLogger] [2023-07-04 11:47:46,799] 
> INFO backup REST service: http://localhost:43323/connectors 
> (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:225)
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
> [TestEventLogger] [2023-07-04 11:47:46,799] 
> INFO primary brokers: localhost:37557 
> (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:226)
> [2023-07-04T11:59:12.968Z] 2023-07-04T11:59:12.900+ [DEBUG] 
> [org.gradle.internal.remote.internal.inet.TcpIncomingConnector] 
> Accepted connection from /127.0.0.1:47660 to /127.0.0.1:37557.
> [2023-07-04T11:59:13.233Z] 
> org.gradle.internal.remote.internal.MessageIOException: Could not read 
> message from '/127.0.0.1:47660'.
> [2023-07-04T11:59:12.970Z] 2023-07-04T11:59:12.579+ [DEBUG] 
> [org.gradle.internal.remote.internal.inet.TcpIncomingConnector] Listening on 
> [d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, 
> addresses:[localhost/127.0.0.1]].
> [2023-07-04T11:59:46.519Z] 2023-07-04T11:59:13.014+ [ERROR] 
> [system.err] org.gradle.internal.remote.internal.ConnectException: Could not 
> connect to server [d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, 
> addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1]. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]

2024-01-23 Thread via GitHub


apoorvmittal10 commented on PR #15234:
URL: https://github.com/apache/kafka/pull/15234#issuecomment-1907047744

   > @apoorvmittal10 : Before I merge this, could you update the description of 
the PR. You listed 3 options to implement the eviction. The one you picked is 
based on timing wheels, right?
   
   Thanks for pointing that out; added.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15343) Fix MirrorConnectIntegrationTests causing ci build failures.

2024-01-23 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810124#comment-17810124
 ] 

Greg Harris commented on KAFKA-15343:
-

I opened an issue in Gradle for this bug, since it's not unique to Kafka and 
can appear in any project which uses sockets: 
[https://github.com/gradle/gradle/issues/27801]

I have also been working to reduce the number of socket/client leaks through 
https://issues.apache.org/jira/browse/KAFKA-15845 and have made a lot of 
progress in stopping the leaks we have currently, and should be able to prevent 
new ones in the future.

> Fix MirrorConnectIntegrationTests causing ci build failures.
> 
>
> Key: KAFKA-15343
> URL: https://issues.apache.org/jira/browse/KAFKA-15343
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 3.6.0
>Reporter: Prasanth Kumar
>Priority: Major
>
> There are several instances of tests interacting badly with gradle daemon(s) 
> running on ports that the kafka broker previously used. After going through 
> the debug logs, we observed a few retrying kafka clients trying to connect to a 
> broker which had been shut down, where the gradle worker had chosen the same 
> port on which the broker had been running. Later in the build, the gradle daemon 
> attempted to connect to the worker and could not, triggering a failure. Ideally 
> gradle would not exit when connected to by an invalid client - in testing with 
> netcat, it would often handle these without dying. However, there appear to be 
> some cases where the daemon dies completely. Both the broker code and the 
> gradle workers bind to port 0, resulting in the OS assigning an unused 
> port. This does avoid conflicts, but it does not ensure that long-lived clients 
> do not attempt to connect to these ports afterwards. It's possible that 
> closing the client in between may be enough to work around this issue. Until 
> then we will disable the test so it does not block CI from testing the code 
> changes.
> *MirrorConnectorsIntegrationBaseTest and extending Tests*
> {code:java}
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
> [TestEventLogger] 
> MirrorConnectorsWithCustomForwardingAdminIntegrationTest > 
> testReplicateSourceDefault() STANDARD_OUT
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
> [TestEventLogger] [2023-07-04 11:47:46,799]
>  INFO primary REST service: http://localhost:43809/connectors 
> (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:224)
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
> [TestEventLogger] [2023-07-04 11:47:46,799] 
> INFO backup REST service: http://localhost:43323/connectors 
> (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:225)
> [2023-07-04T11:48:16.128Z] 2023-07-04T11:47:46.804+ [DEBUG] 
> [TestEventLogger] [2023-07-04 11:47:46,799] 
> INFO primary brokers: localhost:37557 
> (org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest:226)
> [2023-07-04T11:59:12.968Z] 2023-07-04T11:59:12.900+ [DEBUG] 
> [org.gradle.internal.remote.internal.inet.TcpIncomingConnector] 
> Accepted connection from /127.0.0.1:47660 to /127.0.0.1:37557.
> [2023-07-04T11:59:13.233Z] 
> org.gradle.internal.remote.internal.MessageIOException: Could not read 
> message from '/127.0.0.1:47660'.
> [2023-07-04T11:59:12.970Z] 2023-07-04T11:59:12.579+ [DEBUG] 
> [org.gradle.internal.remote.internal.inet.TcpIncomingConnector] Listening on 
> [d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, 
> addresses:[localhost/127.0.0.1]].
> [2023-07-04T11:59:46.519Z] 2023-07-04T11:59:13.014+ [ERROR] 
> [system.err] org.gradle.internal.remote.internal.ConnectException: Could not 
> connect to server [d6bf30cb-bca2-46d9-8aeb-b9fd0497f54d port:37557, 
> addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1]. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16042: Add byte-rate metrics for topic and partition [kafka]

2024-01-23 Thread via GitHub


afshing commented on PR #15085:
URL: https://github.com/apache/kafka/pull/15085#issuecomment-1907026805

   > FYI: We are making a similar effort here: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-977%3A+Partition-Level+Throughput+Metrics
   
   @ex172000 thanks for your comment.
   
   Your work definitely offers more dedicated customization for exposing 
topic-partition metrics, which is great. 
   In our fork, we exposed the byte-rate and throttle-time metrics per 
topic-partition and used them for topic-partition based quota management (see 
[KIP-1010](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1010%3A+Topic+Partition+Quota)),
 without giving any option to enable/disable it or customize its verbosity 
level. 
   
   With this proposal, it makes more sense for us to port our quota management 
change onto this format. 
   
   I have two questions:
   1. What is the status of this KIP? Is it something that you have used in 
your fork and are trying to contribute upstream, or is it a new 
implementation? (Trying to get a sense of how much work remains.)
   2. We haven't initiated any discussion on our KIP-1010 yet. I see your 
KIP-977 is approved, but I can't find any discussion in [the 
thread](https://lists.apache.org/list?d...@kafka.apache.org:lte=1M:KIP-977). I 
am wondering what the KIP life-cycle is.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]

2024-01-23 Thread via GitHub


junrao commented on PR #15234:
URL: https://github.com/apache/kafka/pull/15234#issuecomment-1907023180

   @apoorvmittal10 : Before I merge this, could you update the description of 
the PR. You listed 3 options to implement the eviction. The one you picked is 
based on timing wheels, right?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]

2024-01-23 Thread via GitHub


lianetm commented on PR #15215:
URL: https://github.com/apache/kafka/pull/15215#issuecomment-1906966026

   Thanks @dajac, all comments addressed. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]

2024-01-23 Thread via GitHub


lianetm commented on code in PR #15215:
URL: https://github.com/apache/kafka/pull/15215#discussion_r1464026216


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1134,9 +1134,38 @@ private CompletableFuture assignPartitions(
 // Make assignment effective on the client by updating the 
subscription state.
 updateSubscription(assignedPartitions, false);
 
+// Mark assigned partitions as pendingOnAssignedCallback to 
temporarily stop fetching or
+// initializing positions for them. Passing the full set of assigned 
partitions
+// (previously owned and newly added), given that they are all 
provided to the user in the
+// callback, so we could expect offsets updates for any of them.
+Set assignedTopicPartition = 
assignedPartitions.stream().map(tIdp -> 
tIdp.topicPartition()).collect(Collectors.toSet());
+subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, 
true);

Review Comment:
   Totally, that was definitely the case. I made the changes to ensure that 
all `assignedPartitions` are updated in the subscription state and 
`addedPartitions` are marked as awaiting the callback, in a single thread-safe 
operation in the subscription state. 
   This is btw in line with your comment about not touching the previously owned 
partitions and just blocking the added ones while the callback completes. 
Totally agree, fixed. Both the legacy and this new logic always included only 
the added partitions in `onPartitionsAssigned`, so it was me getting mixed up 
before, sorry about the confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]

2024-01-23 Thread via GitHub


jsancio commented on code in PR #15231:
URL: https://github.com/apache/kafka/pull/15231#discussion_r1463743747


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -562,33 +568,37 @@ private VoteResponseData handleVoteRequest(
 VoteRequestData.PartitionData partitionRequest =
 request.topics().get(0).partitions().get(0);
 
-int candidateId = partitionRequest.candidateId();
-int candidateEpoch = partitionRequest.candidateEpoch();
+if (partitionRequest.preVote()) {
+throw new IllegalArgumentException("PreVote is not supported yet");

Review Comment:
   I prefer if we implement this in this PR. That should alleviate @hachikuji 
's comment about adding a version for the Vote RPC without the replica fully 
supporting that version.
   
   We don't need to implement the prospective state and sending Vote request 
with pre-vote set to true in this PR. We just need to implement the handling of 
version 1 of Vote RPC in this PR. That includes adding a 
KafkaRaftClientPreVoteTest suite that verifies the correct implementation. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-15683) Delete subscription from metadata when all configs are deleted

2024-01-23 Thread Apoorv Mittal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15683?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apoorv Mittal resolved KAFKA-15683.
---
Resolution: Not A Problem

Closing the ticket as this is not required; it works as per the default behaviour 
of kafka-configs.sh. Moreover, ClientMetricsManager deletes the in-memory 
subscription/client resources once all properties for the respective resource are 
removed, so no further change is needed.

> Delete subscription from metadata when all configs are deleted
> --
>
> Key: KAFKA-15683
> URL: https://issues.apache.org/jira/browse/KAFKA-15683
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
>
> As of now kafka-configs.sh does not differentiate between a non-existent and a 
> blank metrics subscription. Add support to differentiate between the two 
> scenarios and also delete the subscription if all configs are deleted for the 
> respective subscription. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16187) Flaky test: testTopicPatternArg - org.apache.kafka.tools.GetOffsetShellTest

2024-01-23 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-16187:
-

 Summary: Flaky test: testTopicPatternArg - 
org.apache.kafka.tools.GetOffsetShellTest
 Key: KAFKA-16187
 URL: https://issues.apache.org/jira/browse/KAFKA-16187
 Project: Kafka
  Issue Type: Bug
Reporter: Apoorv Mittal


[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15234/8/tests/]

 
{code:java}
Errororg.opentest4j.AssertionFailedError: expected: 
<[org.apache.kafka.tools.GetOffsetShellTest$Row@c6f09cc2, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a084, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a0a3, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a446, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a465, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a484, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a808, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a827, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a846, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a865]> but was: 
<[org.apache.kafka.tools.GetOffsetShellTest$Row@c6f09cc2, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a084, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a446, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a808]>Stacktraceorg.opentest4j.AssertionFailedError:
 expected: <[org.apache.kafka.tools.GetOffsetShellTest$Row@c6f09cc2, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a084, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a0a3, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a446, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a465, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a484, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a808, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a827, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a846, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a865]> but was: 
<[org.apache.kafka.tools.GetOffsetShellTest$Row@c6f09cc2, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a084, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a446, 
org.apache.kafka.tools.GetOffsetShellTest$Row@c6f0a808]>   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
   at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
   at 
app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)  at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)  at 
app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:177)  at 
app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1141) at 
app//org.apache.kafka.tools.GetOffsetShellTest.testTopicPatternArg(GetOffsetShellTest.java:154)
  at 
java.base@21.0.1/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
 at java.base@21.0.1/java.lang.reflect.Method.invoke(Method.java:580)at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
  at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
   at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
  at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
   at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
   at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
  at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
  at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
   at 

Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]

2024-01-23 Thread via GitHub


apoorvmittal10 commented on PR #15234:
URL: https://github.com/apache/kafka/pull/15234#issuecomment-1906867428

   I have added one missing Jira for flaky tests; the others pre-existed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]

2024-01-23 Thread via GitHub


apoorvmittal10 commented on PR #15234:
URL: https://github.com/apache/kafka/pull/15234#issuecomment-1906858917

   > @apoorvmittal10 : Are the 30 test failures related? Thanks.
   
   @junrao Thanks for the review; none of them is related. I found most of them 
already reported; I'll check once again for Jiras.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]

2024-01-23 Thread via GitHub


kirktrue commented on PR #15186:
URL: https://github.com/apache/kafka/pull/15186#issuecomment-1906805914

   @dajac—the PR description was updated.
   
   Let me know if there's anything left before we're ready to merge.
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]

2024-01-23 Thread via GitHub


junrao commented on PR #15234:
URL: https://github.com/apache/kafka/pull/15234#issuecomment-1906780891

   @apoorvmittal10 : Are the 30 test failures related? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]

2024-01-23 Thread via GitHub


hachikuji commented on code in PR #15231:
URL: https://github.com/apache/kafka/pull/15231#discussion_r1463806130


##
clients/src/main/resources/common/message/VoteRequest.json:
##
@@ -18,7 +18,8 @@
   "type": "request",
   "listeners": ["controller"],
   "name": "VoteRequest",
-  "validVersions": "0",
+  // Version 1 adds the PreVote field and renames CandidateEpoch and 
CandidateId to ReplicaEpoch and ReplicaId

Review Comment:
   Agree it would be better to implement all of it here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: populate TopicName in ConsumerGroupDescribe [kafka]

2024-01-23 Thread via GitHub


dongnuo123 commented on code in PR #15205:
URL: https://github.com/apache/kafka/pull/15205#discussion_r1463772765


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##
@@ -17,10 +17,11 @@
 package org.apache.kafka.coordinator.group.consumer;

Review Comment:
   Yeah, we have that in `testAsConsumerGroupDescribeWithTopicNameNotFound`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]

2024-01-23 Thread via GitHub


jsancio commented on code in PR #15231:
URL: https://github.com/apache/kafka/pull/15231#discussion_r1463743747


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -562,33 +568,37 @@ private VoteResponseData handleVoteRequest(
 VoteRequestData.PartitionData partitionRequest =
 request.topics().get(0).partitions().get(0);
 
-int candidateId = partitionRequest.candidateId();
-int candidateEpoch = partitionRequest.candidateEpoch();
+if (partitionRequest.preVote()) {
+throw new IllegalArgumentException("PreVote is not supported yet");

Review Comment:
   I prefer if we implement this in this PR. That should alleviate @hachikuji 
's comment about adding a version for the Vote RPC without the replica fully 
supporting that version.
   
   We don't need to implement the prospective state and sending Vote request 
with pre-vote set to true in this PR. We just need to implement the handling of 
version 1 of Vote RPC in this PR. That includes adding a 
KafkaRaftClientPreVoteTest suite that verifies the correct implementation. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]

2024-01-23 Thread via GitHub


jsancio commented on code in PR #15231:
URL: https://github.com/apache/kafka/pull/15231#discussion_r1463746152


##
clients/src/main/resources/common/message/VoteRequest.json:
##
@@ -18,7 +18,8 @@
   "type": "request",
   "listeners": ["controller"],
   "name": "VoteRequest",
-  "validVersions": "0",
+  // Version 1 adds the PreVote field and renames CandidateEpoch and 
CandidateId to ReplicaEpoch and ReplicaId

Review Comment:
   @hachikuji I think we can avoid this if we implement the handling of version 
1 requests in this PR. See 
https://github.com/apache/kafka/pull/15231#discussion_r1463743747



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]

2024-01-23 Thread via GitHub


jsancio commented on code in PR #15231:
URL: https://github.com/apache/kafka/pull/15231#discussion_r1463743747


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -562,33 +568,37 @@ private VoteResponseData handleVoteRequest(
 VoteRequestData.PartitionData partitionRequest =
 request.topics().get(0).partitions().get(0);
 
-int candidateId = partitionRequest.candidateId();
-int candidateEpoch = partitionRequest.candidateEpoch();
+if (partitionRequest.preVote()) {
+throw new IllegalArgumentException("PreVote is not supported yet");

Review Comment:
   I prefer if we implement this in this PR. That should alleviate @hachikuji 
's comment about adding a version for the Vote RPC without the replica fully 
supporting that version.
   
   We don't need to implement the prospective state and sending Vote request 
with pre-vote set to true in this PR. We just need to implement the handling of 
version 2 of Vote RPC in this PR. That includes adding a 
KafkaRaftClientPreVoteTest suite that verifies the correct implementation. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14576) Move ConsoleConsumer to tools

2024-01-23 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810066#comment-17810066
 ] 

Christo Lolov commented on KAFKA-14576:
---

Sure [~mimaison], apologies for not unassigning myself sooner!

> Move ConsoleConsumer to tools
> -
>
> Key: KAFKA-14576
> URL: https://issues.apache.org/jira/browse/KAFKA-14576
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-15675: Improve worker liveness check during Connect integration tests [kafka]

2024-01-23 Thread via GitHub


C0urante opened a new pull request, #15249:
URL: https://github.com/apache/kafka/pull/15249

   [Jira](https://issues.apache.org/jira/browse/KAFKA-15675)
   
   Currently a draft; will promote to ready once several consecutive CI runs 
have completed without the flaky test failing.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15675) Fix flaky ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector() test

2024-01-23 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810057#comment-17810057
 ] 

Chris Egerton commented on KAFKA-15675:
---

I've done some analysis on this one and believe I've found the root cause. It's 
a confluence of a few different issues, but the TL;DR is: *the request to 
{{POST /connectors//restart?onlyFailed=false=false}} 
fails with a 409 error; this does not cause the test to (immediately) fail, but 
the connector is never restarted, which causes the test to time out while 
[waiting for the connector to be 
stopped|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L272-L275].*

 

This kind of scenario probably raises several questions. Here's my best attempt 
to anticipate and address them:

 

*Why does the 409 response not cause the test to immediately fail?*

The original rationale for this is unclear, but the code structure 
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L374-L383]
 is fairly clear: issue the request, and if the status code is less than 400, 
attempt to deserialize the body. Then, unconditionally, return either null or 
the deserialized response body.
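A simplified, self-contained sketch of that lenient pattern (the types and names below are illustrative, not the literal EmbeddedConnect code):
{code:java}
import java.util.Optional;

public class LenientRestartHelperExample {
    // Stand-ins for the HTTP response and the deserialized body in the real helper.
    record HttpResponse(int status, String body) { }
    record ConnectorStateInfo(String raw) { }

    // Mirrors the behaviour described above: only sub-400 responses are deserialized;
    // errors such as the 409 are silently dropped instead of failing the caller.
    static Optional<ConnectorStateInfo> restartConnectorAndTasks(HttpResponse response) {
        if (response.status() < 400) {
            return Optional.of(new ConnectorStateInfo(response.body()));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // A 409 Conflict comes back empty, so the test keeps going and only fails later on a timeout.
        System.out.println(restartConnectorAndTasks(new HttpResponse(409, "rebalance in progress")));
    }
}
{code}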

 

*Why is the 409 response occurring?*

The cluster (or, to be more specific, either the worker that received the 
initial REST request or, if the request was forwarded, the leader) detected 
that a rebalance due to an added/removed connector or new task configs was 
about to take place, and rejected the request. See the {{DistributedHerder}} 
class's 
[restartConnectorAndTasks|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1467]
 and 
[checkRebalanceNeeded|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2302-L2307]
 methods for the logic to check for pending rebalances, and its logic for 
detecting pending rebalances 
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2385],
 
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2400],
 and 
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2419].

 

*Why is a rebalance pending by the time we try to restart the connector? 
Shouldn't the cluster and the set of connectors and tasks on it be stable by 
this point?*

Yes, the cluster and set of connectors and tasks on it should be stable by the 
time we issue our restart request. We check to make sure that [every worker in 
the cluster is up and 
running|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L116-L117]
 before proceeding with the rest of the test, and that the [connector and 
expected number of tasks are 
running|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L252-L253]
 before issuing the restart request. Unfortunately, the former check–for worker 
liveness across the cluster–does not guarantee that every worker has joined the 
cluster. This check is [performed by issuing a request to the root 
resource|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L956-L975]
 ({{GET /}}) for each worker: if the response is valid (i.e., its body 
matches the expected format), then the worker is considered up and running. 
However, this does not guarantee that the worker has actually completed 
startup: it may not have finished reading to the end of internal topics, or had 
a chance to contact the group coordinator and join the cluster yet.

 

After examining the logs of one test case, it appeared that the following 
sequence of events took place:
 # A single worker completes startup (creates and reads to the end of internal 
topics, then joins the cluster)
 # The connector is created (by chance, the REST request to create the 
connector happens to be sent to the only worker that has completed startup so 
far)
 # The 

[jira] [Commented] (KAFKA-14576) Move ConsoleConsumer to tools

2024-01-23 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17810056#comment-17810056
 ] 

Mickael Maison commented on KAFKA-14576:


Hi [~christo_lolov], I'll have some spare cycles so I've reassigned this ticket 
to myself.

> Move ConsoleConsumer to tools
> -
>
> Key: KAFKA-14576
> URL: https://issues.apache.org/jira/browse/KAFKA-14576
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14576) Move ConsoleConsumer to tools

2024-01-23 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison reassigned KAFKA-14576:
--

Assignee: Mickael Maison  (was: Christo Lolov)

> Move ConsoleConsumer to tools
> -
>
> Key: KAFKA-14576
> URL: https://issues.apache.org/jira/browse/KAFKA-14576
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: populate TopicName in ConsumerGroupDescribe [kafka]

2024-01-23 Thread via GitHub


dajac commented on code in PR #15205:
URL: https://github.com/apache/kafka/pull/15205#discussion_r1463681361


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -578,26 +578,38 @@ public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember(
 
 private static List 
topicPartitionsFromMap(
 Map> partitions,
-Map subscriptionMetadata
+TopicsImage topicsImage
 ) {
-return partitions.entrySet().stream().map(
-item -> new ConsumerGroupDescribeResponseData.TopicPartitions()
-.setTopicId(item.getKey())
-.setTopicName(lookupTopicNameById(item.getKey(), 
subscriptionMetadata))
-.setPartitions(new ArrayList<>(item.getValue()))
-).collect(Collectors.toList());
+List 
topicPartitions = new ArrayList<>();
+for (Map.Entry> entry : partitions.entrySet()) {
+Uuid topicId = entry.getKey();
+Set partitionSet = partitions.get(topicId);
+//partitions.forEach((topicId, partitionSet) -> {
+String topicName = lookupTopicNameById(topicId, topicsImage);
+if (topicName != null) {
+topicPartitions.add(new 
ConsumerGroupDescribeResponseData.TopicPartitions()
+.setTopicId(topicId)
+.setTopicName(topicName)
+.setPartitions(new ArrayList<>(partitionSet)));
+} else {
+// When the topic has been deleted and the group/member hasn't 
updated,
+// directly remove the topic from the assignment.
+partitions.remove(topicId, partitionSet);

Review Comment:
   We should not do this. We should only skip adding it to the response.
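   For reference, roughly something like the following (an illustrative sketch 
only, not the final patch; the generic parameters are my guesses since the 
archive strips angle brackets). The idea is to build the response with 
`partitions.forEach` and simply omit topics whose name can no longer be 
resolved, leaving the member's assignment map untouched:

```java
// Sketch: skip deleted topics when building the describe response,
// without mutating the underlying assignment map.
private static List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitionsFromMap(
    Map<Uuid, Set<Integer>> partitions,
    TopicsImage topicsImage
) {
    List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitions = new ArrayList<>();
    partitions.forEach((topicId, partitionSet) -> {
        String topicName = lookupTopicNameById(topicId, topicsImage);
        if (topicName != null) {
            topicPartitions.add(new ConsumerGroupDescribeResponseData.TopicPartitions()
                .setTopicId(topicId)
                .setTopicName(topicName)
                .setPartitions(new ArrayList<>(partitionSet)));
        }
        // If the topic has been deleted, do not touch the assignment here;
        // just leave it out of the response.
    });
    return topicPartitions;
}
```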



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -578,26 +578,38 @@ public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember(
 
 private static List 
topicPartitionsFromMap(
 Map> partitions,
-Map subscriptionMetadata
+TopicsImage topicsImage
 ) {
-return partitions.entrySet().stream().map(
-item -> new ConsumerGroupDescribeResponseData.TopicPartitions()
-.setTopicId(item.getKey())
-.setTopicName(lookupTopicNameById(item.getKey(), 
subscriptionMetadata))
-.setPartitions(new ArrayList<>(item.getValue()))
-).collect(Collectors.toList());
+List 
topicPartitions = new ArrayList<>();
+for (Map.Entry> entry : partitions.entrySet()) {

Review Comment:
   nit: How about using `partitions.foreach`?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##
@@ -17,10 +17,11 @@
 package org.apache.kafka.coordinator.group.consumer;

Review Comment:
   Should we add a unit test here to verify that non-existent partitions are 
not provided in the response?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##
@@ -578,26 +578,38 @@ public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember(
 
 private static List 
topicPartitionsFromMap(
 Map> partitions,
-Map subscriptionMetadata
+TopicsImage topicsImage
 ) {
-return partitions.entrySet().stream().map(
-item -> new ConsumerGroupDescribeResponseData.TopicPartitions()
-.setTopicId(item.getKey())
-.setTopicName(lookupTopicNameById(item.getKey(), 
subscriptionMetadata))
-.setPartitions(new ArrayList<>(item.getValue()))
-).collect(Collectors.toList());
+List 
topicPartitions = new ArrayList<>();
+for (Map.Entry> entry : partitions.entrySet()) {
+Uuid topicId = entry.getKey();
+Set partitionSet = partitions.get(topicId);
+//partitions.forEach((topicId, partitionSet) -> {

Review Comment:
   nit: Could we remove it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16164: Pre-Vote, modifying vote RPCs [part 1] [kafka]

2024-01-23 Thread via GitHub


hachikuji commented on code in PR #15231:
URL: https://github.com/apache/kafka/pull/15231#discussion_r1463682475


##
clients/src/main/resources/common/message/VoteRequest.json:
##
@@ -18,7 +18,8 @@
   "type": "request",
   "listeners": ["controller"],
   "name": "VoteRequest",
-  "validVersions": "0",
+  // Version 1 adds the PreVote field and renames CandidateEpoch and 
CandidateId to ReplicaEpoch and ReplicaId

Review Comment:
   Should we set `latestVersionUnstable` until the feature is fully implemented?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Update KIP-890 note [kafka]

2024-01-23 Thread via GitHub


jolshan commented on PR #15244:
URL: https://github.com/apache/kafka/pull/15244#issuecomment-1906562169

   Picked to 3.6 and 3.7


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]

2024-01-23 Thread via GitHub


lianetm commented on code in PR #15215:
URL: https://github.com/apache/kafka/pull/15215#discussion_r1463584138


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1134,9 +1134,38 @@ private CompletableFuture assignPartitions(
 // Make assignment effective on the client by updating the 
subscription state.
 updateSubscription(assignedPartitions, false);
 
+// Mark assigned partitions as pendingOnAssignedCallback to 
temporarily stop fetching or
+// initializing positions for them. Passing the full set of assigned 
partitions
+// (previously owned and newly added), given that they are all 
provided to the user in the
+// callback, so we could expect offsets updates for any of them.
+Set assignedTopicPartition = 
assignedPartitions.stream().map(tIdp -> 
tIdp.topicPartition()).collect(Collectors.toSet());
+subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, 
true);
+
 // Invoke user call back.
 CompletableFuture result = 
invokeOnPartitionsAssignedCallback(addedPartitions);
 
+// Resume partitions only if the callback succeeded.
+result.whenComplete((error, callbackResult) -> {
+if (error == null) {
+// Remove pendingOnAssignedCallback flag from the assigned 
partitions, so we can
+// start fetching, and updating positions for them if needed.
+
subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, false);
+} else {
+// Remove pendingOnAssignedCallback flag from the previously 
owned only so that

Review Comment:
   Yes, you got it right. When callback fails, the assignment is not acked to 
the broker, and it remains as `assignmentReadyToReconcile` on the client. So 
when we get the poll based reconciliation that assignment will be retried, 
until it succeeds or the broker removes it from the assignment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Update KIP-890 note [kafka]

2024-01-23 Thread via GitHub


jolshan merged PR #15244:
URL: https://github.com/apache/kafka/pull/15244


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-15675) Fix flaky ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector() test

2024-01-23 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-15675:
-

Assignee: Chris Egerton

> Fix flaky 
> ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector() test
> ---
>
> Key: KAFKA-15675
> URL: https://issues.apache.org/jira/browse/KAFKA-15675
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kirk True
>Assignee: Chris Egerton
>Priority: Major
>  Labels: flaky-test
> Attachments: error.stacktrace.txt, error.stdout.txt
>
>
> This integration test is flaky around 9% of test runs. Source: [Gradle 
> Enterprise test 
> trends|https://ge.apache.org/scans/tests?search.relativeStartTime=P28D=KAFKA=org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest=testMultiWorkerRestartOnlyConnector].
> One failure had this message:
> {code:java}
> java.lang.AssertionError: Failed to stop connector and tasks within 12ms 
> {code}
> Please see the attachments for the stack trace and stdout log.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16107: Stop fetching while onPartitionsAssign completes [kafka]

2024-01-23 Thread via GitHub


lianetm commented on code in PR #15215:
URL: https://github.com/apache/kafka/pull/15215#discussion_r1463584138


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -1134,9 +1134,38 @@ private CompletableFuture assignPartitions(
 // Make assignment effective on the client by updating the 
subscription state.
 updateSubscription(assignedPartitions, false);
 
+// Mark assigned partitions as pendingOnAssignedCallback to 
temporarily stop fetching or
+// initializing positions for them. Passing the full set of assigned 
partitions
+// (previously owned and newly added), given that they are all 
provided to the user in the
+// callback, so we could expect offsets updates for any of them.
+Set assignedTopicPartition = 
assignedPartitions.stream().map(tIdp -> 
tIdp.topicPartition()).collect(Collectors.toSet());
+subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, 
true);
+
 // Invoke user call back.
 CompletableFuture result = 
invokeOnPartitionsAssignedCallback(addedPartitions);
 
+// Resume partitions only if the callback succeeded.
+result.whenComplete((error, callbackResult) -> {
+if (error == null) {
+// Remove pendingOnAssignedCallback flag from the assigned 
partitions, so we can
+// start fetching, and updating positions for them if needed.
+
subscriptions.markPendingOnAssignedCallback(assignedTopicPartition, false);
+} else {
+// Remove pendingOnAssignedCallback flag from the previously 
owned only so that

Review Comment:
   Yes, you're right. When callback fails, the assignment is not acked to the 
broker, and it remains as `assignmentReadyToReconcile` on the client. So when 
we get the poll based reconciliation that assignment will be retried, until it 
succeeds or the broker removes it from the assignment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]

2024-01-23 Thread via GitHub


kirktrue commented on PR #15186:
URL: https://github.com/apache/kafka/pull/15186#issuecomment-1906448498

   > @kirktrue I think that the description is not quite right. First, I think 
that the session map was actually cleared previously. Second, the root cause is 
more along the line of what I said 
[here](https://github.com/apache/kafka/pull/15186#discussion_r1461537105). 
Could you please update the description of the PR?
   
   I will update the description today. Thanks @dajac!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16029) Investigate cause of "Unable to find FetchSessionHandler for node X" in logs

2024-01-23 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16029:
--
Reviewer: David Jacot  (was: Ismael Juma)

> Investigate cause of "Unable to find FetchSessionHandler for node X" in logs
> 
>
> Key: KAFKA-16029
> URL: https://issues.apache.org/jira/browse/KAFKA-16029
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor
> Fix For: 3.7.0, 3.8.0
>
>
> From [~mjsax]:
> {quote}Looking into AK unit/integration test logs for KS, I regularly see an 
> ERROR log line that is triggered here:
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L150-L151]
> Given that it seems not to impact the test (it's not failing because of 
> this), I am wondering why we log this at ERROR level or if it might be better 
> to reduce to WARN? It seems to happen fairly frequently, but it also seems 
> that it's nothing one would need to be concerned about, and thus using ERROR 
> might be more alerting to end users than it needs to be? Thoughts?
> {quote}
> According to Matthias, the running the {{EosIntegrationTest}} locally 
> reproduces the log line.
> This is also reproducible by running the Apache Kafka quickstart.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14483 Move/Rewrite of LocalLog to storage module. [kafka]

2024-01-23 Thread via GitHub


satishd commented on PR #14034:
URL: https://github.com/apache/kafka/pull/14034#issuecomment-1906403295

   Thanks @ijuma for the review comments. I have updated the PR and addressed 
them in the inline replies and/or the latest commits.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]

2024-01-23 Thread via GitHub


lianetm commented on PR #15202:
URL: https://github.com/apache/kafka/pull/15202#issuecomment-1906359448

   Hey @dajac , thanks for the comments, all addressed. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]

2024-01-23 Thread via GitHub


lianetm commented on code in PR #15202:
URL: https://github.com/apache/kafka/pull/15202#discussion_r1463501761


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -272,7 +270,65 @@ public void 
testSyncAutocommitRetriedAfterRetriableException(Errors error) {
 
 // We expect that request should have been retried on this sync commit.
 assertExceptionHandling(commitRequestManger, error, true);
-assertCoordinatorDisconnect(error);
+}
+
+@Test
+public void testCommitSyncThrowsCommitFailedExceptionOnFencedInstanceId() {
+
testCommitSyncFailsWithCommitFailedExceptionOnError(Errors.FENCED_INSTANCE_ID);
+}
+
+@Test
+public void testCommitSyncThrowsCommitFailedExceptionOnUnknownMemberId() {
+
testCommitSyncFailsWithCommitFailedExceptionOnError(Errors.UNKNOWN_MEMBER_ID);
+}
+
+private void testCommitSyncFailsWithCommitFailedExceptionOnError(Errors 
commitError) {
+CommitRequestManager commitRequestManger = create(false, 100);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Map offsets = 
Collections.singletonMap(
+new TopicPartition("topic", 1),
+new OffsetAndMetadata(0));
+
+// Send sync offset commit request that fails with an error that is 
expected to propagate
+// a CommitFailedException
+Long expirationTimeMs = time.milliseconds() + retryBackoffMs;
+CompletableFuture commitResult = 
commitRequestManger.addOffsetCommitRequest(offsets, 
Optional.of(expirationTimeMs), false);
+completeOffsetCommitRequestWithError(commitRequestManger, commitError);
+assertFutureThrows(commitResult, CommitFailedException.class);
+}
+
+@Test
+public void testCommitSyncThrowsOffsetMetadataTooLargeException() {
+// Error with metadata provided by the user should propagate the 
exception, so they can handle it.
+
testCommitSyncFailsWithErrorException(Errors.OFFSET_METADATA_TOO_LARGE);
+}
+
+@Test
+public void testCommitSyncThrowsInvalidCommitOffsetSizeException() {
+// Error with data provided by the user should propagate the 
exception, so they can handle it.
+
testCommitSyncFailsWithErrorException(Errors.INVALID_COMMIT_OFFSET_SIZE);
+}
+
+@Test
+public void testCommitSyncThrowsGroupAuthorizationException() {
+
testCommitSyncFailsWithErrorException(Errors.GROUP_AUTHORIZATION_FAILED);
+}
+
+private void testCommitSyncFailsWithErrorException(Errors commitError) {

Review Comment:
   Agree, I missed that. All done, and it did simplify things a lot.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]

2024-01-23 Thread via GitHub


lianetm commented on code in PR #15202:
URL: https://github.com/apache/kafka/pull/15202#discussion_r1463498713


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -440,28 +582,59 @@ public void 
testOffsetCommitRequestErroredRequestsNotRetriedForAsyncCommit(final
 
 // We expect that the request should not have been retried on this 
async commit.
 assertExceptionHandling(commitRequestManger, error, false);
-assertCoordinatorDisconnect(error);
 }
 
+@Test
+public void 
testCommitAsyncThrowsKafkaExceptionForUnexpectedRetriableError() {
+testCommitAsyncThrowsKafkaException(Errors.CORRUPT_MESSAGE);
+}
 
 @Test
-public void 
testAsyncOffsetCommitThrowsRetriableCommitExceptionForUnhandledRetriable() {
+public void 
testCommitAsyncThrowsKafkaExceptionForUnexpectedNonRetriableError() {
+testCommitAsyncThrowsKafkaException(Errors.UNKNOWN_SERVER_ERROR);
+}
+
+private void testCommitAsyncThrowsKafkaException(Errors error) {
 CommitRequestManager commitRequestManger = create(true, 100);
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
 
 Map offsets = 
Collections.singletonMap(new TopicPartition("topic", 1),
 new OffsetAndMetadata(0));
 
-// Send commit request without expiration (async commit) that fails 
with retriable
-// network exception that has no specific handling. Should fail with
-// RetriableCommitException.
+// Send async commit that fails with unexpected error. Should fail 
with KafkaException.
 CompletableFuture commitResult = 
commitRequestManger.addOffsetCommitRequest(offsets, Optional.empty(), false);
-completeOffsetCommitRequestWithError(commitRequestManger, 
Errors.NETWORK_EXCEPTION);
+completeOffsetCommitRequestWithError(commitRequestManger, error);
 NetworkClientDelegate.PollResult res = 
commitRequestManger.poll(time.milliseconds());
 assertEquals(0, res.unsentRequests.size());
 assertTrue(commitResult.isDone());
 assertTrue(commitResult.isCompletedExceptionally());
-assertFutureThrows(commitResult, RetriableCommitFailedException.class);
+assertFutureThrows(commitResult, KafkaException.class);
+}
+
+@Test
+public void 
testCommitSyncThrowsKafkaExceptionForUnexpectedRetriableError() {
+testCommitSyncThrowsKafkaException(Errors.CORRUPT_MESSAGE);
+}
+
+@Test
+public void 
testCommitSyncThrowsKafkaExceptionForUnexpectedNonRetriableError() {
+testCommitSyncThrowsKafkaException(Errors.UNKNOWN_SERVER_ERROR);
+}
+
+private void testCommitSyncThrowsKafkaException(Errors error) {

Review Comment:
   Agree, it no longer exists after refactoring the tests' common 
logic and params.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]

2024-01-23 Thread via GitHub


lianetm commented on code in PR #15202:
URL: https://github.com/apache/kafka/pull/15202#discussion_r1463496440


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -414,8 +495,69 @@ public void testOffsetFetchRequestErroredRequests(final 
Errors error, final bool
 testNonRetriable(futures);
 assertEmptyPendingRequests(commitRequestManger);
 }
+}
 
-assertCoordinatorDisconnect(error);
+@Test

Review Comment:
   Sure, done for this and other similar cases that could be parameterized (that 
was the approach, not even sure why I did not use it in these).
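   For illustration, a rough sketch of what such a parameterization could look 
like with JUnit 5 (my own sketch, not the actual change; it reuses the test 
class fields and helpers shown in the diff, and the generic types are guesses 
since the archive strips angle brackets):

```java
// Hypothetical parameterized replacement for the per-error test methods.
@ParameterizedTest
@EnumSource(value = Errors.class, names = {"FENCED_INSTANCE_ID", "UNKNOWN_MEMBER_ID"})
public void testCommitSyncThrowsCommitFailedExceptionOnError(Errors commitError) {
    CommitRequestManager commitRequestManger = create(false, 100);
    when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));

    Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
        new TopicPartition("topic", 1),
        new OffsetAndMetadata(0));

    // A sync commit that fails with the given error should surface CommitFailedException.
    Long expirationTimeMs = time.milliseconds() + retryBackoffMs;
    CompletableFuture<Void> commitResult =
        commitRequestManger.addOffsetCommitRequest(offsets, Optional.of(expirationTimeMs), false);
    completeOffsetCommitRequestWithError(commitRequestManger, commitError);
    assertFutureThrows(commitResult, CommitFailedException.class);
}
```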



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-23 Thread via GitHub


nizhikov commented on PR #15248:
URL: https://github.com/apache/kafka/pull/15248#issuecomment-1906283629

   @tledkov can you please take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupServiceTest rewritten in java [kafka]

2024-01-23 Thread via GitHub


nizhikov commented on PR #15248:
URL: https://github.com/apache/kafka/pull/15248#issuecomment-1906283217

   Hello @mimaison , @jolshan 
   
   I prepared a second PR for KAFKA-14589.
   It contains `ConsumerGroupServiceTest` rewritten in Java.
   Please take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16186) Implement broker metrics for client telemetry usage

2024-01-23 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-16186:
-

 Summary: Implement broker metrics for client telemetry usage
 Key: KAFKA-16186
 URL: https://issues.apache.org/jira/browse/KAFKA-16186
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal


KIP-714 lists new broker metrics which record the usage of client telemetry 
instances and the plugin. Implement the broker metrics as defined in KIP-714.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] KAFKA-14589 [WIP] [kafka]

2024-01-23 Thread via GitHub


nizhikov opened a new pull request, #15248:
URL: https://github.com/apache/kafka/pull/15248

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]

2024-01-23 Thread via GitHub


apoorvmittal10 commented on code in PR #15234:
URL: https://github.com/apache/kafka/pull/15234#discussion_r1463408579


##
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##
@@ -919,4 +922,102 @@ public void 
testPushTelemetryConcurrentRequestAfterSubscriptionUpdate() throws U
 // 1 request should fail with throttling error.
 assertEquals(1, throttlingErrorCount);
 }
+
+@Test
+public void testCacheEviction() throws UnknownHostException, 
InterruptedException {
+Properties properties = new Properties();
+properties.put("metrics", 
ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100");
+clientMetricsManager.updateSubscription("sub-1", properties);
+
+GetTelemetrySubscriptionsRequest request = new 
GetTelemetrySubscriptionsRequest.Builder(
+new GetTelemetrySubscriptionsRequestData(), true).build();
+
+GetTelemetrySubscriptionsResponse response = 
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+request, ClientMetricsTestUtils.requestContext());
+assertEquals(Errors.NONE, response.error());
+
+
assertNotNull(clientMetricsManager.clientInstance(response.data().clientInstanceId()));
+assertEquals(1, clientMetricsManager.expirationTimer().size());
+// Cache expiry should occur after 100 * 3 = 300 ms, wait for at most 
600 ms for the eviction
+// to happen as eviction timer is scheduled in different thread.
+assertTimeoutPreemptively(Duration.ofMillis(600), () -> {
+// Validate that cache eviction happens and client instance is 
removed from cache.
+while (clientMetricsManager.expirationTimer().size() != 0 ||

Review Comment:
   There is flakiness if the clock is not forced to advance, hence I have added 
that back and lowered the max wait time in the assertion. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15813: Evict client instances from cache (KIP-714) [kafka]

2024-01-23 Thread via GitHub


apoorvmittal10 commented on code in PR #15234:
URL: https://github.com/apache/kafka/pull/15234#discussion_r1462570838


##
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##
@@ -919,4 +922,102 @@ public void 
testPushTelemetryConcurrentRequestAfterSubscriptionUpdate() throws U
 // 1 request should fail with throttling error.
 assertEquals(1, throttlingErrorCount);
 }
+
+@Test
+public void testCacheEviction() throws UnknownHostException, 
InterruptedException {
+Properties properties = new Properties();
+properties.put("metrics", 
ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100");
+clientMetricsManager.updateSubscription("sub-1", properties);
+
+GetTelemetrySubscriptionsRequest request = new 
GetTelemetrySubscriptionsRequest.Builder(
+new GetTelemetrySubscriptionsRequestData(), true).build();
+
+GetTelemetrySubscriptionsResponse response = 
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+request, ClientMetricsTestUtils.requestContext());
+assertEquals(Errors.NONE, response.error());
+
+
assertNotNull(clientMetricsManager.clientInstance(response.data().clientInstanceId()));
+assertEquals(1, clientMetricsManager.expirationTimer().size());
+// Cache expiry should occur after 100 * 3 = 300 ms, wait for at most 
600 ms for the eviction
+// to happen as eviction timer is scheduled in different thread.
+assertTimeoutPreemptively(Duration.ofMillis(600), () -> {
+// Validate that cache eviction happens and client instance is 
removed from cache.
+while (clientMetricsManager.expirationTimer().size() != 0 ||

Review Comment:
   I checked the code for `waitUntilTrue`, which actually pauses with a thread 
sleep. Though I agree the current code has a busy wait, refactoring the method 
from TestUtils.scala to server-common, with changes in all test files, seems 
huge. Instead, I could create a TestUtils Java class in server-common with a 
`waitUntilTrue` method like the Scala one and use that.
   But I have also now done the equivalent in the current code, where the thread 
sleeps for 50 ms in the busy wait. Also, the executable in 
`assertTimeoutPreemptively` is executed in a different thread. Please let me 
know if this works.
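   To make the idea concrete, here is a rough Java sketch of such a helper (my 
own sketch, not part of the PR), mirroring the semantics of the Scala 
`TestUtils.waitUntilTrue`:

```java
// Hypothetical server-common test helper: poll a condition with a bounded wait
// and a pause between checks, failing with a descriptive message on timeout.
// Assumes java.util.function.Supplier is imported.
public static void waitUntilTrue(Supplier<Boolean> condition,
                                 Supplier<String> failureMessage,
                                 long maxWaitMs,
                                 long pauseMs) throws InterruptedException {
    long deadlineMs = System.currentTimeMillis() + maxWaitMs;
    while (!condition.get()) {
        if (System.currentTimeMillis() >= deadlineMs) {
            throw new AssertionError(failureMessage.get());
        }
        Thread.sleep(Math.min(pauseMs, maxWaitMs));
    }
}
```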



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Fix "No suitable checks publisher found" message during CI build [kafka]

2024-01-23 Thread via GitHub


C0urante opened a new pull request, #15247:
URL: https://github.com/apache/kafka/pull/15247

   This message keeps popping up in our CI builds during the "Archive 
JUnit-formatted test results" step, and can be misleading since it appears to 
indicate that something is wrong.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]

2024-01-23 Thread via GitHub


lianetm commented on code in PR #15202:
URL: https://github.com/apache/kafka/pull/15202#discussion_r1463379100


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) {
 continue;
 }
 
-if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
+if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+return;
+} else if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
 error == Errors.NOT_COORDINATOR ||
 error == Errors.REQUEST_TIMED_OUT) {
 
coordinatorRequestManager.markCoordinatorUnknown(error.message(), 
currentTimeMs);
 maybeRetry(currentTimeMs, error.exception());
 return;
+} else if (error == Errors.FENCED_INSTANCE_ID) {
+log.info("OffsetCommit failed due to group instance id 
{} fenced: {}", groupInstanceId, error.message());
+future.completeExceptionally(new 
CommitFailedException());

Review Comment:
   got it. Message added to the exception too.
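   For reference, a small sketch of what that can look like (hedged: the exact 
log level, wording and constructor used in the merged commit may differ):

```java
// Fail the commit future with a descriptive CommitFailedException
// instead of relying on the default message.
log.error("OffsetCommit failed due to group instance id {} fenced: {}", groupInstanceId, error.message());
future.completeExceptionally(new CommitFailedException(
    "OffsetCommit failed: the group instance id " + groupInstanceId + " has been fenced: " + error.message()));
```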



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]

2024-01-23 Thread via GitHub


lianetm commented on code in PR #15202:
URL: https://github.com/apache/kafka/pull/15202#discussion_r1463372247


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) {
 continue;
 }
 
-if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
+if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+return;
+} else if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
 error == Errors.NOT_COORDINATOR ||
 error == Errors.REQUEST_TIMED_OUT) {
 
coordinatorRequestManager.markCoordinatorUnknown(error.message(), 
currentTimeMs);
 maybeRetry(currentTimeMs, error.exception());
 return;
+} else if (error == Errors.FENCED_INSTANCE_ID) {
+log.info("OffsetCommit failed due to group instance id 
{} fenced: {}", groupInstanceId, error.message());
+future.completeExceptionally(new 
CommitFailedException());

Review Comment:
   Oh, maybe just a confusion... I notice now that on the main PR page it still 
shows the `log.info`, but we do have the `log.error` I introduced in 
[this](https://github.com/apache/kafka/pull/15202/commits/f7b76fd75de6615ac089c6c8ff24166fc5683a2d)
 previous commit, so maybe it's just GitHub tricking us?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]

2024-01-23 Thread via GitHub


dajac commented on code in PR #15202:
URL: https://github.com/apache/kafka/pull/15202#discussion_r1463371625


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) {
 continue;
 }
 
-if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
+if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+return;
+} else if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
 error == Errors.NOT_COORDINATOR ||
 error == Errors.REQUEST_TIMED_OUT) {
 
coordinatorRequestManager.markCoordinatorUnknown(error.message(), 
currentTimeMs);
 maybeRetry(currentTimeMs, error.exception());
 return;
+} else if (error == Errors.FENCED_INSTANCE_ID) {
+log.info("OffsetCommit failed due to group instance id 
{} fenced: {}", groupInstanceId, error.message());
+future.completeExceptionally(new 
CommitFailedException());

Review Comment:
   Ah. I was referring to adding a message to `new CommitFailedException()`. 
Sorry for the confusion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]

2024-01-23 Thread via GitHub


lianetm commented on code in PR #15202:
URL: https://github.com/apache/kafka/pull/15202#discussion_r1463366293


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) {
 continue;
 }
 
-if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
+if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+return;
+} else if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
 error == Errors.NOT_COORDINATOR ||
 error == Errors.REQUEST_TIMED_OUT) {
 
coordinatorRequestManager.markCoordinatorUnknown(error.message(), 
currentTimeMs);
 maybeRetry(currentTimeMs, error.exception());
 return;
+} else if (error == Errors.FENCED_INSTANCE_ID) {
+log.info("OffsetCommit failed due to group instance id 
{} fenced: {}", groupInstanceId, error.message());
+future.completeExceptionally(new 
CommitFailedException());

Review Comment:
   uhm I had addressed it by adding `log.error` instead of `log.info`, but 
maybe I misunderstood your comment. Were you suggesting something different?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug [kafka]

2024-01-23 Thread via GitHub


dajac commented on PR #15186:
URL: https://github.com/apache/kafka/pull/15186#issuecomment-1906093587

   @kirktrue I think that the description is not quite right. First, I think 
that the session map was actually cleared previously. Second, the root cause is 
more along the line of what I said 
[here](https://github.com/apache/kafka/pull/15186#discussion_r1461537105). 
Could you please update the description of the PR? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]

2024-01-23 Thread via GitHub


dajac commented on code in PR #15202:
URL: https://github.com/apache/kafka/pull/15202#discussion_r1463282309


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -414,8 +495,69 @@ public void testOffsetFetchRequestErroredRequests(final 
Errors error, final bool
 testNonRetriable(futures);
 assertEmptyPendingRequests(commitRequestManger);
 }
+}
 
-assertCoordinatorDisconnect(error);
+@Test

Review Comment:
   nit: Should we use parameterized tests instead of specifying all cases like 
this?



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -272,7 +270,65 @@ public void 
testSyncAutocommitRetriedAfterRetriableException(Errors error) {
 
 // We expect that request should have been retried on this sync commit.
 assertExceptionHandling(commitRequestManger, error, true);
-assertCoordinatorDisconnect(error);
+}
+
+@Test
+public void testCommitSyncThrowsCommitFailedExceptionOnFencedInstanceId() {
+
testCommitSyncFailsWithCommitFailedExceptionOnError(Errors.FENCED_INSTANCE_ID);
+}
+
+@Test
+public void testCommitSyncThrowsCommitFailedExceptionOnUnknownMemberId() {
+
testCommitSyncFailsWithCommitFailedExceptionOnError(Errors.UNKNOWN_MEMBER_ID);
+}
+
+private void testCommitSyncFailsWithCommitFailedExceptionOnError(Errors 
commitError) {
+CommitRequestManager commitRequestManger = create(false, 100);
+
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
+
+Map offsets = 
Collections.singletonMap(
+new TopicPartition("topic", 1),
+new OffsetAndMetadata(0));
+
+// Send sync offset commit request that fails with an error that is 
expected to propagate
+// a CommitFailedException
+Long expirationTimeMs = time.milliseconds() + retryBackoffMs;
+CompletableFuture commitResult = 
commitRequestManger.addOffsetCommitRequest(offsets, 
Optional.of(expirationTimeMs), false);
+completeOffsetCommitRequestWithError(commitRequestManger, commitError);
+assertFutureThrows(commitResult, CommitFailedException.class);
+}
+
+@Test
+public void testCommitSyncThrowsOffsetMetadataTooLargeException() {
+// Error with metadata provided by the user should propagate the 
exception, so they can handle it.
+
testCommitSyncFailsWithErrorException(Errors.OFFSET_METADATA_TOO_LARGE);
+}
+
+@Test
+public void testCommitSyncThrowsInvalidCommitOffsetSizeException() {
+// Error with data provided by the user should propagate the 
exception, so they can handle it.
+
testCommitSyncFailsWithErrorException(Errors.INVALID_COMMIT_OFFSET_SIZE);
+}
+
+@Test
+public void testCommitSyncThrowsGroupAuthorizationException() {
+
testCommitSyncFailsWithErrorException(Errors.GROUP_AUTHORIZATION_FAILED);
+}
+
+private void testCommitSyncFailsWithErrorException(Errors commitError) {

Review Comment:
   This method is very similar to 
testCommitSyncFailsWithCommitFailedExceptionOnError. Could we share part of the 
implementation for both cases? For instance, one idea would be to pass the 
Errors and the expected Exception as params.



##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java:
##
@@ -440,28 +582,59 @@ public void 
testOffsetCommitRequestErroredRequestsNotRetriedForAsyncCommit(final
 
 // We expect that the request should not have been retried on this 
async commit.
 assertExceptionHandling(commitRequestManger, error, false);
-assertCoordinatorDisconnect(error);
 }
 
+@Test
+public void 
testCommitAsyncThrowsKafkaExceptionForUnexpectedRetriableError() {
+testCommitAsyncThrowsKafkaException(Errors.CORRUPT_MESSAGE);
+}
 
 @Test
-public void 
testAsyncOffsetCommitThrowsRetriableCommitExceptionForUnhandledRetriable() {
+public void 
testCommitAsyncThrowsKafkaExceptionForUnexpectedNonRetriableError() {
+testCommitAsyncThrowsKafkaException(Errors.UNKNOWN_SERVER_ERROR);
+}
+
+private void testCommitAsyncThrowsKafkaException(Errors error) {
 CommitRequestManager commitRequestManger = create(true, 100);
 
when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mockedNode));
 
 Map offsets = 
Collections.singletonMap(new TopicPartition("topic", 1),
 new OffsetAndMetadata(0));
 
-// Send commit request without expiration (async commit) that fails 
with retriable
-// network exception that has no specific handling. Should fail with
-// RetriableCommitException.
+// Send async commit that fails with unexpected error. Should fail 
with KafkaException.
 CompletableFuture commitResult 

Re: [PR] KAFKA-15853: Move PasswordEncoder to server module [kafka]

2024-01-23 Thread via GitHub


mimaison commented on PR #15164:
URL: https://github.com/apache/kafka/pull/15164#issuecomment-1906055970

   @OmniaGM I opened an alternative PR for this: 
https://github.com/apache/kafka/pull/15246 
   
   `ConfigCommand` uses `PasswordEncoder` so it needs to be accessible from the 
tools module.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15853: Move PasswordEncoder to server-common [kafka]

2024-01-23 Thread via GitHub


mimaison opened a new pull request, #15246:
URL: https://github.com/apache/kafka/pull/15246

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14920) Address timeouts and out of order sequences

2024-01-23 Thread Lan Ding (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lan Ding updated KAFKA-14920:
-
Description: 
KAFKA-14844 showed the destructive nature of a timeout on the first produce 
request for a topic partition (ie one that has no state in psm)

Since we currently don't validate the first sequence (we will in part 2 of 
kip-890), any transient error on the first produce can lead to out of order 
sequences that never recover.

Originally, KAFKA-14561 relied on the producer's retry mechanism for these 
transient issues, but until that is fixed, we may need to retry from the 
AddPartitionsManager instead. We addressed the concurrent transactions case, but 
there are other errors, such as coordinator loading, that we could run into and 
see more out of order issues.

  was:
KAFKA-14844 showed the destructive nature of a timeout on the first produce 
request for a topic partition (ie one that has no state in psm)

Since we currently don't validate the first sequence (we will in part 2 of 
kip-890), any transient error on the first produce can lead to out of order 
sequences that never recover.

Originally, KAFKA-14561 relied on the producer's retry mechanism for these 
transient issues, but until that is fixed, we may need to retry from in the 
AddPartitionsManager instead. We addressed the concurrent transactions, but 
there are other errors like coordinator loading that we could run into and see 
increased out of order issues.


> Address timeouts and out of order sequences
> ---
>
> Key: KAFKA-14920
> URL: https://issues.apache.org/jira/browse/KAFKA-14920
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 3.6.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.6.0
>
>
> KAFKA-14844 showed the destructive nature of a timeout on the first produce 
> request for a topic partition (ie one that has no state in psm)
> Since we currently don't validate the first sequence (we will in part 2 of 
> kip-890), any transient error on the first produce can lead to out of order 
> sequences that never recover.
> Originally, KAFKA-14561 relied on the producer's retry mechanism for these 
> transient issues, but until that is fixed, we may need to retry from the 
> AddPartitionsManager instead. We addressed the concurrent transactions case, 
> but there are other errors, such as coordinator loading, that we could run 
> into and see more out of order issues.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16032: Fixes for commit/fetch error handling [kafka]

2024-01-23 Thread via GitHub


dajac commented on code in PR #15202:
URL: https://github.com/apache/kafka/pull/15202#discussion_r1463253788


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -536,12 +543,23 @@ public void onResponse(final ClientResponse response) {
 continue;
 }
 
-if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
+if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
+
future.completeExceptionally(GroupAuthorizationException.forGroupId(groupId));
+return;
+} else if (error == Errors.COORDINATOR_NOT_AVAILABLE ||
 error == Errors.NOT_COORDINATOR ||
 error == Errors.REQUEST_TIMED_OUT) {
 
coordinatorRequestManager.markCoordinatorUnknown(error.message(), 
currentTimeMs);
 maybeRetry(currentTimeMs, error.exception());
 return;
+} else if (error == Errors.FENCED_INSTANCE_ID) {
+log.info("OffsetCommit failed due to group instance id 
{} fenced: {}", groupInstanceId, error.message());
+future.completeExceptionally(new 
CommitFailedException());
+return;
+} else if (error == Errors.OFFSET_METADATA_TOO_LARGE ||
+error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
+future.completeExceptionally(error.exception());
+return;
 } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS ||
 error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
 // just retry

Review Comment:
   Sure.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR Remove unreachable if-else block in ReplicaManager.scala [kafka]

2024-01-23 Thread via GitHub


drawxy commented on PR #15220:
URL: https://github.com/apache/kafka/pull/15220#issuecomment-1905997013

   > I can see there are thousands of lines changed. Is that expected?
   
   Hi @showuon , sorry for pulling in dozens of commits by rebasing onto the 
wrong branch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR Remove unreachable if-else block in ReplicaManager.scala [kafka]

2024-01-23 Thread via GitHub


showuon commented on PR #15220:
URL: https://github.com/apache/kafka/pull/15220#issuecomment-1905980778

   I can see there are thousands of lines changed. Is that expected?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463150319


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {

Review Comment:
   ~~I have actually removed the chaining using `CompletableFuture` and 
simplified the logic. I just wait on the secondary store write directly (with or 
without a timeout) and if the execution fails or the wait itself fails, I 
update the callback and return the same exception (because it is already 
completed). With this, there are no changes (apart from using `regularOffsets` 
when writing to the secondary store) from the primary store. Let me know what 
you think about this.~~
   
   I re-read the comments and it looks like with this we are going against the 
approach you wanted in 
https://github.com/apache/kafka/pull/13801/#discussion_r1268520271, i.e.
   
   > That said, I don't love how we've made this method synchronously await the 
write to the secondary store. We should return a Future to the caller that 
corresponds to all of the offset flushes that we'd need to block on for an 
offset commit (i.e., the existing flush that we're performing, possibly 
preceded by a preemptive flush of tombstones to the secondary store).
   
   
   Let me take a look at this again. Sorry about this; I had forgotten 
about this comment.
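   For context, a rough sketch of the chained approach being discussed (my own 
illustration, not the PR's code; it assumes the `set(Map, Callback)` -> 
`Future<Void>` signature shown above and the `tombstoneOffsets`/`regularOffsets` 
split from the diff):

```java
// Sketch: flush tombstones to the secondary (global) store first, then write to
// the primary (per-connector) store, exposing a single future the caller can block on.
CompletableFuture<Void> chainedWrite = CompletableFuture.runAsync(() -> {
    if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
        try {
            // Block until the tombstones reach the global offsets topic.
            secondaryStore.set(tombstoneOffsets, (error, ignored) -> { }).get();
        } catch (Exception e) {
            throw new RuntimeException("Failed to write tombstone offsets to the secondary store", e);
        }
    }
}).thenCompose(ignored -> {
    CompletableFuture<Void> primaryWrite = new CompletableFuture<>();
    primaryStore.set(values, (error, result) -> {
        if (error != null) {
            primaryWrite.completeExceptionally(error);
        } else {
            primaryWrite.complete(null);
        }
    });
    return primaryWrite;
});
```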
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463150319


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {

Review Comment:
   ~~I have actually removed the chaining using `CompletableFuture` and 
simplified the logic. I just wait on the secondary store write directly (with or 
without a timeout) and if the execution fails or the wait itself fails, I 
update the callback and return the same exception (because it is already 
completed). With this, there are no changes (apart from using `regularOffsets` 
when writing to the secondary store) from the primary store. Let me know what 
you think about this.~~
   
   I re-read the comments and it looks like with this we are going against the 
approach you wanted in 
https://github.com/apache/kafka/pull/13801/#discussion_r1268520271, i.e.
   
   That said, I don't love how we've made this method synchronously await the 
write to the secondary store. We should return a Future to the caller that 
corresponds to all of the offset flushes that we'd need to block on for an 
offset commit (i.e., the existing flush that we're performing, possibly 
preceded by a preemptive flush of tombstones to the secondary store).
   
   Let me take a look at this again. Sorry about this; I had forgotten 
about this comment.
   



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global 

Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]

2024-01-23 Thread via GitHub


showuon commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1463221572


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1396,11 +1396,13 @@ object TestUtils extends Logging {
   // Note: Call this method in the test itself, rather than the @AfterEach 
method.
   // Because of the assert, if assertNoNonDaemonThreads fails, nothing after 
would be executed.
   def assertNoNonDaemonThreads(threadNamePrefix: String): Unit = {
-val nonDaemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t 
=>
-  !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)
-}
-val threadCount = nonDaemonThreads.size
-assertEquals(0, threadCount, s"Found unexpected $threadCount NonDaemon 
threads=${nonDaemonThreads.map(t => t.getName).mkString(", ")}")
+var nonDemonThreads: mutable.Set[Thread] = mutable.Set.empty[Thread]
+waitUntilTrue(() => {
+  nonDemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t =>
+!t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)
+  }
+  0 == nonDemonThreads.size
+}, s"Found unexpected ${nonDemonThreads.size} NonDaemon 
threads=${nonDemonThreads.map(t => t.getName).mkString(", ")}", 1000)

Review Comment:
   > BTW, which threads do you think are closed async during Kafka server 
shutdown? AFAIK, we have checks in place to ensure that we wait for proper 
close during shutdown
   
   Because in the failed test cases, it is `ReplicaFetcherThread` that leaked. 
But after a quick look, yes, we do call `awaitShutdown()` for fetcher threads. 
Hmm... let me dig into it further tomorrow. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463158103


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {
+Future secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, new FutureCallback<>());
+try {
+if (exactlyOnce) {
+secondaryWriteFuture.get();
+} else {
+secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
+}
+} catch (ExecutionException e) {
+log.error("{} Flush of tombstone(s) offsets to secondary 
store threw an unexpected exception: ", this, e.getCause());
+callback.onCompletion(e.getCause(), null);
+} catch (Exception e) {
+log.error("{} Got Exception when trying to flush 
tombstone(s) offsets to secondary store", this, e);
+callback.onCompletion(e, null);
+}
+});
+}
+offsetWriteFuture.thenAccept(v -> primaryStore.set(values, new 
FutureCallback<>((primaryWriteError, ignored) -> {

Review Comment:
   I have not added this test at this point and I can add it if you think it's 
still needed with the new approach.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14589 ConsumerGroupCommand options and case classes rewritten [kafka]

2024-01-23 Thread via GitHub


nizhikov commented on PR #14856:
URL: https://github.com/apache/kafka/pull/14856#issuecomment-1905956206

   Thanks for review and merge!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]

2024-01-23 Thread via GitHub


divijvaidya commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1463200794


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1396,11 +1396,13 @@ object TestUtils extends Logging {
   // Note: Call this method in the test itself, rather than the @AfterEach 
method.
   // Because of the assert, if assertNoNonDaemonThreads fails, nothing after 
would be executed.
   def assertNoNonDaemonThreads(threadNamePrefix: String): Unit = {
-val nonDaemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t 
=>
-  !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)
-}
-val threadCount = nonDaemonThreads.size
-assertEquals(0, threadCount, s"Found unexpected $threadCount NonDaemon 
threads=${nonDaemonThreads.map(t => t.getName).mkString(", ")}")
+var nonDemonThreads: mutable.Set[Thread] = mutable.Set.empty[Thread]
+waitUntilTrue(() => {
+  nonDemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t =>
+!t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)
+  }
+  0 == nonDemonThreads.size
+}, s"Found unexpected ${nonDemonThreads.size} NonDaemon 
threads=${nonDemonThreads.map(t => t.getName).mkString(", ")}", 1000)

Review Comment:
   BTW, which threads do you think are closed async during Kafka server 
shutdown? AFAIK, we have checks in place to ensure that we wait for proper 
close during shutdown



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]

2024-01-23 Thread via GitHub


divijvaidya commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1463198772


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1396,11 +1396,13 @@ object TestUtils extends Logging {
   // Note: Call this method in the test itself, rather than the @AfterEach 
method.
   // Because of the assert, if assertNoNonDaemonThreads fails, nothing after 
would be executed.
   def assertNoNonDaemonThreads(threadNamePrefix: String): Unit = {
-val nonDaemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t 
=>
-  !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)
-}
-val threadCount = nonDaemonThreads.size
-assertEquals(0, threadCount, s"Found unexpected $threadCount NonDaemon 
threads=${nonDaemonThreads.map(t => t.getName).mkString(", ")}")
+var nonDemonThreads: mutable.Set[Thread] = mutable.Set.empty[Thread]
+waitUntilTrue(() => {
+  nonDemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t =>
+!t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)
+  }
+  0 == nonDemonThreads.size
+}, s"Found unexpected ${nonDemonThreads.size} NonDaemon 
threads=${nonDemonThreads.map(t => t.getName).mkString(", ")}", 1000)

Review Comment:
   This will increase the run-time for test suite when it is leaking threads. 
Which should be ok, as the happy case (when we are not leaking remains the 
same).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove controlPlaneRequestProcessor in BrokerServer [kafka]

2024-01-23 Thread via GitHub


appchemist commented on PR #15245:
URL: https://github.com/apache/kafka/pull/15245#issuecomment-1905904446

   Git fetch failed in CI due to the error "couldn't find remote ref 
refs/pull/15245/head".


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]

2024-01-23 Thread via GitHub


showuon commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1463172726


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -1396,11 +1396,13 @@ object TestUtils extends Logging {
   // Note: Call this method in the test itself, rather than the @AfterEach 
method.
   // Because of the assert, if assertNoNonDaemonThreads fails, nothing after 
would be executed.
   def assertNoNonDaemonThreads(threadNamePrefix: String): Unit = {
-val nonDaemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t 
=>
-  !t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)
-}
-val threadCount = nonDaemonThreads.size
-assertEquals(0, threadCount, s"Found unexpected $threadCount NonDaemon 
threads=${nonDaemonThreads.map(t => t.getName).mkString(", ")}")
+var nonDemonThreads: mutable.Set[Thread] = mutable.Set.empty[Thread]
+waitUntilTrue(() => {
+  nonDemonThreads = Thread.getAllStackTraces.keySet.asScala.filter { t =>
+!t.isDaemon && t.isAlive && t.getName.startsWith(threadNamePrefix)
+  }
+  0 == nonDemonThreads.size
+}, s"Found unexpected ${nonDemonThreads.size} NonDaemon 
threads=${nonDemonThreads.map(t => t.getName).mkString(", ")}", 1000)

Review Comment:
   cc @divijvaidya , I found that the 
[CI](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15133/9/testReport/junit/kafka.server/ReplicaManagerTest/Build___JDK_11_and_Scala_2_13___testSuccessfulBuildRemoteLogAuxStateMetrics__/)
 is sometimes too sensitive to the non-daemon threads check, because some 
shutdowns happen asynchronously. You can check the failed results 
[here](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-15133/9/): 
basically, if some resources are not closed, all the following tests should also 
fail (I verified this in my local env). But in the CI results, only 2 of the 
ReplicaManagerTest cases fail, and only on JDK 11. So I'm going to verify it 
using `waitUntilTrue` to give the threads some chance to shut down.
   
   I also set the wait time to 1 second because, if resources really are leaked, 
the total wait time will be the product of `waitTime` and the number of 
subsequent failing tests. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463165311


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {

Review Comment:
   I re-read the comments and it looks like with this we are going against the 
approach you wanted in this 
[comment](https://github.com/apache/kafka/pull/13801/#discussion_r1268520271), 
i.e. 
   
   > That said, I don't love how we've made this method synchronously await the 
write to the secondary store. We should return a Future to the caller that 
corresponds to all of the offset flushes that we'd need to block on for an 
offset commit (i.e., the existing flush that we're performing, possibly 
preceded by a preemptive flush of tombstones to the secondary store).
   
   Let me take a look at this again. Sorry about this since I had forgotten 
about this comment.
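
   For reference, a minimal, self-contained sketch of the non-blocking, 
chained-future shape being asked for. The `Store` interface and every name below 
are illustrative stand-ins, not the real 
`OffsetBackingStore`/`ConnectorOffsetBackingStore` API:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ChainedOffsetWriteSketch {
    // Stand-in for an offset store; NOT the actual Connect API.
    interface Store {
        CompletableFuture<Void> set(Map<String, String> offsets);
    }

    // Flush the tombstone offsets to the secondary store first, then chain the
    // primary write, and hand the combined future back to the caller instead of
    // blocking inside set().
    static CompletableFuture<Void> set(Store secondary, Store primary,
                                       Map<String, String> tombstones,
                                       Map<String, String> allOffsets) {
        CompletableFuture<Void> head = tombstones.isEmpty()
                ? CompletableFuture.completedFuture(null)
                : secondary.set(tombstones);
        return head.thenCompose(v -> primary.set(allOffsets));
    }

    public static void main(String[] args) {
        Store noop = offsets -> CompletableFuture.completedFuture(null);
        set(noop, noop, Map.of(), Map.of("p1", "42")).join();
        System.out.println("both writes completed");
    }
}
```

   With this shape, the caller decides how long (or whether) to wait on the 
returned future, rather than the store blocking internally with or without a 
timeout.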



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-01-23 Thread via GitHub


clolov commented on PR #15213:
URL: https://github.com/apache/kafka/pull/15213#issuecomment-1905886714

   Thanks a lot for the review @kamalcph! I have hopefully addressed everything 
 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP [kafka]

2024-01-23 Thread via GitHub


clolov commented on code in PR #15213:
URL: https://github.com/apache/kafka/pull/15213#discussion_r1463164583


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
 val curLocalLogStartOffset = localLogStartOffset()
 
-val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache 
=> {
+val epochOpt: Optional[Integer] = 
leaderEpochCache.asJava.flatMap(cache => {
   val epoch = cache.epochForOffset(curLocalLogStartOffset)
-  if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else 
Optional.empty[EpochEntry]()
+  if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else 
Optional.empty[Integer]()
 })
 
-val epochOpt = if (earliestLocalLogEpochEntry.isPresent && 
earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-  Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-else Optional.empty[Integer]()
-
 Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
curLocalLogStartOffset, epochOpt))
   } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
 Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, 
latestEpochAsOptional(leaderEpochCache)))
+  } else if (targetTimestamp == 
ListOffsetsRequest.LATEST_TIERED_TIMESTAMP) {
+if (remoteLogEnabled()) {
+  val curHighestRemoteOffset = highestOffsetInRemoteStorage()
+
+  val optEpoch: Optional[Integer] = 
leaderEpochCache.asJava.flatMap(cache => {
+val epoch = cache.epochForOffset(curHighestRemoteOffset)
+if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else 
Optional.empty[Integer]()
+  })
+
+  Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, 
highestOffsetInRemoteStorage(), optEpoch))
+} else {
+  Option.empty

Review Comment:
   Good catch! In the KIP I have specified that Kafka should return no offset 
in such situations. I shall aim to add an integration test from the point of 
view of the client in an upcoming pull request



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1280,7 +1282,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) &&
 targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP &&
 targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP &&
-targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP)
+targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP &&
+targetTimestamp != ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)

Review Comment:
   I have removed them both, but I don't think it would have caused problems 
either way.



##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1300,18 +1303,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   } else if (targetTimestamp == 
ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
 val curLocalLogStartOffset = localLogStartOffset()
 
-val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache 
=> {
+val epochOpt: Optional[Integer] = 
leaderEpochCache.asJava.flatMap(cache => {
   val epoch = cache.epochForOffset(curLocalLogStartOffset)
-  if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else 
Optional.empty[EpochEntry]()
+  if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else 
Optional.empty[Integer]()
 })
 
-val epochOpt = if (earliestLocalLogEpochEntry.isPresent && 
earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-  Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-else Optional.empty[Integer]()
-

Review Comment:
   Hopefully I have achieved both in the subsequent commit (reverted one of the 
changes and made both easier to read). Let me know if this isn't the case!



##
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##
@@ -2126,6 +2126,94 @@ class UnifiedLogTest {
   log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP, 
Some(remoteLogManager)))
   }
 
+  @Test
+  def testFetchLatestTieredTimestampNoRemoteStorage(): Unit = {
+val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, 
indexIntervalBytes = 1)
+val log = createLog(logDir, logConfig)
+
+assertEquals(None, 
log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIERED_TIMESTAMP))
+
+val firstTimestamp = mockTime.milliseconds
+val leaderEpoch = 0
+log.appendAsLeader(TestUtils.singletonRecords(
+  value = TestUtils.randomBytes(10),
+  timestamp = firstTimestamp),
+  leaderEpoch = leaderEpoch)
+
+val secondTimestamp = firstTimestamp + 1
+log.appendAsLeader(TestUtils.singletonRecords(
+  

Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463158347


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -299,12 +349,11 @@ public Future set(Map 
values, Callback callb
 } catch (Exception e) {
 log.warn("Failed to write offsets to secondary backing 
store", e);
 }
+callback.onCompletion(null, null);
 }
 }
-try (LoggingContext context = loggingContext()) {
-callback.onCompletion(primaryWriteError, ignored);
-}
-});

Review Comment:
   Added it back. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463150319


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {

Review Comment:
   I have actually removed the chaining using `CompletableFuture` and 
simplified the logic. I just wait on the secondary store write directly (with 
or without a timeout) and if the execution fails or the wait itself fails, I 
update the callback and return the same exception (because it's already 
completed).
   
   ```
   if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
   Future secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, (t, r) -> { });
   try {
   if (exactlyOnce) {
   secondaryWriteFuture.get();
   } else {
   secondaryWriteFuture.get(offsetFlushTimeoutMs, 
TimeUnit.MILLISECONDS);
   }
   log.debug("Successfully flushed tombstone offsets to 
secondary store");
   } catch (ExecutionException e) {
   log.error("{} Failed to flush tombstone offsets to secondary 
store", this, e.getCause());
   callback.onCompletion(e.getCause(), null);
   return secondaryWriteFuture;
   } catch (Throwable e) {
   log.error("{} Failed to flush tombstone offsets to secondary 
store", this, e);
   callback.onCompletion(e, null);
   return secondaryWriteFuture;
   }
   }
   ```
   
   With this, there are no changes (apart from using `regularOffsets` when 
writing to the secondary store) from the primary store. Let me know what you 
think about this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Remove controlPlaneRequestProcessor in BrokerServer [kafka]

2024-01-23 Thread via GitHub


appchemist opened a new pull request, #15245:
URL: https://github.com/apache/kafka/pull/15245

   In BrokerServer, controlPlaneRequestProcessor is always null and is not used.
   In addition, `validateControlPlaneListenerEmptyForKRaft` in `KafkaConfig` 
checks that `controlPlaneListenerName` is empty in KRaft mode.
   
   So, controlPlaneRequestProcessor is not needed in BrokerServer and can be removed.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463144968


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+
+// If the supplied offsets contain tombstone values, then tombstone 
offsets are extracted first,
+// written to the secondary store in a synchronous manner and then to 
the primary store.
+// This is because, if a tombstone offset is successfully written to 
the per-connector offsets topic,
+// but cannot be written to the global offsets topic, then the global 
offsets topic will still contain that
+// source offset, but the per-connector topic will not. Due to the 
fallback-on-global logic used by the worker,
+// if a task requests offsets for one of the tombstoned partitions, 
the worker will provide it with the
+// offsets present in the global offsets topic, instead of indicating 
to the task that no offsets can be found.
+CompletableFuture offsetWriteFuture = 
CompletableFuture.completedFuture(null);
+if (secondaryStore != null && !tombstoneOffsets.isEmpty()) {
+offsetWriteFuture.thenAccept(v -> {
+Future secondaryWriteFuture = 
secondaryStore.set(tombstoneOffsets, new FutureCallback<>());

Review Comment:
   I took the non-fancy route for now, i.e. using an inline no-op `Callback`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15018: Failing offset flush for EOS when secondary offset store writes fails for tombstone records [kafka]

2024-01-23 Thread via GitHub


vamossagar12 commented on code in PR #13801:
URL: https://github.com/apache/kafka/pull/13801#discussion_r1463143759


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -279,15 +290,54 @@ public Future set(Map 
values, Callback callb
 throw new IllegalStateException("At least one non-null offset 
store must be provided");
 }
 
-return primaryStore.set(values, (primaryWriteError, ignored) -> {
+List partitionsWithTombstoneOffsets = 
values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() == null)
+.map(Map.Entry::getKey).collect(Collectors.toList());
+
+Map tombstoneOffsets = new HashMap<>();
+for (ByteBuffer partition : partitionsWithTombstoneOffsets) {
+tombstoneOffsets.put(partition, null);
+}
+Map regularOffsets = values.entrySet().stream()
+.filter(offsetEntry -> offsetEntry.getValue() != null)
+.collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));

Review Comment:
   Thanks for this. I updated the logic.



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java:
##
@@ -254,7 +260,12 @@ public Map get(long timeout, 
TimeUnit unit) throws Inter
  * write to that store, and the passed-in {@link Callback} is invoked once 
that write completes. If a worker-global
  * store is provided, a secondary write is made to that store if the write 
to the connector-specific store
  * succeeds. Errors with this secondary write are not reflected in the 
returned {@link Future} or the passed-in
- * {@link Callback}; they are only logged as a warning to users.
+ * {@link Callback}; they are only logged as a warning to users. The only 
exception to this rule is when the offsets
+ * that need to be committed contains tombstone records as well. In such 
cases, a write consisting of only tombstone
+ * offsets would first happen on the worker-global store and only if it 
succeeds, would all the offsets be written
+ * to the connector-specific store and the regular offsets would be 
written to the worker-global store. Note that
+ * in this case, failure to write regular offsets to secondary store would 
still not reflect in the returned
+ * {@link Future} or the passed-in {@link Callback}

Review Comment:
   Added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]

2024-01-23 Thread via GitHub


showuon commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1463136420


##
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##
@@ -406,93 +408,51 @@ class BrokerTopicMetrics(name: Option[String], configOpt: 
java.util.Optional[Kaf
 
   def invalidOffsetOrSequenceRecordsPerSec: Meter = 
metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
 
-  def recordRemoteCopyLagBytes(partition: Int, bytesLag: Long): Unit = {
-val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric
-brokerTopicAggregatedMetric.setPartitionMetricValue(partition, bytesLag)
-  }
-
-  def removeRemoteCopyLagBytes(partition: Int): Unit = {
-val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric
-brokerTopicAggregatedMetric.removePartition(partition)
+  def remoteCopyLagBytesAggrMetric(): AggregatedMetric = {
+
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).aggregatedMetric
   }
 
   // Visible for testing
-  def remoteCopyLagBytes: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value()
-
-  def recordRemoteCopyLagSegments(partition: Int, segmentsLag: Long): Unit = {
-val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).brokerTopicAggregatedMetric
-brokerTopicAggregatedMetric.setPartitionMetricValue(partition, segmentsLag)
-  }
+  def remoteCopyLagBytes: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName).aggregatedMetric.value()
 
-  def removeRemoteCopyLagSegments(partition: Int): Unit = {
-val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).brokerTopicAggregatedMetric
-brokerTopicAggregatedMetric.removePartition(partition)
+  def remoteCopyLagSegmentsAggrMetric(): AggregatedMetric = {
+
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).aggregatedMetric
   }
 
   // Visible for testing
-  def remoteCopyLagSegments: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).brokerTopicAggregatedMetric.value()
-
-  def recordRemoteLogMetadataCount(partition: Int, count: Long): Unit = {
-val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric
-brokerTopicAggregatedMetric.setPartitionMetricValue(partition, count)
-  }
+  def remoteCopyLagSegments: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_SEGMENTS_METRIC.getName).aggregatedMetric.value()
 
-  def removeRemoteLogMetadataCount(partition: Int): Unit = {
-val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric
-brokerTopicAggregatedMetric.removePartition(partition)
+  def remoteLogMetadataCountAggrMetric(): AggregatedMetric = {
+
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).aggregatedMetric
   }
 
-  def remoteLogMetadataCount: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).brokerTopicAggregatedMetric.value()
-
-  def recordRemoteLogSizeBytes(partition: Int, size: Long): Unit = {
-val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric
-brokerTopicAggregatedMetric.setPartitionMetricValue(partition, size)
-  }
+  def remoteLogMetadataCount: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_METADATA_COUNT_METRIC.getName).aggregatedMetric.value()
 
-  def removeRemoteLogSizeBytes(partition: Int): Unit = {
-val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric
-brokerTopicAggregatedMetric.removePartition(partition)
+  def remoteLogSizeBytesAggrMetric(): AggregatedMetric = {
+
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).aggregatedMetric
   }
 
-  def remoteLogSizeBytes: Long = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName).brokerTopicAggregatedMetric.value()
-
-  def recordRemoteLogSizeComputationTime(partition: Int, timeSpent: Long): 
Unit = {
-val brokerTopicAggregatedMetric = 
metricGaugeTypeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_COMPUTATION_TIME_METRIC.getName).brokerTopicAggregatedMetric
-brokerTopicAggregatedMetric.setPartitionMetricValue(partition, timeSpent)
-  }
+  def remoteLogSizeBytes: Long = 

Re: [PR] MINOR Remove unreachable if-else block in ReplicaManager.scala [kafka]

2024-01-23 Thread via GitHub


divijvaidya commented on PR #15220:
URL: https://github.com/apache/kafka/pull/15220#issuecomment-1905791005

   @drawxy can you please fix the failing compilation. Otherwise looks good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] streams-scala: remove collections-compat dependency when on Scala 2.13 [kafka]

2024-01-23 Thread via GitHub


divijvaidya commented on code in PR #15239:
URL: https://github.com/apache/kafka/pull/15239#discussion_r1463080761


##
build.gradle:
##
@@ -2286,8 +2286,9 @@ project(':streams:streams-scala') {
 api project(':streams')
 
 api libs.scalaLibrary
-api libs.scalaCollectionCompat
-
+if ( versions.baseScala == '2.12' ) {
+  api libs.scalaCollectionCompat

Review Comment:
   Can you please add a comment here explaining the reason for removing this 
dependency; basically, capture what you wrote in the PR description. It helps 
readers of the code understand why certain actions were taken.
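
   For illustration, the requested inline comment could look roughly like this 
(the comment wording is only a suggestion, not the PR author's text):

```groovy
// scala-collection-compat is only needed on Scala 2.12; the Scala 2.13 standard
// library already ships these collection APIs, so the extra dependency is
// skipped there.
if ( versions.baseScala == '2.12' ) {
  api libs.scalaCollectionCompat
}
```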



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16066) Upgrade apacheds to 2.0.0.AM27

2024-01-23 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16066?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809875#comment-17809875
 ] 

Divij Vaidya commented on KAFKA-16066:
--

[~high.lee] please feel free to pick this one up. There has been no activity 
from the previous requester on this Jira for more than 20 days now.

> Upgrade apacheds to 2.0.0.AM27
> --
>
> Key: KAFKA-16066
> URL: https://issues.apache.org/jira/browse/KAFKA-16066
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Divij Vaidya
>Priority: Major
>  Labels: newbie, newbie++
>
> We are currently using a very old dependency. Notably, apacheds is only used 
> for testing when we use MiniKdc, hence, there is nothing stopping us from 
> upgrading it.
> Notably, apacheds has removed the component 
> org.apache.directory.server:apacheds-protocol-kerberos in favour of using 
> Apache Kerby, hence, we need to make changes in MiniKdc.scala for this 
> upgrade to work correctly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10659) Cogroup topology generation fails if input streams are repartitioned

2024-01-23 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809859#comment-17809859
 ] 

Stanislav Spiridonov commented on KAFKA-10659:
--

The same issue here. The workaround of passing `Grouped.with(...)` to 
`groupByKey` works, but I need to create these topics manually.
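
A minimal sketch of that naming workaround, following the `Grouped.with(...)` 
suggestion from the description below; the serdes, topic names, and aggregation 
logic are placeholders:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class CogroupWorkaroundSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        Aggregator<String, String, String> agg = (key, value, acc) -> acc + value;

        // Naming each grouping gives every generated repartition topic a distinct
        // name, which avoids the "Processor ... is already added" collision.
        KGroupedStream<String, String> groupedA = builder
                .stream("topicA", Consumed.with(Serdes.String(), Serdes.String()))
                .selectKey((k, v) -> v)
                .groupByKey(Grouped.with("grouped-a", Serdes.String(), Serdes.String()));
        KGroupedStream<String, String> groupedB = builder
                .stream("topicB", Consumed.with(Serdes.String(), Serdes.String()))
                .selectKey((k, v) -> v)
                .groupByKey(Grouped.with("grouped-b", Serdes.String(), Serdes.String()));

        KTable<String, String> aggregated = groupedA.cogroup(agg)
                .cogroup(groupedB, agg)
                .aggregate(() -> "", Materialized.with(Serdes.String(), Serdes.String()));

        System.out.println(builder.build().describe());
    }
}
```

Each grouping's distinct name flows into the repartition topic names, so the 
topology no longer tries to add the same `*-repartition-filter` processor twice.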

> Cogroup topology generation fails if input streams are repartitioned
> 
>
> Key: KAFKA-10659
> URL: https://issues.apache.org/jira/browse/KAFKA-10659
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.5.1
>Reporter: blueedgenick
>Priority: Major
>
> Example to reproduce:
>  
> {code:java}
> KGroupedStream groupedA = builder
>   .stream(topicA, Consumed.with(Serdes.String(), serdeA))
>   .selectKey((aKey, aVal) -> aVal.someId)
>   .groupByKey();
> KGroupedStream groupedB = builder
>   .stream(topicB, Consumed.with(Serdes.String(), serdeB))
>   .selectKey((bKey, bVal) -> bVal.someId)
>   .groupByKey();
> KGroupedStream groupedC = builder
>   .stream(topicC, Consumed.with(Serdes.String(), serdeC))
>   .selectKey((cKey, cVal) -> cVal.someId)
>   .groupByKey();
> CogroupedKStream cogroup = groupedA.cogroup(AggregatorA)
>   .cogroup(groupedB, AggregatorB)
>  .  cogroup(groupedC, AggregatorC);
> // Aggregate all streams of the cogroup
>  KTable agg = cogroup.aggregate(
>   () -> new ABC(),
>   Named.as("my-agg-proc-name"),
>   Materialized.>as(
>  "abc-agg-store") 
>  .withKeySerde(Serdes.String())
>  .withValueSerde(serdeABC)
>  );
> {code}
>  
>  
> This throws an exception during topology generation: 
>  
> {code:java}
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: 
> Processor abc-agg-store-repartition-filter is already added. at 
> org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addProcessor(Inter
> nalTopologyBuilder.java:485)`
>  at 
> org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:70)
>  at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:307)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:564)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:553)
>  at ...
> {code}
>  
> The same exception is observed if the `selectKey(...).groupByKey()`  pattern 
> is replaced with `groupBy(...)`.
> This behavior is observed with topology optimization at default state, 
> explicitly set off, or explicitly set on.
> Interestingly the problem is avoided, and a workable topology produced, if 
> the grouping step is named by passing a `Grouped.with(...)` expression to 
> either `groupByKey` or `groupBy`.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16085: Add metric value consolidated for topics on a broker for tiered storage. [kafka]

2024-01-23 Thread via GitHub


showuon commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1462970366


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -1650,15 +1650,27 @@ private static void 
shutdownAndAwaitTermination(ExecutorService pool, String poo
 }
 
 private void removeRemoteTopicPartitionMetrics(TopicIdPartition 
topicIdPartition) {
-BrokerTopicMetrics topicMetrics = 
brokerTopicStats.topicStats(topicIdPartition.topic());
-int partition = topicIdPartition.partition();
-topicMetrics.removeRemoteCopyLagBytes(partition);
-topicMetrics.removeRemoteCopyLagSegments(partition);
-topicMetrics.removeRemoteDeleteLagBytes(partition);
-topicMetrics.removeRemoteDeleteLagSegments(partition);
-topicMetrics.removeRemoteLogMetadataCount(partition);
-topicMetrics.removeRemoteLogSizeComputationTime(partition);
-topicMetrics.removeRemoteLogSizeBytes(partition);
+String topic = topicIdPartition.topic();
+if (!brokerTopicStats.isTopicStatsExisted(topicIdPartition.topic())) {
+// The topic metrics are already removed, removing this topic key 
from broker-level metrics
+brokerTopicStats.removeBrokerLevelRemoteCopyLagBytes(topic);

Review Comment:
   @kamalcph , when running `ReplicaManager#stopPartitions`, it'll call 
[maybeRemoveTopicMetrics](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L604),
 and the specific topic metrics will be removed if all partitions are offline 
(i.e. topics deletion). And the metrics removal will also remove the topic key 
from brokerTopicStats 
[here](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L646).
 That's when it will be deleted. 
   
   So, if the topic metrics are all deleted, we also need to remove the topic 
key from the broker-level allTopics metrics. Hope that's clear.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


