Hi,

We've recently started encountering the following exception, which appears to happen a lot on the consumer side - we're using the old consumer (ZK based) and not the new one (Java based), unfortunately.
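(For context, migrating would mean moving to the new consumer API, roughly as in the sketch below - the broker address, topic and group id are placeholders, not our real config. As far as I understand, the new consumer handles v2 batches natively, so it wouldn't hit the conversion path described later at all.)

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder values for illustration only
        props.put("bootstrap.servers", "broker1:9092");
        props.put("group.id", "acme-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("acmetopic"));
            while (true) {
                // poll(long) is the 1.1.0 signature; poll(Duration) arrived later
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}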
*The exception*

kafka.common.KafkaException: Error processing data for partition acmetopic-7 offset 2204558563
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:205)
    at scala.Option.foreach(Option.scala:257)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:169)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:166)
    at scala.collection.Iterator.foreach(Iterator.scala:929)
    at scala.collection.Iterator.foreach$(Iterator.scala:929)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
    at scala.collection.IterableLike.foreach(IterableLike.scala:71)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$5(AbstractFetcherThread.scala:166)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
    at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:166)
    at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
Caused by: java.lang.IllegalArgumentException: Illegal batch type class org.apache.kafka.common.record.DefaultRecordBatch. The older message format classes only support conversion from class org.apache.kafka.common.record.AbstractLegacyRecordBatch, which is used for magic v0 and v1
    at kafka.message.MessageAndOffset$.fromRecordBatch(MessageAndOffset.scala:30)
    at kafka.message.ByteBufferMessageSet.$anonfun$internalIterator$1(ByteBufferMessageSet.scala:169)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:448)
    at scala.collection.Iterator.toStream(Iterator.scala:1403)
    at scala.collection.Iterator.toStream$(Iterator.scala:1402)
    at scala.collection.AbstractIterator.toStream(Iterator.scala:1417)
    at scala.collection.TraversableOnce.toSeq(TraversableOnce.scala:298)
    at scala.collection.TraversableOnce.toSeq$(TraversableOnce.scala:298)
    at scala.collection.AbstractIterator.toSeq(Iterator.scala:1417)
    at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:59)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:87)
    at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:37)
    at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:183)
    ... 15 common frames omitted

We're using Kafka v1.1.0 on both server and client. Unfortunately I'm not up to speed on the exact protocol details between client and server, but I presume the client tells the server that it's an old client, the server "remembers" that for the session created, and returns record batches using magic number v0 or v1.
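(To sanity-check which magic version actually comes back, here is a throwaway diagnostic one could run against a raw fetched buffer - this is my own sketch, not code from Kafka; the method name dumpBatchMagics and the fetchedData argument are made up:)

import java.nio.ByteBuffer;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.RecordBatch;

public final class BatchMagicDump {
    // Print the magic byte of every batch in a raw fetch buffer.
    // Anything above MAGIC_VALUE_V1 parses as a DefaultRecordBatch,
    // which MessageAndOffset.fromRecordBatch refuses to convert.
    static void dumpBatchMagics(ByteBuffer fetchedData) {
        MemoryRecords records = MemoryRecords.readableRecords(fetchedData.duplicate());
        for (MutableRecordBatch batch : records.batches()) {
            System.out.printf("baseOffset=%d magic=%d legacy=%s%n",
                    batch.baseOffset(), batch.magic(),
                    batch.magic() <= RecordBatch.MAGIC_VALUE_V1);
        }
    }
}

(In a healthy session, a 1.1.0 old consumer should only ever see legacy magic 0/1 batches after broker-side down-conversion; seeing magic=2 here would match what the stack trace implies.)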
The exception stack trace shows something odd. It seems that the magic number sent was v2, so the MemoryRecords class creates an iterator of DefaultRecordBatch instances, but a bit later it reaches a point where it tries to convert them to MessageAndOffset and fails, since for some odd reason it is only able to do so for AbstractLegacyRecordBatch. These are the parts I saw:

*PartitionTopicInfo.scala*

/**
 * Enqueue a message set for processing.
 */
def enqueue(messages: ByteBufferMessageSet) {
  val size = messages.validBytes
  if(size > 0) {
    val next = messages.shallowIterator.toSeq.last.nextOffset

*ByteBufferMessageSet*

/** iterator over compressed messages without decompressing */
def shallowIterator: Iterator[MessageAndOffset] = internalIterator(isShallow = true)

/** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/
private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
  if (isShallow)
    asRecords.batches.asScala.iterator.map(MessageAndOffset.fromRecordBatch)

override def asRecords: MemoryRecords = MemoryRecords.readableRecords(buffer.duplicate())

*MemoryRecords*

public static MemoryRecords readableRecords(ByteBuffer buffer) {
    return new MemoryRecords(buffer);
}

private final Iterable<MutableRecordBatch> batches = new Iterable<MutableRecordBatch>() {
    @Override
    public Iterator<MutableRecordBatch> iterator() {
        return new RecordBatchIterator<>(new ByteBufferLogInputStream(buffer.duplicate(), Integer.MAX_VALUE));
    }
};

*ByteBufferLogInputStream*

public MutableRecordBatch nextBatch() throws IOException {
    int remaining = buffer.remaining();
    if (remaining < LOG_OVERHEAD)
        return null;

    int recordSize = buffer.getInt(buffer.position() + SIZE_OFFSET);
    // V0 has the smallest overhead, stricter checking is done later
    if (recordSize < LegacyRecord.RECORD_OVERHEAD_V0)
        throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", LegacyRecord.RECORD_OVERHEAD_V0));
    if (recordSize > maxMessageSize)
        throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));

    int batchSize = recordSize + LOG_OVERHEAD;
    if (remaining < batchSize)
        return null;

    byte magic = buffer.get(buffer.position() + MAGIC_OFFSET);

    ByteBuffer batchSlice = buffer.slice();
    batchSlice.limit(batchSize);
    buffer.position(buffer.position() + batchSize);

    if (magic < 0 || magic > RecordBatch.CURRENT_MAGIC_VALUE)
        throw new CorruptRecordException("Invalid magic found in record: " + magic);

    if (magic > RecordBatch.MAGIC_VALUE_V1)
        return new DefaultRecordBatch(batchSlice);
    else
        return new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchSlice);
}

So the stream constructs DefaultRecordBatch instances, which later fail since the code tries to map them to MessageAndOffset but can't do that for DefaultRecordBatch - I can't figure out why. To me it seems like a bug. I've posted a JIRA ticket <https://issues.apache.org/jira/browse/KAFKA-7769>, but there have been no comments since Dec 26th, so I thought I'd ping here as well and get some pointers from the community.

Our current work-around is to restart either the server or the client, and that solves it.

Thanks!

Asaf Mesika