[ https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vahid Hashemian reassigned KAFKA-4950: -------------------------------------- Assignee: Sébastien Launay (was: Vahid Hashemian) > ConcurrentModificationException when iterating over Kafka Metrics > ----------------------------------------------------------------- > > Key: KAFKA-4950 > URL: https://issues.apache.org/jira/browse/KAFKA-4950 > Project: Kafka > Issue Type: Bug > Affects Versions: 0.10.1.1 > Reporter: Dumitru Postoronca > Assignee: Sébastien Launay > Priority: Minor > Fix For: 0.11.0.2 > > > It looks like when calling {{PartitionStates.partitionSet()}}, while the > resulting HashSet is being built, the internal state of the allocations can > change, which leads to ConcurrentModificationException during the copy > operation. > {code} > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at > java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at java.util.AbstractCollection.addAll(AbstractCollection.java:343) > at java.util.HashSet.<init>(HashSet.java:119) > at > org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > {code} > {code} > // client code: > import java.util.Collections; > import java.util.HashMap; > import java.util.Map; > import com.codahale.metrics.Gauge; > import com.codahale.metrics.Metric; > import com.codahale.metrics.MetricSet; > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.MetricName; > import static com.codahale.metrics.MetricRegistry.name; > public class 
KafkaMetricSet implements MetricSet { > private final KafkaConsumer client; > public KafkaMetricSet(KafkaConsumer client) { > this.client = client; > } > @Override > public Map<String, Metric> getMetrics() { > final Map<String, Metric> gauges = new HashMap<String, Metric>(); > Map<MetricName, org.apache.kafka.common.Metric> m = client.metrics(); > for (Map.Entry<MetricName, org.apache.kafka.common.Metric> e : > m.entrySet()) { > gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), > new Gauge<Double>() { > @Override > public Double getValue() { > return e.getValue().value(); // exception thrown here > } > }); > } > return Collections.unmodifiableMap(gauges); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)