Jun,

Thanks for confirming this. We will use the earliest offset.
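
A minimal sketch of that fallback, under stated assumptions: `offsetsBefore` is a hypothetical, simplified stand-in for the time-based lookup in the quoted getOffsetsBefore (it is not a Kafka API and omits the LatestTime case), and the sentinel value -2 for the earliest time is assumed to mirror OffsetRequest.EarliestTime.

```scala
object OffsetFallback {
  // Assumed sentinel, mirroring OffsetRequest.EarliestTime in the quoted code.
  val EarliestTime: Long = -2L

  // Hypothetical stand-in for the lookup: each entry is (offset, timestampMs),
  // ordered oldest to newest. Returns up to maxNumOffsets offsets, newest first.
  def offsetsBefore(entries: Array[(Long, Long)], time: Long, maxNumOffsets: Int): Array[Long] = {
    val startIndex =
      if (time == EarliestTime) 0
      else entries.lastIndexWhere(_._2 <= time) // -1 when no entry is old enough
    val retSize = Seq(maxNumOffsets, startIndex + 1, entries.length).min.max(0)
    Array.tabulate(retSize)(j => entries(startIndex - j)._1)
  }

  // Fallback: if the time-based lookup finds nothing, retry with EarliestTime.
  def offsetsBeforeOrEarliest(entries: Array[(Long, Long)], time: Long, maxNumOffsets: Int): Array[Long] = {
    val byTime = offsetsBefore(entries, time, maxNumOffsets)
    if (byTime.nonEmpty) byTime else offsetsBefore(entries, EarliestTime, maxNumOffsets)
  }
}
```

With a single, recently modified segment, every timestamp is newer than the requested time, so the plain lookup returns an empty array and the fallback returns the segment's start offset instead.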


-Xiaoyu

On Thu, Aug 23, 2012 at 10:37 AM, Jun Rao <jun...@gmail.com> wrote:

> Xiaoyu,
>
> Yes, this is because we rely on the last modified time as a rough estimate
> of the offset time, which is not very accurate. In this case, can you fall
> back to the earliest offset?
>
> Thanks,
>
> Jun
>
> On Thu, Aug 23, 2012 at 9:49 AM, xiaoyu wang <xiaoyu.w...@gmail.com> wrote:
>
> > Hello,
> >
> > We recently ran into a problem getting offsets by time for a very
> > low-traffic topic. Basically, the traffic is so low that Kafka always
> > works on one segment file. I checked the source code and found the
> > following. The last modified time of that segment is always current, so
> > isFound is still false when the loop exits and no offsets are returned.
> >
> >
> >
> >
> > =============================================================================
> > package kafka.log
> >
> > import java.util.concurrent.CopyOnWriteArrayList
> > import java.util.concurrent.atomic._
> > import java.text.NumberFormat
> > import java.io._
> > import java.nio.channels.FileChannel
> > import org.apache.log4j._
> > import kafka.message._
> > import kafka.utils._
> > import kafka.common._
> > import kafka.api.OffsetRequest
> > import java.util._
> > ....
> >
> >   def getOffsetsBefore(request: OffsetRequest): Array[Long] = {
> >     val segsArray = segments.view
> >     var offsetTimeArray: Array[Tuple2[Long, Long]] = null
> >     if (segsArray.last.size > 0)
> >       offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length + 1)
> >     else
> >       offsetTimeArray = new Array[Tuple2[Long, Long]](segsArray.length)
> >
> >     for (i <- 0 until segsArray.length)
> >       // each segment's timestamp is its file's last-modified time
> >       offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
> >     if (segsArray.last.size > 0)
> >       offsetTimeArray(segsArray.length) = (segsArray.last.start +
> >         segsArray.last.messageSet.highWaterMark, SystemTime.milliseconds)
> >
> >     var startIndex = -1
> >     request.time match {
> >       case OffsetRequest.LatestTime =>
> >         startIndex = offsetTimeArray.length - 1
> >       case OffsetRequest.EarliestTime =>
> >         startIndex = 0
> >       case _ =>
> >           var isFound = false
> >           if(logger.isDebugEnabled) {
> >             // map + mkString builds the text; foreach would yield Unit
> >             logger.debug("Offset time array = " +
> >               offsetTimeArray.map(o => "%d, %d".format(o._1, o._2)).mkString("; "))
> >           }
> >           startIndex = offsetTimeArray.length - 1
> >           while (startIndex >= 0 && !isFound) {
> >             if (offsetTimeArray(startIndex)._2 <= request.time)
> >               isFound = true
> >             else
> >               startIndex -= 1
> >           }
> >     }
> >
> >     val retSize = request.maxNumOffsets.min(startIndex + 1)
> >     val ret = new Array[Long](retSize)
> >     for (j <- 0 until retSize) {
> >       ret(j) = offsetTimeArray(startIndex)._1
> >       startIndex -= 1
> >     }
> >     ret
> >   }
> >
>
