Copilot commented on code in PR #22468:
URL: https://github.com/apache/kafka/pull/22468#discussion_r3361941567


##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -19,30 +19,42 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.internals.AbstractIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * A container that holds the list {@link ConsumerRecord} per partition for a
  * particular topic. There is one {@link ConsumerRecord} list for every topic
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
+    private static final Logger log = 
LoggerFactory.getLogger(ConsumerRecords.class);
+
     public static final ConsumerRecords<Object, Object> EMPTY = new 
ConsumerRecords<>(Map.of(), Map.of());
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
     private final Map<TopicPartition, OffsetAndMetadata> nextOffsets;
 
+    // Flag to detect if legacy ConsumerRecords(Map) constructor is used. See 
KAFKA-20660 for more details.
+    private final boolean tainted;
+    static final long TAINT_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5);
+    static final AtomicLong TAINTED_NEXT_OFFSETS_LAST_LOG_NS = new 
AtomicLong(System.nanoTime() - TAINT_LOG_INTERVAL_NS);
+

Review Comment:
   These package-private statics are exposed primarily for tests. In other 
client classes, such fields are typically annotated with a "// VisibleForTesting" 
marker comment (e.g., ProducerBatch.java:603). Consider adding the same marker 
here to clarify intent.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -19,30 +19,42 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.internals.AbstractIterator;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * A container that holds the list {@link ConsumerRecord} per partition for a
  * particular topic. There is one {@link ConsumerRecord} list for every topic
  * partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
  */
 public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
+    private static final Logger log = 
LoggerFactory.getLogger(ConsumerRecords.class);
+
     public static final ConsumerRecords<Object, Object> EMPTY = new 
ConsumerRecords<>(Map.of(), Map.of());
 
     private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
     private final Map<TopicPartition, OffsetAndMetadata> nextOffsets;
 
+    // Flag to detect if legacy ConsumerRecords(Map) constructor is used. See 
KAFKA-20660 for more details.
+    private final boolean tainted;
+    static final long TAINT_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5);
+    static final AtomicLong TAINTED_NEXT_OFFSETS_LAST_LOG_NS = new 
AtomicLong(System.nanoTime() - TAINT_LOG_INTERVAL_NS);
+

Review Comment:
   The PR description says the deprecation should be logged only once per JVM, 
but the current implementation is time-based and will log again every 5 minutes 
(and the tests assert periodic logging). Please either align the implementation 
and tests to log only once per JVM (e.g., AtomicBoolean compareAndSet) or update 
the PR description to match the intended periodic behavior.
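
For illustration, a minimal sketch of the once-per-JVM option. The class name `LogOnceSketch`, the flag `WARNED`, and the helper `shouldLogOnce` are hypothetical and not part of the PR; the print statement stands in for the real `log.error(...)` call.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class LogOnceSketch {
    // Hypothetical flag: flips from false to true at most once per JVM.
    static final AtomicBoolean WARNED = new AtomicBoolean(false);

    // Returns true only for the single caller that wins the compareAndSet,
    // so concurrent callers cannot both decide to log.
    static boolean shouldLogOnce() {
        return WARNED.compareAndSet(false, true);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            if (shouldLogOnce()) {
                // Stand-in for the real logger call.
                System.out.println("deprecated constructor used (logged once)");
            }
        }
    }
}
```

With this shape the message is emitted on the first call only; later calls and later instances stay silent, which would match the "once per JVM" wording but not the periodic test assertions.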



##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##########
@@ -180,6 +182,65 @@ public void testRecordsAreImmutable() {
         assertEquals(0, emptyRecords.count());
     }
 
+    @Test
+    @SuppressWarnings("deprecation")
+    public void 
testNextOffsetsLogsErrorPeriodicallyWhenConstructedWithDeprecatedConstructor() {
+        TopicPartition tp = new TopicPartition("topic", 0);
+        Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records =
+            Map.of(tp, List.of(new ConsumerRecord<>("topic", 0, 0L, 0, 
"value")));
+
+        try (LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(ConsumerRecords.class)) {
+            // Force the rate-limit window to have elapsed so the next tainted 
call logs.
+            ConsumerRecords.TAINTED_NEXT_OFFSETS_LAST_LOG_NS.set(
+                System.nanoTime() - ConsumerRecords.TAINT_LOG_INTERVAL_NS - 1);
+
+            ConsumerRecords<Integer, String> consumerRecords = new 
ConsumerRecords<>(records);
+
+            // deprecated constructor does not supply next offsets, so the map 
is empty
+            assertTrue(consumerRecords.nextOffsets().isEmpty());
+
+            List<String> errors = appender.getMessages("ERROR");
+            assertEquals(1, errors.size());
+            assertTrue(errors.get(0).contains("deprecated ConsumerRecords(Map) 
constructor"),
+                "Unexpected error message: " + errors.get(0));
+
+            // Within the rate-limit window, neither repeated calls nor new 
tainted instances log again.
+            assertTrue(consumerRecords.nextOffsets().isEmpty());
+            assertTrue(new ConsumerRecords<>(records).nextOffsets().isEmpty());
+            assertEquals(1, appender.getMessages("ERROR").size());
+
+            // Once the window has elapsed, the error is logged again.
+            ConsumerRecords.TAINTED_NEXT_OFFSETS_LAST_LOG_NS.set(
+                System.nanoTime() - ConsumerRecords.TAINT_LOG_INTERVAL_NS - 1);
+            assertTrue(consumerRecords.nextOffsets().isEmpty());
+            assertEquals(2, appender.getMessages("ERROR").size());

Review Comment:
   This test mutates the global static TAINTED_NEXT_OFFSETS_LAST_LOG_NS without 
restoring it, which can leak state across tests and introduce flakes if tests 
are reordered or run concurrently. Capture the previous value and restore it in 
a finally block.
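
A minimal sketch of the capture-and-restore pattern, under stated assumptions: `LAST_LOG_NS` is a stand-in for `ConsumerRecords.TAINTED_NEXT_OFFSETS_LAST_LOG_NS`, and the helper `withLastLogNs` is hypothetical, not an existing Kafka test utility.

```java
import java.util.concurrent.atomic.AtomicLong;

public class RestoreStaticSketch {
    // Stand-in for the shared static that the test overrides.
    static final AtomicLong LAST_LOG_NS = new AtomicLong(42L);

    // Runs the body with the static temporarily overridden, then restores
    // the previous value even if the body throws.
    static void withLastLogNs(long temporary, Runnable body) {
        long previous = LAST_LOG_NS.get();
        try {
            LAST_LOG_NS.set(temporary);
            body.run();
        } finally {
            LAST_LOG_NS.set(previous);
        }
    }

    public static void main(String[] args) {
        withLastLogNs(0L, () -> System.out.println("inside: " + LAST_LOG_NS.get()));
        System.out.println("after: " + LAST_LOG_NS.get());
    }
}
```

Restoring in `finally` covers reordered tests and failing assertions. It does not by itself make the test safe under concurrent execution, since other tests could still observe the temporary value.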


