Hi,
I am migrating an application from Kafka to Pulsar, I want to know
which is the ID of the latest message sent to a topic.
My case is that I would like to know if the reader has already
processed all available data.
I am using the Reader API because I need full control on the consumer
side about the portion of data to process (I will store the last
Message ID on a DB).
In Kafka I had a "consumer#seekToEnd + consumer#position" API to move
to the end of a topic and then read the offsets for each partition.
In Pulsar I am trying to achieve the same goal with:
try (Reader<byte[]> pulsarReader = pulsarClient.newReader()
.startMessageId(MessageId.latest)
.topic(topicName)
.create()) {
MessageId lastMessageId = null;
Message<byte[]> readNext = pulsarReader.readNext(1,
TimeUnit.SECONDS);
if (readNext != null) {
lastMessageId = readNext.getMessageId();
}
}
This is not working, as within 1 second it is not guaranteed to have a
message....
Which is the best way ?
Thanks
Enrico