I am seeing the following:
41828 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task
[1/1] assigned [Partition{host=node:9092, partition=0}]
41829 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task
[1/1] Deleted partition managers: []
41829 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task
[1/1] New partition managers: [Partition{host=node:9092, partition=0}]
41930 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager -
Read partition information from:
/kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0 --> null
42399 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager -
No partition information found, using configuration to determine offset
42399 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager -
Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset
3927313
42400 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task
[1/1] Finished refreshing
45097 [ProcessThread(sid:0 cport:-1):] INFO
org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
KeeperException when processing sessionid:0x14d72fc889d000c type:create
cxid:0x3 zxid:0x2c txntype:-1 reqpath:n/a Error
Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout Error:KeeperErrorCode
= NoNode for /kafkastorm/warehouse_prices/rawWarehousePriceSpout
When I inspect ZooKeeper, that path does not exist. Does it get created
automatically?
> On 19 May 2015, at 18:19, Benjamin Cuthbert <[email protected]> wrote:
>
> The kafka logs have this
>
> [2015-05-19 17:13:39,772] ERROR [KafkaApi-0] Error when processing fetch
> request for partition [warehouse_prices,0] offset 73792051 from consumer with
> correlation id 0 (kafka.server.KafkaApis)
> kafka.common.OffsetOutOfRangeException: Request for offset 73792051 but we
> only have log segments in the range 74630863 to 75835831.
> at kafka.log.Log.read(Log.scala:380)
> at
> kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> at
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>
> How can that be? How can the consumer's offset be behind what Kafka has retained?
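> The broker-side check is simple arithmetic: the offset the spout replayed from
> ZooKeeper fell below the earliest offset still on disk, because log retention
> deleted those segments in the meantime. A minimal sketch using the numbers from
> the broker log above (hypothetical helper, not Kafka code):

```java
// Illustrates the range check behind OffsetOutOfRangeException,
// using the offsets from the broker log above.
public class OffsetRangeCheck {
    // An offset is fetchable only while it lies inside the retained log segments.
    static boolean inRange(long offset, long earliest, long latest) {
        return offset >= earliest && offset <= latest;
    }

    public static void main(String[] args) {
        long requested = 73_792_051L; // offset the consumer asked for
        long earliest  = 74_630_863L; // first offset still retained
        long latest    = 75_835_831L; // log end offset
        System.out.println(inRange(requested, earliest, latest)); // false: segments already deleted
    }
}
```

> In other words, the spout isn't ahead of the broker; its stored offset has
> aged out of the retention window.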
>
>
>> On 19 May 2015, at 14:54, Benjamin Cuthbert <[email protected]> wrote:
>>
>> Hi Jeff,
>>
>> So I looked at the docs and reset the following properties:
>>
>> SpoutConfig spoutConfig = new SpoutConfig(
>>         hosts,
>>         topic,           // topic to read from
>>         KAFKA_STORM_DIR, // root path in ZooKeeper where the spout stores consumer offsets
>>         newSpoutId);     // an id for this consumer, used when storing offsets in ZooKeeper
>>
>> // Check whether we should consume messages from the beginning
>> spoutConfig.forceFromStart = consumeFromBeginning;
>> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
>> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = false;
>>
>> But after about an hour of processing I see:
>>
>> 2015-05-19T13:13:03.242+0000 s.k.KafkaUtils [ERROR] Error fetching data from
>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=0}] for topic [warehouse_prices]: [OFFSET_OUT_OF_RANGE]
>> 2015-05-19T13:13:03.242+0000 s.k.KafkaSpout [WARN] Fetch failed
>> storm.kafka.FailedFetchException: Error fetching data from
>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=0}] for topic [warehouse_prices]: [OFFSET_OUT_OF_RANGE]
>> at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:190)
>> ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
>> at storm.kafka.PartitionManager.fill(PartitionManager.java:162)
>> ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
>> at storm.kafka.PartitionManager.next(PartitionManager.java:124)
>> ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
>> at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141)
>> ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
>> at
>> backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565)
>> [storm-core-0.9.4.jar:0.9.4]
>> at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463)
>> [storm-core-0.9.4.jar:0.9.4]
>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
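>> As far as I can tell, this follows from useStartOffsetTimeIfOffsetOutOfRange = false:
>> when a fetch comes back OFFSET_OUT_OF_RANGE, the spout now gives up instead of
>> falling back to the configured start offset. A condensed sketch of that handling
>> (hypothetical names, not the actual storm.kafka.KafkaUtils code):

```java
// Condensed sketch of how an OFFSET_OUT_OF_RANGE fetch response is handled,
// as described above (hypothetical names, not the real storm-kafka code).
public class FetchErrorHandling {
    static class FailedFetchException extends RuntimeException {
        FailedFetchException(String msg) { super(msg); }
    }

    // Returns the offset to retry from, or throws if no fallback is allowed.
    static long onOffsetOutOfRange(boolean useStartOffsetTime, long startOffsetFallback) {
        if (!useStartOffsetTime) {
            // With the setting at false, the fetch failure propagates up the spout.
            throw new FailedFetchException("OFFSET_OUT_OF_RANGE");
        }
        return startOffsetFallback; // e.g. the earliest still-retained offset
    }

    public static void main(String[] args) {
        try {
            onOffsetOutOfRange(false, -2L);
        } catch (FailedFetchException e) {
            System.out.println("fetch failed: " + e.getMessage());
        }
    }
}
```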
>>
>>
>>> On 18 May 2015, at 22:13, Benjamin Cuthbert <[email protected]> wrote:
>>>
>>> Thanks Jeff,
>>>
>>> So I looked over the docs, but what I don't understand is that it runs fine
>>> for 2+ hours and then just starts logging:
>>>
>>> 2015-05-18T22:12:53.673+0100 s.k.PartitionManager [WARN] Using new offset:
>>> 64429892
>>> 2015-05-18T22:12:53.705+0100 s.k.KafkaUtils [WARN] Got fetch request with
>>> offset out of range: [63610973]; retrying with default start offset time
>>> from configuration. configured start offset time: [-2]
>>> 2015-05-18T22:12:53.743+0100 s.k.PartitionManager [WARN] Using new offset:
>>> 64429934
>>> 2015-05-18T22:12:53.773+0100 s.k.KafkaUtils [WARN] Got fetch request with
>>> offset out of range: [63610973]; retrying with default start offset time
>>> from configuration. configured start offset time: [-2]
>>>
>>> So why does the offset get reset?
>>>
>>>
>>>> On 18 May 2015, at 20:37, Jeffery Maass <[email protected]> wrote:
>>>>
>>>> The answer will be in how you configured the Kafka spout. If, after
>>>> reading the below, you still need help, please grab the values of all the
>>>> settings mentioned in the doc and send them on.
>>>>
>>>> See this document about the Kafka Spout:
>>>> http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.2.0/Storm_UG_v22/Storm_UG_v22.pdf
>>>>
>>>> See this archive message:
>>>> http://mail-archives.apache.org/mod_mbox/storm-user/201503.mbox/%3ccahzwdygra33uiv+po01mvthvrjsxvqlvx6babymzye8zu_d...@mail.gmail.com%3E
>>>>
>>>> "
>>>> Not exactly.. forceFromStart=true will tell the spout to start reading from
>>>> whatever is set in startOffsetTime (available options are the earliest
>>>> offset or the latest offset). If forceFromStart=false then startOffsetTime
>>>> is not used at all and the offset is just retrieved from zookeeper, if it's
>>>> available.
>>>>
>>>> The "Start" in "forceFromStart" has nothing to do with consuming from the
>>>> beginning of the topic. I interpret it as referring to whether you are
>>>> going to force starting consumption from a different offset.
>>>> "
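>>>> The decision described above can be sketched as follows (my reading, not
>>>> the actual storm.kafka.PartitionManager logic; -2 and -1 are the sentinel
>>>> values of kafka.api.OffsetRequest.EarliestTime() and LatestTime()):

```java
// Sketch of the start-offset decision described above (hypothetical helper,
// not the real storm-kafka code).
public class StartOffsetDecision {
    static final long EARLIEST_TIME = -2L; // kafka.api.OffsetRequest.EarliestTime()
    static final long LATEST_TIME   = -1L; // kafka.api.OffsetRequest.LatestTime()

    // zkOffset is null when nothing has been committed to ZooKeeper yet.
    static long startingOffset(boolean forceFromStart, long startOffsetTime,
                               Long zkOffset, long earliest, long latest) {
        if (!forceFromStart && zkOffset != null) {
            return zkOffset; // resume from the offset committed to ZooKeeper
        }
        // Otherwise fall back to the configured start offset time.
        return startOffsetTime == EARLIEST_TIME ? earliest : latest;
    }

    public static void main(String[] args) {
        // Committed offset wins when forceFromStart is false.
        System.out.println(startingOffset(false, EARLIEST_TIME, 42L, 0L, 100L)); // 42
        // forceFromStart=true ignores ZooKeeper and honours startOffsetTime.
        System.out.println(startingOffset(true, EARLIEST_TIME, 42L, 0L, 100L)); // 0
    }
}
```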
>>>>
>>>> Thank you for your time!
>>>>
>>>> +++++++++++++++++++++
>>>> Jeff Maass <[email protected]>
>>>> linkedin.com/in/jeffmaass
>>>> stackoverflow.com/users/373418/maassql
>>>> +++++++++++++++++++++
>>>>
>>>>
>>>> On Mon, May 18, 2015 at 1:53 PM, Benjamin Cuthbert <[email protected]> wrote:
>>>> All,
>>>>
>>>> We are getting loads of these errors:
>>>>
>>>> 2015-05-18T19:52:44.038+0100 s.k.KafkaUtils [WARN] Got fetch request with
>>>> offset out of range: [62379213]; retrying with default start offset time
>>>> from configuration. configured start offset time: [-2]
>>>> 2015-05-18T19:52:44.066+0100 s.k.PartitionManager [WARN] Using new offset:
>>>> 63495047
>>>>
>>>> And it is stopping messages from being pulled from Kafka into the spout
>>>> and on to the bolts. Any ideas on how to fix this?
>>>>
>>>
>>
>