ezhou413 commented on code in PR #22468:
URL: https://github.com/apache/kafka/pull/22468#discussion_r3360171948
##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java:
##########
@@ -19,30 +19,42 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.internals.AbstractIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* A container that holds the list {@link ConsumerRecord} per partition for a
* particular topic. There is one {@link ConsumerRecord} list for every topic
* partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
*/
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
+ private static final Logger log = LoggerFactory.getLogger(ConsumerRecords.class);
+
+ // Ensures the tainted-records error (see nextOffsets()) is logged at most once per JVM since nextOffsets() is called on every poll
+ static final AtomicBoolean TAINTED_NEXT_OFFSETS_LOGGED = new AtomicBoolean(false);
+
public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Map.of(), Map.of());
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
private final Map<TopicPartition, OffsetAndMetadata> nextOffsets;
+ // Flag to detect if legacy ConsumerRecords(Map) constructor is used. See KAFKA-20660 for more details.
+ private final boolean tainted;
+
/**
* @deprecated Since 4.0. Use {@link #ConsumerRecords(Map, Map)} instead.
*/
- @Deprecated
+ @Deprecated(since = "4.1", forRemoval = true)
Review Comment:
Reopened #22469 and changed the version to 4.0
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]