Hi,

Thanks for that. So is the storm-kafka spout not fast enough to process the messages? I don't think I am pushing that many messages into the Kafka queue, as the data has already been split up.
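Whether the spout is "fast enough" depends less on the absolute message volume than on the gap between the produce and consume rates, compared with how much data the broker retains. Kafka retention is configured by time (log.retention.hours) and/or by size (log.retention.bytes). The following is a back-of-envelope sketch that assumes size-bounded retention expressed as a message count and steady rates. The class, method and every figure in it are hypothetical, and real brokers delete whole log segments, so the answer is only approximate.

```java
/**
 * Back-of-envelope model (an assumption, not Kafka's actual retention logic):
 * the broker keeps roughly `retainedMessages` messages for the partition, and
 * the spout falls out of range once its lag exceeds that window.
 */
class LagEstimate {

    /**
     * Seconds until the consumer's lag exceeds the retained window, or
     * Double.POSITIVE_INFINITY if the consumer keeps up (consume >= produce).
     */
    static double secondsUntilOutOfRange(long retainedMessages, long initialLag,
                                         double producePerSec, double consumePerSec) {
        double lagGrowthPerSec = producePerSec - consumePerSec;
        if (lagGrowthPerSec <= 0) {
            return Double.POSITIVE_INFINITY; // lag never grows, so offsets stay in range
        }
        // Messages of slack left before the requested offset is deleted.
        long headroom = Math.max(0L, retainedMessages - initialLag);
        return headroom / lagGrowthPerSec;
    }

    public static void main(String[] args) {
        // Hypothetical figures, not measurements from this thread.
        double s = secondsUntilOutOfRange(1_200_000L, 0L, 200.0, 100.0);
        System.out.printf("falls out of range after ~%.1f hours%n", s / 3600.0);
    }
}
```

With these made-up numbers (a window of 1,200,000 messages, 200 msg/s in, 100 msg/s out) the spout starts requesting deleted offsets after about 3.3 hours, even though neither rate is large. If the consume rate matches or exceeds the produce rate, the estimate is infinite.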
> On 21 May 2015, at 05:34, Nikhil Singh <[email protected]> wrote:
>
> Kafka has a fixed retention window for topics. If the retention window is
> full, it will drop messages from the tail (the oldest end).
>
> So here is what 'might' be happening: the rate at which you are pushing
> data to Kafka is faster than the rate at which the consumers can consume
> the messages.
>
> Assume you have a full Kafka queue, with an incoming message rate twice the
> consumption rate, i.e. two messages arrive per time unit and one message is
> consumed per time unit. Let us say that the consumer is at offset 'x',
> which is at the tail of the Kafka queue. After consuming it, the consumer
> next asks for offset 'x+1'. However, since data is arriving twice as fast,
> Kafka has already pushed 'x' and 'x+1' out of the tail and is now pointing
> to 'x+2'.
>
> Kafka will reset the offset to 'x+2' for the consumer and issue a warning
> that offset 'x+1' is not there. This cycle will continue.
>
> -Nikhil
>
>
>
> On Wednesday, May 20, 2015 3:24 PM, Benjamin Cuthbert
> <[email protected]> wrote:
>
>
> I am seeing the following:
>
> 41828 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task
> [1/1] assigned [Partition{host=node:9092, partition=0}]
> 41829 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator -
> Task [1/1] Deleted partition managers: []
> 41829 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator -
> Task [1/1] New partition managers: [Partition{host=node:9092, partition=0}]
> 41930 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager -
> Read partition information from:
> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0 --> null
> 42399 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager -
> No partition information found, using configuration to determine offset
> 42399 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager -
> Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset
> 3927313
> 42400 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator -
> Task [1/1] Finished refreshing
> 45097 [ProcessThread(sid:0 cport:-1):] INFO
> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
> KeeperException when processing sessionid:0x14d72fc889d000c type:create
> cxid:0x3 zxid:0x2c txntype:-1 reqpath:n/a Error
> Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout
> Error:KeeperErrorCode = NoNode for
> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>
> When I look in ZooKeeper, that path does not exist. Does it get created
> automatically?
>
>> On 19 May 2015, at 18:19, Benjamin Cuthbert <[email protected]> wrote:
>>
>
> The Kafka logs have this:
>
> [2015-05-19 17:13:39,772] ERROR [KafkaApi-0] Error when processing fetch
> request for partition [warehouse_prices,0] offset 73792051 from consumer with
> correlation id 0 (kafka.server.KafkaApis)
> kafka.common.OffsetOutOfRangeException: Request for offset 73792051 but we
> only have log segments in the range 74630863 to 75835831.
> at kafka.log.Log.read(Log.scala:380)
> at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>
> How can that be? How can the requestor be behind Kafka?
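The broker error above can be checked with simple arithmetic: the requested offset 73792051 is 838,812 below the earliest retained offset 74630863, so those messages had already been deleted by retention when the spout asked for them. The sketch below is a hypothetical model (all names are illustrative) that assumes a log keeping at most a fixed number of messages, which is simpler than Kafka's segment-based deletion. It also replays the two-in, one-out scenario from Nikhil's reply.

```java
/**
 * Simplified model (an assumption, not Kafka's implementation) of a
 * size-bounded log: the broker keeps at most `capacity` messages and
 * discards the oldest ones as new messages arrive.
 */
class RetentionSimulation {

    /** How far below the earliest retained offset a requested offset is (0 if not below). */
    static long behindBy(long requested, long earliestRetained) {
        return Math.max(0L, earliestRetained - requested);
    }

    /** True if the broker could serve `requested` from the range [earliest, latest]. */
    static boolean inRange(long requested, long earliest, long latest) {
        return requested >= earliest && requested <= latest;
    }

    /**
     * Runs `steps` time units with `produceRate` messages appended and up to
     * `consumeRate` messages read per unit; returns how many times the
     * consumer's next offset had already been discarded and had to be reset.
     */
    static int countResets(int capacity, int produceRate, int consumeRate, int steps) {
        long earliest = 0;  // oldest offset still retained
        long next = 0;      // next offset the producer will write
        long consumer = 0;  // next offset the consumer asks for
        int resets = 0;
        for (int t = 0; t < steps; t++) {
            next += produceRate;                             // producer appends
            earliest = Math.max(earliest, next - capacity);  // retention drops the oldest
            if (consumer < earliest) {                       // requested offset is gone
                consumer = earliest;                         // jump forward to the earliest retained offset
                resets++;
            }
            consumer = Math.min(next, consumer + consumeRate); // consumer reads
        }
        return resets;
    }

    public static void main(String[] args) {
        // Offsets taken from the 19 May broker log.
        long requested = 73_792_051L, earliest = 74_630_863L, latest = 75_835_831L;
        System.out.println("in range: " + inRange(requested, earliest, latest));
        System.out.println("behind by: " + behindBy(requested, earliest));
        System.out.println("retained: " + (latest - earliest));
        // Hypothetical scenario: 10-message window, 2 in and 1 out per time unit.
        System.out.println("resets: " + countResets(10, 2, 1, 100));
    }
}
```

In this toy run the simulation reports 91 resets over 100 steps: none while the window is filling, then one on every step. This is the repeating cycle Nikhil describes. With equal produce and consume rates it reports 0.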
>
>
>> On 19 May 2015, at 14:54, Benjamin Cuthbert <[email protected]> wrote:
>>
>
> Hi Jeff,
>
> So I looked at the docs and I set the following properties:
>
> SpoutConfig spoutConfig = new SpoutConfig(
>         hosts,
>         topic,            // topic to read from
>         KAFKA_STORM_DIR,  // the root path in Zookeeper for the spout to store the consumer offsets
>         newSpoutId);      // an id for this consumer for storing the consumer offsets in Zookeeper
>
> // Check if we should be consuming messages from the beginning
> spoutConfig.forceFromStart = consumeFromBeginning;
> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = false;
>
> But after an hour of processing I see:
>
> 2015-05-19T13:13:03.242+0000 s.k.KafkaUtils [ERROR] Error fetching data from
> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
> partition=0}] for topic [warehouse_prices]: [OFFSET_OUT_OF_RANGE]
> 2015-05-19T13:13:03.242+0000 s.k.KafkaSpout [WARN] Fetch failed
> storm.kafka.FailedFetchException: Error fetching data from
> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
> partition=0}] for topic [warehouse_prices]: [OFFSET_OUT_OF_RANGE]
> at storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:190)
> ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
> at storm.kafka.PartitionManager.fill(PartitionManager.java:162)
> ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
> at storm.kafka.PartitionManager.next(PartitionManager.java:124)
> ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
> at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141)
> ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
> at backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565)
> [storm-core-0.9.4.jar:0.9.4]
> at backtype.storm.util$async_loop$fn__458.invoke(util.clj:463)
> [storm-core-0.9.4.jar:0.9.4]
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
> at java.lang.Thread.run(Thread.java:745) [na:1.7.0_79]
>
>
>> On 18 May 2015, at 22:13, Benjamin Cuthbert <[email protected]> wrote:
>>
>
> Thanks Jeff,
>
> So I looked over the docs, but what I don't understand is that it runs for
> 2+ hours and then just starts logging:
>
> 2015-05-18T22:12:53.673+0100 s.k.PartitionManager [WARN] Using new offset:
> 64429892
> 2015-05-18T22:12:53.705+0100 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [63610973]; retrying with default start offset time from
> configuration. configured start offset time: [-2]
> 2015-05-18T22:12:53.743+0100 s.k.PartitionManager [WARN] Using new offset:
> 64429934
> 2015-05-18T22:12:53.773+0100 s.k.KafkaUtils [WARN] Got fetch request with
> offset out of range: [63610973]; retrying with default start offset time from
> configuration. configured start offset time: [-2]
>
> So why does the offset get reset?
>
>
>> On 18 May 2015, at 20:37, Jeffery Maass <[email protected]> wrote:
>>
>> The answer will be in how you configured the Kafka spout. If, after reading
>> the below, you still need help, please grab the values for all of the
>> settings mentioned in the doc and send them on.
>>
>> See this document about the Kafka Spout:
>> http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.2.0/Storm_UG_v22/Storm_UG_v22.pdf
>>
>> See this archive message:
>> http://mail-archives.apache.org/mod_mbox/storm-user/201503.mbox/%3ccahzwdygra33uiv+po01mvthvrjsxvqlvx6babymzye8zu_d...@mail.gmail.com%3E
>>
>> "
>> Not exactly.. forceFromStart=true will tell the spout to start reading from
>> whatever is set in startOffsetTime (available options are the earliest
>> offset or the latest offset). If forceFromStart=false then startOffsetTime
>> is not used at all and the offset is just retrieved from zookeeper, if it's
>> available.
>>
>> The "Start" in "forceFromStart" has nothing to do with consuming from the
>> beginning of the topic. I interpret it as referring to whether you are
>> going to force starting consumption from a different offset.
>> "
>>
>> Thank you for your time!
>>
>> +++++++++++++++++++++
>> Jeff Maass <[email protected]>
>> linkedin.com/in/jeffmaass
>> stackoverflow.com/users/373418/maassql
>> +++++++++++++++++++++
>>
>>
>> On Mon, May 18, 2015 at 1:53 PM, Benjamin Cuthbert <[email protected]> wrote:
>> All,
>>
>> We are getting loads of these errors:
>>
>> 2015-05-18T19:52:44.038+0100 s.k.KafkaUtils [WARN] Got fetch request with
>> offset out of range: [62379213]; retrying with default start offset time
>> from configuration. configured start offset time: [-2]
>> 2015-05-18T19:52:44.066+0100 s.k.PartitionManager [WARN] Using new offset:
>> 63495047
>>
>> And it is stopping messages from being pulled from Kafka into the spout and
>> on to the bolts. Any ideas on how to fix this?
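The offset behaviour discussed in this thread can be summarised as a small decision model. This is a hypothetical sketch written from the logs and Jeff's explanation, not storm-kafka's code. It assumes startOffsetTime is either -2 (earliest: the value shown in the "configured start offset time: [-2]" logs, and the value of kafka.api.OffsetRequest.EarliestTime()) or -1 (latest). It models useStartOffsetTimeIfOffsetOutOfRange = false as a fetch that simply fails, matching the FailedFetchException in the 19 May log.

```java
/**
 * Hypothetical, simplified model of how a storm-kafka 0.9.x spout chooses a
 * partition offset, based on the behaviour described in this thread.
 * It is not the library's implementation.
 */
class SpoutOffsetModel {
    static final long EARLIEST = -2L; // same value as kafka.api.OffsetRequest.EarliestTime()
    static final long LATEST = -1L;   // same value as kafka.api.OffsetRequest.LatestTime()

    /** Maps startOffsetTime onto the broker's current earliest/latest offsets. */
    static long resolve(long startOffsetTime, long earliest, long latest) {
        if (startOffsetTime == EARLIEST) return earliest;
        if (startOffsetTime == LATEST) return latest;
        throw new IllegalArgumentException("model only handles -2 (earliest) and -1 (latest)");
    }

    /** Offset used when the partition is first opened; zkOffset is null if nothing was committed. */
    static long startOffset(boolean forceFromStart, Long zkOffset, long startOffsetTime,
                            long earliest, long latest) {
        if (forceFromStart || zkOffset == null) {
            // Cf. "No partition information found, using configuration to determine offset"
            return resolve(startOffsetTime, earliest, latest);
        }
        return zkOffset; // resume from the offset committed in ZooKeeper
    }

    /** Offset used after the broker answers a fetch with OFFSET_OUT_OF_RANGE. */
    static long onOutOfRange(boolean useStartOffsetTimeIfOffsetOutOfRange, long startOffsetTime,
                             long earliest, long latest) {
        if (!useStartOffsetTimeIfOffsetOutOfRange) {
            // Modelled as a failed fetch: the stale offset is kept and the next fetch fails again.
            throw new IllegalStateException("OFFSET_OUT_OF_RANGE");
        }
        // Cf. "retrying with default start offset time from configuration"
        return resolve(startOffsetTime, earliest, latest);
    }

    public static void main(String[] args) {
        long stale = 62_379_213L;    // out-of-range offset from the 18 May 19:52 log
        long earliest = 63_495_047L; // "Using new offset" from the same log, taken as the earliest offset
        long latest = 70_000_000L;   // placeholder: the latest offset does not appear in the log
        long resumed = onOutOfRange(true, EARLIEST, earliest, latest);
        System.out.println("resumed at " + resumed + ", skipped " + (resumed - stale));
    }
}
```

With the 18 May 19:52 values, the reset from 62379213 to 63495047 skips 1,115,834 messages. In this model neither setting prevents the loss. forceFromStart only changes where the spout starts. Disabling useStartOffsetTimeIfOffsetOutOfRange stops the jump but leaves the spout retrying an offset the broker no longer has. The remedies consistent with Nikhil's explanation are consuming faster or retaining more.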
