[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185889#comment-15185889 ]

Michael Schiff commented on KAFKA-3323:
---

[~gwenshap] [~ijuma] https://github.com/tubemogul/kafka-logsplitter
Thank you guys!

> Negative offsets in Log Segment Index files due to Integer overflow when 
> compaction is enabled 
> ---
>
> Key: KAFKA-3323
> URL: https://issues.apache.org/jira/browse/KAFKA-3323
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.8.1.1, 0.8.2.1
>Reporter: Michael Schiff
>Assignee: Jay Kreps
> Attachments: index_dump.txt, log_dump.txt
>
>
> Once the Offset Index contains negative offset values, the binary search for
> position lookup is broken. This causes consumers of compacted topics to skip
> large offset intervals when bootstrapping, which has serious implications for
> those consumers.
> {code}
>   /**
>    * Append an entry for the given offset/location pair to the index. This entry must have a larger offset than all subsequent entries.
>    */
>   def append(offset: Long, position: Int) {
>     inLock(lock) {
>       require(!isFull, "Attempt to append to a full index (size = " + size + ").")
>       if (size.get == 0 || offset > lastOffset) {
>         debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
>         this.mmap.putInt((offset - baseOffset).toInt)
>         this.mmap.putInt(position)
>         this.size.incrementAndGet()
>         this.lastOffset = offset
>         require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
>       } else {
>         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
>           .format(offset, entries, lastOffset, file.getAbsolutePath))
>       }
>     }
>   }
> {code}
> OffsetIndex.append assumes that (offset - baseOffset) can be represented as 
> an integer without overflow. If the LogSegment is from a compacted topic, 
> this assumption may not be valid. The result is a quiet integer overflow, 
> which stores a negative value into the index.
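> A minimal sketch of the wrap-around, assuming a base offset of 0:
> {code}
> object OverflowDemo extends App {
>   val baseOffset: Long = 0L
>   val offset: Long = Int.MaxValue.toLong + 1 // one past the 31-bit range
>   // The narrowing .toInt conversion silently overflows to a negative Int,
>   // and that negative value is what gets written into the index.
>   val relative: Int = (offset - baseOffset).toInt
>   println(relative) // -2147483648
> }
> {code}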
> I believe that the issue is caused by the LogCleaner. Specifically, by the 
> groupings produced by 
> {code}
>   /**
>    * Group the segments in a log into groups totaling less than a given size. the size is enforced separately for the log data and the index data.
>    * We collect a group of such segments together into a single destination segment. This prevents segment sizes from shrinking too much.
>    *
>    * @param segments The log segments to group
>    * @param maxSize the maximum size in bytes for the total of all log data in a group
>    * @param maxIndexSize the maximum size in bytes for the total of all index data in a group
>    *
>    * @return A list of grouped segments
>    */
>   private[log] def groupSegmentsBySize(segments: Iterable[LogSegment], maxSize: Int, maxIndexSize: Int): List[Seq[LogSegment]]
> {code}
> Since this method is only concerned with grouping by size, without taking
> baseOffset and groupMaxOffset into account, it will produce groups that, when
> cleaned into a single segment, have offsets that overflow. This is more likely
> for topics with low key cardinality but high update volume, as you can wind up
> with very few cleaned records but very high offsets.
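> A hedged sketch of the kind of guard such grouping would need (hypothetical
> SegmentInfo type, not Kafka's actual LogSegment API): only admit a segment to
> a group while the span between its last offset and the group's base offset
> still fits in the index's 32-bit relative-offset field.
> {code}
> case class SegmentInfo(baseOffset: Long, lastOffset: Long, sizeInBytes: Int)
>
> // A merged segment stays indexable only while every contained offset is
> // within Int.MaxValue of the group's base offset.
> def canAppendToGroup(group: Seq[SegmentInfo], next: SegmentInfo): Boolean = {
>   val base = group.headOption.map(_.baseOffset).getOrElse(next.baseOffset)
>   next.lastOffset - base <= Int.MaxValue
> }
> {code}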





[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Ismael Juma (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185767#comment-15185767 ]

Ismael Juma commented on KAFKA-3323:


Yes, thank you Michael. :)


[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Gwen Shapira (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185766#comment-15185766 ]

Gwen Shapira commented on KAFKA-3323:
-

You rock! Thank you.


[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Gwen Shapira (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185624#comment-15185624 ]

Gwen Shapira commented on KAFKA-3323:
-

[~michael.schiff] Yeah, I was thinking that you could host this as a standalone
tool on GitHub and link to it here (and in KAFKA-2024).
That way, people on older versions will be able to find and use your solution,
but we won't package something that is no longer useful with new releases.

Does that make sense?


[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185540#comment-15185540 ]

Michael Schiff commented on KAFKA-3323:
---

[~ijuma] I went to submit the patch, but this didn't seem like the correct 
action, as this utility is currently just a standalone program, not a part of 
the Kafka distribution. Should I convert this utility into something that can 
be run like the other utilities (e.g. dump log segments)?  Once everyone has 
upgraded, this tool will not be useful anymore, so I don't know that it belongs 
in the distribution.


[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Michael Schiff (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15185488#comment-15185488 ]

Michael Schiff commented on KAFKA-3323:
---

[~ijuma] Yes, it is a duplicate. The accepted solution was the first one I
mentioned in my comment: group segments to clean by segment size in bytes while
also limiting the difference between the segment base offset and the max offset
that goes into it.

This, however, does not address the already-broken log segments that I (and
others) may have in production. For those I have written a utility to split log
segments into legal chunks. If this seems useful, I can submit it as a patch.
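
For reference, a hedged sketch of the splitting idea (hypothetical code, not
the actual utility): walk a segment's (offset, bytes) records in order and
start a new chunk whenever the relative offset would no longer fit in an Int.
{code}
def splitByOffsetRange(records: Seq[(Long, Array[Byte])]): Seq[Seq[(Long, Array[Byte])]] = {
  val chunks = scala.collection.mutable.ListBuffer.empty[Seq[(Long, Array[Byte])]]
  var current = scala.collection.mutable.ListBuffer.empty[(Long, Array[Byte])]
  var base = -1L // base offset of the chunk being built
  for ((offset, bytes) <- records) {
    if (base < 0) base = offset
    if (offset - base > Int.MaxValue) { // relative offset would overflow the index
      chunks += current.toList
      current = scala.collection.mutable.ListBuffer.empty
      base = offset
    }
    current += ((offset, bytes))
  }
  if (current.nonEmpty) chunks += current.toList
  chunks.toList
}
{code}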


[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-08 Thread Ismael Juma (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15184801#comment-15184801 ]

Ismael Juma commented on KAFKA-3323:


[~michael.schiff], do you agree that this is a duplicate of KAFKA-2024?


[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-03 Thread Konstantin Zadorozhny (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15178479#comment-15178479 ]

Konstantin Zadorozhny commented on KAFKA-3323:
--

Is this issue a duplicate of KAFKA-2024?


[jira] [Commented] (KAFKA-3323) Negative offsets in Log Segment Index files due to Integer overflow when compaction is enabled

2016-03-03 Thread Michael Schiff (JIRA)

[ https://issues.apache.org/jira/browse/KAFKA-3323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15178187#comment-15178187 ]

Michael Schiff commented on KAFKA-3323:
---

Regarding solutions:

One alternative is to group segments to clean not according to segment size in
bytes, but by limiting the difference between the segment base offset and the
max offset that goes into it.

I see two issues with this. The first is that we could wind up creating
segments that are actually pretty small, size-wise. The second is that, for
people like me who already have production topics with log segments where
(offset - baseOffset) > Int.MAX_VALUE, changing the grouping does not fix the
issue: such log segments cannot be properly indexed as they are.

The second solution is to make indexes use long-valued keys. This would mean
changing the index format, forcing you to rebuild all of your existing indices.
However, you could then continue grouping segments to clean the same way,
ensuring good-sized segments. It also means that people with segments that are
already incorrect are not forced to drop their data.
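
As a rough illustration of the second alternative (a hypothetical entry layout,
not Kafka's actual index format): widening each entry from
(Int relativeOffset, Int position) = 8 bytes to (Long offset, Int position) =
12 bytes removes the 32-bit limit, at the cost of the on-disk format change and
index rebuild mentioned above.
{code}
import java.nio.ByteBuffer

// Hypothetical long-keyed entry: 12 bytes instead of 8, but the offset
// no longer has to fit in 32 bits relative to the segment's base offset.
def writeLongKeyedEntry(buf: ByteBuffer, offset: Long, position: Int): Unit = {
  buf.putLong(offset)  // full 64-bit offset instead of a 32-bit delta
  buf.putInt(position) // physical file position, unchanged
}
{code}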


--
This message was sent by Atlassian JIRA
(v6.3.4#6332)