Im sorry, i made a mistake

ZkHosts zkhost = new ZkHosts("localhost:2181","/brokers");  // /brokers ->
kafka broker
SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history",
"/brokers", "kafka-spout"); --> storm --> zookeeper

If you see not is a ERROR, is a INFO usually when directory structure not
exist.

Take a look  Zookeeper Kaka-broker (port 2181)
----------------------------------

-- bin/zookeeper-shell.sh localhost:2181

ls /brokers/topics/bid_history/partitions/0/state
[]

get /brokers/topics/bid_history/partitions/0/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
cZxid = 0x113
ctime = Tue May 19 18:19:24 BRT 2015
mZxid = 0x113
mtime = Tue May 19 18:19:24 BRT 2015
pZxid = 0x113
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0


Take a look  Zookeeper Storm-broker (port 2000)
----------------------------------
Start topology, send a message and then look:

bin/zookeeper-shell.sh localhost:2000
Connecting to localhost:2000
Welcome to ZooKeeper!
JLine support is disabled

ls /
[storm, brokers, zookeeper]
ls /brokers
[kafka-spout]
ls /brokers/kafka-spout
[partition_0]
ls /brokers/kafka-spout/partition_0
[]
get /brokers/kafka-spout/partition_0
{"topology":{"id":"a9be1962-6b4e-4ed4-ae68-155a1948a1f6","name":"consolidate_reports"},"offset":4426029,"partition":0,"broker":{"host":"localhost","port":9092},"topic":"bid_history"}
cZxid = 0x50
ctime = Thu May 21 11:00:48 BRT 2015
mZxid = 0x50
mtime = Thu May 21 11:00:48 BRT 2015
pZxid = 0x50
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 182
numChildren = 0


2015-05-21 10:43 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:

