apoorvmittal10 commented on code in PR #18571: URL: https://github.com/apache/kafka/pull/18571#discussion_r1918186242
########## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ########## @@ -3796,6 +3796,13 @@ public DescribeShareGroupsResult describeShareGroups(final Collection<String> gr .collect(Collectors.toMap(entry -> entry.getKey().idValue, Map.Entry::getValue))); } + // To do in a follow-up PR + @Override + public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListShareGroupOffsetsSpec> groupSpecs, + final ListShareGroupOffsetsOptions options) { + // To-do + return null; Review Comment: nit: generally better to throw Exception that not yet implemented but it's fine for now as follow up PRs will be soon after. ########## clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java: ########## @@ -34,7 +35,7 @@ public static class Builder extends AbstractRequest.Builder<ReadShareGroupStateS private final ReadShareGroupStateSummaryRequestData data; public Builder(ReadShareGroupStateSummaryRequestData data) { - this(data, false); + this(data, true); Review Comment: Should this be `true`? I see ShareFetchRequest or ShareAcknowledgeRequest still has `false` in enable unstable version and get's tackled at caller level with enable versions in server.properties. ########## clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsResult.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.clients.admin.internals.CoordinatorKey; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +/** + * The result of the {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)} call. + * <p> + * The API of this class is evolving, see {@link Admin} for details. Review Comment: What should one look for in Admin.java to get details about `evolving`? ########## clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsOptions.java: ########## @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Map; + +/** + * Options for {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}. + * <p> + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListShareGroupOffsetsOptions extends AbstractOptions<ListShareGroupOffsetsOptions> { +} Review Comment: Empty line at end of new files. ########## clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; + +/** + * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}. + * <p> + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListShareGroupOffsetsSpec { + + private Collection<TopicPartition> topicPartitions; + + public ListShareGroupOffsetsSpec() { + topicPartitions = new ArrayList<>(); + } Review Comment: nit: What's the benefit of initializing by default? I don't see any `add` method to put topicPartition in collection rather a method to completely replace. ########## server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java: ########## @@ -288,7 +291,96 @@ public CompletableFuture<DeleteShareGroupStateResult> deleteState(DeleteShareGro * @return A completable future of ReadShareGroupStateSummaryResult */ public CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShareGroupStateSummaryParameters request) { - throw new RuntimeException("not implemented"); + try { + validate(request); + } catch (Exception e) { + log.error("Unable to validate read state summary request", e); + return CompletableFuture.failedFuture(e); + } + GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp = request.groupTopicPartitionData(); + String groupId = gtp.groupId(); + Map<Uuid, Map<Integer, CompletableFuture<ReadShareGroupStateSummaryResponse>>> futureMap = new HashMap<>(); + List<PersisterStateManager.ReadStateSummaryHandler> handlers = new ArrayList<>(); + + gtp.topicsData().forEach(topicData -> { + topicData.partitions().forEach(partitionData -> { + CompletableFuture<ReadShareGroupStateSummaryResponse> future = futureMap + .computeIfAbsent(topicData.topicId(), k -> new HashMap<>()) + .computeIfAbsent(partitionData.partition(), k -> new CompletableFuture<>()); + + handlers.add( + stateManager.new ReadStateSummaryHandler( + groupId, + topicData.topicId(), + partitionData.partition(), + partitionData.leaderEpoch(), + future, + null + ) + ); + }); + }); + + for (PersisterStateManager.PersisterStateManagerHandler handler : handlers) { + stateManager.enqueue(handler); + } + + // Combine all futures into a single CompletableFuture<Void> + CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( + handlers.stream() + .map(PersisterStateManager.ReadStateSummaryHandler::result) + .toArray(CompletableFuture[]::new)); + + // Transform the combined CompletableFuture<Void> into CompletableFuture<ReadShareGroupStateResult> + return combinedFuture.thenApply(v -> readSummaryResponsesToResult(futureMap)); + } + + /** + * Takes in a list of COMPLETED futures and combines the results, + * taking care of errors if any, into a single ReadShareGroupStateSummaryResult + * + * @param futureMap - HashMap of {topic -> {part -> future}} Review Comment: Please do not use such abbreviations in javadoc. ```suggestion * @param futureMap - HashMap of {topic -> {partition -> future}} ``` ########## server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java: ########## @@ -730,6 +734,147 @@ protected RPCType rpcType() { } } + public class ReadStateSummaryHandler extends PersisterStateManagerHandler { + private final int leaderEpoch; + private final CompletableFuture<ReadShareGroupStateSummaryResponse> result; + private final BackoffManager readStateSummaryBackoff; + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + long backoffMs, + long backoffMaxMs, + int maxRPCRetryAttempts, + Consumer<ClientResponse> onCompleteCallback + ) { + super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts); + this.leaderEpoch = leaderEpoch; + this.result = result; + this.readStateSummaryBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs); + } + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + Consumer<ClientResponse> onCompleteCallback + ) { + this( + groupId, + topicId, + partition, + leaderEpoch, + result, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_FIND_COORD_ATTEMPTS, + onCompleteCallback + ); + } + + @Override + protected String name() { + return "ReadStateSummaryHandler"; + } + + @Override + protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> requestBuilder() { + throw new RuntimeException("Read Summary requests are batchable, hence individual requests not needed."); + } + + @Override + protected boolean isResponseForRequest(ClientResponse response) { + return response.requestHeader().apiKey() == ApiKeys.READ_SHARE_GROUP_STATE_SUMMARY; + } + + @Override + protected void handleRequestResponse(ClientResponse response) { Review Comment: Though not for this PR but just `handleResponse` method name seems better. ########## server-common/src/main/java/org/apache/kafka/server/share/persister/PersisterStateManager.java: ########## @@ -730,6 +734,147 @@ protected RPCType rpcType() { } } + public class ReadStateSummaryHandler extends PersisterStateManagerHandler { + private final int leaderEpoch; + private final CompletableFuture<ReadShareGroupStateSummaryResponse> result; + private final BackoffManager readStateSummaryBackoff; + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + long backoffMs, + long backoffMaxMs, + int maxRPCRetryAttempts, + Consumer<ClientResponse> onCompleteCallback + ) { + super(groupId, topicId, partition, backoffMs, backoffMaxMs, maxRPCRetryAttempts); + this.leaderEpoch = leaderEpoch; + this.result = result; + this.readStateSummaryBackoff = new BackoffManager(maxRPCRetryAttempts, backoffMs, backoffMaxMs); + } + + public ReadStateSummaryHandler( + String groupId, + Uuid topicId, + int partition, + int leaderEpoch, + CompletableFuture<ReadShareGroupStateSummaryResponse> result, + Consumer<ClientResponse> onCompleteCallback + ) { + this( + groupId, + topicId, + partition, + leaderEpoch, + result, + REQUEST_BACKOFF_MS, + REQUEST_BACKOFF_MAX_MS, + MAX_FIND_COORD_ATTEMPTS, + onCompleteCallback + ); + } + + @Override + protected String name() { + return "ReadStateSummaryHandler"; + } + + @Override + protected AbstractRequest.Builder<ReadShareGroupStateSummaryRequest> requestBuilder() { + throw new RuntimeException("Read Summary requests are batchable, hence individual requests not needed."); Review Comment: Should it be RuntimeException or something more specific? ########## server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java: ########## @@ -288,7 +291,96 @@ public CompletableFuture<DeleteShareGroupStateResult> deleteState(DeleteShareGro * @return A completable future of ReadShareGroupStateSummaryResult */ public CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShareGroupStateSummaryParameters request) { - throw new RuntimeException("not implemented"); + try { + validate(request); + } catch (Exception e) { + log.error("Unable to validate read state summary request", e); Review Comment: Should we have error log for this failure, is this error concerning? ########## clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryRequest.java: ########## @@ -83,8 +84,24 @@ public ReadShareGroupStateSummaryRequestData data() { public static ReadShareGroupStateSummaryRequest parse(ByteBuffer buffer, short version) { return new ReadShareGroupStateSummaryRequest( - new ReadShareGroupStateSummaryRequestData(new ByteBufferAccessor(buffer), version), - version + new ReadShareGroupStateSummaryRequestData(new ByteBufferAccessor(buffer), version), + version ); } + + public static List<ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult> getErrorList( + List<Uuid> topicIds, + Errors error + ) { + return topicIds.stream() + .map(topicId -> + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setErrorCode(error.code()) + .setErrorMessage(error.exceptionName()) Review Comment: Why do we want to return the class name of the exception? Also shouldn't the actual exception be passed here to get the correct error message? ########## clients/src/main/java/org/apache/kafka/clients/admin/ListShareGroupOffsetsSpec.java: ########## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; + +/** + * Specification of share group offsets to list using {@link Admin#listShareGroupOffsets(Map, ListShareGroupOffsetsOptions)}. + * <p> + * The API of this class is evolving, see {@link Admin} for details. + */ +@InterfaceStability.Evolving +public class ListShareGroupOffsetsSpec { + + private Collection<TopicPartition> topicPartitions; + + public ListShareGroupOffsetsSpec() { + topicPartitions = new ArrayList<>(); + } + + /** + * Set the topic partitions whose offsets are to be listed for a share group. + */ + public ListShareGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) { + this.topicPartitions = topicPartitions; + return this; + } + + /** + * Returns the topic partitions whose offsets are to be listed for a share group. + */ + public Collection<TopicPartition> topicPartitions() { + return topicPartitions; Review Comment: In case you remove the default initialization then `return topicPartitions == null ? Collections.emptyList()/List.of() : topicPartitions;` ########## server-common/src/main/java/org/apache/kafka/server/share/persister/DefaultStatePersister.java: ########## @@ -288,7 +291,96 @@ public CompletableFuture<DeleteShareGroupStateResult> deleteState(DeleteShareGro * @return A completable future of ReadShareGroupStateSummaryResult */ public CompletableFuture<ReadShareGroupStateSummaryResult> readSummary(ReadShareGroupStateSummaryParameters request) { - throw new RuntimeException("not implemented"); + try { + validate(request); + } catch (Exception e) { + log.error("Unable to validate read state summary request", e); + return CompletableFuture.failedFuture(e); + } + GroupTopicPartitionData<PartitionIdLeaderEpochData> gtp = request.groupTopicPartitionData(); Review Comment: nit: line break above would be good. ########## clients/src/main/java/org/apache/kafka/common/requests/ReadShareGroupStateSummaryResponse.java: ########## @@ -59,9 +61,52 @@ public int throttleTimeMs() { public void maybeSetThrottleTimeMs(int throttleTimeMs) { // No op } + public static ReadShareGroupStateSummaryResponse parse(ByteBuffer buffer, short version) { return new ReadShareGroupStateSummaryResponse( - new ReadShareGroupStateSummaryResponseData(new ByteBufferAccessor(buffer), version) + new ReadShareGroupStateSummaryResponseData(new ByteBufferAccessor(buffer), version) ); } + + public static ReadShareGroupStateSummaryResponseData toErrorResponseData(Uuid topicId, int partitionId, Errors error, String errorMessage) { + return new ReadShareGroupStateSummaryResponseData().setResults( + List.of(new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of(new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partitionId) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage))))); + } + + public static ReadShareGroupStateSummaryResponseData.PartitionResult toErrorResponsePartitionResult(int partitionId, Errors error, String errorMessage) { + return new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partitionId) + .setErrorCode(error.code()) + .setErrorMessage(errorMessage); + } + + public static ReadShareGroupStateSummaryResponseData toResponseData( + Uuid topicId, + int partition, + long startOffset, + int stateEpoch + ) { + return new ReadShareGroupStateSummaryResponseData() + .setResults(List.of( + new ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult() + .setTopicId(topicId) + .setPartitions(List.of( + new ReadShareGroupStateSummaryResponseData.PartitionResult() + .setPartition(partition) + .setStartOffset(startOffset) + .setStateEpoch(stateEpoch) + )) + )); + } + + public static ReadShareGroupStateSummaryResponseData.ReadStateSummaryResult toResponseReadStateSummaryResult(Uuid topicId, List<ReadShareGroupStateSummaryResponseData.PartitionResult> partitionResults) { Review Comment: Might want to break in 2 lines, seems too big for single line. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org