It seems like the wrong compression codec is being used. Currently, only codec 0 (gzip) and 1 (snappy) are supported.
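To make the failure mode concrete, here is an illustrative, simplified re-implementation of the header check that produces this exception (this is a sketch, not the actual kafka.message.Message source; the class name KafkaMessageHeader and the exact byte layout assumed here — a 1-byte magic, then, when magic == 1, an attributes byte whose low two bits carry the codec id — are assumptions for illustration):

```java
import java.nio.ByteBuffer;

public class KafkaMessageHeader {

    /**
     * Sketch of the magic-byte/codec check: read the first byte as the
     * magic; for magic == 1, the codec id sits in the low two bits of
     * the following attributes byte. Anything else is rejected, which
     * is what surfaces as "Invalid magic byte N".
     */
    static int compressionCodec(ByteBuffer message) {
        byte magic = message.get(0);
        if (magic == 0) {
            return 0;                      // old format: no attributes byte
        } else if (magic == 1) {
            return message.get(1) & 0x03;  // codec id in the low two bits
        }
        throw new RuntimeException("Invalid magic byte " + magic);
    }

    public static void main(String[] args) {
        // A well-formed magic-1 header whose attributes byte declares codec 1.
        ByteBuffer ok = ByteBuffer.wrap(new byte[] {1, 1, 0, 0, 0, 0});
        System.out.println("codec = " + compressionCodec(ok));  // codec = 1

        // Bytes that are not a valid message header (e.g. the consumer is
        // decoding from the wrong position, or the producer wrote with an
        // unsupported codec): this reproduces the reported failure.
        ByteBuffer bogus = ByteBuffer.wrap(new byte[] {6, 0, 0});
        try {
            compressionCodec(bogus);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());  // Invalid magic byte 6
        }
    }
}
```

So "Invalid magic byte 6" means the byte at the start of the message the consumer is parsing is not 0 or 1 — consistent with the payload having been produced with a codec the consumer does not understand.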
Thanks,

Jun

On Mon, May 21, 2012 at 5:35 AM, Raymond Ng <raymond...@gmail.com> wrote:

> Hi all,
>
> I'm trying to use the KafkaSpout example from
> https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka
> but got the following error when trying to consume the messages:
>
> 2012-05-21 12:52:27 worker [INFO] Worker 8ecb4136-8d07-4b08-ba99-faccbab6a28e for storm rolling-ippairs-kafka-17-1337599913 on 1e5d2733-ee7f-4519-91b5-e97f6105df31:6702 has finished loading
> 2012-05-21 12:52:28 log [INFO] ########################################################
> 2012-05-21 12:52:28 KafkaSpout [INFO] Fetched 20971522 bytes of messages from Kafka: stormworker03:0
> 2012-05-21 12:52:28 log [INFO] ########################################################
> 2012-05-21 12:52:28 util [ERROR] Async loop died!
> java.lang.RuntimeException: Invalid magic byte 6
>         at kafka.message.Message.compressionCodec(Message.scala:144)
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:112)
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>         at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.hasNext(ByteBufferMessageSet.scala:55)
>         at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.fill(KafkaSpout.java:118)
>         at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.next(KafkaSpout.java:92)
>         at com.detica.treidan.storm.spouts.kafka.KafkaSpout.nextTuple(KafkaSpout.java:215)
>         at backtype.storm.daemon.task$fn__3465$fn__3514.invoke(task.clj:372)
>         at backtype.storm.daemon.task$mk_task$iter__3386__3390$fn__3391$fn__3392$fn__3393.invoke(task.clj:247)
>         at clojure.lang.AFn.applyToHelper(AFn.java:159)
>         at clojure.lang.AFn.applyTo(AFn.java:151)
>         at clojure.core$apply.invoke(core.clj:540)
>         at backtype.storm.util$async_loop$fn__487.invoke(util.clj:271)
>         at clojure.lang.AFn.run(AFn.java:24)
>         at java.lang.Thread.run(Thread.java:636)
> 2012-05-21 12:52:28 task [ERROR]
>
> I have changed KafkaConfig.fetchSizeBytes to 1024*1024*20 to accommodate the message size of my test data, and made the necessary local environmental configuration changes. I've left most of the KafkaSpout code unchanged, apart from adding more log messages to help debug the issue.
>
> The following snippet is extracted from KafkaSpout; the error is raised at the line for(MessageAndOffset msg: msgs) in the fill() method.
>
> // returns NO_EMITTED once it has reached the end of the current batch
> public EmitState next() {
>     if (_waitingToEmit.isEmpty()) fill();
>     MessageAndOffset toEmit = _waitingToEmit.pollFirst();
>     if (toEmit == null) return EmitState.NO_EMITTED;
>     List<Object> tup = _spoutConfig.scheme.deserialize(Utils.toByteArray(toEmit.message().payload()));
>     _collector.emit(tup, new KafkaMessageId(_partition, actualOffset(toEmit)));
>     LOG.debug("Emitting the following message : " + toEmit.message().toString());
>     if (_waitingToEmit.size() > 0) {
>         return EmitState.EMITTED_MORE_LEFT;
>     } else {
>         return EmitState.EMITTED_END;
>     }
> }
>
> private void fill() {
>     SimpleConsumer consumer = _partitions.getConsumer(_partition);
>     int hostPartition = _partitions.getHostPartition(_partition);
>     LOG.info("Fetching from Kafka: " + consumer.host() + ":" + hostPartition);
>     ByteBufferMessageSet msgs = consumer.fetch(
>         new FetchRequest(
>             _spoutConfig.topic,
>             hostPartition,
>             _emittedToOffset,
>             _spoutConfig.fetchSizeBytes));
>     LOG.info("########################################################");
>     LOG.info("Fetched " + msgs.sizeInBytes() + " bytes of messages from Kafka: " + consumer.host() + ":" + hostPartition);
>     LOG.info("########################################################");
>     for (MessageAndOffset msg : msgs) {
>         _pending.add(actualOffset(msg));
>         _waitingToEmit.add(msg);
>         _emittedToOffset = msg.offset();
>     }
> }
>
> Could you provide some assistance, please?
>
> --
> Rgds
> Ray