junrao commented on code in PR #16842:
URL: https://github.com/apache/kafka/pull/16842#discussion_r1724082805
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1497,17 +1535,29 @@ private long findLastOffsetAcknowledged() {
     }
     // Visible for testing
-    boolean isWriteShareGroupStateSuccessful(List<PersisterStateBatch> stateBatches) {
+    boolean isWriteShareGroupStateSuccessful(List<PersisterStateBatch> stateBatches, boolean isRetry) {
         WriteShareGroupStateResult response;
         try {
             response = persister.writeState(new WriteShareGroupStateParameters.Builder()
                 .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateBatchData>()
                     .setGroupId(this.groupId)
                     .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
                         Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
-                            topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches))))
+                            topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches))))
                     ).build()).build()).get();
         } catch (InterruptedException | ExecutionException e) {
+            if (e.getCause() != null && e.getCause() instanceof FencedLeaderEpochException) {
+                if (isRetry) {
+                    log.error("Fenced leader exception occurred for the share partition: {}-{}. Re-try failed,"
+                        + " current leader epoch: {}", groupId, topicIdPartition, leaderEpoch, e);
+                    return false;
+                }
+                log.info("Fenced leader exception occurred for the share partition: {}-{}. Re-fetch partition"
+                    + " leader epoch, current leader epoch: {}", groupId, topicIdPartition, leaderEpoch);
+                leaderEpoch = getLeaderEpoch(topicIdPartition.topicPartition());
+                // Retry the write state operation.
+                return isWriteShareGroupStateSuccessful(stateBatches, true);
Review Comment:
This doesn't look right. ShareFetch can only be served by the leader. If
this broker is not the leader, leaderEpoch is -1. In that case, we need to send
a NOT_LEADER_OR_FOLLOWER error to the client instead of retrying.

The easiest thing to do on receiving FencedLeaderEpochException is probably to
always send an error back to the client and mark the SharePartition as invalid.
This way, if the broker becomes the leader again, a new SharePartition will be
created, which triggers the initialization of group state from the share
coordinator.

We probably also want to check whether the broker is still the leader on each
shareFetch/shareAck request.
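
A minimal sketch of the handling suggested above, not the actual SharePartition code. Every name in it (`SharePartitionFencingSketch`, `Outcome`, `writeState`, the two functional parameters) is hypothetical, and the error returned on a fenced write is an assumption, since the comment only says "an error". The cache of epochs stands in for cached SharePartition instances: a fenced write or a loss of leadership drops the entry, so the next request served as leader creates a fresh one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntPredicate;
import java.util.function.IntSupplier;

// Hypothetical, simplified model; none of these types exist in Kafka.
final class SharePartitionFencingSketch {

    enum Outcome { OK, NOT_LEADER_OR_FOLLOWER, FENCED_LEADER_EPOCH }

    static final int NO_LEADER_EPOCH = -1;

    // Key -> leader epoch captured when the (stand-in) SharePartition was created.
    private final Map<String, Integer> cachedPartitions = new HashMap<>();
    // Returns the broker's current leader epoch, or -1 if it is not the leader.
    private final IntSupplier currentLeaderEpoch;
    // Stand-in for the persister: false means the write was fenced.
    private final IntPredicate persisterAcceptsEpoch;
    private int instancesCreated = 0;

    SharePartitionFencingSketch(IntSupplier currentLeaderEpoch,
                                IntPredicate persisterAcceptsEpoch) {
        this.currentLeaderEpoch = currentLeaderEpoch;
        this.persisterAcceptsEpoch = persisterAcceptsEpoch;
    }

    Outcome writeState(String key) {
        // Check leadership on every request rather than only on failure.
        int epoch = currentLeaderEpoch.getAsInt();
        if (epoch == NO_LEADER_EPOCH) {
            cachedPartitions.remove(key);
            return Outcome.NOT_LEADER_OR_FOLLOWER;
        }
        Integer cachedEpoch = cachedPartitions.get(key);
        if (cachedEpoch == null) {
            // A new instance would initialize its state from the share coordinator here.
            cachedEpoch = epoch;
            cachedPartitions.put(key, cachedEpoch);
            instancesCreated++;
        }
        if (!persisterAcceptsEpoch.test(cachedEpoch)) {
            // No retry: invalidate the instance and surface the error to the client.
            cachedPartitions.remove(key);
            return Outcome.FENCED_LEADER_EPOCH;
        }
        return Outcome.OK;
    }

    int instancesCreated() {
        return instancesCreated;
    }
}
```

In this model a fenced write never re-reads the epoch and retries in place. The client sees the error, and recovery happens only through a newly created instance once the broker is the leader again.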