AndrewJSchofield commented on code in PR #19781:
URL: https://github.com/apache/kafka/pull/19781#discussion_r2101873097


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -772,11 +772,11 @@ private 
Optional<CoordinatorResult<WriteShareGroupStateResponseData, Coordinator
 
         SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, 
topicId, partitionId);
         if (partitionData.leaderEpoch() != -1 && 
leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > 
partitionData.leaderEpoch()) {
-            log.error("Request leader epoch smaller than last recorded.");
+            log.error("Request leader epoch smaller than last recorded 
current: {}, requested:{}.", leaderEpochMap.get(mapKey), 
partitionData.leaderEpoch());

Review Comment:
   Tiny nit: `requested: {}.` (extra space for consistency)



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -915,7 +915,7 @@ private 
Optional<CoordinatorResult<InitializeShareGroupStateResponseData, Coordi
 
         SharePartitionKey key = 
SharePartitionKey.getInstance(request.groupId(), topicId, partitionId);
         if (partitionData.stateEpoch() != -1 && stateEpochMap.containsKey(key) 
&& stateEpochMap.get(key) > partitionData.stateEpoch()) {
-            log.error("Initialize request state epoch smaller than last 
recorded.");
+            log.info("Initialize request state epoch smaller than last 
recorded current: {}, requested:{}.", stateEpochMap.get(key), 
partitionData.stateEpoch());

Review Comment:
   Same nit here: `requested: {}.` (space after the colon)



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2653,7 +2641,7 @@ private boolean initializedAssignmentPending(ShareGroup 
group) {
         }
 
         // We need to check if all the group initialized share partitions are 
part of the group assignment.
-        Map<Uuid, Set<Integer>> initializedTps = 
shareGroupPartitionMetadata.get(group.groupId()).initializedTopics();
+        Map<Uuid, Set<Integer>> initializedTps = 
stripInitValue(shareGroupPartitionMetadata.get(group.groupId()).initializedTopics());

Review Comment:
   nit: There is a mismatch in naming here. The record is 
`ShareGroupStatePartitionMetadataInfo` while the timeline hashmap is 
`shareGroupPartitionMetadata`. I wonder if `shareGroupStatePartitionMetadata` 
would be better.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -814,7 +814,7 @@ private Optional<ReadShareGroupStateResponseData> 
maybeGetReadStateError(ReadSha
 
         SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, 
topicId, partitionId);
         if (leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > 
partitionData.leaderEpoch()) {
-            log.error("Request leader epoch id is smaller than last 
recorded.");
+            log.error("Request leader epoch id is smaller than last recorded 
current: {}, requested:{}.", leaderEpochMap.get(mapKey), 
partitionData.leaderEpoch());

Review Comment:
   For consistency with the other leader-epoch log message, `Request leader epoch smaller than ...` (drop the `id is`).



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -772,11 +772,11 @@ private 
Optional<CoordinatorResult<WriteShareGroupStateResponseData, Coordinator
 
         SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId, 
topicId, partitionId);
         if (partitionData.leaderEpoch() != -1 && 
leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) > 
partitionData.leaderEpoch()) {
-            log.error("Request leader epoch smaller than last recorded.");
+            log.error("Request leader epoch smaller than last recorded 
current: {}, requested:{}.", leaderEpochMap.get(mapKey), 
partitionData.leaderEpoch());
             return 
Optional.of(getWriteErrorCoordinatorResult(Errors.FENCED_LEADER_EPOCH, null, 
topicId, partitionId));
         }
         if (partitionData.stateEpoch() != -1 && 
stateEpochMap.containsKey(mapKey) && stateEpochMap.get(mapKey) > 
partitionData.stateEpoch()) {
-            log.error("Request state epoch smaller than last recorded.");
+            log.info("Request state epoch smaller than last recorded current: 
{}, requested:{}.", stateEpochMap.get(mapKey), partitionData.stateEpoch());

Review Comment:
   And here: `requested: {}.`
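
   To illustrate the spacing nit across these epoch checks, here is a minimal sketch of the fencing condition together with the suggested message format. It is not the PR's code: `EpochFenceSketch`, `record`, `isFenced`, and `fencedMessage` are hypothetical names, a plain `HashMap<String, Integer>` stands in for the shard's map keyed by `SharePartitionKey`, and `String.format` stands in for the logger's `{}` placeholders.

```java
import java.util.HashMap;
import java.util.Map;

public class EpochFenceSketch {
    // Hypothetical stand-in for leaderEpochMap; the real map is keyed by SharePartitionKey.
    private final Map<String, Integer> leaderEpochMap = new HashMap<>();

    // Remember the last recorded epoch for a key.
    void record(String key, int epoch) {
        leaderEpochMap.put(key, epoch);
    }

    // Same shape as the condition in the diff: a requested epoch of -1 is never fenced,
    // and an unknown key is never fenced; otherwise fence when the recorded epoch is larger.
    boolean isFenced(String key, int requestedEpoch) {
        Integer current = leaderEpochMap.get(key);
        return requestedEpoch != -1 && current != null && current > requestedEpoch;
    }

    // Message with consistent spacing after both colons ("current: " and "requested: ").
    static String fencedMessage(int current, int requested) {
        return String.format(
            "Request leader epoch smaller than last recorded current: %d, requested: %d.",
            current, requested);
    }

    public static void main(String[] args) {
        EpochFenceSketch sketch = new EpochFenceSketch();
        sketch.record("group:topic:0", 5);
        if (sketch.isFenced("group:topic:0", 3)) {
            System.out.println(fencedMessage(5, 3));
        }
    }
}
```

   With the logger, the equivalent format string would end in `current: {}, requested: {}.`.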


