[ https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649254#comment-16649254 ]

Yuto Kawamura commented on KAFKA-7504:
--------------------------------------

As a first step I added a POC patch for this:
[https://github.com/kawamuray/kafka/commit/8fc8f2ab047bc6992b50428317ca0762c67c68e9]

Please check it; I would like to hear what you think about it.

> Broker performance degradation caused by call of sendfile reading disk in 
> network thread
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-7504
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7504
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 0.10.2.1
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>            Priority: Major
>
> h2. Environment
> OS: CentOS6
> Kernel version: 2.6.32-XX
> Kafka version: 0.10.2.1, 0.11.1.2 (but also reproduces with the latest build 
> from trunk (2.2.0-SNAPSHOT))
> h2. Phenomenon
> Response time of Produce requests (99th ~ 99.9th %ile) degrades to 50x ~ 100x 
> the usual level.
> Normally the 99th %ile is lower than 20ms, but when this issue occurs it reaches 
> 50ms to 200ms.
> At the same time we could see two more things in the metrics:
> 1. Disk reads coinciding on the volume assigned to log.dirs.
> 2. A rise in network thread utilization (by 
> `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`)
> As we didn't see an increase in requests in the metrics, we suspected blocking in 
> the event loop run by the network thread as the cause of the rising network 
> thread utilization.
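> For reference, here is a minimal sketch of reading that gauge over JMX; the host, 
> port, and class name are placeholders (not part of the original setup), and it 
> assumes the broker was started with remote JMX enabled.
> {code:java}
> import javax.management.MBeanServerConnection;
> import javax.management.ObjectName;
> import javax.management.remote.JMXConnector;
> import javax.management.remote.JMXConnectorFactory;
> import javax.management.remote.JMXServiceURL;
> 
> public class NetworkIdleCheck {
>     public static void main(String[] args) throws Exception {
>         // Placeholder host/port; requires the broker to expose remote JMX.
>         JMXServiceURL url = new JMXServiceURL(
>                 "service:jmx:rmi:///jndi/rmi://broker-host:9999/jmxrmi");
>         JMXConnector connector = JMXConnectorFactory.connect(url);
>         try {
>             MBeanServerConnection conn = connector.getMBeanServerConnection();
>             ObjectName gauge = new ObjectName(
>                     "kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent");
>             // Kafka's JMX reporter exposes the gauge's current value as the "Value" attribute.
>             Object value = conn.getAttribute(gauge, "Value");
>             System.out.println("NetworkProcessorAvgIdlePercent = " + value);
>         } finally {
>             connector.close();
>         }
>     }
> }
> {code}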
> Reading through the Kafka broker source code, we understood that the only disk IO 
> performed in the network thread is reading log data through calling sendfile(2) 
> (via FileChannel#transferTo).
> To confirm that the calls of sendfile(2) are blocking the network thread for some 
> moments, I ran the following SystemTap script to inspect the duration of sendfile 
> syscalls.
> {code:java}
> # SystemTap script to measure syscall duration.
> # Usage: stap -v syscall-duration.stp <syscall name>   ($1 is the syscall name)
> global s
> global records
> probe syscall.$1 {
>     # Record the entry timestamp (microseconds) per thread.
>     s[tid()] = gettimeofday_us()
> }
> probe syscall.$1.return {
>     # Skip returns for which we never saw the entry (script started mid-syscall).
>     if (!(tid() in s)) next
>     elapsed = gettimeofday_us() - s[tid()]
>     delete s[tid()]
>     # Accumulate elapsed times into a log2 histogram.
>     records <<< elapsed
> }
> probe end {
>     print(@hist_log(records))
> }{code}
> {code:java}
> $ stap -v syscall-duration.stp sendfile
> # value (us)
> value |---------------------------------------- count
>     0 |                                             0
>     1 |                                            71
>     2 |@@@                                       6171
>    16 |@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@  29472
>    32 |@@@                                       3418
>  2048 |                                             0
> ...
>  8192 |                                             3{code}
> As you can see, there were some cases taking more than a few milliseconds, which 
> implies that sendfile blocks the network thread for that long, applying the same 
> latency to all other request/response processing.
> h2. Hypothesis
> From the above observations, I made the following hypothesis.
> Let's say network-thread-1 is multiplexing 3 connections:
> - producer-A
> - follower-B (broker replica fetch)
> - consumer-C
> The broker receives one request from each of those clients: [Produce, 
> FetchFollower, FetchConsumer].
> They are processed by request handler threads, and now the response queue of the 
> network thread contains 3 responses in the following order: 
> [FetchConsumer, Produce, FetchFollower].
> network-thread-1 takes the 3 responses and processes them sequentially 
> ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]).
> Ideally, processing these 3 responses completes in microseconds, as it just copies 
> ready responses into each client socket's buffer in a non-blocking 
> manner.
> However, Kafka uses sendfile(2) for transferring log data to client sockets. 
> The target data might be in the page cache, but old data which was written a while 
> ago and has never been read since then is likely not.
> If the target data isn't in the page cache, the kernel first needs to load the 
> target pages into the cache. This takes from a few milliseconds up to tens of 
> milliseconds depending on the disk hardware and the current load applied to it.
> The Linux kernel doesn't consider the moment of loading data from disk into the 
> page cache as "blocked", hence it waits for the target data to finish loading 
> rather than returning EAGAIN.
> Thus, if this occurs when processing the response to the first FetchConsumer 
> request, the latency from reading disk applies to all following responses as well; 
> not just those which have already been picked up by the network thread, but also 
> those still in the response queue of that network thread.
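> To make the head-of-line blocking concrete, below is a heavily simplified, 
> hypothetical sketch of a network thread draining its response queue; it is not 
> the actual SocketServer code, and all names are illustrative. A single 
> transferTo() call that has to fault data in from disk delays every response 
> queued behind it.
> {code:java}
> import java.io.IOException;
> import java.nio.channels.FileChannel;
> import java.nio.channels.SocketChannel;
> import java.util.Queue;
> 
> /** Hypothetical, heavily simplified model of a network thread's send loop. */
> final class SimplifiedNetworkThread {
> 
>     /** One queued response: a slice of a log segment destined for one client socket. */
>     static final class QueuedResponse {
>         final FileChannel segment;
>         final long position;
>         final long count;
>         final SocketChannel client;
> 
>         QueuedResponse(FileChannel segment, long position, long count, SocketChannel client) {
>             this.segment = segment;
>             this.position = position;
>             this.count = count;
>             this.client = client;
>         }
>     }
> 
>     void drainResponses(Queue<QueuedResponse> responseQueue) throws IOException {
>         QueuedResponse r;
>         while ((r = responseQueue.poll()) != null) {
>             // transferTo() maps to sendfile(2) on Linux. The client socket is
>             // non-blocking, but if the segment's pages are not in the page cache,
>             // sendfile(2) itself blocks until the kernel has read them from disk,
>             // and every response still sitting in responseQueue waits behind this call.
>             r.segment.transferTo(r.position, r.count, r.client);
>         }
>     }
> }
> {code}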
> h2. Experiment
> To reproduce this issue, I ran an experiment on our test cluster.
> The Kafka cluster consists of 3 broker machines with the following spec:
> CPU: Intel(R) Xeon(R) 2.20GHz x 20 cores (HT) * 2
> Memory: 256GiB
> Network: 10Gbps
> Disk: HDD x 12 RAID 1+0
> Essential broker configs are below:
> {code:java}
> num.network.threads=10
> num.io.threads=30
> num.replica.fetchers=3{code}
> I created two topics, both with replicas=3, retention.ms=2days and 
> min.insync.replicas=2:
> - {{large-topic}}: partitions=60
> - {{regular-topic}}: partitions=30
>  
> I ran a producer which produces 1kb records into {{large-topic}} and let it 
> accumulate around 3GiB of data per partition, 60GiB of data in total on each 
> broker, then stopped the producer.
> To simulate the situation where old data has been dropped from the page cache, I 
> ran {{echo 1 > /proc/sys/vm/drop_caches}} on each broker machine.
> Then I started another producer which produces 512-byte records into the topic 
> {{regular-topic}}.
> Throughput is 56k messages/sec. The 99.9th %ile response time for Produce is very 
> good:
> !https://wiki.linecorp.com/download/attachments/1273840268/image2018-10-14_2-3-1.png?version=1&modificationDate=1539450181748&api=v2|height=208!
> !https://wiki.linecorp.com/download/attachments/1273840268/image2018-10-14_2-3-24.png?version=1&modificationDate=1539450205125&api=v2|height=209!
>  
> Then I started a consumer which reads from the head of {{large-topic}}, which is 
> no longer in the page cache.
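> For reference, a minimal consumer sketch that forces such a catch-up read from 
> the oldest offsets; the bootstrap server and group id are placeholders, and the 
> client API shown is the current Java consumer rather than the exact tool used in 
> this experiment.
> {code:java}
> import java.time.Duration;
> import java.util.Collections;
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> 
> public class CatchUpReadConsumer {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "broker-1:9092");   // placeholder
>         props.put("group.id", "catch-up-reader");          // fresh group, placeholder name
>         props.put("auto.offset.reset", "earliest");        // start from the head of the log
>         props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
>         props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> 
>         try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
>             consumer.subscribe(Collections.singletonList("large-topic"));
>             while (true) {
>                 // Fetching cold, old segments makes the broker's network threads
>                 // read from disk via sendfile(2).
>                 ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
>                 System.out.println("fetched " + records.count() + " records");
>             }
>         }
>     }
> }
> {code}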
> Response time of Produce degrades and throughput decreases:
> !https://wiki.linecorp.com/download/attachments/1273840268/image2018-10-14_2-6-13.png?version=1&modificationDate=1539450373678&api=v2|height=250!
>    
> !https://wiki.linecorp.com/download/attachments/1273840268/image2018-10-14_2-8-33.png?version=1&modificationDate=1539450513480&api=v2|height=216!
> Disk reads are occurring and the network threads got busier:
> !https://wiki.linecorp.com/download/attachments/1273840268/image2018-10-14_2-9-47.png?version=1&modificationDate=1539450588087&api=v2|height=206!
>    
> !https://wiki.linecorp.com/download/attachments/1273840268/image2018-10-14_2-9-58.png?version=1&modificationDate=1539450598971&api=v2|height=250!
> h2. Solution
> To fix this issue we considered several ideas, and finally decided to take the 
> following approach.
> The essential problem is blocking in the middle of the event loop. If the loop does 
> something which is not pure computation (like blocking IO), the resulting latency 
> directly applies to other, unrelated requests, and leaves CPU cores idle that could 
> be doing work for other requests while the loop awaits completion of the disk 
> IO.
> There are two possible ways to fix this:
> A. Make the call of sendfile(2) not wait, even when pages have to be loaded from 
> disk into memory.
> B. Make sure that the data is always available in memory by the time the response 
> object is passed to the network thread.
> For A (making sendfile(2) not wait for pages to be loaded from disk), we need 
> support from the kernel.
> In fact, FreeBSD recently added such a non-blocking style sendfile system call in 
> cooperation with the community: 
> [https://www.nginx.com/blog/nginx-and-netflix-contribute-new-sendfile2-to-freebsd/]
>  
> It might be useful for cases like this, but this time it wasn't an option 
> because:
> 1. Linux doesn't have such support at the syscall level.
> 2. Even with kernel support, we can't use it until the JDK supports it; otherwise 
> we would have to introduce JNI code.
> So we pursued approach B and tried several ways.
> The basic idea is to read the target data once in the request handler thread so 
> that the page cache for the target data gets populated and stays warm until it 
> gets transferred to the client socket through sendfile(2) called from the network 
> thread.
> The easiest way is to do read(2) on the target data. However, this is risky from 
> the overhead point of view, as it would introduce the overhead of copying data from 
> the kernel to userland (which is exactly what Kafka avoids by leveraging 
> sendfile(2)). Also, the buffer allocated for read()ing the data would cause heap 
> pressure unless implemented in an optimal way (perhaps using off-heap buffers).
> While we tried several other approaches (like calling the readahead(2) syscall and 
> checking completion of the load by calling mincore(2) in a loop through JNI), we 
> finally found that calling sendfile for the target data with the destination set 
> to /dev/null does exactly what we want.
> When it is called with the destination fd pointing to /dev/null, the Linux kernel 
> loads the data from disk into the page cache, but skips (not the whole part, but 
> most of) copying the loaded data to the destination device.
> To implement this, we just need plain Java code, as we can expect 
> FileChannel#transferTo to call sendfile internally.
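> A minimal sketch of this warmup in plain Java could look like the following; the 
> class and method names are illustrative, not the identifiers used in the actual 
> patch, which wires this into the fetch path instead.
> {code:java}
> import java.io.IOException;
> import java.nio.channels.FileChannel;
> import java.nio.file.Paths;
> import java.nio.file.StandardOpenOption;
> 
> /** Warms up the page cache for a log segment slice by sendfile()-ing it to /dev/null. */
> public final class PageCacheWarmup {
> 
>     public static void warmUp(FileChannel logSegment, long position, long count)
>             throws IOException {
>         // transferTo() to a /dev/null FileChannel uses sendfile(2) on Linux, which
>         // forces the kernel to load the source pages into the page cache while the
>         // copy to /dev/null is effectively discarded.
>         try (FileChannel devNull = FileChannel.open(
>                 Paths.get("/dev/null"), StandardOpenOption.WRITE)) {
>             long transferred = 0;
>             while (transferred < count) {
>                 long n = logSegment.transferTo(position + transferred,
>                         count - transferred, devNull);
>                 if (n <= 0) {
>                     break; // nothing more to transfer (e.g. reached end of file)
>                 }
>                 transferred += n;
>             }
>         }
>     }
> }
> {code}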
> We patched our Kafka broker to do this warmup, and it completely fixed the 
> problem. The patched version is based on 0.10.2.1, but I'm now testing the 
> same patch on the latest build from 
> trunk (905f8135078127e08633400277c5829b10971d42) and it seems to work 
> effectively so far.
> To avoid the additional overhead of calling one more syscall (sendfile to 
> /dev/null) for every Fetch request, I added a small optimization: skip the last 
> log segment of a topic partition, assuming it contains only newly written data 
> which is still hot and present in the page cache.
> With this optimization, since most Fetch requests in our case read from the tail 
> of the topic partition, only a small fraction of Fetch requests (less than 1%) 
> are subject to this warmup operation. Hence we haven't seen any overhead in CPU 
> utilization since we deployed this patch to our production system.
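> A sketch of that guard, with hypothetical names rather than the actual patch's 
> identifiers: only slices that belong to a segment older than the active (last) 
> segment are warmed up.
> {code:java}
> /** Hypothetical guard: skip warmup for reads served from the active (last) segment. */
> public final class WarmupPolicy {
> 
>     /**
>      * Returns true if the fetched slice should be warmed up before the response
>      * is handed off to the network thread.
>      */
>     public static boolean shouldWarmUp(long sliceSegmentBaseOffset,
>                                        long activeSegmentBaseOffset) {
>         // Only warm up reads from older, sealed segments; the active segment holds
>         // freshly written data that is assumed to still be in the page cache.
>         return sliceSegmentBaseOffset < activeSegmentBaseOffset;
>     }
> }
> {code}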
>  
> To test its effect in the same experiment scenario, I deployed our patch to the 
> experiment cluster and tested again.
> When the catch-up read consumer started, the network threads got busier again, but 
> utilization was a bit lower. The per-client request-time metric showed that 
> this time the producer (normal client) was not affected:
> !https://wiki.linecorp.com/download/attachments/1273840268/image2018-10-14_2-24-48.png?version=1&modificationDate=1539451488884&api=v2|height=250!
>    
> !https://wiki.linecorp.com/download/attachments/1273840268/image2018-10-14_2-26-29.png?version=1&modificationDate=1539451590155&api=v2|height=202!
> The same amount of disk reading was occurring. 
> !https://wiki.linecorp.com/download/attachments/1273840268/image2018-10-14_2-28-30.png?version=1&modificationDate=1539451711083&api=v2|height=210!
> However, there was no Produce response-time degradation and throughput was stable: 
> !https://wiki.linecorp.com/download/attachments/1273840268/image2018-10-14_2-30-2.png?version=1&modificationDate=1539451802301&api=v2|height=204!
>    
> !https://wiki.linecorp.com/download/attachments/1273840268/image2018-10-14_2-30-25.png?version=1&modificationDate=1539451825198&api=v2|height=234!
> h2. Suggestion
> The performance degradation explained in this issue can happen very easily in 
> any Kafka deployment. Any one of the following clients can cause a Fetch for 
> old data, and almost all clients connected to the same network thread which 
> processes that Fetch can be affected: 
> - Consumers lagging and processing offsets far behind the latest offset.
> - Consumers which start periodically and consume accumulated topic data at 
> once.
> - A follower broker which is attempting to restore its replica from the current 
> leaders.
> This patch can fix the performance degradation in this scenario. However, we still 
> have one concern. While the whole patch is pure Java code, its 
> semantics heavily depend on the underlying kernel implementation. We haven't 
> tested this on any OS other than Linux, so we are not sure whether it is as 
> effective on other platforms.
> Hence I think it's better to add a configuration option which controls whether the 
> Kafka broker is allowed to do this or not.


