Kafka has a fixed retention window per topic. Once that window is full, it
drops the oldest messages from the tail of the log.
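(For reference, that window is set per broker, and can be overridden per
topic, with settings along these lines; the property names are from the Kafka
docs, the values here are only examples:)

    # server.properties -- broker-wide defaults
    log.retention.hours=168          # delete segments older than 7 days
    log.retention.bytes=1073741824   # or once a partition exceeds ~1 GB
    log.segment.bytes=536870912      # deletion happens a whole segment at a time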
So here is what might be happening: the rate at which you are pushing data
into Kafka is faster than the rate at which your consumers can consume it.
Assume the log is full, with an incoming message rate twice the consumption
rate, i.e. two messages arrive per time unit and one message is consumed per
time unit. Say the consumer is at offset x, which sits at the tail of the log.
After consuming, the consumer comes back and asks for offset x+1. But since
the incoming rate is double, Kafka has already expired x and x+1 off the tail,
which now points at x+2.
Kafka will reset the consumer to offset x+2 and issue a warning that offset
x+1 is no longer there. This cycle will repeat.
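To make the arithmetic concrete, here is a minimal, self-contained Java sketch
of that cycle (the window size and rates are made-up numbers, not taken from
your setup):

    // Simulates a fixed-size retention window with a producer running at twice
    // the consumer's rate: the consumer's next offset keeps falling off the
    // tail, so the reset described above fires on every cycle.
    public class RetentionRace {
        public static void main(String[] args) {
            final long window = 10; // log retains at most 10 offsets
            long head = 10;         // newest offset written so far
            long next = 0;          // consumer's next requested offset

            for (int tick = 1; tick <= 5; tick++) {
                long tail = Math.max(0, head - window); // oldest retained offset
                if (next < tail) {
                    System.out.printf("tick %d: offset %d out of range [%d..%d], resetting to %d%n",
                            tick, next, tail, head, tail);
                    next = tail; // the broker-side reset
                }
                next += 1; // consume one message per time unit
                head += 2; // produce two messages per time unit
            }
        }
    }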
-Nikhil
On Wednesday, May 20, 2015 3:24 PM, Benjamin Cuthbert
<[email protected]> wrote:
I am seeing the following:
41828 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=node:9092, partition=0}]
41829 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
41829 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=node:9092, partition=0}]
41930 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0 --> null
42399 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
42399 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset 3927313
42400 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
45097 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x14d72fc889d000c type:create cxid:0x3 zxid:0x2c txntype:-1 reqpath:n/a Error Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for /kafkastorm/warehouse_prices/rawWarehousePriceSpout
When viewing the ZooKeeper tree, the path is not there. Does it get created
automatically?
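(One way to double-check is with the stock ZooKeeper CLI; a hypothetical
session against a local ensemble:)

    $ zkCli.sh -server localhost:2181
    [zk: localhost:2181(CONNECTED) 0] ls /kafkastorm/warehouse_prices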
On 19 May 2015, at 18:19, Benjamin Cuthbert <[email protected]> wrote:
The Kafka logs have this:
[2015-05-19 17:13:39,772] ERROR [KafkaApi-0] Error when processing fetch request for partition [warehouse_prices,0] offset 73792051 from consumer with correlation id 0 (kafka.server.KafkaApis)
kafka.common.OffsetOutOfRangeException: Request for offset 73792051 but we only have log segments in the range 74630863 to 75835831.
        at kafka.log.Log.read(Log.scala:380)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
How can that be? How can the requester be behind Kafka?
On 19 May 2015, at 14:54, Benjamin Cuthbert <[email protected]> wrote:
Hi Jeff,
So I looked at the docs and reset the following properties:
SpoutConfig spoutConfig = new SpoutConfig(
        hosts,
        topic,            // topic to read from
        KAFKA_STORM_DIR,  // the root path in Zookeeper for the spout to store the consumer offsets
        newSpoutId);      // an id for this consumer for storing the consumer offsets in Zookeeper

// Check if we should be consuming messages from the beginning
spoutConfig.forceFromStart = consumeFromBeginning;
spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = false;
But after an hour of processing I see:
2015-05-19T13:13:03.242+0000 s.k.KafkaUtils [ERROR] Error fetching data from [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}] for topic [warehouse_prices]: [OFFSET_OUT_OF_RANGE]
2015-05-19T13:13:03.242+0000 s.k.KafkaSpout [WARN] Fetch failed
storm.kafka.FailedFetchException: Error fetching data from [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}] for topic [warehouse_prices]: [OFFSET_OUT_OF_RANGE]
        at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:190) ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
        at storm.kafka.PartitionManager.fill(PartitionManager.java:162) ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
        at storm.kafka.PartitionManager.next(PartitionManager.java:124) ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
        at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
        at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565) [storm-core-0.9.4.jar:0.9.4]
        at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463) [storm-core-0.9.4.jar:0.9.4]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
On 18 May 2015, at 22:13, Benjamin Cuthbert <[email protected]> wrote:
Thanks Jeff,
So I looked over the docs, but what I don’t understand is that it runs for 2+
hours and then just starts going:
2015-05-18T22:12:53.673+0100 s.k.PartitionManager [WARN] Using new offset: 64429892
2015-05-18T22:12:53.705+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [63610973]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-05-18T22:12:53.743+0100 s.k.PartitionManager [WARN] Using new offset: 64429934
2015-05-18T22:12:53.773+0100 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [63610973]; retrying with default start offset time from configuration. configured start offset time: [-2]
So why does the offset get reset?
On 18 May 2015, at 20:37, Jeffery Maass <[email protected]> wrote:
The answer will lie in how you configured the Kafka spout. If, after reading
the material below, you still need help, please grab the values for all of the
settings mentioned in the doc and send them on.
See this document about the Kafka Spout:
http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.2.0/Storm_UG_v22/Storm_UG_v22.pdf
See this archive message:
http://mail-archives.apache.org/mod_mbox/storm-user/201503.mbox/%3ccahzwdygra33uiv+po01mvthvrjsxvqlvx6babymzye8zu_d...@mail.gmail.com%3E
"
Not exactly.. forceFromStart=true will tell the spout to start reading from
whatever is set in startOffsetTime (available options are the earliest offset
or the latest offset). If forceFromStart=false then startOffsetTime is not
used at all and the offset is just retrieved from zookeeper, if it's
available.
The "Start" in "forceFromStart" has nothing to do with consuming from the
beginning of the topic. I interpret it as referring to whether you are going
to force starting consumption from a different offset.
"
Thank you for your time!
+++++++++++++++++++++
Jeff Maass
linkedin.com/in/jeffmaass
stackoverflow.com/users/373418/maassql
+++++++++++++++++++++
On Mon, May 18, 2015 at 1:53 PM, Benjamin Cuthbert <[email protected]>
wrote:
All,
We are getting loads of these errors:
2015-05-18T19:52:44.038+0100 s.k.KafkaUtils [WARN] Got fetch request with
offset out of range: [62379213]; retrying with default start offset time from
configuration. configured start offset time: [-2]
2015-05-18T19:52:44.066+0100 s.k.PartitionManager [WARN] Using new offset:
63495047
And it is stopping messages from being pulled from Kafka into the spout and on
to the bolts. Any ideas on how to fix this?