After some internet and forum searching, it looks like my Local mode isn't connecting to an external ZooKeeper. Does that sound familiar? If so, how come I can still see some spouts?
If it's true, any idea how I can connect to an external ZooKeeper in Local mode?

Martin

On Fri, Oct 3, 2014 at 3:01 PM, Martin Arrowsmith <[email protected]> wrote:

> Sorry again, just wanted to provide more info.
>
> The other threads seem fine - it's always Thread 11 that errors. Here is
> Thread 13 for instance:
>
> 7040 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Opened spout kafka:(3)
> 7044 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Activating spout kafka:(3)
> 7045 [Thread-13-kafka] INFO storm.kafka.ZkCoordinator - Task [1/1] Refreshing partition manager connections
> 7047 [Thread-13-kafka-EventThread] INFO org.apache.curator.framework.state.ConnectionStateManager - State change: CONNECTED
> 7047 [ConnectionStateManager-0] WARN org.apache.curator.framework.state.ConnectionStateManager - There are no ConnectionStateListeners registered.
> 7052 [Thread-13-kafka] INFO storm.kafka.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{partitionMap={0=localhost:9092, 1=localhost:9092}}
> 7054 [Thread-13-kafka] INFO storm.kafka.KafkaUtils - Task [1/1] assigned [Partition{host=localhost:9092, partition=0}, Partition{host=localhost:9092, partition=1}]
> 7054 [Thread-13-kafka] INFO storm.kafka.ZkCoordinator - Task [1/1] Deleted partition managers: []
> 7054 [Thread-13-kafka] INFO storm.kafka.ZkCoordinator - Task [1/1] New partition managers: [Partition{host=localhost:9092, partition=1}, Partition{host=localhost:9092, partition=0}]
>
> On Fri, Oct 3, 2014 at 2:57 PM, Martin Arrowsmith <[email protected]> wrote:
>
>> Hi again,
>>
>> Does my zkRoot need to equal the value that is actually in ZooKeeper?
>> (I'm also new to ZooKeeper.)
>>
>> Here is ZooKeeper:
>>
>> [zk: 127.0.0.1:2181(CONNECTED) 8] ls /brokers/topics/mytopic/partitions
>> [1, 0]
>>
>> And here is what I pass into spoutConfig:
>>
>> SpoutConfig spoutConfig = new SpoutConfig(hosts, "mytopic", "/brokers/mytopic", "1");
>>
>> Right before the crash, debugging statements say:
>>
>> 7188 [Thread-13-kafka] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/mytopic/1/partition_1 --> null
>> 7332 [Thread-13-kafka] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>> 7332 [Thread-13-kafka] INFO storm.kafka.PartitionManager - Starting Kafka localhost:1 from offset 60
>> 7334 [Thread-13-kafka] INFO storm.kafka.PartitionManager - Read partition information from: /brokers/mytopic/1/partition_0 --> null
>> 7335 [Thread-13-kafka] INFO storm.kafka.PartitionManager - No partition information found, using configuration to determine offset
>> 7335 [Thread-13-kafka] INFO storm.kafka.PartitionManager - Starting Kafka localhost:0 from offset 60
>> 7335 [Thread-13-kafka] INFO storm.kafka.ZkCoordinator - Task [1/1] Finished refreshing
>> 17001 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [10]
>> 17007 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died!
>> java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
>> ....
>>
>> On Fri, Oct 3, 2014 at 11:20 AM, Martin Arrowsmith <[email protected]> wrote:
>>
>>> Hi Storm Experts!
>>>
>>> I'm running Storm 0.9.2-incubating and kafka_2.10-0.8.1.1
>>>
>>> I'm trying to get the ExclamationTopology to run with a Kafka Spout
>>> (using the storm-kafka project included with the GitHub repo, not
>>> storm-kafka-starter).
>>>
>>> Here is the relevant portion of my ExclamationTopology:
>>>
>>> BrokerHosts hosts = new ZkHosts("localhost:2181");
>>> SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/test", "discovery");
>>> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>> KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>>>
>>> builder.setSpout("kafka", kafkaSpout);
>>>
>>> builder.setBolt("exclaim1", new ExclamationBolt()).shuffleGrouping("kafka");
>>>
>>> When I run the topology in Local mode for 1 minute, it works for the
>>> first 5 seconds, and then it breaks.
>>>
>>> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>>>
>>> So I'm not sure why it says I'm using String on an Int, and why it worked
>>> before but not now.
>>>
>>> In my kafka server.properties:
>>>
>>> num.network.threads=2
>>> num.io.threads=8
>>> num.partitions=2
>>>
>>> Should I be doing something like this?
>>>
>>> builder.setSpout("kafka", kafkaSpout, 2);
>>>
>>> Here is the output for when it's working, and then when it starts to
>>> break. Any help would be much appreciated!!
>>>
>>> As you can see - it starts to die on Thread 11
>>>
>>> Best,
>>>
>>> Martin
>>>
>>> =================
>>> ...
>>>
>>> 13684 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: kafka default [why will you]
>>> 13684 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: kafka __ack_init [-2810730987442558521 -8063320413415537181 3]
>>> 13684 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor - Processing received message source: kafka:3, stream: default, id: {-2810730987442558521=-8063320413415537181}, [why will you]
>>> 13684 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting: exclaim1 default [why will you!!!]
>>> 13685 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting: exclaim1 __ack_ack [-2810730987442558521 -8063320413415537181]
>>> 13685 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka:3, stream: __ack_init, id: {}, [-2810730987442558521 -8063320413415537181 3]
>>> 13685 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: exclaim1:2, stream: __ack_ack, id: {}, [-2810730987442558521 -8063320413415537181]
>>> 13686 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 3; __acker __ack_ack [-2810730987442558521]
>>> 13687 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [-2810730987442558521]
>>> 13687 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@6fec0cbf
>>> 15686 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: kafka default [hit soon]
>>> 15687 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor - Processing received message source: kafka:3, stream: default, id: {2311641896827057642=-6472974563298617532}, [hit soon]
>>> 15687 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: kafka __ack_init [2311641896827057642 -6472974563298617532 3]
>>> 15687 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting: exclaim1 default [hit soon!!!]
>>> 15687 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting: exclaim1 __ack_ack [2311641896827057642 -6472974563298617532]
>>> 15688 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: kafka:3, stream: __ack_init, id: {}, [2311641896827057642 -6472974563298617532 3]
>>> 15689 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing received message source: exclaim1:2, stream: __ack_ack, id: {}, [2311641896827057642 -6472974563298617532]
>>> 15689 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting direct: 3; __acker __ack_ack [2311641896827057642]
>>> 15691 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Processing received message source: __acker:1, stream: __ack_ack, id: {}, [2311641896827057642]
>>> 15692 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Acking message storm.kafka.PartitionManager$KafkaMessageId@5638c9df
>>> 17490 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [10]
>>> 17496 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died!
>>> java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>>>     at backtype.storm.daemon.executor$fn__3353$fn__3365$fn__3412.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>>>     at backtype.storm.util$async_loop$fn__450.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>>>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>>>     at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
>>> ...
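
One possible reading of the trace, not confirmed anywhere in this thread: the last tuple exclaim1 receives before the "Async loop died!" error comes from `__system` on the `__tick` stream and carries `[10]`, an Integer. If the bolt reads field 0 as a String for every tuple, a tick tuple like that would produce exactly this ClassCastException, while the Kafka messages (which are Strings) go through fine. Below is a minimal sketch of the guard, written in plain Java with no Storm dependency so it runs on its own. The class and method names (`TickTupleGuard`, `isTick`, `process`) are made up for illustration, and the two constants are simply copied from the log line.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the bolt's execute() logic; not Storm code.
class TickTupleGuard {
    // Copied from the log line "source: __system:-1, stream: __tick".
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    // True when the tuple came from the system component on the tick stream.
    static boolean isTick(String sourceComponent, String streamId) {
        return SYSTEM_COMPONENT_ID.equals(sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(streamId);
    }

    // Returns the exclaimed word, or null when the tuple should be skipped.
    static String process(String sourceComponent, String streamId, List<Object> values) {
        if (isTick(sourceComponent, streamId)) {
            return null; // tick tuples carry an Integer, not a word
        }
        Object first = values.get(0);
        if (!(first instanceof String)) {
            return null; // defensive: never cast a non-String payload
        }
        return first + "!!!";
    }

    public static void main(String[] args) {
        // A normal Kafka message, as in the working part of the log.
        System.out.println(process("kafka", "default", Arrays.<Object>asList("hit soon"))); // prints hit soon!!!
        // The tuple that precedes the crash in the log.
        System.out.println(process("__system", "__tick", Arrays.<Object>asList(10))); // prints null
    }
}
```

In a real bolt the same check would presumably compare `tuple.getSourceComponent()` and `tuple.getSourceStreamId()` against `Constants.SYSTEM_COMPONENT_ID` and `Constants.SYSTEM_TICK_STREAM_ID`, then return before calling `tuple.getString(0)`. Where the tick tuples come from is not visible in the snippets posted here.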

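On the zkRoot question, the PartitionManager lines in the log may already hint at the answer. With `new SpoutConfig(hosts, "mytopic", "/brokers/mytopic", "1")` the spout reads from `/brokers/mytopic/1/partition_1`, which looks like zkRoot, then the spout id, then the partition. If that reading is right, zkRoot is where the spout keeps its own offsets rather than a path that must already exist under Kafka's `/brokers` tree, and `--> null` would just mean nothing has been committed there yet. The sketch below only reproduces that string pattern; `OffsetPathSketch` and `committedPath` are hypothetical names, not part of storm-kafka.

```java
// Hypothetical helper that mirrors the path pattern seen in the log.
class OffsetPathSketch {
    // Joins zkRoot, spout id and partition number the way the logged path appears to be built.
    static String committedPath(String zkRoot, String spoutId, int partition) {
        return zkRoot + "/" + spoutId + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // Values from the second message in the thread.
        System.out.println(committedPath("/brokers/mytopic", "1", 1)); // prints /brokers/mytopic/1/partition_1
        // Values from the first message in the thread.
        System.out.println(committedPath("/test", "discovery", 0)); // prints /test/discovery/partition_0
    }
}
```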