Okay so interesting:

I have your configuration in my local eclipse and a remote zookeeper that kafka 
is connected to:

ZkHosts zkhost = new ZkHosts(“1.1.1.1:2181","/brokers");  // /brokers -> kafka 
broker
SpoutConfig spoutconfig = new SpoutConfig(“1.1.1.1:2181", “/prices", 
"/brokers", “rawWarehousePriceSpout"); 

Zookeeper:

[zk: 127.0.0.1:2181(CONNECTED) 80] ls /brokers
[rawWarehousePriceSpout, topics, ids]
[zk: 127.0.0.1:2181(CONNECTED) 81] ls /brokers/rawWarehousePriceSpout
[]
[zk: 127.0.0.1:2181(CONNECTED) 82]
        

Logs i see:

41994 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils - Task 
[1/1] assigned 
[Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
partition=0}, 
Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
partition=1}, 
Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
partition=2}]
41994 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - Task 
[1/1] Deleted partition managers: []
41994 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - Task 
[1/1] New partition managers: 
[Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
partition=0}, 
Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
partition=1}, 
Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
partition=2}]
42094 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
Read partition information from: /brokers/rawWarehousePriceSpout/partition_0  
--> null
42363 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
No partition information found, using configuration to determine offset
42363 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from offset 
425530
42366 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
Read partition information from: /brokers/rawWarehousePriceSpout/partition_1  
--> null
42400 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
No partition information found, using configuration to determine offset
42400 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from offset 
550715
42401 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
Read partition information from: /brokers/rawWarehousePriceSpout/partition_2  
--> null
42432 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
No partition information found, using configuration to determine offset
42432 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager - 
Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from offset 
439547
42432 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - Task 
[1/1] Finished refreshing
44652 [ProcessThread(sid:0 cport:-1):] INFO  
org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level 
KeeperException when processing sessionid:0x14d76d4418d000c type:create 
cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error 
Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for 
/brokers/rawWarehousePriceSpout



