Don't think I made my questions clear: On Kafka 0.9.0.1 broker and 0.9 consumer how do I tell what my consumer-groups are? Can I still get this information in ZK? I don't see anything in the consumers folder which is alarming to me. This is especially alarming because I do see that 8 partitions are assigned on the consumer (via jmx). I specify the consumer group using:
String myConsumerGroupId = myTopic + "_consumer"; props.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, myConsumerGroupId); I am running with this setup on about 20 consumers (each consuming a unique topic) and I only see one of my consumers not passing any messages to my application even though I see that the jmx console says it is receiving 5 requests per second. The other 19 seem to be working fine. Each of these 20 topics was created when a message was sent to it i.e. it was not provisioned from before. Messages currently are only being sent to partition 0 even though there are 8 partitions per topic. Thanks, Rajiv On Wed, Mar 9, 2016 at 4:30 PM, Rajiv Kurian <ra...@signalfx.com> wrote: > Also forgot to mention that when I do consume with the console consumer I > do see data coming through. > > On Wed, Mar 9, 2016 at 3:44 PM, Rajiv Kurian <ra...@signalfx.com> wrote: > >> I am running the 0.9.0.1 broker with the 0.9 consumer. I am using the >> subscribe feature on the consumer to subscribe to a topic with 8 partitions. >> >> consumer.subscribe(Arrays.asList(myTopic)); >> >> I have a single consumer group for said topic and a single process >> subscribed with 8 partitions. >> >> When I use jmx on the consumer I do see that it has 8 partitions assigned >> to it according to the consumer-coordinator-metrics mbean. How can I tell >> what topic it is listening to? I couldn't find this on jmx anywhere. I do >> see that it is getting 5 responses per second according to the >> consumer-metrics mbean but I don't see any in my actual application. >> >> I consume my messages like this: >> >> public int poll(SubscriptionDataHandler handler, long timeout) { >> >> ConsumerRecords<Void, byte[]> records = null; >> >> try { >> >> records = consumer.poll(timeout); >> >> } catch (Exception e) { >> >> logger.error("Exception polling the Kafka , e); // Don't >> see any exceptions here >> >> return -1; >> >> } >> >> int numBuffers = 0; >> >> if (records != null) { >> >> for (ConsumerRecord<Void, byte[]> record : records) { >> >> byte[] payload = record.value(); >> >> if (payload != null && payload.length > 0) { >> >> ByteBuffer wrappedBuffer = ByteBuffer.wrap(payload); >> >> try { >> >> handler.handleData(wrappedBuffer); // This is >> never called >> >> } catch (Exception e) { >> >> logger.error("Exception consuming buffer , e); >> >> } >> >> numBuffers += 1; // This is never incremented. >> >> } >> >> } >> >> >> // Commit only after consuming. >> >> consumer.commitAsync(offsetCommitCallback); >> >> } >> >> I also don't see any data in the consumers folder in ZK. In fact it is >> completely empty. >> >> When I use the console-consumer, I do see the console-consumer show up in >> the consumers folder, but none of my actual consumers show up. >> >> I tried looking for jmx data on the servers too and couldn't quite figure >> out where I can get jmax. >> >> I am trying to figure out why the Kafka consumer thinks it is getting >> messages ( 5 responses/second according to jmx) but I don't get any in my >> application. >> >> Thanks, >> Rajiv >> >> >> >