This line of code long lastCompletedOffset = lastCompletedOffset(); if (_committedTo != lastCompletedOffset) {
In PartitionManager Will always be = in my case it is never different so there is never a write. > On 24 May 2015, at 18:18, Benjamin Cuthbert <cuthbert....@gmail.com> wrote: > > The issue is that storm did not create the /brokers/<topics> details in > zookeeper so when the works starts it gets > > 2015-05-24T09:28:54.706+0000 s.k.PartitionManager [WARN] Error reading and/or > parsing at ZkNode: /brokers/rawWarehousePriceSpout/partition_0 > java.lang.NullPointerException: null > at storm.kafka.PartitionManager.<init>(PartitionManager.java:76) > ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na] > at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) > [celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na] > at > storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) > [celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na] > at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) > [celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na] > at > backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565) > [storm-core-0.9.4.jar:0.9.4] > > Even this the information exists because it was created manually: > > [zk: 127.0.0.1:2181(CONNECTED) 23] get > /brokers/rawWarehousePriceSpout/partition_0 > {} > cZxid = 0x38d0c > ctime = Thu May 21 18:53:27 UTC 2015 > mZxid = 0x38d0c > mtime = Thu May 21 18:53:27 UTC 2015 > pZxid = 0x38d0c > cversion = 0 > dataVersion = 0 > aclVersion = 0 > ephemeralOwner = 0x0 > dataLength = 2 > numChildren = 0 > [zk: 127.0.0.1:2181(CONNECTED) 24] > > But I guess it is looking for certain information > >> On 21 May 2015, at 19:28, Cristian Makoto Sandiga <cmsand...@gmail.com >> <mailto:cmsand...@gmail.com>> wrote: >> >> use GET >> >> get /brokers/rawWarehousePriceSpout/partition_0 >> >> 2015-05-21 15:27 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com >> <mailto:cuthbert....@gmail.com>>: >> So I do not see what you see. So just so I am sure the SpoutConfig when >> deployed into an env should have the zookeeper host with the same value as >> the kafka zookeeper? >> >> [zk: localhost:2000(CONNECTED) 7] ls >> /brokers/rawWarehousePriceSpout/partition_0 >> [] >> [zk: localhost:2000(CONNECTED) 8] ls >> /brokers/rawWarehousePriceSpout/partition_0 >> [] >> [zk: localhost:2000(CONNECTED) 9] >> >> >> 42742 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Read partition information from: >> /brokers/rawWarehousePriceSpout/partition_0 --> null >> 43107 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - No partition information found, using configuration to determine offset >> 43107 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from >> offset 802916 >> 43109 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Read partition information from: >> /brokers/rawWarehousePriceSpout/partition_1 --> null >> 43130 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - No partition information found, using configuration to determine offset >> 43130 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from >> offset 1071876 >> 43130 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Read partition information from: >> /brokers/rawWarehousePriceSpout/partition_2 --> null >> 43154 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - No partition information found, using configuration to determine offset >> 43154 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.PartitionManager >> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from >> offset 1494070 >> 43154 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] Finished refreshing >> 121420 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] Refreshing partition manager connections >> 121641 [Thread-15-rawWarehousePriceSpout] INFO >> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: >> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, >> 1=price-engine-demo-server.c.celertech-01.internal:9092, >> 2=price-engine-demo-server.c.celertech-01.internal:9092}} >> 121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - >> Task [1/1] assigned >> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >> partition=0}, >> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >> partition=1}, >> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >> partition=2}] >> 121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] Deleted partition managers: [] >> 121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] New partition managers: [] >> 121641 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >> Task [1/1] Finished refreshing >> >> >>> On 21 May 2015, at 15:37, Cristian Makoto Sandiga <cmsand...@gmail.com >>> <mailto:cmsand...@gmail.com>> wrote: >>> >>> Again, when is eclipse local topology you have to connect to localhost:2000 >>> (in the embedded stom zookeeper) >>> >>> 2015-05-21 11:28 GMT-03:00 Cristian Makoto Sandiga <cmsand...@gmail.com >>> <mailto:cmsand...@gmail.com>>: >>> I dont know, i'm using BaseRichBolt. >>> >>> 2015-05-21 11:25 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com >>> <mailto:cuthbert....@gmail.com>>: >>> >>> I am using BaseBasicBolt as that was told ack events automatically. Should >>> we not use that? >>> >>> >>>> On 21 May 2015, at 15:24, Cristian Makoto Sandiga <cmsand...@gmail.com >>>> <mailto:cmsand...@gmail.com>> wrote: >>>> >>>> Are you acking in your bolt? collector.ack(input); >>>> >>>> 2015-05-21 11:19 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com >>>> <mailto:cuthbert....@gmail.com>>: >>>> Okay so interesting: >>>> >>>> I have your configuration in my local eclipse and a remote zookeeper that >>>> kafka is connected to: >>>> >>>> ZkHosts zkhost = new ZkHosts(“1.1.1.1:2181 >>>> <http://1.1.1.1:2181/>","/brokers"); // /brokers -> kafka broker >>>> SpoutConfig spoutconfig = new SpoutConfig(“1.1.1.1:2181 >>>> <http://1.1.1.1:2181/>", “/prices", "/brokers", “rawWarehousePriceSpout"); >>>> >>>> Zookeeper: >>>> >>>> [zk: 127.0.0.1:2181(CONNECTED) 80] ls /brokers >>>> [rawWarehousePriceSpout, topics, ids] >>>> [zk: 127.0.0.1:2181(CONNECTED) 81] ls /brokers/rawWarehousePriceSpout >>>> [] >>>> [zk: 127.0.0.1:2181(CONNECTED) 82] >>>> >>>> >>>> Logs i see: >>>> >>>> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils - >>>> Task [1/1] assigned >>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>> partition=0}, >>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>> partition=1}, >>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>> partition=2}] >>>> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >>>> Task [1/1] Deleted partition managers: [] >>>> 41994 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >>>> Task [1/1] New partition managers: >>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>> partition=0}, >>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>> partition=1}, >>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>> partition=2}] >>>> 42094 [Thread-15-rawWarehousePriceSpout] INFO >>>> storm.kafka.PartitionManager - Read partition information from: >>>> /brokers/rawWarehousePriceSpout/partition_0 --> null >>>> 42363 [Thread-15-rawWarehousePriceSpout] INFO >>>> storm.kafka.PartitionManager - No partition information found, using >>>> configuration to determine offset >>>> 42363 [Thread-15-rawWarehousePriceSpout] INFO >>>> storm.kafka.PartitionManager - Starting Kafka >>>> price-engine-demo-server.c.celertech-01.internal:0 from offset 425530 >>>> 42366 [Thread-15-rawWarehousePriceSpout] INFO >>>> storm.kafka.PartitionManager - Read partition information from: >>>> /brokers/rawWarehousePriceSpout/partition_1 --> null >>>> 42400 [Thread-15-rawWarehousePriceSpout] INFO >>>> storm.kafka.PartitionManager - No partition information found, using >>>> configuration to determine offset >>>> 42400 [Thread-15-rawWarehousePriceSpout] INFO >>>> storm.kafka.PartitionManager - Starting Kafka >>>> price-engine-demo-server.c.celertech-01.internal:1 from offset 550715 >>>> 42401 [Thread-15-rawWarehousePriceSpout] INFO >>>> storm.kafka.PartitionManager - Read partition information from: >>>> /brokers/rawWarehousePriceSpout/partition_2 --> null >>>> 42432 [Thread-15-rawWarehousePriceSpout] INFO >>>> storm.kafka.PartitionManager - No partition information found, using >>>> configuration to determine offset >>>> 42432 [Thread-15-rawWarehousePriceSpout] INFO >>>> storm.kafka.PartitionManager - Starting Kafka >>>> price-engine-demo-server.c.celertech-01.internal:2 from offset 439547 >>>> 42432 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.ZkCoordinator - >>>> Task [1/1] Finished refreshing >>>> 44652 [ProcessThread(sid:0 cport:-1):] INFO >>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level >>>> KeeperException when processing sessionid:0x14d76d4418d000c type:create >>>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error >>>> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for >>>> /brokers/rawWarehousePriceSpout >>>> >>>> >>>> >>>>> On 21 May 2015, at 15:02, Cristian Makoto Sandiga <cmsand...@gmail.com >>>>> <mailto:cmsand...@gmail.com>> wrote: >>>>> >>>>> Im sorry, i made a mistake >>>>> >>>>> ZkHosts zkhost = new ZkHosts("localhost:2181","/brokers"); // >>>>> /brokers -> kafka broker >>>>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, >>>>> "bid_history", "/brokers", "kafka-spout"); --> storm --> zookeeper >>>>> >>>>> If you see not is a ERROR, is a INFO usually when directory structure not >>>>> exist. >>>>> >>>>> Take a look Zookeeper Kaka-broker (port 2181) >>>>> ---------------------------------- >>>>> >>>>> -- bin/zookeeper-shell.sh localhost:2181 >>>>> >>>>> ls /brokers/topics/bid_history/partitions/0/state >>>>> [] >>>>> >>>>> get /brokers/topics/bid_history/partitions/0/state >>>>> {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]} >>>>> cZxid = 0x113 >>>>> ctime = Tue May 19 18:19:24 BRT 2015 >>>>> mZxid = 0x113 >>>>> mtime = Tue May 19 18:19:24 BRT 2015 >>>>> pZxid = 0x113 >>>>> cversion = 0 >>>>> dataVersion = 0 >>>>> aclVersion = 0 >>>>> ephemeralOwner = 0x0 >>>>> dataLength = 72 >>>>> numChildren = 0 >>>>> >>>>> >>>>> Take a look Zookeeper Storm-broker (port 2000) >>>>> ---------------------------------- >>>>> Start topology, send a message and then look: >>>>> >>>>> bin/zookeeper-shell.sh localhost:2000 >>>>> Connecting to localhost:2000 >>>>> Welcome to ZooKeeper! >>>>> JLine support is disabled >>>>> >>>>> ls / >>>>> [storm, brokers, zookeeper] >>>>> ls /brokers >>>>> [kafka-spout] >>>>> ls /brokers/kafka-spout >>>>> [partition_0] >>>>> ls /brokers/kafka-spout/partition_0 >>>>> [] >>>>> get /brokers/kafka-spout/partition_0 >>>>> {"topology":{"id":"a9be1962-6b4e-4ed4-ae68-155a1948a1f6","name":"consolidate_reports"},"offset":4426029,"partition":0,"broker":{"host":"localhost","port":9092},"topic":"bid_history"} >>>>> cZxid = 0x50 >>>>> ctime = Thu May 21 11:00:48 BRT 2015 >>>>> mZxid = 0x50 >>>>> mtime = Thu May 21 11:00:48 BRT 2015 >>>>> pZxid = 0x50 >>>>> cversion = 0 >>>>> dataVersion = 0 >>>>> aclVersion = 0 >>>>> ephemeralOwner = 0x0 >>>>> dataLength = 182 >>>>> numChildren = 0 >>>>> >>>>> >>>>> 2015-05-21 10:43 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com >>>>> <mailto:cuthbert....@gmail.com>>: >>>>> Bec I do see this error in the storm logs >>>>> >>>>> 5304 [ProcessThread(sid:0 cport:-1):] INFO >>>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level >>>>> KeeperException when processing sessionid:0x14d76b1ea54000c type:create >>>>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error >>>>> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for >>>>> /brokers/rawWarehousePriceSpout >>>>> >>>>> >>>>> >>>>>> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com >>>>>> <mailto:cmsand...@gmail.com>> wrote: >>>>>> >>>>>> Are you wrong >>>>>> >>>>>> zookeeperPath , // the root path in Zookeeper for the spout to store the >>>>>> consumer offsets >>>>>> >>>>>> Is for you KafkaSpout find information about broker meta information ( >>>>>> topics, partitions) >>>>>> >>>>>> You have to override >>>>>> >>>>>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", >>>>>> "/brokers", "kafka-spout"); >>>>>> spoutconfig.zkServers >>>>>> spoutconfig.zkPort >>>>>> >>>>>> Take a look a KafkaSpout.class and do a debug mode in this part. >>>>>> >>>>>> 73 Map stateConf = new HashMap(conf); >>>>>> 74 List<String> zkServers = _spoutConfig.zkServers; >>>>>> if (zkServers == null) { >>>>>> zkServers = (List<String>) >>>>>> conf.get(Config.STORM_ZOOKEEPER_SERVERS); >>>>>> } >>>>>> Integer zkPort = _spoutConfig.zkPort; >>>>>> if (zkPort == null) { >>>>>> zkPort = ((Number) >>>>>> conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue(); >>>>>> } >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com >>>>>> <mailto:cuthbert....@gmail.com>>: >>>>>> So when I run it out of eclipse in local mode I am pointing to the same >>>>>> zookeeper as I am when it is deployed. Whatever way it still does not >>>>>> write the offset. >>>>>> >>>>>> Only different I have is my spout is configured like so: >>>>>> >>>>>> BrokerHosts hosts = new ZkHosts(zookeeperQuorum); >>>>>> >>>>>> String zookeeperPath = KAFKA_STORM_DIR + "/" + topic; >>>>>> SpoutConfig spoutConfig = new SpoutConfig( >>>>>> hosts, >>>>>> topic, // topic to read from >>>>>> zookeeperPath , // the root path in Zookeeper >>>>>> for the spout to store the consumer offsets >>>>>> spoutId); // an id for this consumer for >>>>>> storing the consumer offsets in Zookeeper >>>>>> >>>>>> //Check if we should be consuming messages from the beginning >>>>>> spoutConfig.forceFromStart = consumeFromBeginning; >>>>>> spoutConfig.maxOffsetBehind = Long.MAX_VALUE; >>>>>> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true; >>>>>> spoutConfig.scheme = new SchemeAsMultiScheme(new >>>>>> KafkaAnalyticsMessageDecoder()); >>>>>> >>>>>> >>>>>>> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com >>>>>>> <mailto:cmsand...@gmail.com>> wrote: >>>>>>> >>>>>>> I have the same problem when i have Storm in Local cluster mode. but is >>>>>>> because, Local mode has a zookeeper embedded and every time that >>>>>>> restart the offset is null and is the reason that you have >>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> >>>>>>> null. >>>>>>> >>>>>>> Try to run Local Mode, send some messages to spout, and then see Offset >>>>>>> in Zookeeper (port 2000). >>>>>>> >>>>>>> Or use Zookeeper of kafka, override. >>>>>>> >>>>>>> spoutconfig.zkServers >>>>>>> spoutconfig.zkPort >>>>>>> >>>>>>> >>>>>>> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com >>>>>>> <mailto:cuthbert....@gmail.com>>: >>>>>>> Thanks, >>>>>>> >>>>>>> Yup kafka is creating them on startup when the topics gets its first >>>>>>> msg. And from below the storm logs are never really committing the >>>>>>> offset to zookeeper. >>>>>>> >>>>>>> Zookeeper kafka topics details >>>>>>> >>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 41] ls >>>>>>> /brokers/topics/warehouse_prices/partitions >>>>>>> [2, 1, 0] >>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 42] >>>>>>> >>>>>>> Zookeeper Storm >>>>>>> >>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 42] ls >>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout >>>>>>> [] >>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 43] >>>>>>> >>>>>>> >>>>>>> Storm logs >>>>>>> >>>>>>> 44325 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: >>>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>>>>> partition=0}, >>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>>>>> partition=1}, >>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>>>>> partition=2}] >>>>>>> 44491 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - Read partition information from: >>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0 --> >>>>>>> null >>>>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - No partition information found, using >>>>>>> configuration to determine offset >>>>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0 >>>>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than >>>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2 >>>>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - Starting Kafka >>>>>>> price-engine-demo-server.c.celertech-01.internal:0 from offset 0 >>>>>>> 44749 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - Read partition information from: >>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1 --> >>>>>>> null >>>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - No partition information found, using >>>>>>> configuration to determine offset >>>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0 >>>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than >>>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2 >>>>>>> 44779 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - Starting Kafka >>>>>>> price-engine-demo-server.c.celertech-01.internal:1 from offset 0 >>>>>>> 44781 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - Read partition information from: >>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2 --> >>>>>>> null >>>>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - No partition information found, using >>>>>>> configuration to determine offset >>>>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0 >>>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than >>>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2 >>>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.PartitionManager - Starting Kafka >>>>>>> price-engine-demo-server.c.celertech-01.internal:2 from offset 0 >>>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing >>>>>>> 47438 [ProcessThread(sid:0 cport:-1):] INFO >>>>>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level >>>>>>> KeeperException when processing sessionid:0x14d7658db3c000c type:create >>>>>>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error >>>>>>> Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout >>>>>>> Error:KeeperErrorCode = NoNode for >>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout >>>>>>> 104184 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils >>>>>>> - No data found in Kafka Partition partition_0 >>>>>>> 104828 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager >>>>>>> connections >>>>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: >>>>>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, >>>>>>> 1=price-engine-demo-server.c.celertech-01.internal:9092, >>>>>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}} >>>>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils >>>>>>> - Task [1/1] assigned >>>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>>>>> partition=0}, >>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>>>>> partition=1}, >>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>>>>> partition=2}] >>>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: [] >>>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [] >>>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing >>>>>>> 164204 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils >>>>>>> - No data found in Kafka Partition partition_0 >>>>>>> 165063 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager >>>>>>> connections >>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: >>>>>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092, >>>>>>> 1=price-engine-demo-server.c.celertech-01.internal:9092, >>>>>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}} >>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO storm.kafka.KafkaUtils >>>>>>> - Task [1/1] assigned >>>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>>>>> partition=0}, >>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>>>>> partition=1}, >>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, >>>>>>> partition=2}] >>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: [] >>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [] >>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing >>>>>>> 224233 [Thread-15-rawWarehousePriceSpout] WARN storm.kafka.KafkaUtils >>>>>>> - No data found in Kafka Partition partition_0 >>>>>>> 225308 [Thread-15-rawWarehousePriceSpout] INFO >>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager >>>>>>> connections >>>>>>> >>>>>>> >>>>>>>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com >>>>>>>> <mailto:cmsand...@gmail.com>> wrote: >>>>>>>> >>>>>>>> Zookeeper create nothing when startup, you have to create your >>>>>>>> partitions in kafka broker. >>>>>>>> >>>>>>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic >>>>>>>> click_history --replication-factor 1 --partitions 10 >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com >>>>>>>> <mailto:cuthbert....@gmail.com>>: >>>>>>>> All, >>>>>>>> >>>>>>>> We changed or paths in zookeeper and we are now seeing >>>>>>>> >>>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: >>>>>>>> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode >>>>>>>> = NoNode for /brokers/topics/warehouse_prices/partitions >>>>>>>> at >>>>>>>> storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81) >>>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4] >>>>>>>> at >>>>>>>> storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) >>>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4] >>>>>>>> at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) >>>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4] >>>>>>>> at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) >>>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4] >>>>>>>> at >>>>>>>> backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) >>>>>>>> ~[storm-core-0.9.4.jar:0.9.4] >>>>>>>> at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) >>>>>>>> ~[storm-core-0.9.4.jar:0.9.4] >>>>>>>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] >>>>>>>> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51] >>>>>>>> >>>>>>>> Should this not be discovered by the Spout on startup? >>>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>>> >>>>> >>>>> >>>> >>>> >>> >>> >>> >> >> >