Copilot commented on code in PR #22468:
URL: https://github.com/apache/kafka/pull/22468#discussion_r3361941567
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -19,30 +19,42 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.internals.AbstractIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* A container that holds the list {@link ConsumerRecord} per partition for a
* particular topic. There is one {@link ConsumerRecord} list for every topic
* partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
*/
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
+ private static final Logger log =
LoggerFactory.getLogger(ConsumerRecords.class);
+
public static final ConsumerRecords<Object, Object> EMPTY = new
ConsumerRecords<>(Map.of(), Map.of());
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
private final Map<TopicPartition, OffsetAndMetadata> nextOffsets;
+ // Flag to detect if legacy ConsumerRecords(Map) constructor is used. See
KAFKA-20660 for more details.
+ private final boolean tainted;
+ static final long TAINT_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5);
+ static final AtomicLong TAINTED_NEXT_OFFSETS_LAST_LOG_NS = new
AtomicLong(System.nanoTime() - TAINT_LOG_INTERVAL_NS);
+
Review Comment:
These package-private statics are exposed primarily for tests. In other
client classes such members are typically annotated with a "// VisibleForTesting"
marker comment (e.g., ProducerBatch.java:603). Consider adding the same marker
here to clarify intent.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -19,30 +19,42 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.internals.AbstractIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
* A container that holds the list {@link ConsumerRecord} per partition for a
* particular topic. There is one {@link ConsumerRecord} list for every topic
* partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
*/
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
+ private static final Logger log =
LoggerFactory.getLogger(ConsumerRecords.class);
+
public static final ConsumerRecords<Object, Object> EMPTY = new
ConsumerRecords<>(Map.of(), Map.of());
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
private final Map<TopicPartition, OffsetAndMetadata> nextOffsets;
+ // Flag to detect if legacy ConsumerRecords(Map) constructor is used. See
KAFKA-20660 for more details.
+ private final boolean tainted;
+ static final long TAINT_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5);
+ static final AtomicLong TAINTED_NEXT_OFFSETS_LAST_LOG_NS = new
AtomicLong(System.nanoTime() - TAINT_LOG_INTERVAL_NS);
+
Review Comment:
The PR description says the deprecation should be logged only once per JVM, but
the current implementation is time-based and will log again every 5 minutes
(and the tests assert periodic logging). Please either align the
implementation and tests to log only once per JVM (e.g., AtomicBoolean
compareAndSet) or update the PR description to match the intended periodic
behavior.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java:
##########
@@ -180,6 +182,65 @@ public void testRecordsAreImmutable() {
assertEquals(0, emptyRecords.count());
}
+ @Test
+ @SuppressWarnings("deprecation")
+ public void
testNextOffsetsLogsErrorPeriodicallyWhenConstructedWithDeprecatedConstructor() {
+ TopicPartition tp = new TopicPartition("topic", 0);
+ Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records =
+ Map.of(tp, List.of(new ConsumerRecord<>("topic", 0, 0L, 0,
"value")));
+
+ try (LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(ConsumerRecords.class)) {
+ // Force the rate-limit window to have elapsed so the next tainted
call logs.
+ ConsumerRecords.TAINTED_NEXT_OFFSETS_LAST_LOG_NS.set(
+ System.nanoTime() - ConsumerRecords.TAINT_LOG_INTERVAL_NS - 1);
+
+ ConsumerRecords<Integer, String> consumerRecords = new
ConsumerRecords<>(records);
+
+ // deprecated constructor does not supply next offsets, so the map
is empty
+ assertTrue(consumerRecords.nextOffsets().isEmpty());
+
+ List<String> errors = appender.getMessages("ERROR");
+ assertEquals(1, errors.size());
+ assertTrue(errors.get(0).contains("deprecated ConsumerRecords(Map)
constructor"),
+ "Unexpected error message: " + errors.get(0));
+
+ // Within the rate-limit window, neither repeated calls nor new
tainted instances log again.
+ assertTrue(consumerRecords.nextOffsets().isEmpty());
+ assertTrue(new ConsumerRecords<>(records).nextOffsets().isEmpty());
+ assertEquals(1, appender.getMessages("ERROR").size());
+
+ // Once the window has elapsed, the error is logged again.
+ ConsumerRecords.TAINTED_NEXT_OFFSETS_LAST_LOG_NS.set(
+ System.nanoTime() - ConsumerRecords.TAINT_LOG_INTERVAL_NS - 1);
+ assertTrue(consumerRecords.nextOffsets().isEmpty());
+ assertEquals(2, appender.getMessages("ERROR").size());
Review Comment:
This test mutates the global static TAINTED_NEXT_OFFSETS_LAST_LOG_NS without
restoring it, which can leak state across tests and introduce flakes if tests
are reordered or run concurrently. Capture the previous value and restore it in
a finally block.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org