Filed a JIRA for this. https://issues.apache.org/jira/browse/KAFKA-5010
(Also forwarded to the dev list.)

On Tue, Apr 4, 2017 at 7:16 PM, Shuai Lin <[email protected]> wrote:
> After digging a bit into the source code of the LogCleaner class, I realized
> that maybe I should not try to fix the crash by increasing
> log.cleaner.io.buffer.size: the cleaner automatically grows the buffer
> so that it can hold at least one record.
>
> Also, the last error I pasted above, i.e. "largest offset in message
> set can not be safely converted to relative offset", is probably due to
> the fact that I set too large an I/O buffer size (256MB). A buffer that large
> can hold so many records that the value of (largest offset - base offset of
> the new cleaned segment) is larger than Integer.MAX_VALUE, so it failed the
> assertion
> <https://github.com/apache/kafka/blob/0.10.2.0/core/src/main/scala/kafka/log/LogSegment.scala#L86>
> in LogSegment.append.
>
> From the source code, I see the workflow of cleaning a segment is like
> this:
>
> - read as many records as possible into the readBuffer
> - construct a MemoryRecords object and filter the records using the
>   OffsetMap
> - write the filtered records into the writeBuffer
> - repeat until the whole segment is processed
>
> So the question is clear: given that the readBuffer and writeBuffer are
> exactly the same size
> <https://github.com/apache/kafka/blob/0.10.2.0/core/src/main/scala/kafka/log/LogCleaner.scala#L320-L324>
> (half of log.cleaner.io.buffer.size), why would the cleaner throw a
> BufferOverflowException when writing the filtered records into the
> writeBuffer? That should never happen, because the size of the filtered
> records should be no greater than the size of the readBuffer, and thus no
> greater than the size of the writeBuffer.
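The cleaning loop listed in the message above can be modelled in a few lines. This is a toy sketch under stated assumptions, not Kafka's implementation: `Record`, `clean_segment`, and the 8-byte per-record overhead are hypothetical; giving each buffer half of log.cleaner.io.buffer.size per cleaner thread follows the reading of LogCleaner.scala quoted above; and capping buffer growth at message.max.bytes (10MB in this thread) is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Record:
    offset: int
    key: bytes
    payload: bytes

    @property
    def size(self) -> int:
        # Hypothetical serialized size: 8-byte offset + key + payload.
        return 8 + len(self.key) + len(self.payload)


def clean_segment(records, offset_map, io_buffer_size, num_threads=1,
                  max_buffer_size=10 * 1024 * 1024):
    """Toy model of the read -> filter -> write loop; not Kafka's actual code.

    Returns the retained records and the final per-buffer size.
    """
    # Assumption: read and write buffers each get half of the per-thread I/O buffer.
    buffer_size = max(1, io_buffer_size // num_threads // 2)
    retained = []
    pos = 0
    while pos < len(records):
        # 1. Read as many whole records as fit into the read buffer.
        batch, used = [], 0
        while pos + len(batch) < len(records):
            rec = records[pos + len(batch)]
            if used + rec.size > buffer_size:
                break
            batch.append(rec)
            used += rec.size
        if not batch:
            # Not even one record fits: double both buffers, capped at max_buffer_size.
            if buffer_size >= max_buffer_size:
                raise ValueError("record is larger than the maximum buffer size")
            buffer_size = min(buffer_size * 2, max_buffer_size)
            continue
        # 2. Filter: keep a record unless the offset map holds a newer offset for its key.
        kept = [r for r in batch if offset_map.get(r.key, -1) <= r.offset]
        # 3. "Write" the kept records byte-for-byte into a write buffer of equal size.
        written = sum(r.size for r in kept)
        assert written <= used <= buffer_size, "write buffer would overflow"
        retained.extend(kept)
        pos += len(batch)
    return retained, buffer_size
```

Because the model copies retained records byte-for-byte, `written <= used` holds by construction, which is exactly the invariant the question relies on. The model says nothing about whether the real `MemoryRecords.filterTo` always preserves it.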
>
>> [2017-03-24 10:41:07,372] ERROR [kafka-log-cleaner-thread-0], Error due to
>> (kafka.log.LogCleaner)
>> java.nio.BufferOverflowException
>>     at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
>>     at org.apache.kafka.common.record.LogEntry.writeTo(LogEntry.java:98)
>>     at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:158)
>>     at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:111)
>>     at kafka.log.Cleaner.cleanInto(LogCleaner.scala:468)
>
>
> It is also worth mentioning that we are still using
> log.message.format.version = 0.9.0.0 because there are still some old
> consumers. Could this be related to the problem?
>
>
> On Sun, Apr 2, 2017 at 11:46 PM, Shuai Lin <[email protected]> wrote:
>
>> Hi,
>>
>> Recently we upgraded from Kafka 0.10.0.1 to 0.10.2.1, and found that the log
>> cleaner thread crashed with this error:
>>
>>> [2017-03-24 10:41:03,926] INFO [kafka-log-cleaner-thread-0], Starting (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:04,177] INFO Cleaner 0: Beginning cleaning of log app-topic-20170317-20. (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:04,177] INFO Cleaner 0: Building offset map for app-topic-20170317-20... (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:04,387] INFO Cleaner 0: Building offset map for log app-topic-20170317-20 for 1 segments in offset range [9737795, 9887707). (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:07,101] INFO Cleaner 0: Offset map for log app-topic-20170317-20 complete. (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:07,106] INFO Cleaner 0: Cleaning log app-topic-20170317-20 (cleaning prior to Fri Mar 24 10:36:06 GMT 2017, discarding tombstones prior to Thu Mar 23 10:18:02 GMT 2017)... (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:07,110] INFO Cleaner 0: Cleaning segment 0 in log app-topic-20170317-20 (largest timestamp Fri Mar 24 09:58:25 GMT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
>>> [2017-03-24 10:41:07,372] ERROR [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner)
>>> java.nio.BufferOverflowException
>>>     at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:206)
>>>     at org.apache.kafka.common.record.LogEntry.writeTo(LogEntry.java:98)
>>>     at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:158)
>>>     at org.apache.kafka.common.record.MemoryRecords.filterTo(MemoryRecords.java:111)
>>>     at kafka.log.Cleaner.cleanInto(LogCleaner.scala:468)
>>>     at kafka.log.Cleaner.$anonfun$cleanSegments$1(LogCleaner.scala:405)
>>>     at kafka.log.Cleaner.$anonfun$cleanSegments$1$adapted(LogCleaner.scala:401)
>>>     at scala.collection.immutable.List.foreach(List.scala:378)
>>>     at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
>>>     at kafka.log.Cleaner.$anonfun$clean$6(LogCleaner.scala:363)
>>>     at kafka.log.Cleaner.$anonfun$clean$6$adapted(LogCleaner.scala:362)
>>>     at scala.collection.immutable.List.foreach(List.scala:378)
>>>     at kafka.log.Cleaner.clean(LogCleaner.scala:362)
>>>     at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
>>>     at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
>>>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>>> [2017-03-24 10:41:07,375] INFO [kafka-log-cleaner-thread-0], Stopped (kafka.log.LogCleaner)
>>
>>
>> For completeness, here are some other related settings (they are the same
>> before and after the upgrade to 0.10.2):
>>
>> - log.cleaner.enable = 'true'
>> - log.cleaner.min.cleanable.ratio = '0.1'
>> - log.cleaner.threads = '1'
>> - log.cleaner.io.buffer.load.factor = '0.98'
>> - log.roll.hours = '24'
>> - log.cleaner.dedupe.buffer.size = 2GB
>> - log.segment.bytes = 512MB
>> - message.max.bytes = 10MB
>>
>> Before the upgrade, we used the default value of
>> *log.cleaner.io.buffer.size*, i.e. 512K.
>> Since the above error said
>> "buffer overflow", I increased it to 2M, but the log cleaner thread crashed
>> immediately (with the same error) when the broker restarted.
>>
>> Then I tried 10M, then 128M, but all with no luck. Finally, when I used
>> 256MB, the above error no longer occurred.
>>
>> However, after a few days, the log cleaner thread crashed again, with
>> another error:
>>
>>> [2017-03-27 12:33:51,323] INFO Cleaner 0: Cleaning segment 8590943933 in log __consumer_offsets-49 (largest timestamp Mon Mar 27 12:27:12 GMT 2017) into 6443803053, retaining deletes. (kafka.log.LogCleaner)
>>> [2017-03-27 12:33:51,377] ERROR [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner)
>>> java.lang.IllegalArgumentException: requirement failed: largest offset in message set can not be safely converted to relative offset.
>>>     at scala.Predef$.require(Predef.scala:277)
>>>     at kafka.log.LogSegment.append(LogSegment.scala:109)
>>>     at kafka.log.Cleaner.cleanInto(LogCleaner.scala:482)
>>>     at kafka.log.Cleaner.$anonfun$cleanSegments$1(LogCleaner.scala:405)
>>>     at kafka.log.Cleaner.$anonfun$cleanSegments$1$adapted(LogCleaner.scala:401)
>>>     at scala.collection.immutable.List.foreach(List.scala:378)
>>>     at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
>>>     at kafka.log.Cleaner.$anonfun$clean$6(LogCleaner.scala:363)
>>>     at kafka.log.Cleaner.$anonfun$clean$6$adapted(LogCleaner.scala:362)
>>>     at scala.collection.immutable.List.foreach(List.scala:378)
>>>     at kafka.log.Cleaner.clean(LogCleaner.scala:362)
>>>     at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
>>>     at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
>>>     at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>>> [2017-03-27 12:33:51,377] INFO [kafka-log-cleaner-thread-0], Stopped (kafka.log.LogCleaner)
>>
>>
>> Can someone explain what might have caused the above errors, or suggest a
>> possible fix? Thanks!
>>
>> Regards,
>> Shuai
>>
>
>
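The offsets in the second log can be used to check the arithmetic behind the "can not be safely converted to relative offset" error. This is a minimal sketch assuming the check is that `offset - baseOffset` must lie in [0, Integer.MAX_VALUE], i.e. relative offsets are stored as signed 32-bit ints; `can_convert_to_relative_offset` and `largest_convertible_offset` are hypothetical helpers, not Kafka's API. Segment 8590943933 is being cleaned into a segment whose base offset is 6443803053.

```python
# Java's Integer.MAX_VALUE: relative offsets are assumed to be signed 32-bit ints.
INT_MAX = 2**31 - 1


def can_convert_to_relative_offset(base_offset: int, offset: int) -> bool:
    """Hypothetical helper: does `offset` fit relative to the segment's base offset?"""
    relative = offset - base_offset
    return 0 <= relative <= INT_MAX


def largest_convertible_offset(base_offset: int) -> int:
    """Largest absolute offset that still passes the check for this base offset."""
    return base_offset + INT_MAX


base = 6443803053    # "into 6443803053": base offset of the cleaned segment
source = 8590943933  # "Cleaning segment 8590943933": first offset being cleaned
print(source - base)                              # 2147140880
print(largest_convertible_offset(base) - source)  # 342767
```

Under this assumption the first offset of the source segment is already 2,147,140,880 past the base, so only offsets up to 8591286700 (342,767 past the start of the segment) can be appended to that cleaned segment. Whether the records that triggered the error actually lie beyond that point is not visible in the log.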
