Again, when you run the topology locally from Eclipse, you have to connect
to localhost:2000 (the embedded Storm ZooKeeper).
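
To make that concrete, here is a minimal sketch of the fallback quoted further down from KafkaSpout. It is only an illustration, not storm-kafka code: the class OffsetStoreResolver and its resolve method are names I made up, and the local-mode values (localhost, port 2000) are the ones discussed in this thread. It uses plain Java maps, so it runs without Storm on the classpath.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; it is NOT part of storm-kafka.
// It mirrors the zkServers/zkPort fallback quoted from KafkaSpout in this thread.
class OffsetStoreResolver {

    // String values of Config.STORM_ZOOKEEPER_SERVERS and Config.STORM_ZOOKEEPER_PORT.
    static final String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";
    static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";

    // Returns the "host:port" list where the spout would store its offsets.
    // Null arguments stand for SpoutConfig fields that were not overridden.
    @SuppressWarnings("unchecked")
    static String resolve(List<String> spoutZkServers, Integer spoutZkPort,
                          Map<String, Object> stormConf) {
        List<String> servers = spoutZkServers;
        if (servers == null) {
            servers = (List<String>) stormConf.get(STORM_ZOOKEEPER_SERVERS);
        }
        Integer port = spoutZkPort;
        if (port == null) {
            port = ((Number) stormConf.get(STORM_ZOOKEEPER_PORT)).intValue();
        }
        StringBuilder target = new StringBuilder();
        for (String server : servers) {
            if (target.length() > 0) {
                target.append(',');
            }
            target.append(server).append(':').append(port);
        }
        return target.toString();
    }

    public static void main(String[] args) {
        // Assumed local-mode settings: embedded ZooKeeper on localhost:2000.
        Map<String, Object> localConf = new HashMap<>();
        localConf.put(STORM_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
        localConf.put(STORM_ZOOKEEPER_PORT, 2000);

        // No override: offsets go to the embedded ZooKeeper.
        System.out.println(resolve(null, null, localConf));
        // prints "localhost:2000"

        // Override, as with spoutconfig.zkServers / spoutconfig.zkPort.
        System.out.println(resolve(Arrays.asList("1.1.1.1"), 2181, localConf));
        // prints "1.1.1.1:2181"
    }
}
```

So unless zkServers and zkPort are set on the SpoutConfig, looking for the offsets on port 2181 will show nothing in local mode.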

2015-05-21 11:28 GMT-03:00 Cristian Makoto Sandiga <cmsand...@gmail.com>:

> I don't know, I'm using BaseRichBolt.
>
> 2015-05-21 11:25 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>
>> I am using BaseBasicBolt, as I was told that it acks events
>> automatically. Should we not use that?
>>
>>
>>
>> On 21 May 2015, at 15:24, Cristian Makoto Sandiga <cmsand...@gmail.com>
>> wrote:
>>
>> Are you acking in your bolt? collector.ack(input);
>>
>> 2015-05-21 11:19 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>
>>> Okay so interesting:
>>>
>>> I have your configuration in my local Eclipse and a remote ZooKeeper
>>> that Kafka is connected to:
>>>
>>> ZkHosts zkhost = new ZkHosts("1.1.1.1:2181", "/brokers");  // /brokers -> kafka broker
>>> SpoutConfig spoutconfig = new SpoutConfig("1.1.1.1:2181", "/prices",
>>> "/brokers", "rawWarehousePriceSpout");
>>>
>>> Zookeeper:
>>>
>>> [zk: 127.0.0.1:2181(CONNECTED) 80] ls /brokers
>>> [rawWarehousePriceSpout, topics, ids]
>>> [zk: 127.0.0.1:2181(CONNECTED) 81] ls /brokers/rawWarehousePriceSpout
>>> []
>>> [zk: 127.0.0.1:2181(CONNECTED) 82]
>>>
>>>
>>> Logs I see:
>>>
>>> 41994 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils -
>>> Task [1/1] assigned
>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=0},
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=1},
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=2}]
>>> 41994 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator
>>> - Task [1/1] Deleted partition managers: []
>>> 41994 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator
>>> - Task [1/1] New partition managers:
>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=0},
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=1},
>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>> partition=2}]
>>> 42094 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Read partition information from:
>>> /brokers/rawWarehousePriceSpout/partition_0  --> null
>>> 42363 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - No partition information found, using
>>> configuration to determine offset
>>> 42363 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Starting Kafka
>>> price-engine-demo-server.c.celertech-01.internal:0 from offset 425530
>>> 42366 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Read partition information from:
>>> /brokers/rawWarehousePriceSpout/partition_1  --> null
>>> 42400 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - No partition information found, using
>>> configuration to determine offset
>>> 42400 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Starting Kafka
>>> price-engine-demo-server.c.celertech-01.internal:1 from offset 550715
>>> 42401 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Read partition information from:
>>> /brokers/rawWarehousePriceSpout/partition_2  --> null
>>> 42432 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - No partition information found, using
>>> configuration to determine offset
>>> 42432 [Thread-15-rawWarehousePriceSpout] INFO
>>> storm.kafka.PartitionManager - Starting Kafka
>>> price-engine-demo-server.c.celertech-01.internal:2 from offset 439547
>>> 42432 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator
>>> - Task [1/1] Finished refreshing
>>> 44652 [ProcessThread(sid:0 cport:-1):] INFO
>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
>>> KeeperException when processing sessionid:0x14d76d4418d000c type:create
>>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
>>> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for
>>> /brokers/rawWarehousePriceSpout
>>>
>>>
>>>
>>> On 21 May 2015, at 15:02, Cristian Makoto Sandiga <cmsand...@gmail.com>
>>> wrote:
>>>
>>> I'm sorry, I made a mistake.
>>>
>>> ZkHosts zkhost = new ZkHosts("localhost:2181","/brokers");  // /brokers -> kafka broker
>>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history",
>>> "/brokers", "kafka-spout");  // --> storm --> zookeeper
>>>
>>> Note that what you see is not an ERROR. It is an INFO message, which
>>> usually appears when the directory structure does not exist yet.
>>>
>>> Take a look at the Kafka broker's ZooKeeper (port 2181)
>>> ----------------------------------
>>>
>>> -- bin/zookeeper-shell.sh localhost:2181
>>>
>>> ls /brokers/topics/bid_history/partitions/0/state
>>> []
>>>
>>> get /brokers/topics/bid_history/partitions/0/state
>>> {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
>>> cZxid = 0x113
>>> ctime = Tue May 19 18:19:24 BRT 2015
>>> mZxid = 0x113
>>> mtime = Tue May 19 18:19:24 BRT 2015
>>> pZxid = 0x113
>>> cversion = 0
>>> dataVersion = 0
>>> aclVersion = 0
>>> ephemeralOwner = 0x0
>>> dataLength = 72
>>> numChildren = 0
>>>
>>>
>>> Take a look at Storm's ZooKeeper (port 2000)
>>> ----------------------------------
>>> Start the topology, send a message, and then look:
>>>
>>> bin/zookeeper-shell.sh localhost:2000
>>> Connecting to localhost:2000
>>> Welcome to ZooKeeper!
>>> JLine support is disabled
>>>
>>> ls /
>>> [storm, brokers, zookeeper]
>>> ls /brokers
>>> [kafka-spout]
>>> ls /brokers/kafka-spout
>>> [partition_0]
>>> ls /brokers/kafka-spout/partition_0
>>> []
>>> get /brokers/kafka-spout/partition_0
>>>
>>> {"topology":{"id":"a9be1962-6b4e-4ed4-ae68-155a1948a1f6","name":"consolidate_reports"},"offset":4426029,"partition":0,"broker":{"host":"localhost","port":9092},"topic":"bid_history"}
>>> cZxid = 0x50
>>> ctime = Thu May 21 11:00:48 BRT 2015
>>> mZxid = 0x50
>>> mtime = Thu May 21 11:00:48 BRT 2015
>>> pZxid = 0x50
>>> cversion = 0
>>> dataVersion = 0
>>> aclVersion = 0
>>> ephemeralOwner = 0x0
>>> dataLength = 182
>>> numChildren = 0
>>>
>>>
>>> 2015-05-21 10:43 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>
>>>> Because I do see this error in the Storm logs:
>>>>
>>>> 5304 [ProcessThread(sid:0 cport:-1):] INFO
>>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
>>>> KeeperException when processing sessionid:0x14d76b1ea54000c
>>>> type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
>>>> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for
>>>> /brokers/rawWarehousePriceSpout
>>>>
>>>>
>>>>
>>>> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com>
>>>> wrote:
>>>>
>>>> I think you have this wrong:
>>>>
>>>> zookeeperPath , // the root path in Zookeeper for the spout to store
>>>> the consumer offsets
>>>>
>>>> That path is for your KafkaSpout to find the broker meta information
>>>> (topics, partitions).
>>>>
>>>> You have to override:
>>>>
>>>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history",
>>>> "/brokers", "kafka-spout");
>>>> spoutconfig.zkServers
>>>> spoutconfig.zkPort
>>>>
>>>> Take a look at KafkaSpout.class and step through this part in debug mode.
>>>>
>>>> Map stateConf = new HashMap(conf);
>>>> List<String> zkServers = _spoutConfig.zkServers;
>>>> if (zkServers == null) {
>>>>     zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
>>>> }
>>>> Integer zkPort = _spoutConfig.zkPort;
>>>> if (zkPort == null) {
>>>>     zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>
>>>>> So when I run it out of Eclipse in local mode, I am pointing to the
>>>>> same ZooKeeper as when it is deployed. Either way, it still does not
>>>>> write the offset.
>>>>>
>>>>> The only difference is that my spout is configured like so:
>>>>>
>>>>> BrokerHosts hosts = new ZkHosts(zookeeperQuorum);
>>>>>
>>>>> String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
>>>>> SpoutConfig spoutConfig = new SpoutConfig(
>>>>>     hosts,
>>>>>     topic,         // topic to read from
>>>>>     zookeeperPath, // the root path in Zookeeper for the spout to store the consumer offsets
>>>>>     spoutId);      // an id for this consumer for storing the consumer offsets in Zookeeper
>>>>>
>>>>> //Check if we should be consuming messages from the beginning
>>>>> spoutConfig.forceFromStart = consumeFromBeginning;
>>>>> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
>>>>> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>>>>> spoutConfig.scheme = new SchemeAsMultiScheme(new KafkaAnalyticsMessageDecoder());
>>>>>
>>>>>
>>>>> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> I have the same problem when I run Storm in local cluster mode. That
>>>>> is because local mode has an embedded ZooKeeper, and every time it
>>>>> restarts the offset is null. That is why you see
>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  -->
>>>>> null.
>>>>>
>>>>> Try running in local mode, send some messages to the spout, and then
>>>>> look at the offset in ZooKeeper (port 2000).
>>>>>
>>>>> Or use Kafka's ZooKeeper by overriding:
>>>>>
>>>>> spoutconfig.zkServers
>>>>> spoutconfig.zkPort
>>>>>
>>>>>
>>>>> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Yup, Kafka creates them on startup when the topic gets its first
>>>>>> message. And as the Storm logs below show, the offset is never really
>>>>>> committed to ZooKeeper.
>>>>>>
>>>>>> Zookeeper kafka topics details
>>>>>>
>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 41] ls
>>>>>> /brokers/topics/warehouse_prices/partitions
>>>>>> [2, 1, 0]
>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 42]
>>>>>>
>>>>>> Zookeeper Storm
>>>>>>
>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 42] ls
>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>>>> []
>>>>>> [zk: 127.0.0.1:2181(CONNECTED) 43]
>>>>>>
>>>>>>
>>>>>> Storm logs
>>>>>>
>>>>>> 44325 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers:
>>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>> partition=0},
>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>> partition=1},
>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>> partition=2}]
>>>>>> 44491 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - Read partition information from:
>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0  --> null
>>>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - No partition information found, using
>>>>>> configuration to determine offset
>>>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - Starting Kafka
>>>>>> price-engine-demo-server.c.celertech-01.internal:0 from offset 0
>>>>>> 44749 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - Read partition information from:
>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1  --> null
>>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - No partition information found, using
>>>>>> configuration to determine offset
>>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>>> 44779 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - Starting Kafka
>>>>>> price-engine-demo-server.c.celertech-01.internal:1 from offset 0
>>>>>> 44781 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - Read partition information from:
>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  --> null
>>>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - No partition information found, using
>>>>>> configuration to determine offset
>>>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.PartitionManager - Starting Kafka
>>>>>> price-engine-demo-server.c.celertech-01.internal:2 from offset 0
>>>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>>>> 47438 [ProcessThread(sid:0 cport:-1):] INFO
>>>>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
>>>>>> KeeperException when processing sessionid:0x14d7658db3c000c
>>>>>> type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
>>>>>> Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>>>> Error:KeeperErrorCode = NoNode for
>>>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>>>> 104184 [Thread-15-rawWarehousePriceSpout] WARN
>>>>>> storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>>>>>> 104828 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager
>>>>>> connections
>>>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper:
>>>>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>> 1=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.KafkaUtils - Task [1/1] assigned
>>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>> partition=0},
>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>> partition=1},
>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>> partition=2}]
>>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>>>> 164204 [Thread-15-rawWarehousePriceSpout] WARN
>>>>>> storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>>>>>> 165063 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager
>>>>>> connections
>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper:
>>>>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>> 1=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.KafkaUtils - Task [1/1] assigned
>>>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>> partition=0},
>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>> partition=1},
>>>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>>>> partition=2}]
>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>>>> 224233 [Thread-15-rawWarehousePriceSpout] WARN
>>>>>> storm.kafka.KafkaUtils - No data found in Kafka Partition partition_0
>>>>>> 225308 [Thread-15-rawWarehousePriceSpout] INFO
>>>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager
>>>>>> connections
>>>>>>
>>>>>>
>>>>>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <
>>>>>> cmsand...@gmail.com> wrote:
>>>>>>
>>>>>> ZooKeeper creates nothing at startup; you have to create your
>>>>>> partitions in the Kafka broker.
>>>>>>
>>>>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic
>>>>>> click_history --replication-factor 1 --partitions 10
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>>>
>>>>>>> All,
>>>>>>>
>>>>>>> We changed our paths in ZooKeeper and we are now seeing:
>>>>>>>
>>>>>>> java.lang.RuntimeException: java.lang.RuntimeException:
>>>>>>> org.apache.zookeeper.KeeperException$NoNodeException:
>>>>>>> KeeperErrorCode = NoNode for /brokers/topics/warehouse_prices/partitions
>>>>>>> at storm.kafka.DynamicBrokersReader.getBrokerInfo(
>>>>>>> DynamicBrokersReader.java:81) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>>> at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42)
>>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>>> at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57)
>>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>>> at storm.kafka.KafkaSpout.open(KafkaSpout.java:87)
>>>>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>>>> at
>>>>>>> backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522)
>>>>>>> ~[storm-core-0.9.4.jar:0.9.4]
>>>>>>> at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461)
>>>>>>> ~[storm-core-0.9.4.jar:0.9.4]
>>>>>>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>>>> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>>>>>>>
>>>>>>> Should this not be discovered by the Spout on startup?
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
