We use following method to deserialize the message consumed using Simple Consumer -
DatumReader<T> datumReader = new SpecificDatumReader<>(className); ByteArrayInputStream inputStream = new ByteArrayInputStream(byteArray); Decoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null); T object = datumReader.read(null, decoder); IOUtils.closeQuietly(inputStream); It does not seem to handle header bytes. When I remove those 26 bytes, deserialization work fine. Please note, we are using Simple consumer API, not high level consumer. On Mon, Jan 30, 2017 at 10:57 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote: > What are the 26 additional bytes? That sounds like a header that a > decoder/deserializer is handling with the high level consumer. What class > are you using to deserialize the messages with the high level consumer? > > -Ewen > > On Fri, Jan 27, 2017 at 10:19 AM, Anjani Gupta < > anjani.gu...@salesforce.com> > wrote: > > > I am using kafka_2.10-0.8.1 and trying to fetch messages using Simple > > Consumer API. I notice that byte array for message retrieved has 26 junk > > bytes appended at the beginning of original message sent by producer. > Any > > idea what's going on here? This works fine with High level consumer. > > > > This is how my code looks like - > > > > TopicAndPartition topicAndPartition = new TopicAndPartition(topic, > > partition); > > OffsetFetchResponse offsetFetchResponse = > consumer.fetchOffsets(new > > OffsetFetchRequest(GROUP_ID, > > Collections.singletonList(topicAndPartition), (short) 0, > > 0, > > CLIENT_ID)); > > > > //Fetch messages from Kafka. > > FetchRequest req = new FetchRequestBuilder() > > .clientId(CLIENT_ID) > > .addFetch(topic, partition, readOffset, 1000) > > .build(); > > FetchResponse fetchResponse = consumer.fetch(req); > > for (MessageAndOffset messageAndOffset : > > fetchResponse.messageSet(topicName, partition)) { > > byte[] message = messageAndOffset.message(). > payload().array(); > > > > } > > > > Here message has additional 26 bytes appended to beginning of array. > > > > > > Thanks, > > Anjani > > >