This check in PartitionManager:

    long lastCompletedOffset = lastCompletedOffset();
    if (_committedTo != lastCompletedOffset) {

never passes in my case: _committedTo always equals lastCompletedOffset, so the offset is never written.
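
To make the guard above concrete, here is a toy model of it. It is a sketch, not the storm-kafka source: the class name, the fields, and the rule that lastCompletedOffset() returns the lowest pending offset (or the emitted-to offset when nothing is pending) are assumptions made for illustration. Under those assumptions, a write only happens once a tuple has been emitted and acked, which fits the earlier question in this thread about acking in the bolt.

```java
import java.util.TreeSet;

/** Toy model of the commit guard; hypothetical names, not the library code. */
class CommitGuardSketch {
    private final TreeSet<Long> pending = new TreeSet<>(); // emitted, not yet acked
    private long emittedTo;   // next offset to emit
    private long committedTo; // last offset "written" to the store
    int writes = 0;           // counts simulated writes

    CommitGuardSketch(long startOffset) {
        emittedTo = startOffset;
        committedTo = startOffset;
    }

    void emit() { pending.add(emittedTo++); }

    void ack(long offset) { pending.remove(offset); }

    // Assumed rule: lowest unacked offset, else everything emitted so far.
    long lastCompletedOffset() {
        return pending.isEmpty() ? emittedTo : pending.first();
    }

    void commit() {
        long lastCompletedOffset = lastCompletedOffset();
        if (committedTo != lastCompletedOffset) { // same shape as the guard above
            writes++;
            committedTo = lastCompletedOffset;
        }
    }

    public static void main(String[] args) {
        CommitGuardSketch s = new CommitGuardSketch(100);
        s.commit();                 // nothing emitted: offsets equal, no write
        s.emit();
        s.commit();                 // emitted but not acked: still no write
        System.out.println("writes before ack: " + s.writes);
        s.ack(100);
        s.commit();                 // acked: offsets differ, one write
        System.out.println("writes after ack: " + s.writes);
    }
}
```

If the real spout behaves like this model, a commit that never writes would point at tuples that are never emitted or never acked rather than at the ZooKeeper path itself.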


> On 24 May 2015, at 18:18, Benjamin Cuthbert <cuthbert....@gmail.com> wrote:
> 
> The issue is that Storm did not create the /brokers/<topics> details in 
> ZooKeeper, so when the worker starts it gets
> 
> 2015-05-24T09:28:54.706+0000 s.k.PartitionManager [WARN] Error reading and/or 
> parsing at ZkNode: /brokers/rawWarehousePriceSpout/partition_0
> java.lang.NullPointerException: null
>       at storm.kafka.PartitionManager.<init>(PartitionManager.java:76) 
> ~[celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
>       at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) 
> [celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
>       at 
> storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) 
> [celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
>       at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) 
> [celertech-analytics-dependencies-DEVELOP-HEAD-SNAPSHOT.jar:na]
>       at 
> backtype.storm.daemon.executor$fn__4654$fn__4669$fn__4698.invoke(executor.clj:565)
>  [storm-core-0.9.4.jar:0.9.4]
> 
> Even though the node exists, because it was created manually:
> 
> [zk: 127.0.0.1:2181(CONNECTED) 23] get 
> /brokers/rawWarehousePriceSpout/partition_0
> {}
> cZxid = 0x38d0c
> ctime = Thu May 21 18:53:27 UTC 2015
> mZxid = 0x38d0c
> mtime = Thu May 21 18:53:27 UTC 2015
> pZxid = 0x38d0c
> cversion = 0
> dataVersion = 0
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 2
> numChildren = 0
> [zk: 127.0.0.1:2181(CONNECTED) 24]
> 
> But I guess it is looking for certain information
> 
>> On 21 May 2015, at 19:28, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>> 
>> use GET
>> 
>> get /brokers/rawWarehousePriceSpout/partition_0
>> 
>> 2015-05-21 15:27 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>> So I do not see what you see. Just to be sure: when deployed into an 
>> environment, should the SpoutConfig have the same ZooKeeper host as the 
>> Kafka ZooKeeper?
>> 
>> [zk: localhost:2000(CONNECTED) 7] ls 
>> /brokers/rawWarehousePriceSpout/partition_0
>> []
>> [zk: localhost:2000(CONNECTED) 8] ls 
>> /brokers/rawWarehousePriceSpout/partition_0
>> []
>> [zk: localhost:2000(CONNECTED) 9]
>> 
>> 
>> 42742 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Read partition information from: 
>> /brokers/rawWarehousePriceSpout/partition_0  --> null
>> 43107 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - No partition information found, using configuration to determine offset
>> 43107 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:0 from 
>> offset 802916
>> 43109 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Read partition information from: 
>> /brokers/rawWarehousePriceSpout/partition_1  --> null
>> 43130 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - No partition information found, using configuration to determine offset
>> 43130 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:1 from 
>> offset 1071876
>> 43130 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Read partition information from: 
>> /brokers/rawWarehousePriceSpout/partition_2  --> null
>> 43154 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - No partition information found, using configuration to determine offset
>> 43154 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.PartitionManager 
>> - Starting Kafka price-engine-demo-server.c.celertech-01.internal:2 from 
>> offset 1494070
>> 43154 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] Finished refreshing
>> 121420 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] Refreshing partition manager connections
>> 121641 [Thread-15-rawWarehousePriceSpout] INFO  
>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: 
>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>  1=price-engine-demo-server.c.celertech-01.internal:9092, 
>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>> 121641 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils - 
>> Task [1/1] assigned 
>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>> partition=0}, 
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>> partition=1}, 
>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>> partition=2}]
>> 121641 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] Deleted partition managers: []
>> 121641 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] New partition managers: []
>> 121641 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>> Task [1/1] Finished refreshing
>> 
>> 
>>> On 21 May 2015, at 15:37, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>> 
>>> Again, when running the topology locally from Eclipse you have to connect to 
>>> localhost:2000 (the embedded Storm ZooKeeper).
>>> 
>>> 2015-05-21 11:28 GMT-03:00 Cristian Makoto Sandiga <cmsand...@gmail.com>:
>>> I don't know, I'm using BaseRichBolt.
>>> 
>>> 2015-05-21 11:25 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>> 
>>> I am using BaseBasicBolt, as I was told it acks events automatically. Should 
>>> we not use that?
>>> 
>>>  
>>>> On 21 May 2015, at 15:24, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>> 
>>>> Are you acking in your bolt? collector.ack(input);
>>>> 
>>>> 2015-05-21 11:19 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>> Okay so interesting:
>>>> 
>>>> I have your configuration in my local eclipse and a remote zookeeper that 
>>>> kafka is connected to:
>>>> 
>>>> ZkHosts zkhost = new ZkHosts("1.1.1.1:2181","/brokers");  // /brokers -> kafka broker
>>>> SpoutConfig spoutconfig = new SpoutConfig("1.1.1.1:2181", "/prices", "/brokers", "rawWarehousePriceSpout"); 
>>>> 
>>>> Zookeeper:
>>>> 
>>>> [zk: 127.0.0.1:2181(CONNECTED) 80] ls /brokers
>>>> [rawWarehousePriceSpout, topics, ids]
>>>> [zk: 127.0.0.1:2181(CONNECTED) 81] ls /brokers/rawWarehousePriceSpout
>>>> []
>>>> [zk: 127.0.0.1:2181(CONNECTED) 82]
>>>>    
>>>> 
>>>> Logs i see:
>>>> 
>>>> 41994 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils - 
>>>> Task [1/1] assigned 
>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>> partition=0}, 
>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>> partition=1}, 
>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>> partition=2}]
>>>> 41994 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>>> Task [1/1] Deleted partition managers: []
>>>> 41994 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>>> Task [1/1] New partition managers: 
>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>> partition=0}, 
>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>> partition=1}, 
>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>> partition=2}]
>>>> 42094 [Thread-15-rawWarehousePriceSpout] INFO  
>>>> storm.kafka.PartitionManager - Read partition information from: 
>>>> /brokers/rawWarehousePriceSpout/partition_0  --> null
>>>> 42363 [Thread-15-rawWarehousePriceSpout] INFO  
>>>> storm.kafka.PartitionManager - No partition information found, using 
>>>> configuration to determine offset
>>>> 42363 [Thread-15-rawWarehousePriceSpout] INFO  
>>>> storm.kafka.PartitionManager - Starting Kafka 
>>>> price-engine-demo-server.c.celertech-01.internal:0 from offset 425530
>>>> 42366 [Thread-15-rawWarehousePriceSpout] INFO  
>>>> storm.kafka.PartitionManager - Read partition information from: 
>>>> /brokers/rawWarehousePriceSpout/partition_1  --> null
>>>> 42400 [Thread-15-rawWarehousePriceSpout] INFO  
>>>> storm.kafka.PartitionManager - No partition information found, using 
>>>> configuration to determine offset
>>>> 42400 [Thread-15-rawWarehousePriceSpout] INFO  
>>>> storm.kafka.PartitionManager - Starting Kafka 
>>>> price-engine-demo-server.c.celertech-01.internal:1 from offset 550715
>>>> 42401 [Thread-15-rawWarehousePriceSpout] INFO  
>>>> storm.kafka.PartitionManager - Read partition information from: 
>>>> /brokers/rawWarehousePriceSpout/partition_2  --> null
>>>> 42432 [Thread-15-rawWarehousePriceSpout] INFO  
>>>> storm.kafka.PartitionManager - No partition information found, using 
>>>> configuration to determine offset
>>>> 42432 [Thread-15-rawWarehousePriceSpout] INFO  
>>>> storm.kafka.PartitionManager - Starting Kafka 
>>>> price-engine-demo-server.c.celertech-01.internal:2 from offset 439547
>>>> 42432 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator - 
>>>> Task [1/1] Finished refreshing
>>>> 44652 [ProcessThread(sid:0 cport:-1):] INFO  
>>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level 
>>>> KeeperException when processing sessionid:0x14d76d4418d000c type:create 
>>>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error 
>>>> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for 
>>>> /brokers/rawWarehousePriceSpout
>>>> 
>>>> 
>>>> 
>>>>> On 21 May 2015, at 15:02, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>> 
>>>>> I'm sorry, I made a mistake.
>>>>> 
>>>>>           ZkHosts zkhost = new ZkHosts("localhost:2181","/brokers");  // /brokers -> kafka broker
>>>>>           SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", "/brokers", "kafka-spout"); // --> storm --> zookeeper
>>>>> 
>>>>> Note that the message you see is not an ERROR but an INFO; it usually 
>>>>> appears when the directory structure does not exist yet.
>>>>> 
>>>>> Take a look at the Kafka broker ZooKeeper (port 2181)
>>>>> ----------------------------------
>>>>> 
>>>>> -- bin/zookeeper-shell.sh localhost:2181  
>>>>> 
>>>>> ls /brokers/topics/bid_history/partitions/0/state
>>>>> []
>>>>> 
>>>>> get /brokers/topics/bid_history/partitions/0/state
>>>>> {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
>>>>> cZxid = 0x113
>>>>> ctime = Tue May 19 18:19:24 BRT 2015
>>>>> mZxid = 0x113
>>>>> mtime = Tue May 19 18:19:24 BRT 2015
>>>>> pZxid = 0x113
>>>>> cversion = 0
>>>>> dataVersion = 0
>>>>> aclVersion = 0
>>>>> ephemeralOwner = 0x0
>>>>> dataLength = 72
>>>>> numChildren = 0
>>>>> 
>>>>> 
>>>>> Take a look at the Storm ZooKeeper (port 2000)
>>>>> ----------------------------------
>>>>> Start topology, send a message and then look:
>>>>> 
>>>>> bin/zookeeper-shell.sh localhost:2000
>>>>> Connecting to localhost:2000
>>>>> Welcome to ZooKeeper!
>>>>> JLine support is disabled
>>>>> 
>>>>> ls /
>>>>> [storm, brokers, zookeeper]
>>>>> ls /brokers
>>>>> [kafka-spout]
>>>>> ls /brokers/kafka-spout
>>>>> [partition_0]
>>>>> ls /brokers/kafka-spout/partition_0
>>>>> []
>>>>> get /brokers/kafka-spout/partition_0
>>>>> {"topology":{"id":"a9be1962-6b4e-4ed4-ae68-155a1948a1f6","name":"consolidate_reports"},"offset":4426029,"partition":0,"broker":{"host":"localhost","port":9092},"topic":"bid_history"}
>>>>> cZxid = 0x50
>>>>> ctime = Thu May 21 11:00:48 BRT 2015
>>>>> mZxid = 0x50
>>>>> mtime = Thu May 21 11:00:48 BRT 2015
>>>>> pZxid = 0x50
>>>>> cversion = 0
>>>>> dataVersion = 0
>>>>> aclVersion = 0
>>>>> ephemeralOwner = 0x0
>>>>> dataLength = 182
>>>>> numChildren = 0
>>>>> 
>>>>> 
>>>>> 2015-05-21 10:43 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>> Because I do see this error in the storm logs
>>>>> 
>>>>> 5304 [ProcessThread(sid:0 cport:-1):] INFO  
>>>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level 
>>>>> KeeperException when processing sessionid:0x14d76b1ea54000c type:create 
>>>>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error 
>>>>> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for 
>>>>> /brokers/rawWarehousePriceSpout
>>>>> 
>>>>> 
>>>>> 
>>>>>> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>>> 
>>>>>> I think you have this wrong:
>>>>>> 
>>>>>> zookeeperPath , // the root path in Zookeeper for the spout to store the 
>>>>>> consumer offsets
>>>>>> 
>>>>>> That is for your KafkaSpout to find the broker meta information 
>>>>>> (topics, partitions).
>>>>>> 
>>>>>> You have to override 
>>>>>> 
>>>>>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history", 
>>>>>> "/brokers", "kafka-spout");
>>>>>> spoutconfig.zkServers
>>>>>> spoutconfig.zkPort 
>>>>>> 
>>>>>> Take a look at KafkaSpout.class and step through this part in debug mode.
>>>>>> 
>>>>>>         Map stateConf = new HashMap(conf);
>>>>>>         List<String> zkServers = _spoutConfig.zkServers;
>>>>>>         if (zkServers == null) {
>>>>>>             zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
>>>>>>         }
>>>>>>         Integer zkPort = _spoutConfig.zkPort;
>>>>>>         if (zkPort == null) {
>>>>>>             zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
>>>>>>         }
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>>> So when I run it out of eclipse in local mode I am pointing to the same 
>>>>>> zookeeper as I am when it is deployed. Either way it still does not 
>>>>>> write the offset.
>>>>>> 
>>>>>> The only difference I have is that my spout is configured like so:
>>>>>> 
>>>>>> BrokerHosts hosts = new ZkHosts(zookeeperQuorum);
>>>>>>          
>>>>>>          String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
>>>>>>          SpoutConfig spoutConfig = new SpoutConfig(
>>>>>>                          hosts,
>>>>>>                          topic, // topic to read from
>>>>>>                          zookeeperPath, // the root path in Zookeeper for the spout to store the consumer offsets
>>>>>>                          spoutId); // an id for this consumer for storing the consumer offsets in Zookeeper
>>>>>>         
>>>>>>          //Check if we should be consuming messages from the beginning
>>>>>>          spoutConfig.forceFromStart = consumeFromBeginning;
>>>>>>          spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
>>>>>>          spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>>>>>>          spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaAnalyticsMessageDecoder());
>>>>>> 
>>>>>> 
>>>>>>> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>>>> 
>>>>>>> I had the same problem when running Storm in local cluster mode. That 
>>>>>>> is because local mode has an embedded ZooKeeper, and every time it 
>>>>>>> restarts the offset is null, which is why you see 
>>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  --> 
>>>>>>> null.
>>>>>>> 
>>>>>>> Try running in local mode, send some messages to the spout, and then 
>>>>>>> check the offset in ZooKeeper (port 2000).
>>>>>>> 
>>>>>>> Or use Kafka's ZooKeeper by overriding:
>>>>>>> 
>>>>>>> spoutconfig.zkServers
>>>>>>> spoutconfig.zkPort
>>>>>>> 
>>>>>>> 
>>>>>>> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Yup, Kafka creates them when the topic gets its first message. And as 
>>>>>>> the Storm logs below show, the offset is never actually committed to 
>>>>>>> ZooKeeper. 
>>>>>>> 
>>>>>>> Zookeeper kafka topics details
>>>>>>> 
>>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 41] ls 
>>>>>>> /brokers/topics/warehouse_prices/partitions
>>>>>>> [2, 1, 0]
>>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 42]
>>>>>>> 
>>>>>>> Zookeeper Storm
>>>>>>> 
>>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 42] ls 
>>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>>>>> []
>>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 43]
>>>>>>> 
>>>>>>> 
>>>>>>> Storm logs
>>>>>>> 
>>>>>>> 44325 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: 
>>>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>>>>> partition=0}, 
>>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>>>>> partition=1}, 
>>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>>>>> partition=2}]
>>>>>>> 44491 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - Read partition information from: 
>>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0  --> 
>>>>>>> null
>>>>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - No partition information found, using 
>>>>>>> configuration to determine offset
>>>>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than 
>>>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - Starting Kafka 
>>>>>>> price-engine-demo-server.c.celertech-01.internal:0 from offset 0
>>>>>>> 44749 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - Read partition information from: 
>>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1  --> 
>>>>>>> null
>>>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - No partition information found, using 
>>>>>>> configuration to determine offset
>>>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than 
>>>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>>>> 44779 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - Starting Kafka 
>>>>>>> price-engine-demo-server.c.celertech-01.internal:1 from offset 0
>>>>>>> 44781 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - Read partition information from: 
>>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  --> 
>>>>>>> null
>>>>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - No partition information found, using 
>>>>>>> configuration to determine offset
>>>>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than 
>>>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.PartitionManager - Starting Kafka 
>>>>>>> price-engine-demo-server.c.celertech-01.internal:2 from offset 0
>>>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>>>>> 47438 [ProcessThread(sid:0 cport:-1):] INFO  
>>>>>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level 
>>>>>>> KeeperException when processing sessionid:0x14d7658db3c000c type:create 
>>>>>>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error 
>>>>>>> Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout 
>>>>>>> Error:KeeperErrorCode = NoNode for 
>>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>>>>> 104184 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils 
>>>>>>> - No data found in Kafka Partition partition_0
>>>>>>> 104828 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager 
>>>>>>> connections
>>>>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: 
>>>>>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>>>  1=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>>>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils 
>>>>>>> - Task [1/1] assigned 
>>>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>>>>> partition=0}, 
>>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>>>>> partition=1}, 
>>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>>>>> partition=2}]
>>>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>>>>> 164204 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils 
>>>>>>> - No data found in Kafka Partition partition_0
>>>>>>> 165063 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager 
>>>>>>> connections
>>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: 
>>>>>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>>>  1=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>>>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils 
>>>>>>> - Task [1/1] assigned 
>>>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>>>>> partition=0}, 
>>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>>>>> partition=1}, 
>>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092, 
>>>>>>> partition=2}]
>>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>>>>> 224233 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils 
>>>>>>> - No data found in Kafka Partition partition_0
>>>>>>> 225308 [Thread-15-rawWarehousePriceSpout] INFO  
>>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager 
>>>>>>> connections
>>>>>>> 
>>>>>>> 
>>>>>>>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> ZooKeeper creates nothing at startup; you have to create your 
>>>>>>>> partitions on the Kafka broker. 
>>>>>>>> 
>>>>>>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic 
>>>>>>>> click_history --replication-factor 1 --partitions 10
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>>>>> All,
>>>>>>>> 
>>>>>>>> We changed our paths in ZooKeeper and we are now seeing
>>>>>>>> 
>>>>>>>> java.lang.RuntimeException: java.lang.RuntimeException: 
>>>>>>>> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode 
>>>>>>>> = NoNode for /brokers/topics/warehouse_prices/partitions
>>>>>>>>        at 
>>>>>>>> storm.kafka.DynamicBrokersReader.getBrokerInfo(DynamicBrokersReader.java:81)
>>>>>>>>  ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>>>>        at 
>>>>>>>> storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42) 
>>>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>>>>        at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) 
>>>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>>>>        at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) 
>>>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>>>>        at 
>>>>>>>> backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522)
>>>>>>>>  ~[storm-core-0.9.4.jar:0.9.4]
>>>>>>>>        at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461) 
>>>>>>>> ~[storm-core-0.9.4.jar:0.9.4]
>>>>>>>>        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>>>>        at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>>>>>>>> 
>>>>>>>> Should this not be discovered by the Spout on startup? 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>>> 
>>> 
>> 
>> 
> 
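
On the NullPointerException quoted above: the manually created node holds `{}`, while the populated node from the working setup carries nested "topology", "offset", "partition", "broker" and "topic" fields. A plausible explanation, assuming the spout looks up the topology id by chaining through the "topology" entry (an assumption, not confirmed in this thread), is that the nested lookup dereferences null on an empty object. The sketch below uses plain Java maps in place of the parsed JSON; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrates why an empty state node can fail a nested lookup. */
class EmptyStateNodeSketch {
    // Chained lookup: fails if "topology" is absent.
    @SuppressWarnings("unchecked")
    static String topologyId(Map<String, Object> json) {
        return (String) ((Map<String, Object>) json.get("topology")).get("id");
    }

    // Defensive variant: returns null instead of throwing.
    static String safeTopologyId(Map<String, Object> json) {
        Object topology = json.get("topology");
        if (!(topology instanceof Map)) {
            return null;
        }
        Object id = ((Map<?, ?>) topology).get("id");
        return id instanceof String ? (String) id : null;
    }

    public static void main(String[] args) {
        Map<String, Object> empty = new HashMap<>(); // models the "{}" node

        Map<String, Object> topology = new HashMap<>();
        topology.put("id", "a9be1962-6b4e-4ed4-ae68-155a1948a1f6");
        Map<String, Object> populated = new HashMap<>();
        populated.put("topology", topology);
        populated.put("offset", 4426029L);

        System.out.println(topologyId(populated));
        System.out.println(safeTopologyId(empty));
        try {
            topologyId(empty);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException on empty node");
        }
    }
}
```

If that reading is right, deleting the hand-made `{}` node and letting the spout create it on its first commit would avoid the warning.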
