Just to confirm - is this project an integration of Kafka with Storm? The page doesn't provide much detail.
Thanks,
Navneet Sharma

On Mon, May 21, 2012 at 8:21 PM, Jun Rao <jun...@gmail.com> wrote:

> Seems like a wrong compression codec is being used. Currently, only codec 0
> (gzip) and 1 (snappy) are supported.
>
> Thanks,
>
> Jun
>
> On Mon, May 21, 2012 at 5:35 AM, Raymond Ng <raymond...@gmail.com> wrote:
>
> > Hi all
> >
> > I'm trying to use the KafkaSpout example from
> > https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka but
> > got the following error when trying to consume the messages:
> >
> > 2012-05-21 12:52:27 worker [INFO] Worker 8ecb4136-8d07-4b08-ba99-faccbab6a28e
> > for storm rolling-ippairs-kafka-17-1337599913 on
> > 1e5d2733-ee7f-4519-91b5-e97f6105df31:6702 has finished loading
> > 2012-05-21 12:52:28 log [INFO] ########################################################
> > 2012-05-21 12:52:28 KafkaSpout [INFO] Fetched 20971522 bytes of messages
> > from Kafka: stormworker03:0
> > 2012-05-21 12:52:28 log [INFO] ########################################################
> > 2012-05-21 12:52:28 util [ERROR] Async loop died!
> > java.lang.RuntimeException: Invalid magic byte 6
> >         at kafka.message.Message.compressionCodec(Message.scala:144)
> >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:112)
> >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
> >         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
> >         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >         at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.hasNext(ByteBufferMessageSet.scala:55)
> >         at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.fill(KafkaSpout.java:118)
> >         at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.next(KafkaSpout.java:92)
> >         at com.detica.treidan.storm.spouts.kafka.KafkaSpout.nextTuple(KafkaSpout.java:215)
> >         at backtype.storm.daemon.task$fn__3465$fn__3514.invoke(task.clj:372)
> >         at backtype.storm.daemon.task$mk_task$iter__3386__3390$fn__3391$fn__3392$fn__3393.invoke(task.clj:247)
> >         at clojure.lang.AFn.applyToHelper(AFn.java:159)
> >         at clojure.lang.AFn.applyTo(AFn.java:151)
> >         at clojure.core$apply.invoke(core.clj:540)
> >         at backtype.storm.util$async_loop$fn__487.invoke(util.clj:271)
> >         at clojure.lang.AFn.run(AFn.java:24)
> >         at java.lang.Thread.run(Thread.java:636)
> > 2012-05-21 12:52:28 task [ERROR]
> >
> > I have changed KafkaConfig.fetchSizeBytes to 1024*1024*20 to accommodate
> > the message size of my test data, made the necessary local environment
> > config changes, and left most of the KafkaSpout code unchanged apart from
> > putting more log messages in to try to debug the issue.
> >
> > The following snippet is extracted from KafkaSpout; the error is raised
> > at the line for (MessageAndOffset msg : msgs) in the fill() method.
> >
> > // returns EmitState.NO_EMITTED when it has reached the end of the current batch
> > public EmitState next() {
> >     if (_waitingToEmit.isEmpty()) fill();
> >     MessageAndOffset toEmit = _waitingToEmit.pollFirst();
> >     if (toEmit == null) return EmitState.NO_EMITTED;
> >     List<Object> tup = _spoutConfig.scheme.deserialize(
> >             Utils.toByteArray(toEmit.message().payload()));
> >     _collector.emit(tup, new KafkaMessageId(_partition, actualOffset(toEmit)));
> >     LOG.debug("Emitting the following message : " + toEmit.message().toString());
> >     if (_waitingToEmit.size() > 0) {
> >         return EmitState.EMITTED_MORE_LEFT;
> >     } else {
> >         return EmitState.EMITTED_END;
> >     }
> > }
> >
> > private void fill() {
> >     SimpleConsumer consumer = _partitions.getConsumer(_partition);
> >     int hostPartition = _partitions.getHostPartition(_partition);
> >     LOG.info("Fetching from Kafka: " + consumer.host() + ":" + hostPartition);
> >     ByteBufferMessageSet msgs = consumer.fetch(
> >             new FetchRequest(
> >                     _spoutConfig.topic,
> >                     hostPartition,
> >                     _emittedToOffset,
> >                     _spoutConfig.fetchSizeBytes));
> >     Log.info("########################################################");
> >     LOG.info("Fetched " + msgs.sizeInBytes() + " bytes of messages from Kafka: "
> >             + consumer.host() + ":" + hostPartition);
> >     Log.info("########################################################");
> >     for (MessageAndOffset msg : msgs) {
> >         _pending.add(actualOffset(msg));
> >         _waitingToEmit.add(msg);
> >         _emittedToOffset = msg.offset();
> >     }
> > }
> >
> > Could you provide some assistance please?
> >
> > --
> > Rgds
> > Ray