I've now switched to 0.9.3 for Storm - can someone please tell me how to use the external Zookeeper in Local mode ? Would be very much appreciated and grateful!
Thanks, Martin On Fri, Oct 3, 2014 at 3:55 PM, Martin Arrowsmith < [email protected]> wrote: > After some internet and forum searching, it looks like my Local mode isn't > connecting to an external zookeeper. Does that sound familiar ? If so, how > come I can still see some spouts ? > > If it's true, any idea how I can connect to an external zookeeper in Local > mode ? > > Martin > > On Fri, Oct 3, 2014 at 3:01 PM, Martin Arrowsmith < > [email protected]> wrote: > >> Sorry again, just wanted to provide more info. >> >> The other threads seem fine - it's always Thread 11 that errors. Here is >> Thread 13 for instance: >> >> 7040 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Opened >> spout kafka:(3) >> 7044 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Activating >> spout kafka:(3) >> 7045 [Thread-13-kafka] INFO storm.kafka.ZkCoordinator - Task [1/1] >> Refreshing partition manager connections >> 7047 [Thread-13-kafka-EventThread] INFO >> org.apache.curator.framework.state.ConnectionStateManager - State change: >> CONNECTED >> 7047 [ConnectionStateManager-0] WARN >> org.apache.curator.framework.state.ConnectionStateManager - There are no >> ConnectionStateListeners registered. >> 7052 [Thread-13-kafka] INFO storm.kafka.DynamicBrokersReader - Read >> partition info from zookeeper: >> GlobalPartitionInformation{partitionMap={0=localhost:9092, >> 1=localhost:9092}} >> 7054 [Thread-13-kafka] INFO storm.kafka.KafkaUtils - Task [1/1] assigned >> [Partition{host=localhost:9092, partition=0}, >> Partition{host=localhost:9092, partition=1}] >> 7054 [Thread-13-kafka] INFO storm.kafka.ZkCoordinator - Task [1/1] >> Deleted partition managers: [] >> 7054 [Thread-13-kafka] INFO storm.kafka.ZkCoordinator - Task [1/1] New >> partition managers: [Partition{host=localhost:9092, partition=1}, >> Partition{host=localhost:9092, partition=0}] >> >> On Fri, Oct 3, 2014 at 2:57 PM, Martin Arrowsmith < >> [email protected]> wrote: >> >>> Hi again, >>> >>> Does my zkRoot need to equal the value that is actually in zookeeper ? >>> (I'm also new to zookeeper0 >>> >>> Here is Zookeeper: >>> >>> [zk: 127.0.0.1:2181(CONNECTED) 8] ls /brokers/topics/mytopic/partitions >>> [1, 0] >>> >>> And here is what I pass into spoutConfig: >>> >>> SpoutConfig spoutConfig = new SpoutConfig(hosts, "mytopic", >>> "/brokers/mytopic", "1"); >>> >>> Right before the crash, debugging statements say: >>> >>> 7188 [Thread-13-kafka] INFO storm.kafka.PartitionManager - Read >>> partition information from: /brokers/mytopic/1/partition_1 --> null >>> 7332 [Thread-13-kafka] INFO storm.kafka.PartitionManager - No partition >>> information found, using configuration to determine offset >>> 7332 [Thread-13-kafka] INFO storm.kafka.PartitionManager - Starting >>> Kafka localhost:1 from offset 60 >>> 7334 [Thread-13-kafka] INFO storm.kafka.PartitionManager - Read >>> partition information from: /brokers/mytopic/1/partition_0 --> null >>> 7335 [Thread-13-kafka] INFO storm.kafka.PartitionManager - No partition >>> information found, using configuration to determine offset >>> 7335 [Thread-13-kafka] INFO storm.kafka.PartitionManager - Starting >>> Kafka localhost:0 from offset 60 >>> 7335 [Thread-13-kafka] INFO storm.kafka.ZkCoordinator - Task [1/1] >>> Finished refreshing >>> 17001 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor - >>> Processing received message source: __system:-1, stream: __tick, id: {}, >>> [10] >>> 17007 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died! >>> java.lang.RuntimeException: java.lang.ClassCastException: >>> java.lang.Integer cannot be cast to java.lang.String >>> .... >>> >>> >>> >>> On Fri, Oct 3, 2014 at 11:20 AM, Martin Arrowsmith < >>> [email protected]> wrote: >>> >>>> Hi Storm Experts! >>>> >>>> I'm running Storm 0.9.2-incubating and kafka_2.10-0.8.1.1 >>>> >>>> I'm trying to get the ExclamationTopolgy to run with a Kafka Spout >>>> (using the storm-kafka project included with the Github - not >>>> storm-kafka-starter) >>>> >>>> Here is the relevant portion of my ExcalamationTopology: >>>> >>>> BrokerHosts hosts = new ZkHosts("localhost:2181"); >>>> SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/test", >>>> "discovery"); >>>> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); >>>> KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); >>>> >>>> builder.setSpout("kafka", kafkaSpout); >>>> >>>> builder.setBolt("exclaim1", new >>>> ExclamationBolt()).shuffleGrouping("kafka"); >>>> >>>> When I run the topology on Local mode for a 1 minute, it works for the >>>> first 5 seconds, and then it breaks. >>>> >>>> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); >>>> >>>> So not sure why it says I'm using String on an Int, and why it works >>>> before but now now. >>>> >>>> In my kafka server.properties: >>>> >>>> num.network.threads=2 >>>> num.io.threads=8 >>>> num.partitions=2 >>>> >>>> Should I be doing something like this? >>>> >>>> builder.setSpout("kafka", kafkaSpout, 2); >>>> >>>> Here is the output for when it's working, and then when it starts to >>>> break. Any help would be much appreciated!! >>>> >>>> As you can see - it starts to die on Thread 11 >>>> >>>> Best, >>>> >>>> Martin >>>> >>>> ================= >>>> ... >>>> >>>> 13684 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: >>>> kafka default [why will you] >>>> 13684 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: >>>> kafka __ack_init [-2810730987442558521 -8063320413415537181 3] >>>> 13684 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor - >>>> Processing received message source: kafka:3, stream: default, id: >>>> {-2810730987442558521=-8063320413415537181}, [why will you] >>>> 13684 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting: >>>> exclaim1 default [why will you!!!] >>>> 13685 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting: >>>> exclaim1 __ack_ack [-2810730987442558521 -8063320413415537181] >>>> 13685 [Thread-17-__acker] INFO backtype.storm.daemon.executor - >>>> Processing received message source: kafka:3, stream: __ack_init, id: {}, >>>> [-2810730987442558521 -8063320413415537181 3] >>>> 13685 [Thread-17-__acker] INFO backtype.storm.daemon.executor - >>>> Processing received message source: exclaim1:2, stream: __ack_ack, id: {}, >>>> [-2810730987442558521 -8063320413415537181] >>>> 13686 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting >>>> direct: 3; __acker __ack_ack [-2810730987442558521] >>>> 13687 [Thread-13-kafka] INFO backtype.storm.daemon.executor - >>>> Processing received message source: __acker:1, stream: __ack_ack, id: {}, >>>> [-2810730987442558521] >>>> 13687 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Acking >>>> message storm.kafka.PartitionManager$KafkaMessageId@6fec0cbf >>>> 15686 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: >>>> kafka default [hit soon] >>>> 15687 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor - >>>> Processing received message source: kafka:3, stream: default, id: >>>> {2311641896827057642=-6472974563298617532}, [hit soon] >>>> 15687 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: >>>> kafka __ack_init [2311641896827057642 -6472974563298617532 3] >>>> 15687 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting: >>>> exclaim1 default [hit soon!!!] >>>> 15687 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting: >>>> exclaim1 __ack_ack [2311641896827057642 -6472974563298617532] >>>> 15688 [Thread-17-__acker] INFO backtype.storm.daemon.executor - >>>> Processing received message source: kafka:3, stream: __ack_init, id: {}, >>>> [2311641896827057642 -6472974563298617532 3] >>>> 15689 [Thread-17-__acker] INFO backtype.storm.daemon.executor - >>>> Processing received message source: exclaim1:2, stream: __ack_ack, id: {}, >>>> [2311641896827057642 -6472974563298617532] >>>> 15689 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting >>>> direct: 3; __acker __ack_ack [2311641896827057642] >>>> 15691 [Thread-13-kafka] INFO backtype.storm.daemon.executor - >>>> Processing received message source: __acker:1, stream: __ack_ack, id: {}, >>>> [2311641896827057642] >>>> 15692 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Acking >>>> message storm.kafka.PartitionManager$KafkaMessageId@5638c9df >>>> 17490 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor - >>>> Processing received message source: __system:-1, stream: __tick, id: {}, >>>> [10] >>>> 17496 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died! >>>> java.lang.RuntimeException: java.lang.ClassCastException: >>>> java.lang.Integer cannot be cast to java.lang.String >>>> at >>>> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) >>>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] >>>> at >>>> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) >>>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] >>>> at >>>> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) >>>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] >>>> at >>>> backtype.storm.daemon.executor$fn__3353$fn__3365$fn__3412.invoke(executor.clj:746) >>>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] >>>> at backtype.storm.util$async_loop$fn__450.invoke(util.clj:431) >>>> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] >>>> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] >>>> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45] >>>> ... >>>> >>> >>> >> >
