Re: [PR] KAFKA-15521: Refactor build.gradle to align gradle swagger plugin with swagger dependencies [kafka]

2023-10-02 Thread via GitHub


atu-sharm closed pull request #14472: KAFKA-15521: Refactor build.gradle to 
align gradle swagger plugin with swagger dependencies
URL: https://github.com/apache/kafka/pull/14472


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15521: Refactor build.gradle to align gradle swagger plugin with swagger dependencies [kafka]

2023-10-02 Thread via GitHub


atu-sharm opened a new pull request, #14473:
URL: https://github.com/apache/kafka/pull/14473

   Having a single version of swagger to avoid breaking of build
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-15521: Refactor build.gradle to align gradle swagger plugin with swagger dependencies [kafka]

2023-10-02 Thread via GitHub


atu-sharm opened a new pull request, #14472:
URL: https://github.com/apache/kafka/pull/14472

   Having a single version of swagger to avoid breaking of build
   
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-02 Thread via GitHub


yangy commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1343485049


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, 
falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, 
tp.partition)

Review Comment:
   I see



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15521) Refactor build.gradle to align gradle swagger plugin with swagger dependencies

2023-10-02 Thread Atul Sharma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17771334#comment-17771334
 ] 

Atul Sharma commented on KAFKA-15521:
-

Hi, [~mimaison] can i take this up?


> Refactor build.gradle to align gradle swagger plugin with swagger dependencies
> --
>
> Key: KAFKA-15521
> URL: https://issues.apache.org/jira/browse/KAFKA-15521
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Mickael Maison
>Priority: Major
>
> We use both the Swagger Gradle plugin 
> "io.swagger.core.v3.swagger-gradle-plugin" and 2 Swagger dependencies 
> swaggerAnnotations and swaggerJaxrs2. The version for the Gradle plugin is in 
> build.gradle while the version for the dependency is in 
> gradle/dependencies.gradle.
> When we upgrade the version of one or the other it sometimes cause build 
> breakages, for example https://github.com/apache/kafka/pull/13387 and 
> https://github.com/apache/kafka/pull/14464
> We should try to have the version defined in a single place to avoid breaking 
> the build again.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-10-02 Thread via GitHub


showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1343475329


##
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##
@@ -485,6 +485,49 @@ public void 
testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exceptio
 context.listener.currentLeaderAndEpoch());
 }
 
+@Test
+public void 
testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters() throws 
Exception {

Review Comment:
   Added a test in `KafkaRaftClientSnapshotTest`. 
   
   > Can we also tests the opposite. That the leader doesn't resign if the 
majority of the replicas (including the leader) have fetch in the last 
fetchTimeoutMs?
   
   I didn't follow you. I've verified: 
   ```
   - 1/2 fetch time
   leadership not get reassigned
   - fetch from one voter
   - 1/2 fetch time
   leadership not get reassigned
   - fetch from another voter
   - 1/2 fetch time
   leadership not get reassigned
   - fetch from the observer
   - 1/2 fetch time
   leadership should get reassigned
   ```
   I think I've verified what you want. Let me know if I need to add other 
things. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-10-02 Thread via GitHub


showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1343472387


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -76,9 +85,37 @@ protected LeaderState(
 boolean hasAcknowledgedLeader = voterId == localId;
 this.voterStates.put(voterId, new ReplicaState(voterId, 
hasAcknowledgedLeader));
 }
+this.majority = voters.size() / 2;
 this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
 this.log = logContext.logger(LeaderState.class);
 this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+this.fetchTimeoutMs = fetchTimeoutMs;
+this.fetchTimer = time.timer(fetchTimeoutMs);
+}
+
+public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) {

Review Comment:
   Fair enough. Added comments on the methods. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] enable test, check if continues failing in CI [kafka]

2023-10-02 Thread via GitHub


github-actions[bot] commented on PR #13953:
URL: https://github.com/apache/kafka/pull/13953#issuecomment-1744109897

   This PR is being marked as stale since it has not had any activity in 90 
days. If you would like to keep this PR alive, please ask a committer for 
review. If the PR has  merge conflicts, please update it with the latest from 
trunk (or appropriate release branch)  If this PR is no longer valid or 
desired, please feel free to close it. If no activity occurs in the next 30 
days, it will be automatically closed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]

2023-10-02 Thread via GitHub


