Hi all,

I'm trying to use the KafkaSpout example from https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka but I get the following error when trying to consume messages:

2012-05-21 12:52:27 worker [INFO] Worker 8ecb4136-8d07-4b08-ba99-faccbab6a28e for storm rolling-ippairs-kafka-17-1337599913 on 1e5d2733-ee7f-4519-91b5-e97f6105df31:6702 has finished loading
2012-05-21 12:52:28 log [INFO] ########################################################
2012-05-21 12:52:28 KafkaSpout [INFO] Fetched 20971522 bytes of messages from Kafka: stormworker03:0
2012-05-21 12:52:28 log [INFO] ########################################################
2012-05-21 12:52:28 util [ERROR] Async loop died!
java.lang.RuntimeException: Invalid magic byte 6
    at kafka.message.Message.compressionCodec(Message.scala:144)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:112)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.hasNext(ByteBufferMessageSet.scala:55)
    at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.fill(KafkaSpout.java:118)
    at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.next(KafkaSpout.java:92)
    at com.detica.treidan.storm.spouts.kafka.KafkaSpout.nextTuple(KafkaSpout.java:215)
    at backtype.storm.daemon.task$fn__3465$fn__3514.invoke(task.clj:372)
    at backtype.storm.daemon.task$mk_task$iter__3386__3390$fn__3391$fn__3392$fn__3393.invoke(task.clj:247)
    at clojure.lang.AFn.applyToHelper(AFn.java:159)
    at clojure.lang.AFn.applyTo(AFn.java:151)
    at clojure.core$apply.invoke(core.clj:540)
    at backtype.storm.util$async_loop$fn__487.invoke(util.clj:271)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:636)
2012-05-21 12:52:28 task [ERROR]

I have changed KafkaConfig.fetchSizeBytes to 1024*1024*20 because of the message size in my test data, and made the necessary local environment config changes. I've left most of the KafkaSpout code unchanged, apart from adding more log messages to try to debug the issue.

The following snippet is extracted from KafkaSpout. The error is raised at the line for(MessageAndOffset msg: msgs) in the fill() method.

    //returns false if it's reached the end of current batch
    public EmitState next() {
        if(_waitingToEmit.isEmpty()) fill();
        MessageAndOffset toEmit = _waitingToEmit.pollFirst();
        if(toEmit==null) return EmitState.NO_EMITTED;
        List<Object> tup = _spoutConfig.scheme.deserialize(Utils.toByteArray(toEmit.message().payload()));
        _collector.emit(tup, new KafkaMessageId(_partition, actualOffset(toEmit)));
        LOG.debug("Emitting the following message : "+toEmit.message().toString());
        if(_waitingToEmit.size()>0) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

    private void fill() {
        SimpleConsumer consumer = _partitions.getConsumer(_partition);
        int hostPartition = _partitions.getHostPartition(_partition);
        LOG.info("Fetching from Kafka: " + consumer.host() + ":" + hostPartition);
        ByteBufferMessageSet msgs = consumer.fetch(
            new FetchRequest(
                _spoutConfig.topic,
                hostPartition,
                _emittedToOffset,
                _spoutConfig.fetchSizeBytes));
        Log.info("########################################################");
        LOG.info("Fetched " + msgs.sizeInBytes() + " bytes of messages from Kafka: " + consumer.host() + ":" + hostPartition);
        Log.info("########################################################");
        for(MessageAndOffset msg: msgs) {
            _pending.add(actualOffset(msg));
            _waitingToEmit.add(msg);
            _emittedToOffset = msg.offset();
        }
    }

Could you provide some assistance, please?

--
Rgds
Ray