Yeah, I think I jumped to conclusions. The issue I was referring to was just us assuming the default encoding, which would not cause the issue you described.
-Jay

2012/4/3 Patricio Echagüe <patric...@gmail.com>:
> Interesting. What I can't explain though is why it works just fine when
> printing the string this way:
>
> for(Message message: stream) {
>     ByteBuffer bb = message.payload().duplicate();
>     ByteBuffer bb2 = message.payload().duplicate();
>     byte[] bytes = new byte[bb2.remaining()];
>     bb2.get(bytes);
>     System.out.println("Message received string: " + new String(bytes));
>     consumerConnector.commitOffsets();
> }
>
> do you have a link to your patch Jay ?
>
> On Tue, Apr 3, 2012 at 4:32 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
>> This is our bug, we were taking the system default encoding (d'oh). I
>> have a patch for it I was adding to 0.8, we can probably backport it
>> for older releases too pretty easily.
>>
>> -Jay
>>
>> 2012/4/3 Patricio Echagüe <patric...@gmail.com>:
>> > Hi, I noticed that String Serializer somehow doesn't do well encoding
>> > special characters such as "ü".
>> >
>> > I tried to create a ByteBufferEncoder this way:
>> >
>> > import java.nio.ByteBuffer;
>> > import kafka.message.Message;
>> > import kafka.serializer.Encoder;
>> >
>> > public class ByteBufferEncoder implements Encoder<ByteBuffer> {
>> >     public Message toMessage(ByteBuffer buffer) {
>> >         return new Message(buffer);
>> >     }
>> > }
>> >
>> > but I get this exception [1]
>> >
>> > Could you guys please advice on how to fix my encoding issue?
>> >
>> > Thanks
>> >
>> > [1]
>> >
>> > Exception in thread "main" java.lang.RuntimeException: Invalid magic byte 34
>> >     at kafka.message.Message.compressionCodec(Message.scala:144)
>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:112)
>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
>> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
>> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>> >     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>> >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
>> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
>> >     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
>> >     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$verifyMessageSize(SyncProducer.scala:139)
>> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
>> >     at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:116)
>> >     at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
>> >     at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
>> >     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>> >     at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
>> >     at kafka.producer.Producer.zkSend(Producer.scala:143)
>> >     at kafka.producer.Producer.send(Producer.scala:105)
>> >     at kafka.javaapi.producer.Producer.send(Producer.scala:104)
>> >     at com.lucid.dao.queue.impl.kafka.KafkaProducer.send(KafkaProducer.java:63)
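
For anyone reading this thread later, here is a minimal sketch of what "not taking the system default encoding" can look like in plain Java. It is not the patch mentioned above and does not touch any Kafka API; the class `EncodingSketch` and its `encode`/`decode` helpers are hypothetical names made up for illustration, and UTF-8 is an assumed choice of charset.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Kafka: charset-explicit string conversions.
public class EncodingSketch {

    // Encode with an explicit charset instead of the platform default.
    public static byte[] encode(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a payload; duplicate() leaves the caller's buffer position untouched.
    public static String decode(ByteBuffer payload) {
        ByteBuffer copy = payload.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u00fc is u-umlaut, escaped so the source file encoding does not matter.
        String original = "Echag\u00fce";
        byte[] utf8 = encode(original);

        // Same charset on both sides: the round trip is lossless.
        System.out.println(decode(ByteBuffer.wrap(utf8)).equals(original)); // true

        // Different charset on the decoding side: the u-umlaut is garbled.
        String mismatched = new String(utf8, StandardCharsets.ISO_8859_1);
        System.out.println(mismatched.equals(original)); // false
    }
}
```

Note that `new String(bytes)` and `String.getBytes()` without a charset argument both fall back to the JVM's default charset, so a producer and consumer running with the same default can round-trip a string even though neither side names an encoding. Whether that is what happens in the consumer loop quoted above is only a guess.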