Hi all,
Is there any way for a Kafka spout to start reading from the offset committed
in ZooKeeper?
Judging by the two test cases in storm-kafka quoted here, there seems to be no
such option, only ways to read from the end or from the beginning of the topic.
Thanks in advance.
@Test
public void getOffsetFromConfigAndDontForceFromStart() {
    config.forceFromStart = false;
    config.startOffsetTime = OffsetRequest.EarliestTime();
    createTopicAndSendMessage();
    long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0,
            OffsetRequest.LatestTime());
    long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
    assertThat(latestOffset, is(equalTo(offsetFromConfig)));
}

@Test
public void getOffsetFromConfigAndFroceFromStart() {
    config.forceFromStart = true;
    config.startOffsetTime = OffsetRequest.EarliestTime();
    createTopicAndSendMessage();
    long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0,
            OffsetRequest.EarliestTime());
    long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
    assertThat(earliestOffset, is(equalTo(offsetFromConfig)));
}
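
To spell out how I read these two tests, here is a minimal stand-alone sketch of the selection they seem to assert. It is only my model of the behaviour, not the storm-kafka source: the class OffsetChoiceSketch and the helper resolveStartTime are hypothetical names, and the two constants are assumed to mirror the sentinel values returned by OffsetRequest.LatestTime() and OffsetRequest.EarliestTime().

```java
// Simplified model of what the two tests assert; NOT the storm-kafka implementation.
final class OffsetChoiceSketch {
    // Assumed to mirror kafka.api.OffsetRequest.LatestTime() and EarliestTime().
    static final long LATEST_TIME = -1L;
    static final long EARLIEST_TIME = -2L;

    // Hypothetical helper: picks the time sentinel used for the offset lookup.
    // With forceFromStart unset, startOffsetTime is ignored and the latest offset is used.
    static long resolveStartTime(boolean forceFromStart, long startOffsetTime) {
        return forceFromStart ? startOffsetTime : LATEST_TIME;
    }

    public static void main(String[] args) {
        // First test: forceFromStart = false, startOffsetTime = earliest.
        System.out.println(resolveStartTime(false, EARLIEST_TIME));
        // Second test: forceFromStart = true, startOffsetTime = earliest.
        System.out.println(resolveStartTime(true, EARLIEST_TIME));
    }
}
```

In this model neither branch looks at an offset stored in ZooKeeper, which is why I am asking whether the spout has some other path that does.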