[jira] [Commented] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics
[ https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16016640#comment-16016640 ] Vahid Hashemian commented on KAFKA-4950: [~dpostoronca] Still no luck on my side reproducing this error. I run two threads as you suggested. In one thread I run (in a loop) your metric collection code above (that leads to calling {{PartitionStates.partitionSet()}}; and in the other, I do repeating {{poll}} calls (that lead to calling {{PartitionStates.moveToEnd(..)}}). Both threads run with the same consumer instance. > ConcurrentModificationException when iterating over Kafka Metrics > - > > Key: KAFKA-4950 > URL: https://issues.apache.org/jira/browse/KAFKA-4950 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.1 >Reporter: Dumitru Postoronca >Assignee: Vahid Hashemian >Priority: Minor > Fix For: 0.11.0.0 > > > It looks like the when calling {{PartitionStates.partitionSet()}}, while the > resulting Hashmap is being built, the internal state of the allocations can > change, which leads to ConcurrentModificationException during the copy > operation. > {code} > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at > java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at java.util.AbstractCollection.addAll(AbstractCollection.java:343) > at java.util.HashSet.(HashSet.java:119) > at > org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > {code} > {code} > // client code: > import java.util.Collections; > import java.util.HashMap; > import java.util.Map; > import com.codahale.metrics.Gauge; > import com.codahale.metrics.Metric; > import com.codahale.metrics.MetricSet; > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.MetricName; > import static com.codahale.metrics.MetricRegistry.name; > public class KafkaMetricSet implements MetricSet { > private final KafkaConsumer client; > public KafkaMetricSet(KafkaConsumer client) { > this.client = client; > } > @Override > public MapgetMetrics() { > final Map gauges = new HashMap (); > Map m = client.metrics(); > for (Map.Entry e : > m.entrySet()) { > gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), > new Gauge() { > @Override > public Double getValue() { > return e.getValue().value(); // exception thrown here > } > }); > } > return Collections.unmodifiableMap(gauges); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics
[ https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16011958#comment-16011958 ] Dumitru Postoronca commented on KAFKA-4950: --- [~vahid]: I would try to simulate by having two threads, one calling {{partitionSet()}} in a loop and another calling any other method in PartitionStates, for example {{moveToEnd(..)}} or perhaps {{remove(topicPartition)}}. You would have two threads modifying the same {{map}} variable. > ConcurrentModificationException when iterating over Kafka Metrics > - > > Key: KAFKA-4950 > URL: https://issues.apache.org/jira/browse/KAFKA-4950 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.1 >Reporter: Dumitru Postoronca >Assignee: Vahid Hashemian >Priority: Minor > Fix For: 0.11.0.0 > > > It looks like the when calling {{PartitionStates.partitionSet()}}, while the > resulting Hashmap is being built, the internal state of the allocations can > change, which leads to ConcurrentModificationException during the copy > operation. > {code} > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at > java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at java.util.AbstractCollection.addAll(AbstractCollection.java:343) > at java.util.HashSet.(HashSet.java:119) > at > org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > {code} > {code} > // client code: > import java.util.Collections; > import java.util.HashMap; > import java.util.Map; > import com.codahale.metrics.Gauge; > import com.codahale.metrics.Metric; > import com.codahale.metrics.MetricSet; > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.MetricName; > import static com.codahale.metrics.MetricRegistry.name; > public class KafkaMetricSet implements MetricSet { > private final KafkaConsumer client; > public KafkaMetricSet(KafkaConsumer client) { > this.client = client; > } > @Override > public MapgetMetrics() { > final Map gauges = new HashMap (); > Map m = client.metrics(); > for (Map.Entry e : > m.entrySet()) { > gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), > new Gauge() { > @Override > public Double getValue() { > return e.getValue().value(); // exception thrown here > } > }); > } > return Collections.unmodifiableMap(gauges); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics
[ https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16005658#comment-16005658 ] Vahid Hashemian commented on KAFKA-4950: [~dpostoronca] Thanks for providing the additional details. I cannot reproduce the error when running a simple consumer (that polls and collects the metrics in a loop) with the provided {{KafkaMetricSet}} class. I see that both {{PartitionStates.partitionSet()}} and the overloaded {{getValue()}} methods are called but they don't seem to interfere with each other. Perhaps I'm missing something? > ConcurrentModificationException when iterating over Kafka Metrics > - > > Key: KAFKA-4950 > URL: https://issues.apache.org/jira/browse/KAFKA-4950 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.1 >Reporter: Dumitru Postoronca >Assignee: Vahid Hashemian >Priority: Minor > Fix For: 0.11.0.0 > > > It looks like the when calling {{PartitionStates.partitionSet()}}, while the > resulting Hashmap is being built, the internal state of the allocations can > change, which leads to ConcurrentModificationException during the copy > operation. > {code} > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at > java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at java.util.AbstractCollection.addAll(AbstractCollection.java:343) > at java.util.HashSet.(HashSet.java:119) > at > org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > {code} > {code} > // client code: > import java.util.Collections; > import java.util.HashMap; > import java.util.Map; > import com.codahale.metrics.Gauge; > import com.codahale.metrics.Metric; > import com.codahale.metrics.MetricSet; > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.MetricName; > import static com.codahale.metrics.MetricRegistry.name; > public class KafkaMetricSet implements MetricSet { > private final KafkaConsumer client; > public KafkaMetricSet(KafkaConsumer client) { > this.client = client; > } > @Override > public MapgetMetrics() { > final Map gauges = new HashMap (); > Map m = client.metrics(); > for (Map.Entry e : > m.entrySet()) { > gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), > new Gauge() { > @Override > public Double getValue() { > return e.getValue().value(); // exception thrown here > } > }); > } > return Collections.unmodifiableMap(gauges); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics
[ https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977294#comment-15977294 ] Dumitru Postoronca commented on KAFKA-4950: --- [~vahid]: updated description with whole class. The code is basically mapping Kakfa metrics to Codahale metrics. I think the dependency version is {{'io.dropwizard.metrics:metrics-core:3.1.2'}}. The metrics library then calls the gauge.getValue() method every minute, to get the current value, thus effectively executing {{e.getValue().value()}}. > ConcurrentModificationException when iterating over Kafka Metrics > - > > Key: KAFKA-4950 > URL: https://issues.apache.org/jira/browse/KAFKA-4950 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.1 >Reporter: Dumitru Postoronca >Assignee: Vahid Hashemian >Priority: Minor > Fix For: 0.11.0.0 > > > It looks like the when calling {{PartitionStates.partitionSet()}}, while the > resulting Hashmap is being built, the internal state of the allocations can > change, which leads to ConcurrentModificationException during the copy > operation. > {code} > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at > java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at java.util.AbstractCollection.addAll(AbstractCollection.java:343) > at java.util.HashSet.(HashSet.java:119) > at > org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > {code} > {code} > // client code: > import java.util.Collections; > import java.util.HashMap; > import java.util.Map; > import com.codahale.metrics.Gauge; > import com.codahale.metrics.Metric; > import com.codahale.metrics.MetricSet; > import org.apache.kafka.clients.consumer.KafkaConsumer; > import org.apache.kafka.common.MetricName; > import static com.codahale.metrics.MetricRegistry.name; > public class KafkaMetricSet implements MetricSet { > private final KafkaConsumer client; > public KafkaMetricSet(KafkaConsumer client) { > this.client = client; > } > @Override > public MapgetMetrics() { > final Map gauges = new HashMap (); > Map m = client.metrics(); > for (Map.Entry e : > m.entrySet()) { > gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), > new Gauge() { > @Override > public Double getValue() { > return e.getValue().value(); // exception thrown here > } > }); > } > return Collections.unmodifiableMap(gauges); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics
[ https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15977116#comment-15977116 ] Vahid Hashemian commented on KAFKA-4950: [~dpostoronca] Would you mind sharing a bit more of your client code to avoid the compile errors? Thanks. > ConcurrentModificationException when iterating over Kafka Metrics > - > > Key: KAFKA-4950 > URL: https://issues.apache.org/jira/browse/KAFKA-4950 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.1 >Reporter: Dumitru Postoronca >Assignee: Vahid Hashemian >Priority: Minor > Fix For: 0.11.0.0 > > > It looks like the when calling {{PartitionStates.partitionSet()}}, while the > resulting Hashmap is being built, the internal state of the allocations can > change, which leads to ConcurrentModificationException during the copy > operation. > {code} > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at > java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at java.util.AbstractCollection.addAll(AbstractCollection.java:343) > at java.util.HashSet.(HashSet.java:119) > at > org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > {code} > {code} > // client code: > private final KafkaConsumer client; > Mapm = client.metrics(); > for (Map.Entry e : > m.entrySet()) { > gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), > new Gauge() { > @Override > public Double getValue() { > return e.getValue().value(); // exception thrown here > } > }); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics
[ https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15963308#comment-15963308 ] Vahid Hashemian commented on KAFKA-4950: [~ijuma] Yup, thanks! > ConcurrentModificationException when iterating over Kafka Metrics > - > > Key: KAFKA-4950 > URL: https://issues.apache.org/jira/browse/KAFKA-4950 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.1 >Reporter: Dumitru Postoronca >Priority: Minor > Fix For: 0.11.0.0 > > > It looks like the when calling {{PartitionStates.partitionSet()}}, while the > resulting Hashmap is being built, the internal state of the allocations can > change, which leads to ConcurrentModificationException during the copy > operation. > {code} > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at > java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at java.util.AbstractCollection.addAll(AbstractCollection.java:343) > at java.util.HashSet.(HashSet.java:119) > at > org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > {code} > {code} > // client code: > private final KafkaConsumer client; > Mapm = client.metrics(); > for (Map.Entry e : > m.entrySet()) { > gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), > new Gauge() { > @Override > public Double getValue() { > return e.getValue().value(); // exception thrown here > } > }); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (KAFKA-4950) ConcurrentModificationException when iterating over Kafka Metrics
[ https://issues.apache.org/jira/browse/KAFKA-4950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960720#comment-15960720 ] Ismael Juma commented on KAFKA-4950: [~vahid], would you like to pick this one up? > ConcurrentModificationException when iterating over Kafka Metrics > - > > Key: KAFKA-4950 > URL: https://issues.apache.org/jira/browse/KAFKA-4950 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.1.1 >Reporter: Dumitru Postoronca >Priority: Minor > Fix For: 0.11.0.0 > > > It looks like the when calling {{PartitionStates.partitionSet()}}, while the > resulting Hashmap is being built, the internal state of the allocations can > change, which leads to ConcurrentModificationException during the copy > operation. > {code} > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at > java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at java.util.AbstractCollection.addAll(AbstractCollection.java:343) > at java.util.HashSet.(HashSet.java:119) > at > org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:66) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:291) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:783) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) > at > org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) > {code} > {code} > // client code: > private final KafkaConsumer client; > Mapm = client.metrics(); > for (Map.Entry e : > m.entrySet()) { > gauges.put(name(e.getKey().group(), e.getKey().name(), "count"), > new Gauge() { > @Override > public Double getValue() { > return e.getValue().value(); // exception thrown here > } > }); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)