[ 
https://issues.apache.org/jira/browse/KAFKA-9906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xiang Zhang updated KAFKA-9906:
-------------------------------
    Description: 
I was reading code in LogSegment.scala and I found the code below:

 
{code:java}
def append(largestOffset: Long,
           largestTimestamp: Long,
           shallowOffsetOfMaxTimestamp: Long,
           records: MemoryRecords): Unit = {
  ...
  val appendedBytes = log.append(records)
  if (bytesSinceLastIndexEntry > indexIntervalBytes) {
    offsetIndex.append(largestOffset, physicalPosition)
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
    bytesSinceLastIndexEntry = 0
  }
  bytesSinceLastIndexEntry += records.sizeInBytes
}

{code}
when bytesSinceLastIndexEntry > indexIntervalBytes, we update the offsetIndex 
and maybe the timeIndex and set bytesSinceLastIndexEntry to zero, which makes 
sense to me because we just update the index. However, following that, 
bytesSinceLastIndexEntry is incremented by records.sizeInBytes, which I find 
confusing since the records are appended before the index are updated. Maybe it 
should work like this :
{code:java}
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
  offsetIndex.append(largestOffset, physicalPosition)
  timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
  bytesSinceLastIndexEntry = 0
} else {
  bytesSinceLastIndexEntry += records.sizeInBytes
}{code}
 Sorry if I misunderstood this.

  was:I was reading code in LogSegment.scala and I found the code in 


> Is bytesSinceLastIndexEntry updated correctly when LogSegment.append() is 
> called ?
> ----------------------------------------------------------------------------------
>
>                 Key: KAFKA-9906
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9906
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Xiang Zhang
>            Priority: Major
>
> I was reading code in LogSegment.scala and I found the code below:
>  
> {code:java}
> def append(largestOffset: Long,
>            largestTimestamp: Long,
>            shallowOffsetOfMaxTimestamp: Long,
>            records: MemoryRecords): Unit = {
>   ...
>   val appendedBytes = log.append(records)
>   if (bytesSinceLastIndexEntry > indexIntervalBytes) {
>     offsetIndex.append(largestOffset, physicalPosition)
>     timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
>     bytesSinceLastIndexEntry = 0
>   }
>   bytesSinceLastIndexEntry += records.sizeInBytes
> }
> {code}
> when bytesSinceLastIndexEntry > indexIntervalBytes, we update the offsetIndex 
> and maybe the timeIndex and set bytesSinceLastIndexEntry to zero, which makes 
> sense to me because we just update the index. However, following that, 
> bytesSinceLastIndexEntry is incremented by records.sizeInBytes, which I find 
> confusing since the records are appended before the index are updated. Maybe 
> it should work like this :
> {code:java}
> if (bytesSinceLastIndexEntry > indexIntervalBytes) {
>   offsetIndex.append(largestOffset, physicalPosition)
>   timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
>   bytesSinceLastIndexEntry = 0
> } else {
>   bytesSinceLastIndexEntry += records.sizeInBytes
> }{code}
>  Sorry if I misunderstood this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to