Hi Storm Experts!
I'm running Storm 0.9.2-incubating and kafka_2.10-0.8.1.1.
I'm trying to get the ExclamationTopology to run with a Kafka spout (using
the storm-kafka project included in the Storm GitHub repository, not
storm-kafka-starter).
Here is the relevant portion of my ExclamationTopology:
BrokerHosts hosts = new ZkHosts("localhost:2181");
SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/test", "discovery");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("kafka", kafkaSpout);
builder.setBolt("exclaim1", new ExclamationBolt()).shuffleGrouping("kafka");
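When I run the topology in local mode for 1 minute, it works for the
first 5 seconds, and then it breaks with a ClassCastException
(java.lang.Integer cannot be cast to java.lang.String).
This is the line where I set the scheme:
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
So I'm not sure why it says I'm using a String on an Integer, or why it
works at first but not later.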
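One possible reading of the log further down, offered as a guess rather than a confirmed diagnosis: the last message exclaim1 receives before dying comes from __system on stream __tick and carries [10], and reading an Integer field as a String would produce exactly this exception. The Storm-free sketch below reproduces that failure mode with a plain List<Object> standing in for the tuple values. The class name TickCastSketch and both helper methods are hypothetical and are not part of Storm.

```java
import java.util.Arrays;
import java.util.List;

public class TickCastSketch {
    // Reads field 0 with an unchecked cast, the way a String getter on a tuple might.
    static String readAsString(List<Object> values) {
        return (String) values.get(0);
    }

    // Guarded variant: exclaims only when field 0 really is a String, otherwise returns null.
    static String exclaimIfString(List<Object> values) {
        Object first = values.get(0);
        if (first instanceof String) {
            return first + "!!!";
        }
        return null;
    }

    public static void main(String[] args) {
        List<Object> kafkaValues = Arrays.<Object>asList("hit soon"); // like a tuple from the spout
        List<Object> tickValues = Arrays.<Object>asList(10);          // like the [10] on __tick

        System.out.println(exclaimIfString(kafkaValues)); // prints "hit soon!!!"
        System.out.println(exclaimIfString(tickValues));  // prints "null"

        try {
            readAsString(tickValues); // Integer cannot be cast to String
        } catch (ClassCastException e) {
            System.out.println("ClassCastException");
        }
    }
}
```

If this reading is right, the equivalent guard in a real bolt would presumably check the tuple's source component and stream (the log shows __system and __tick) before calling getString(0), and skip or just ack such tuples.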
In my kafka server.properties:
num.network.threads=2
num.io.threads=8
num.partitions=2
Should I be doing something like this?
builder.setSpout("kafka", kafkaSpout, 2);
Here is the output from when it's working, followed by when it starts to
break. As you can see, it starts to die on Thread-11 (exclaim1).
Any help would be much appreciated!!
Best,
Martin
=================
...
13684 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: kafka
default [why will you]
13684 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: kafka
__ack_init [-2810730987442558521 -8063320413415537181 3]
13684 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor -
Processing received message source: kafka:3, stream: default, id:
{-2810730987442558521=-8063320413415537181}, [why will you]
13684 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting:
exclaim1 default [why will you!!!]
13685 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting:
exclaim1 __ack_ack [-2810730987442558521 -8063320413415537181]
13685 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing
received message source: kafka:3, stream: __ack_init, id: {},
[-2810730987442558521 -8063320413415537181 3]
13685 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing
received message source: exclaim1:2, stream: __ack_ack, id: {},
[-2810730987442558521 -8063320413415537181]
13686 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting
direct: 3; __acker __ack_ack [-2810730987442558521]
13687 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Processing
received message source: __acker:1, stream: __ack_ack, id: {},
[-2810730987442558521]
13687 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Acking
message storm.kafka.PartitionManager$KafkaMessageId@6fec0cbf
15686 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: kafka
default [hit soon]
15687 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor -
Processing received message source: kafka:3, stream: default, id:
{2311641896827057642=-6472974563298617532}, [hit soon]
15687 [Thread-13-kafka] INFO backtype.storm.daemon.task - Emitting: kafka
__ack_init [2311641896827057642 -6472974563298617532 3]
15687 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting:
exclaim1 default [hit soon!!!]
15687 [Thread-11-exclaim1] INFO backtype.storm.daemon.task - Emitting:
exclaim1 __ack_ack [2311641896827057642 -6472974563298617532]
15688 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing
received message source: kafka:3, stream: __ack_init, id: {},
[2311641896827057642 -6472974563298617532 3]
15689 [Thread-17-__acker] INFO backtype.storm.daemon.executor - Processing
received message source: exclaim1:2, stream: __ack_ack, id: {},
[2311641896827057642 -6472974563298617532]
15689 [Thread-17-__acker] INFO backtype.storm.daemon.task - Emitting
direct: 3; __acker __ack_ack [2311641896827057642]
15691 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Processing
received message source: __acker:1, stream: __ack_ack, id: {},
[2311641896827057642]
15692 [Thread-13-kafka] INFO backtype.storm.daemon.executor - Acking
message storm.kafka.PartitionManager$KafkaMessageId@5638c9df
17490 [Thread-11-exclaim1] INFO backtype.storm.daemon.executor -
Processing received message source: __system:-1, stream: __tick, id: {},
[10]
17496 [Thread-11-exclaim1] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Integer
cannot be cast to java.lang.String
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at
backtype.storm.daemon.executor$fn__3353$fn__3365$fn__3412.invoke(executor.clj:746)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__450.invoke(util.clj:431)
~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]
...