Hi,
I get an exception when running a topology (Storm 0.9.2-incubating) that has
a single Kafka spout and a single Kafka bolt.
Code:
public class SmokeTestTopology {

    public static final String INPUT_TOPIC = "inputTopic";
    public static final String ZK_ROOT_PATH = "/smokeTest";
    public static final String ZK_INPUT_ID = "inputId";
    public static final String OUTPUT_TOPIC = "outputTopic";

    public static StormTopology buildTopology() {
        KafkaSpout kafkaSpout = prepareKafkaSpout();
        KafkaBolt kafkaBolt = prepareKafkaBolt();
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", kafkaSpout);
        topologyBuilder.setBolt("bolt", kafkaBolt).shuffleGrouping("spout");
        return topologyBuilder.createTopology();
    }

    private static KafkaBolt prepareKafkaBolt() {
        return new KafkaBolt();
    }

    private static KafkaSpout prepareKafkaSpout() {
        SpoutConfig spoutConfig = new SpoutConfig(
                new ZkHosts("192.168.59.103:2181"),
                INPUT_TOPIC,
                ZK_ROOT_PATH,
                ZK_INPUT_ID);
        spoutConfig.forceFromStart = true;
        return new KafkaSpout(spoutConfig);
    }
}
Exception:
2014-09-02 10:29:59 b.s.d.executor [ERROR]
java.lang.RuntimeException: java.lang.IllegalArgumentException: message does not exist
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
Caused by: java.lang.IllegalArgumentException: message does not exist
    at backtype.storm.tuple.Fields.fieldIndex(Fields.java:78) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.tuple.TupleImpl.fieldIndex(TupleImpl.java:100) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.tuple.TupleImpl.getValueByField(TupleImpl.java:149) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.bolt.KafkaBolt.execute(KafkaBolt.java:75) ~[stormjar.jar:na]
    at backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    … 6 common frames omitted
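
Reading the trace, the failure is in KafkaBolt.execute, which looks up a tuple
field by name and does not find "message". My guess (an assumption, I have not
verified it against the storm-kafka source) is that the spout's default scheme
emits a single field named "bytes", so the bolt's lookup fails. Below is a
minimal sketch of that mismatch in plain Java with no Storm dependency.
FieldLookupSketch and its fieldIndex method are hypothetical stand-ins, not
Storm classes:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a tuple's field lookup; this is not Storm code.
class FieldLookupSketch {

    // Looks up a field by name and fails if the tuple does not declare it.
    static int fieldIndex(List<String> declaredFields, String field) {
        int index = declaredFields.indexOf(field);
        if (index < 0) {
            throw new IllegalArgumentException(field + " does not exist");
        }
        return index;
    }

    public static void main(String[] args) {
        // Assumption: the spout's default scheme declares one field, "bytes".
        List<String> spoutFields = Arrays.asList("bytes");
        try {
            // Assumption: the bolt asks the tuple for a field named "message".
            fieldIndex(spoutFields, "message");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "message does not exist"
        }

        // A tuple that declares "key" and "message" passes the same lookup.
        System.out.println(fieldIndex(Arrays.asList("key", "message"), "message")); // prints 1
    }
}
```

If that is really what happens, the spout would probably need a scheme (or a
small intermediate bolt) that emits a field named "message", but I have not
confirmed this.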
Has anyone run into a similar issue?
Thanks,
Andras