So I do not see what you see. Just so I am sure: when the topology is deployed into an environment, should the SpoutConfig have its ZooKeeper host set to the same value as the Kafka ZooKeeper?
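For reference, the KafkaSpout snippet quoted further down this thread picks the ZooKeeper used for offset storage by taking `zkServers`/`zkPort` from the SpoutConfig when they are set and falling back to Storm's own ZooKeeper settings otherwise. The sketch below models only that fallback in plain Java, with no Storm dependency. The class `OffsetZkResolver` and its method names are hypothetical, and the map keys are assumed to be the values behind `Config.STORM_ZOOKEEPER_SERVERS` and `Config.STORM_ZOOKEEPER_PORT`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-alone model of the fallback; not part of storm-kafka. */
public class OffsetZkResolver {

    /** Returns the spout's own server list if set, else Storm's ZooKeeper servers. */
    @SuppressWarnings("unchecked")
    public static List<String> resolveServers(List<String> spoutZkServers,
                                              Map<String, Object> stormConf) {
        if (spoutZkServers != null) {
            return spoutZkServers;
        }
        // Assumed key; in Storm this would be Config.STORM_ZOOKEEPER_SERVERS.
        return (List<String>) stormConf.get("storm.zookeeper.servers");
    }

    /** Returns the spout's own port if set, else Storm's ZooKeeper port. */
    public static int resolvePort(Integer spoutZkPort, Map<String, Object> stormConf) {
        if (spoutZkPort != null) {
            return spoutZkPort;
        }
        // Assumed key; in Storm this would be Config.STORM_ZOOKEEPER_PORT.
        return ((Number) stormConf.get("storm.zookeeper.port")).intValue();
    }

    public static void main(String[] args) {
        // Local mode as described in this thread: embedded ZooKeeper on port 2000.
        Map<String, Object> localConf = new HashMap<>();
        localConf.put("storm.zookeeper.servers", Arrays.asList("localhost"));
        localConf.put("storm.zookeeper.port", 2000);

        // No override: offsets would go to the embedded ZooKeeper.
        System.out.println(resolveServers(null, localConf) + " port "
                + resolvePort(null, localConf));

        // Override set: offsets would go to the ZooKeeper named in the SpoutConfig.
        System.out.println(resolveServers(Arrays.asList("1.1.1.1"), localConf) + " port "
                + resolvePort(2181, localConf));
    }
}
```

If this model matches the real behaviour, a spout without the override writes its offsets to whichever ZooKeeper the Storm cluster itself uses, which is not necessarily the one Kafka is registered in.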
[zk: localhost:2000(CONNECTED) 7] ls /brokers/rawWarehousePriceSpout/partition_0
[]
[zk: localhost:2000(CONNECTED) 8] ls /brokers/rawWarehousePriceSpout/partition_0
[]
[zk: localhost:2000(CONNECTED) 9]

42742 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/rawWarehousePriceSpout/partition_0 --> null
43107 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
43107 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset 802916
43109 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/rawWarehousePriceSpout/partition_1 --> null
43130 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
43130 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from offset 1071876
43130 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/rawWarehousePriceSpout/partition_2 --> null
43154 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
43154 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from offset 1494070
43154 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
121420 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper:
GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, 1=price-engine-demo-server.c.celertech-01.internal:9092, 2=price-engine-demo-server.c.celertech-01.internal:9092}}
121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=0}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=1}, Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, partition=2}]
121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing

> On 21 May 2015, at 15:37, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>
> Again, when it is a local topology run from Eclipse, you have to connect to
> localhost:2000 (the embedded Storm ZooKeeper).
>
> 2015-05-21 11:28 GMT-03:00 Cristian Makoto Sandiga <cmsand...@gmail.com>:
> I don't know, I'm using BaseRichBolt.
>
> 2015-05-21 11:25 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>
> I am using BaseBasicBolt, as I was told that it acks events automatically.
> Should we not use that?
>
>
>> On 21 May 2015, at 15:24, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>
>> Are you acking in your bolt?
>> collector.ack(input);
>>
>> 2015-05-21 11:19 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>> Okay so interesting:
>>
>> I have your configuration in my local eclipse and a remote zookeeper that
>> kafka is connected to:
>>
>> ZkHosts zkhost = new ZkHosts("1.1.1.1:2181", "/brokers"); // /brokers -> kafka broker
>> SpoutConfig spoutconfig = new SpoutConfig("1.1.1.1:2181", "/prices", "/brokers", "rawWarehousePriceSpout");
>>
>> Zookeeper:
>>
>> [zk: 127.0.0.1:2181(CONNECTED) 80] ls /brokers
>> [rawWarehousePriceSpout, topics, ids]
>> [zk: 127.0.0.1:2181(CONNECTED) 81] ls /brokers/rawWarehousePriceSpout
>> []
>> [zk: 127.0.0.1:2181(CONNECTED) 82]
>>
>>
>> Logs I see:
>>
>> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - Task
>> [1/1] assigned
>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=0},
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=1},
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=2}]
>> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator -
>> Task [1/1] Deleted partition managers: []
>> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator -
>> Task [1/1] New partition managers:
>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=0},
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=1},
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>> partition=2}]
>> 42094 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - Read partition information from:
>> /brokers/rawWarehousePriceSpout/partition_0 --> null
>> 42363 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - No partition information found, using configuration to determine offset
>> 42363
>> [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from
>> offset 425530
>> 42366 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - Read partition information from:
>> /brokers/rawWarehousePriceSpout/partition_1 --> null
>> 42400 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - No partition information found, using configuration to determine offset
>> 42400 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from
>> offset 550715
>> 42401 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - Read partition information from:
>> /brokers/rawWarehousePriceSpout/partition_2 --> null
>> 42432 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - No partition information found, using configuration to determine offset
>> 42432 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager
>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from
>> offset 439547
>> 42432 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator -
>> Task [1/1] Finished refreshing
>> 44652 [ProcessThread(sid:0 cport:-1):] INFO
>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
>> KeeperException when processing sessionid:0x14d76d4418d000c type:create
>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
>> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for
>> /brokers/rawWarehousePriceSpout
>>
>>
>>> On 21 May 2015, at 15:02, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>
>>> I'm sorry, I made a mistake.
>>>
>>> ZkHosts zkhost = new ZkHosts("localhost:2181","/brokers"); // /brokers -> kafka broker
>>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", "kafka-spout"); --> storm
>>> --> zookeeper
>>>
>>> As you can see, it is not an ERROR but an INFO, which usually appears when
>>> the directory structure does not exist yet.
>>>
>>> Take a look at the Kafka broker ZooKeeper (port 2181)
>>> ----------------------------------
>>>
>>> -- bin/zookeeper-shell.sh localhost:2181
>>>
>>> ls /brokers/topics/bid_history/partitions/0/state
>>> []
>>>
>>> get /brokers/topics/bid_history/partitions/0/state
>>> {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
>>> cZxid = 0x113
>>> ctime = Tue May 19 18:19:24 BRT 2015
>>> mZxid = 0x113
>>> mtime = Tue May 19 18:19:24 BRT 2015
>>> pZxid = 0x113
>>> cversion = 0
>>> dataVersion = 0
>>> aclVersion = 0
>>> ephemeralOwner = 0x0
>>> dataLength = 72
>>> numChildren = 0
>>>
>>>
>>> Take a look at the Storm ZooKeeper (port 2000)
>>> ----------------------------------
>>> Start the topology, send a message and then look:
>>>
>>> bin/zookeeper-shell.sh localhost:2000
>>> Connecting to localhost:2000
>>> Welcome to ZooKeeper!
>>> JLine support is disabled
>>>
>>> ls /
>>> [storm, brokers, zookeeper]
>>> ls /brokers
>>> [kafka-spout]
>>> ls /brokers/kafka-spout
>>> [partition_0]
>>> ls /brokers/kafka-spout/partition_0
>>> []
>>> get /brokers/kafka-spout/partition_0
>>> {"topology":{"id":"a9be1962-6b4e-4ed4-ae68-155a1948a1f6","name":"consolidate_reports"},"offset":4426029,"partition":0,"broker":{"host":"localhost","port":9092},"topic":"bid_history"}
>>> cZxid = 0x50
>>> ctime = Thu May 21 11:00:48 BRT 2015
>>> mZxid = 0x50
>>> mtime = Thu May 21 11:00:48 BRT 2015
>>> pZxid = 0x50
>>> cversion = 0
>>> dataVersion = 0
>>> aclVersion = 0
>>> ephemeralOwner = 0x0
>>> dataLength = 182
>>> numChildren = 0
>>>
>>>
>>> 2015-05-21 10:43 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>> Because I do see this error in the Storm logs
>>>
>>> 5304 [ProcessThread(sid:0 cport:-1):] INFO
>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
>>> KeeperException when processing
>>> sessionid:0x14d76b1ea54000c type:create
>>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
>>> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for
>>> /brokers/rawWarehousePriceSpout
>>>
>>>
>>>> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>
>>>> I think you have this wrong:
>>>>
>>>> zookeeperPath , // the root path in Zookeeper for the spout to store the
>>>> consumer offsets
>>>>
>>>> That is for your KafkaSpout to find broker meta information
>>>> (topics, partitions).
>>>>
>>>> You have to override
>>>>
>>>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", "kafka-spout");
>>>> spoutconfig.zkServers
>>>> spoutconfig.zkPort
>>>>
>>>> Take a look at KafkaSpout.class and step through this part in debug mode.
>>>>
>>>> Map stateConf = new HashMap(conf);
>>>> List<String> zkServers = _spoutConfig.zkServers;
>>>> if (zkServers == null) {
>>>>     zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
>>>> }
>>>> Integer zkPort = _spoutConfig.zkPort;
>>>> if (zkPort == null) {
>>>>     zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
>>>> }
>>>>
>>>>
>>>> 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>> So when I run it out of eclipse in local mode I am pointing to the same
>>>> zookeeper as I am when it is deployed. Either way, it still does not
>>>> write the offset.
>>>>
>>>> The only difference I have is that my spout is configured like so:
>>>>
>>>> BrokerHosts hosts = new ZkHosts(zookeeperQuorum);
>>>>
>>>> String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
>>>> SpoutConfig spoutConfig = new SpoutConfig(
>>>>     hosts,
>>>>     topic,         // topic to read from
>>>>     zookeeperPath, // the root path in Zookeeper for the spout to store the consumer offsets
>>>>     spoutId);      // an id for this consumer for storing the consumer offsets in Zookeeper
>>>>
>>>> // Check if we should be consuming messages from the beginning
>>>> spoutConfig.forceFromStart = consumeFromBeginning;
>>>> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
>>>> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>>>> spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaAnalyticsMessageDecoder());
>>>>
>>>>
>>>>> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>>
>>>>> I have the same problem when I run Storm in local cluster mode, but that
>>>>> is because local mode has an embedded ZooKeeper, and every time it
>>>>> restarts the offset is null. That is the reason you see
>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null.
>>>>>
>>>>> Try to run in local mode, send some messages to the spout, and then look
>>>>> at the offset in ZooKeeper (port 2000).
>>>>>
>>>>> Or use Kafka's ZooKeeper by overriding:
>>>>>
>>>>> spoutconfig.zkServers
>>>>> spoutconfig.zkPort
>>>>>
>>>>>
>>>>> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>> Thanks,
>>>>>
>>>>> Yup, Kafka is creating them on startup when the topic gets its first
>>>>> message. And from the Storm logs below, the offset is never really
>>>>> committed to ZooKeeper.
>>>>>
>>>>> Zookeeper kafka topics details
>>>>>
>>>>> [zk: 127.0.0.1:2181(CONNECTED) 41] ls /brokers/topics/warehouse_prices/partitions
>>>>> [2, 1, 0]
>>>>> [zk: 127.0.0.1:2181(CONNECTED) 42]
>>>>>
>>>>> Zookeeper Storm
>>>>>
>>>>> [zk: 127.0.0.1:2181(CONNECTED) 42] ls /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>>> []
>>>>> [zk: 127.0.0.1:2181(CONNECTED) 43]
>>>>>
>>>>>
>>>>> Storm logs
>>>>>
>>>>> 44325 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] New partition managers:
>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=0},
>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=1},
>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=2}]
>>>>> 44491 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Read partition information from:
>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0 --> null
>>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - No partition information found, using
>>>>> configuration to determine offset
>>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Starting Kafka
>>>>> price-engine-demo-server.c.celertech-01.internal:0 from offset 0
>>>>> 44749 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Read partition information from:
>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1 --> null
>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - No partition information
>>>>> found, using
>>>>> configuration to determine offset
>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>> 44779 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Starting Kafka
>>>>> price-engine-demo-server.c.celertech-01.internal:1 from offset 0
>>>>> 44781 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Read partition information from:
>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> null
>>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - No partition information found, using
>>>>> configuration to determine offset
>>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.PartitionManager - Starting Kafka
>>>>> price-engine-demo-server.c.celertech-01.internal:2 from offset 0
>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Finished refreshing
>>>>> 47438 [ProcessThread(sid:0 cport:-1):] INFO
>>>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
>>>>> KeeperException when processing sessionid:0x14d7658db3c000c type:create
>>>>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
>>>>> Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>>> Error:KeeperErrorCode = NoNode for
>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>>> 104184 [Thread-15-rawWarehousePriceSpout] WARN
>>>>> storm.kafka.KafkaUtils -
>>>>> No data found in Kafka Partition partition_0
>>>>> 104828 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Refreshing partition manager connections
>>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper:
>>>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> 1=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils -
>>>>> Task [1/1] assigned
>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=0},
>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=1},
>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=2}]
>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Deleted partition managers: []
>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] New partition managers: []
>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Finished refreshing
>>>>> 164204 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils -
>>>>> No data found in Kafka Partition partition_0
>>>>> 165063 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Refreshing partition manager connections
>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper:
>>>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> 1=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>>> 165240
>>>>> [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils -
>>>>> Task [1/1] assigned
>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=0},
>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=1},
>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>> partition=2}]
>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Deleted partition managers: []
>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] New partition managers: []
>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Finished refreshing
>>>>> 224233 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils -
>>>>> No data found in Kafka Partition partition_0
>>>>> 225308 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator
>>>>> - Task [1/1] Refreshing partition manager connections
>>>>>
>>>>>
>>>>>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>>>
>>>>>> ZooKeeper creates nothing at startup; you have to create your
>>>>>> partitions in the Kafka broker.
>>>>>>
>>>>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic click_history --replication-factor 1 --partitions 10
>>>>>>
>>>>>>
>>>>>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>>> All,
>>>>>>
>>>>>> We changed our paths in ZooKeeper and we are now seeing
>>>>>>
>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/warehouse_prices/partitions
>>>>>> at storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>> at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>> at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>> at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>> at backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) ~[storm-core-0.9.4.jar:0.9.4]
>>>>>> at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) ~[storm-core-0.9.4.jar:0.9.4]
>>>>>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>>>>>>
>>>>>> Should this not be discovered by the Spout on startup?
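Two different ZooKeeper paths come up in this thread: the one the spout reads Kafka partition metadata from (`/brokers/topics/warehouse_prices/partitions` in the exception) and the one it reads and writes its own offsets under (`/brokers/rawWarehousePriceSpout/partition_0` in the later logs). A minimal sketch of how those two paths appear to be composed is given below. The layout is inferred only from the paths printed in the logs of this thread, not from the storm-kafka source, and the class `SpoutZkPaths` and its method names are hypothetical.

```java
/** Hypothetical helper that rebuilds the two ZooKeeper paths seen in the logs. */
public class SpoutZkPaths {

    /** Path where Kafka registers a topic's partitions, under the broker root. */
    public static String partitionsPath(String brokerZkPath, String topic) {
        return brokerZkPath + "/topics/" + topic + "/partitions";
    }

    /** Path where the spout appears to store the committed offset of one partition. */
    public static String committedPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Broker metadata path from the NoNode exception.
        System.out.println(partitionsPath("/brokers", "warehouse_prices"));

        // Offset path with zkRoot = KAFKA_STORM_DIR + "/" + topic, as in the first setup.
        System.out.println(
                committedPath("/kafkastorm/warehouse_prices", "rawWarehousePriceSpout", 0));

        // Offset path with zkRoot = "/brokers", as in the later setup.
        System.out.println(committedPath("/brokers", "rawWarehousePriceSpout", 0));
    }
}
```

Printing both paths for a given configuration makes it easier to check with `ls` and `get` in the ZooKeeper shell that each node exists on the ZooKeeper instance the spout is actually connected to.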