[jira] [Commented] (KAFKA-2985) Consumer group stuck in rebalancing state

2016-01-26 Thread Federico Fissore (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15116905#comment-15116905
 ] 

Federico Fissore commented on KAFKA-2985:
-

[~hachikuji] you're right. Sorry for having bothered you. I forgot to code a 
"tearDown()". Stock 0.9.0.0 works fine. Thank you for your patience.

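For anyone hitting the same symptom in a test suite, here is a minimal sketch of what such a "tearDown()" could delegate to. The class and method names (TestClients, closeAll) are hypothetical and not part of Kafka or of the test posted in this issue. The sketch only assumes that the clients created in setUp() implement AutoCloseable, which KafkaConsumer and KafkaProducer do through java.io.Closeable. Closing the consumer presumably lets it leave the group right away instead of leaving the coordinator to wait for its session to expire.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for a JUnit @After method: close every client created in setUp(). */
final class TestClients {
    private TestClients() {}

    /**
     * Closes each non-null resource in the order given, continuing past failures.
     * Returns the exceptions that were thrown so the caller can report them.
     */
    static List<Exception> closeAll(AutoCloseable... resources) {
        List<Exception> failures = new ArrayList<>();
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // setUp() may have failed before creating this client
            }
            try {
                resource.close();
            } catch (Exception e) {
                failures.add(e); // keep going so the remaining clients are still closed
            }
        }
        return failures;
    }
}
```

In a test class this would be called from a method annotated with @After, for example TestClients.closeAll(consumer, producer).
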
> Consumer group stuck in rebalancing state
> -
>
> Key: KAFKA-2985
> URL: https://issues.apache.org/jira/browse/KAFKA-2985
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.9.0.0
> Environment: Kafka 0.9.0.0.
> Kafka Java consumer 0.9.0.0
> 2 Java producers.
> 3 Java consumers using the new consumer API.
> 2 kafka brokers.
> Reporter: Jens Rantil
> Assignee: Jason Gustafson
>
> We've been doing some load testing on Kafka. _After_ the load test, we have 
> two times now seen Kafka become stuck in consumer group rebalancing. This is 
> after all our consumers are done consuming and are essentially polling 
> periodically without getting any records.
> The brokers list the consumer group (named "default"), but I can't query the 
> offsets:
> {noformat}
> jrantil@queue-0:/srv/kafka/kafka$ ./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
> default
> jrantil@queue-0:/srv/kafka/kafka$ ./bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group default|sort
> Consumer group `default` does not exist or is rebalancing.
> {noformat}
> Retrying the offset query for 15 minutes or so still reported that the group 
> was rebalancing. After restarting our first broker, the group immediately 
> started rebalancing. That broker was logging this before the restart:
> {noformat}
> [2015-12-12 13:09:48,517] INFO [Group Metadata Manager on Broker 0]: Removed 
> 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
> [2015-12-12 13:10:16,139] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 16 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:10:16,141] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 16 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:10:16,575] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 16 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:11:15,141] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 17 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:11:15,143] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 17 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:11:15,314] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 17 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:12:14,144] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 18 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:12:14,145] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 18 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:12:14,340] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 18 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:13:13,146] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 19 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:13:13,148] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 19 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:13:13,238] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 19 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:14:12,148] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 20 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:14:12,149] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 20 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:14:12,360] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 20 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:15:11,150] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 21 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:15:11,152] INFO [GroupCoordinator 0]: Assignment received from 
> leader for group default for generation 21 
> (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:15:11,217] INFO [GroupCoordinator 0]: Preparing to restabilize 
> group default with old generation 21 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:16:10,152] INFO [GroupCoordinator 0]: Stabilized group default 
> generation 22 (kafka.coordinator.GroupCoordinator)
> [2015-12-12 13:16:10,154] INFO 

[jira] [Commented] (KAFKA-2985) Consumer group stuck in rebalancing state

2016-01-25 Thread Federico Fissore (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15115971#comment-15115971
 ] 

Federico Fissore commented on KAFKA-2985:
-

[~hachikuji] I've just run my test against the latest 0.9.0 branch (gradle 
produced kafka_2.10-0.9.0.1-SNAPSHOT.tgz): I deployed two containers with the 
latest server and used the latest Java client.
I confirm the issue is still there: whenever a new test runs (and its consumer 
subscribes), a "Preparing to restabilize group group_id..." phase starts and 
lasts for 20 seconds.
However, the last test, which publishes and consumes 1 million messages, runs 
smoothly to the end once started, squeezing my cores.
Depending on how you feel about this kind of stop-the-world pause, this issue 
may be low priority.
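
If the pause comes from the coordinator still tracking a member left over from an earlier test run (an assumption; this comment does not establish the cause), one way to keep runs from interfering with each other is to give every run its own consumer group. The sketch below builds such a configuration with plain java.util.Properties. The class and method names are hypothetical; the keys are the standard consumer config names that the ConsumerConfig constants resolve to, and the key and value deserializers still have to be added before the properties are passed to a KafkaConsumer.

```java
import java.util.Properties;
import java.util.UUID;

/** Hypothetical helper: consumer properties with a group id unique to this test run. */
final class TestConsumerConfig {
    private TestConsumerConfig() {}

    static Properties uniqueGroupProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A fresh group per run: no member from an earlier run can still be registered in it.
        props.setProperty("group.id", "test-" + UUID.randomUUID());
        props.setProperty("enable.auto.commit", "false");
        return props;
    }
}
```

This trades the shared "group_id" for throwaway groups, so it only suits tests that do not rely on offsets committed by a previous run.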


[jira] [Commented] (KAFKA-2985) Consumer group stuck in rebalancing state

2016-01-23 Thread Federico Fissore (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15113646#comment-15113646
 ] 

Federico Fissore commented on KAFKA-2985:
-

Jason, no, I used the released 0.9.0.0: I still haven't set up a container with 
the latest 0.9.0 branch. I'll do that this Monday
(and it would be great if you could provide a nightly build of Kafka: that way 
I'd just have to download it and try it).


[jira] [Commented] (KAFKA-2985) Consumer group stuck in rebalancing state

2016-01-22 Thread Federico Fissore (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15112100#comment-15112100
 ] 

Federico Fissore commented on KAFKA-2985:
-

My test still shows the consumer stuck waiting for a rebalance, with 2 brokers 
and no incoming traffic. So no, this issue should stay open, unless another 
issue tracks this odd behaviour or it's a WONTFIX.


[jira] [Commented] (KAFKA-2985) Consumer group stuck in rebalancing state

2016-01-18 Thread Federico Fissore (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105037#comment-15105037
 ] 

Federico Fissore commented on KAFKA-2985:
-

I'm experiencing the same behaviour too. I have a short test (below) with 1 
producer, 1 consumer and 2 brokers. The first run is fast, as expected; the 
second run gets stuck at the first {code}poll(0){code}.
ATM I'm not able to test the latest from branch 0.9.0. A nightly build of Kafka 
would help a lot, because then I'd just have to change my Dockerfile URLs and 
I'd be up and running (related to #KAFKA-2380).

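{code}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Before;
import org.junit.Test;

// KafkaMapSerializer, KafkaMapDeserializer and MapBuilder are my own helper classes (not shown).
// The type arguments assume String keys and String-to-String map values.
public class KafkaNewConsumerAPITest {

   private KafkaProducer<String, Map<String, String>> producer;
   private KafkaConsumer<String, Map<String, String>> consumer;

   @Before
   public void setUp() throws Exception {
      Properties props = new Properties();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.4:9092");
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMapSerializer.class.getName());
      props.put(ProducerConfig.LINGER_MS_CONFIG, 0);

      producer = new KafkaProducer<>(props);

      props = new Properties();
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.4:9092");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMapDeserializer.class.getName());
      props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
      consumer = new KafkaConsumer<>(props);
   }

   @Test
   public void shouldProduceAndConsumeOneMessage() throws Exception {
      String topic = "_test_topic_";

      consumer.subscribe(Collections.singletonList(topic));

      // First poll: joins the group; the second run gets stuck here.
      System.out.println("polling");
      ConsumerRecords<String, Map<String, String>> records = consumer.poll(0);
      System.out.println("polled");
      for (ConsumerRecord<String, Map<String, String>> record : records) {
         System.out.printf("offset = %d, key = %s, value = %s%n",
               record.offset(), record.key(), record.value());
      }
      consumer.commitSync();

      System.out.println("producing");
      Map<String, String> hello = new MapBuilder().put("name", "world")
            .put("random", "" + Math.random()).build();
      ProducerRecord<String, Map<String, String>> record =
            new ProducerRecord<>(topic, "hello", hello);

      producer.send(record, (recordMetadata, e) -> {
         assertNull(e);
         assertTrue(recordMetadata.offset() >= 0);
         assertTrue(recordMetadata.partition() >= 0);
         assertEquals(topic, recordMetadata.topic());
      });

      // close() blocks until the pending send has completed
      producer.close();

      System.out.println("polling");
      records = consumer.poll(2000);
      System.out.println("polled");
      assertFalse(records.isEmpty());
      Iterator<ConsumerRecord<String, Map<String, String>>> iterator = records.iterator();

      ConsumerRecord<String, Map<String, String>> consumedRecord = iterator.next();
      System.out.printf("offset = %d, key = %s, value = %s%n",
            consumedRecord.offset(), consumedRecord.key(), consumedRecord.value());
      assertEquals("hello", consumedRecord.key());
      Map<String, String> value = consumedRecord.value();
      assertEquals("world", value.get("name"));
      assertEquals(hello.get("random"), value.get("random"));

      consumer.commitSync();
   }
}
{code}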


[jira] [Comment Edited] (KAFKA-2985) Consumer group stuck in rebalancing state

2016-01-18 Thread Federico Fissore (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105037#comment-15105037
 ] 

Federico Fissore edited comment on KAFKA-2985 at 1/18/16 10:00 AM:
---

I'm experiencing the same behaviour too. I have a short test (below) with 1 
producer, 1 consumer and 2 brokers. The first run is fast, as expected. The 
second run gets stuck at the first {code}poll(0){code}.
ATM I'm not able to test the latest from branch 0.9.0. A nightly build of Kafka 
would help a lot, because then I'd just have to change my Dockerfile URLs and 
I'd be up and running (related to #KAFKA-2380).

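{code}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Before;
import org.junit.Test;

// KafkaMapSerializer, KafkaMapDeserializer and MapBuilder are my own helper classes (not shown).
// The type arguments assume String keys and String-to-String map values.
public class KafkaNewConsumerAPITest {

   private KafkaProducer<String, Map<String, String>> producer;
   private KafkaConsumer<String, Map<String, String>> consumer;

   @Before
   public void setUp() throws Exception {
      Properties props = new Properties();
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.4:9092");
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMapSerializer.class.getName());
      props.put(ProducerConfig.LINGER_MS_CONFIG, 0);

      producer = new KafkaProducer<>(props);

      props = new Properties();
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.4:9092");
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaMapDeserializer.class.getName());
      props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
      consumer = new KafkaConsumer<>(props);
   }

   @Test
   public void shouldProduceAndConsumeOneMessage() throws Exception {
      String topic = "_test_topic_";

      consumer.subscribe(Collections.singletonList(topic));

      // First poll: joins the group; the second run gets stuck here.
      System.out.println("polling");
      ConsumerRecords<String, Map<String, String>> records = consumer.poll(0);
      System.out.println("polled");
      for (ConsumerRecord<String, Map<String, String>> record : records) {
         System.out.printf("offset = %d, key = %s, value = %s%n",
               record.offset(), record.key(), record.value());
      }
      consumer.commitSync();

      System.out.println("producing");
      Map<String, String> hello = new MapBuilder().put("name", "world")
            .put("random", "" + Math.random()).build();
      ProducerRecord<String, Map<String, String>> record =
            new ProducerRecord<>(topic, "hello", hello);

      producer.send(record, (recordMetadata, e) -> {
         assertNull(e);
         assertTrue(recordMetadata.offset() >= 0);
         assertTrue(recordMetadata.partition() >= 0);
         assertEquals(topic, recordMetadata.topic());
      });

      // close() blocks until the pending send has completed
      producer.close();

      System.out.println("polling");
      records = consumer.poll(2000);
      System.out.println("polled");
      assertFalse(records.isEmpty());
      Iterator<ConsumerRecord<String, Map<String, String>>> iterator = records.iterator();

      ConsumerRecord<String, Map<String, String>> consumedRecord = iterator.next();
      System.out.printf("offset = %d, key = %s, value = %s%n",
            consumedRecord.offset(), consumedRecord.key(), consumedRecord.value());
      assertEquals("hello", consumedRecord.key());
      Map<String, String> value = consumedRecord.value();
      assertEquals("world", value.get("name"));
      assertEquals(hello.get("random"), value.get("random"));

      consumer.commitSync();
   }
}
{code}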



[jira] [Commented] (KAFKA-2985) Consumer group stuck in rebalancing state

2016-01-18 Thread Federico Fissore (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105078#comment-15105078
 ] 

Federico Fissore commented on KAFKA-2985:
-

Below is a snippet of the broker logs when running the above test twice. The 
first run is fast as expected IF it happens after "dead and removed". The second 
run starts with the second "Preparing to restabilize..." and completes after 
"Stabilized group...", thus taking about 20 seconds to complete.

{code}
[2016-01-18 10:26:59,329] INFO [GroupCoordinator 2]: Group group_id generation 
6 is dead and removed (kafka.coordinator.GroupCoordinator)
[2016-01-18 10:28:03,883] INFO [GroupCoordinator 2]: Preparing to restabilize 
group group_id with old generation 0 (kafka.coordinator.GroupCoordinator)
[2016-01-18 10:28:03,883] INFO [GroupCoordinator 2]: Stabilized group group_id 
generation 1 (kafka.coordinator.GroupCoordinator)
[2016-01-18 10:28:03,888] INFO [GroupCoordinator 2]: Assignment received from 
leader for group group_id for generation 1 (kafka.coordinator.GroupCoordinator)
[2016-01-18 10:28:17,159] INFO [GroupCoordinator 2]: Preparing to restabilize 
group group_id with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-01-18 10:28:33,892] INFO [GroupCoordinator 2]: Stabilized group group_id 
generation 2 (kafka.coordinator.GroupCoordinator)
[2016-01-18 10:28:33,900] INFO [GroupCoordinator 2]: Assignment received from 
leader for group group_id for generation 2 (kafka.coordinator.GroupCoordinator)
{code}
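
The length of the pause can be read straight off the timestamps in the excerpt above. The following is a small sketch that does the arithmetic with java.time; the class and method names are hypothetical, and the three log lines are copied from the excerpt with the mail line wraps removed.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: measure the time between two broker log lines. */
final class CoordinatorLogTimes {
    private static final DateTimeFormatter STAMP =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    private CoordinatorLogTimes() {}

    /** Parses the leading "[yyyy-MM-dd HH:mm:ss,SSS]" of a broker log line. */
    static LocalDateTime timestampOf(String logLine) {
        int close = logLine.indexOf(']');
        if (!logLine.startsWith("[") || close < 0) {
            throw new IllegalArgumentException("no leading timestamp: " + logLine);
        }
        return LocalDateTime.parse(logLine.substring(1, close), STAMP);
    }

    /** Milliseconds elapsed from the first log line to the second. */
    static long millisBetween(String earlier, String later) {
        return Duration.between(timestampOf(earlier), timestampOf(later)).toMillis();
    }

    public static void main(String[] args) {
        String stabilizedGen1 = "[2016-01-18 10:28:03,883] INFO [GroupCoordinator 2]: "
                + "Stabilized group group_id generation 1 (kafka.coordinator.GroupCoordinator)";
        String preparingGen2 = "[2016-01-18 10:28:17,159] INFO [GroupCoordinator 2]: "
                + "Preparing to restabilize group group_id with old generation 1 (kafka.coordinator.GroupCoordinator)";
        String stabilizedGen2 = "[2016-01-18 10:28:33,892] INFO [GroupCoordinator 2]: "
                + "Stabilized group group_id generation 2 (kafka.coordinator.GroupCoordinator)";

        // Time the second run spent waiting for the rebalance to finish
        System.out.println(millisBetween(preparingGen2, stabilizedGen2));  // 16733
        // Time since the first run's member joined the group
        System.out.println(millisBetween(stabilizedGen1, stabilizedGen2)); // 30009
    }
}
```

The second figure is very close to 30 seconds, which I understand to be the default session.timeout.ms of the 0.9 consumer. That would fit the coordinator waiting for the first run's member to expire before it completes the rebalance, but this reading is an assumption drawn from the timestamps, not something the logs state.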
