lucasbru commented on code in PR #22468:
URL: https://github.com/apache/kafka/pull/22468#discussion_r3354134158


##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -76,6 +93,14 @@ public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
      * @return The next offsets that the consumer will consume
      */
     public Map<TopicPartition, OffsetAndMetadata> nextOffsets() {
+        if (this.tainted && TAINTED_NEXT_OFFSETS_LOGGED.compareAndSet(false, true)) {
+            log.error("ConsumerRecords#nextOffsets() returned empty because this instance was built with the " +

Review Comment:
   Logging at ERROR but only once per JVM creates some tension: if the message scrolls past at startup, the signal is gone for the rest of the process lifetime. Rate limiting the log is a nice idea, but I think once-ever goes too far. Did you consider logging periodically instead, so that a misconfigured interceptor is more likely to be noticed?
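
   A minimal sketch of what periodic logging could look like, assuming a time-based gate in place of the once-ever `AtomicBoolean`. The class name `PeriodicLogGate`, the method `shouldLog`, and the interval are hypothetical and not part of the PR; the caller would pass the current time in milliseconds and log only when the gate returns true.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of the PR): lets a message through at most
// once per interval instead of once per JVM lifetime.
public class PeriodicLogGate {
    // Sentinel meaning "nothing has been logged yet".
    private static final long NEVER = Long.MIN_VALUE;

    private final long intervalMs;
    private final AtomicLong lastLoggedMs = new AtomicLong(NEVER);

    public PeriodicLogGate(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Returns true if the caller should log now. When several threads race,
    // compareAndSet ensures only one of them wins for a given interval.
    public boolean shouldLog(long nowMs) {
        long last = lastLoggedMs.get();
        boolean due = last == NEVER || nowMs - last >= intervalMs;
        return due && lastLoggedMs.compareAndSet(last, nowMs);
    }
}
```

   With such a gate, the check in `nextOffsets()` would become something like `this.tainted && gate.shouldLog(now)`, so the error reappears at a bounded rate for as long as the legacy constructor is in use.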



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -19,30 +19,42 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.internals.AbstractIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * A container that holds the list {@link ConsumerRecord} per partition for a
  * particular topic. There is one {@link ConsumerRecord} list for every topic
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerRecords.class);
+
+    // Ensures the tainted-records error (see nextOffsets()) is logged at most once per JVM since nextOffsets() is called on every poll
+    static final AtomicBoolean TAINTED_NEXT_OFFSETS_LOGGED = new AtomicBoolean(false);
+
     public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Map.of(), Map.of());
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
     private final Map<TopicPartition, OffsetAndMetadata> nextOffsets;
 
+    // Flag to detect if legacy ConsumerRecords(Map) constructor is used. See KAFKA-20660 for more details.
+    private final boolean tainted;
+
     /**
      * @deprecated Since 4.0. Use {@link #ConsumerRecords(Map, Map)} instead.
      */
-    @Deprecated
+    @Deprecated(since = "4.1", forRemoval = true)

Review Comment:
   `since = "4.1"` doesn't match the javadoc just above (Since 4.0). This constructor was deprecated in 4.0 with KIP-1094, so this should be `since = "4.0"` (this also matters given the plan to cherry-pick to 4.0).
   
   I also think adding `forRemoval = true` is a potentially source-incompatible change, so it should go into a separate PR, as we cannot cherry-pick it to previous versions.
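
   For illustration, a minimal sketch of an annotation that agrees with the javadoc, shown on a hypothetical stand-in class (`DeprecationSketch` is not part of Kafka). It sets only `since = "4.0"` and leaves `forRemoval` at its default of false, on the assumption that the removal flag would be handled in a separate change.

```java
// Hypothetical stand-in for a class with a legacy and a replacement constructor.
public class DeprecationSketch {
    private final String first;
    private final String second;

    /**
     * @deprecated Since 4.0. Use {@link #DeprecationSketch(String, String)} instead.
     */
    @Deprecated(since = "4.0") // matches the javadoc; forRemoval stays false
    public DeprecationSketch(String first) {
        this(first, "");
    }

    public DeprecationSketch(String first, String second) {
        this.first = first;
        this.second = second;
    }

    public String describe() {
        return first + ":" + second;
    }
}
```

   One reason `forRemoval = true` can be source-incompatible: callers then get removal warnings, which `@SuppressWarnings("deprecation")` does not silence, so builds that treat warnings as errors may start failing.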


