Hi all,
Is there any way for a kafka spout to read from the committed offset in 
zookeeper?
Based on two test cases in storm-kafka, it seems like there is no such way but 
ways to read from the end or from the beginning.
thanks in advance.
    @Test    public void getOffsetFromConfigAndDontForceFromStart() {        
config.forceFromStart = false;        config.startOffsetTime = 
OffsetRequest.EarliestTime();        createTopicAndSendMessage();        long 
latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, 
OffsetRequest.LatestTime());        long offsetFromConfig = 
KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);        
assertThat(latestOffset, is(equalTo(offsetFromConfig)));    }
    @Test    public void getOffsetFromConfigAndFroceFromStart() {        
config.forceFromStart = true;        config.startOffsetTime = 
OffsetRequest.EarliestTime();        createTopicAndSendMessage();        long 
earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, 
OffsetRequest.EarliestTime());        long offsetFromConfig = 
KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);        
assertThat(earliestOffset, is(equalTo(offsetFromConfig)));    }

 

Reply via email to