The solution:
Keys and values are serialized before they are passed on to another component. In
this case the Kafka bolt received both the key (= "message") and the value
(= the message itself) as byte arrays, so the tuple field "message" couldn't be
found. Only after deserializing with the help of a scheme does the bolt work as
expected:
// …further config of spoutConfig
spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new KafkaBoltKeyValueScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);

public class KafkaBoltKeyValueScheme extends StringKeyValueScheme {
    @Override
    public Fields getOutputFields() {
        return new Fields("message");
    }
}
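For completeness, the bolt side also needs the target topic and the Kafka producer properties in the topology configuration. This is a minimal sketch, assuming the storm-kafka 0.9.2 KafkaBolt, which reads the KAFKA_BROKER_PROPERTIES and TOPIC entries from the config in prepare(); the broker address and topology name below are placeholders, not taken from the original setup:

```java
// Sketch: topology configuration for KafkaBolt (storm-kafka 0.9.2).
// Broker address and topology name are placeholders.
Config conf = new Config();

Properties producerProps = new Properties();
producerProps.put("metadata.broker.list", "192.168.59.103:9092");
producerProps.put("serializer.class", "kafka.serializer.StringEncoder");

// KafkaBolt looks these up in the topology config at prepare() time.
conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, producerProps);
conf.put(KafkaBolt.TOPIC, SmokeTestTopology.OUTPUT_TOPIC);

StormSubmitter.submitTopology("smokeTest", conf, SmokeTestTopology.buildTopology());
```

Without these entries the bolt may fail later in prepare() even once the field-name problem above is fixed.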
Cheers,
Andras
> On 02 Sep 2014, at 12:30, Andras Hatvani <[email protected]> wrote:
>
> Hi,
>
> I've got an exception when trying to use a topology with a single Kafka spout
> and a single Kafka bolt.
> Code:
> public class SmokeTestTopology {
>
>     public static final String INPUT_TOPIC = "inputTopic";
>     public static final String ZK_ROOT_PATH = "/smokeTest";
>     public static final String ZK_INPUT_ID = "inputId";
>     public static final String OUTPUT_TOPIC = "outputTopic";
>
>     public static StormTopology buildTopology() {
>         KafkaSpout kafkaSpout = prepareKafkaSpout();
>         KafkaBolt kafkaBolt = prepareKafkaBolt();
>
>         TopologyBuilder topologyBuilder = new TopologyBuilder();
>         topologyBuilder.setSpout("spout", kafkaSpout);
>         topologyBuilder.setBolt("bolt", kafkaBolt).shuffleGrouping("spout");
>         return topologyBuilder.createTopology();
>     }
>
>     private static KafkaBolt prepareKafkaBolt() {
>         return new KafkaBolt();
>     }
>
>     private static KafkaSpout prepareKafkaSpout() {
>         SpoutConfig spoutConfig = new SpoutConfig(
>                 new ZkHosts("192.168.59.103:2181"),
>                 INPUT_TOPIC,
>                 ZK_ROOT_PATH,
>                 ZK_INPUT_ID);
>         spoutConfig.forceFromStart = true;
>         return new KafkaSpout(spoutConfig);
>     }
> }
>
> Exception:
>
> 2014-09-02 10:29:59 b.s.d.executor [ERROR]
> java.lang.RuntimeException: java.lang.IllegalArgumentException: message does not exist
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>     at backtype.storm.daemon.executor$fn__5641$fn__5653$fn__5700.invoke(executor.clj:746) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>     at backtype.storm.util$async_loop$fn__457.invoke(util.clj:431) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>     at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
>     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]
> Caused by: java.lang.IllegalArgumentException: message does not exist
>     at backtype.storm.tuple.Fields.fieldIndex(Fields.java:78) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>     at backtype.storm.tuple.TupleImpl.fieldIndex(TupleImpl.java:100) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>     at backtype.storm.tuple.TupleImpl.getValueByField(TupleImpl.java:149) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>     at storm.kafka.bolt.KafkaBolt.execute(KafkaBolt.java:75) ~[stormjar.jar:na]
>     at backtype.storm.daemon.executor$fn__5641$tuple_action_fn__5643.invoke(executor.clj:631) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>     at backtype.storm.daemon.executor$mk_task_receiver$fn__5564.invoke(executor.clj:399) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>     at backtype.storm.disruptor$clojure_handler$reify__745.onEvent(disruptor.clj:58) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
> … 6 common frames omitted
>
> Anyone with similar issue?
>
> Thanks,
> Andras