You need to pass the "--new-consumer" option to the kafka-consumer-groups.sh
command so that it looks up groups created by the new consumer.

sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer
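
If you want to run this check from a Java tool rather than by hand, here is a
minimal sketch that assembles the same invocation. The class and method names
(ConsumerGroupsCommand, listCommand) are made up for illustration; only the
script name and the three flags come from the command above, and the script is
assumed to be reachable from the working directory.

```java
import java.util.ArrayList;
import java.util.List;

public class ConsumerGroupsCommand {

    // Builds the argument list for the --list invocation shown above.
    // "--new-consumer" is appended only when the caller asks for it, which
    // makes it easy to compare the output with and without the flag.
    public static List<String> listCommand(String bootstrapServer, boolean newConsumer) {
        List<String> args = new ArrayList<>();
        args.add("sh");
        args.add("kafka-consumer-groups.sh"); // assumed to be in the working directory
        args.add("--bootstrap-server");
        args.add(bootstrapServer);
        args.add("--list");
        if (newConsumer) {
            args.add("--new-consumer");
        }
        return args;
    }

    public static void main(String[] args) {
        // Print the command line instead of executing it.
        System.out.println(String.join(" ", listCommand("localhost:9092", true)));
    }
}
```

To actually execute it, the list can be handed to
new ProcessBuilder(args).inheritIO().start() from a directory that contains the
script.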


On Thu, Mar 10, 2016 at 12:02 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

> Hi Guozhang,
>
> I tried using the kafka-consumer-groups.sh --list command and it says I
> have no consumer groups set up at all. Yet I am receiving data on 19 out of
> 20 consumer processes (each with their own topic and consumer group).
>
> Here is my full kafka config as printed when my process started up:
>
> metric.reporters = []
>
>         metadata.max.age.ms = 300000
>
>         value.deserializer = class sf.org.apache.kafka9.common.serialization.ByteArrayDeserializer
>
>         group.id = myTopic_consumer
>
>         partition.assignment.strategy = [sf.org.apache.kafka9.clients.consumer.RangeAssignor]
>
>         reconnect.backoff.ms = 50
>
>         sasl.kerberos.ticket.renew.window.factor = 0.8
>
>         max.partition.fetch.bytes = 1048576
>
>         bootstrap.servers = [myBroker1:9092, myBroker2:9092, myBroker3:9092]
>
>         retry.backoff.ms = 100
>
>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>
>         sasl.kerberos.service.name = null
>
>         sasl.kerberos.ticket.renew.jitter = 0.05
>
>         ssl.keystore.type = JKS
>
>         ssl.trustmanager.algorithm = PKIX
>
>         enable.auto.commit = false
>
>         ssl.key.password = null
>
>         fetch.max.wait.ms = 1000
>
>         sasl.kerberos.min.time.before.relogin = 60000
>
>         connections.max.idle.ms = 540000
>
>         ssl.truststore.password = null
>
>         session.timeout.ms = 30000
>
>         metrics.num.samples = 2
>
>         client.id =
>
>         ssl.endpoint.identification.algorithm = null
>
>         key.deserializer = class sf.disco.kafka.VoidDeserializer
>
>         ssl.protocol = TLS
>
>         check.crcs = true
>
>         request.timeout.ms = 40000
>
>         ssl.provider = null
>
>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>
>         ssl.keystore.location = null
>
>         heartbeat.interval.ms = 3000
>
>         auto.commit.interval.ms = 5000
>
>         receive.buffer.bytes = 32768
>
>         ssl.cipher.suites = null
>
>         ssl.truststore.type = JKS
>
>         security.protocol = PLAINTEXT
>
>         ssl.truststore.location = null
>
>         ssl.keystore.password = null
>
>         ssl.keymanager.algorithm = SunX509
>
>         metrics.sample.window.ms = 30000
>
>         fetch.min.bytes = 256
>
>         send.buffer.bytes = 131072
>
>         auto.offset.reset = earliest
>
> It prints out the group.id field as myTopic_consumer. I was expecting to
> get this in the --list command and yet I am not getting it. Is this the
> name of the consumer group or am I missing something?
>
> I use the subscribe call on the consumer and my understanding was that the
> subscribe call would do all the work needed to create/join a group. Given I
> have a single consumer per group and a single group per topic I'd expect to
> see 20 groups (1 for each of my topics). However the --list returns no
> groups at all!
>
> Thanks,
> Rajiv
>
> On Wed, Mar 9, 2016 at 8:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Rajiv,
> >
> > In the new Java consumer you used, the ZK dependency has been removed and
> > hence you won't see any data under the ZK path.
> >
> > To check the group metadata you can use the ConsumerGroupCommand, wrapped
> > in bin/kafka-consumer-groups.sh.
> >
> > Guozhang
> >
> > On Wed, Mar 9, 2016 at 5:48 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> >
> > > Don't think I made my questions clear:
> > >
> > > > On Kafka 0.9.0.1 broker and 0.9 consumer how do I tell what my
> > > > consumer-groups are? Can I still get this information in ZK? I don't see
> > > > anything in the consumers folder which is alarming to me. This is
> > > > especially alarming because I do see that 8 partitions are assigned on the
> > > > consumer (via jmx). I specify the consumer group using:
> > >
> > > String myConsumerGroupId = myTopic + "_consumer";
> > >
> > >
> >
> props.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG,
> > >  myConsumerGroupId);
> > >
> > > > I am running with this setup on about 20 consumers (each consuming a unique
> > > > topic) and I only see one of my consumers not passing any messages to my
> > > > application even though I see that the jmx console says it is receiving 5
> > > > requests per second. The other 19 seem to be working fine.
> > > >
> > > > Each of these 20 topics was created when a message was sent to it i.e. it
> > > > was not provisioned from before. Messages currently are only being sent to
> > > > partition 0 even though there are 8 partitions per topic.
> > >
> > >
> > > Thanks,
> > >
> > > Rajiv
> > >
> > > > On Wed, Mar 9, 2016 at 4:30 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> > > >
> > > > > Also forgot to mention that when I do consume with the console consumer I
> > > > > do see data coming through.
> > > > >
> > > > > On Wed, Mar 9, 2016 at 3:44 PM, Rajiv Kurian <ra...@signalfx.com> wrote:
> > > > >
> > > > >> I am running the 0.9.0.1 broker with the 0.9 consumer. I am using the
> > > > >> subscribe feature on the consumer to subscribe to a topic with 8
> > > > >> partitions.
> > > >>
> > > >> consumer.subscribe(Arrays.asList(myTopic));
> > > >>
> > > >> I have a single consumer group for said topic and a single process
> > > >> subscribed with 8 partitions.
> > > >>
> > > > >> When I use jmx on the consumer I do see that it has 8 partitions assigned
> > > > >> to it according to the consumer-coordinator-metrics mbean. How can I tell
> > > > >> what topic it is listening to? I couldn't find this on jmx anywhere. I do
> > > > >> see that it is getting 5 responses per second according to the
> > > > >> consumer-metrics mbean but I don't see any in my actual application.
> > > >>
> > > >> I consume my messages like this:
> > > >>
> > > > >>     public int poll(SubscriptionDataHandler handler, long timeout) {
> > > > >>         ConsumerRecords<Void, byte[]> records = null;
> > > > >>         try {
> > > > >>             records = consumer.poll(timeout);
> > > > >>         } catch (Exception e) {
> > > > >>             // Don't see any exceptions here
> > > > >>             logger.error("Exception polling Kafka", e);
> > > > >>             return -1;
> > > > >>         }
> > > > >>         int numBuffers = 0;
> > > > >>         if (records != null) {
> > > > >>             for (ConsumerRecord<Void, byte[]> record : records) {
> > > > >>                 byte[] payload = record.value();
> > > > >>                 if (payload != null && payload.length > 0) {
> > > > >>                     ByteBuffer wrappedBuffer = ByteBuffer.wrap(payload);
> > > > >>                     try {
> > > > >>                         // This is never called
> > > > >>                         handler.handleData(wrappedBuffer);
> > > > >>                     } catch (Exception e) {
> > > > >>                         logger.error("Exception consuming buffer", e);
> > > > >>                     }
> > > > >>                     numBuffers += 1;  // This is never incremented.
> > > > >>                 }
> > > > >>             }
> > > > >>
> > > > >>             // Commit only after consuming.
> > > > >>             consumer.commitAsync(offsetCommitCallback);
> > > > >>         }
> > > > >>         // Assumed ending: the snippet as posted stops before the return.
> > > > >>         return numBuffers;
> > > > >>     }
> > > >>
> > > > >> I also don't see any data in the consumers folder in ZK. In fact it is
> > > > >> completely empty.
> > > > >>
> > > > >> When I use the console-consumer, I do see the console-consumer show up in
> > > > >> the consumers folder, but none of my actual consumers show up.
> > > > >>
> > > > >> I tried looking for jmx data on the servers too and couldn't quite figure
> > > > >> out where I can get jmx.
> > > > >>
> > > > >> I am trying to figure out why the Kafka consumer thinks it is getting
> > > > >> messages (5 responses/second according to jmx) but I don't get any in my
> > > > >> application.
> > > >>
> > > >> Thanks,
> > > >> Rajiv
> > > >>
> > > >>
> > > >>
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
