Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Honghai,

You are going to do a vote on this, right? I think the patch is ready to go, so we are just waiting on the KIP adoption.

-Jay

On Thu, May 21, 2015 at 8:50 AM, Jun Rao j...@confluent.io wrote: [...]
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Honghai,

Could you update the wiki on the preallocated size? Instead of config.segmentSize - 2 * config.maxMessageSize, we just want to use config.segmentSize.

Thanks,

Jun

On Mon, May 11, 2015 at 8:16 PM, Honghai Chen honghai.c...@microsoft.com wrote: [...]
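In code terms, the requested change affects only the preallocation length passed when a new segment is created. A rough sketch, with the constructor shape inferred from the test case quoted later in this thread (parameter names are assumptions, not the exact patch):

    // Hedged sketch: when rolling a new segment, preallocate it to the full
    // configured segment size (names inferred from the test code below;
    // the real LogSegment constructor may differ).
    val segment = new LogSegment(
      logDir,               // segment directory
      nextOffset,           // base offset of the new segment
      config.indexInterval, // indexIntervalBytes
      config.maxIndexSize,  // maxIndexSize
      0L,                   // rollJitterMs
      SystemTime,
      false,                // fileAlreadyExists: this is a brand-new segment
      config.segmentSize,   // initFileSize: full size, no 2 * maxMessageSize headroom
      true)                 // preallocate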
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
I actually feel many [VOTE] threads eventually become [DISCUSS] as people just put tons of comments there :)

On 5/20/15, 11:52 AM, Jay Kreps jay.kr...@gmail.com wrote: [...]
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
For simple discussions, I completely agree. For those threads where there are a few votes, then more discussion, and then the KIP changes a few times... a separate thread will help keep things clear, both for voters and for anyone who tries to figure out what happened in the future.

On Wed, May 20, 2015 at 9:02 PM, Jay Kreps jay.kr...@gmail.com wrote: [...]
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
I think in general it is fine (even good) if a VOTE thread has a lot of discussion. The only issue I can think of is the one Gwen referred to: early votes -> the KIP (or whatever is being voted on) is updated after more discussion -> later votes; it then becomes unclear what exactly each vote corresponds to. So if there is a non-trivial change to the material under vote due to discussion, the vote should be canceled and a fresh vote thread started.

Thanks,

Joel

On Wed, May 20, 2015 at 07:14:02PM +0000, Jiangjie Qin wrote: [...]
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Makes sense. Honghai, want to do a [VOTE] thread just so everything is official?

-Jay

On Wed, May 20, 2015 at 11:22 AM, Gwen Shapira gshap...@cloudera.com wrote: [...]
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Hey all,

How do people feel about these [DISCUSS] threads that basically accidentally turn into votes? Basically everyone was +1 on this KIP already; should we just skip the second vote? I find it kind of annoying to do both when this happens.

-Jay

On Mon, May 11, 2015 at 8:16 PM, Honghai Chen honghai.c...@microsoft.com wrote: [...]
RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
All issues fixed, test cases added, performance results on Windows attached. The patch can improve consume performance by around 25%~50%.

Thanks,

Honghai Chen

-----Original Message-----
From: Jun Rao [mailto:j...@confluent.io]
Sent: Wednesday, May 6, 2015 5:39 AM
[...]
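For anyone trying this out: per the KIP, preallocation is controlled by a single broker-level flag, off by default. Shown below under its KIP-documented name (double-check the exact property against the broker version you run):

    # server.properties (broker configuration)
    # Preallocate new segment files to the full segment size; mainly beneficial
    # on Windows and some older Linux file systems. Off by default.
    log.preallocate=true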
RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Using config.segmentSize should be OK. The subtraction was previously added to make sure the file does not exceed config.segmentSize, but the function maybeRoll already guarantees that. While trying to add a test case for recover, I was blocked by the rename-related issue and opened a JIRA at https://issues.apache.org/jira/browse/KAFKA-2170; any recommendation for fixing that issue?

Thanks,

Honghai Chen

-----Original Message-----
From: Jun Rao [mailto:j...@confluent.io]
Sent: Tuesday, May 5, 2015 12:51 PM

This seems similar to what's in https://issues.apache.org/jira/browse/KAFKA-1065. Also, could you explain why the preallocated size is set to config.segmentSize - 2 * config.maxMessageSize, instead of just config.segmentSize?

Thanks,

Jun

[...]
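For reference, the roll check being relied on here can be sketched as follows (a simplified sketch; the real maybeRoll in Kafka takes more inputs): a new segment is rolled before an append would push the current one past the configured segment size, which is why the extra 2 * maxMessageSize headroom in the preallocation size was unnecessary.

    // Simplified sketch of the size-based part of the maybeRoll decision:
    // roll to a new segment when the pending append would overflow segmentSize.
    def shouldRoll(segmentSizeBytes: Int, messagesSizeBytes: Int, configSegmentSize: Int): Boolean =
      segmentSizeBytes + messagesSizeBytes > configSegmentSize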
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Thanks. Could you update the wiki? I also commented on the JIRA.

Jun

On Tue, May 5, 2015 at 12:48 AM, Honghai Chen honghai.c...@microsoft.com wrote: [...]
RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Hi guys,

I'm trying to add test cases, but the case below crashes at the line segReopen.recover(64*1024) -> index.trimToValidSize(). Any ideas? I'd appreciate your help. The case assumes Kafka crashed suddenly and needs to recover the last segment.

kafka.log.LogSegmentTest > testCreateWithInitFileSizeCrash FAILED
    java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
        at java.io.RandomAccessFile.setLength(Native Method)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
        at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
        at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
        at kafka.log.LogSegment.recover(LogSegment.scala:199)
        at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)

  def recover(maxMessageSize: Int): Int = {
    index.truncate()
    index.resize(index.maxIndexSize)
    var validBytes = 0
    var lastIndexEntry = 0
    val iter = log.iterator(maxMessageSize)
    try {
      while(iter.hasNext) {
        val entry = iter.next
        entry.message.ensureValid()
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          // we need to decompress the message, if required, to get the offset of the first uncompressed message
          val startOffset =
            entry.message.compressionCodec match {
              case NoCompressionCodec =>
                entry.offset
              case _ =>
                ByteBufferMessageSet.deepIterator(entry.message).next().offset
            }
          index.append(startOffset, validBytes)
          lastIndexEntry = validBytes
        }
        validBytes += MessageSet.entrySize(entry.message)
      }
    } catch {
      case e: InvalidMessageException =>
        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s."
          .format(log.file.getAbsolutePath, validBytes, e.getMessage))
    }
    val truncated = log.sizeInBytes - validBytes
    log.truncateTo(validBytes)
    index.trimToValidSize()
    truncated
  }

  /* create a segment with preallocation and crash */
  @Test
  def testCreateWithInitFileSizeCrash() {
    val tempDir = TestUtils.tempDir()
    val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, false, 512*1024*1024, true)
    val ms = messages(50, "hello", "there")
    seg.append(50, ms)
    val ms2 = messages(60, "alpha", "beta")
    seg.append(60, ms2)
    val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
    assertEquals(ms2.toList, read.messageSet.toList)
    val oldSize = seg.log.sizeInBytes()
    val oldPosition = seg.log.channel.position
    val oldFileSize = seg.log.file.length
    assertEquals(512*1024*1024, oldFileSize)
    seg.flush()
    seg.log.channel.close()
    seg.index.close()
    val segReopen = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, true)
    segReopen.recover(64*1024)
    val size = segReopen.log.sizeInBytes()
    val position = segReopen.log.channel.position
    val fileSize = segReopen.log.file.length
    assertEquals(oldPosition, position)
    assertEquals(oldSize, size)
    assertEquals(size, fileSize)
  }

Thanks,

Honghai Chen
http://aka.ms/kafka
http://aka.ms/manifold

-----Original Message-----
From: Sriram Subramanian [mailto:srsubraman...@linkedin.com.INVALID]
Sent: Friday, April 24, 2015 12:57 AM
[...]
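The "user-mapped section open" failure is Windows-specific: RandomAccessFile.setLength cannot resize a file while a MappedByteBuffer still maps it, and the JDK exposes no public unmap API. A commonly used workaround, shown here as a hedged sketch relying on JVM-internal classes (pre-Java 9; not necessarily what the final patch does), is to force-release the index's mapping before the resize:

    import java.nio.MappedByteBuffer

    // Hypothetical sketch: force-release a mapped buffer so the underlying file
    // can be resized on Windows. Relies on JVM internals (sun.nio.ch.DirectBuffer
    // and sun.misc.Cleaner), so it is not portable to Java 9+ or non-HotSpot JVMs.
    def forceUnmap(mmap: MappedByteBuffer): Unit = {
      mmap match {
        case buffer: sun.nio.ch.DirectBuffer =>
          val cleaner = buffer.cleaner()
          if (cleaner != null) cleaner.clean() // releases the mapping immediately
        case _ => // not a direct buffer; nothing to release
      }
    }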
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Yeah, if we understand the optimal policy for a config, we always want to set it automatically. In this case I don't think we do yet, but down the road that could be nice. I think for now we should consider this option experimental, to give people a chance to try it out.

-Jay

On Wed, Apr 22, 2015 at 7:32 PM, Honghai Chen honghai.c...@microsoft.com wrote: [...]

-----Original Message-----
From: Jay Kreps [mailto:jay.kr...@gmail.com]
Sent: Wednesday, April 22, 2015 5:22 AM

My understanding of the patch is that clean shutdown truncates the file back to its true size (and reallocates it on startup). A hard crash is handled by normal recovery, which should truncate off the empty portion of the file.

On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian srsubraman...@linkedin.com.invalid wrote:

Could you describe how recovery works in this mode? Say we had a 250 MB preallocated segment and wrote up to 50 MB before crashing. Up to what point do we recover? Also, on startup, how is the append end pointer set, even on a clean shutdown? How does the FileChannel end position get set to 50 MB instead of 250 MB? The existing code might just work for it, but explaining that would be useful.

On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote: +1
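To make the clean-shutdown half of that concrete: the thread later notes that FileMessageSet's close() calls trim(), which can be sketched roughly as below (assumed method shape, not the exact patch):

    // Sketch of the close-time behavior described above: on clean shutdown the
    // preallocated tail is cut off, so the on-disk file length equals the bytes
    // actually written; after a hard crash, recover() instead truncates to the
    // last valid byte on restart.
    def close(): Unit = {
      flush()        // force any buffered data to disk
      trim()         // truncate the file down to sizeInBytes() (valid data only)
      channel.close()
    }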
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
+1 Some information on how this will be tested would be useful.

On 4/23/15 9:33 AM, Jay Kreps jay.kr...@gmail.com wrote: [...]
RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Hi Roshan, Using an 'auto' value may break the configuration convention and mess up the configuration. @Jay, any thoughts? Thanks, Honghai Chen
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
+1 (non-binding). -- Harsha On April 22, 2015 at 2:52:12 PM, Roshan Naik (ros...@hortonworks.com) wrote: I see that it is safe to keep this off by default due to some concerns. Eventually, for settings such as this whose 'preferred' value is platform specific (or based on other criteria), it might be worth considering a default value that is not a constant but an 'auto' value: when Kafka boots up it can automatically use the preferred value. Of course it would have to be documented as to what 'auto' means for a given platform. -roshan
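A minimal Scala sketch of Roshan's 'auto' idea: resolve a platform-specific default at broker startup instead of hard-coding a constant. The names here are hypothetical illustrations, not part of the actual patch or config parser.

object PreallocateDefault {
  // Hypothetical resolution of an 'auto' config value: preallocate on
  // Windows, leave it off elsewhere; any other value is parsed as a boolean.
  def resolve(configured: String): Boolean = configured.trim.toLowerCase match {
    case "auto" => System.getProperty("os.name").toLowerCase.contains("windows")
    case other  => other.toBoolean
  }
}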
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
+1. This is an important performance fix for Windows-based clusters. -Jakob
RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Fix the issue Sriram mentioned. Code review and jira/KIP updated. Below is a detailed description of the scenarios:

1. On a clean shutdown, the last log file is truncated to its real size, since the close() function of FileMessageSet calls trim().
2. On a crash, the restart goes through recover() and the last log file is truncated to its real size (and the position is moved to the end of the file).
3. When the service starts and opens an existing file:
   a. It runs the LogSegment constructor that has NO preallocate parameter.
   b. Then the end in FileMessageSet is Int.MaxValue, and channel.position(math.min(channel.size().toInt, end)) puts the position at the end of the file.
   c. If recovery is needed, the recover function truncates the file to the end of valid data and also moves the position there.
4. When the service is running and needs to create a new log segment and a new FileMessageSet:
   a. If preallocate = true: the end in FileMessageSet is 0, the file size is initFileSize, and channel.position(math.min(channel.size().toInt, end)) puts the position at 0.
   b. Else if preallocate = false (backward compatible): the end in FileMessageSet is Int.MaxValue, the file size is 0, and channel.position(math.min(channel.size().toInt, end)) puts the position at 0.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system https://issues.apache.org/jira/browse/KAFKA-1646 https://reviews.apache.org/r/33204/diff/2/ Thanks, Honghai Chen http://aka.ms/kafka http://aka.ms/manifold
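A minimal Scala sketch of the positioning logic in scenarios 3 and 4 above. The object and method names are simplified assumptions based on this description, not the actual patch; only the channel.position(math.min(channel.size().toInt, end)) line is taken from the thread.

import java.io.{File, RandomAccessFile}
import java.nio.channels.FileChannel

object SegmentChannelSketch {
  // Scenario 3: existing file. end = Int.MaxValue, so min(size, end) = size
  // and the position lands at the real end of the file.
  def openExisting(file: File): FileChannel =
    open(file, end = Int.MaxValue)

  // Scenario 4: new file. A preallocated file is grown to initFileSize up
  // front, but end = 0, so the write position starts at 0. Without
  // preallocation the file size is 0, so min(0, Int.MaxValue) is also 0.
  def createNew(file: File, preallocate: Boolean, initFileSize: Int): FileChannel = {
    if (preallocate) {
      val raf = new RandomAccessFile(file, "rw")
      raf.setLength(initFileSize.toLong) // preallocate the file on disk
      raf.close()
    }
    open(file, end = if (preallocate) 0 else Int.MaxValue)
  }

  private def open(file: File, end: Int): FileChannel = {
    val channel = new RandomAccessFile(file, "rw").getChannel
    // The line discussed throughout this thread:
    channel.position(math.min(channel.size().toInt, end))
    channel
  }
}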
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Could you describe how recovery works in this mode? Say we had a 250 MB preallocated segment, we wrote up to 50 MB, and crashed. Up to what point do we recover? Also, on startup, how is the append end pointer set even on a clean shutdown? How does the FileChannel end position get set to 50 MB instead of 250 MB? The existing code might just work for it, but explaining that would be useful.
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
My understanding of the patch is that clean shutdown truncates the file back to its true size (and reallocates it on startup). Hard crash is handled by the normal recovery, which should truncate off the empty portion of the file.
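A rough Scala sketch of the two paths Jay describes; the names are hypothetical and this is an illustration of the idea, not the patch itself.

import java.nio.channels.FileChannel

object RecoverySketch {
  // Clean shutdown: the append position marks the end of valid data, so
  // truncating the channel to it drops the preallocated tail.
  def trimOnClose(channel: FileChannel): Unit = {
    channel.truncate(channel.position()) // shrink file back to its true size
    channel.force(true)                  // flush before close
    channel.close()
  }

  // Hard crash: normal recovery re-scans the segment from the start,
  // validating each message; validBytes is the count that checked out.
  def truncateAfterCrash(channel: FileChannel, validBytes: Long): Unit = {
    channel.truncate(validBytes)
    channel.position(validBytes) // subsequent appends resume at the valid end
  }
}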
RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Hi Sriram, One sentence of code was missing; I will update the review board and KIP soon. LogSegment and FileMessageSet must use different constructors for an existing file and a new file; then channel.position(math.min(channel.size().toInt, end)) makes sure the position is at the end of an existing file. Thanks, Honghai Chen
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
I'm also +1 on this. The change is quite small and may actually help perf on Linux as well (we've never tried this). I have a lot of concerns about testing the various failure conditions, but since it will be off by default the risk is not too high. -Jay
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
+1. I've tried this on Linux and it helps reduce the spikes in append (and hence producer) latency for high-throughput writes. I am not entirely sure why, but my suspicion is that in the absence of preallocation, you see spikes when writes need to happen faster than the time it takes Linux to allocate the next block to the file. It will be great to see some performance test results too. -- Thanks, Neha
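A toy Scala probe of that hypothesis, using nothing beyond the standard library; the file names and sizes are made up for illustration. Note that RandomAccessFile.setLength may create a sparse file on some file systems, so the preallocated run may not actually differ on every Linux setup.

import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer

object PreallocateProbe {
  // Compare the worst single-append latency with and without growing the
  // file up front.
  def worstAppendNanos(file: File, preallocate: Boolean,
                       writes: Int, writeSize: Int): Long = {
    val raf = new RandomAccessFile(file, "rw")
    if (preallocate)
      raf.setLength(writes.toLong * writeSize) // allocate the whole file up front
    val channel = raf.getChannel
    val buf = ByteBuffer.allocate(writeSize)
    var worst = 0L
    for (_ <- 0 until writes) {
      buf.rewind()
      val t0 = System.nanoTime()
      channel.write(buf) // appends at the current position
      worst = math.max(worst, System.nanoTime() - t0)
    }
    channel.close()
    worst
  }

  def main(args: Array[String]): Unit = {
    val grown    = worstAppendNanos(new File("grow.log"), preallocate = false, 10000, 4096)
    val prealloc = worstAppendNanos(new File("prealloc.log"), preallocate = true, 10000, 4096)
    println(s"worst append: growing=$grown ns, preallocated=$prealloc ns")
  }
}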
[DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
I wrote a KIP for this after some discussion on KAFKA-1646. https://issues.apache.org/jira/browse/KAFKA-1646 https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system The RB is here: https://reviews.apache.org/r/33204/diff/ Thanks, Honghai
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
+1 (non-binding). Sure, makes sense :) Just make sure the doc for the config includes something like "If you are using Kafka on Windows, you probably want to set it to true", so people will know how to use it without looking for the JIRA.
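For illustration, a server.properties stanza along those lines. The property name log.preallocate is the one this KIP introduces; the comment text is only the suggestion above, not shipped documentation.

# Pre-allocate new segment files to their full size at creation time.
# Off by default. If you are using Kafka on Windows, you probably want
# to set it to true.
log.preallocate=true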