See SimpleConsumer.getOffsetsBefore
and the getLastOffset example here:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
Pass whichTime=-1 for the latest offset or whichTime=-2 for the earliest.
However, I've been banging my head against the wall with this API --
it seems that, even with a single broker and a single host, sometimes
the latest offset gets frozen as new data comes in. :-/
import java.util.HashMap;
import java.util.Map;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public static long getLastOffset(SimpleConsumer consumer, String topic,
                                 int partition, long whichTime,
                                 String clientName) {
    TopicAndPartition topicAndPartition =
        new TopicAndPartition(topic, partition);
    // Ask for at most one offset at or before whichTime
    // (-1 = latest, -2 = earliest).
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
        new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition,
        new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
        requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    if (response.hasError()) {
        System.out.println("Error fetching offset data from the broker. Reason: "
            + response.errorCode(topic, partition));
        // Note: 0 is also a valid offset, so callers cannot tell this
        // apart from an empty log.
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}
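To turn those offsets into a message count, one approach is to call
getLastOffset twice per partition (whichTime=-2, then whichTime=-1) and
sum latest minus earliest across partitions. Below is a minimal sketch of
just that arithmetic, with no Kafka dependency. OffsetMath and
countMessages are names I made up for illustration, and the offsets are
assumed to have been fetched already. Keep in mind this only counts
messages the broker still retains, and on a log-compacted topic the
offsets can have gaps, so the result would be an upper bound.

```java
import java.util.Map;

// Hypothetical helper for illustration; not part of the Kafka API.
final class OffsetMath {
    private OffsetMath() {}

    /**
     * Sums (latest - earliest) over all partitions.
     * Key: partition id. Value: {earliest, latest}, i.e. the offsets
     * returned for whichTime = -2 and whichTime = -1 respectively.
     */
    static long countMessages(Map<Integer, long[]> offsetsByPartition) {
        long total = 0;
        for (Map.Entry<Integer, long[]> e : offsetsByPartition.entrySet()) {
            long[] range = e.getValue();
            if (range == null || range.length != 2 || range[1] < range[0]) {
                throw new IllegalArgumentException(
                    "bad offset range for partition " + e.getKey());
            }
            total += range[1] - range[0];
        }
        return total;
    }
}
```

For example, a topic with partition 0 at {0, 10} and partition 1 at
{5, 12} would come out as 10 + 7 messages.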
On Sun, Feb 22, 2015 at 9:05 PM, Bhuvana Baskar
<[email protected]> wrote:
> Hi,
>
> Please let me know how to find the total number of messages in a particular
> topic.
>
> Regards,
> Bhuvana