Are you acking in your bolt? collector.ack(input);

2015-05-21 11:19 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:

> Okay so interesting:
>
> I have your configuration in my local eclipse and a remote zookeeper that
> kafka is connected to:
>
> ZkHosts zkhost = new ZkHosts(“1.1.1.1:2181","/brokers");  // /brokers ->
> kafka broker
> SpoutConfig spoutconfig = new SpoutConfig(“1.1.1.1:2181", “/prices",
> "/brokers", “rawWarehousePriceSpout");
>
>
> Zookeeper:
>
>
> [zk: 127.0.0.1:2181(CONNECTED) 80] ls /brokers
> [rawWarehousePriceSpout, topics, ids]
> [zk: 127.0.0.1:2181(CONNECTED) 81] ls /brokers/rawWarehousePriceSpout
> []
> [zk: 127.0.0.1:2181(CONNECTED) 82]
>
>
>
> Logs i see:
>
> 41994 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils -
> Task [1/1] assigned
> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
> partition=0},
> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
> partition=1},
> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
> partition=2}]
> 41994 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator -
> Task [1/1] Deleted partition managers: []
> 41994 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator -
> Task [1/1] New partition managers:
> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
> partition=0},
> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
> partition=1},
> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
> partition=2}]
> 42094 [Thread-15-rawWarehousePriceSpout] INFO
> storm.kafka.PartitionManager - Read partition information from:
> /brokers/rawWarehousePriceSpout/partition_0  --> null
> 42363 [Thread-15-rawWarehousePriceSpout] INFO
> storm.kafka.PartitionManager - No partition information found, using
> configuration to determine offset
> 42363 [Thread-15-rawWarehousePriceSpout] INFO
> storm.kafka.PartitionManager - Starting Kafka
> price-engine-demo-server.c.celertech-01.internal:0 from offset 425530
> 42366 [Thread-15-rawWarehousePriceSpout] INFO
> storm.kafka.PartitionManager - Read partition information from:
> /brokers/rawWarehousePriceSpout/partition_1  --> null
> 42400 [Thread-15-rawWarehousePriceSpout] INFO
> storm.kafka.PartitionManager - No partition information found, using
> configuration to determine offset
> 42400 [Thread-15-rawWarehousePriceSpout] INFO
> storm.kafka.PartitionManager - Starting Kafka
> price-engine-demo-server.c.celertech-01.internal:1 from offset 550715
> 42401 [Thread-15-rawWarehousePriceSpout] INFO
> storm.kafka.PartitionManager - Read partition information from:
> /brokers/rawWarehousePriceSpout/partition_2  --> null
> 42432 [Thread-15-rawWarehousePriceSpout] INFO
> storm.kafka.PartitionManager - No partition information found, using
> configuration to determine offset
> 42432 [Thread-15-rawWarehousePriceSpout] INFO
> storm.kafka.PartitionManager - Starting Kafka
> price-engine-demo-server.c.celertech-01.internal:2 from offset 439547
> 42432 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.ZkCoordinator -
> Task [1/1] Finished refreshing
> 44652 [ProcessThread(sid:0 cport:-1):] INFO
> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
> KeeperException when processing sessionid:0x14d76d4418d000c type:create
> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for
> /brokers/rawWarehousePriceSpout
>
>
>
> On 21 May 2015, at 15:02, Cristian Makoto Sandiga <cmsand...@gmail.com>
> wrote:
>
> Im sorry, i made a mistake
>
> ZkHosts zkhost = new ZkHosts("localhost:2181","/brokers");  // /brokers ->
> kafka broker
> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history",
> "/brokers", "kafka-spout"); --> storm --> zookeeper
>
> If you see not is a ERROR, is a INFO usually when directory structure not
> exist.
>
> Take a look  Zookeeper Kaka-broker (port 2181)
> ----------------------------------
>
> -- bin/zookeeper-shell.sh localhost:2181
>
> ls /brokers/topics/bid_history/partitions/0/state
> []
>
> get /brokers/topics/bid_history/partitions/0/state
> {"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
> cZxid = 0x113
> ctime = Tue May 19 18:19:24 BRT 2015
> mZxid = 0x113
> mtime = Tue May 19 18:19:24 BRT 2015
> pZxid = 0x113
> cversion = 0
> dataVersion = 0
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 72
> numChildren = 0
>
>
> Take a look  Zookeeper Storm-broker (port 2000)
> ----------------------------------
> Start topology, send a message and then look:
>
> bin/zookeeper-shell.sh localhost:2000
> Connecting to localhost:2000
> Welcome to ZooKeeper!
> JLine support is disabled
>
> ls /
> [storm, brokers, zookeeper]
> ls /brokers
> [kafka-spout]
> ls /brokers/kafka-spout
> [partition_0]
> ls /brokers/kafka-spout/partition_0
> []
> get /brokers/kafka-spout/partition_0
>
> {"topology":{"id":"a9be1962-6b4e-4ed4-ae68-155a1948a1f6","name":"consolidate_reports"},"offset":4426029,"partition":0,"broker":{"host":"localhost","port":9092},"topic":"bid_history"}
> cZxid = 0x50
> ctime = Thu May 21 11:00:48 BRT 2015
> mZxid = 0x50
> mtime = Thu May 21 11:00:48 BRT 2015
> pZxid = 0x50
> cversion = 0
> dataVersion = 0
> aclVersion = 0
> ephemeralOwner = 0x0
> dataLength = 182
> numChildren = 0
>
>
> 2015-05-21 10:43 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>
>> Bec I do see this error in the storm logs
>>
>> 5304 [ProcessThread(sid:0 cport:-1):] INFO
>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
>> KeeperException when processing sessionid:0x14d76b1ea54000c type:create
>> cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
>> Path:/brokers/rawWarehousePriceSpout Error:KeeperErrorCode = NoNode for
>> /brokers/rawWarehousePriceSpout
>>
>>
>>
>> On 21 May 2015, at 14:04, Cristian Makoto Sandiga <cmsand...@gmail.com>
>> wrote:
>>
>> Are you wrong
>>
>> zookeeperPath , // the root path in Zookeeper for the spout to store the
>> consumer offsets
>>
>> Is for you KafkaSpout find information about broker meta information (
>> topics, partitions)
>>
>> You have to override
>>
>> SpoutConfig spoutconfig = new SpoutConfig(zkhost, "bid_history",
>> "/brokers", "kafka-spout");
>> spoutconfig.zkServers
>> spoutconfig.zkPort
>>
>> Take a look a KafkaSpout.class and do a debug mode in this part.
>>
>> 73        Map stateConf = new HashMap(conf);
>> 74       List<String> zkServers = _spoutConfig.zkServers;
>>         if (zkServers == null) {
>>             zkServers = (List<String>)
>> conf.get(Config.STORM_ZOOKEEPER_SERVERS);
>>         }
>>         Integer zkPort = _spoutConfig.zkPort;
>>         if (zkPort == null) {
>>             zkPort = ((Number)
>> conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
>>         }
>>
>>
>>
>>
>> 2015-05-21 9:53 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>
>>> So when I run it out of eclipse in local mode I am pointing to the same
>>> zookeeper as I am when it is deployed. Whatever way it still does not write
>>> the offset.
>>>
>>> Only different I have is my spout is configured like so:
>>>
>>> BrokerHosts hosts = new ZkHosts(zookeeperQuorum);
>>>
>>> String zookeeperPath = KAFKA_STORM_DIR + "/" + topic;
>>> SpoutConfig spoutConfig = new SpoutConfig(
>>> hosts,
>>> topic, // topic to read from
>>> zookeeperPath , // the root path in Zookeeper for the spout to store
>>> the consumer offsets
>>> spoutId); // an id for this consumer for storing the consumer offsets in
>>> Zookeeper
>>>
>>> //Check if we should be consuming messages from the beginning
>>> spoutConfig.forceFromStart = consumeFromBeginning;
>>> spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
>>> spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
>>> spoutConfig.scheme = new SchemeAsMultiScheme(new
>>> KafkaAnalyticsMessageDecoder());
>>>
>>>
>>> On 21 May 2015, at 13:50, Cristian Makoto Sandiga <cmsand...@gmail.com>
>>> wrote:
>>>
>>> I have the same problem when i have Storm in Local cluster mode. but is
>>> because, Local mode has a zookeeper embedded and every time that restart
>>>  the offset is null  and is the reason that you have
>>>  /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  -->
>>> null.
>>>
>>> Try to run Local Mode, send some messages to spout, and then see Offset
>>> in Zookeeper (port 2000).
>>>
>>> Or use Zookeeper of kafka, override.
>>>
>>> spoutconfig.zkServers
>>> spoutconfig.zkPort
>>>
>>>
>>> 2015-05-21 9:39 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>
>>>> Thanks,
>>>>
>>>> Yup kafka is creating them on startup when the topics gets its first
>>>> msg. And from below the storm logs are never really committing the offset
>>>> to zookeeper.
>>>>
>>>> Zookeeper kafka topics details
>>>>
>>>> [zk: 127.0.0.1:2181(CONNECTED) 41] ls
>>>> /brokers/topics/warehouse_prices/partitions
>>>> [2, 1, 0]
>>>> [zk: 127.0.0.1:2181(CONNECTED) 42]
>>>>
>>>> Zookeeper Storm
>>>>
>>>> [zk: 127.0.0.1:2181(CONNECTED) 42] ls
>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>> []
>>>> [zk: 127.0.0.1:2181(CONNECTED) 43]
>>>>
>>>>
>>>> Storm logs
>>>>
>>>> 44325 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers:
>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>> partition=0},
>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>> partition=1},
>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>> partition=2}]
>>>> 44491 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - Read partition information from:
>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_0  --> null
>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - No partition information found, using
>>>> configuration to determine offset
>>>> 44746 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>> 44747 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - Starting Kafka
>>>> price-engine-demo-server.c.celertech-01.internal:0 from offset 0
>>>> 44749 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - Read partition information from:
>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_1  --> null
>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - No partition information found, using
>>>> configuration to determine offset
>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>> 44778 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>> 44779 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - Starting Kafka
>>>> price-engine-demo-server.c.celertech-01.internal:1 from offset 0
>>>> 44781 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - Read partition information from:
>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout/partition_2  --> null
>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - No partition information found, using
>>>> configuration to determine offset
>>>> 44809 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - Last commit offset from zookeeper: 0
>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - Commit offset 0 is more than
>>>> 9223372036854775807 behind, resetting to startOffsetTime=-2
>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.PartitionManager - Starting Kafka
>>>> price-engine-demo-server.c.celertech-01.internal:2 from offset 0
>>>> 44810 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>> 47438 [ProcessThread(sid:0 cport:-1):] INFO
>>>> org.apache.storm.zookeeper.server.PrepRequestProcessor - Got user-level
>>>> KeeperException when processing sessionid:0x14d7658db3c000c
>>>> type:create cxid:0x5 zxid:0x2c txntype:-1 reqpath:n/a Error
>>>> Path:/kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>> Error:KeeperErrorCode = NoNode for
>>>> /kafkastorm/warehouse_prices/rawWarehousePriceSpout
>>>> 104184 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils
>>>> - No data found in Kafka Partition partition_0
>>>> 104828 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager
>>>> connections
>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper:
>>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>>> 1=price-engine-demo-server.c.celertech-01.internal:9092,
>>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>> 105005 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils
>>>> - Task [1/1] assigned
>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>> partition=0},
>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>> partition=1},
>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>> partition=2}]
>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>>> 105006 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>> 164204 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils
>>>> - No data found in Kafka Partition partition_0
>>>> 165063 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager
>>>> connections
>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.DynamicBrokersReader - Read partition info from zookeeper:
>>>> GlobalPartitionInformation{partitionMap={0=price-engine-demo-server.c.celertech-01.internal:9092,
>>>> 1=price-engine-demo-server.c.celertech-01.internal:9092,
>>>> 2=price-engine-demo-server.c.celertech-01.internal:9092}}
>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO  storm.kafka.KafkaUtils
>>>> - Task [1/1] assigned
>>>> [Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>> partition=0},
>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>> partition=1},
>>>> Partition{host=price-engine-demo-server.c.celertech-01.internal:9092,
>>>> partition=2}]
>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] New partition managers: []
>>>> 165240 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>>>> 224233 [Thread-15-rawWarehousePriceSpout] WARN  storm.kafka.KafkaUtils
>>>> - No data found in Kafka Partition partition_0
>>>> 225308 [Thread-15-rawWarehousePriceSpout] INFO
>>>> storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager
>>>> connections
>>>>
>>>>
>>>> On 21 May 2015, at 13:28, Cristian Makoto Sandiga <cmsand...@gmail.com>
>>>> wrote:
>>>>
>>>> Zookeeper create nothing when startup, you have to create your
>>>> partitions in kafka broker.
>>>>
>>>> bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic
>>>> click_history --replication-factor 1 --partitions 10
>>>>
>>>>
>>>>
>>>> 2015-05-21 8:58 GMT-03:00 Benjamin Cuthbert <cuthbert....@gmail.com>:
>>>>
>>>>> All,
>>>>>
>>>>> We changed or paths in zookeeper and we are now seeing
>>>>>
>>>>> java.lang.RuntimeException: java.lang.RuntimeException:
>>>>> org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode
>>>>> = NoNode for /brokers/topics/warehouse_prices/partitions
>>>>> at storm.kafka.DynamicBrokersReader.getBrokerInfo(
>>>>> DynamicBrokersReader.java:81) ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>> at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:42)
>>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>> at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57)
>>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>> at storm.kafka.KafkaSpout.open(KafkaSpout.java:87)
>>>>> ~[storm-kafka-0.9.4.jar:0.9.4]
>>>>> at
>>>>> backtype.storm.daemon.executor$fn__3371$fn__3386.invoke(executor.clj:522)
>>>>> ~[storm-core-0.9.4.jar:0.9.4]
>>>>> at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461)
>>>>> ~[storm-core-0.9.4.jar:0.9.4]
>>>>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>>> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_51]
>>>>>
>>>>> Should this not be discovered by the Spout on startup?
>>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>

Reply via email to