Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-06-08 Thread Jay Kreps
Honghai,

You are going to do a vote on this, right? I think the patch is ready to go,
so we are just waiting on the KIP adoption.

-Jay

On Thu, May 21, 2015 at 8:50 AM, Jun Rao j...@confluent.io wrote:

 Honghai,

 Could you update the wiki on the preallocated size? Instead of
 config.segmentSize
 - 2 * config.maxMessageSize, we just want to use config.segmentSize.
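
 For context, "preallocate" here means creating the segment file at its full
 target length up front, so appends never have to extend the file. A minimal
 sketch of the idea (in Java for brevity; the size and file name are
 illustrative, not Kafka's actual config plumbing):

 ```java
 import java.io.File;
 import java.io.RandomAccessFile;

 public class PreallocateSketch {
     public static void main(String[] args) throws Exception {
         long segmentSize = 1024 * 1024; // stand-in for config.segmentSize
         File f = File.createTempFile("segment", ".log");
         f.deleteOnExit();
         // Preallocate the file to the full segment size up front, as the
         // KIP proposes, instead of letting each append extend it.
         try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
             raf.setLength(segmentSize);
         }
         System.out.println(f.length()); // 1048576
     }
 }
 ```

 On NTFS and some older Linux filesystems this avoids the per-append metadata
 updates that fragment the file, which is where the consume-side win comes from.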

 Thanks,

 Jun

 On Mon, May 11, 2015 at 8:16 PM, Honghai Chen honghai.c...@microsoft.com
 wrote:

  All issues fixed, test cases added, performance results on Windows
  attached. The patch can improve consume performance by around
  25%~50%.
 
  Thanks, Honghai Chen
 
  -Original Message-
  From: Jun Rao [mailto:j...@confluent.io]
  Sent: Wednesday, May 6, 2015 5:39 AM
  To: dev@kafka.apache.org
  Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume
  performance under windows and some old Linux file system
 
  Thanks. Could you update the wiki? Also, I commented on the jira.
 
  Jun
 
  On Tue, May 5, 2015 at 12:48 AM, Honghai Chen 
 honghai.c...@microsoft.com
  wrote:
 
   Using config.segmentSize should be OK.   I previously added that term to
   make sure the file does not exceed config.segmentSize, but the function
   maybeRoll already ensures that.
   While trying to add a test case for recover, I was blocked by a
   rename-related issue and opened a jira at
   https://issues.apache.org/jira/browse/KAFKA-2170 , any recommendation
  for fixing that issue?
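
  For readers following the argument: maybeRoll is the broker-side check that
  starts a new segment before an append would push the current one past the
  configured segment size. A hedged sketch of that rolling condition (the
  names and signature are illustrative, not Kafka's actual code):

  ```java
  public class RollSketch {
      // Roll to a new segment if appending this message set would push the
      // segment past segmentSize -- which is why the preallocated length can
      // simply be config.segmentSize with no extra headroom.
      static boolean shouldRoll(long currentSegmentBytes, int messageSetBytes,
                                long segmentSize) {
          return currentSegmentBytes + messageSetBytes > segmentSize;
      }

      public static void main(String[] args) {
          System.out.println(shouldRoll(1000, 24, 1024)); // false: still fits
          System.out.println(shouldRoll(1001, 24, 1024)); // true: would overflow
      }
  }
  ```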
  
   Thanks, Honghai Chen
  
   -Original Message-
   From: Jun Rao [mailto:j...@confluent.io]
   Sent: Tuesday, May 5, 2015 12:51 PM
   To: dev@kafka.apache.org
   Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve
   consume performance under windows and some old Linux file system
  
   This seems similar to what's in
   https://issues.apache.org/jira/browse/KAFKA-1065.
  
   Also, could you explain why the preallocated size is set to
   config.segmentSize
   - 2 * config.maxMessageSize, instead of just config.segmentSize?
  
   Thanks,
  
   Jun
  
   On Mon, May 4, 2015 at 8:12 PM, Honghai Chen
   honghai.c...@microsoft.com
   wrote:
  
  Hi guys,
    I'm trying to add test cases, but the case below crashed at line
    segReopen.recover(64*1024) --> index.trimToValidSize(). Any idea?
    Appreciate your help.
    The case assumes Kafka crashes suddenly and needs to recover the
    last segment.
   
    kafka.log.LogSegmentTest > testCreateWithInitFileSizeCrash FAILED
        java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
            at java.io.RandomAccessFile.setLength(Native Method)
            at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
            at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
            at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
            at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
            at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
            at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
            at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
            at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
            at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
            at kafka.log.LogSegment.recover(LogSegment.scala:199)
            at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)
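
    The IOException above is the Windows-specific rule that a file with a live
    memory mapping cannot be resized. A sketch of the safe ordering: release
    the mapping (here by closing its channel, then reopening the file) before
    calling setLength. On Linux the unsafe ordering happens to work, which is
    why the failure only shows on Windows. All names below are illustrative:

    ```java
    import java.io.File;
    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;

    public class ResizeSketch {
        public static void main(String[] args) throws Exception {
            File f = File.createTempFile("offset-index", ".idx");
            f.deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
                raf.setLength(4096);
                FileChannel ch = raf.getChannel();
                MappedByteBuffer mmap =
                    ch.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                mmap.putLong(0, 42L); // write an index entry through the mapping
                mmap.force();         // flush mapped writes to disk
            } // channel closed here; the mapping must no longer be used
            // Resize only after the mapping's channel is closed. (On Windows
            // the JVM may still hold the map until GC, which is the sharp
            // edge KAFKA-2170 runs into.)
            try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
                raf.setLength(8); // trim to the valid entries
            }
            System.out.println(f.length());
        }
    }
    ```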
   
  def recover(maxMessageSize: Int): Int = {
    index.truncate()
    index.resize(index.maxIndexSize)
    var validBytes = 0
    var lastIndexEntry = 0
    val iter = log.iterator(maxMessageSize)
    try {
      while(iter.hasNext) {
        val entry = iter.next
        entry.message.ensureValid()
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          // we need to decompress the message, if required, to get the offset of the first uncompressed message
          val startOffset =
            entry.message.compressionCodec match {
              case NoCompressionCodec =>
                entry.offset
              case _ =>
                ByteBufferMessageSet.deepIterator(entry.message).next().offset
            }
          index.append(startOffset, validBytes)
          lastIndexEntry = validBytes
        }
        validBytes += MessageSet.entrySize(entry.message)
      }
    } catch {
      case e: InvalidMessageException =>
        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
    }
    val truncated = log.sizeInBytes - validBytes
    log.truncateTo(validBytes)
    index.trimToValidSize()
    truncated
  }
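
  The recover() loop above can be sketched in miniature: scan fixed-size
  records from the start, stop at the first invalid one, and truncate
  everything after the last valid byte. This toy version uses 8-byte records
  with a 1-byte XOR checksum purely for illustration; it is not Kafka's
  message format:

  ```java
  import java.util.Arrays;

  public class RecoverSketch {
      static final int RECORD_SIZE = 8;

      // A record is "valid" if its last byte is the XOR of the first seven --
      // a toy stand-in for entry.message.ensureValid().
      static boolean valid(byte[] log, int off) {
          byte x = 0;
          for (int i = 0; i < RECORD_SIZE - 1; i++) x ^= log[off + i];
          return x == log[off + RECORD_SIZE - 1];
      }

      // Mirrors recover(): count validBytes, then truncate the garbage tail.
      static byte[] recover(byte[] log) {
          int validBytes = 0;
          while (validBytes + RECORD_SIZE <= log.length && valid(log, validBytes))
              validBytes += RECORD_SIZE;
          return Arrays.copyOf(log, validBytes); // log.truncateTo(validBytes)
      }

      static byte[] record(byte seed) {
          byte[] r = new byte[RECORD_SIZE];
          byte x = 0;
          for (int i = 0; i < RECORD_SIZE - 1; i++) { r[i] = (byte) (seed + i); x ^= r[i]; }
          r[RECORD_SIZE - 1] = x;
          return r;
      }

      public static void main(String[] args) {
          byte[] log = new byte[24]; // preallocated segment, tail is garbage
          System.arraycopy(record((byte) 1), 0, log, 0, RECORD_SIZE);
          System.arraycopy(record((byte) 9), 0, log, 8, RECORD_SIZE);
          log[16] = 77;              // torn write: third record is invalid
          System.out.println(recover(log).length); // 16 valid bytes survive
      }
  }
  ```

  This is exactly why recovery matters more with preallocation: after a crash,
  the tail of a preallocated segment is zero-filled rather than absent.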

Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-05-21 Thread Jun Rao
Honghai,

Could you update the wiki on the preallocated size? Instead of
config.segmentSize
- 2 * config.maxMessageSize, we just want to use config.segmentSize.

Thanks,

Jun

  /* create a segment with preallocate, then crash */
  @Test
  def testCreateWithInitFileSizeCrash() {
    val tempDir = TestUtils.tempDir()
    val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, false, 512*1024*1024, true)

    val ms = messages(50, "hello", "there")
    seg.append(50, ms)
    val ms2

Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-05-20 Thread Jiangjie Qin
I actually feel many [VOTE] threads eventually become [DISCUSS] as people
just put tons of comments there :)


Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-05-20 Thread Gwen Shapira
For simple discussions, I completely agree.

For those threads where there are a few votes, and then more discussion, and
then the KIP changes a few times... a separate thread will help keep things
clear for both voters and anyone who tries to figure out what happened in the
future.

Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-05-20 Thread Joel Koshy
I think in general it is fine (even good) if a VOTE thread has a lot
of discussion.

The only issue I can think of is the one that Gwen made reference to:

early votes -> update to the KIP/whatever is being voted on due to more discussion ->
later votes

as it then becomes unclear on what exactly each vote corresponds to.

So basically if there is a non-trivial change to the material under
vote due to discussion the vote should be canceled and a fresh vote
thread should be started.

Thanks,

Joel


Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-05-20 Thread Jay Kreps
Makes sense. Honghai, want to do a [VOTE] thread just so everything is
official?

-Jay


Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-05-20 Thread Jay Kreps
Hey all,

How do people feel about these [DISCUSS] threads that basically
accidentally turn into votes? Like, basically everyone was +1 on this KIP
already; should we just skip the second vote? I find it kind of annoying to
do both when this happens.

-Jay

On Mon, May 11, 2015 at 8:16 PM, Honghai Chen honghai.c...@microsoft.com
wrote:

 All issues fixed, test cases added, performance result on windows
 attached.  The patch can help improve the consume performance around
 25%~50%.

 Thanks, Honghai Chen

 -Original Message-
 From: Jun Rao [mailto:j...@confluent.io]
 Sent: Wednesday, May 6, 2015 5:39 AM
 To: dev@kafka.apache.org
 Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume
 performance under windows and some old Linux file system

 Thanks. Could you updated the wiki? Also, commented on the jira.

 Jun

 On Tue, May 5, 2015 at 12:48 AM, Honghai Chen honghai.c...@microsoft.com
 wrote:

  Use config.segmentSize should be ok.   Previously add that one for make
  sure the file not exceed config.segmentSize, actually the function
  maybeRoll already make sure that.
  When try add test case for recover, blocked by the rename related
  issue, just open one jira at
  https://issues.apache.org/jira/browse/KAFKA-2170 , any recommendation
 for fix that issue?
 
  Thanks, Honghai Chen
 
  -Original Message-
  From: Jun Rao [mailto:j...@confluent.io]
  Sent: Tuesday, May 5, 2015 12:51 PM
  To: dev@kafka.apache.org
  Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve
  consume performance under windows and some old Linux file system
 
  This seems similar to what's in
  https://issues.apache.org/jira/browse/KAFKA-1065.
 
  Also, could you explain why the preallocated size is set to
  config.segmentSize
  - 2 * config.maxMessageSize, instead of just config.segmentSize?
 
  Thanks,
 
  Jun
 
  On Mon, May 4, 2015 at 8:12 PM, Honghai Chen
  honghai.c...@microsoft.com
  wrote:
 
 Hi guys,
   I'm trying add test cases, but below case crashed at line 
   segReopen.recover(64*1024)-- index.trimToValidSize()  , any idea
   for
  it?
   Appreciate your help.
   The case assume kafka suddenly crash, and need recover the
   last segment.
  
   kafka.log.LogSegmentTest  testCreateWithInitFileSizeCrash FAILED
   java.io.IOException: The requested operation cannot be performed
   on a file w ith a user-mapped section open
   at java.io.RandomAccessFile.setLength(Native Method)
   at
   kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
   at
   kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
   at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
   at
   kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(Offset
   I
   ndex.scala:272)
   at
   kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.s
   c
   ala:272)
   at
   kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.s
   c
   ala:272)
   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
   at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
   at kafka.log.LogSegment.recover(LogSegment.scala:199)
   at
   kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentT
   e
   st.scala:306)
  
  def recover(maxMessageSize: Int): Int = {
    index.truncate()
    index.resize(index.maxIndexSize)
    var validBytes = 0
    var lastIndexEntry = 0
    val iter = log.iterator(maxMessageSize)
    try {
      while(iter.hasNext) {
        val entry = iter.next
        entry.message.ensureValid()
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          // we need to decompress the message, if required, to get the offset of the first uncompressed message
          val startOffset =
            entry.message.compressionCodec match {
              case NoCompressionCodec =>
                entry.offset
              case _ =>
                ByteBufferMessageSet.deepIterator(entry.message).next().offset
            }
          index.append(startOffset, validBytes)
          lastIndexEntry = validBytes
        }
        validBytes += MessageSet.entrySize(entry.message)
      }
    } catch {
      case e: InvalidMessageException =>
        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
    }
    val truncated = log.sizeInBytes - validBytes
    log.truncateTo(validBytes)
    index.trimToValidSize()
    truncated
  }
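To make the bookkeeping above concrete, here is a tiny self-contained sketch (hypothetical entry sizes, not the real MessageSet API) of how validBytes and the returned truncated byte count relate in a preallocated segment:

```scala
// Hypothetical numbers: a 1000-byte preallocated segment holding three
// valid entries (100, 200, 150 bytes); the tail is unwritten padding.
val fileSizeBytes = 1000
val entrySizes = Seq(100, 200, 150)

// recover() walks the entries, accumulating the size of every valid one.
var validBytes = 0
for (size <- entrySizes)
  validBytes += size

// Everything past validBytes is trimmed off; recover() returns that count.
val truncated = fileSizeBytes - validBytes
println(s"validBytes=$validBytes truncated=$truncated") // validBytes=450 truncated=550
```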
  
  /* create a segment with pre-allocate and crash */
  @Test
  def testCreateWithInitFileSizeCrash() {
    val tempDir = TestUtils.tempDir()
    val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, false, 512*1024*1024, true)

RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-05-11 Thread Honghai Chen
All issues fixed, test cases added, performance results on Windows attached.
The patch can help improve consume performance by around 25%~50%.

Thanks, Honghai Chen 

-Original Message-
From: Jun Rao [mailto:j...@confluent.io] 
Sent: Wednesday, May 6, 2015 5:39 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume 
performance under windows and some old Linux file system

Thanks. Could you update the wiki? Also, I commented on the jira.

Jun

On Tue, May 5, 2015 at 12:48 AM, Honghai Chen honghai.c...@microsoft.com
wrote:

 Using config.segmentSize should be OK. Previously that term was added to make
 sure the file does not exceed config.segmentSize, but the function
 maybeRoll already ensures that.
 While trying to add a test case for recover, I was blocked by a rename-related
 issue and opened a jira at
 https://issues.apache.org/jira/browse/KAFKA-2170 ; any recommendation for
 fixing that issue?

 Thanks, Honghai Chen

 -Original Message-
 From: Jun Rao [mailto:j...@confluent.io]
 Sent: Tuesday, May 5, 2015 12:51 PM
 To: dev@kafka.apache.org
 Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve 
 consume performance under windows and some old Linux file system

 This seems similar to what's in
 https://issues.apache.org/jira/browse/KAFKA-1065.

 Also, could you explain why the preallocated size is set to 
 config.segmentSize
 - 2 * config.maxMessageSize, instead of just config.segmentSize?

 Thanks,

 Jun
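For context, the preallocation discussed in this thread is controlled through broker configuration. A minimal sketch of enabling it (property names per KAFKA-1646 and this KIP; verify against your Kafka version, and note the value shown is just the 512MB segment size used in the test below):

```properties
# server.properties (sketch, assumed property names)
log.preallocate=true
log.segment.bytes=536870912   # per this KIP, preallocate config.segmentSize per segment
```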

 On Mon, May 4, 2015 at 8:12 PM, Honghai Chen 
 honghai.c...@microsoft.com
 wrote:

   Hi guys,
  I'm trying to add test cases, but the case below crashed at
  segReopen.recover(64*1024) -> index.trimToValidSize(); any idea
  why?
  Appreciate your help.
  The case assumes Kafka crashes suddenly and needs to recover the
  last segment.
 
  kafka.log.LogSegmentTest > testCreateWithInitFileSizeCrash FAILED
  java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
      at java.io.RandomAccessFile.setLength(Native Method)
      at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
      at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
      at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
      at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
      at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
      at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
      at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
      at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
      at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
      at kafka.log.LogSegment.recover(LogSegment.scala:199)
      at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)
 
  def recover(maxMessageSize: Int): Int = {
    index.truncate()
    index.resize(index.maxIndexSize)
    var validBytes = 0
    var lastIndexEntry = 0
    val iter = log.iterator(maxMessageSize)
    try {
      while(iter.hasNext) {
        val entry = iter.next
        entry.message.ensureValid()
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          // we need to decompress the message, if required, to get the offset of the first uncompressed message
          val startOffset =
            entry.message.compressionCodec match {
              case NoCompressionCodec =>
                entry.offset
              case _ =>
                ByteBufferMessageSet.deepIterator(entry.message).next().offset
            }
          index.append(startOffset, validBytes)
          lastIndexEntry = validBytes
        }
        validBytes += MessageSet.entrySize(entry.message)
      }
    } catch {
      case e: InvalidMessageException =>
        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
    }
    val truncated = log.sizeInBytes - validBytes
    log.truncateTo(validBytes)
    index.trimToValidSize()
    truncated
  }
 
  /* create a segment with pre-allocate and crash */
  @Test
  def testCreateWithInitFileSizeCrash() {
    val tempDir = TestUtils.tempDir()
    val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, false, 512*1024*1024, true)

    val ms = messages(50, "hello", "there")
    seg.append(50, ms)
    val ms2 = messages(60, "alpha", "beta")
    seg.append(60, ms2)
    val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
    assertEquals(ms2.toList, read.messageSet.toList)
    val oldSize = seg.log.sizeInBytes()
    val oldPosition = seg.log.channel.position
    val oldFileSize = seg.log.file.length
    assertEquals(512*1024*1024, oldFileSize)

RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-05-05 Thread Honghai Chen
Using config.segmentSize should be OK. Previously that term was added to make
sure the file does not exceed config.segmentSize, but the function maybeRoll
already ensures that.
While trying to add a test case for recover, I was blocked by a rename-related
issue and opened a jira at https://issues.apache.org/jira/browse/KAFKA-2170 ;
any recommendation for fixing that issue?

Thanks, Honghai Chen

-Original Message-
From: Jun Rao [mailto:j...@confluent.io] 
Sent: Tuesday, May 5, 2015 12:51 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume 
performance under windows and some old Linux file system

This seems similar to what's in
https://issues.apache.org/jira/browse/KAFKA-1065.

Also, could you explain why the preallocated size is set to config.segmentSize
- 2 * config.maxMessageSize, instead of just config.segmentSize?

Thanks,

Jun

On Mon, May 4, 2015 at 8:12 PM, Honghai Chen honghai.c...@microsoft.com
wrote:

   Hi guys,
 I'm trying to add test cases, but the case below crashed at
 segReopen.recover(64*1024) -> index.trimToValidSize(); any idea why?
 Appreciate your help.
 The case assumes Kafka crashes suddenly and needs to recover the
 last segment.

 kafka.log.LogSegmentTest > testCreateWithInitFileSizeCrash FAILED
 java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
     at java.io.RandomAccessFile.setLength(Native Method)
     at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
     at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
     at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
     at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
     at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
     at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
     at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
     at kafka.log.LogSegment.recover(LogSegment.scala:199)
     at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)

  def recover(maxMessageSize: Int): Int = {
    index.truncate()
    index.resize(index.maxIndexSize)
    var validBytes = 0
    var lastIndexEntry = 0
    val iter = log.iterator(maxMessageSize)
    try {
      while(iter.hasNext) {
        val entry = iter.next
        entry.message.ensureValid()
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          // we need to decompress the message, if required, to get the offset of the first uncompressed message
          val startOffset =
            entry.message.compressionCodec match {
              case NoCompressionCodec =>
                entry.offset
              case _ =>
                ByteBufferMessageSet.deepIterator(entry.message).next().offset
            }
          index.append(startOffset, validBytes)
          lastIndexEntry = validBytes
        }
        validBytes += MessageSet.entrySize(entry.message)
      }
    } catch {
      case e: InvalidMessageException =>
        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
    }
    val truncated = log.sizeInBytes - validBytes
    log.truncateTo(validBytes)
    index.trimToValidSize()
    truncated
  }

  /* create a segment with pre-allocate and crash */
  @Test
  def testCreateWithInitFileSizeCrash() {
    val tempDir = TestUtils.tempDir()
    val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, false, 512*1024*1024, true)

    val ms = messages(50, "hello", "there")
    seg.append(50, ms)
    val ms2 = messages(60, "alpha", "beta")
    seg.append(60, ms2)
    val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
    assertEquals(ms2.toList, read.messageSet.toList)
    val oldSize = seg.log.sizeInBytes()
    val oldPosition = seg.log.channel.position
    val oldFileSize = seg.log.file.length
    assertEquals(512*1024*1024, oldFileSize)
    seg.flush()
    seg.log.channel.close()
    seg.index.close()

    val segReopen = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, true)
    segReopen.recover(64*1024)
    val size = segReopen.log.sizeInBytes()
    val position = segReopen.log.channel.position
    val fileSize = segReopen.log.file.length
    assertEquals(oldPosition, position)
    assertEquals(oldSize, size)
    assertEquals(size, fileSize)
  }



 Thanks, Honghai Chen 

 -Original Message-
 From: Sriram Subramanian [mailto:srsubraman...@linkedin.com.INVALID]
 Sent: Friday, April 24, 2015 12:57 AM
 To: dev@kafka.apache.org
 Cc: Roshan Naik
 Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve

Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-05-05 Thread Jun Rao
Thanks. Could you update the wiki? Also, I commented on the jira.

Jun

On Tue, May 5, 2015 at 12:48 AM, Honghai Chen honghai.c...@microsoft.com
wrote:

 Using config.segmentSize should be OK. Previously that term was added to make
 sure the file does not exceed config.segmentSize, but the function
 maybeRoll already ensures that.
 While trying to add a test case for recover, I was blocked by a rename-related
 issue and opened a jira at https://issues.apache.org/jira/browse/KAFKA-2170 ;
 any recommendation for fixing that issue?

 Thanks, Honghai Chen

 -Original Message-
 From: Jun Rao [mailto:j...@confluent.io]
 Sent: Tuesday, May 5, 2015 12:51 PM
 To: dev@kafka.apache.org
 Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume
 performance under windows and some old Linux file system

 This seems similar to what's in
 https://issues.apache.org/jira/browse/KAFKA-1065.

 Also, could you explain why the preallocated size is set to
 config.segmentSize
 - 2 * config.maxMessageSize, instead of just config.segmentSize?

 Thanks,

 Jun

 On Mon, May 4, 2015 at 8:12 PM, Honghai Chen honghai.c...@microsoft.com
 wrote:

   Hi guys,
  I'm trying to add test cases, but the case below crashed at
  segReopen.recover(64*1024) -> index.trimToValidSize(); any idea
  why?
  Appreciate your help.
  The case assumes Kafka crashes suddenly and needs to recover the
  last segment.
 
  kafka.log.LogSegmentTest > testCreateWithInitFileSizeCrash FAILED
  java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
      at java.io.RandomAccessFile.setLength(Native Method)
      at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
      at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
      at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
      at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
      at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
      at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
      at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
      at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
      at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
      at kafka.log.LogSegment.recover(LogSegment.scala:199)
      at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)
 
  def recover(maxMessageSize: Int): Int = {
    index.truncate()
    index.resize(index.maxIndexSize)
    var validBytes = 0
    var lastIndexEntry = 0
    val iter = log.iterator(maxMessageSize)
    try {
      while(iter.hasNext) {
        val entry = iter.next
        entry.message.ensureValid()
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          // we need to decompress the message, if required, to get the offset of the first uncompressed message
          val startOffset =
            entry.message.compressionCodec match {
              case NoCompressionCodec =>
                entry.offset
              case _ =>
                ByteBufferMessageSet.deepIterator(entry.message).next().offset
            }
          index.append(startOffset, validBytes)
          lastIndexEntry = validBytes
        }
        validBytes += MessageSet.entrySize(entry.message)
      }
    } catch {
      case e: InvalidMessageException =>
        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
    }
    val truncated = log.sizeInBytes - validBytes
    log.truncateTo(validBytes)
    index.trimToValidSize()
    truncated
  }
 
  /* create a segment with pre-allocate and crash */
  @Test
  def testCreateWithInitFileSizeCrash() {
    val tempDir = TestUtils.tempDir()
    val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, false, 512*1024*1024, true)

    val ms = messages(50, "hello", "there")
    seg.append(50, ms)
    val ms2 = messages(60, "alpha", "beta")
    seg.append(60, ms2)
    val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
    assertEquals(ms2.toList, read.messageSet.toList)
    val oldSize = seg.log.sizeInBytes()
    val oldPosition = seg.log.channel.position
    val oldFileSize = seg.log.file.length
    assertEquals(512*1024*1024, oldFileSize)
    seg.flush()
    seg.log.channel.close()
    seg.index.close()

    val segReopen = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, true)
    segReopen.recover(64*1024)
    val size = segReopen.log.sizeInBytes()
    val position = segReopen.log.channel.position
    val fileSize = segReopen.log.file.length
    assertEquals(oldPosition, position)
    assertEquals(oldSize, size)
    assertEquals(size, fileSize)

RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-05-04 Thread Honghai Chen
  Hi guys,
I'm trying to add test cases, but the case below crashed at
segReopen.recover(64*1024) -> index.trimToValidSize(); any idea why?
Appreciate your help.
The case assumes Kafka crashes suddenly and needs to recover the last segment.

kafka.log.LogSegmentTest > testCreateWithInitFileSizeCrash FAILED
java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
    at java.io.RandomAccessFile.setLength(Native Method)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
    at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
    at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
    at kafka.log.LogSegment.recover(LogSegment.scala:199)
    at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)

  def recover(maxMessageSize: Int): Int = {
    index.truncate()
    index.resize(index.maxIndexSize)
    var validBytes = 0
    var lastIndexEntry = 0
    val iter = log.iterator(maxMessageSize)
    try {
      while(iter.hasNext) {
        val entry = iter.next
        entry.message.ensureValid()
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          // we need to decompress the message, if required, to get the offset of the first uncompressed message
          val startOffset =
            entry.message.compressionCodec match {
              case NoCompressionCodec =>
                entry.offset
              case _ =>
                ByteBufferMessageSet.deepIterator(entry.message).next().offset
            }
          index.append(startOffset, validBytes)
          lastIndexEntry = validBytes
        }
        validBytes += MessageSet.entrySize(entry.message)
      }
    } catch {
      case e: InvalidMessageException =>
        logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
    }
    val truncated = log.sizeInBytes - validBytes
    log.truncateTo(validBytes)
    index.trimToValidSize()
    truncated
  }

  /* create a segment with pre-allocate and crash */
  @Test
  def testCreateWithInitFileSizeCrash() {
    val tempDir = TestUtils.tempDir()
    val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, false, 512*1024*1024, true)

    val ms = messages(50, "hello", "there")
    seg.append(50, ms)
    val ms2 = messages(60, "alpha", "beta")
    seg.append(60, ms2)
    val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
    assertEquals(ms2.toList, read.messageSet.toList)
    val oldSize = seg.log.sizeInBytes()
    val oldPosition = seg.log.channel.position
    val oldFileSize = seg.log.file.length
    assertEquals(512*1024*1024, oldFileSize)
    seg.flush()
    seg.log.channel.close()
    seg.index.close()

    val segReopen = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, true)
    segReopen.recover(64*1024)
    val size = segReopen.log.sizeInBytes()
    val position = segReopen.log.channel.position
    val fileSize = segReopen.log.file.length
    assertEquals(oldPosition, position)
    assertEquals(oldSize, size)
    assertEquals(size, fileSize)
  }



Thanks, Honghai Chen
http://aka.ms/kafka 
http://aka.ms/manifold 

-Original Message-
From: Sriram Subramanian [mailto:srsubraman...@linkedin.com.INVALID] 
Sent: Friday, April 24, 2015 12:57 AM
To: dev@kafka.apache.org
Cc: Roshan Naik
Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume 
performance under windows and some old Linux file system

+1

Some information on how this will be tested would be useful.

On 4/23/15 9:33 AM, Jay Kreps jay.kr...@gmail.com wrote:

Yeah if we understand the optimal policy for a config we always want to 
set it automatically. In this case I don't think we do yet, but down 
the road that could be nice. I think for now we should consider this 
option experimental to give people a chance to try it out.

-Jay

On Wed, Apr 22, 2015 at 7:32 PM, Honghai Chen 
honghai.c...@microsoft.com
wrote:

 Hi Roshan,
 Using the 'auto' value may break the rule and mess up
 the configuration. @Jay, any thoughts?

 Thanks, Honghai Chen

 -Original Message-
 From: Sriharsha Chintalapani [mailto:harsh...@fastmail.fm]
 Sent: Thursday, April 23, 2015 6:27 AM
 To: dev@kafka.apache.org; Roshan Naik
 Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve 
 consume performance under windows and some old Linux file system

 +1 (non-binding

Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-23 Thread Jay Kreps
Yeah if we understand the optimal policy for a config we always want to set
it automatically. In this case I don't think we do yet, but down the road
that could be nice. I think for now we should consider this option
experimental to give people a chance to try it out.

-Jay

On Wed, Apr 22, 2015 at 7:32 PM, Honghai Chen honghai.c...@microsoft.com
wrote:

 Hi Roshan,
 Using the 'auto' value may break the rule and mess up the
 configuration. @Jay, any thoughts?

 Thanks, Honghai Chen

 -Original Message-
 From: Sriharsha Chintalapani [mailto:harsh...@fastmail.fm]
 Sent: Thursday, April 23, 2015 6:27 AM
 To: dev@kafka.apache.org; Roshan Naik
 Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume
 performance under windows and some old Linux file system

 +1 (non-binding).

 --
 Harsha


 On April 22, 2015 at 2:52:12 PM, Roshan Naik (ros...@hortonworks.com)
 wrote:

 I see that it is safe to keep this off by default due to some concerns.
 Eventually, for settings such as this whose 'preferred' value is platform
 specific (or based on other criteria), it might be worth considering
 having a default value that is not a constant but an 'auto' value: when
 Kafka boots up it can automatically use the preferred value. Of course it
 would have to be documented as to what auto means for a given platform.

 -roshan


 On 4/22/15 1:21 PM, Jakob Homan jgho...@gmail.com wrote:

 +1. This is an important performance fix for Windows-based clusters.
 
 -Jakob
 
 On 22 April 2015 at 03:25, Honghai Chen honghai.c...@microsoft.com
 wrote:
  Fix the issue Sriram mentioned. Code review and jira/KIP updated.
 
  Below is a detailed description of the scenarios:
  1. On clean shutdown, the last log file is truncated to its real size,
     since the close() function of FileMessageSet calls trim().
  2. On crash, the restart goes through recover() and the last log file
     is truncated to its real size (and the position is moved to the end
     of the file).
  3. When the service starts and opens an existing file:
     a. It runs the LogSegment constructor that has NO preallocate
        parameter.
     b. Then the "end" in FileMessageSet will be Int.MaxValue, and
        channel.position(math.min(channel.size().toInt, end)) will move
        the position to the end of the file.
     c. If recovery is needed, the recover function will truncate the
        file to the end of valid data and also move the position there.
  4. When the service is running and needs to create a new log segment
     and a new FileMessageSet:
     a. If preallocate = true:
        i. the "end" in FileMessageSet will be 0, the file size will be
           initFileSize, and channel.position(math.min(channel.size().toInt,
           end)) will set the position to 0.
     b. Else if preallocate = false (backward compatible):
        i. the "end" in FileMessageSet will be Int.MaxValue, the file size
           will be 0, and channel.position(math.min(channel.size().toInt,
           end)) will set the position to 0.
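The channel-positioning behaviour described in 3b and 4a/4b can be sketched as a standalone illustration using java.nio (the `openChannel` helper is hypothetical, not the actual FileMessageSet code):

```scala
import java.io.{File, RandomAccessFile}
import java.nio.channels.FileChannel

// Hypothetical sketch of the rule above: "end" is Int.MaxValue when
// opening an existing segment file (the position lands at end-of-data)
// and 0 when creating a new preallocated one (space is reserved up
// front with setLength, and writing starts at byte 0).
def openChannel(file: File, preallocate: Boolean, initFileSize: Int): FileChannel = {
  val raf = new RandomAccessFile(file, "rw")
  val newFile = file.length == 0
  val end =
    if (preallocate && newFile) { raf.setLength(initFileSize); 0 }  // reserve space, write from 0
    else Int.MaxValue                                               // existing file: position at its size
  val channel = raf.getChannel
  channel.position(math.min(channel.size().toInt, end))
  channel
}
```

Reopening the same file with preallocate = false then positions at the current file size, which matches scenario 3b for existing segments.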
 
 
 
  https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
  https://issues.apache.org/jira/browse/KAFKA-1646
  https://reviews.apache.org/r/33204/diff/2/
 
  Thanks, Honghai Chen
  http://aka.ms/kafka
  http://aka.ms/manifold
 
  -Original Message-
  From: Honghai Chen
  Sent: Wednesday, April 22, 2015 11:12 AM
  To: dev@kafka.apache.org
  Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume
 performance under windows and some old Linux file system
 
  Hi Sriram,
  One line of code was missed; I will update the code review board and
 KIP soon.
  LogSegment and FileMessageSet must use different constructors for an
 existing file and a new file; then the code
 channel.position(math.min(channel.size().toInt, end)) makes sure
 the position is at the end of an existing file.
 
  Thanks, Honghai Chen
 
  -Original Message-
  From: Jay Kreps [mailto:jay.kr...@gmail.com]
  Sent: Wednesday, April 22, 2015 5:22 AM
  To: dev@kafka.apache.org
  Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume
 performance under windows and some old Linux file system
 
  My understanding of the patch is that clean shutdown truncates the file
 back to its true size (and reallocates it on startup). Hard crash is
 handled by the normal recovery, which should truncate off the empty
 portion of the file.
 
  On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian 
 srsubraman...@linkedin.com.invalid wrote:
 
  Could you describe how recovery works in this mode? Say, we had a 250
  MB preallocated segment and we wrote till 50MB and crashed. Till what
  point do we recover? Also, on startup, how is the append end pointer
  set even on a clean shutdown? How does the FileChannel end position
  get set to 50 MB instead of 250 MB? The existing code might just work
  for it but explaining that would be useful.
 
  On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote:
 
  +1

Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-23 Thread Sriram Subramanian
+1 

Some information on how this will be tested would be useful.

On 4/23/15 9:33 AM, Jay Kreps jay.kr...@gmail.com wrote:

Yeah if we understand the optimal policy for a config we always want to
set
it automatically. In this case I don't think we do yet, but down the road
that could be nice. I think for now we should consider this option
experimental to give people a chance to try it out.

-Jay

On Wed, Apr 22, 2015 at 7:32 PM, Honghai Chen honghai.c...@microsoft.com
wrote:

 Hi Roshan,
 Using the 'auto' value may break the rule and mess up the
 configuration. @Jay, any thoughts?

 Thanks, Honghai Chen

 -Original Message-
 From: Sriharsha Chintalapani [mailto:harsh...@fastmail.fm]
 Sent: Thursday, April 23, 2015 6:27 AM
 To: dev@kafka.apache.org; Roshan Naik
 Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume
 performance under windows and some old Linux file system

 +1 (non-binding).

 --
 Harsha


 On April 22, 2015 at 2:52:12 PM, Roshan Naik (ros...@hortonworks.com)
 wrote:

 I see that it is safe to keep this off by default due to some concerns.
 Eventually, for settings such as this whose 'preferred' value is platform
 specific (or based on other criteria), it might be worth considering
 having a default value that is not a constant but an 'auto' value: when
 Kafka boots up it can automatically use the preferred value. Of course it
 would have to be documented as to what auto means for a given platform.

 -roshan


 On 4/22/15 1:21 PM, Jakob Homan jgho...@gmail.com wrote:

 +1. This is an important performance fix for Windows-based clusters.
 
 -Jakob
 
 On 22 April 2015 at 03:25, Honghai Chen honghai.c...@microsoft.com
 wrote:
  Fix the issue Sriram mentioned. Code review and jira/KIP updated.
 
  Below is a detailed description of the scenarios:
  1. On clean shutdown, the last log file is truncated to its real size,
     since the close() function of FileMessageSet calls trim().
  2. On crash, the restart goes through recover() and the last log file
     is truncated to its real size (and the position is moved to the end
     of the file).
  3. When the service starts and opens an existing file:
     a. It runs the LogSegment constructor that has NO preallocate
        parameter.
     b. Then the "end" in FileMessageSet will be Int.MaxValue, and
        channel.position(math.min(channel.size().toInt, end)) will move
        the position to the end of the file.
     c. If recovery is needed, the recover function will truncate the
        file to the end of valid data and also move the position there.
  4. When the service is running and needs to create a new log segment
     and a new FileMessageSet:
     a. If preallocate = true:
        i. the "end" in FileMessageSet will be 0, the file size will be
           initFileSize, and channel.position(math.min(channel.size().toInt,
           end)) will set the position to 0.
     b. Else if preallocate = false (backward compatible):
        i. the "end" in FileMessageSet will be Int.MaxValue, the file size
           will be 0, and channel.position(math.min(channel.size().toInt,
           end)) will set the position to 0.
 
 
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
  https://issues.apache.org/jira/browse/KAFKA-1646
  https://reviews.apache.org/r/33204/diff/2/
 
  Thanks, Honghai Chen
  http://aka.ms/kafka
  http://aka.ms/manifold
 
  -Original Message-
  From: Honghai Chen
  Sent: Wednesday, April 22, 2015 11:12 AM
  To: dev@kafka.apache.org
  Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve
consume
 performance under windows and some old Linux file system
 
  Hi Sriram,
  One line of code was missed; I will update the code review board and
 KIP soon.
  LogSegment and FileMessageSet must use different constructors for an
 existing file and a new file; then the code
 channel.position(math.min(channel.size().toInt, end)) makes sure
 the position is at the end of an existing file.
 
  Thanks, Honghai Chen
 
  -Original Message-
  From: Jay Kreps [mailto:jay.kr...@gmail.com]
  Sent: Wednesday, April 22, 2015 5:22 AM
  To: dev@kafka.apache.org
  Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve
consume
 performance under windows and some old Linux file system
 
  My understanding of the patch is that clean shutdown truncates the file
 back to its true size (and reallocates it on startup). Hard crash is
 handled by the normal recovery, which should truncate off the empty
 portion of the file.
 
  On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian 
 srsubraman...@linkedin.com.invalid wrote:
 
  Could you describe how recovery works in this mode? Say, we had a
250
  MB preallocated segment and we wrote till 50MB and crashed. Till
what
  point do we recover? Also, on startup, how is the append end pointer
  set even on a clean shutdown? How does the FileChannel end position
  get set to 50 MB instead of 250 MB? The existing code might

RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-22 Thread Honghai Chen
Hi Roshan,
Using the 'auto' value may break the rule and mess up the
configuration. @Jay, any thoughts?

Thanks, Honghai Chen 

-Original Message-
From: Sriharsha Chintalapani [mailto:harsh...@fastmail.fm] 
Sent: Thursday, April 23, 2015 6:27 AM
To: dev@kafka.apache.org; Roshan Naik
Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume 
performance under windows and some old Linux file system

+1 (non-binding).

-- 
Harsha


On April 22, 2015 at 2:52:12 PM, Roshan Naik (ros...@hortonworks.com) wrote:

I see that it is safe to keep this off by default due to some concerns.
Eventually, for settings such as this whose 'preferred' value is platform
specific (or based on other criteria), it might be worth considering
having a default value that is not a constant but an 'auto' value: when
Kafka boots up it can automatically use the preferred value. Of course it
would have to be documented as to what auto means for a given platform.

-roshan  


On 4/22/15 1:21 PM, Jakob Homan jgho...@gmail.com wrote:  

+1. This is an important performance fix for Windows-based clusters.  
  
-Jakob  
  
On 22 April 2015 at 03:25, Honghai Chen honghai.c...@microsoft.com  
wrote:  
 Fix the issue Sriram mentioned. Code review and jira/KIP updated.
 
 Below is a detailed description of the scenarios:
 1. On clean shutdown, the last log file is truncated to its real size,
    since the close() function of FileMessageSet calls trim().
 2. On crash, the restart goes through recover() and the last log file
    is truncated to its real size (and the position is moved to the end
    of the file).
 3. When the service starts and opens an existing file:
    a. It runs the LogSegment constructor that has NO preallocate
       parameter.
    b. Then the "end" in FileMessageSet will be Int.MaxValue, and
       channel.position(math.min(channel.size().toInt, end)) will move
       the position to the end of the file.
    c. If recovery is needed, the recover function will truncate the
       file to the end of valid data and also move the position there.
 4. When the service is running and needs to create a new log segment
    and a new FileMessageSet:
    a. If preallocate = true:
       i. the "end" in FileMessageSet will be 0, the file size will be
          initFileSize, and channel.position(math.min(channel.size().toInt,
          end)) will set the position to 0.
    b. Else if preallocate = false (backward compatible):
       i. the "end" in FileMessageSet will be Int.MaxValue, the file size
          will be 0, and channel.position(math.min(channel.size().toInt,
          end)) will set the position to 0.
  
  
https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+pre  
allocate+to+improve+consume+performance+under+windows+and+some+old+Linux+  
file+system  
 https://issues.apache.org/jira/browse/KAFKA-1646  
 https://reviews.apache.org/r/33204/diff/2/  
  
 Thanks, Honghai Chen  
 http://aka.ms/kafka  
 http://aka.ms/manifold  
  
 -Original Message-  
 From: Honghai Chen  
 Sent: Wednesday, April 22, 2015 11:12 AM  
 To: dev@kafka.apache.org  
 Subject: RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume  
performance under windows and some old Linux file system  
  
 Hi Sriram,  
 One sentence of code missed, will update code review board and  
KIP soon.  
 For LogSegment and FileMessageSet, must use different  
constructor function for existing file and new file, then the code   
channel.position(math.min(channel.size().toInt, end))  will make sure  
the position at end of existing file.  
  
 Thanks, Honghai Chen  
  
 -Original Message-  
 From: Jay Kreps [mailto:jay.kr...@gmail.com]  
 Sent: Wednesday, April 22, 2015 5:22 AM  
 To: dev@kafka.apache.org  
 Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume  
performance under windows and some old Linux file system  
  
 My understanding of the patch is that clean shutdown truncates the file  
back to it's true size (and reallocates it on startup). Hard crash is  
handled by the normal recovery which should truncate off the empty  
portion of the file.  
  
 On Tue, Apr 21, 2015 at 10:52 AM, Sriram Subramanian   
srsubraman...@linkedin.com.invalid wrote:  
  
 Could you describe how recovery works in this mode? Say, we had a 250  
 MB preallocated segment and we wrote till 50MB and crashed. Till what  
 point do we recover? Also, on startup, how is the append end pointer  
 set even on a clean shutdown? How does the FileChannel end position  
 get set to 50 MB instead of 250 MB? The existing code might just work  
 for it but explaining that would be useful.  
  
 On 4/21/15 9:40 AM, Neha Narkhede n...@confluent.io wrote:  
  
 +1. I've tried this on Linux and it helps reduce the spikes in append  
 +(and  
 hence producer) latency for high throughput writes. I am not entirely  
 sure why but my suspicion is that in the absence of preallocation,  
 you see spikes writes need to happen faster

Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-22 Thread Sriharsha Chintalapani
+1 (non-binding).

-- 
Harsha


On April 22, 2015 at 2:52:12 PM, Roshan Naik (ros...@hortonworks.com) wrote:

I see that it is safe to keep this off by default due to some concerns.
Eventually, for settings such as this whose 'preferred' value is platform
specific (or based on other criteria), it might be worth considering
having a default value that is not a constant but an 'auto' value: when
Kafka boots up it can automatically use the preferred value. Of course,
what 'auto' means for a given platform would have to be documented.

-roshan  
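The 'auto' default Roshan suggests is not part of the patch (the KIP's log.preallocate config has a constant default of false), but a hedged sketch of the idea might look like this, keying the default off the platform name:

```java
public class AutoDefault {
    // Hypothetical sketch, not from the patch: derive the log.preallocate
    // default from the platform instead of hard-coding false.
    static boolean defaultPreallocate(String osName) {
        // Preallocation mainly helps on Windows (and some older Linux
        // filesystems), so only default it on where it is known to help.
        return osName.toLowerCase().contains("windows");
    }

    public static void main(String[] args) {
        String os = System.getProperty("os.name");
        System.out.println("log.preallocate default on " + os + ": "
                + defaultPreallocate(os));
    }
}
```

As Roshan notes, any such auto value would need to be clearly documented per platform.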



Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-22 Thread Jakob Homan
+1.  This is an important performance fix for Windows-based clusters.

-Jakob





RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-22 Thread Honghai Chen
Fix the issue Sriram mentioned. Code review and jira/KIP updated.

Below is a detailed description of the scenarios:
1. On clean shutdown, the last log file is truncated to its real size, since the close() function of FileMessageSet calls trim().
2. After a crash, the restart goes through recover() and the last log file is truncated to its real size (the position is also moved to the end of the file).
3. When the service starts and opens an existing file:
a. It runs the LogSegment constructor that has NO preallocate parameter.
b. In FileMessageSet, end will be Int.MaxValue, and channel.position(math.min(channel.size().toInt, end)) puts the position at the end of the file.
c. If recovery is needed, the recover function truncates the file to the end of valid data and also moves the position there.

4. When the service is running and needs to create a new log segment and a new FileMessageSet:

a. If preallocate = true: end in FileMessageSet will be 0 and the file size will be initFileSize, so channel.position(math.min(channel.size().toInt, end)) puts the position at 0.

b. Else if preallocate = false (backward compatible): end in FileMessageSet will be Int.MaxValue and the file size will be 0, so channel.position(math.min(channel.size().toInt, end)) puts the position at 0.
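The position logic in the scenarios above can be sketched in a few lines of Java (the patch itself is Scala, and the 1 MiB stand-in for initFileSize is illustrative): the initial position is just min(channel.size(), end), so a new preallocated segment (end = 0) starts writing at 0, while an existing segment opened with end = Int.MaxValue lands at its current size.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

public class SegmentPositionDemo {
    // Mirrors channel.position(math.min(channel.size().toInt, end)) above.
    static long initialPosition(FileChannel channel, long end) throws IOException {
        return Math.min(channel.size(), end);
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("segment", ".log");
        try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw");
             FileChannel channel = raf.getChannel()) {
            raf.setLength(1024 * 1024);  // stand-in for initFileSize

            // Scenario 4a: new preallocated segment, end = 0 -> position 0.
            System.out.println(initialPosition(channel, 0));                 // 0

            // Scenario 3b: existing file opened on startup, end = Int.MaxValue
            // -> position = current file size.
            System.out.println(initialPosition(channel, Integer.MAX_VALUE)); // 1048576
        }
        Files.delete(path);
    }
}
```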

https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
https://issues.apache.org/jira/browse/KAFKA-1646 
https://reviews.apache.org/r/33204/diff/2/ 

Thanks, Honghai Chen
http://aka.ms/kafka 
http://aka.ms/manifold 





Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-21 Thread Sriram Subramanian
Could you describe how recovery works in this mode? Say, we had a 250 MB
preallocated segment and we wrote till 50MB and crashed. Till what point
do we recover? Also, on startup, how is the append end pointer set even on
a clean shutdown? How does the FileChannel end position get set to 50 MB
instead of 250 MB? The existing code might just work for it but explaining
that would be useful.




Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-21 Thread Jay Kreps
My understanding of the patch is that clean shutdown truncates the file
back to its true size (and reallocates it on startup). A hard crash is
handled by the normal recovery, which should truncate off the empty portion
of the file.
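A minimal Java sketch of that behavior (sizes and contents are illustrative, not from the patch): preallocate a file, write some data, then cut the file back to the number of valid bytes, which is where both trim() on clean shutdown and recover() after a crash end up.

```java
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class RecoveryDemo {
    public static void main(String[] args) throws Exception {
        Path path = Files.createTempFile("segment", ".log");
        try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw");
             FileChannel ch = raf.getChannel()) {
            raf.setLength(1024 * 1024);            // preallocated segment
            byte[] valid = "real messages".getBytes(StandardCharsets.UTF_8);
            ch.write(ByteBuffer.wrap(valid), 0);   // only this much is real data

            // Clean shutdown (trim) and crash recovery both end the same way:
            // the file is truncated back to the bytes holding valid data.
            ch.truncate(valid.length);
            System.out.println(ch.size());         // 13
        }
        Files.delete(path);
    }
}
```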





RE: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-21 Thread Honghai Chen
Hi Sriram,
One line of code was missing; I will update the review board and KIP soon.
LogSegment and FileMessageSet must use different constructors for an
existing file versus a new file; then the code
channel.position(math.min(channel.size().toInt, end)) will put the
position at the end of an existing file.

Thanks, Honghai Chen 





Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-21 Thread Jay Kreps
I'm also +1 on this. The change is quite small and may actually help perf
on Linux as well (we've never tried this).

I have a lot of concerns about testing the various failure conditions, but
since it will be off by default the risk is not too high.

-Jay





Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-21 Thread Neha Narkhede
+1. I've tried this on Linux and it helps reduce the spikes in append (and
hence producer) latency for high-throughput writes. I am not entirely sure
why, but my suspicion is that in the absence of preallocation, you see
spikes when writes need to happen faster than Linux can allocate the next
block to the file.

It will be great to see some performance test results too.
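The mechanism Neha describes can be sketched in Java (file name and size are illustrative; the broker uses RandomAccessFile.setLength-style growth for this): reserve the full segment size once, so later appends never wait on the filesystem extending the file.

```java
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class PreallocateSketch {
    public static void main(String[] args) throws Exception {
        Path path = Files.createTempFile("00000000000000000000", ".log");
        try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw")) {
            raf.setLength(1024 * 1024);  // reserve the whole segment up front

            // Appends now write into already-reserved space; the file length
            // no longer grows write by write as data arrives.
            raf.getChannel().write(
                ByteBuffer.wrap("batch".getBytes(StandardCharsets.UTF_8)), 0);
            System.out.println(raf.length());  // still 1048576
        }
        Files.delete(path);
    }
}
```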


-- 
Thanks,
Neha


[DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-20 Thread Honghai Chen
I wrote a KIP for this after some discussion on KAFKA-1646. 
https://issues.apache.org/jira/browse/KAFKA-1646
https://cwiki.apache.org/confluence/display/KAFKA/KIP-20+-+Enable+log+preallocate+to+improve+consume+performance+under+windows+and+some+old+Linux+file+system
The RB is here: https://reviews.apache.org/r/33204/diff/

Thanks, Honghai



Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system

2015-04-20 Thread Gwen Shapira
+1 (non-binding)

Sure, makes sense :)
Just make sure the doc for the config includes something like "If you
are using Kafka on Windows, you probably want to set this to true", so
people will know how to use it without looking for the JIRA.

