Yes, the spout does read from the stored offsets. The first time you
deploy the topology, set forceFromStart=true if you intend to read from
the beginning of the topic. If you kill and redeploy the topology and
want to resume from the last saved position, make sure you set
forceFromStart=false. While the topology is running, it saves the offsets
under the ZooKeeper root you provide and replays failed tuples based on
the saved offset. Here is the code where it reads from ZooKeeper:
https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java#L71
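
To make the rule above concrete, here is a rough sketch of the decision at
deploy time. It is a simplified model, not the storm-kafka code: the class
StartOffsetSketch, the method chooseStartOffset, and its parameters are
hypothetical names, and the real PartitionManager checks more conditions
than this. The "no saved offset, forceFromStart=false" case follows the
first test quoted below, where the offset from config equals the latest
offset.

```java
import java.util.OptionalLong;

/** Simplified, hypothetical model of how the starting offset is picked at deploy time. */
final class StartOffsetSketch {

    /**
     * @param storedOffset          offset previously saved in ZooKeeper, if any
     * @param forceFromStart        the spout config flag discussed above
     * @param configuredStartOffset offset that startOffsetTime resolves to (e.g. earliest)
     * @param latestOffset          current end of the partition
     */
    static long chooseStartOffset(OptionalLong storedOffset,
                                  boolean forceFromStart,
                                  long configuredStartOffset,
                                  long latestOffset) {
        if (forceFromStart) {
            // Ignore whatever was saved and begin at the configured start offset.
            return configuredStartOffset;
        }
        // Resume from the saved position; with nothing saved, start at the latest offset.
        return storedOffset.orElse(latestOffset);
    }

    public static void main(String[] args) {
        long earliest = 0L;
        long latest = 500L;

        // First deploy, read the topic from the beginning.
        System.out.println(chooseStartOffset(OptionalLong.empty(), true, earliest, latest));   // 0
        // First deploy without forcing: nothing saved, so start at the end.
        System.out.println(chooseStartOffset(OptionalLong.empty(), false, earliest, latest));  // 500
        // Redeploy with forceFromStart=false: resume from the saved offset.
        System.out.println(chooseStartOffset(OptionalLong.of(120L), false, earliest, latest)); // 120
        // Redeploy with forceFromStart=true: the saved offset is ignored.
        System.out.println(chooseStartOffset(OptionalLong.of(120L), true, earliest, latest));  // 0
    }
}
```

The third case is the one you are asking about: with forceFromStart=false
and an offset already saved, the spout resumes from that offset.
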
On Sun, Dec 28, 2014, at 05:14 PM, 이승진 wrote:
> Hi all,
>
> Is there any way for a kafka spout to read from the committed offset
> in zookeeper?
>
> Based on two test cases in storm-kafka, it seems like there is no such
> way but ways to read from the end or from the beginning.
>
> thanks in advance.
>
> @Test
> public void getOffsetFromConfigAndDontForceFromStart() {
>     config.forceFromStart = false;
>     config.startOffsetTime = OffsetRequest.EarliestTime();
>     createTopicAndSendMessage();
>     long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime());
>     long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
>     assertThat(latestOffset, is(equalTo(offsetFromConfig)));
> }
>
> @Test
> public void getOffsetFromConfigAndFroceFromStart() {
>     config.forceFromStart = true;
>     config.startOffsetTime = OffsetRequest.EarliestTime();
>     createTopicAndSendMessage();
>     long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
>     long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
>     assertThat(earliestOffset, is(equalTo(offsetFromConfig)));
> }