Hi Jeff,
So I looked at the docs and I reset the following properties:
SpoutConfig spoutConfig = new SpoutConfig(
        hosts,
        topic,           // topic to read from
        KAFKA_STORM_DIR, // the root path in Zookeeper for the spout to store the consumer offsets
        newSpoutId);     // an id for this consumer for storing the consumer offsets in Zookeeper
// Check if we should be consuming messages from the beginning
spoutConfig.forceFromStart = consumeFromBeginning;
spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = false;
But after an hour of processing I see:
2015-05-19T13:13:03.242+0000 s.k.KafkaUtils [ERROR] Error fetching data from
[Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
partition=0}] for topic [warehouse_prices]: [OFFSET_OUT_OF_RANGE]
2015-05-19T13:13:03.242+0000 s.k.KafkaSpout [WARN] Fetch failed
storm.kafka.FailedFetchException: Error fetching data from
[Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
partition=0}] for topic [warehouse_prices]: [OFFSET_OUT_OF_RANGE]
at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:190)
~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
at storm.kafka.PartitionManager.fill(PartitionManager.java:162)
~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
at storm.kafka.PartitionManager.next(PartitionManager.java:124)
~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141)
~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
at
backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565)
[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463)
[storm-core-0.9.4.jar:0.9.4]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
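For what it's worth, here is how I read the two behaviours, as a minimal self-contained Java sketch. It is a model built only from the log lines in this thread, not storm-kafka's actual code: the class OutOfRangeSketch, the method resolveFetchOffset, the FetchFailed exception and the logEnd value are all made up for illustration. With useStartOffsetTimeIfOffsetOutOfRange = true, an out-of-range request appears to be replaced by the offset for the configured start offset time (-2, i.e. earliest); with false, the fetch fails instead, which is what the stack trace above shows.

```java
// Illustrative model only; none of these names come from storm-kafka.
final class OutOfRangeSketch {
    // Kafka's special "time" values for offset lookups: -2 = earliest, -1 = latest.
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    /** Hypothetical stand-in for the FailedFetchException in the stack trace. */
    static final class FetchFailed extends RuntimeException {
        FetchFailed(String message) {
            super(message);
        }
    }

    /**
     * Returns the offset to fetch from, assuming the broker currently holds
     * offsets logStart..logEnd and anything older is no longer available.
     */
    static long resolveFetchOffset(long requested, long logStart, long logEnd,
                                   boolean useStartOffsetTimeIfOffsetOutOfRange,
                                   long startOffsetTime) {
        if (requested >= logStart && requested <= logEnd) {
            return requested; // still on the broker, nothing to do
        }
        if (!useStartOffsetTimeIfOffsetOutOfRange) {
            // Flag set to false: the fetch simply fails.
            throw new FetchFailed("[OFFSET_OUT_OF_RANGE] for offset " + requested);
        }
        // Flag set to true: jump to the offset for the configured start offset time.
        return startOffsetTime == EARLIEST_TIME ? logStart : logEnd;
    }

    public static void main(String[] args) {
        long requested = 62379213L; // from "offset out of range: [62379213]"
        long logStart = 63495047L;  // from "Using new offset: 63495047", assumed to be the earliest offset
        long logEnd = 63500000L;    // made-up value, not from the logs

        System.out.println(resolveFetchOffset(requested, logStart, logEnd, true, EARLIEST_TIME));
        try {
            resolveFetchOffset(requested, logStart, logEnd, false, EARLIEST_TIME);
        } catch (FetchFailed e) {
            System.out.println("fetch failed: " + e.getMessage());
        }
    }
}
```

If that reading is right, setting the flag to false does not stop the spout from asking for an offset the broker no longer has; it only changes a silent reset into a hard failure.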
> On 18 May 2015, at 22:13, Benjamin Cuthbert <[email protected]> wrote:
>
> Thanks Jeff,
>
> So I looked over the docs, but what I don’t understand is why it runs for 2+ hours and
> then just starts going:
>
> 2015-05-18T22:12:53.673+0100 s.k.PartitionManager [WARN] Using new offset:
> 64429892
> 2015-05-18T22:12:53.705+0100 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [63610973]; retrying with default start offset time from
> configuration. configured start offset time: [-2]
> 2015-05-18T22:12:53.743+0100 s.k.PartitionManager [WARN] Using new offset:
> 64429934
> 2015-05-18T22:12:53.773+0100 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [63610973]; retrying with default start offset time from
> configuration. configured start offset time: [-2]
>
> So why does the offset get reset?
>
>
>> On 18 May 2015, at 20:37, Jeffery Maass <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>> The answer will be in how you configured the kafka spout. If after reading
>> the below, you still need help, please grab the values for all of the
>> settings mentioned in the doc and send them on.
>>
>> See this document about the Kafka Spout:
>> http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.2.0/Storm_UG_v22/Storm_UG_v22.pdf
>>
>> See this archive message:
>> http://mail-archives.apache.org/mod_mbox/storm-user/201503.mbox/%3ccahzwdygra33uiv+po01mvthvrjsxvqlvx6babymzye8zu_d...@mail.gmail.com%3E
>>
>> "
>> Not exactly.. forceFromStart=true will tell the spout to start reading from
>> whatever is set in startOffsetTime (available options are the earliest
>> offset or the latest offset). If forceFromStart=false then startOffsetTime
>> is not used at all and the offset is just retrieved from zookeeper, if it's
>> available.
>>
>> The "Start" in "forceFromStart" has nothing to do with consuming from the
>> beginning of the topic. I interpret it as referring to whether you are
>> going to force starting consumption from a different offset.
>> "
>>
>> Thank you for your time!
>>
>> +++++++++++++++++++++
>> Jeff Maass <mailto:[email protected]>
>> linkedin.com/in/jeffmaass <http://linkedin.com/in/jeffmaass>
>> stackoverflow.com/users/373418/maassql
>> <http://stackoverflow.com/users/373418/maassql>
>> +++++++++++++++++++++
>>
>>
>> On Mon, May 18, 2015 at 1:53 PM, Benjamin Cuthbert <[email protected]
>> <mailto:[email protected]>> wrote:
>> All,
>>
>> We are getting loads of these errors
>>
>> 2015-05-18T19:52:44.038+0100 s.k.KafkaUtils [WARN] Got fetch request with
>> offset out of range: [62379213]; retrying with default start offset time
>> from configuration. configured start offset time: [-2]
>> 2015-05-18T19:52:44.066+0100 s.k.PartitionManager [WARN] Using new offset:
>> 63495047
>>
>> And it is stopping messages from being pulled from Kafka into the spout and onto
>> the bolts. Any ideas on how to fix this?
>>
>
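As a footnote to the forceFromStart explanation quoted in Jeff's message, here is that rule as a minimal plain-Java sketch. It models only what the quoted text says and is not storm-kafka code: StartOffsetSketch, chooseStartOffset and the sample offsets are hypothetical. The quoted text does not say what happens when forceFromStart=false and nothing is stored in Zookeeper, so the sketch returns an empty result for that case rather than guessing.

```java
import java.util.OptionalLong;

// Illustrative model of the quoted rule; none of these names come from storm-kafka.
final class StartOffsetSketch {
    // Kafka's special "time" values for offset lookups: -2 = earliest, -1 = latest.
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    /**
     * forceFromStart=true  -> start from whatever startOffsetTime points at.
     * forceFromStart=false -> startOffsetTime is ignored; use the Zookeeper offset if there is one.
     */
    static OptionalLong chooseStartOffset(boolean forceFromStart, long startOffsetTime,
                                          OptionalLong offsetInZookeeper,
                                          long earliestOnBroker, long latestOnBroker) {
        if (!forceFromStart) {
            return offsetInZookeeper; // empty means "not covered by the quoted explanation"
        }
        if (startOffsetTime == EARLIEST_TIME) {
            return OptionalLong.of(earliestOnBroker);
        }
        if (startOffsetTime == LATEST_TIME) {
            return OptionalLong.of(latestOnBroker);
        }
        throw new IllegalArgumentException("sketch only models -2 (earliest) and -1 (latest)");
    }

    public static void main(String[] args) {
        // Made-up offsets: broker holds 100..900, Zookeeper has 500 stored.
        OptionalLong stored = OptionalLong.of(500L);
        System.out.println(chooseStartOffset(true, EARLIEST_TIME, stored, 100L, 900L));
        System.out.println(chooseStartOffset(true, LATEST_TIME, stored, 100L, 900L));
        System.out.println(chooseStartOffset(false, EARLIEST_TIME, stored, 100L, 900L));
        System.out.println(chooseStartOffset(false, EARLIEST_TIME, OptionalLong.empty(), 100L, 900L));
    }
}
```

The point of the sketch is that "Start" in forceFromStart selects where the offset comes from, not the beginning of the topic: with forceFromStart=true and startOffsetTime set to latest, the spout would start at the end.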