Jun,

Thanks for confirming this. We will use the earliest offset.
-Xiaoyu

On Thu, Aug 23, 2012 at 10:37 AM, Jun Rao <jun...@gmail.com> wrote:

> Xiaoyu,
>
> Yes, this is because we rely on the last modified time as a rough estimate
> of the offset time, which is not very accurate. In this case, can you fall
> back to the earliest offset?
>
> Thanks,
>
> Jun
>
> On Thu, Aug 23, 2012 at 9:49 AM, xiaoyu wang <xiaoyu.w...@gmail.com> wrote:
>
> > Hello,
> >
> > We recently ran into a problem getting offsets by time for a very
> > low-traffic topic. The traffic is so low that Kafka always works on a
> > single segment file. I checked the source code and found the following:
> > the segment file's last modified time is always current, so isFound is
> > still false when the loop exits, and no offsets are returned.
> >
> > =============================================================================
> > package kafka.log
> >
> > import java.util.concurrent.CopyOnWriteArrayList
> > import java.util.concurrent.atomic._
> > import java.text.NumberFormat
> > import java.io._
> > import java.nio.channels.FileChannel
> > import org.apache.log4j._
> > import kafka.message._
> > import kafka.utils._
> > import kafka.common._
> > import kafka.api.OffsetRequest
> > import java.util._
> > ....
> >
> >   def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
> >     val segsArray = segments.view
> >     var offsetTimeArray: Array[Tuple2[Long, Long]] = null
> >     if (segsArray.last.size > 0)
> >       offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length + 1)
> >     else
> >       offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length)
> >
> >     for (i <- 0 until segsArray.length)
> >       // (highlighted) each segment is timestamped with its file's last modified time
> >       offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
> >     if (segsArray.last.size > 0)
> >       offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)
> >
> >     var startIndex = -1
> >     request.time match {
> >       case OffsetRequest.LatestTime =>
> >         startIndex = offsetTimeArray.length - 1
> >       case OffsetRequest.EarliestTime =>
> >         startIndex = 0
> >       case _ =>
> >         var isFound = false
> >         if (logger.isDebugEnabled) {
> >           logger.debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
> >         }
> >         startIndex = offsetTimeArray.length - 1
> >         while (startIndex >= 0 && !isFound) {
> >           // (highlighted) never true if every timestamp is newer than request.time
> >           if (offsetTimeArray(startIndex)._2 <= request.time)
> >             isFound = true
> >           else
> >             startIndex -= 1
> >         }
> >     }
> >
> >     val retSize = request.maxNumOffsets.min(startIndex + 1)
> >     val ret = new Array[Long](retSize)
> >     for (j <- 0 until retSize) {
> >       ret(j) = offsetTimeArray(startIndex)._1
> >       startIndex -= 1
> >     }
> >     ret
> >   }
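To make the failure mode and the suggested workaround concrete, here is a minimal, self-contained model of the lookup quoted above, plus a fallback to the earliest offset along the lines Jun suggests. It is a sketch under stated assumptions, not Kafka code: `Segment`, `OffsetLookup`, `offsetsBefore` and `offsetsBeforeOrEarliest` are hypothetical names; a segment's `size` stands in for its high-water mark; and the current time is passed in as `now` instead of calling `SystemTime.milliseconds`. The sentinel values -1 (latest) and -2 (earliest) mirror `kafka.api.OffsetRequest`.

```scala
// Hypothetical stand-in for a log segment: base offset, bytes written, file mtime (ms).
// Assumption: `size` plays the role of messageSet.highWaterMark in the quoted code.
final case class Segment(start: Long, size: Long, lastModified: Long)

object OffsetLookup {
  // Sentinel request times, mirroring kafka.api.OffsetRequest.
  val LatestTime: Long = -1L
  val EarliestTime: Long = -2L

  /** Simplified model of the quoted getOffsetsBefore (hypothetical helper). */
  def offsetsBefore(segs: Vector[Segment], time: Long, maxNumOffsets: Int, now: Long): Vector[Long] = {
    require(segs.nonEmpty, "a log always has at least one segment")
    // One (offset, timestamp) pair per segment, stamped with the file's mtime.
    val perSegment = segs.map(s => (s.start, s.lastModified))
    // If the active segment holds data, append its end offset stamped with "now".
    val pairs =
      if (segs.last.size > 0) perSegment :+ ((segs.last.start + segs.last.size, now))
      else perSegment
    val startIndex = time match {
      case LatestTime   => pairs.length - 1
      case EarliestTime => 0
      // Same as the backwards while loop: the last entry whose timestamp <= t, or -1.
      case t            => pairs.lastIndexWhere(_._2 <= t)
    }
    val n = math.min(maxNumOffsets, startIndex + 1)
    // Walk backwards from startIndex, collecting at most n offsets.
    (0 until n).map(j => pairs(startIndex - j)._1).toVector
  }

  /** Client-side workaround (hypothetical): if nothing looks old enough, use the earliest offset. */
  def offsetsBeforeOrEarliest(segs: Vector[Segment], time: Long, maxNumOffsets: Int, now: Long): Vector[Long] = {
    val found = offsetsBefore(segs, time, maxNumOffsets, now)
    if (found.nonEmpty) found else offsetsBefore(segs, EarliestTime, 1, now)
  }
}
```

With a single active segment whose file was modified one second before `now`, a request for offsets before one hour ago finds no timestamp old enough, so `offsetsBefore` returns an empty vector, which is exactly the symptom described above. `offsetsBeforeOrEarliest` returns `Vector(0)` instead. In a real client the fallback would be a second offset request with the earliest-time sentinel rather than a local call.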