junrao commented on code in PR #16842:
URL: https://github.com/apache/kafka/pull/16842#discussion_r1724082805


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -1497,17 +1535,29 @@ private long findLastOffsetAcknowledged() {
     }
 
     // Visible for testing
-    boolean isWriteShareGroupStateSuccessful(List<PersisterStateBatch> stateBatches) {
+    boolean isWriteShareGroupStateSuccessful(List<PersisterStateBatch> stateBatches, boolean isRetry) {
         WriteShareGroupStateResult response;
         try {
             response = persister.writeState(new WriteShareGroupStateParameters.Builder()
                 .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateBatchData>()
                     .setGroupId(this.groupId)
                     .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
                         Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
-                            topicIdPartition.partition(), stateEpoch, startOffset, 0, stateBatches))))
+                            topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches))))
                     ).build()).build()).get();
         } catch (InterruptedException | ExecutionException e) {
+            if (e.getCause() != null && e.getCause() instanceof FencedLeaderEpochException) {
+                if (isRetry) {
+                    log.error("Fenced leader exception occurred for the share partition: {}-{}. Re-try failed,"
+                        + " current leader epoch: {}", groupId, topicIdPartition, leaderEpoch, e);
+                    return false;
+                }
+                log.info("Fenced leader exception occurred for the share partition: {}-{}. Re-fetch partition"
+                    + " leader epoch, current leader epoch: {}", groupId, topicIdPartition, leaderEpoch);
+                leaderEpoch = getLeaderEpoch(topicIdPartition.topicPartition());
+                // Retry the write state operation.
+                return isWriteShareGroupStateSuccessful(stateBatches, true);

Review Comment:
   This doesn't look right. ShareFetch can only be served at the leader. If 
this broker is not the leader, leaderEpoch is -1. In that case, we need to send 
a NOT_LEADER_OR_FOLLOWER error to the client instead of retrying.
   
   The easiest thing on receiving a FencedLeaderEpochException is probably to
always send an error back to the client and mark the SharePartition as
invalid. This way, if the broker becomes the leader again, a new
SharePartition will be created, triggering the initialization of the group
state from the share coordinator.
   
   We probably also want to check whether the broker is still the leader on
each shareFetch/shareAck request, along the lines of the sketch below.
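   
   Something like this, where `leaderEpochOf` is a hypothetical stand-in for
however the broker resolves its current leader epoch (e.g. via
ReplicaManager), with -1 meaning "not the leader":
   
   ```java
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
   
   final class LeadershipGuard {
   
       /** Hypothetical lookup; -1 means this broker is not the leader. */
       interface EpochLookup {
           int leaderEpochOf(TopicPartition tp);
       }
   
       /** Run before serving each shareFetch/shareAck request. */
       static void ensureLeader(EpochLookup lookup, TopicPartition tp, int cachedEpoch) {
           int current = lookup.leaderEpochOf(tp);
           if (current < 0 || current != cachedEpoch) {
               throw new NotLeaderOrFollowerException("Broker is not the leader for "
                   + tp + " (cached epoch " + cachedEpoch + ", current " + current + ")");
           }
       }
   }
   ```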


