It seems like the wrong compression codec is being used. Currently, only codecs 0
(gzip) and 1 (snappy) are supported.
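
For reference, the codec is normally chosen on the producer side. Below is a minimal
sketch using the Kafka 0.7 Java producer (the ZooKeeper address, topic, serializer and
the codec id are placeholders; double-check the codec-id-to-codec mapping against the
Kafka release you are running):

  import java.util.Properties;

  import kafka.javaapi.producer.Producer;
  import kafka.javaapi.producer.ProducerData;
  import kafka.producer.ProducerConfig;

  public class CompressedProducerSketch {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("zk.connect", "localhost:2181");                       // placeholder ZooKeeper address
          props.put("serializer.class", "kafka.serializer.StringEncoder");
          // pick one of the supported codec ids mentioned above; anything else
          // will not be readable on the consumer side
          props.put("compression.codec", "1");

          Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
          producer.send(new ProducerData<String, String>("test-topic", "hello from a compressed producer"));
          producer.close();
      }
  }

If the test data was produced with an unsupported codec, re-producing it with one of the
supported codecs should clear the error on the consumer side.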

Thanks,

Jun

On Mon, May 21, 2012 at 5:35 AM, Raymond Ng <raymond...@gmail.com> wrote:

> Hi all
>
> I'm trying to use the KafkaSpout example from
> https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka but
> got the following error when trying to consume the messages:
>
> 2012-05-21 12:52:27 worker [INFO] Worker 8ecb4136-8d07-4b08-ba99-faccbab6a28e for storm rolling-ippairs-kafka-17-1337599913 on 1e5d2733-ee7f-4519-91b5-e97f6105df31:6702 has finished loading
> 2012-05-21 12:52:28 log [INFO] ########################################################
> 2012-05-21 12:52:28 KafkaSpout [INFO] Fetched 20971522 bytes of messages from Kafka: stormworker03:0
> 2012-05-21 12:52:28 log [INFO] ########################################################
> 2012-05-21 12:52:28 util [ERROR] Async loop died!
> java.lang.RuntimeException: Invalid magic byte 6
>  at kafka.message.Message.compressionCodec(Message.scala:144)
>  at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:112)
>  at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
>  at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
>  at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>  at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>  at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.hasNext(ByteBufferMessageSet.scala:55)
>  at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.fill(KafkaSpout.java:118)
>  at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.next(KafkaSpout.java:92)
>  at com.detica.treidan.storm.spouts.kafka.KafkaSpout.nextTuple(KafkaSpout.java:215)
>  at backtype.storm.daemon.task$fn__3465$fn__3514.invoke(task.clj:372)
>  at backtype.storm.daemon.task$mk_task$iter__3386__3390$fn__3391$fn__3392$fn__3393.invoke(task.clj:247)
>  at clojure.lang.AFn.applyToHelper(AFn.java:159)
>  at clojure.lang.AFn.applyTo(AFn.java:151)
>  at clojure.core$apply.invoke(core.clj:540)
>  at backtype.storm.util$async_loop$fn__487.invoke(util.clj:271)
>  at clojure.lang.AFn.run(AFn.java:24)
>  at java.lang.Thread.run(Thread.java:636)
> 2012-05-21 12:52:28 task [ERROR]
>
> I have changed KafkaConfig.fetchSizeBytes to 1024*1024*20 because of the size of the
> messages in my test data, and made the necessary local environmental config changes.
>
> I've left most of the KafkaSpout code unchanged, apart from putting in more log
> messages to try to debug the issue.
>
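> The fetch-size change itself looks roughly like this (spoutConfig here stands for the
> SpoutConfig/KafkaConfig instance built for the topology):
>
>   // raise the fetch size from the default so the large test messages fit in one fetch
>   spoutConfig.fetchSizeBytes = 1024 * 1024 * 20;   // 20 MB
>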
> The following snippet is extracted from KafkaSpout; the error is raised at the line
> for(MessageAndOffset msg: msgs) in the fill() method:
>
>
>  // emits the next message from the current batch, if any; the returned EmitState
>  // indicates whether more messages are left in the batch
>  public EmitState next() {
>    if(_waitingToEmit.isEmpty()) fill();
>    MessageAndOffset toEmit = _waitingToEmit.pollFirst();
>    if(toEmit==null) return EmitState.NO_EMITTED;
>    List<Object> tup = _spoutConfig.scheme.deserialize(Utils.toByteArray(toEmit.message().payload()));
>    _collector.emit(tup, new KafkaMessageId(_partition, actualOffset(toEmit)));
>    LOG.debug("Emitting the following message : " + toEmit.message().toString());
>    if(_waitingToEmit.size()>0) {
>      return EmitState.EMITTED_MORE_LEFT;
>    } else {
>      return EmitState.EMITTED_END;
>    }
>  }
>
>  private void fill() {
>    SimpleConsumer consumer = _partitions.getConsumer(_partition);
>    int hostPartition = _partitions.getHostPartition(_partition);
>    LOG.info("Fetching from Kafka: " + consumer.host() + ":" + hostPartition);
>    ByteBufferMessageSet msgs = consumer.fetch(
>        new FetchRequest(
>            _spoutConfig.topic,
>            hostPartition,
>            _emittedToOffset,
>            _spoutConfig.fetchSizeBytes));
>    LOG.info("########################################################");
>    LOG.info("Fetched " + msgs.sizeInBytes() + " bytes of messages from Kafka: " + consumer.host() + ":" + hostPartition);
>    LOG.info("########################################################");
>    for(MessageAndOffset msg: msgs) {   // <-- the exception above is thrown while iterating here
>      _pending.add(actualOffset(msg));
>      _waitingToEmit.add(msg);
>      _emittedToOffset = msg.offset();
>    }
>  }
>
> Could you provide some assistance, please?
>
> --
> Rgds
> Ray
>
