[
https://issues.apache.org/jira/browse/KAFKA-12481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andrei Iatsuk updated KAFKA-12481:
----------------------------------
Description:
*What to do?*
Add _socket.nagle.disable_ parameter to Apache Kafka config like in
[librdkafka|https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md].
*What reason of this improvement?*
A large number of topic-partitions on one broker causes burst of host's
packets/sec metric. The traffic shaper in the cloud ceases to cope with such a
load and causes service degradation.
*How to reproduce?*
# Create Kafka Cluster with 4 brokers. Amount of packet/sec is ~120.
# Add 100 topics with 100 partitions each and replication factor = 3. It is 30k
topic-partitions in total. Amount of packet/sec is ~15k.
{code:java}
import os
for i in range(100):
print(f"create topic 'flower{i}'... ", end="")
cmd = "kafka-topics.sh --create --bootstrap-server {} --topic {} --partitions
{} --replication-factor {}".format("databus.andrei-iatsuk.ec.odkl.ru:9092",
f"flower{i}", 100, 3)
code = os.system(cmd)
print("ok" if code == 0 else "error")
{code}
!Screenshot 2021-03-16 at 21.05.03.png!
# Generate server load by launching next script in 4 terminals. Amount of
packet/sec is ~130k.
{code:java}
import time
from pykafka import KafkaClient
client = KafkaClient(hosts="databus.andrei-iatsuk.ec.odkl.ru:9092")
while True:
for i in range(100):
print(f"sent message to 'flower{i}'")
with client.topics[f"flower{i}"].get_sync_producer() as producer:
for j in range(1000):
producer.produce(str.encode(f'test message {j} in topic flower{i}' *
10))
{code}
!Screenshot 2021-03-13 at 00.44.43.png!
!Screenshot 2021-03-13 at 00.29.10.png!
# Make dump of tcp connections via tcpdump due ~2 sec:
{code:java}
$ tcpdump -i eth1 -w dump.pcap
tcpdump: listening on eth1, link-type EN10MB (Ethernet), capture size 262144
bytes
^C8873886 packets captured
9139050 packets received by filter
265028 packets dropped by kernel
{code}
# Load dump to Wireshark and see statistics: ~99.999% of packets is inter
broker messages, size of packets 40-160 bytes. On screen hosts with IPs
10.16.23.[157-160] is brokers:
!Screenshot 2021-03-14 at 01.46.00.png!
!Screenshot 2021-03-14 at 01.52.01.png!
*How to fix?*
# Add boolean _socket.nagle.disable_ parameter to Apache Kafka config and
provide value to kafka.network.Acceptor.accept(key) method in :
[https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/network/SocketServer.scala#L646]
# For disabled TCP_NODELAY value:
## ~400 packets/s for idle broker (instead ~12k packets/s)
## ~3k packets/s for loaded broker (instead ~150k packets/s)
!Screenshot 2021-03-16 at 21.12.17.png!
was:
*What to do?*
Add _socket.nagle.disable_ parameter to Apache Kafka config like in
[librdkafka|https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md].
*What reason of this improvement?*
A large number of topic-partitions on one broker causes burst of host's
packets/sec metric. The traffic shaper in the cloud ceases to cope with such a
load and causes service degradation.
*How to reproduce?*
# Create Kafka Cluster with 4 brokers. Amount of packet/sec is ~120.
# Add 100 topics with 100 partitions each and replication factor = 3. It is
30k topic-partitions in total. Amount of packet/sec is ~15k.
{code:java}
import os
for i in range(100):
print(f"create topic 'flower{i}'... ", end="")
cmd = "kafka-topics.sh --create --bootstrap-server {} --topic {} --partitions
{} --replication-factor {}".format("databus.andrei-iatsuk.ec.odkl.ru:9092",
f"flower{i}", 100, 3)
code = os.system(cmd)
print("ok" if code == 0 else "error")
{code}
!Screenshot 2021-03-16 at 21.05.03.png!
# Generate server load by launching next script in 4 terminals. Amount of
packet/sec is ~130k.
{code:java}
import time
from pykafka import KafkaClient
client = KafkaClient(hosts="databus.andrei-iatsuk.ec.odkl.ru:9092")
while True:
for i in range(100):
print(f"sent message to 'flower{i}'")
with client.topics[f"flower{i}"].get_sync_producer() as producer:
for j in range(1000):
producer.produce(str.encode(f'test message {j} in topic flower{i}' *
10))
{code}
!Screenshot 2021-03-13 at 00.44.43.png!
!Screenshot 2021-03-13 at 00.29.10.png!
# Make dump of tcp connections via tcpdump due ~2 sec:
{code:java}
$ tcpdump -i eth1 -w dump.pcap
tcpdump: listening on eth1, link-type EN10MB (Ethernet), capture size 262144
bytes
^C8873886 packets captured
9139050 packets received by filter
265028 packets dropped by kernel
{code}
# Load dump to Wireshark and see statistics: ~99.999% of packets is inter
broker messages, size of packets 40-160 bytes. On screen hosts with IPs
10.16.23.[157-160] is brokers:
!Screenshot 2021-03-14 at 01.46.00.png!
!Screenshot 2021-03-14 at 01.52.01.png!
*How to fix?*
# Add boolean _socket.nagle.disable_ parameter to Apache Kafka config and
provide value to kafka.network.Acceptor.accept(key) method in :
[https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/network/SocketServer.scala#L646]
# For disabled TCP_NODELAY value:
## ~400 packets/s for idle broker (instead ~12k packets/s)
## ~3k packets/s for loaded broker (instead ~150k packets/s)
!Screenshot 2021-03-16 at 21.12.17.png!
> Add socket.nagle.disable config to reduce number of packets
> ------------------------------------------------------------
>
> Key: KAFKA-12481
> URL: https://issues.apache.org/jira/browse/KAFKA-12481
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 0.8.2.2, 0.10.2.2, 0.11.0.3, 1.0.2, 1.1.1, 2.0.1, 2.1.1,
> 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1
> Reporter: Andrei Iatsuk
> Priority: Major
> Attachments: Screenshot 2021-03-13 at 00.29.10.png, Screenshot
> 2021-03-13 at 00.44.43.png, Screenshot 2021-03-14 at 01.46.00.png, Screenshot
> 2021-03-14 at 01.52.01.png, Screenshot 2021-03-16 at 21.05.03.png, Screenshot
> 2021-03-16 at 21.12.17.png
>
>
> *What to do?*
> Add _socket.nagle.disable_ parameter to Apache Kafka config like in
> [librdkafka|https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md].
> *What reason of this improvement?*
> A large number of topic-partitions on one broker causes burst of host's
> packets/sec metric. The traffic shaper in the cloud ceases to cope with such
> a load and causes service degradation.
> *How to reproduce?*
> # Create Kafka Cluster with 4 brokers. Amount of packet/sec is ~120.
> # Add 100 topics with 100 partitions each and replication factor = 3. It is
> 30k topic-partitions in total. Amount of packet/sec is ~15k.
> {code:java}
> import os
> for i in range(100):
> print(f"create topic 'flower{i}'... ", end="")
> cmd = "kafka-topics.sh --create --bootstrap-server {} --topic {}
> --partitions {} --replication-factor
> {}".format("databus.andrei-iatsuk.ec.odkl.ru:9092", f"flower{i}", 100, 3)
> code = os.system(cmd)
> print("ok" if code == 0 else "error")
> {code}
> !Screenshot 2021-03-16 at 21.05.03.png!
> # Generate server load by launching next script in 4 terminals. Amount of
> packet/sec is ~130k.
> {code:java}
> import time
> from pykafka import KafkaClient
> client = KafkaClient(hosts="databus.andrei-iatsuk.ec.odkl.ru:9092")
> while True:
> for i in range(100):
> print(f"sent message to 'flower{i}'")
> with client.topics[f"flower{i}"].get_sync_producer() as producer:
> for j in range(1000):
> producer.produce(str.encode(f'test message {j} in topic flower{i}' *
> 10))
> {code}
> !Screenshot 2021-03-13 at 00.44.43.png!
> !Screenshot 2021-03-13 at 00.29.10.png!
> # Make dump of tcp connections via tcpdump due ~2 sec:
> {code:java}
> $ tcpdump -i eth1 -w dump.pcap
> tcpdump: listening on eth1, link-type EN10MB (Ethernet), capture size 262144
> bytes
> ^C8873886 packets captured
> 9139050 packets received by filter
> 265028 packets dropped by kernel
> {code}
> # Load dump to Wireshark and see statistics: ~99.999% of packets is inter
> broker messages, size of packets 40-160 bytes. On screen hosts with IPs
> 10.16.23.[157-160] is brokers:
> !Screenshot 2021-03-14 at 01.46.00.png!
> !Screenshot 2021-03-14 at 01.52.01.png!
> *How to fix?*
> # Add boolean _socket.nagle.disable_ parameter to Apache Kafka config and
> provide value to kafka.network.Acceptor.accept(key) method in :
> [https://github.com/apache/kafka/blob/2.4/core/src/main/scala/kafka/network/SocketServer.scala#L646]
> # For disabled TCP_NODELAY value:
> ## ~400 packets/s for idle broker (instead ~12k packets/s)
> ## ~3k packets/s for loaded broker (instead ~150k packets/s)
> !Screenshot 2021-03-16 at 21.12.17.png!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)