Hi all

I'm trying to use the KafkaSpout example from
https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka but I get
the following error when trying to consume messages:

2012-05-21 12:52:27 worker [INFO] Worker 8ecb4136-8d07-4b08-ba99-faccbab6a28e for storm rolling-ippairs-kafka-17-1337599913 on 1e5d2733-ee7f-4519-91b5-e97f6105df31:6702 has finished loading
2012-05-21 12:52:28 log [INFO] ########################################################
2012-05-21 12:52:28 KafkaSpout [INFO] Fetched 20971522 bytes of messages from Kafka: stormworker03:0
2012-05-21 12:52:28 log [INFO] ########################################################
2012-05-21 12:52:28 util [ERROR] Async loop died!
java.lang.RuntimeException: Invalid magic byte 6
 at kafka.message.Message.compressionCodec(Message.scala:144)
 at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:112)
 at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
 at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
 at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
 at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
 at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.hasNext(ByteBufferMessageSet.scala:55)
 at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.fill(KafkaSpout.java:118)
 at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.next(KafkaSpout.java:92)
 at com.detica.treidan.storm.spouts.kafka.KafkaSpout.nextTuple(KafkaSpout.java:215)
 at backtype.storm.daemon.task$fn__3465$fn__3514.invoke(task.clj:372)
 at backtype.storm.daemon.task$mk_task$iter__3386__3390$fn__3391$fn__3392$fn__3393.invoke(task.clj:247)
 at clojure.lang.AFn.applyToHelper(AFn.java:159)
 at clojure.lang.AFn.applyTo(AFn.java:151)
 at clojure.core$apply.invoke(core.clj:540)
 at backtype.storm.util$async_loop$fn__487.invoke(util.clj:271)
 at clojure.lang.AFn.run(AFn.java:24)
 at java.lang.Thread.run(Thread.java:636)
2012-05-21 12:52:28 task [ERROR]

I have changed KafkaConfig.fetchSizeBytes to 1024*1024*20 because of the
message size in my test data, and made the necessary local environment
configuration changes.

I've left most of the KafkaSpout code unchanged, apart from adding more
log messages to try to debug the issue.
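
For reference, here is a minimal sketch of the arithmetic behind that fetch size, compared with the byte count in the "Fetched ... bytes" log line above. The class, constant and method names are only illustrative and are not part of storm-kafka. The configured value works out to 20971520 bytes, which is 2 bytes less than the 20971522 bytes reported in the log.

```java
public class FetchSizeCheck {
    // Byte count copied from the "Fetched ... bytes of messages" log line.
    static final long LOGGED_FETCHED_BYTES = 20971522L;

    // The value assigned to KafkaConfig.fetchSizeBytes: 1024*1024*20.
    static int configuredFetchSizeBytes() {
        return 1024 * 1024 * 20;
    }

    public static void main(String[] args) {
        int configured = configuredFetchSizeBytes();
        System.out.println("configured fetch size: " + configured);
        System.out.println("logged fetched bytes:  " + LOGGED_FETCHED_BYTES);
        System.out.println("difference:            " + (LOGGED_FETCHED_BYTES - configured));
    }
}
```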

The following snippet is extracted from KafkaSpout. The error is raised at
the line    for(MessageAndOffset msg: msgs)   in the fill() method:


  //returns false if it's reached the end of current batch
  public EmitState next() {
   if(_waitingToEmit.isEmpty()) fill();
   MessageAndOffset toEmit = _waitingToEmit.pollFirst();
   if(toEmit==null) return EmitState.NO_EMITTED;
   List<Object> tup = _spoutConfig.scheme.deserialize(Utils.toByteArray(toEmit.message().payload()));
   _collector.emit(tup, new KafkaMessageId(_partition, actualOffset(toEmit)));
   LOG.debug("Emitting the following message : " + toEmit.message().toString());
   if(_waitingToEmit.size()>0) {
    return EmitState.EMITTED_MORE_LEFT;
   } else {
    return EmitState.EMITTED_END;
   }
  }

  private void fill() {
   SimpleConsumer consumer = _partitions.getConsumer(_partition);
   int hostPartition = _partitions.getHostPartition(_partition);
   LOG.info("Fetching from Kafka: " + consumer.host() + ":" + hostPartition);
   ByteBufferMessageSet msgs = consumer.fetch(
     new FetchRequest(
       _spoutConfig.topic,
       hostPartition,
       _emittedToOffset,
       _spoutConfig.fetchSizeBytes));
   Log.info("########################################################");
   LOG.info("Fetched " + msgs.sizeInBytes() + " bytes of messages from Kafka: " + consumer.host() + ":" + hostPartition);
   Log.info("########################################################");
   for(MessageAndOffset msg: msgs) {
    _pending.add(actualOffset(msg));
    _waitingToEmit.add(msg);
    _emittedToOffset = msg.offset();
   }
  }

Could you provide some assistance, please?

-- 
Rgds
Ray
