Thanks for all your help. I have deployed the code onto our server and it is still not writing to ZooKeeper. After 2 hours I saw the following:
2015-05-22T08:07:19.549+0000 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [1620784]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-05-22T08:07:19.550+0000 s.k.PartitionManager [WARN] Using new offset: 3837758
2015-05-22T08:07:19.550+0000 s.k.KafkaUtils [WARN] Got fetch request with offset out of range: [988915]; retrying with default start offset time from configuration. configured start offset time: [-2]
2015-05-22T08:07:19.551+0000 s.k.PartitionManager [WARN] Using new offset: 2377398

> On 21 May 2015, at 19:28, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>
> Use get:
>
> get /brokers/rawWarehousePriceSpout/partition_0
>
> 2015-05-21 15:27 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
> So I do not see what you see. Just so I am sure: when deployed into an environment, should the SpoutConfig have the ZooKeeper host set to the same value as the Kafka ZooKeeper?
>
> [zk: localhost:2000(CONNECTED) 7] ls /brokers/rawWarehousePriceSpout/partition_0
> []
> [zk: localhost:2000(CONNECTED) 8] ls /brokers/rawWarehousePriceSpout/partition_0
> []
> [zk: localhost:2000(CONNECTED) 9]
>
>
> 42742 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/rawWarehousePriceSpout/partition_0 --> null
> 43107 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 43107 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset 802916
> 43109 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/rawWarehousePriceSpout/partition_1 --> null
> 43130 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 43130 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from offset 1071876
> 43130 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/rawWarehousePriceSpout/partition_2 --> null
> 43154 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
> 43154 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from offset 1494070
> 43154 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
> 121420 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
> 121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, 1=price-engine-demo-server.c.celertech-01.internal:9092, 2=price-engine-demo-server.c.celertech-01.internal:9092}}
> 121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
> 121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
> 121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
> 121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>
>
>> On 21 May 2015, at 15:37, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>
>> Again, when you run the topology locally in Eclipse you have to connect to localhost:2000 (the embedded Storm ZooKeeper).
>>
>> 2015-05-21 11:28 GMT-03:00 Cristian Makoto Sandiga <cmsand...@gmail.com>:
>> I don't know, I'm using BaseRichBolt.
>>
>> 2015-05-21 11:25 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>
>> I am using BaseBasicBolt as I was told it acks events automatically. Should we not use that?
>>
>>
>>> On 21 May 2015, at 15:24, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>
>>> Are you acking in your bolt? collector.ack(input);
>>>
>>> 2015-05-21 11:19 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>> Okay so interesting:
>>>
>>> I have your configuration in my local Eclipse and a remote ZooKeeper that Kafka is connected to:
>>>
>>> ZkHosts zkhost = new ZkHosts("1.1.1.1:2181", "/brokers"); // /brokers -> kafka broker
>>> SpoutConfig spoutconfig = new SpoutConfig("1.1.1.1:2181", "/prices", "/brokers", "rawWarehousePriceSpout");
>>>
>>> Zookeeper:
>>>
>>> [zk: 127.0.0.1:2181(CONNECTED) 80] ls /brokers
>>> [rawWarehousePriceSpout, topics, ids]
>>> [zk: 127.0.0.1:2181(CONNECTED) 81] ls /brokers/rawWarehousePriceSpout
>>> []
>>> [zk: 127.0.0.1:2181(CONNECTED) 82]
>>>
>>>
>>> Logs I see:
>>>
>>> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>>> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>>> 42094 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/rawWarehousePriceSpout/partition_0 --> null
>>> 42363 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>>> 42363 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset 425530
>>> 42366 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/rawWarehousePriceSpout/partition_1 --> null
>>> 42400 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>>> 42400 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from offset 550715
>>> 42401 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/rawWarehousePriceSpout/partition_2 --> null
>>> 42432 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>>> 42432 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from offset 439547
>>> 42432 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>> 44652 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x14d76d4418d000c type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for /brokers/rawWarehousePriceSpout
>>>
>>>
>>>
>>>> On 21 May 2015, at 15:02, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>
>>>> I'm sorry, I made a mistake.
>>>>
>>>> ZkHosts zkhost = new ZkHosts("localhost:2181", "/brokers"); // /brokers -> kafka broker
>>>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", "kafka-spout"); // --> storm --> zookeeper
>>>>
>>>> As you can see, it is not an ERROR; it is an INFO message, which usually appears when the directory structure does not exist.
>>>>
>>>> Take a look at the Kafka broker's ZooKeeper (port 2181)
>>>> ----------------------------------
>>>>
>>>> -- bin/zookeeper-shell.sh localhost:2181
>>>>
>>>> ls /brokers/topics/bid_history/partitions/0/state
>>>> []
>>>>
>>>> get /brokers/topics/bid_history/partitions/0/state
>>>> {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
>>>> cZxid = 0x113
>>>> ctime = Tue May 19 18:19:24 BRT 2015
>>>> mZxid = 0x113
>>>> mtime = Tue May 19 18:19:24 BRT 2015
>>>> pZxid = 0x113
>>>> cversion = 0
>>>> dataVersion = 0
>>>> aclVersion = 0
>>>> ephemeralOwner = 0x0
>>>> dataLength = 72
>>>> numChildren = 0
>>>>
>>>>
>>>> Take a look at Storm's ZooKeeper (port 2000)
>>>> ----------------------------------
>>>> Start the topology, send a message and then look:
>>>>
>>>> bin/zookeeper-shell.sh localhost:2000
>>>> Connecting to localhost:2000
>>>> Welcome to ZooKeeper!
>>>> JLine support is disabled
>>>>
>>>> ls /
>>>> [storm, brokers, zookeeper]
>>>> ls /brokers
>>>> [kafka-spout]
>>>> ls /brokers/kafka-spout
>>>> [partition_0]
>>>> ls /brokers/kafka-spout/partition_0
>>>> []
>>>> get /brokers/kafka-spout/partition_0
>>>> {"topology":{"id":"a9be1962-6b4e-4ed4-ae68-155a1948a1f6","name":"consolidate_reports"},"offset":4426029,"partition":0,"broker":{"host":"localhost","port":9092},"topic":"bid_history"}
>>>> cZxid = 0x50
>>>> ctime = Thu May 21 11:00:48 BRT 2015
>>>> mZxid = 0x50
>>>> mtime = Thu May 21 11:00:48 BRT 2015
>>>> pZxid = 0x50
>>>> cversion = 0
>>>> dataVersion = 0
>>>> aclVersion = 0
>>>> ephemeralOwner = 0x0
>>>> dataLength = 182
>>>> numChildren = 0
>>>>
>>>>
>>>> 2015-05-21 10:43 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>> Because I do see this error in the Storm logs:
>>>>
>>>> 5304 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x14d76b1ea54000c type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for /brokers/rawWarehousePriceSpout
>>>>
>>>>
>>>>
>>>>> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>>
>>>>> I think you have this wrong:
>>>>>
>>>>> zookeeperPath , // the root path in Zookeeper for the spout to store the consumer offsets
>>>>>
>>>>> That is for your KafkaSpout to find broker metadata (topics, partitions).
>>>>>
>>>>> You have to override:
>>>>>
>>>>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", "kafka-spout");
>>>>> spoutconfig.zkServers
>>>>> spoutconfig.zkPort
>>>>>
>>>>> Take a look at KafkaSpout.class and step through this part in debug mode.
>>>>>
>>>>> Map stateConf = new HashMap(conf);
>>>>> List<String> zkServers = _spoutConfig.zkServers;
>>>>> if (zkServers == null) {
>>>>>     zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
>>>>> }
>>>>> Integer zkPort = _spoutConfig.zkPort;
>>>>> if (zkPort == null) {
>>>>>     zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>> So when I run it from Eclipse in local mode I am pointing to the same ZooKeeper as when it is deployed. Either way it still does not write the offset.
>>>>>
>>>>> The only difference is that my spout is configured like so:
>>>>>
>>>>> BrokerHosts hosts = new ZkHosts(zookeeperQuorum);
>>>>>
>>>>> String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
>>>>> SpoutConfig spoutConfig = new SpoutConfig(
>>>>>         hosts,
>>>>>         topic,          // topic to read from
>>>>>         zookeeperPath,  // the root path in Zookeeper for the spout to store the consumer offsets
>>>>>         spoutId);       // an id for this consumer for storing the consumer offsets in Zookeeper
>>>>>
>>>>> // Check if we should be consuming messages from the beginning
>>>>> spoutConfig.forceFromStart = consumeFromBeginning;
>>>>> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
>>>>> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>>>>> spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaAnalyticsMessageDecoder());
>>>>>
>>>>>
>>>>>> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>>>
>>>>>> I have the same problem when I run Storm in local cluster mode, but that is because local mode has an embedded ZooKeeper, and every time it restarts the offset is null. That is the reason you have /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null.
>>>>>>
>>>>>> Try running in local mode, send some messages to the spout, and then look at the offset in ZooKeeper (port 2000).
>>>>>>
>>>>>> Or use Kafka's ZooKeeper by overriding:
>>>>>>
>>>>>> spoutconfig.zkServers
>>>>>> spoutconfig.zkPort
>>>>>>
>>>>>>
>>>>>> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>>> Thanks,
>>>>>>
>>>>>> Yup, Kafka is creating them on startup when the topic gets its first message. And from the Storm logs below, the offset is never really committed to ZooKeeper.
>>>>>>
>>>>>> Zookeeper kafka topics details
>>>>>>
>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 41] ls /brokers/topics/warehouse_prices/partitions
>>>>>> [2, 1, 0]
>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 42]
>>>>>>
>>>>>> Zookeeper Storm
>>>>>>
>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 42] ls /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>>>> []
>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 43]
>>>>>>
>>>>>>
>>>>>> Storm logs
>>>>>>
>>>>>> 44325 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>>>>>> 44491 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0 --> null
>>>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>>>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset 0
>>>>>> 44749 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1 --> null
>>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>>> 44779 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from offset 0
>>>>>> 44781 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null
>>>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>>>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Commit offset 0 is more than 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from offset 0
>>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>>>> 47438 [ProcessThread(sid:0 cport:-1):] INFO org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x14d7658db3c000c type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>>>> 104184 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>>>>>> 104828 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
>>>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, 1=price-engine-demo-server.c.celertech-01.internal:9092, 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>>>> 164204 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>>>>>> 165063 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, 1=price-engine-demo-server.c.celertech-01.internal:9092, 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>>>> 224233 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>>>>>> 225308 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
>>>>>>
>>>>>>
>>>>>>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>>>>
>>>>>>> ZooKeeper creates nothing on startup; you have to create your partitions in the Kafka broker.
>>>>>>>
>>>>>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic click_history --replication-factor 1 --partitions 10
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>>>> All,
>>>>>>>
>>>>>>> We changed our paths in ZooKeeper and we are now seeing:
>>>>>>>
>>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/warehouse_prices/partitions
>>>>>>>     at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>>>     at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>>>     at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>>>     at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>>>     at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4]
>>>>>>>     at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
>>>>>>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>>>     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>>>>>>>
>>>>>>> Should this not be discovered by the Spout on startup?
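
For anyone reading this thread later: the node names in the logs above suggest that the spout looks for its committed offset under the root path passed to SpoutConfig, then the spout id, then partition_N (for example /brokers/rawWarehousePriceSpout/partition_0). Below is a minimal sketch, assuming that layout, of a helper that builds the path to check with "get" in the ZooKeeper shell. The class OffsetPaths and the method committedPath are hypothetical names used only for illustration; they are not part of storm-kafka, and the slash normalisation is a convenience of this sketch rather than something the spout is known to do.

```java
// Hypothetical helper, not part of storm-kafka. It only rebuilds the path
// pattern visible in the PartitionManager log lines quoted in this thread.
final class OffsetPaths {

    private OffsetPaths() {
    }

    /**
     * Assumed layout, inferred from the logs: zkRoot/spoutId/partition_N
     *
     * @param zkRoot    root path given to SpoutConfig, e.g. "/brokers"
     * @param spoutId   consumer id given to SpoutConfig
     * @param partition Kafka partition number, zero or greater
     */
    static String committedPath(String zkRoot, String spoutId, int partition) {
        if (partition < 0) {
            throw new IllegalArgumentException("partition must be >= 0");
        }
        // Drop trailing slashes so "/brokers/" and "/brokers" give the same result.
        String root = zkRoot.trim();
        while (root.endsWith("/")) {
            root = root.substring(0, root.length() - 1);
        }
        // Add a leading slash if the caller left it out.
        if (!root.isEmpty() && !root.startsWith("/")) {
            root = "/" + root;
        }
        return root + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Paths corresponding to the two configurations discussed in the thread.
        System.out.println(committedPath("/brokers", "rawWarehousePriceSpout", 0));
        System.out.println(committedPath("/kafkastorm/warehouse_prices", "rawWarehousePriceSpout", 2));
    }
}
```

Running "get" on the resulting path, in whichever ZooKeeper the spout actually writes to (port 2000 for the embedded local-mode instance mentioned above), should show JSON like the kafka-spout example once an offset has been committed.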