apoorvmittal10 commented on code in PR #18571: URL: https://github.com/apache/kafka/pull/18571#discussion_r1918991457
########## clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.admin.internals.CoordinatorKey; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * The result of the {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call. + * <p> + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListShareGroupOffsetsResult { + + private final Map<String, KafkaFuture<Map<TopicPartition, Long>>> futures; + + public ListShareGroupOffsetsResult(final Map<CoordinatorKey, KafkaFuture<Map<TopicPartition, Long>>> futures) { + this.futures = futures.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().idValue, Map.Entry::getValue)); + } + + /** + * Return a future which yields all Map<String, Map<TopicPartition, Long> objects, if requests for all the groups succeed. + */ Review Comment: nit: Would be better to write the comments as like javadoc i.e. @return, @param? Same for other method and classes. ########## clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}. + * <p> + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListShareGroupOffsetsSpec { + + private Collection<TopicPartition> topicPartitions; + + public ListShareGroupOffsetsSpec() { + } + + /** + * Set the topic partitions whose offsets are to be listed for a share group. + */ + public ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) { + this.topicPartitions = topicPartitions; + return this; + } + + /** + * Returns the topic partitions whose offsets are to be listed for a share group. + */ + public Collection<TopicPartition> topicPartitions() { + return topicPartitions == null ? Collections.emptyList() : topicPartitions; Review Comment: Just check if there are no xlint warnings as `Collections.emptyList()` has unchecked suppression so using List.of() might be better. In case there are no warnings then you can have either. ```suggestion return topicPartitions == null ? List.of() : topicPartitions; ``` ########## server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java: ########## @@ -730,6 +734,147 @@ protected RPCType rpcType() { } } + public class ReadStateSummaryHandler extends PersisterStateManagerHandler { + private final int leaderEpoch; + private final CompletableFuture<ReadShareGroupStateSummaryResponse> result; + private final BackoffManager readStateSummaryBackoff; + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + long backoffMs, + long backoffMaxMs, + int maxRPCRetryAttempts, + Consumer<ClientResponse> onCompleteCallback + ) { + super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts); + this.leaderEpoch = leaderEpoch; + this.result = result; + this.readStateSummaryBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs); + } + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + Consumer<ClientResponse> onCompleteCallback + ) { + this( + groupId, + topicId, + partition, + leaderEpoch, + result, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_FIND_COORD_ATTEMPTS, + onCompleteCallback + ); + } + + @Override + protected String name() { + return "ReadStateSummaryHandler"; + } + + @Override + protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> requestBuilder() { + throw new RuntimeException("Read Summary requests are batchable, hence individual requests not needed."); + } + + @Override + protected boolean isResponseForRequest(ClientResponse response) { + return response.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY; + } + + @Override + protected void handleRequestResponse(ClientResponse response) { + log.debug("Read state summary response received - {}", response); + readStateSummaryBackoff.incrementAttempt(); + + ReadShareGroupStateSummaryResponse combinedResponse = (ReadShareGroupStateSummaryResponse) response.responseBody(); + for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult readStateSummaryResult : combinedResponse.data().results()) { + if (readStateSummaryResult.topicId().equals(partitionKey().topicId())) { + Optional<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionStateData = + readStateSummaryResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition()) + .findFirst(); + + if (partitionStateData.isPresent()) { + Errors error = Errors.forCode(partitionStateData.get().errorCode()); + switch (error) { + case NONE: + readStateSummaryBackoff.resetAttempts(); + ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult( + partitionKey().topicId(), + Collections.singletonList(partitionStateData.get()) + ); + this.result.complete(new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData() + .setResults(Collections.singletonList(result)))); + return; + + // check retriable errors + case COORDINATOR_NOT_AVAILABLE: + case COORDINATOR_LOAD_IN_PROGRESS: + case NOT_COORDINATOR: + log.warn("Received retriable error in read state summary RPC for key {}: {}", partitionKey(), error.message()); + if (!readStateSummaryBackoff.canAttempt()) { + log.error("Exhausted max retries for read state summary RPC for key {} without success.", partitionKey()); + readStateSummaryErrorReponse(error, new Exception("Exhausted max retries to complete read state summary RPC without success.")); + return; + } + super.resetCoordinatorNode(); + timer.add(new PersisterTimerTask(readStateSummaryBackoff.backOff(), this)); + return; + + default: + log.error("Unable to perform read state summary RPC for key {}: {}", partitionKey(), error.message()); + readStateSummaryErrorReponse(error, null); + return; + } + } + } + } + + // no response found specific topic partition + IllegalStateException exception = new IllegalStateException( + "Failed to read state summary for share partition " + partitionKey() + ); + readStateSummaryErrorReponse(Errors.forException(exception), exception); + } + + protected void readStateSummaryErrorReponse(Errors error, Exception exception) { + this.result.complete(new ReadShareGroupStateSummaryResponse( + ReadShareGroupStateSummaryResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in read state summary RPC. " + + (exception == null ? error.message() : exception.getMessage())))); + } + + @Override + protected void findCoordinatorErrorResponse(Errors error, Exception exception) { + this.result.complete(new ReadShareGroupStateSummaryResponse( + ReadShareGroupStateSummaryResponse.toErrorResponseData(partitionKey().topicId(), partitionKey().partition(), error, "Error in find coordinator. " + + (exception == null ? error.message() : exception.getMessage())))); + } Review Comment: Why do we need method `findCoordinatorErrorResponse` in ReadStateSummaryHandler? ########## clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java: ########## @@ -34,7 +35,7 @@ public static class Builder extends AbstractRequest.Builder<ReadShareGroupStateS private final ReadShareGroupStateSummaryRequestData data; public Builder(ReadShareGroupStateSummaryRequestData data) { - this(data, false); + this(data, true); Review Comment: Ok, if it's happening at other places but you can use other constructor as well where you can explicilty pass `true` for `enableUnstableLastVersion` while creating the request builder. Anyways the `enableUnstableLastVersion` for Persister won't be for long hence I leave it on you. But if we default to true then is there even any point of making these RPCs unstable in json? ########## clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java: ########## @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; + +/** + * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}. + * <p> + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListShareGroupOffsetsSpec { + + private Collection<TopicPartition> topicPartitions; + + public ListShareGroupOffsetsSpec() { + } Review Comment: Should we just remove this default constructor, it's adding no value? ########## server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java: ########## @@ -730,6 +734,147 @@ protected RPCType rpcType() { } } + public class ReadStateSummaryHandler extends PersisterStateManagerHandler { + private final int leaderEpoch; + private final CompletableFuture<ReadShareGroupStateSummaryResponse> result; + private final BackoffManager readStateSummaryBackoff; + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + long backoffMs, + long backoffMaxMs, + int maxRPCRetryAttempts, + Consumer<ClientResponse> onCompleteCallback + ) { + super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts); + this.leaderEpoch = leaderEpoch; + this.result = result; + this.readStateSummaryBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs); + } + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + Consumer<ClientResponse> onCompleteCallback + ) { + this( + groupId, + topicId, + partition, + leaderEpoch, + result, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_FIND_COORD_ATTEMPTS, + onCompleteCallback + ); + } + + @Override + protected String name() { + return "ReadStateSummaryHandler"; + } + + @Override + protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> requestBuilder() { + throw new RuntimeException("Read Summary requests are batchable, hence individual requests not needed."); + } + + @Override + protected boolean isResponseForRequest(ClientResponse response) { + return response.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY; + } + + @Override + protected void handleRequestResponse(ClientResponse response) { + log.debug("Read state summary response received - {}", response); + readStateSummaryBackoff.incrementAttempt(); + + ReadShareGroupStateSummaryResponse combinedResponse = (ReadShareGroupStateSummaryResponse) response.responseBody(); + for (ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult readStateSummaryResult : combinedResponse.data().results()) { + if (readStateSummaryResult.topicId().equals(partitionKey().topicId())) { + Optional<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionStateData = + readStateSummaryResult.partitions().stream().filter(partitionResult -> partitionResult.partition() == partitionKey().partition()) + .findFirst(); + + if (partitionStateData.isPresent()) { + Errors error = Errors.forCode(partitionStateData.get().errorCode()); + switch (error) { + case NONE: + readStateSummaryBackoff.resetAttempts(); + ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult result = ReadShareGroupStateSummaryResponse.toResponseReadStateSummaryResult( + partitionKey().topicId(), + Collections.singletonList(partitionStateData.get()) + ); + this.result.complete(new ReadShareGroupStateSummaryResponse(new ReadShareGroupStateSummaryResponseData() + .setResults(Collections.singletonList(result)))); + return; + + // check retriable errors + case COORDINATOR_NOT_AVAILABLE: + case COORDINATOR_LOAD_IN_PROGRESS: + case NOT_COORDINATOR: + log.warn("Received retriable error in read state summary RPC for key {}: {}", partitionKey(), error.message()); + if (!readStateSummaryBackoff.canAttempt()) { + log.error("Exhausted max retries for read state summary RPC for key {} without success.", partitionKey()); + readStateSummaryErrorReponse(error, new Exception("Exhausted max retries to complete read state summary RPC without success.")); + return; + } + super.resetCoordinatorNode(); + timer.add(new PersisterTimerTask(readStateSummaryBackoff.backOff(), this)); + return; + + default: + log.error("Unable to perform read state summary RPC for key {}: {}", partitionKey(), error.message()); + readStateSummaryErrorReponse(error, null); + return; + } + } + } + } + + // no response found specific topic partition + IllegalStateException exception = new IllegalStateException( + "Failed to read state summary for share partition " + partitionKey() + ); + readStateSummaryErrorReponse(Errors.forException(exception), exception); Review Comment: You can just pass `exception` and can convert to Error in the method itself. Don't see any value to pass both explicitly. ########## server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java: ########## @@ -969,5 +1116,27 @@ private static AbstractRequest.Builder<? extends AbstractRequest> coalesceReads( .setPartitions(entry.getValue())) .collect(Collectors.toList()))); } + + private static AbstractRequest.Builder<? extends AbstractRequest> coalesceReadSummarys(String groupId, List<? extends PersisterStateManagerHandler> handlers) { + Map<Uuid, List<ReadShareGroupStateSummaryRequestData.PartitionData>> partitionData = new HashMap<>(); + handlers.forEach(persHandler -> { Review Comment: What do we mean by `pers` here? ########## clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java: ########## @@ -83,8 +84,24 @@ public ReadShareGroupStateSummaryRequestData data() { public static ReadShareGroupStateSummaryRequest parse(ByteBuffer buffer, short version) { return new ReadShareGroupStateSummaryRequest( - new ReadShareGroupStateSummaryRequestData(new ByteBufferAccessor(buffer), version), - version + new ReadShareGroupStateSummaryRequestData(new ByteBufferAccessor(buffer), version), + version ); } + + public static List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> getErrorList( + List<Uuid> topicIds, + Errors error + ) { + return topicIds.stream() + .map(topicId -> + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setErrorCode(error.code()) + .setErrorMessage(error.message()) Review Comment: This will give default Error message and original exception message will be lost, do you want that behaviour? If yes, then please write a comment here else please fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org