The solution:

The keys and values will be serialized before passing to another component. In 
this case the Kafka bolt received both the key (="message") and the value 
(=<the message itself) as byte arrays, therefore the key "message" couldn't be 
found. Only after deserializing with the help of a scheme works the bolt as 
expected.

// …further config of spoutConfig
spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new 
KafkaBoltKeyValueScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

public class KafkaBoltKeyValueScheme extends StringKeyValueScheme {
    @Override
    public Fields getOutputFields() {
        return new Fields("message");
    }
}

Cheers,
Andras

> On 02 Sep 2014, at 12:30, Andras Hatvani <[email protected]> 
> wrote:
> 
> Hi,
> 
> I've got an exception when trying to use a topology with a single Kafka spout 
> and a single Kafka bolt.
> Code:
> public class SmokeTestTopology {
> 
>    public static final String INPUT_TOPIC = "inputTopic";
>    public static final String ZK_ROOT_PATH = "/smokeTest";
>    public static final String ZK_INPUT_ID = "inputId";
>    public static final String OUTPUT_TOPIC = "outputTopic";
> 
>    public static StormTopology buildTopology() {
>        KafkaSpout kafkaSpout = prepareKafkaSpout();
>        KafkaBolt kafkaBolt = prepareKafkaBolt();
> 
>        TopologyBuilder topologyBuilder = new TopologyBuilder();
>        topologyBuilder.setSpout("spout", kafkaSpout);
>        topologyBuilder.setBolt("bolt", kafkaBolt).shuffleGrouping("spout");
>        return topologyBuilder.createTopology();
>    }
> 
>    private static KafkaBolt prepareKafkaBolt() {
>        return new KafkaBolt();
>    }
> 
>    private static KafkaSpout prepareKafkaSpout() {
>        SpoutConfig spoutConfig = new SpoutConfig(
>                new ZkHosts("192.168.59.103:2181"),
>                INPUT_TOPIC,
>                ZK_ROOT_PATH,
>                ZK_INPUT_ID);
>        spoutConfig.forceFromStart = true;
>        return new KafkaSpout(spoutConfig);
>    }
> }
> 
> Exception:
> 
> 2014-09-02 10:29:59 b.s.d.executor [ERROR] 
> java.lang.RuntimeException: java.lang.IllegalArgumentException: message does 
> not exist 
> at 
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128)
>  ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] 
> at 
> backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
>  ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] 
> at 
> backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
>  ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] 
> at 
> backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746)
>  ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] 
> at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) 
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] 
> at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na] 
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20] 
> Caused by: java.lang.IllegalArgumentException: message does not exist 
> at backtype.storm.tuple.Fields.fieldIndex(Fields.java:78) 
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] 
> at backtype.storm.tuple.TupleImpl.fieldIndex(TupleImpl.java:100) 
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] 
> at backtype.storm.tuple.TupleImpl.getValueByField(TupleImpl.java:149) 
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] 
> at storm.kafka.bolt.KafkaBolt.execute(KafkaBolt.java:75) ~[stormjar.jar:na] 
> at 
> backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631)
>  ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] 
> at 
> backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399)
>  ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] 
> at 
> backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) 
> ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] 
> at 
> backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
>  ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating] 
> … 6 common frames omitted 
> 
> Anyone with similar issue?
> 
> Thanks,
> Andras

Reply via email to