> Bec I do see this error in the storm logs
>
> 5304 [ProcessThread(sid:0 cport:-1):] INFO
> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
> KeeperException when processing sessionid:0x14d76b1ea54000c type:create
> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for
> /brokers/rawWarehousePriceSpout
>
>
>
> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com>
> wrote:
>
> Are you wrong
>
> zookeeperPath , // the root path in Zookeeper for the spout to store the
> consumer offsets
>
> Is for you KafkaSpout find information about broker meta information (
> topics, partitions)
>
> You have to override
>
> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history",
> "/brokers", "kafka-spout");
> spoutconfig.zkServers
> spoutconfig.zkPort
>
> Take a look a KafkaSpout.class and do a debug mode in this part.
>
> 73        Map stateConf = new HashMap(conf);
> 74       List<String> zkServers = _spoutConfig.zkServers;
>         if (zkServers == null) {
>             zkServers = (List<String>)
> conf.get(Config.STORM_ZOOKEEPER_SERVERS);
>         }
>         Integer zkPort = _spoutConfig.zkPort;
>         if (zkPort == null) {
>             zkPort = ((Number)
> conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
>         }
>
>
>
>
> 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>
>> So when I run it out of eclipse in local mode I am pointing to the same
>> zookeeper as I am when it is deployed. Whatever way it still does not write
>> the offset.
>>
>> Only different I have is my spout is configured like so:
>>
>> BrokerHosts hosts = new ZkHosts(zookeeperQuorum);
>>
>> String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
>> SpoutConfig spoutConfig = new SpoutConfig(
>> hosts,
>> topic, // topic to read from
>> zookeeperPath , // the root path in Zookeeper for the spout to store the
>> consumer offsets
>> spoutId); // an id for this consumer for storing the consumer offsets in
>> Zookeeper
>>
>> //Check if we should be consuming messages from the beginning
>> spoutConfig.forceFromStart = consumeFromBeginning;
>> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
>> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>> spoutConfig.scheme = new SchemeAsMultiScheme(new
>> KafkaAnalyticsMessageDecoder());
>>
>>
>> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com>
>> wrote:
>>
>> I have the same problem when i have Storm in Local cluster mode. but is
>> because, Local mode has a zookeeper embedded and every time that restart
>>  the offset is null  and is the reason that you have
>>  /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  -->
>> null.
>>
>> Try to run Local Mode, send some messages to spout, and then see Offset
>> in Zookeeper (port 2000).
>>
>> Or use Zookeeper of kafka, override.
>>
>> spoutconfig.zkServers
>> spoutconfig.zkPort
>>
>>
>> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>
>>> Thanks,
>>>
>>> Yup kafka is creating them on startup when the topics gets its first
>>> msg. And from below the storm logs are never really committing the offset
>>> to zookeeper.
>>>
>>> Zookeeper kafka topics details
>>>
>>> [zk: 127.0.0.1:2181(CONNECTED) 41] ls
>>> /brokers/topics/warehouse_prices/partitions
>>> [2, 1, 0]
>>> [zk: 127.0.0.1:2181(CONNECTED) 42]
>>>
>>> Zookeeper Storm
>>>
>>> [zk: 127.0.0.1:2181(CONNECTED) 42] ls
>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>> []
>>> [zk: 127.0.0.1:2181(CONNECTED) 43]
>>>
>>>
>>> Storm logs
>>>
>>> 44325 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator
>>> - Task [1/1] New partition managers:
>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=0},
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=1},
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=2}]
>>> 44491 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Read partition information from:
>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0  --> null
>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - No partition information found, using
>>> configuration to determine offset
>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Starting Kafka
>>> price-engine-demo-server.c.celertech-01.internal:0 from offset 0
>>> 44749 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Read partition information from:
>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1  --> null
>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - No partition information found, using
>>> configuration to determine offset
>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>> 44779 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Starting Kafka
>>> price-engine-demo-server.c.celertech-01.internal:1 from offset 0
>>> 44781 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Read partition information from:
>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  --> null
>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - No partition information found, using
>>> configuration to determine offset
>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Starting Kafka
>>> price-engine-demo-server.c.celertech-01.internal:2 from offset 0
>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator
>>> - Task [1/1] Finished refreshing
>>> 47438 [ProcessThread(sid:0 cport:-1):] INFO
>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
>>> KeeperException when processing sessionid:0x14d7658db3c000c type:create
>>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
>>> Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>> Error:KeeperErrorCode = NoNode for
>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>> 104184 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils -
>>> No data found in Kafka Partition partition_0
>>> 104828 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager
>>> connections
>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper:
>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>> 1=price-engine-demo-server.c.celertech-01.internal:9092,
>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils -
>>> Task [1/1] assigned
>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=0},
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=1},
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=2}]
>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>> 164204 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils -
>>> No data found in Kafka Partition partition_0
>>> 165063 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager
>>> connections
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper:
>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>> 1=price-engine-demo-server.c.celertech-01.internal:9092,
>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils -
>>> Task [1/1] assigned
>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=0},
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=1},
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=2}]
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>> 224233 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils -
>>> No data found in Kafka Partition partition_0
>>> 225308 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager
>>> connections
>>>
>>>
>>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com>
>>> wrote:
>>>
>>> Zookeeper create nothing when startup, you have to create your
>>> partitions in kafka broker.
>>>
>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic
>>> click_history --replication-factor 1 --partitions 10
>>>
>>>
>>>
>>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>
>>>> All,
>>>>
>>>> We changed or paths in zookeeper and we are now seeing
>>>>
>>>> java.lang.RuntimeException: java.lang.RuntimeException:
>>>> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode
>>>> = NoNode for /brokers/topics/warehouse_prices/partitions
>>>> at storm.kafka.DynamicBrokersReader.getBrokerInfo(
>>>> DynamicBrokersReader.java:81) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>> at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42)
>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>> at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57)
>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>> at storm.kafka.KafkaSpout.open(KafkaSpout.java:87)
>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>> at
>>>> backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522)
>>>> ~[storm-core-0.9.4.jar:0.9.4]
>>>> at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461)
>>>> ~[storm-core-0.9.4.jar:0.9.4]
>>>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>>>>
>>>> Should this not be discovered by the Spout on startup?
>>>>
>>>
>>>
>>>
>>
>>
>
>

Reply via email to