Interesting. What I can't explain, though, is why it works just fine when I print the string this way:

    for (Message message : stream) {
        ByteBuffer bb = message.payload().duplicate();
        ByteBuffer bb2 = message.payload().duplicate();
        byte[] bytes = new byte[bb2.remaining()];
        bb2.get(bytes);
        System.out.println("Message received string: " + new String(bytes));
        consumerConnector.commitOffsets();
    }

Do you have a link to your patch, Jay?

On Tue, Apr 3, 2012 at 4:32 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> This is our bug, we were taking the system default encoding (d'oh). I
> have a patch for it I was adding to 0.8, we can probably backport it
> for older releases too pretty easily.
>
> -Jay
>
> 2012/4/3 Patricio Echagüe <patric...@gmail.com>:
> > Hi, I noticed that String Serializer somehow doesn't do well encoding
> > special characters such as "ü".
> >
> > I tried to create a ByteBufferEncoder this way:
> >
> > import java.nio.ByteBuffer;
> > import kafka.message.Message;
> > import kafka.serializer.Encoder;
> >
> > public class ByteBufferEncoder implements Encoder<ByteBuffer> {
> >     public Message toMessage(ByteBuffer buffer) {
> >         return new Message(buffer);
> >     }
> > }
> >
> > but I get this exception [1]
> >
> > Could you guys please advise on how to fix my encoding issue?
> >
> > Thanks
> >
> > [1]
> > Exception in thread "main" java.lang.RuntimeException: Invalid magic byte 34
> >     at kafka.message.Message.compressionCodec(Message.scala:144)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:112)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> >     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> >     at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$verifyMessageSize(SyncProducer.scala:139)
> >     at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
> >     at kafka.producer.ProducerPool$$anonfun$send$1.apply$mcVI$sp(ProducerPool.scala:116)
> >     at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
> >     at kafka.producer.ProducerPool$$anonfun$send$1.apply(ProducerPool.scala:102)
> >     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
> >     at kafka.producer.ProducerPool.send(ProducerPool.scala:102)
> >     at kafka.producer.Producer.zkSend(Producer.scala:143)
> >     at kafka.producer.Producer.send(Producer.scala:105)
> >     at kafka.javaapi.producer.Producer.send(Producer.scala:104)
> >     at com.lucid.dao.queue.impl.kafka.KafkaProducer.send(KafkaProducer.java:63)
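
For reference, a minimal sketch of the charset issue discussed in this thread, independent of Kafka. `new String(bytes)` decodes with the platform default charset, so it presumably round-trips in the consumer loop at the top only because the producer side encoded with the same default. Naming the charset on both sides removes that dependency. The class and method names (`Utf8RoundTrip`, `encode`, `decode`) are hypothetical, and `StandardCharsets` assumes Java 7 or later; this is not the patch mentioned above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of Kafka.
public class Utf8RoundTrip {

    // Encode with an explicit charset instead of the platform default.
    public static ByteBuffer encode(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8));
    }

    // Decode from a duplicate so the caller's buffer position is left untouched.
    public static String decode(ByteBuffer payload) {
        ByteBuffer copy = payload.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String original = "Echag\u00fce"; // contains the character "ü"
        ByteBuffer payload = encode(original);

        // The round trip does not depend on the JVM's file.encoding setting.
        System.out.println(decode(payload).equals(original)); // prints "true"

        // "ü" takes two bytes in UTF-8, so 7 characters become 8 bytes.
        System.out.println(payload.remaining()); // prints "8"
    }
}
```

The same idea applies to a payload obtained from `message.payload()`: pass the charset to the `String` constructor rather than relying on the default.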