[ https://issues.apache.org/jira/browse/KAFKA-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ray Chiang updated KAFKA-5297:
------------------------------
    Component/s: log

> Broker can take a long time to shut down if there are many active log segments
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-5297
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5297
>             Project: Kafka
>          Issue Type: Bug
>          Components: log
>            Reporter: Kyle Ambroff-Kao
>            Priority: Minor
>              Labels: windows
>         Attachments: LogSegmentBenchmark.java, flame graph of broker during 
> shut down.png, shutdown-flame-graph.png
>
>
> After the changes for KIP-33 were merged, we started noticing that our 
> cluster restart times were quite a bit longer. In some cases it was taking 
> four times as long as expected to do a rolling restart of every broker in the 
> cluster. This meant that doing a deploy to one of our Kafka clusters went 
> from taking about 3 hours to more than 12 hours!
> We looked into this, and we have some data from a couple of runs with a 
> sampling profiler. It turns out that it isn't unusual for a broker to sit in 
> kafka.log.Log#close for up to 30 minutes if it has been running for several 
> weeks. There are so many active log segments that it simply takes a long time 
> to truncate all of the indexes.
> I've attached a flame graph that was generated from 10 minutes of stack 
> samples collected during shutdown of a broker that took about 30 minutes 
> total to shut down cleanly.
> * About 60% of the time was spent in kafka.log.AbstractIndex#resize, where 
> every index and timeindex file is truncated down to the size of the entries 
> it actually contains.
> * Another big chunk of time is spent reading the last entry from the index, 
> which is used to make any final updates to the timeindex file. This is 
> something that can be cached. For a broker that has been running for a long 
> time, the bulk of these indexes are not likely to be in the page cache 
> anymore. We already cache the largestTimestamp and offsetOfLargestTimestamp 
> in LogSegment, so we could add a cache for this as well (see the sketch 
> below).
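> Here's a minimal sketch of what such a cache could look like (class and 
> method names are made up for illustration; the real change would live in 
> kafka.log.OffsetIndex / LogSegment):
> {noformat}
> // Hypothetical sketch: remember the last entry written to an index so that
> // close() never has to fault the mmap'ed index back in just to read it.
> // Names are illustrative, not the actual kafka.log API. A single appender
> // thread is assumed, as in the log append path.
> public class LastEntryCache {
>     private volatile long lastOffset = -1L;
>     private volatile int lastPosition = -1;
>
>     // Called on every index append, so the cache is always current.
>     public void onAppend(long offset, int position) {
>         lastOffset = offset;
>         lastPosition = position;
>     }
>
>     // close() can skip reading the index file entirely when this is true.
>     public boolean isPopulated() {
>         return lastOffset >= 0;
>     }
>
>     public long lastOffset() { return lastOffset; }
>     public int lastPosition() { return lastPosition; }
> }
> {noformat}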
> Looking at these changes and considering KIP-33, it isn't surprising that the 
> broker shutdown time has increased so dramatically. The extra index plus the 
> extra reads have increased the amount of work performed by 
> kafka.log.Log#close by about 4x (in terms of system calls and potential page 
> faults). Breaking down what this function does:
> # Read the max timestamp from the timeindex. Could lead to a disk seek.
> # Read the max offset from the index. Could lead to a disk seek.
> # Append the timestamp and offset of the most recently written message to the 
> timeindex if it hasn't already been written there.
> # Truncate the index file
> ## Get the position in the index of the last entry written
> ## If on Windows then unmap and close the index
> ## reopen
> ## truncate to the number of entries * entry size. (ftruncate() system call)
> ## mmap()
> ## Set the position back to where it was before the resize. Leads to an 
> lseek() system call.
> ## Close the newly reopened and mapped index
> # Same thing as #4 but for the timeindex.
> ## Get the position in the timeindex of the last entry written
> ## If on Windows then unmap and close the timeindex
> ## reopen
> ## truncate to the number of entries * entry size. (ftruncate() system call)
> ## mmap()
> ## Set the position back to where it was before the resize. Leads to an 
> lseek() system call.
> ## Close the newly reopened and mapped timeindex
> # Finalize the log segment
> ## Invoke java.nio.channels.FileChannel#force, which leads to an fsync() for 
> that log segment.
> ## Truncate the log segment if it doesn't have enough messages written to 
> fill it completely. Potentially leads to an ftruncate() system call.
> ## Set the position to the end of the segment after truncation. Leads to an 
> lseek() system call.
> ## Close and unmap the channel.
> Looking into the current implementation of kafka.log.AbstractIndex#resize, 
> it appears to do quite a bit of extra work to avoid keeping an instance of 
> RandomAccessFile around. It has to reopen the file, truncate, mmap(), and 
> potentially perform an additional disk seek, all before immediately closing 
> the file.
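> In code, that per-resize dance works out to roughly the following (a 
> simplified sketch of the steps described above, not the actual Scala in 
> AbstractIndex):
> {noformat}
> import java.io.File;
> import java.io.RandomAccessFile;
> import java.nio.MappedByteBuffer;
> import java.nio.channels.FileChannel;
>
> // Simplified sketch of the per-index work described in steps 4 and 5 above.
> // The real logic lives in kafka.log.AbstractIndex#resize (Scala).
> public class ResizeSketch {
>     static MappedByteBuffer resizeOnce(File file, int newSizeBytes, int position) throws Exception {
>         try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {  // reopen the index file
>             raf.setLength(newSizeBytes);                                 // entries * entrySize -> ftruncate()
>             FileChannel ch = raf.getChannel();
>             MappedByteBuffer mmap =
>                 ch.map(FileChannel.MapMode.READ_WRITE, 0, newSizeBytes); // mmap() the shrunken file
>             mmap.position(position);                                     // restore the write position
>             return mmap;                                                 // the file handle is closed again on exit
>         }
>     }
> }
> {noformat}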
> You wouldn't think this would amount to much, but I put together a benchmark 
> using JMH to measure the difference between the current code and a new 
> implementation that doesn't have to recreate the page mapping during 
> resize(), and the difference is pretty dramatic.
> {noformat}
> Result "currentImplementation":
>   2063.386 ±(99.9%) 81.758 ops/s [Average]
>   (min, avg, max) = (1685.574, 2063.386, 2338.945), stdev = 182.863
>   CI (99.9%): [1981.628, 2145.144] (assumes normal distribution)
> Result "optimizedImplementation":
>   3497.354 ±(99.9%) 31.575 ops/s [Average]
>   (min, avg, max) = (3261.232, 3497.354, 3605.527), stdev = 70.623
>   CI (99.9%): [3465.778, 3528.929] (assumes normal distribution)
> # Run complete. Total time: 00:03:37
> Benchmark                                     Mode  Cnt     Score    Error  Units
> LogSegmentBenchmark.currentImplementation    thrpt   60  2063.386 ± 81.758  ops/s
> LogSegmentBenchmark.optimizedImplementation  thrpt   60  3497.354 ± 31.575  ops/s
> {noformat}
> I ran this benchmark on a Linux workstation. It just measures the throughput 
> of Log#close after 20 segments have been created. Not having to reopen the 
> file amounts to a 70% increase in throughput.
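> The attached LogSegmentBenchmark.java drives the full LogSegment close path; 
> purely to illustrate the shape of the harness, a stripped-down JMH comparison 
> might look like the sketch below (names are made up, and it only times the 
> reopen + ftruncate step, not the remap or the rest of Log#close):
> {noformat}
> import java.io.File;
> import java.io.IOException;
> import java.io.RandomAccessFile;
> import org.openjdk.jmh.annotations.Benchmark;
> import org.openjdk.jmh.annotations.BenchmarkMode;
> import org.openjdk.jmh.annotations.Mode;
> import org.openjdk.jmh.annotations.Scope;
> import org.openjdk.jmh.annotations.Setup;
> import org.openjdk.jmh.annotations.State;
> import org.openjdk.jmh.annotations.TearDown;
>
> @State(Scope.Thread)
> @BenchmarkMode(Mode.Throughput)
> public class TruncateBench {
>     private File file;
>     private RandomAccessFile keptOpen;
>
>     @Setup
>     public void setUp() throws IOException {
>         file = File.createTempFile("index", ".bench");
>         keptOpen = new RandomAccessFile(file, "rw");
>     }
>
>     @TearDown
>     public void tearDown() throws IOException {
>         keptOpen.close();
>         file.delete();
>     }
>
>     @Benchmark
>     public void reopenPerResize() throws IOException {
>         // What resize() effectively does today: open, ftruncate, close.
>         try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) {
>             raf.setLength(8 * 1024);
>         }
>     }
>
>     @Benchmark
>     public void reuseOpenHandle() throws IOException {
>         // With a retained RandomAccessFile, only the ftruncate remains.
>         keptOpen.setLength(8 * 1024);
>     }
> }
> {noformat}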
> I think there are two totally valid approaches to making this better:
> * Preemptively truncate index files when log rotation happens. Once a log is 
> rotated, jobs could be added to an ExecutorService which truncates indexes so 
> that they don't all have to be truncated on shutdown. The new shutdown code 
> would enqueue all remaining active indexes and then drain the queue (a rough 
> sketch of this approach follows the list).
> * Alternatively we could just add a RandomAccessFile instance variable to 
> AbstractIndex so that it doesn't have to recreate the page mapping on 
> resize(). This means an extra file handle for each segment but that doesn't 
> seem like a big deal to me.
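> To make the first option concrete, here is a rough sketch of the background 
> truncation idea (class and method names are hypothetical, not existing Kafka 
> code):
> {noformat}
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
> import java.util.concurrent.TimeUnit;
>
> // Hypothetical sketch: trim index files in the background as segments roll,
> // so that shutdown only has to drain whatever work is still queued plus the
> // indexes of the currently active segments.
> public class IndexTrimmer {
>     private final ExecutorService pool = Executors.newSingleThreadExecutor();
>
>     // Called when a segment rolls; the Runnable truncates that segment's
>     // index and timeindex to their final sizes.
>     public void scheduleTrim(Runnable trimIndexAndTimeIndex) {
>         pool.submit(trimIndexAndTimeIndex);
>     }
>
>     // Called from Log#close: finish the queued trims instead of doing all of
>     // them inline on the shutdown path.
>     public void drainOnShutdown(long timeoutSeconds) throws InterruptedException {
>         pool.shutdown();
>         pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
>     }
> }
> {noformat}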
> No matter what, we should add a cache for kafka.log.OffsetIndex#lastEntry.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
