Don't think I made my questions clear:

With a Kafka 0.9.0.1 broker and the 0.9 consumer, how do I tell what my
consumer groups are? Can I still get this information from ZK? I don't see
anything in the consumers folder, which is alarming to me, especially
because I do see (via JMX) that 8 partitions are assigned to the consumer.
I specify the consumer group using:

String myConsumerGroupId = myTopic + "_consumer";

props.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG,
 myConsumerGroupId);
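
For context, here is a minimal sketch of the kind of consumer configuration that setting sits in. It uses the literal config keys ("group.id" is the value of GROUP_ID_CONFIG) so that it compiles without the Kafka client on the classpath. The class name, the broker address, the deserializer choices and the auto-commit setting are assumptions for illustration, not my actual settings.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only.
class ConsumerConfigSketch {

    // Builds consumer properties; the group id is derived from the topic
    // name the same way as in the snippet above.
    static Properties buildProps(String topic, String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", topic + "_consumer");
        // Assumed deserializers: raw bytes for both key and value.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Assumed: offsets are committed manually after processing.
        props.put("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps("myTopic", "localhost:9092");
        System.out.println("group.id = " + props.getProperty("group.id"));
    }
}
```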

I am running this setup on about 20 consumers (each consuming a unique
topic), and only one of them is not passing any messages to my
application, even though the JMX console says it is receiving 5
responses per second. The other 19 seem to be working fine.
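
One check that might narrow this down is to count how many records a poll returns versus how many pass the "non-null, non-empty payload" filter in my loop, since records failing that filter are skipped silently. Below is a minimal sketch of that counting. The class and method names are hypothetical, and a plain List<byte[]> stands in for the ConsumerRecords returned by poll so that it runs without the Kafka client.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical diagnostic helper, for illustration only.
class PollDiagnostics {

    // Returns {seen, handled, skipped} for one batch of payloads.
    // "handled" uses the same filter as the poll loop: non-null and non-empty.
    static int[] countPayloads(List<byte[]> payloads) {
        int seen = 0;
        int handled = 0;
        int skipped = 0;
        for (byte[] payload : payloads) {
            seen++;
            if (payload != null && payload.length > 0) {
                handled++;
            } else {
                skipped++;
            }
        }
        return new int[] {seen, handled, skipped};
    }

    public static void main(String[] args) {
        // Stand-in batch: one real payload, one null, one empty.
        List<byte[]> batch = Arrays.asList(new byte[] {1, 2}, null, new byte[0]);
        int[] counts = countPayloads(batch);
        System.out.println("seen=" + counts[0]
                + " handled=" + counts[1]
                + " skipped=" + counts[2]);
    }
}
```

If "seen" stays at zero, the polls are returning no records at all. If "seen" is non-zero but "handled" is zero, the records are arriving and being filtered out.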

Each of these 20 topics was created when a message was sent to it, i.e. it
was not provisioned in advance. Messages are currently being sent only to
partition 0, even though there are 8 partitions per topic.


Thanks,

Rajiv

On Wed, Mar 9, 2016 at 4:30 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> Also forgot to mention that when I do consume with the console consumer I
> do see data coming through.
>
> On Wed, Mar 9, 2016 at 3:44 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
>
>> I am running the 0.9.0.1 broker with the 0.9 consumer. I am using the
>> subscribe feature on the consumer to subscribe to a topic with 8 partitions.
>>
>> consumer.subscribe(Arrays.asList(myTopic));
>>
>> I have a single consumer group for said topic and a single process
>> subscribed with 8 partitions.
>>
>> When I use jmx on the consumer I do see that it has 8 partitions assigned
>> to it according to the consumer-coordinator-metrics mbean. How can I tell
>> what topic it is listening to? I couldn't find this on jmx anywhere. I do
>> see that it is getting 5 responses per second according to the
>> consumer-metrics mbean but I don't see any in my actual application.
>>
>> I consume my messages like this:
>>
>>     public int poll(SubscriptionDataHandler handler, long timeout) {
>>
>>         ConsumerRecords<Void, byte[]> records = null;
>>
>>         try {
>>
>>             records = consumer.poll(timeout);
>>
>>         } catch (Exception e) {
>>
>>             logger.error("Exception polling Kafka", e);  // Don't see any exceptions here
>>
>>             return -1;
>>
>>         }
>>
>>         int numBuffers = 0;
>>
>>         if (records != null) {
>>
>>             for (ConsumerRecord<Void, byte[]> record : records) {
>>
>>                 byte[] payload = record.value();
>>
>>                 if (payload != null && payload.length > 0) {
>>
>>                     ByteBuffer wrappedBuffer = ByteBuffer.wrap(payload);
>>
>>                     try {
>>
>>                         handler.handleData(wrappedBuffer); // This is never called
>>
>>                     } catch (Exception e) {
>>
>>                         logger.error("Exception consuming buffer", e);
>>
>>                     }
>>
>>                     numBuffers += 1;  // This is never incremented.
>>
>>                 }
>>
>>             }
>>
>>
>>             // Commit only after consuming.
>>
>>             consumer.commitAsync(offsetCommitCallback);
>>
>>         }
>>
>>         return numBuffers;
>>
>>     }
>>
>> I also don't see any data in the consumers folder in ZK. In fact it is
>> completely empty.
>>
>> When I use the console-consumer, I do see the console-consumer show up in
>> the consumers folder, but none of my actual consumers show up.
>>
>> I tried looking for JMX data on the servers too and couldn't quite figure
>> out where I can get it.
>>
>> I am trying to figure out why the Kafka consumer thinks it is getting
>> messages (5 responses/second according to JMX) but I don't get any in my
>> application.
>>
>> Thanks,
>> Rajiv
>>
>>
>>
>