splett2 commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1343331209


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -892,16 +889,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   maybeDuplicate match {
 case Some(duplicate) =>
-  appendInfo.setFirstOffset(Optional.of(new 
LogOffsetMetadata(duplicate.firstOffset)))
+  appendInfo.setFirstOffset(duplicate.firstOffset)
   appendInfo.setLastOffset(duplicate.lastOffset)
   appendInfo.setLogAppendTime(duplicate.timestamp)
   appendInfo.setLogStartOffset(logStartOffset)
 case None =>
-  // Before appending update the first offset metadata to include 
segment information
-  appendInfo.setFirstOffset(appendInfo.firstOffset.map { 
offsetMetadata =>

Review Comment:
   We removed this because none of the readers use anything besides the 
`messageOffset` (which is the `appendInfo.firstOffset` now).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]

2023-10-02 Thread via GitHub


splett2 commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1343327345


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -787,19 +770,22 @@ class ReplicaManager(val config: KafkaConfig,
 )
 }
 
-val errorResults = errorsPerPartition.map {
-  case (topicPartition, error) =>
-topicPartition -> LogAppendResult(
-  LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-  Some(error.exception())
+val allResults = localProduceResults ++ errorResults
+val produceStatus = allResults.map { case (topicPartition, result) =>
+  topicPartition -> ProducePartitionStatus(
+result.info.lastOffset + 1, // required offset
+new PartitionResponse(
+  result.error,
+  result.info.firstOffset,
+  result.info.lastOffset,
+  result.info.logAppendTime,
+  result.info.logStartOffset,
+  result.info.recordErrors,
+  result.exception.map(_.getMessage).orNull

Review Comment:
   Hmm, good catch. somehow I didn't think about that. I refactored some of the 
LogAppendResult stuff to try to make it more explicit when we use a custom 
error message.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14504: Implement DescribeGroups API [kafka]

2023-10-02 Thread via GitHub


dongnuo123 commented on code in PR #14462:
URL: https://github.com/apache/kafka/pull/14462#discussion_r1343272684


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -444,6 +445,66 @@ public List 
listGroups(List statesFi
 return groupStream.map(group -> 
group.asListedGroup(committedOffset)).collect(Collectors.toList());
 }
 
+/**
+ * Handles a DescribeGroup request.
+ *
+ * @param groupIds  The IDs of the groups to describe.
+ * @param committedOffset   A specified committed offset corresponding to 
this shard.
+ *
+ * @return A list containing the DescribeGroupsResponseData.DescribedGroup.
+ */
+public List describeGroups(
+List groupIds,
+long committedOffset
+) {
+final List describedGroups 
= new ArrayList<>();
+groupIds.forEach(groupId -> {
+try {
+Group group = group(groupId, committedOffset);
+if (group.type() != GENERIC) {
+// We don't support upgrading/downgrading between 
protocols at the moment, so
+// we throw an exception if a group exists with the wrong 
type.
+throw new GroupIdNotFoundException(String.format("Group %s 
is not a generic group.",
+groupId));
+}
+GenericGroup genericGroup = (GenericGroup) group;

Review Comment:
   Yeah, you're right. We should return a GroupCoordinator.DeadGroup



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14274 [6, 7]: Introduction of fetch request manager [kafka]

2023-10-02 Thread via GitHub


junrao commented on code in PR #14406:
URL: https://github.com/apache/kafka/pull/14406#discussion_r1334821243


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java:
##
@@ -0,0 +1,3565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.RecordTooLargeException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.message.ApiMessageType;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import 
org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.OffsetForLeaderTopicResult;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.NetworkReceive;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.LegacyRecord;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.FetchMetadata;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import 

Re: [PR] KAFKA-14506: Implement DeleteGroups API and OffsetDelete API [kafka]

2023-10-02 Thread via GitHub


jeffkbkim commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1343258721


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3071,6 +3071,33 @@ private void removeCurrentMemberFromGenericGroup(
 group.remove(member.memberId());
 }
 
+/**
+ * Handles a DeleteGroups request.
+ * Populates the record list passed in with record to update the state 
machine.
+ * Validations are done in {@link 
GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
+ * calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+ *
+ * @param groupId The ID of the group to be deleted. It has been checked 
in {@link GroupMetadataManager#validateDeleteGroup}.
+ * @param records The record list to populate.
+ */
+public void deleteGroup(
+String groupId,
+List records
+) {
+// In this method, we only populate records with tombstone records, so 
we don't expect an exception to be thrown here.

Review Comment:
   "At this point, we have already validated the group id so we know that the 
group exists and that no exception will be thrown."
   
   how's this?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##
@@ -341,6 +384,22 @@ public CoordinatorResult 
genericGroupLeave(
 return groupMetadataManager.genericGroupLeave(context, request);
 }
 
+/**
+ * Handles a OffsetDelete request.

Review Comment:
   nit: an



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -333,6 +349,94 @@ public CoordinatorResult 
commitOffset(
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handles an OffsetDelete request.
+ *
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+public CoordinatorResult deleteOffsets(
+OffsetDeleteRequestData request
+) throws ApiException {
+final Group group = validateOffsetDelete(request);
+final List records = new ArrayList<>();
+final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =

Review Comment:
   what's the benefit of using final variables here?



##
clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static 
org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DeleteGroupsRequestTest {
+
+protected static String groupId1 = "group-id-1";
+protected static String groupId2 = "group-id-2";

Review Comment:
   we can move these into the test as well



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -3071,6 +3071,33 @@ private void removeCurrentMemberFromGenericGroup(
 group.remove(member.memberId());
 }
 
+/**
+ * Handles a DeleteGroups request.
+ * Populates the record list passed in with record to update the state 
machine.
+ * Validations are done in {@link 
GroupCoordinatorShard#deleteGroups(RequestContext, List)} by
+ * calling {@link GroupMetadataManager#validateDeleteGroup(String)}.
+ *
+ * @param groupId The ID of the group to be deleted. It has been checked 
in {@link GroupMetadataManager#validateDeleteGroup}.

Review Comment:
   nit: can we change all usages of "ID" to "id"?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ 

Re: [PR] KAFKA-13988: Fix MM2 not consuming from latest when "auto.offset.reset=latest" is set [kafka]

2023-10-02 Thread via GitHub


xinyuliu-cb commented on PR #13905:
URL: https://github.com/apache/kafka/pull/13905#issuecomment-1743917984

   same here. catching up from the earliest offsets is too expensive and 
resource demanding to MM2 and Kafka brokers. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15028) AddPartitionsToTxnManager metrics

2023-10-02 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15028?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17771294#comment-17771294
 ] 

Justine Olshan commented on KAFKA-15028:


[~divijvaidya] I saw your comment on the RC thread about missing documentation 
for this. Where do we typically put metrics documentation?

> AddPartitionsToTxnManager metrics
> -
>
> Key: KAFKA-15028
> URL: https://issues.apache.org/jira/browse/KAFKA-15028
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
> Fix For: 3.6.0
>
> Attachments: latency-cpu.html
>
>
> KIP-890 added metrics for the AddPartitionsToTxnManager
> VerificationTimeMs – number of milliseconds from adding partition info to the 
> manager to the time the response is sent. This will include the round trip to 
> the transaction coordinator if it is called. This will also account for 
> verifications that fail before the coordinator is called.
> VerificationFailureRate – rate of verifications that returned in failure 
> either from the AddPartitionsToTxn response or through errors in the manager.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [WIP] KAFKA-14589 ConsumerGroupCommand rewritten in java [kafka]

2023-10-02 Thread via GitHub


nizhikov opened a new pull request, #14471:
URL: https://github.com/apache/kafka/pull/14471

   WIP PR.
   
   This PR contains changes to rewrite `ConsumerGroupCommand` in java and 
transfer it to `tools` module
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14595 Move ReassignPartitionsCommand to java [kafka]

2023-10-02 Thread via GitHub


nizhikov commented on PR #13247:
URL: https://github.com/apache/kafka/pull/13247#issuecomment-1743782703

   @mimaison 
   @ijuma 
   
   It seems we reduced changes as much as possible. 
   So it's time to do final review of command transfer from scala to java.
   
   Can you, please, take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-10-02 Thread via GitHub


ahuang98 commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1343169152


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -76,9 +85,37 @@ protected LeaderState(
 boolean hasAcknowledgedLeader = voterId == localId;
 this.voterStates.put(voterId, new ReplicaState(voterId, 
hasAcknowledgedLeader));
 }
+this.majority = voters.size() / 2;
 this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
 this.log = logContext.logger(LeaderState.class);
 this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+this.fetchTimeoutMs = fetchTimeoutMs;
+this.fetchTimer = time.timer(fetchTimeoutMs);
+}
+
+public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) {

Review Comment:
   I was more suggesting that this method might benefit from a comment which 
describes behavior. I guess the info log explains it well enough



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in java [kafka]

2023-10-02 Thread via GitHub


nizhikov commented on PR #14456:
URL: https://github.com/apache/kafka/pull/14456#issuecomment-1743719944

   @jolshan Thank you very much for the review and merge. Appreciate it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15528) KIP-986: Cross-Cluster Replication

2023-10-02 Thread Greg Harris (Jira)
Greg Harris created KAFKA-15528:
---

 Summary: KIP-986: Cross-Cluster Replication
 Key: KAFKA-15528
 URL: https://issues.apache.org/jira/browse/KAFKA-15528
 Project: Kafka
  Issue Type: New Feature
Reporter: Greg Harris


https://cwiki.apache.org/confluence/display/KAFKA/KIP-986%3A+Cross-Cluster+Replication



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in java [kafka]

2023-10-02 Thread via GitHub


jolshan merged PR #14456:
URL: https://github.com/apache/kafka/pull/14456


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in java [kafka]

2023-10-02 Thread via GitHub


jolshan commented on PR #14456:
URL: https://github.com/apache/kafka/pull/14456#issuecomment-1743685795

   Hey sorry. I was off for the weekend. I can take another look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-02 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1343033045


##
clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java:
##
@@ -210,6 +238,12 @@ public String toString() {
 b.append(logStartOffset);
 b.append(", recordErrors: ");
 b.append(recordErrors);
+b.append(", currentLeader: ");
+if (currentLeader != null) {

Review Comment:
   looking at the java docs I think you're right, this could be replaced by 
`b.append(currentLeader)`. I'm not sure why `errorMessage` was written this 
way, it looks like it was changed explicitly in [this 
commit](https://github.com/apache/kafka/commit/f41a5c2c8632bfd0dc50321c1c69418db04f42f6#diff-82aef2b279f7d0093b5e7bbd34cbf9abfa6bb5ed454c72419c03dbe2e58e0eab)
 but I don't see a reason for it, I could probably change this as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15527:

Labels: kip  (was: )

> Add reverseRange and reverseAll query over kv-store in IQv2
> ---
>
> Key: KAFKA-15527
> URL: https://issues.apache.org/jira/browse/KAFKA-15527
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>  Labels: kip
>
> Add reverseRange and reverseAll query over kv-store in IQv2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-02 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-15527:

Component/s: streams

> Add reverseRange and reverseAll query over kv-store in IQv2
> ---
>
> Key: KAFKA-15527
> URL: https://issues.apache.org/jira/browse/KAFKA-15527
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> Add reverseRange and reverseAll query over kv-store in IQv2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13005) Support JBOD in kraft mode

2023-10-02 Thread Proven Provenzano (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17771237#comment-17771237
 ] 

Proven Provenzano commented on KAFKA-13005:
---

Can this story be closed as duplicate as a new set of tasks for KRaft JBOD 
support is outlined in 
[KAFKA-14127|https://issues.apache.org/jira/browse/KAFKA-14127]

> Support JBOD in kraft mode
> --
>
> Key: KAFKA-13005
> URL: https://issues.apache.org/jira/browse/KAFKA-13005
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Deng Ziming
>Priority: Major
>  Labels: kip-500
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15449) Verify transactional offset commits (KIP-890 part 1)

2023-10-02 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-15449.

Resolution: Fixed

> Verify transactional offset commits (KIP-890 part 1)
> 
>
> Key: KAFKA-15449
> URL: https://issues.apache.org/jira/browse/KAFKA-15449
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Critical
>
> We verify on produce requests but not offset commits. We should fix this to 
> avoid hanging transactions on consumer offset partitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15449: Verify transactional offset commits (KIP-890 part 1) [kafka]

2023-10-02 Thread via GitHub


jolshan merged PR #14370:
URL: https://github.com/apache/kafka/pull/14370


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]

2023-10-02 Thread via GitHub


jolshan commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1342935092


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -1141,13 +1134,13 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   // Also indicate whether we have the accurate first offset or not
   if (!readFirstMessage) {
 if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
-  firstOffset = Optional.of(new LogOffsetMetadata(batch.baseOffset))
+  firstOffset = batch.baseOffset
 lastOffsetOfFirstBatch = batch.lastOffset
 readFirstMessage = true
   }
 
   // check that offsets are monotonically increasing
-  if (lastOffset >= batch.lastOffset)
+  if (requireOffsetsMonotonic && lastOffset >= batch.lastOffset)

Review Comment:
   We technically don't need this since the check also has `if 
(requireOffsetsMonotonic && !monotonic)`



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -787,19 +770,22 @@ class ReplicaManager(val config: KafkaConfig,
 )
 }
 
-val errorResults = errorsPerPartition.map {
-  case (topicPartition, error) =>
-topicPartition -> LogAppendResult(
-  LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
-  Some(error.exception())
+val allResults = localProduceResults ++ errorResults
+val produceStatus = allResults.map { case (topicPartition, result) =>
+  topicPartition -> ProducePartitionStatus(
+result.info.lastOffset + 1, // required offset
+new PartitionResponse(
+  result.error,
+  result.info.firstOffset,
+  result.info.lastOffset,
+  result.info.logAppendTime,
+  result.info.logStartOffset,
+  result.info.recordErrors,
+  result.exception.map(_.getMessage).orNull

Review Comment:
   Are we going to start returning the error message for other exceptions now 
too? I think we will add the default exception message when previously we were 
not. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]

2023-10-02 Thread via GitHub


jolshan commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1342933163


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -892,16 +889,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   maybeDuplicate match {
 case Some(duplicate) =>
-  appendInfo.setFirstOffset(Optional.of(new 
LogOffsetMetadata(duplicate.firstOffset)))
+  appendInfo.setFirstOffset(duplicate.firstOffset)
   appendInfo.setLastOffset(duplicate.lastOffset)
   appendInfo.setLogAppendTime(duplicate.timestamp)
   appendInfo.setLogStartOffset(logStartOffset)
 case None =>
-  // Before appending update the first offset metadata to include 
segment information
-  appendInfo.setFirstOffset(appendInfo.firstOffset.map { 
offsetMetadata =>

Review Comment:
   We removed this since the default is zero, and in that case we take the 
segment base offset anyway?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15415 On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-02 Thread via GitHub


msn-tldr commented on PR #14384:
URL: https://github.com/apache/kafka/pull/14384#issuecomment-1743383185

   @kirktrue 
   > @jsancio - can you weigh in about changes to org.apache.kafka.common 
needing a KIP?
   
   I have removed the changes to public classes, so this shouldn't require a 
KIP, take a look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15415 On producer-batch retry, skip-backoff on a new leader [kafka]

2023-10-02 Thread via GitHub


msn-tldr commented on code in PR #14384:
URL: https://github.com/apache/kafka/pull/14384#discussion_r1340370371


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java:
##
@@ -156,6 +157,8 @@ public class SenderTest {
 private SenderMetricsRegistry senderMetricsRegistry = null;
 private final LogContext logContext = new LogContext();
 
+private final Logger log = logContext.logger(SenderTest.class);

Review Comment:
   Inclined to keep it since helps improve readability of test-logs when test 
fails. By default logging is off, see similar comment below for details.



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -96,6 +98,8 @@ public class RecordAccumulatorTest {
 private final long maxBlockTimeMs = 1000;
 private final LogContext logContext = new LogContext();
 
+private final Logger log = logContext.logger(RecordAccumulatorTest.class);

Review Comment:
   I am inclined to keep it, as it helps improve readability of test logs 
if/when test fails. 
   By default, the logging is turned off in 
`clients/src/resources/log4.properties`



##
clients/src/main/java/org/apache/kafka/common/PartitionInfo.java:
##
@@ -20,6 +20,7 @@
  * This is used to describe per-partition state in the MetadataResponse.
  */
 public class PartitionInfo {
+public static final int UNKNOWN_LEADER_EPOCH = -1;

Review Comment:
   @dajac this is removed now, thanks!



##
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java:
##
@@ -94,9 +100,40 @@ public ProducerBatch(TopicPartition tp, 
MemoryRecordsBuilder recordsBuilder, lon
 this.isSplitBatch = isSplitBatch;
 float compressionRatioEstimation = 
CompressionRatioEstimator.estimation(topicPartition.topic(),

 recordsBuilder.compressionType());
+this.currentLeaderEpoch = PartitionInfo.UNKNOWN_LEADER_EPOCH;
+this.leaderChangedAttempts = -1;
 
recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation);
 }
 
+/*
+ * Returns whether the leader epoch has changed since the last attempt.
+ * @param latestLeaderEpoch The latest leader epoch.
+ * @return true if the leader has changed, otherwise false.
+ */
+boolean hasLeaderChanged(int latestLeaderEpoch) {
+boolean leaderChanged = false;
+// Checking for leader change makes sense only from 1st retry 
onwards(attempt >=1).
+log.trace("For {}, attempting to change leader, currentLeaderEpoch:{}, 
leaderChangedAttempts:{}, latestLeaderEpoch: {}, current Attempt: {}",
+this, currentLeaderEpoch, leaderChangedAttempts, 
latestLeaderEpoch, attempts());
+if (attempts() >= 1) {
+// If the leader's epoch has changed, this counts as a leader 
change
+if (currentLeaderEpoch != latestLeaderEpoch) {
+leaderChangedAttempts = attempts();
+leaderChanged = true;
+} else {
+// Otherwise, it's only a leader change until the first 
attempt is made with this leader

Review Comment:
   > least one attempt
   
   implies a retry.
   
   > the above epoch comparison is false
   
   So leader-epoch is still the same. 
   
   Consider leader was changed at attempt=5, to epoch=100. 
maybeUpdateLeaderEpoch() should detect leader change even when called again at 
attempt=5, with the same epoch=100. As this is the same attempt in which the 
leader change was detected.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-02 Thread Hanyu Zheng (Jira)
Hanyu Zheng created KAFKA-15527:
---

 Summary: Add reverseRange and reverseAll query over kv-store in 
IQv2
 Key: KAFKA-15527
 URL: https://issues.apache.org/jira/browse/KAFKA-15527
 Project: Kafka
  Issue Type: Improvement
Reporter: Hanyu Zheng


Add reverseRange and reverseAll query over kv-store in IQv2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15527) Add reverseRange and reverseAll query over kv-store in IQv2

2023-10-02 Thread Hanyu Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hanyu Zheng reassigned KAFKA-15527:
---

Assignee: Hanyu Zheng

> Add reverseRange and reverseAll query over kv-store in IQv2
> ---
>
> Key: KAFKA-15527
> URL: https://issues.apache.org/jira/browse/KAFKA-15527
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hanyu Zheng
>Assignee: Hanyu Zheng
>Priority: Major
>
> Add reverseRange and reverseAll query over kv-store in IQv2



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-02 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1342920121


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
+ * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which 
mean the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.
+ *
+ * If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} because reattempting to join again with a zero epoch.
+ *
+ * If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final 

Re: [PR] MINOR: suppress dependencycheck warning for CVE-2023-35116 [kafka]

2023-10-02 Thread via GitHub


jlprat merged PR #14460:
URL: https://github.com/apache/kafka/pull/14460


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-02 Thread via GitHub


philipnee commented on PR #14364:
URL: https://github.com/apache/kafka/pull/14364#issuecomment-1743351780

   Hello @dajac - Thanks for the review. I hope I've addressed most of your 
concerns in the recent reviews. Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: suppress dependencycheck warning for CVE-2023-35116 [kafka]

2023-10-02 Thread via GitHub


jlprat commented on PR #14460:
URL: https://github.com/apache/kafka/pull/14460#issuecomment-1743351726

   Test failures are all unrelated to the change


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-02 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1342915079


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
+ * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which 
mean the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.
+ *
+ * If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} because reattempting to join again with a zero epoch.
+ *
+ * If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final 

Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-02 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1342910020


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager only emits heartbeat when the member is in a group, tries to 
join or rejoin a group.
+ * If the member does not have groupId configured, got kicked out of the 
group, or encountering fatal exceptions, the
+ * heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * If the member completes the assignment changes, i.e. revocation and 
assignment, a heartbeat request will be
+ * sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final Set fatalErrors = new HashSet<>(Arrays.asList(
+Errors.GROUP_AUTHORIZATION_FAILED,
+Errors.INVALID_REQUEST,
+Errors.GROUP_MAX_SIZE_REACHED,
+Errors.UNSUPPORTED_ASSIGNOR,
+Errors.UNRELEASED_INSTANCE_ID));
+
+private final int rebalanceTimeoutMs;

Review Comment:
   Added some comments directly above the var - Is this enough or you actually 
want this to be presented in the "javaDoc" of the class section?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15526: Simplify the LogAppendInfo class [kafka]

2023-10-02 Thread via GitHub


splett2 commented on code in PR #14470:
URL: https://github.com/apache/kafka/pull/14470#discussion_r1342909611


##
core/src/main/scala/kafka/raft/KafkaMetadataLog.scala:
##
@@ -96,10 +96,10 @@ final class KafkaMetadataLog private (
   }
 
   private def handleAndConvertLogAppendInfo(appendInfo: 
internals.log.LogAppendInfo): LogAppendInfo = {
-if (appendInfo.firstOffset.isPresent())
-  new LogAppendInfo(appendInfo.firstOffset.get().messageOffset, 
appendInfo.lastOffset)
+if (appendInfo.firstOffset != UnifiedLog.UnknownOffset)
+  new LogAppendInfo(appendInfo.firstOffset, appendInfo.lastOffset)
 else
-  throw new KafkaException(s"Append failed unexpectedly: 
${appendInfo.errorMessage}")

Review Comment:
   `errorMessage` would have never been populated in the calling context of 
`handleAndConvertLogAppendInfo`, so it is just dropped from the log in the PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15526) Simplify LogAppendInfo parameters

2023-10-02 Thread David Mao (Jira)
David Mao created KAFKA-15526:
-

 Summary: Simplify LogAppendInfo parameters
 Key: KAFKA-15526
 URL: https://issues.apache.org/jira/browse/KAFKA-15526
 Project: Kafka
  Issue Type: Improvement
Reporter: David Mao
Assignee: David Mao


Currently LogAppendInfo is quite overloaded, carrying a bunch of redundant 
information. This makes some of the code unnecessarily complex in the log 
layer, since the log layer is unsure which fields are required to populate for 
higher layers, and higher layers are unsure which fields are required to bubble 
back to clients.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-02 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1342899010


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
+ * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which 
mean the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.
+ *
+ * If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} because reattempting to join again with a zero epoch.
+ *
+ * If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final 

[jira] [Commented] (KAFKA-15525) Segment uploads stop working following a broker failure

2023-10-02 Thread Francois Visconte (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17771171#comment-17771171
 ] 

Francois Visconte commented on KAFKA-15525:
---

I was planning to do that change anyways but the strange thing is that it 
doesn't happen consistently when I'm shutting down brokers in my cluster. 

> Segment uploads stop working following a broker failure
> ---
>
> Key: KAFKA-15525
> URL: https://issues.apache.org/jira/browse/KAFKA-15525
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Francois Visconte
>Priority: Major
>
> I have a tiered-storage enabled cluster and topic where I continuously 
> produce and consume to/from a TS-enabled topic on that cluster.
> Here are the topic settings I’m using: 
> {code:java}
> local.retention.ms=12
> remote.storage.enable=true
> retention.ms: 1080
> segment.bytes: 51200
> {code}
> Here are my broker settings:
> {code:java}
> remote.log.storage.system.enable=true
> remote.log.storage.manager.class.path=/opt/kafka/tiered-storage-libs/*
> remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
> remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
> remote.log.metadata.manager.listener.name=INTERNAL_PLAINTEXT
> remote.log.manager.task.interval.ms=5000
> remote.log.manager.thread.pool.size=10
> remote.log.reader.threads=10
> remote.log.reader.max.pending.tasks=100
> rlmm.config.remote.log.metadata.topic.replication.factor=1
> rlmm.config.remote.log.metadata.topic.num.partitions=50
> rlmm.config.remote.log.metadata.topic.retention.ms=-1
> rsm.config.chunk.cache.class=io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache
> rsm.config.chunk.cache.path=/data/tiered-storage-cache
> rsm.config.chunk.cache.size=1073741824
> rsm.config.metrics.recording.level=DEBUG    
> rsm.config.storage.aws.credentials.provider.class=software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider
> rsm.config.storage.backend.class.name=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
> rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
> rsm.config.storage.s3.region=us-east-1
> rsm.config.chunk.size=102400
> rsm.config.storage.s3.multipart.upload.part.size=16777216 {code}
> When a broker in the cluster get rotated (replaced or restarted) some brokers 
> start throwing this error repeatedly: 
> {code:java}
> [RemoteLogManager=1 partition=yTypIvtBRY2l3sD4-8M7fA:loadgen-3] Error 
> occurred while copying log segments of partition: 
> yTypIvtBRY2l3sD4-8M7fA:loadgen-3 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.KafkaException: 
> java.util.concurrent.TimeoutException: Timed out in catching up with the 
> expected offset by consumer.
>     at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>     at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>     at 
> kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegment(RemoteLogManager.java:728)
>     at 
> kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegmentsToRemote(RemoteLogManager.java:687)
>     at 
> kafka.log.remote.RemoteLogManager$RLMTask.run(RemoteLogManager.java:790)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>     at 
> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>     at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: org.apache.kafka.common.KafkaException: 
> java.util.concurrent.TimeoutException: Timed out in catching up with the 
> expected offset by consumer.
>     at 
> org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$storeRemoteLogMetadata$0(TopicBasedRemoteLogMetadataManager.java:188)
>     at 
> java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
>     at 
> java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
>     at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>     at 
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>     at 
> java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>     at 
> 

Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-02 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1342844776


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, 
falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, 
tp.partition)

Review Comment:
   I don't think so, getPartitionInfo returns an Option, the equivalent of null 
would be an empty option. We don't seem to null check this value elsewhere 
either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14506: Implement DeleteGroups API and OffsetDelete API [kafka]

2023-10-02 Thread via GitHub


yangy commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1342851207


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -333,6 +348,87 @@ public CoordinatorResult 
commitOffset(
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handles an OffsetDelete request.
+ *
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+public CoordinatorResult deleteOffsets(
+OffsetDeleteRequestData request
+) throws ApiException {
+final Group group = validateOffsetDelete(request);
+final List records = new ArrayList<>();
+final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+final OffsetDeleteResponseData response = new 
OffsetDeleteResponseData();
+final TimelineHashMap> offsetsByTopic =
+offsetsByGroup.get(request.groupId());
+
+request.topics().forEach(topic -> {
+final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+final TimelineHashMap 
offsetsByPartition = offsetsByTopic == null ?
+null : offsetsByTopic.get(topic.name());
+
+if (group.isSubscribedToTopic(topic.name())) {
+topic.partitions().forEach(partition ->
+responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+.setPartitionIndex(partition.partitionIndex())
+.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
+)
+);
+} else {
+topic.partitions().forEach(partition -> {
+if (offsetsByPartition != null && 
offsetsByPartition.containsKey(partition.partitionIndex())) {
+responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+.setPartitionIndex(partition.partitionIndex())
+);
+
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+request.groupId(),
+topic.name(),
+partition.partitionIndex()
+));
+}
+});
+}
+
+final OffsetDeleteResponseData.OffsetDeleteResponseTopic 
responseTopic =
+new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
+.setName(topic.name())
+.setPartitions(responsePartitionCollection);
+responseTopicCollection.add(responseTopic);
+});
+response.setTopics(responseTopicCollection);
+
+return new CoordinatorResult<>(records, response);
+}
+
+/**
+ * Deletes offsets as part of a DeleteGroups request.
+ * Populates the record list passed in with records to update the state 
machine.
+ * Validations are done in {@link 
GroupCoordinatorShard#deleteGroups(RequestContext, List)}
+ *
+ * @param groupId The ID of the given group.
+ * @param records The record list to populate.
+ */
+public void deleteAllOffsets(
+String groupId,
+List records
+) {
+TimelineHashMap> 
offsetsByTopic = offsetsByGroup.get(groupId);
+
+if (offsetsByTopic != null) {

Review Comment:
   Any chance deleteAllOffsets will get invoked before the group is completely 
removed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-02 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1342850556


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, 
falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, 
tp.partition)
+partitionInfo.foreach { info =>

Review Comment:
   Looking at other uses of partitionInfo I think this is a style choice. There 
can only be 1 partitionInfo in the getPartitionInfo object, so the forEach 
should only ever access 1 entry, I think this is just a more succinct way of 
accessing it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KIP-951: Server side and protocol changes for KIP-951 [kafka]

2023-10-02 Thread via GitHub


chb2ab commented on code in PR #1:
URL: https://github.com/apache/kafka/pull/1#discussion_r1342844776


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -559,6 +560,30 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  case class LeaderNode(leaderId: Int, leaderEpoch: Int, node: Node)
+
+  private def getCurrentLeader(tp: TopicPartition): LeaderNode = {
+val partitionInfoOrError = replicaManager.getPartitionOrError(tp)
+var leaderId = -1
+var leaderEpoch = -1
+partitionInfoOrError match {
+  case Right(x) =>
+  leaderId = x.leaderReplicaIdOpt.getOrElse(-1)
+  leaderEpoch = x.getLeaderEpoch
+  case Left(x) =>
+debug(s"Unable to retrieve local leaderId and Epoch with error $x, 
falling back to metadata cache")
+val partitionInfo = metadataCache.getPartitionInfo(tp.topic, 
tp.partition)

Review Comment:
   I don't think so, getPartitionInfo returns an Option, the equivalent of null 
would be an empty option.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14504: Implement DescribeGroups API [kafka]

2023-10-02 Thread via GitHub


dajac commented on code in PR #14462:
URL: https://github.com/apache/kafka/pull/14462#discussion_r1342814124


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -444,6 +445,66 @@ public List 
listGroups(List statesFi
 return groupStream.map(group -> 
group.asListedGroup(committedOffset)).collect(Collectors.toList());
 }
 
+/**
+ * Handles a DescribeGroup request.
+ *
+ * @param groupIds  The IDs of the groups to describe.
+ * @param committedOffset   A specified committed offset corresponding to 
this shard.
+ *
+ * @return A list containing the DescribeGroupsResponseData.DescribedGroup.
+ */
+public List describeGroups(
+List groupIds,
+long committedOffset
+) {
+final List describedGroups 
= new ArrayList<>();
+groupIds.forEach(groupId -> {
+try {
+Group group = group(groupId, committedOffset);
+if (group.type() != GENERIC) {
+// We don't support upgrading/downgrading between 
protocols at the moment, so
+// we throw an exception if a group exists with the wrong 
type.
+throw new GroupIdNotFoundException(String.format("Group %s 
is not a generic group.",
+groupId));
+}
+GenericGroup genericGroup = (GenericGroup) group;

Review Comment:
   When the group does not exist, in the current code, we return 
`GroupCoordinator.DeadGroup` instead of returning a `GroupIdNotFoundException` 
exception. Do you confirm?



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##
@@ -751,6 +752,112 @@ public void testListGroupsFailedImmediately()
 assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
 }
 
+@Test
+public void testDescribeGroups() throws Exception {
+CoordinatorRuntime runtime = 
mockRuntime();
+GroupCoordinatorService service = new GroupCoordinatorService(
+new LogContext(),
+createConfig(),
+runtime
+);
+int partitionCount = 2;
+service.startup(() -> partitionCount);
+
+DescribeGroupsResponseData.DescribedGroup describedGroup1 = new 
DescribeGroupsResponseData.DescribedGroup()
+.setGroupId("group-id-1");
+DescribeGroupsResponseData.DescribedGroup describedGroup2 = new 
DescribeGroupsResponseData.DescribedGroup()
+.setGroupId("group-id-2");
+List 
expectedDescribedGroups = Arrays.asList(
+describedGroup1,
+describedGroup2
+);
+
+when(runtime.scheduleReadOperation(
+ArgumentMatchers.eq("describe-groups"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
+
+CompletableFuture describedGroupFuture = new 
CompletableFuture<>();
+when(runtime.scheduleReadOperation(
+ArgumentMatchers.eq("describe-groups"),
+ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+ArgumentMatchers.any()
+)).thenReturn(describedGroupFuture);
+
+CompletableFuture> 
future =
+service.describeGroups(requestContext(ApiKeys.DESCRIBE_GROUPS), 
Arrays.asList("group-id-1", "group-id-2"));
+
+assertFalse(future.isDone());
+
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
+
+assertTrue(future.get().containsAll(expectedDescribedGroups));
+assertTrue(expectedDescribedGroups.containsAll(future.get()));

Review Comment:
   nit: Could we use assertEquals here?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##
@@ -444,6 +445,66 @@ public List 
listGroups(List statesFi
 return groupStream.map(group -> 
group.asListedGroup(committedOffset)).collect(Collectors.toList());
 }
 
+/**
+ * Handles a DescribeGroup request.
+ *
+ * @param groupIds  The IDs of the groups to describe.
+ * @param committedOffset   A specified committed offset corresponding to 
this shard.
+ *
+ * @return A list containing the DescribeGroupsResponseData.DescribedGroup.
+ */
+public List describeGroups(
+List groupIds,
+long committedOffset
+) {
+final List describedGroups 
= new ArrayList<>();
+groupIds.forEach(groupId -> {
+try {
+Group group = group(groupId, committedOffset);
+if (group.type() != GENERIC) {
+// We don't support upgrading/downgrading between 
protocols at the moment, so
+// we throw an exception if a group exists 

Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-02 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1342785385


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
+ * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which 
mean the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.
+ *
+ * If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} because reattempting to join again with a zero epoch.
+ *
+ * If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final 

Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-02 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1342783380


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
+ * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which 
mean the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.
+ *
+ * If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} because reattempting to join again with a zero epoch.
+ *
+ * If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final 

Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-02 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1342781938


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
+ * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which 
mean the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.
+ *
+ * If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} because reattempting to join again with a zero epoch.
+ *
+ * If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final 

Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-02 Thread via GitHub


philipnee commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1342778930


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
+ * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which 
mean the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.
+ *
+ * If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} because reattempting to join again with a zero epoch.
+ *
+ * If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final 

Re: [PR] KAFKA-15326: [9/N] Start and stop executors and cornercases [kafka]

2023-10-02 Thread via GitHub


lucasbru merged PR #14281:
URL: https://github.com/apache/kafka/pull/14281


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15326: [9/N] Start and stop executors and cornercases [kafka]

2023-10-02 Thread via GitHub


lucasbru commented on PR #14281:
URL: https://github.com/apache/kafka/pull/14281#issuecomment-174303

   Build failures unrelated
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-02 Thread via GitHub


dajac commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1342694336


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
+ * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which 
mean the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.
+ *
+ * If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} because reattempting to join again with a zero epoch.
+ *
+ * If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final 

Re: [PR] KAFKA-15278: Implement HeartbeatRequestManager to handle heartbeat requests [kafka]

2023-10-02 Thread via GitHub


dajac commented on code in PR #14364:
URL: https://github.com/apache/kafka/pull/14364#discussion_r1342667683


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:
##
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
+import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+/**
+ * Manages the request creation and response handling for the heartbeat. 
The module creates a
+ * {@link ConsumerGroupHeartbeatRequest} using the state stored in the {@link 
MembershipManager} and enqueue it to
+ * the network queue to be sent out. Once the response is received, the module 
will update the state in the
+ * {@link MembershipManager} and handle any errors.
+ *
+ * The manager will try to send a heartbeat when the member is in {@link 
MemberState#STABLE},
+ * {@link MemberState#UNJOINED}, or {@link MemberState#RECONCILING}. Which 
mean the member is either in a stable
+ * group, is trying to join a group, or is in the process of reconciling the 
assignment changes.
+ *
+ * If the member got kick out of a group, it will try to give up the 
current assignment by invoking {@code
+ * OnPartitionsLost} because reattempting to join again with a zero epoch.
+ *
+ * If the member does not have groupId configured or encountering fatal 
exceptions, a heartbeat will not be sent.
+ *
+ * If the coordinator not is not found, we will skip sending the heartbeat 
and try to find a coordinator first.
+ *
+ * If the heartbeat failed due to retriable errors, such as, 
TimeoutException. The subsequent attempt will be
+ * backoff exponentially.
+ *
+ * When the member completes the assignment reconciliation, the {@link 
HeartbeatRequestState} will be reset so
+ * that a heartbeat will be sent in the next event loop.
+ *
+ * See {@link HeartbeatRequestState} for more details.
+ */
+public class HeartbeatRequestManager implements RequestManager {
+private final Logger logger;
+private final int rebalanceTimeoutMs;
+
+private final CoordinatorRequestManager coordinatorRequestManager;
+private final SubscriptionState subscriptions;
+private final HeartbeatRequestState heartbeatRequestState;
+private final MembershipManager membershipManager;
+private final ErrorEventHandler nonRetriableErrorHandler;
+
+public HeartbeatRequestManager(
+final Time time,
+final LogContext logContext,
+final ConsumerConfig config,
+final CoordinatorRequestManager coordinatorRequestManager,
+final SubscriptionState subscriptions,
+final MembershipManager membershipManager,
+final ErrorEventHandler nonRetriableErrorHandler) {
+this.coordinatorRequestManager = coordinatorRequestManager;
+this.logger = logContext.logger(getClass());
+this.subscriptions = subscriptions;
+this.membershipManager = membershipManager;
+this.nonRetriableErrorHandler = nonRetriableErrorHandler;
+this.rebalanceTimeoutMs = 
config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+long retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
+long retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
+this.heartbeatRequestState = new HeartbeatRequestState(logContext, 
time, 0, retryBackoffMs,
+retryBackoffMaxMs, rebalanceTimeoutMs);
+}
+
+// Visible for testing
+HeartbeatRequestManager(
+final 

Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-10-02 Thread via GitHub


showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1342676499


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -76,9 +85,37 @@ protected LeaderState(
 boolean hasAcknowledgedLeader = voterId == localId;
 this.voterStates.put(voterId, new ReplicaState(voterId, 
hasAcknowledgedLeader));
 }
+this.majority = voters.size() / 2;
 this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
 this.log = logContext.logger(LeaderState.class);
 this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+this.fetchTimeoutMs = fetchTimeoutMs;
+this.fetchTimer = time.timer(fetchTimeoutMs);
+}
+
+public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) {
+fetchTimer.update(currentTimeMs);
+boolean isExpired = fetchTimer.isExpired();
+if (isExpired) {
+log.info("Did not receive fetch request from the majority of the 
voters within {}ms. Current fetched voters are {}.",
+fetchTimeoutMs, fetchedVoters);
+}
+return isExpired;
+}
+
+public void maybeResetMajorityFollowerFetchTimeout(int id, long 
currentTimeMs) {
+updateFetchedVoters(id);
+if (fetchedVoters.size() >= majority) {
+fetchedVoters.clear();
+fetchTimer.update(currentTimeMs);
+fetchTimer.reset(fetchTimeoutMs);
+}
+}
+
+private void updateFetchedVoters(int id) {
+if (isVoter(id)) {
+fetchedVoters.add(id);
+}

Review Comment:
   > Note that ReplicaState already contains the lastFetchTimestamp.
   
   I'm trying to re-use the `lastFetchTimestamp` in ReplicaState today, but 
found it won't work as expected since the default value for it is `-1`, which 
means, when a note becomes a leader, all the `lastFetchTimestamp` of follower 
nodes are `-1`.  Using current `timer` way is more readable IMO.
   
   > The part that is not clear to me is when or how to wake up the leader for 
a poll. We need to update KafkaRaftClient::pollLeader so that the replicas' 
last fetch time is taken into account when blocking on the messageQueue.poll
   
   Good question. My thought is, we add some buffer to tolerate the operation 
time. Like when [checking 
shrinkISR](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L373-L375),
 we give a 1.5x of the timeout to make things easier, instead of calculating 
the accurate timestamp. So, I'm thinking we use `fetchTimeout * 1.5`. WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-10-02 Thread via GitHub


showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1342668812


##
raft/src/main/java/org/apache/kafka/raft/LeaderState.java:
##
@@ -76,9 +85,37 @@ protected LeaderState(
 boolean hasAcknowledgedLeader = voterId == localId;
 this.voterStates.put(voterId, new ReplicaState(voterId, 
hasAcknowledgedLeader));
 }
+this.majority = voters.size() / 2;
 this.grantingVoters = Collections.unmodifiableSet(new 
HashSet<>(grantingVoters));
 this.log = logContext.logger(LeaderState.class);
 this.accumulator = Objects.requireNonNull(accumulator, "accumulator 
must be non-null");
+this.fetchTimeoutMs = fetchTimeoutMs;
+this.fetchTimer = time.timer(fetchTimeoutMs);
+}
+
+public boolean hasMajorityFollowerFetchTimeoutExpired(long currentTimeMs) {

Review Comment:
   Changed to `hasMajorityFollowerFetchExpired`. Let me know if you have any 
better suggestion. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-10-02 Thread via GitHub


showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1342669657


##
raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java:
##
@@ -485,6 +485,49 @@ public void 
testHandleBeginQuorumEpochAfterUserInitiatedResign() throws Exceptio
 context.listener.currentLeaderAndEpoch());
 }
 
+@Test
+public void 
testLeaderShouldResignLeadershipIfNotGetFetchRequestFromMajorityVoters() throws 
Exception {

Review Comment:
   Will add tests later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15489: resign leadership when no fetch from majority voters [kafka]

2023-10-02 Thread via GitHub


showuon commented on code in PR #14428:
URL: https://github.com/apache/kafka/pull/14428#discussion_r1342665261


##
raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java:
##
@@ -965,6 +965,10 @@ private CompletableFuture 
handleFetchRequest(
 }
 
 int replicaId = FetchRequest.replicaId(request);
+
+Optional> state = quorum.maybeLeaderState();
+state.ifPresent(s -> 
s.maybeResetMajorityFollowerFetchTimeout(replicaId, currentTimeMs));

Review Comment:
   Agree! Now, we update the timer when we update the lastFetchTimestamp in 
replicaState.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14506: Implement DeleteGroups API and OffsetDelete API [kafka]

2023-10-02 Thread via GitHub


dajac commented on code in PR #14408:
URL: https://github.com/apache/kafka/pull/14408#discussion_r1342599402


##
clients/src/test/java/org/apache/kafka/common/requests/DeleteGroupsRequestTest.java:
##
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DeleteGroupsRequestData;
+import org.apache.kafka.common.message.DeleteGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static 
org.apache.kafka.common.requests.DeleteGroupsRequest.getErrorResultCollection;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DeleteGroupsRequestTest {
+
+protected static String groupId1 = "group-id-1";
+protected static String groupId2 = "group-id-2";
+
+private static DeleteGroupsRequestData data;
+
+@BeforeEach
+public void setUp() {
+data = new DeleteGroupsRequestData()
+.setGroupsNames(Arrays.asList(groupId1, groupId2));
+}

Review Comment:
   nit: Given that there is only one test. I would rather move everything into 
that test.



##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##
@@ -307,6 +344,57 @@ private void replay(
 
 lastWrittenOffset++;
 }
+
+public void testOffsetDeleteWith(
+OffsetMetadataManagerTestContext context,
+String groupId,
+String topic,
+int partition,
+Errors error

Review Comment:
   nit: expectedError?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##
@@ -333,6 +348,87 @@ public CoordinatorResult 
commitOffset(
 return new CoordinatorResult<>(records, response);
 }
 
+/**
+ * Handles an OffsetDelete request.
+ *
+ * @param request The OffsetDelete request.
+ *
+ * @return A Result containing the OffsetDeleteResponseData response and
+ * a list of records to update the state machine.
+ */
+public CoordinatorResult deleteOffsets(
+OffsetDeleteRequestData request
+) throws ApiException {
+final Group group = validateOffsetDelete(request);
+final List records = new ArrayList<>();
+final OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection 
responseTopicCollection =
+new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection();
+final OffsetDeleteResponseData response = new 
OffsetDeleteResponseData();
+final TimelineHashMap> offsetsByTopic =
+offsetsByGroup.get(request.groupId());
+
+request.topics().forEach(topic -> {
+final 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection 
responsePartitionCollection =
+new 
OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
+final TimelineHashMap 
offsetsByPartition = offsetsByTopic == null ?
+null : offsetsByTopic.get(topic.name());
+
+if (group.isSubscribedToTopic(topic.name())) {
+topic.partitions().forEach(partition ->
+responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+.setPartitionIndex(partition.partitionIndex())
+.setErrorCode(Errors.GROUP_SUBSCRIBED_TO_TOPIC.code())
+)
+);
+} else {
+topic.partitions().forEach(partition -> {
+if (offsetsByPartition != null && 
offsetsByPartition.containsKey(partition.partitionIndex())) {
+responsePartitionCollection.add(new 
OffsetDeleteResponseData.OffsetDeleteResponsePartition()
+.setPartitionIndex(partition.partitionIndex())
+);
+
records.add(RecordHelpers.newOffsetCommitTombstoneRecord(
+request.groupId(),
+

[jira] [Commented] (KAFKA-15525) Segment uploads stop working following a broker failure

2023-10-02 Thread Kamal Chandraprakash (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17771065#comment-17771065
 ] 

Kamal Chandraprakash commented on KAFKA-15525:
--

While uploading the segment, the RemoteLogManager sends an event to the 
internal topic, if it's unavailable then it cannot upload the segment. 
{{rlmm.config.remote.log.metadata.topic.replication.factor}} is set to 1, can 
you try increasing the replication-factor to 3 (or) 4?

> Segment uploads stop working following a broker failure
> ---
>
> Key: KAFKA-15525
> URL: https://issues.apache.org/jira/browse/KAFKA-15525
> Project: Kafka
>  Issue Type: Bug
>  Components: Tiered-Storage
>Affects Versions: 3.6.0
>Reporter: Francois Visconte
>Priority: Major
>
> I have a tiered-storage enabled cluster and topic where I continuously 
> produce and consume to/from a TS-enabled topic on that cluster.
> Here are the topic settings I’m using: 
> {code:java}
> local.retention.ms=12
> remote.storage.enable=true
> retention.ms: 1080
> segment.bytes: 51200
> {code}
> Here are my broker settings:
> {code:java}
> remote.log.storage.system.enable=true
> remote.log.storage.manager.class.path=/opt/kafka/tiered-storage-libs/*
> remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
> remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
> remote.log.metadata.manager.listener.name=INTERNAL_PLAINTEXT
> remote.log.manager.task.interval.ms=5000
> remote.log.manager.thread.pool.size=10
> remote.log.reader.threads=10
> remote.log.reader.max.pending.tasks=100
> rlmm.config.remote.log.metadata.topic.replication.factor=1
> rlmm.config.remote.log.metadata.topic.num.partitions=50
> rlmm.config.remote.log.metadata.topic.retention.ms=-1
> rsm.config.chunk.cache.class=io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache
> rsm.config.chunk.cache.path=/data/tiered-storage-cache
> rsm.config.chunk.cache.size=1073741824
> rsm.config.metrics.recording.level=DEBUG    
> rsm.config.storage.aws.credentials.provider.class=software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider
> rsm.config.storage.backend.class.name=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
> rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
> rsm.config.storage.s3.region=us-east-1
> rsm.config.chunk.size=102400
> rsm.config.storage.s3.multipart.upload.part.size=16777216 {code}
> When a broker in the cluster get rotated (replaced or restarted) some brokers 
> start throwing this error repeatedly: 
> {code:java}
> [RemoteLogManager=1 partition=yTypIvtBRY2l3sD4-8M7fA:loadgen-3] Error 
> occurred while copying log segments of partition: 
> yTypIvtBRY2l3sD4-8M7fA:loadgen-3 
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.KafkaException: 
> java.util.concurrent.TimeoutException: Timed out in catching up with the 
> expected offset by consumer.
>     at 
> java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
>     at 
> java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
>     at 
> kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegment(RemoteLogManager.java:728)
>     at 
> kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegmentsToRemote(RemoteLogManager.java:687)
>     at 
> kafka.log.remote.RemoteLogManager$RLMTask.run(RemoteLogManager.java:790)
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>     at 
> java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>     at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:833)
> Caused by: org.apache.kafka.common.KafkaException: 
> java.util.concurrent.TimeoutException: Timed out in catching up with the 
> expected offset by consumer.
>     at 
> org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$storeRemoteLogMetadata$0(TopicBasedRemoteLogMetadataManager.java:188)
>     at 
> java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
>     at 
> java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
>     at 
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>     at 
> 

Re: [PR] MINOR: Correcting Javadocs for throwIfMemberEpochIsInvalid [kafka]

2023-10-02 Thread via GitHub


dajac merged PR #14468:
URL: https://github.com/apache/kafka/pull/14468


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Remove the client side assignor from the ConsumerGroupHeartbeat API [kafka]

2023-10-02 Thread via GitHub


dajac merged PR #14469:
URL: https://github.com/apache/kafka/pull/14469


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14581: Moving GetOffsetShell to tools [kafka]

2023-10-02 Thread via GitHub


fvaleri commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1742881176

   > @fvaleri I think it's time to take a note about this changes in 
[KAFKA-14705](https://issues.apache.org/jira/browse/KAFKA-14705), isn't it?
   
   This should fall under the phrase: "4. We should also get rid of many 
deprecated options across all tools, including not migrated tools."
   
   If you want to provide a list of all deprecated options across all tools, 
that would be great, and I will be happy to review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15498) Upgrade Snappy-Java to 1.1.10.4

2023-10-02 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17771051#comment-17771051
 ] 

Luke Chen commented on KAFKA-15498:
---

Note: In v3.7.0, Snappy-Java is upgraded to 1.1.10.5 in this PR: 
https://github.com/apache/kafka/pull/14458

> Upgrade Snappy-Java to 1.1.10.4
> ---
>
> Key: KAFKA-15498
> URL: https://issues.apache.org/jira/browse/KAFKA-15498
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.1, 3.5.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 3.6.0
>
>
> Snappy-java published a new vulnerability
> <[https://github.com/xerial/snappy-java/security/advisories/GHSA-55g7-9cwv-5qfv]>
> that will cause OOM error in the server.
> Kafka is also impacted by this vulnerability since it's like CVE-2023-34455
> <[https://nvd.nist.gov/vuln/detail/CVE-2023-34455]>.
> We'd better bump the snappy-java version to bypass this vulnerability.
> PR <[https://github.com/apache/kafka/pull/14434]> is created to run the CI
> build.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15498: bump snappy-java version to 1.1.10.5 [kafka]

2023-10-02 Thread via GitHub


showuon merged PR #14458:
URL: https://github.com/apache/kafka/pull/14458


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15498: bump snappy-java version to 1.1.10.5 [kafka]

2023-10-02 Thread via GitHub


showuon commented on PR #14458:
URL: https://github.com/apache/kafka/pull/14458#issuecomment-1742858256

   Agree it's not a blocker for v3.6.0. Failed tests are unrelated:
   ```
   Build / JDK 21 and Scala 2.13 / 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserPasses(String).quorum=kraft
   Build / JDK 21 and Scala 2.13 / 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testDescribeTokenForOtherUserFails(String).quorum=kraft
   Build / JDK 21 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 11 and Scala 2.13 / 
kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest.testCreateTokenForOtherUserFails(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
   Build / JDK 17 and Scala 2.13 / 
kafka.server.DescribeClusterRequestTest.testDescribeClusterRequestIncludingClusterAuthorizedOperations(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-15525) Segment uploads stop working following a broker failure

2023-10-02 Thread Francois Visconte (Jira)
Francois Visconte created KAFKA-15525:
-

 Summary: Segment uploads stop working following a broker failure
 Key: KAFKA-15525
 URL: https://issues.apache.org/jira/browse/KAFKA-15525
 Project: Kafka
  Issue Type: Bug
  Components: Tiered-Storage
Affects Versions: 3.6.0
Reporter: Francois Visconte


I have a tiered-storage enabled cluster and topic where I continuously produce 
and consume to/from a TS-enabled topic on that cluster.

Here are the topic settings I’m using: 

{code:java}
local.retention.ms=12
remote.storage.enable=true
retention.ms: 1080
segment.bytes: 51200
{code}
Here are my broker settings:
{code:java}
remote.log.storage.system.enable=true
remote.log.storage.manager.class.path=/opt/kafka/tiered-storage-libs/*
remote.log.storage.manager.class.name=io.aiven.kafka.tieredstorage.RemoteStorageManager
remote.log.metadata.manager.class.name=org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager
remote.log.metadata.manager.listener.name=INTERNAL_PLAINTEXT
remote.log.manager.task.interval.ms=5000
remote.log.manager.thread.pool.size=10
remote.log.reader.threads=10
remote.log.reader.max.pending.tasks=100
rlmm.config.remote.log.metadata.topic.replication.factor=1
rlmm.config.remote.log.metadata.topic.num.partitions=50
rlmm.config.remote.log.metadata.topic.retention.ms=-1
rsm.config.chunk.cache.class=io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache
rsm.config.chunk.cache.path=/data/tiered-storage-cache
rsm.config.chunk.cache.size=1073741824
rsm.config.metrics.recording.level=DEBUG    
rsm.config.storage.aws.credentials.provider.class=software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider
rsm.config.storage.backend.class.name=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
rsm.config.storage.backend.class=io.aiven.kafka.tieredstorage.storage.s3.S3Storage
rsm.config.storage.s3.region=us-east-1
rsm.config.chunk.size=102400
rsm.config.storage.s3.multipart.upload.part.size=16777216 {code}
When a broker in the cluster get rotated (replaced or restarted) some brokers 
start throwing this error repeatedly: 
{code:java}
[RemoteLogManager=1 partition=yTypIvtBRY2l3sD4-8M7fA:loadgen-3] Error 
occurred while copying log segments of partition: 
yTypIvtBRY2l3sD4-8M7fA:loadgen-3 

java.util.concurrent.ExecutionException: 
org.apache.kafka.common.KafkaException: java.util.concurrent.TimeoutException: 
Timed out in catching up with the expected offset by consumer.
    at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
    at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
    at 
kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegment(RemoteLogManager.java:728)
    at 
kafka.log.remote.RemoteLogManager$RLMTask.copyLogSegmentsToRemote(RemoteLogManager.java:687)
    at kafka.log.remote.RemoteLogManager$RLMTask.run(RemoteLogManager.java:790)
    at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at 
java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.KafkaException: 
java.util.concurrent.TimeoutException: Timed out in catching up with the 
expected offset by consumer.
    at 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.lambda$storeRemoteLogMetadata$0(TopicBasedRemoteLogMetadataManager.java:188)
    at 
java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
    at 
java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
    at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
    at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.util.concurrent.TimeoutException: Timed out in catching up with 
the expected offset by consumer.
    at 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.waitTillConsumptionCatchesUp(ConsumerManager.java:121)
    at 
org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.waitTillConsumptionCatchesUp(ConsumerManager.java:89)
    at 

Re: [PR] KAFKA-14581: Moving GetOffsetShell to tools [kafka]

2023-10-02 Thread via GitHub


ruslankrivoshein commented on PR #13562:
URL: https://github.com/apache/kafka/pull/13562#issuecomment-1742817052

   @fvaleri I think it's time to take a note about this changes in 
[KAFKA-14705](https://issues.apache.org/jira/browse/KAFKA-14705), isn't it?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Logging fix in StreamsPartitionAssignor [kafka]

2023-10-02 Thread via GitHub


lucasbru commented on PR #14435:
URL: https://github.com/apache/kafka/pull/14435#issuecomment-1742776359

   > @lucasbru you set up your apache account already, right? I'm leaving it to 
you to merge but just let me know if you need me to do so
   
   Yes, done and backported


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14595 ReassignPartitionsIntegrationTest rewritten in java [kafka]

2023-10-02 Thread via GitHub


nizhikov commented on PR #14456:
URL: https://github.com/apache/kafka/pull/14456#issuecomment-1742776092

   CI is OK after code review changes.
   @jolshan do you have other comments?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Logging fix in StreamsPartitionAssignor [kafka]

2023-10-02 Thread via GitHub


lucasbru merged PR #14435:
URL: https://github.com/apache/kafka/pull/14435


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14034 Idempotent producer should wait for preceding in-flight b… [kafka]

2023-10-02 Thread via GitHub


urbandan commented on PR #13796:
URL: https://github.com/apache/kafka/pull/13796#issuecomment-1742583164

   @viktorsomogyi I would like to, yes, but my understanding is (based on the 
comments of @jolshan) that another loosely related bug blocks this fix. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15511: Catch CorruptIndexException instead of CorruptRecordException [kafka]

2023-10-02 Thread via GitHub


showuon commented on PR #14459:
URL: https://github.com/apache/kafka/pull/14459#issuecomment-1742572611

   Thanks for the fix @iit2009060 ! Nice find!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #14468: MINOR: Correcting Javadocs for throwIfMemberEpochIsInvalid

2023-10-02 Thread via GitHub


vamossagar12 commented on PR #14468:
URL: https://github.com/apache/kafka/pull/14468#issuecomment-1742509255

   @dajac , sorry about that. It was a checkstyle failure. I have corrected it 
and ran locally. Checkstyles passed now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org