dajac commented on code in PR #17580:
URL: https://github.com/apache/kafka/pull/17580#discussion_r1870826093
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -419,37 +419,54 @@ public CompletableFuture<ReadShareGroupStateResponseData> readState(RequestConte
         // be looping over the keys below and constructing new ReadShareGroupStateRequestData objects to pass
         // onto the shard method.
-        request.topics().forEach(topicData -> {
+        // It is possible that a read state request contains a leaderEpoch which is higher than seen so
+        // far for a specific share partition. Hence, for each read request we must issue a write state request
+        // as well so that the share state is up-to-date.
+
+        for (ReadShareGroupStateRequestData.ReadStateData topicData : request.topics()) {
             Uuid topicId = topicData.topicId();
-            topicData.partitions().forEach(partitionData -> {
-                // Request object containing information of a single topic partition
-                ReadShareGroupStateRequestData requestForCurrentPartition = new ReadShareGroupStateRequestData()
-                    .setGroupId(groupId)
-                    .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
-                        .setTopicId(topicId)
-                        .setPartitions(Collections.singletonList(partitionData))));
+            for (ReadShareGroupStateRequestData.PartitionData partitionData : topicData.partitions()) {
                 SharePartitionKey coordinatorKey = SharePartitionKey.getInstance(request.groupId(), topicId, partitionData.partition());
-                // Scheduling a runtime read operation to read share partition state from the coordinator in memory state
-                CompletableFuture<ReadShareGroupStateResponseData> future = runtime.scheduleReadOperation(
-                    "read-share-group-state",
-                    topicPartitionFor(coordinatorKey),
-                    (coordinator, offset) -> coordinator.readState(requestForCurrentPartition, offset)
-                ).exceptionally(exception -> handleOperationException(
-                    "read-share-group-state",
-                    request,
-                    exception,
-                    (error, message) -> ReadShareGroupStateResponse.toErrorResponseData(
-                        topicData.topicId(),
-                        partitionData.partition(),
-                        error,
-                        "Unable to read share group state: " + exception.getMessage()
-                    ),
-                    log
-                ));
+
+                CompletableFuture<ReadShareGroupStateResponseData> readFuture = maybeUpdateLeaderEpoch(
+                    coordinatorKey,
+                    partitionData
+                ).thenCompose(result -> {
+                    // maybeUpdateLeaderEpoch should not deliberately throw an exception. Possible
+                    // return values in the future returned from it could be:
+                    // - An empty read state response (does not contain any results) => we should proceed with the read
+                    // - A read state response which contains some error information (contains results data) => we should forward the error
+                    if (!result.results().isEmpty()) {
+                        return CompletableFuture.completedFuture(result);
+                    }
+                    ReadShareGroupStateRequestData requestForCurrentPartition = new ReadShareGroupStateRequestData()
+                        .setGroupId(groupId)
+                        .setTopics(Collections.singletonList(new ReadShareGroupStateRequestData.ReadStateData()
+                            .setTopicId(topicId)
+                            .setPartitions(Collections.singletonList(partitionData))));
+                    // Scheduling a runtime read operation to read share partition state from the coordinator in memory state
+                    return runtime.scheduleReadOperation(
+                        "read-share-group-state",
+                        topicPartitionFor(coordinatorKey),
+                        (coordinator, offset) -> coordinator.readState(requestForCurrentPartition, offset)
Review Comment:
It is a slightly orthogonal question but is it safe to read the state at the
last committed offset here? This means that you can potentially go back to the
state at the committed offset after a share leader failover instead of starting
from the last published state. Think about the following:
* share leader writes new state at offset 20. This is not committed yet.
* share leader fails over to another replica. It reads the state based on the last committed offset, say 10 in this case.
* offset 20 is committed afterwards.
I am not sure whether this is a realistic scenario or whether there are other
cases where we write and then read. If we want strong read-after-write
consistency, it may be better to use a write operation for the read too because
it guarantees the ordering. If you do something like this, you could also
combine updating the leader epoch into the same operation.
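To make the failover scenario concrete, here is a minimal self-contained sketch. It is not the actual coordinator runtime; `StateLog`, `readCommitted`, and `readViaWritePath` are hypothetical names modeling a replicated log where a read at the last committed offset can miss an appended-but-uncommitted write, while a read ordered through the write path waits for pending writes to commit and therefore observes the latest state:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model only: entries are "state versions" appended to a log,
// and committedOffset plays the role of the high watermark.
class StateLog {
    private final List<Integer> entries = new ArrayList<>();
    private int committedOffset = 0; // number of committed entries

    void append(int stateVersion) { entries.add(stateVersion); }

    void commitUpTo(int offset) { committedOffset = Math.max(committedOffset, offset); }

    // Read at the last committed offset: ignores appended-but-uncommitted entries.
    int readCommitted() {
        return committedOffset == 0 ? -1 : entries.get(committedOffset - 1);
    }

    // A "write-path read" is ordered after all pending appends and only
    // completes once they have committed, so it sees the latest state.
    int readViaWritePath() {
        commitUpTo(entries.size()); // the writes ahead of it must commit first
        return entries.isEmpty() ? -1 : entries.get(entries.size() - 1);
    }
}

public class StaleReadDemo {
    public static void main(String[] args) {
        StateLog log = new StateLog();
        log.append(1);      // state v1, committed below
        log.commitUpTo(1);
        log.append(2);      // share leader writes state v2; not committed yet

        System.out.println(log.readCommitted());    // prints 1: failover read misses v2
        System.out.println(log.readViaWritePath()); // prints 2: write-ordered read sees v2
    }
}
```

This is only meant to show why reading at the committed offset is not read-after-write consistent by itself; in the real coordinator the ordering guarantee would come from scheduling the read as a write operation, as suggested above.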
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]