dajac commented on code in PR #17580:
URL: https://github.com/apache/kafka/pull/17580#discussion_r1870826093


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -419,37 +419,54 @@ public CompletableFuture<ReadShareGroupStateResponseData> readState(RequestConte
         // be looping over the keys below and constructing new ReadShareGroupStateRequestData objects to pass
         // onto the shard method.
 
-        request.topics().forEach(topicData -> {
+        // It is possible that a read state request contains a leaderEpoch which is higher than any seen so
+        // far for a specific share partition. Hence, for each read request we must issue a write state request
+        // as well so that the share state is up-to-date.
+
+        for (ReadShareGroupStateRequestData.ReadStateData topicData : request.topics()) {
             Uuid topicId = topicData.topicId();
-            topicData.partitions().forEach(partitionData -> {
-                // Request object containing information of a single topic partition
-                ReadShareGroupStateRequestData requestForCurrentPartition = new ReadShareGroupStateRequestData()
-                    .setGroupId(groupId)
-                    .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
-                        .setTopicId(topicId)
-                        .setPartitions(Collections.singletonList(partitionData))));
+            for (ReadShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) {
                 SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition());
-                // Scheduling a runtime read operation to read share partition state from the coordinator in-memory state
-                CompletableFuture<ReadShareGroupStateResponseData> future = runtime.scheduleReadOperation(
-                    "read-share-group-state",
-                    topicPartitionFor(coordinatorKey),
-                    (coordinator, offset) -> coordinator.readState(requestForCurrentPartition, offset)
-                ).exceptionally(exception -> handleOperationException(
-                    "read-share-group-state",
-                    request,
-                    exception,
-                    (error, message) -> ReadShareGroupStateResponse.toErrorResponseData(
-                        topicData.topicId(),
-                        partitionData.partition(),
-                        error,
-                        "Unable to read share group state: " + exception.getMessage()
-                    ),
-                    log
-                ));
+
+                CompletableFuture<ReadShareGroupStateResponseData> readFuture = maybeUpdateLeaderEpoch(
+                    coordinatorKey,
+                    partitionData
+                ).thenCompose(result -> {
+                    // maybeUpdateLeaderEpoch should not deliberately throw an exception. Possible
+                    // return values in the future returned from it could be:
+                    // - An empty read state response (does not contain any results) => we should proceed with the read
+                    // - A read state response which contains some error information (contains results data) => we should forward the error
+                    if (!result.results().isEmpty()) {
+                        return CompletableFuture.completedFuture(result);
+                    }
+                    ReadShareGroupStateRequestData requestForCurrentPartition = new ReadShareGroupStateRequestData()
+                        .setGroupId(groupId)
+                        .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
+                            .setTopicId(topicId)
+                            .setPartitions(Collections.singletonList(partitionData))));
+                    // Scheduling a runtime read operation to read share partition state from the coordinator in-memory state
+                    return runtime.scheduleReadOperation(
+                        "read-share-group-state",
+                        topicPartitionFor(coordinatorKey),
+                        (coordinator, offset) -> coordinator.readState(requestForCurrentPartition, offset)
Review Comment:
   It is a slightly orthogonal question, but is it safe to read the state at the last committed offset here? It means that after a share leader failover you can potentially go back to the state at the committed offset instead of starting from the last written state. Think about the following:
   * The share leader writes new state at offset 20. This write is not committed yet.
   * The share leader fails over to another replica. It reads the state based on the last committed offset, say 10 in this case.
   * Offset 20 is committed afterwards.
   
   I am not sure whether this is a realistic scenario or whether there are other cases where we write and then read. If we want strong read-after-write consistency, it may be better to use a write operation for the read too, because it guarantees the ordering. If you do something like this, you could also combine updating the leader epoch into the same operation.
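   
   For illustration, here is a minimal sketch of that alternative. It assumes a `scheduleWriteOperation` overload shaped like the one used on the write path; the shard method `readStateAndUpdateLeaderEpoch` and the `config.shareCoordinatorWriteTimeoutMs()` accessor are hypothetical placeholders, not existing APIs:
   
   ```java
   // Sketch only: routing the read through the write path orders it after any
   // in-flight writes on the same coordinator partition (read-after-write
   // consistency) and lets the leader epoch bump happen in the same operation.
   CompletableFuture<ReadShareGroupStateResponseData> readFuture = runtime.scheduleWriteOperation(
       "read-share-group-state",
       topicPartitionFor(coordinatorKey),
       Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),   // assumed timeout accessor
       // Hypothetical shard method: applies the leaderEpoch from the request if it
       // is newer than what the coordinator has seen, then returns the state.
       coordinator -> coordinator.readStateAndUpdateLeaderEpoch(requestForCurrentPartition)
   ).exceptionally(exception -> handleOperationException(
       "read-share-group-state",
       request,
       exception,
       (error, message) -> ReadShareGroupStateResponse.toErrorResponseData(
           topicId,
           partitionData.partition(),
           error,
           "Unable to read share group state: " + exception.getMessage()
       ),
       log
   ));
   ```
   
   The trade-off is that every read would then go through the log append path, so this is only worthwhile if read-after-write ordering is actually required here.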


