[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2019-07-02 Thread Allen Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16877175#comment-16877175
 ] 

Allen Wang commented on KAFKA-7504:
---

[~junrao] I tried the configuration you suggested but did not see much 
difference.

(1) I made sure that there are multiple segments for each partition and enough 
data (> 1TB) on each broker.

(2) All client requests went to the same listener and num.network.threads is 
changed to 1.

Once the lagging consumer started, the 99th percentile Produce response send 
time increased from 5ms to 35ms, and the 99th percentile FetchConsumer response 
send time increased from 10ms to 600ms.

 

> Broker performance degradation caused by call of sendfile reading disk in 
> network thread
> 
>
> Key: KAFKA-7504
> URL: https://issues.apache.org/jira/browse/KAFKA-7504
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
>Priority: Major
>  Labels: latency, performance
> Attachments: Network_Request_Idle_After_Patch.png, 
> Network_Request_Idle_Per_Before_Patch.png, Response_Times_After_Patch.png, 
> Response_Times_Before_Patch.png, image-2018-10-14-14-18-38-149.png, 
> image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png, 
> image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png, 
> image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png, 
> image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png, 
> image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png
>
>
> h2. Environment
> OS: CentOS6
> Kernel version: 2.6.32-XX
>  Kafka version: 0.10.2.1, 0.11.1.2 (but also reproduces with the latest build 
> from trunk (2.2.0-SNAPSHOT))
> h2. Phenomenon
> Response time of Produce requests (99th ~ 99.9th %ile) degrades to 50x ~ 100x 
> the usual level.
>  Normally the 99th %ile is below 20ms, but when this issue occurs it reaches 
> 50ms to 200ms.
> At the same time we could see two more things in the metrics:
> 1. Disk reads from the volume assigned to log.dirs, coinciding with the spikes.
>  2. A rise in network thread utilization (by 
> `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`)
> As we didn't see an increase in requests in the metrics, we suspected that 
> blocking in the event loop run by the network thread was the cause of the 
> rising network thread utilization.
>  Reading through the Kafka broker source code, we understood that the only 
> disk IO performed in a network thread is reading log data via calls to 
> sendfile(2) (through FileChannel#transferTo).
>  To prove that calls to sendfile(2) block the network thread for some 
> moments, I ran the following SystemTap script to inspect the duration of 
> sendfile syscalls.
> {code:java}
> # SystemTap script to measure syscall duration
> global s
> global records
> probe syscall.$1 {
>     s[tid()] = gettimeofday_us()
> }
> probe syscall.$1.return {
>     elapsed = gettimeofday_us() - s[tid()]
>     delete s[tid()]
>     records <<< elapsed
> }
> probe end {
>     print(@hist_log(records))
> }{code}
> {code:java}
> $ stap -v syscall-duration.stp sendfile
> # value (us)
> value | count
>     0 |     0
>     1 |    71
>     2 |  6171
>    16 | 29472
>    32 |  3418
>   ...
>  2048 |     0
>   ...
>  8192 |     3{code}
> As you can see, some calls took more than a few milliseconds, implying that 
> they block the network thread for that long and impose the same latency on 
> all other request/response processing.
> h2. Hypothesis
> Gathering the above observations, I made the following hypothesis.
> Let's say network-thread-1 is multiplexing 3 connections:
>  - producer-A
>  - follower-B (broker replica fetch)
>  - consumer-C
> The broker receives requests from each of those clients: [Produce, 
> FetchFollower, FetchConsumer].
> They are processed by request handler threads, and now the response 
> queue of the network thread contains 3 responses in the following order: 
> [FetchConsumer, Produce, FetchFollower].
> network-thread-1 takes the 3 responses and processes them sequentially 
> ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]).
>  Ideally, processing these 3 responses completes in microseconds, since it 
> just copies ready responses into the client sockets' buffers in a 
> non-blocking manner.
>  However, Kafka uses sendfile(2) for transferring log data to client sockets. 
> The target data might be in the page cache, but old data, written a while ago 
> and never read since, is likely not.
>  If the target data isn't in the page cache, the kernel first needs to load 
> the target page into the cache. This takes from a few milliseconds to tens of 
> milliseconds depending on the disk hardware and the current load applied to it.
>  The Linux kernel doesn't consider the moment of loading data from disk into 
> the page cache as "blocked", hence it awaits completion 
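The head-of-line blocking the hypothesis describes can be sketched outside Kafka. The following is a toy model, not Kafka code, with invented timings: a single-threaded response loop (standing in for the network thread) in which one cold sendfile delays every response queued behind it.

```python
# Toy model (not Kafka code; timings are invented): a single-threaded
# response loop where one response's sendfile(2) hits disk and every
# response queued behind it inherits the delay.
import time

def sendfile_cold():
    """Stand-in for sendfile(2) reading data that is not in the page cache."""
    time.sleep(0.05)  # pretend a 50 ms disk read blocks the thread

def send_ready():
    """Stand-in for copying an already-in-memory response to a socket buffer."""
    pass

def run_network_thread(response_queue):
    """Process queued responses sequentially; return each completion time."""
    start = time.monotonic()
    completions = []
    for send in response_queue:
        send()
        completions.append(time.monotonic() - start)
    return completions

# Queue order from the hypothesis: [FetchConsumer, Produce, FetchFollower].
times = run_network_thread([sendfile_cold, send_ready, send_ready])
# The Produce and FetchFollower responses each take >= 50 ms end-to-end,
# even though their own work is trivial.
assert times[1] >= 0.05 and times[2] >= times[1]
```

This is why the thread discusses moving the blocking read out of the network thread (or warming the page cache before the response reaches it): the slow response itself is unavoidable, but the latency it imposes on unrelated responses is not.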

[jira] [Comment Edited] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2019-06-07 Thread Allen Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16858934#comment-16858934
 ] 

Allen Wang edited comment on KAFKA-7504 at 6/8/19 2:26 AM:
---

We tested this patch in the following setting:

 

Brokers: 6 brokers running Kafka 2.2, each on an AWS r5.2xlarge with a 6TB EBS 
(st1) volume. The same broker listener (SSL) is used for both producers and 
consumers; replication is on a different listener.

Topic: one topic with 1200 partitions, replication factor 2, and 
min.insync.replicas=1.

Producers: acks=1, message size 8000 bytes. Aggregate message volume is 
about 50MB/sec bytes-in per broker. Kafka version 2.1.

Consumers: one consumer group has no lag; another consumer group replays the 
entire data on disk from the beginning. Kafka version 2.1.

 

This patch does not seem to have any effect on produce latency during consumer 
replay. Once the consumer started replaying, the 99th percentile produce 
response time increased from 3ms to over 5 seconds. This happened with or 
without the patch.

  


was (Author: allenxwang):
We tested this patch in the following setting:

 

Brokers: 6 brokers with Kafka version 2.2, each with AWS r5.2xl and 6TB EBS 
(st1). Same broker listener (SSL) for both producers and consumers. Replication 
on different listener.

Topic: one topic with 1200 partitions, replication factor 2 and 
min.insync.replicas=1

Producers: ack = 1, message size: 8000 bytes. Aggregated message volume is 
about 50MB/sec bytes-in per broker. Kafka version 2.1. 

Consumers: One consumer group has no lag. One consumer group replays the entire 
data on disk from beginning. Kafka version 2.1.

 

This patch does not seem to have any effect on produce latency during consumer 
replay. Once the consumer starts replay, 99 percentile produce response time 
increased from 3ms to over 5 seconds. This happened with or without the patch.

 

We found in another testing that having separate listeners for consumer and 
producer helped a lot in this case. Producer latency was unaffected during the 
consumer replay.

 


[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2019-06-07 Thread Allen Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16859064#comment-16859064
 ] 

Allen Wang commented on KAFKA-7504:
---

[~junrao] Yes, I measured the total response time for produce requests.

I don't see the improvement for the real-time consumer. The 
records-consumed-rate metric showed that it dropped dramatically when the 
replay started.

I need to make a correction on the effect of separate listeners for consumers 
and producers: they do not help much either. The real improvement in produce 
response time comes from the consumer quota applied when the consumer replays.
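For reference, a consumer quota of the kind mentioned above is applied dynamically per client.id with Kafka's kafka-configs tool. The client name and byte rate below are made-up examples, not values taken from this thread:

```
# Hypothetical example: throttle the replaying consumer's client.id to
# 10 MB/s per broker (takes effect without a broker restart).
bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
  --add-config 'consumer_byte_rate=10485760' \
  --entity-type clients --entity-name replay-consumer
```

Because the quota caps how fast the replaying consumer can drain cold data, it indirectly bounds how much disk-read time the network threads spend in sendfile on its behalf.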

 


[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2019-06-07 Thread Allen Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16858934#comment-16858934
 ] 

Allen Wang commented on KAFKA-7504:
---

We tested this patch in the following setting:

 

Brokers: 6 brokers running Kafka 2.2, each on an AWS r5.2xlarge with a 6TB EBS 
(st1) volume. The same broker listener (SSL) is used for both producers and 
consumers; replication is on a different listener.

Topic: one topic with 1200 partitions, replication factor 2, and 
min.insync.replicas=1.

Producers: acks=1, message size 8000 bytes. Aggregate message volume is 
about 50MB/sec bytes-in per broker. Kafka version 2.1.

Consumers: one consumer group has no lag; another consumer group replays the 
entire data on disk from the beginning. Kafka version 2.1.

 

This patch does not seem to have any effect on produce latency during consumer 
replay. Once the consumer started replaying, the 99th percentile produce 
response time increased from 3ms to over 5 seconds. This happened with or 
without the patch.

 

We found in another test that having separate listeners for consumers and 
producers helped a lot in this case. Producer latency was unaffected during the 
consumer replay.
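For context, a separate-listener setup of the kind described here is configured in server.properties roughly as follows. Listener names, ports, and hostnames are illustrative, not taken from this deployment:

```
# Illustrative only: distinct listeners so produce, consume, and replication
# traffic each get their own pool of network threads (num.network.threads
# applies per listener).
listeners=PRODUCE://:9093,CONSUME://:9094,REPLICA://:9095
advertised.listeners=PRODUCE://broker1.example.com:9093,CONSUME://broker1.example.com:9094,REPLICA://broker1.example.com:9095
listener.security.protocol.map=PRODUCE:SSL,CONSUME:SSL,REPLICA:SSL
inter.broker.listener.name=REPLICA
```

With this split, a lagging consumer stalling the CONSUME listener's network threads in sendfile cannot block responses on the PRODUCE listener.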

 


[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2019-05-15 Thread Allen Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16840732#comment-16840732
 ] 

Allen Wang commented on KAFKA-7504:
---

[~junrao] [~ijuma] Is there a plan to address this performance issue on trunk? 
I am curious why there has been no PR for this, given the extensive discussion 
and positive results in this Jira.

 


[jira] [Commented] (KAFKA-6774) Improve default groupId behavior in consumer

2019-01-08 Thread Allen Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16737714#comment-16737714
 ] 

Allen Wang commented on KAFKA-6774:
---

Question on this Jira: does it work with a Kafka 1.1 broker? From the PR it 
looks like it does, since there are only client-side changes.

 

 

> Improve default groupId behavior in consumer
> 
>
> Key: KAFKA-6774
> URL: https://issues.apache.org/jira/browse/KAFKA-6774
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> At the moment, the default groupId in the consumer is "". If you try to use 
> this to subscribe() to a topic, the broker will reject the group as invalid. 
> On the other hand, if you use it with assign(), then the user will be able to 
> fetch and commit offsets using the empty groupId. Probably 99% of the time, 
> this is not what the user expects. Instead you would probably expect that if 
> no groupId is provided, then no committed offsets will be fetched at all and 
> we'll just use the auto reset behavior if we don't have a current position.
> Here are two potential solutions (both requiring a KIP):
> 1. Change the default to null. We will preserve the current behavior for 
> subscribe(). When using assign(), we will not bother fetching committed 
> offsets for the null groupId, and any attempt to commit offsets will raise an 
> error. The user can still use the empty groupId, but they have to specify it 
> explicitly.
> 2. Keep the current default, but change the consumer to treat this value as 
> if it were null as described in option 1. The argument for this behavior is 
> that using the empty groupId to commit offsets is inherently a dangerous 
> practice and should not be permitted. We'd have to convince ourselves that 
> we're fine not needing to allow the empty groupId for backwards compatibility 
> though.
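Option 1's semantics can be illustrated with a toy model. The class below is not the real Kafka consumer API; names, the pretend committed-offset store, and the exception type are all invented for illustration:

```python
# Toy model (not the real Kafka consumer API) of option 1: a null group.id
# skips committed-offset lookup and forbids offset commits outright.
class InvalidGroupIdError(Exception):
    pass

class ToyConsumer:
    def __init__(self, group_id=None, auto_offset_reset="latest"):
        self.group_id = group_id
        self.auto_offset_reset = auto_offset_reset
        self._committed = {"t-0": 42}  # pretend broker-side committed offsets

    def position(self, partition):
        # With no group, never consult committed offsets: fall back to the
        # auto reset policy instead.
        if self.group_id is None:
            return 0 if self.auto_offset_reset == "earliest" else "end"
        return self._committed.get(partition, 0)

    def commit(self, offsets):
        if self.group_id is None:
            raise InvalidGroupIdError("cannot commit offsets without a group.id")
        self._committed.update(offsets)

c = ToyConsumer(group_id=None, auto_offset_reset="earliest")
assert c.position("t-0") == 0  # committed offset 42 is ignored
try:
    c.commit({"t-0": 10})
except InvalidGroupIdError:
    pass  # commits are rejected when no group.id is set
```

The point of the model: with a null group, offset state is neither read nor written, so the consumer behaves purely according to auto.offset.reset, which is the expectation the issue describes.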



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2018-10-15 Thread Allen Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650874#comment-16650874
 ] 

Allen Wang commented on KAFKA-7504:
---

[~junrao] Does that mean the patch does not immediately address our issue and 
further work is needed? I was about to give it a try.

 


[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread

2018-10-15 Thread Allen Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650486#comment-16650486
 ] 

Allen Wang commented on KAFKA-7504:
---

Great work. We have observed a significant increase in produce response time 
when a follower is catching up with the leader after having been down for a 
while. The leader does a lot of disk reads at that time, which seems related 
to this issue. I am looking forward to the patch in the 1.1 and 2.x branches.

> Broker performance degradation caused by call of sendfile reading disk in 
> network thread
> 
>
> Key: KAFKA-7504
> URL: https://issues.apache.org/jira/browse/KAFKA-7504
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.10.2.1
>Reporter: Yuto Kawamura
>Assignee: Yuto Kawamura
>Priority: Major
>  Labels: latency, performance
> Attachments: image-2018-10-14-14-18-38-149.png, 
> image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png, 
> image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png, 
> image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png, 
> image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png, 
> image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png
>
>
> h2. Environment
> OS: CentOS6
> Kernel version: 2.6.32-XX
>  Kafka version: 0.10.2.1, 0.11.1.2 (but reproduces with latest build from 
> trunk (2.2.0-SNAPSHOT)
> h2. Phenomenon
> Response time of Produce request (99th ~ 99.9th %ile) degrading to 50x ~ 100x 
> more than usual.
>  Normally 99th %ile is lower than 20ms, but when this issue occurs it marks 
> 50ms to 200ms.
> At the same time we could see two more things in metrics:
> 1. Disk read coincidence from the volume assigned to log.dirs.
>  2. Raise in network threads utilization (by 
> `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`)
> As we didn't see increase of requests in metrics, we suspected blocking in 
> event loop ran by network thread as the cause of raising network thread 
> utilization.
>  Reading through the Kafka broker source code, we understood that the only 
> disk IO performed in a network thread is reading log data when calling 
> sendfile(2) (via FileChannel#transferTo).
>  To prove that calls to sendfile(2) block the network thread for some 
> moments, I ran the following SystemTap script to measure the duration of 
> sendfile syscalls.
> {code:java}
> # Systemtap script to measure syscall duration
> global s
> global records
> probe syscall.$1 {
> s[tid()] = gettimeofday_us()
> }
> probe syscall.$1.return {
> elapsed = gettimeofday_us() - s[tid()]
> delete s[tid()]
> records <<< elapsed
> }
> probe end {
> print(@hist_log(records))
> }{code}
> {code:java}
> $ stap -v syscall-duration.stp sendfile
> # value (us)
> value | count
>     0 |     0
>     1 |    71
>     2 |  6171
>    16 | 29472
>    32 |  3418
>  2048 |     0
> ...
>  8192 |     3{code}
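For readers without SystemTap, the blocking can also be approximated from the JVM side. Below is a minimal, self-contained sketch (class and method names are illustrative, not from the original report) that times FileChannel#transferTo, the call through which Kafka reaches sendfile(2). Note that the JDK only issues sendfile(2) when the target is a real socket or file channel; with the in-memory dummy sink below it falls back to an internal copy, so this only demonstrates the timing harness, which could be pointed at a real SocketChannel and a cold file.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TransferTimer {

    /** Transfers the whole file to the sink and returns the elapsed time in microseconds. */
    public static long timeTransferMicros(Path file, WritableByteChannel sink) throws IOException {
        try (FileChannel src = FileChannel.open(file, StandardOpenOption.READ)) {
            long start = System.nanoTime();
            long pos = 0;
            long size = src.size();
            while (pos < size) {
                // On Linux, with a socket target, this is where sendfile(2) happens.
                pos += src.transferTo(pos, size - pos, sink);
            }
            return (System.nanoTime() - start) / 1_000;
        }
    }

    /** A sink that discards everything, standing in for a client socket. */
    public static WritableByteChannel discardingSink() {
        return new WritableByteChannel() {
            @Override public int write(ByteBuffer buf) { int n = buf.remaining(); buf.position(buf.limit()); return n; }
            @Override public boolean isOpen() { return true; }
            @Override public void close() {}
        };
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("segment", ".log");
        Files.write(tmp, new byte[1 << 20]); // 1 MiB dummy "log segment"
        System.out.println("transferTo took " + timeTransferMicros(tmp, discardingSink()) + " us");
        Files.delete(tmp);
    }
}
```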
> As you can see, some calls took more than a few milliseconds, implying that 
> they block the network thread for that long and add the same latency to all 
> other request/response processing on that thread.
> h2. Hypothesis
> Gathering the above observations, I formed the following hypothesis.
> Let's say network-thread-1 is multiplexing 3 connections:
>  - producer-A
>  - follower-B (broker replica fetch)
>  - consumer-C
> The broker receives requests from each of those clients: [Produce, 
> FetchFollower, FetchConsumer].
> They are processed by request handler threads, and now the response queue of 
> the network thread contains 3 responses in the following order: 
> [FetchConsumer, Produce, FetchFollower].
> network-thread-1 takes the 3 responses and processes them sequentially 
> ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]).
>  Ideally, processing these 3 responses completes in microseconds, as it just 
> copies ready responses into each client socket's buffer in a non-blocking 
> manner.
>  However, Kafka uses sendfile(2) for transferring log data to client sockets. 
> The target data might be in the page cache, but old data that was written a 
> while ago and never read since is likely not.
>  If the target data isn't in the page cache, the kernel first needs to load 
> the target page into the cache. This takes from a few milliseconds to tens of 
> milliseconds depending on disk 
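The head-of-line blocking this hypothesis describes can be sketched with a toy model (illustrative names; this is not the actual SocketServer code): the network thread drains its response queue sequentially, so one send that stalls on a page-cache miss delays every response queued behind it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class ResponseQueueSketch {

    /**
     * Simulates a network thread draining its response queue sequentially.
     * Each entry is the time (ms) its "send" blocks, e.g. a sendfile call
     * that must fault a page in from disk. Returns the completion time
     * (ms from start) of each response, in queue order.
     */
    public static List<Long> drain(Queue<Long> sendBlockMillis) {
        List<Long> completions = new ArrayList<>();
        long clock = 0;
        while (!sendBlockMillis.isEmpty()) {
            clock += sendBlockMillis.poll(); // the thread is blocked for the whole send
            completions.add(clock);
        }
        return completions;
    }

    public static void main(String[] args) {
        // [FetchConsumer, Produce, FetchFollower]: the consumer fetch hits
        // cold data and blocks 50 ms; the cheap Produce ack behind it now
        // completes at 51 ms instead of ~1 ms.
        Queue<Long> queue = new ArrayDeque<>(List.of(50L, 1L, 1L));
        System.out.println(drain(queue)); // prints [50, 51, 52]
    }
}
```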

[jira] [Commented] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric

2018-02-06 Thread Allen Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354831#comment-16354831
 ] 

Allen Wang commented on KAFKA-6514:
---

[~ijuma] Since we don't change the name of the metric but just add a tag, it 
shouldn't break existing monitoring systems, so I am wondering why a KIP is 
still required in this case. 

For example, I found that when upgrading to Kafka 0.10, the 
"records-consumed-rate" metric gained an added "topic" tag, which makes sense 
to me. 

I would prefer keeping the same metric name since people are already familiar 
with it and the name makes sense. It would also have minimal impact on the 
storage required by a typical metric aggregation system. 

 

> Add API version as a tag for the RequestsPerSec metric
> --
>
> Key: KAFKA-6514
> URL: https://issues.apache.org/jira/browse/KAFKA-6514
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Allen Wang
>Priority: Major
>
> After we upgrade a broker to a new version, one important insight is how many 
> clients have been upgraded, so that we can switch the message format once 
> most of the clients have also been updated to the new version, minimizing the 
> performance penalty.
> RequestsPerSec with the version tag will give us that insight.
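As a rough illustration of the proposal (hypothetical class and method names; this is not Kafka's actual metrics API, which is based on Yammer Metrics), counting requests keyed by request name plus API version, with the version surfaced as a tag, could look like:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

public class RequestsPerSecByVersion {
    // request name -> api version -> count; the version becomes a metric tag.
    private final Map<String, Map<Short, LongAdder>> counts = new ConcurrentHashMap<>();

    public void record(String request, short apiVersion) {
        counts.computeIfAbsent(request, r -> new ConcurrentHashMap<>())
              .computeIfAbsent(apiVersion, v -> new LongAdder())
              .increment();
    }

    public long count(String request, short apiVersion) {
        Map<Short, LongAdder> byVersion = counts.getOrDefault(request, Map.of());
        LongAdder adder = byVersion.get(apiVersion);
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) {
        RequestsPerSecByVersion m = new RequestsPerSecByVersion();
        m.record("Produce", (short) 7);
        m.record("Produce", (short) 7);
        m.record("Produce", (short) 3); // an older, not-yet-upgraded client
        System.out.println("Produce v7: " + m.count("Produce", (short) 7)); // prints Produce v7: 2
    }
}
```

Comparing the per-version counts after a broker upgrade shows what fraction of clients still sends old API versions.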
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6514) Add API version as a tag for the RequestsPerSec metric

2018-01-31 Thread Allen Wang (JIRA)
Allen Wang created KAFKA-6514:
-

 Summary: Add API version as a tag for the RequestsPerSec metric
 Key: KAFKA-6514
 URL: https://issues.apache.org/jira/browse/KAFKA-6514
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 1.0.0
Reporter: Allen Wang


After we upgrade a broker to a new version, one important insight is how many 
clients have been upgraded, so that we can switch the message format once most 
of the clients have also been updated to the new version, minimizing the 
performance penalty. 

RequestsPerSec with the version tag will give us that insight.

 

 





[jira] [Created] (KAFKA-6329) Load trust store as a resource

2017-12-07 Thread Allen Wang (JIRA)
Allen Wang created KAFKA-6329:
-

 Summary: Load trust store as a resource
 Key: KAFKA-6329
 URL: https://issues.apache.org/jira/browse/KAFKA-6329
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 1.0.0, 0.11.0.0, 0.10.2.0
Reporter: Allen Wang


We would like to publish a Kafka client library with SSL enabled by default and 
distribute it to internal applications so that they can communicate with our 
brokers securely. We also need to distribute a trust store with our internal CA 
cert. In our library/application ecosystem, this is the easiest way to enable 
security without putting the burden on each application to deploy a certain 
trust store.

However, that does not seem to be possible, as the Kafka client assumes that 
the trust store is in a local file system and uses FileInputStream, which does 
not work with classpath resources.

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java

Here is the actual line of code:

{code:java}
in = new FileInputStream(path);
{code}

Ideally we would also like to be able to load the trust store this way:

{code:java}
in = this.getClass().getResourceAsStream(resourcePath);
{code}
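A minimal sketch of the desired fallback behavior (the helper class is illustrative, not Kafka's actual SslFactory code; note the correct JDK method name is getResourceAsStream):

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;

public class TrustStoreLoader {

    /**
     * Opens the trust store from the file system if the path exists there,
     * otherwise falls back to a classpath resource (use a leading "/" for an
     * absolute classpath location, e.g. a store shipped inside the client jar).
     */
    public static InputStream open(String path) throws IOException {
        if (Files.exists(Paths.get(path))) {
            return new FileInputStream(path); // current behavior: local file only
        }
        InputStream in = TrustStoreLoader.class.getResourceAsStream(path); // desired fallback
        if (in == null) {
            throw new IOException("Trust store not found on disk or classpath: " + path);
        }
        return in;
    }
}
```

The returned stream can then be fed to KeyStore.load the same way the file stream is today.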








[jira] [Created] (KAFKA-5813) Unexpected unclean leader election due to leader/controller's unusual event handling order

2017-08-30 Thread Allen Wang (JIRA)
Allen Wang created KAFKA-5813:
-

 Summary: Unexpected unclean leader election due to 
leader/controller's unusual event handling order 
 Key: KAFKA-5813
 URL: https://issues.apache.org/jira/browse/KAFKA-5813
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.10.2.1
Reporter: Allen Wang
Priority: Minor


We experienced an unexpected unclean leader election after a network glitch 
happened on the leader of a partition. We have replication factor 2.

Here is the sequence of events gathered from various logs:

1. A ZK session timeout happens for the leader of the partition 
2. A new ZK session is established for the leader 
3. The leader removes the follower from the ISR (which might be caused by 
replication delay due to the network problem) and updates the ISR in ZK 
4. The controller processes the BrokerChangeListener event from step 1, where 
the leader appears to be offline 
5. Because the ISR in ZK has already been updated by the leader to remove the 
follower, the controller makes an unclean leader election 
6. The controller processes the second BrokerChangeListener event from step 2 
and marks the broker online again

It seems to me that step 4 happens too late. If it had happened right after 
step 1, it would have been a clean leader election, and hopefully the producer 
would have immediately switched to the new leader, avoiding a consumer offset 
reset. 








[jira] [Commented] (KAFKA-5724) AbstractPartitionAssignor does not take into consideration that partition number may start from non-zero

2017-08-11 Thread Allen Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16123631#comment-16123631
 ] 

Allen Wang commented on KAFKA-5724:
---

[~hachikuji] Please note that TopicCommand can take the replica assignment as 
an argument when creating a topic, so I believe it does not require very 
advanced tooling to get into this situation.

> AbstractPartitionAssignor does not take into consideration that partition 
> number may start from non-zero
> 
>
> Key: KAFKA-5724
> URL: https://issues.apache.org/jira/browse/KAFKA-5724
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 0.11.0.0
>Reporter: Allen Wang
>Assignee: huxihx
>
> In AbstractPartitionAssignor.assign(Cluster metadata, Map<String, 
> Subscription> subscriptions), it invokes assign(partitionsPerTopic, 
> subscriptions). It assumes that partition numbers start from 0, and it 
> constructs TopicPartition in the range [0, partitionsPerTopic). 
> This assumption is not correct. The correct way to handle it is to follow the 
> same approach as in the producer's DefaultPartitioner, which uses [0, 
> numberOfPartitions) as an index into the actual partitions.
> There are use cases where partition numbers may not start from zero. It can 
> happen if users use advanced tooling to manually craft partition numbers 
> when creating topics.
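The suggested fix can be sketched as follows (illustrative code, not the actual assignor): treat values in [0, numberOfPartitions) as indexes into the list of actual partition ids instead of using them as partition ids directly.

```java
import java.util.List;

public class PartitionIndexing {

    /** Maps a logical index in [0, sortedPartitionIds.size()) to an actual partition id. */
    public static int partitionAt(List<Integer> sortedPartitionIds, int index) {
        return sortedPartitionIds.get(index % sortedPartitionIds.size());
    }

    public static void main(String[] args) {
        // A topic whose partitions were hand-crafted to start at 5 instead of 0.
        List<Integer> ids = List.of(5, 6, 7);
        System.out.println(partitionAt(ids, 0)); // prints 5, not a nonexistent partition 0
        System.out.println(partitionAt(ids, 2)); // prints 7
    }
}
```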





[jira] [Created] (KAFKA-5724) AbstractPartitionAssignor does not take into consideration that partition number may start from non-zero

2017-08-10 Thread Allen Wang (JIRA)
Allen Wang created KAFKA-5724:
-

 Summary: AbstractPartitionAssignor does not take into 
consideration that partition number may start from non-zero
 Key: KAFKA-5724
 URL: https://issues.apache.org/jira/browse/KAFKA-5724
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer
Affects Versions: 0.11.0.0
Reporter: Allen Wang


In AbstractPartitionAssignor.assign(Cluster metadata, Map<String, Subscription> 
subscriptions), it invokes assign(partitionsPerTopic, subscriptions). It 
assumes that partition numbers start from 0, and it constructs TopicPartition 
in the range [0, partitionsPerTopic). 

This assumption is not correct. The correct way to handle it is to follow the 
same approach as in the producer's DefaultPartitioner, which uses [0, 
numberOfPartitions) as an index into the actual partitions.

There are use cases where partition numbers may not start from zero. It can 
happen if users use advanced tooling to manually craft partition numbers when 
creating topics.



