Hello all:
I'm using Kafka 0.11.0.1 with the new Java consumer API (same version), and I
commit offsets to Kafka.
To check the consumer lag, I ran the following command:
$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --describe --group foo.test.consumers
Note: This will only show information about consumers that use the Java
consumer API (non-ZooKeeper-based consumers).
Consumer group 'foo.test.consumers' has no active members.
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
foo    0          109929174       109929190       16   -            -     -
foo    2          109929222       109929240       18   -            -     -
foo    1          109929004       109929023       19   -            -     -
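For reference, my understanding is that the LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET for each partition. Below is a minimal sketch that checks this arithmetic against the rows above. The class and method names are my own and purely illustrative; they are not part of any Kafka API.

```java
// Illustrative only: LagCheck and lag() are hypothetical names, not Kafka APIs.
final class LagCheck {
    // Lag is the distance between the log end offset and the committed offset.
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    public static void main(String[] args) {
        // {partition, CURRENT-OFFSET, LOG-END-OFFSET} copied from the output above.
        long[][] rows = {
            {0, 109929174L, 109929190L},
            {2, 109929222L, 109929240L},
            {1, 109929004L, 109929023L},
        };
        for (long[] row : rows) {
            System.out.println("foo-" + row[0] + " lag=" + lag(row[1], row[2]));
        }
    }
}
```

The computed values agree with the LAG column (16, 18 and 19), so the lag numbers themselves look consistent with the offsets shown.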
I have 2 questions regarding the output above:
1. What does the statement "Consumer group 'foo.test.consumers' has no active
members." mean? My consumers are working correctly and all use the single group
"foo.test.consumers", so the statement doesn't make sense if taken literally.
I googled it, but the few results I found were not helpful.
2. Why are CONSUMER-ID, HOST and CLIENT-ID all "-"? I didn't find any
information about CONSUMER-ID or HOST in the Kafka documentation or in the
consumer's Javadoc. I did find out how to set a client ID on the
KafkaConsumer, but even when I set it explicitly, the CLIENT-ID column in the
command output is still "-".
Here is the code that creates the KafkaConsumer:
Properties props = new Properties();
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("group.id", "foo.test.consumers");
props.put("client.id", "1234");
KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
I’m manually assigning a topic-partition to the KafkaConsumer:
kafkaConsumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
And manually committing offsets to Kafka:
String metadata = localhost + "@" + System.currentTimeMillis();
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(uncommittedOffset + 1, metadata);
kafkaConsumer.commitSync(Collections.singletonMap(new TopicPartition("foo", 0), offsetAndMetadata));
Thanks!