kirktrue commented on code in PR #21457:
URL: https://github.com/apache/kafka/pull/21457#discussion_r2802301895
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -277,6 +277,28 @@ void updateSubscriptionState(Map<TopicPartition,
OffsetFetcherUtils.ListOffsetDa
log.trace("Updating high watermark for partition {} to
{}", partition, offset);
subscriptionState.updateHighWatermark(partition, offset);
}
+ } else {
+ if (isolationLevel == IsolationLevel.READ_COMMITTED) {
+ log.warn("Not updating last stable offset for partition {}
as it is no longer assigned", partition);
+ } else {
+ log.warn("Not updating high watermark for partition {} as
it is no longer assigned", partition);
+ }
+ }
+ }
+ }
+
+ /**
+ * If any of the given partitions are assigned, this will clear the
partition's 'end offset requested' flag so
+ * that the next attempt to look up the lag will properly issue another
<code>LIST_OFFSETS</code> request. This
+ * is only intended to be called when <code>LIST_OFFSETS</code> fails.
Successful <code>LIST_OFFSETS</code> calls
+ * should use {@link #updateSubscriptionState(Map, IsolationLevel)}.
+ *
+ * @param partitions Partitions for which the 'end offset requested' flag
should be cleared (if still assigned)
+ */
+ void clearPartitionEndOffsetRequests(Collection<TopicPartition>
partitions) {
+ for (final TopicPartition partition : partitions) {
+ if
(subscriptionState.maybeClearPartitionEndOffsetRequested(partition)) {
+ log.trace("Clearing partition end offset requested for
partition {}", partition);
Review Comment:
It's `TRACE`, so no one will ever see it 😉
j/k I'll change it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]