[ 
https://issues.apache.org/jira/browse/KAFKA-9156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16969820#comment-16969820
 ] 

shilin Lu commented on KAFKA-9156:
----------------------------------

[~guozhang] I don't think using only a val AtomicReference would work well here.

 
{code:java}
class LazyTimeIndex(@volatile private var _file: File,
                    baseOffset: Long,
                    maxIndexSize: Int = -1,
                    writable: Boolean = true) {
  // Created lazily on the first call to get
  @volatile private var timeIndex: Option[TimeIndex] = None

  // Not synchronized, so this can interleave with the lazy creation in get
  def file: File = {
    if (timeIndex.isDefined)
      timeIndex.get.file
    else
      _file
  }

  // Not synchronized, so a rename can interleave with the lazy creation in get
  def file_=(f: File): Unit = {
    if (timeIndex.isDefined)
      timeIndex.get.file = f
    else
      _file = f
  }

  // Double-checked locking: only one thread creates the TimeIndex
  def get: TimeIndex = {
    if (timeIndex.isEmpty) {
      this synchronized {
        if (timeIndex.isEmpty) {
          timeIndex = Some(new TimeIndex(_file, baseOffset, maxIndexSize, writable))
        }
      }
    }
    timeIndex.get
  }
}{code}
The file is renamed through the setter def file_=(f: File), so we should keep
it thread-safe too. I think the code below can work. I use synchronized because
I think this function is lightweight. If you think it has a problem, I will
modify it. Thanks.
{code:java}
class LazyTimeIndex(@volatile private var _file: File,
                    baseOffset: Long,
                    maxIndexSize: Int = -1,
                    writable: Boolean = true) {
  // Created lazily on the first call to get
  @volatile private var timeIndex: Option[TimeIndex] = None

  // Takes the same lock as get, so it never observes a half-finished creation
  def file: File = {
    this synchronized {
      if (timeIndex.isDefined)
        timeIndex.get.file
      else
        _file
    }
  }

  // Takes the same lock as get, so a rename cannot interleave with the creation
  def file_=(f: File): Unit = {
    this synchronized {
      if (timeIndex.isDefined)
        timeIndex.get.file = f
      else
        _file = f
    }
  }

  // Double-checked locking: only one thread creates the TimeIndex
  def get: TimeIndex = {
    if (timeIndex.isEmpty) {
      this synchronized {
        if (timeIndex.isEmpty) {
          timeIndex = Some(new TimeIndex(_file, baseOffset, maxIndexSize, writable))
        }
      }
    }
    timeIndex.get
  }
}{code}
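To check the idea outside of Kafka, here is a minimal, self-contained sketch of
the same locking scheme. It is only an illustration under assumptions: FakeIndex
is a hypothetical stand-in for TimeIndex that just counts how many instances are
constructed, and LazyFakeIndex, LazyFakeIndexDemo, runDemo and the file names
are made up for this example.
{code:scala}
import java.io.File
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for TimeIndex: counts constructed instances.
object FakeIndex {
  val created = new AtomicInteger(0)
}

class FakeIndex(@volatile var file: File) {
  FakeIndex.created.incrementAndGet()
}

// Same locking scheme as the synchronized LazyTimeIndex proposal, reduced to the essentials.
class LazyFakeIndex(@volatile private var _file: File) {
  @volatile private var index: Option[FakeIndex] = None

  def file: File = this.synchronized {
    if (index.isDefined) index.get.file else _file
  }

  def file_=(f: File): Unit = this.synchronized {
    if (index.isDefined) index.get.file = f else _file = f
  }

  // Double-checked locking: the second check runs while holding the lock.
  def get: FakeIndex = {
    if (index.isEmpty) {
      this.synchronized {
        if (index.isEmpty)
          index = Some(new FakeIndex(_file))
      }
    }
    index.get
  }
}

object LazyFakeIndexDemo {
  // Starts `threads` concurrent callers of get; the first one also renames the file.
  // Returns (number of FakeIndex instances constructed, final file name).
  def runDemo(threads: Int): (Int, String) = {
    FakeIndex.created.set(0)
    val lazyIndex = new LazyFakeIndex(new File("00000000000000000000.timeindex"))
    val pool = Executors.newFixedThreadPool(threads)
    val start = new CountDownLatch(1)
    val done = new CountDownLatch(threads)
    for (i <- 0 until threads) {
      pool.execute(new Runnable {
        override def run(): Unit = {
          try {
            start.await() // release all threads at roughly the same time
            if (i == 0)
              lazyIndex.file = new File("00000000000000000000.timeindex.deleted")
            lazyIndex.get
          } finally {
            done.countDown()
          }
        }
      })
    }
    start.countDown()
    done.await()
    pool.shutdown()
    (FakeIndex.created.get, lazyIndex.file.getName)
  }

  def main(args: Array[String]): Unit = println(runDemo(16))
}
{code}
In this sketch the count should be 1 and the final file name should end in
.deleted, whether the rename happens before or after the lazy creation, because
the setter and the creation take the same lock.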
 

> LazyTimeIndex & LazyOffsetIndex may cause niobufferoverflow in concurrent 
> state
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-9156
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9156
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 2.3.0
>            Reporter: shilin Lu
>            Priority: Critical
>         Attachments: image-2019-11-07-17-42-13-852.png, 
> image-2019-11-07-17-44-05-357.png, image-2019-11-07-17-46-53-650.png
>
>
> !image-2019-11-07-17-42-13-852.png!
> The get function of this time index is not thread-safe, so it may create more
> than one TimeIndex.
> !image-2019-11-07-17-44-05-357.png!
> When more than one TimeIndex is created, the MappedByteBuffer position may be
> moved to the end. Writing an index entry to this mmap file will then cause
> java.nio.BufferOverflowException.
>  
> !image-2019-11-07-17-46-53-650.png!
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