> On 21 May 2015, at 15:02, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
> 
> Im sorry, i made a mistake
> 
>               ZkHosts zkhost = new ZkHosts("localhost:2181","/brokers");  // 
> /brokers -> kafka broker
>               SpoutConfig spoutconfig = new SpoutConfig(zkhost, 
> "bid_history", "/brokers", "kafka-spout"); --> storm --> zookeeper 
> 
> If you see not is a ERROR, is a INFO usually when directory structure not 
> exist.
> 
> Take a look  Zookeeper Kaka-broker (port 2181)
> ----------------------------------
> 
> -- bin/zookeeper-shell.sh localhost:2181  
> 
> ls /brokers/topics/bid_history/partitions/0/state
> []
> 
> get /brokers/topics/bid_history/partitions/0/state
> {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
> cZxid = 0x113
> ctime = Tue May 19 18:19:24 BRT 2015
> mZxid = 0x113
> mtime = Tue May 19 18:19:24 BRT 2015
> pZxid = 0x113
> cversion = 0
> dataVersion = 0
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 72
> numChildren = 0
> 
> 
> Take a look  Zookeeper Storm-broker (port 2000)
> ----------------------------------
> Start topology, send a message and then look:
> 
> bin/zookeeper-shell.sh localhost:2000
> Connecting to localhost:2000
> Welcome to ZooKeeper!
> JLine support is disabled
> 
> ls /
> [storm, brokers, zookeeper]
> ls /brokers
> [kafka-spout]
> ls /brokers/kafka-spout
> [partition_0]
> ls /brokers/kafka-spout/partition_0
> []
> get /brokers/kafka-spout/partition_0
> {"topology":{"id":"a9be1962-6b4e-4ed4-ae68-155a1948a1f6","name":"consolidate_reports"},"offset":4426029,"partition":0,"broker":{"host":"localhost","port":9092},"topic":"bid_history"}
> cZxid = 0x50
> ctime = Thu May 21 11:00:48 BRT 2015
> mZxid = 0x50
> mtime = Thu May 21 11:00:48 BRT 2015
> pZxid = 0x50
> cversion = 0
> dataVersion = 0
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 182
> numChildren = 0
> 
> 
> 2015-05-21 10:43 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com 
> <mailto:cuthbert....@gmail.com>>:
> Bec I do see this error in the storm logs
> 
> 5304 [ProcessThread(sid:0 cport:-1):] INFO  
> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level 
> KeeperException when processing sessionid:0x14d76b1ea54000c type:create 
> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error 
> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for 
> /brokers/rawWarehousePriceSpout
> 
> 
> 
>> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com 
>> <mailto:cmsand...@gmail.com>> wrote:
>> 
>> Are you wrong 
>> 
>> zookeeperPath , // the root path in Zookeeper for the spout to store the 
>> consumer offsets
>> 
>> Is for you KafkaSpout find information about broker meta information ( 
>> topics, partitions)
>> 
>> You have to override 
>> 
>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", 
>> "kafka-spout");
>> spoutconfig.zkServers
>> spoutconfig.zkPort 
>> 
>> Take a look a KafkaSpout.class and do a debug mode in this part.
>> 
>> 73        Map stateConf = new HashMap(conf);
>> 74       List<String> zkServers = _spoutConfig.zkServers;
>>         if (zkServers == null) {
>>             zkServers = (List<String>) 
>> conf.get(Config.STORM_ZOOKEEPER_SERVERS);
>>         }
>>         Integer zkPort = _spoutConfig.zkPort;
>>         if (zkPort == null) {
>>             zkPort = ((Number) 
>> conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
>>         }
>> 
>> 
>> 
>> 
>> 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com 
>> <mailto:cuthbert....@gmail.com>>:
>> So when I run it out of eclipse in local mode I am pointing to the same 
>> zookeeper as I am when it is deployed. Whatever way it still does not write 
>> the offset.
>> 
>> Only different I have is my spout is configured like so:
>> 
>> BrokerHosts hosts = new ZkHosts(zookeeperQuorum);
>>              
>>              String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
>>              SpoutConfig spoutConfig = new SpoutConfig(
>>                              hosts,
>>                              topic, // topic to read from
>>                              zookeeperPath , // the root path in Zookeeper 
>> for the spout to store the consumer offsets
>>                              spoutId); // an id for this consumer for 
>> storing the consumer offsets in Zookeeper
>>         
>>              //Check if we should be consuming messages from the beginning
>>              spoutConfig.forceFromStart = consumeFromBeginning;
>>              spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
>>              spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>>              spoutConfig.scheme = new SchemeAsMultiScheme(new 
>> KafkaAnalyticsMessageDecoder());
>> 
>> 
>>> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com 
>>> <mailto:cmsand...@gmail.com>> wrote:
>>> 
>>> I have the same problem when i have Storm in Local cluster mode. but is 
>>> because, Local mode has a zookeeper embedded and every time that restart  
>>> the offset is null  and is the reason that you have  
>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  --> null.
>>> 
>>> Try to run Local Mode, send some messages to spout, and then see Offset in 
>>> Zookeeper (port 2000).
>>> 
>>> Or use Zookeeper of kafka, override.
>>> 
>>> spoutconfig.zkServers
>>> spoutconfig.zkPort
>>> 
>>> 
>>> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com 
>>> <mailto:cuthbert....@gmail.com>>:
>>> Thanks,
>>> 
>>> Yup kafka is creating them on startup when the topics gets its first msg. 
>>> And from below the storm logs are never really committing the offset to 
>>> zookeeper. 
>>> 
>>> Zookeeper kafka topics details
>>> 
>>> [zk: 127.0.0.1:2181(CONNECTED) 41] ls 
>>> /brokers/topics/warehouse_prices/partitions
>>> [2, 1, 0]
>>> [zk: 127.0.0.1:2181(CONNECTED) 42]
>>> 
>>> Zookeeper Storm
>>> 
>>> [zk: 127.0.0.1:2181(CONNECTED) 42] ls 
>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>> []
>>> [zk: 127.0.0.1:2181(CONNECTED) 43]
>>> 
>>> 
>>> Storm logs
>>> 
>>> 44325 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>> Task [1/1] New partition managers: 
>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>> partition=0}, 
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>> partition=1}, 
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>> partition=2}]
>>> 44491 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - Read partition information from: 
>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0  --> null
>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - No partition information found, using configuration to determine offset
>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - Last commit offset from zookeeper: 0
>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - Commit offset 0 is more than 9223372036854775807 behind, resetting to 
>>> startOffsetTime=-2
>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from 
>>> offset 0
>>> 44749 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - Read partition information from: 
>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1  --> null
>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - No partition information found, using configuration to determine offset
>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - Last commit offset from zookeeper: 0
>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - Commit offset 0 is more than 9223372036854775807 behind, resetting to 
>>> startOffsetTime=-2
>>> 44779 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from 
>>> offset 0
>>> 44781 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - Read partition information from: 
>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  --> null
>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - No partition information found, using configuration to determine offset
>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - Last commit offset from zookeeper: 0
>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - Commit offset 0 is more than 9223372036854775807 behind, resetting to 
>>> startOffsetTime=-2
>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from 
>>> offset 0
>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>> Task [1/1] Finished refreshing
>>> 47438 [ProcessThread(sid:0 cport:-1):] INFO  
>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level 
>>> KeeperException when processing sessionid:0x14d7658db3c000c type:create 
>>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error 
>>> Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout 
>>> Error:KeeperErrorCode = NoNode for 
>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>> 104184 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils - No 
>>> data found in Kafka Partition partition_0
>>> 104828 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>> Task [1/1] Refreshing partition manager connections
>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO  
>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: 
>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>>  1=price-engine-demo-server.c.celertech-01.internal:9092, 
>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils - 
>>> Task [1/1] assigned 
>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>> partition=0}, 
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>> partition=1}, 
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>> partition=2}]
>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>> Task [1/1] Deleted partition managers: []
>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>> Task [1/1] New partition managers: []
>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>> Task [1/1] Finished refreshing
>>> 164204 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils - No 
>>> data found in Kafka Partition partition_0
>>> 165063 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>> Task [1/1] Refreshing partition manager connections
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  
>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: 
>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>>  1=price-engine-demo-server.c.celertech-01.internal:9092, 
>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils - 
>>> Task [1/1] assigned 
>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>> partition=0}, 
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>> partition=1}, 
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>> partition=2}]
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>> Task [1/1] Deleted partition managers: []
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>> Task [1/1] New partition managers: []
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>> Task [1/1] Finished refreshing
>>> 224233 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils - No 
>>> data found in Kafka Partition partition_0
>>> 225308 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>> Task [1/1] Refreshing partition manager connections
>>> 
>>> 
>>>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com 
>>>> <mailto:cmsand...@gmail.com>> wrote:
>>>> 
>>>> Zookeeper create nothing when startup, you have to create your partitions 
>>>> in kafka broker. 
>>>> 
>>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic 
>>>> click_history --replication-factor 1 --partitions 10
>>>> 
>>>> 
>>>> 
>>>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com 
>>>> <mailto:cuthbert....@gmail.com>>:
>>>> All,
>>>> 
>>>> We changed or paths in zookeeper and we are now seeing
>>>> 
>>>> java.lang.RuntimeException: java.lang.RuntimeException: 
>>>> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = 
>>>> NoNode for /brokers/topics/warehouse_prices/partitions
>>>>    at 
>>>> storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81)
>>>>  ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>    at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) 
>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>    at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) 
>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>    at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) 
>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>    at 
>>>> backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522) 
>>>> ~[storm-core-0.9.4.jar:0.9.4]
>>>>    at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) 
>>>> ~[storm-core-0.9.4.jar:0.9.4]
>>>>    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>    at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>>>> 
>>>> Should this not be discovered by the Spout on startup? 
>>>> 
>>> 
>>> 
>> 
>> 
> 
> 

Reply via email to