Hi,

I'm getting an exception when running a topology with a single Kafka spout
and a single Kafka bolt.
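Code:

import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;

public class SmokeTestTopology {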

    public static final String INPUT_TOPIC = "inputTopic";
    public static final String ZK_ROOT_PATH = "/smokeTest";
    public static final String ZK_INPUT_ID = "inputId";
    public static final String OUTPUT_TOPIC = "outputTopic";

    public static StormTopology buildTopology() {
        KafkaSpout kafkaSpout = prepareKafkaSpout();
        KafkaBolt kafkaBolt = prepareKafkaBolt();

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", kafkaSpout);
        topologyBuilder.setBolt("bolt", kafkaBolt).shuffleGrouping("spout");
        return topologyBuilder.createTopology();
    }

    private static KafkaBolt prepareKafkaBolt() {
        return new KafkaBolt();
    }

    private static KafkaSpout prepareKafkaSpout() {
        SpoutConfig spoutConfig = new SpoutConfig(
                new ZkHosts("192.168.59.103:2181"),
                INPUT_TOPIC,
                ZK_ROOT_PATH,
                ZK_INPUT_ID);
        spoutConfig.forceFromStart = true;
        return new KafkaSpout(spoutConfig);
    }
}

Exception:

2014-09-02 10:29:59 b.s.d.executor [ERROR]
java.lang.RuntimeException: java.lang.IllegalArgumentException: message does not exist
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
Caused by: java.lang.IllegalArgumentException: message does not exist
    at backtype.storm.tuple.Fields.fieldIndex(Fields.java:78) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.tuple.TupleImpl.fieldIndex(TupleImpl.java:100) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.tuple.TupleImpl.getValueByField(TupleImpl.java:149) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at storm.kafka.bolt.KafkaBolt.execute(KafkaBolt.java:75) ~[stormjar.jar:na]
    at backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    ... 6 common frames omitted

Has anyone run into a similar issue?

Thanks,
Andras
