Kafka has a fixed retention window for topics. When retention is full, it
drops the oldest messages from the tail of the log.
So here is what might be happening: the rate at which you are pushing data
into Kafka is faster than the rate at which the consumers can consume the
messages.
Assume you have a full Kafka queue, with an incoming message rate twice the
consumption rate, i.e. two messages arrive per time unit and one message is
consumed per time unit. Let us say that the consumer is at offset 'x',
which is at the tail of the Kafka queue. After consuming, the consumer next
asks for offset 'x+1'. However, since the incoming rate is double, Kafka has
already pushed 'x' and 'x+1' out of the tail, and its oldest offset is now
'x+2'.
Kafka will reset the consumer's offset to 'x+2' and issue a warning that
offset 'x+1' is not there. This cycle will continue.
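To make the arithmetic concrete, here is a toy sketch of that race
(hypothetical offsets, not taken from your logs):

public class RetentionRace {
    public static void main(String[] args) {
        long logStart = 0;   // oldest offset the broker still retains
        long consumer = 0;   // consumer's next fetch offset

        for (int tick = 1; tick <= 5; tick++) {
            logStart += 2;   // retention drops two messages per time unit
            consumer += 1;   // the consumer only advances one per time unit

            if (consumer < logStart) {
                // The OffsetOutOfRangeException case: the requested offset
                // has expired, so the consumer is reset to the new tail.
                System.out.printf("tick %d: offset %d expired, reset to %d%n",
                        tick, consumer, logStart);
                consumer = logStart;
            }
        }
    }
}

Each tick the consumer's next offset falls behind the oldest retained
offset, so every fetch triggers another reset.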
-Nikhil 


On Wednesday, May 20, 2015 3:24 PM, Benjamin Cuthbert <[email protected]> wrote:

I am seeing the following:
41828 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=node:9092, partition=0}]
41829 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
41829 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=node:9092, partition=0}]
41930 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0  --> null
42399 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
42399 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset 3927313
42400 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
45097 [ProcessThread(sid:0 cport:-1):] INFO  org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x14d72fc889d000c type:create cxid:0x3 zxid:0x2c txntype:-1 reqpath:n/a Error Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for /kafkastorm/warehouse_prices/rawWarehousePriceSpout
When viewing the Zookeeper data there is no path there. Does it get created
automatically?
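From a quick look at the storm-kafka source, the spout appears to create
that path itself the first time it commits an offset; its ZkState class
does roughly the following (paraphrased from memory, so check the source
for your version):

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;

// Roughly how the spout writes its offset JSON to Zookeeper. The missing
// parent nodes are created on the first write, which is why the NoNode
// warning during startup is usually harmless.
public static void writeBytes(CuratorFramework curator, String path, byte[] bytes)
        throws Exception {
    if (curator.checkExists().forPath(path) == null) {
        curator.create()
               .creatingParentsIfNeeded()
               .withMode(CreateMode.PERSISTENT)
               .forPath(path, bytes);
    } else {
        curator.setData().forPath(path, bytes);
    }
}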
On 19 May 2015, at 18:19, Benjamin Cuthbert <[email protected]> wrote:

The Kafka logs have this:
[2015-05-19 17:13:39,772] ERROR [KafkaApi-0] Error when processing fetch request for partition [warehouse_prices,0] offset 73792051 from consumer with correlation id 0 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 73792051 but we only have log segments in the range 74630863 to 75835831.
    at kafka.log.Log.read(Log.scala:380)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
How can that be? How can the requestor be behind Kafka?


On 19 May 2015, at 14:54, Benjamin Cuthbert <[email protected]> wrote:

Hi Jeff,
So I looked at the docs and I reset the following properties:
SpoutConfig spoutConfig = new SpoutConfig(
        hosts,
        topic,           // topic to read from
        KAFKA_STORM_DIR, // the root path in Zookeeper for the spout to store the consumer offsets
        newSpoutId);     // an id for this consumer for storing the consumer offsets in Zookeeper

// Check if we should be consuming messages from the beginning
spoutConfig.forceFromStart = consumeFromBeginning;
spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = false;
But after an hour of processing I see:
2015-05-19T13:13:03.242+0000 s.k.KafkaUtils [ERROR] Error fetching data from [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}] for topic [warehouse_prices]: [OFFSET_OUT_OF_RANGE]
2015-05-19T13:13:03.242+0000 s.k.KafkaSpout [WARN] Fetch failed
storm.kafka.FailedFetchException: Error fetching data from [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}] for topic [warehouse_prices]: [OFFSET_OUT_OF_RANGE]
    at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:190) ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
    at storm.kafka.PartitionManager.fill(PartitionManager.java:162) ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
    at storm.kafka.PartitionManager.next(PartitionManager.java:124) ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
    at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
    at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565) [storm-core-0.9.4.jar:0.9.4]
    at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) [storm-core-0.9.4.jar:0.9.4]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
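Reading KafkaUtils.fetchMessages, I suspect that with
useStartOffsetTimeIfOffsetOutOfRange = false the spout has no fallback when
the broker answers OFFSET_OUT_OF_RANGE, so it throws FailedFetchException
instead of moving to a valid offset. A sketch of the recovering variant
(field names from storm-kafka 0.9.x, so verify against your version):

// Let the spout jump to a valid offset when the requested one has been
// deleted by retention, instead of failing the fetch forever.
spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // -2: oldest retained offset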


On 18 May 2015, at 22:13, Benjamin Cuthbert <[email protected]> wrote:

Thanks Jeff,
So I looked over the docs, but what I don’t understand is that it runs for
2+ hours and then just starts logging:
2015-05-18T22:12:53.673+0100 s.k.PartitionManager [WARN] Using new offset: 64429892
2015-05-18T22:12:53.705+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [63610973]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-05-18T22:12:53.743+0100 s.k.PartitionManager [WARN] Using new offset: 64429934
2015-05-18T22:12:53.773+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [63610973]; retrying with default start offset time from configuration. configured start offset time: [-2]
So why does the offset get reset?
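For reference, the [-2] in those messages is Kafka's sentinel for the
earliest retained offset (and -1 means latest). In the 0.8.x Java API they
come from:

long earliest = kafka.api.OffsetRequest.EarliestTime(); // -2: oldest offset still on disk
long latest   = kafka.api.OffsetRequest.LatestTime();   // -1: offset of the next message to be produced

So each retry jumps to the current tail of the log, and because retention
keeps deleting segments that tail keeps moving, which would explain a new
offset on every retry.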


On 18 May 2015, at 20:37, Jeffery Maass <[email protected]> wrote:
The answer will be in how you configured the Kafka spout. If, after reading
the below, you still need help, please grab the values for all of the
settings mentioned in the doc and send them on.

See this document about the Kafka Spout:
http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.2.0/Storm_UG_v22/Storm_UG_v22.pdf

See this archive message:
http://mail-archives.apache.org/mod_mbox/storm-user/201503.mbox/%3ccahzwdygra33uiv+po01mvthvrjsxvqlvx6babymzye8zu_d...@mail.gmail.com%3E

"
Not exactly.. forceFromStart=true will tell the spout to start reading
from whatever is set in startOffsetTime (available options are the
earliest offset or the latest offset). If forceFromStart=false then
startOffsetTime is not used at all and the offset is just retrieved from
zookeeper, if it's available.
The "Start" in "forceFromStart" has nothing to do with consuming from
the beginning of the topic. I interpret it as referring to whether you are
going to force starting consumption from a different offset.
"

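In code, the two modes would look roughly like this (a sketch against the
storm-kafka 0.9.x fields, not a drop-in config):

// Mode 1: force a fresh start; the offset stored in Zookeeper is ignored
// and consumption begins wherever startOffsetTime points.
spoutConfig.forceFromStart = true;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // or LatestTime() for -1

// Mode 2 (the default): forceFromStart = false. startOffsetTime is not
// used, and the spout resumes from the offset stored in Zookeeper, if any.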
Thank you for your time!

+++++++++++++++++++++
Jeff Maass
linkedin.com/in/jeffmaass
stackoverflow.com/users/373418/maassql
+++++++++++++++++++++


On Mon, May 18, 2015 at 1:53 PM, Benjamin Cuthbert <[email protected]> 
wrote:

All,

We are getting loads of these errors:

2015-05-18T19:52:44.038+0100 s.k.KafkaUtils [WARN] Got fetch request with 
offset out of range: [62379213]; retrying with default start offset time from 
configuration. configured start offset time: [-2]
2015-05-18T19:52:44.066+0100 s.k.PartitionManager [WARN] Using new offset: 
63495047

And it is stopping messages from being pulled from Kafka into the spout and
on to the bolts. Any ideas on how to fix this?

