Hi again,
Does my zkRoot need to equal the value that is actually in ZooKeeper? (I'm
also new to ZooKeeper.)
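To make the question concrete: judging only from the PartitionManager lines in the log further down, the spout appears to look for its stored offsets at zkRoot + "/" + id + "/partition_" + N. Here is a minimal sketch of that concatenation with the values I pass to SpoutConfig. The class and method names are hypothetical, not part of storm-kafka, and the path rule is my assumption from the log output.

```java
class OffsetPathSketch {
    // Hypothetical helper: reproduces the path printed by PartitionManager
    // in the log ("/brokers/mytopic/1/partition_1"). This is an assumption
    // drawn from that log line, not an actual storm-kafka method.
    static String committedPath(String zkRoot, String id, int partition) {
        return zkRoot + "/" + id + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // zkRoot and id are the third and fourth SpoutConfig arguments.
        System.out.println(committedPath("/brokers/mytopic", "1", 1));
        System.out.println(committedPath("/brokers/mytopic", "1", 0));
    }
}
```

If that reading is right, zkRoot names the place where the spout keeps its own offsets, which would explain why the lookups return null on a first run.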
Here is Zookeeper:
[zk: 127.0.0.1:2181(CONNECTED) 8] ls /brokers/topics/mytopic/partitions
[1, 0]
And here is what I pass into spoutConfig:
SpoutConfig spoutConfig = new SpoutConfig(hosts, "mytopic",
"/brokers/mytopic", "1");
Right before the crash, debugging statements say:
7188 [Thread-13-kafka] INFO storm.kafka.PartitionManager - Read partition
information from: /brokers/mytopic/1/partition_1 --> null
7332 [Thread-13-kafka] INFO storm.kafka.PartitionManager - No partition
information found, using configuration to determine offset
7332 [Thread-13-kafka] INFO storm.kafka.PartitionManager - Starting Kafka
localhost:1 from offset 60
7334 [Thread-13-kafka] INFO storm.kafka.PartitionManager - Read partition
information from: /brokers/mytopic/1/partition_0 --> null
7335 [Thread-13-kafka] INFO storm.kafka.PartitionManager - No partition
information found, using configuration to determine offset
7335 [Thread-13-kafka] INFO storm.kafka.PartitionManager - Starting Kafka
localhost:0 from offset 60
7335 [Thread-13-kafka] INFO storm.kafka.ZkCoordinator - Task [1/1]
Finished refreshing
17001 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __tick, id: {},
[10]
17007 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Integer
cannot be cast to java.lang.String
....
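One thing I notice: the last message exclaim1 processes before dying comes from source __system on stream __tick and carries [10], an Integer rather than a String. If the bolt reads field 0 as a String for every tuple, that could be where the cast fails. Below is a minimal sketch of a guard, written without Storm dependencies so it runs on its own. The class and method names are hypothetical; in a real bolt the two strings would presumably come from tuple.getSourceComponent() and tuple.getSourceStreamId().

```java
class TickGuardSketch {
    // Values copied from the log line "source: __system:-1, stream: __tick".
    static final String SYSTEM_COMPONENT = "__system";
    static final String TICK_STREAM = "__tick";

    // Hypothetical helper: true when the tuple looks like a system tick.
    static boolean isTick(String sourceComponent, String streamId) {
        return SYSTEM_COMPONENT.equals(sourceComponent)
                && TICK_STREAM.equals(streamId);
    }

    // Appends "!!!" only for non-tick tuples whose first field is a String;
    // returns null (nothing to emit) otherwise.
    static String exclaim(String sourceComponent, String streamId, Object firstField) {
        if (isTick(sourceComponent, streamId) || !(firstField instanceof String)) {
            return null;
        }
        return firstField + "!!!";
    }

    public static void main(String[] args) {
        // A normal tuple from the kafka spout, as in the working part of the log.
        System.out.println(exclaim("kafka", "default", "hit soon"));
        // The tick tuple that precedes the crash; it is skipped instead of cast.
        System.out.println(exclaim("__system", "__tick", Integer.valueOf(10)));
    }
}
```

I have not confirmed that this is the cause; it is only what the ordering of the log lines suggests.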
On Fri, Oct 3, 2014 at 11:20 AM, Martin Arrowsmith <
[email protected]> wrote:
> Hi Storm Experts!
>
> I'm running Storm 0.9.2-incubating and kafka_2.10-0.8.1.1
>
> I'm trying to get the ExclamationTopology to run with a Kafka Spout (using
> the storm-kafka project included in the GitHub repo - not storm-kafka-starter)
>
> Here is the relevant portion of my ExclamationTopology:
>
> BrokerHosts hosts = new ZkHosts("localhost:2181");
> SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/test",
> "discovery");
> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
> KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
>
> builder.setSpout("kafka", kafkaSpout);
>
> builder.setBolt("exclaim1", new
> ExclamationBolt()).shuffleGrouping("kafka");
>
> When I run the topology in local mode for 1 minute, it works for the
> first 5 seconds, and then it breaks.
>
> spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
>
> So I'm not sure why it says I'm using String on an Int, and why it worked
> before but not now.
>
> In my kafka server.properties:
>
> num.network.threads=2
> num.io.threads=8
> num.partitions=2
>
> Should I be doing something like this?
>
> builder.setSpout("kafka", kafkaSpout, 2);
>
> Here is the output for when it's working, and then when it starts to
> break. Any help would be much appreciated!!
>
> As you can see - it starts to die on Thread 11
>
> Best,
>
> Martin
>
> =================
> ...
>
> 13684 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: kafka
> default [why will you]
> 13684 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: kafka
> __ack_init [-2810730987442558521 -8063320413415537181 3]
> 13684 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor -
> Processing received message source: kafka:3, stream: default, id:
> {-2810730987442558521=-8063320413415537181}, [why will you]
> 13684 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting:
> exclaim1 default [why will you!!!]
> 13685 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting:
> exclaim1 __ack_ack [-2810730987442558521 -8063320413415537181]
> 13685 [Thread-17-__acker] INFO backtype.storm.daemon.executor -
> Processing received message source: kafka:3, stream: __ack_init, id: {},
> [-2810730987442558521 -8063320413415537181 3]
> 13685 [Thread-17-__acker] INFO backtype.storm.daemon.executor -
> Processing received message source: exclaim1:2, stream: __ack_ack, id: {},
> [-2810730987442558521 -8063320413415537181]
> 13686 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting
> direct: 3; __acker __ack_ack [-2810730987442558521]
> 13687 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Processing
> received message source: __acker:1, stream: __ack_ack, id: {},
> [-2810730987442558521]
> 13687 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Acking
> message storm.kafka.PartitionManager$KafkaMessageId@6fec0cbf
> 15686 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: kafka
> default [hit soon]
> 15687 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor -
> Processing received message source: kafka:3, stream: default, id:
> {2311641896827057642=-6472974563298617532}, [hit soon]
> 15687 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: kafka
> __ack_init [2311641896827057642 -6472974563298617532 3]
> 15687 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting:
> exclaim1 default [hit soon!!!]
> 15687 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting:
> exclaim1 __ack_ack [2311641896827057642 -6472974563298617532]
> 15688 [Thread-17-__acker] INFO backtype.storm.daemon.executor -
> Processing received message source: kafka:3, stream: __ack_init, id: {},
> [2311641896827057642 -6472974563298617532 3]
> 15689 [Thread-17-__acker] INFO backtype.storm.daemon.executor -
> Processing received message source: exclaim1:2, stream: __ack_ack, id: {},
> [2311641896827057642 -6472974563298617532]
> 15689 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting
> direct: 3; __acker __ack_ack [2311641896827057642]
> 15691 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Processing
> received message source: __acker:1, stream: __ack_ack, id: {},
> [2311641896827057642]
> 15692 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Acking
> message storm.kafka.PartitionManager$KafkaMessageId@5638c9df
> 17490 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor -
> Processing received message source: __system:-1, stream: __tick, id: {},
> [10]
> 17496 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died!
> java.lang.RuntimeException: java.lang.ClassCastException:
> java.lang.Integer cannot be cast to java.lang.String
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> at
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> at
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> at
> backtype.storm.daemon.executor$fn__3353$fn__3365$fn__3412.invoke(executor.clj:746)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> at backtype.storm.util$async_loop$fn__450.invoke(util.clj:431)
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
> at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
> ...
>