Hi Eric, 
I found that increasing linger.ms to between 50 and 100 ms significantly
improves performance (fewer, larger requests instead of many small ones). I'd
also increase batch.size and buffer.memory.
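Roughly something like this (the exact values are starting points to tune
against your workload, not tested numbers):

    import java.util.Properties;

    Properties kafkaProps = new Properties();
    kafkaProps.put("linger.ms", "100");           // was 1; wait up to 100 ms to fill a batch
    kafkaProps.put("batch.size", "262144");       // 256 KB, up from your 64 KB
    kafkaProps.put("buffer.memory", "134217728"); // 128 MB accumulator (default is 32 MB)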
Thanks, 
Jamie


-----Original Message-----
From: Eric Owhadi <eric.owh...@esgyn.com>
To: users@kafka.apache.org <users@kafka.apache.org>
Sent: Wed, 2 Oct 2019 16:42
Subject: poor producing performance with very low CPU utilization?

Hi Kafka users,
I am new to Kafka and am struggling to get an acceptable producing rate.
I am using a cluster of 2 nodes, with 40 CPU cores (80 counting
hyperthreading), 256 GB of memory, and a 10 Gbit network.
Kafka is installed as part of a Cloudera parcel, with a 5 GB Java heap.
Producer version: Kafka client 2.2.1

Wed Oct  2 07:56:59 PDT 2019
JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-0.b11.el6_9.x86_64
Using -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/kafka_kafka-KAFKA_BROKER-c1871edf37153578a6fc7f41462d01d2_pid6908.hprof -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh as CSD_JAVA_OPTS
Using /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER as conf dir
Using scripts/control.sh as process script
CONF_DIR=/var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
CMF_CONF_DIR=/etc/cloudera-scm-agent

Date: Wed Oct  2 07:56:59 PDT 2019
Host: xxxxx.esgyn.local
Pwd: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
CONF_DIR: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
KAFKA_HOME: /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka
Zookeeper Quorum: xxx.esgyn.local:2181,xxx.esgyn.local:2181,xxx.esgyn.local:2181
Zookeeper Chroot:
PORT: 9092
JMX_PORT: 9393
SSL_PORT: 9093
ENABLE_MONITORING: true
METRIC_REPORTERS: nl.techop.kafka.KafkaHttpMetricsReporter
BROKER_HEAP_SIZE: 5120
BROKER_JAVA_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
BROKER_SSL_ENABLED: false
KERBEROS_AUTH_ENABLED: false
KAFKA_PRINCIPAL:
SECURITY_INTER_BROKER_PROTOCOL: INFERRED
AUTHENTICATE_ZOOKEEPER_CONNECTION: true
SUPER_USERS: kafka
Kafka version found: 2.2.1-kafka4.1.0
Sentry version found: 1.5.1-cdh5.15.0
ZK_PRINCIPAL_NAME: zookeeper
Final Zookeeper Quorum is xxx.esgyn.local:2181,xx.esgyn.local:2181,x.esgyn.local:2181
security.inter.broker.protocol inferred as PLAINTEXT
LISTENERS=listeners=PLAINTEXT://xxxxx.esgyn.local:9092,

I am producing messages of 12016 bytes uncompressed, which are then
snappy-compressed by Kafka.
I am using a topic with 200 partitions, and a custom partitioner that I have
verified does a good job of spreading the load across the 2 brokers (a
simplified sketch is below).
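For illustration only, a hypothetical partitioner of that shape (not the
actual TimeSeriesPartitioner), hash-spreading a Long key over all partitions:

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    public class ExamplePartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();
            // spread Long keys evenly across all partitions (200 here)
            return (int) Math.floorMod((Long) key, (long) numPartitions);
        }

        @Override
        public void close() {}

        @Override
        public void configure(Map<String, ?> configs) {}
    }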

My producer config looks like:

    kafkaProps.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
    kafkaProps.put("value.serializer", "org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");
    kafkaProps.put("partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");
    kafkaProps.put("compression.type", "snappy");
    kafkaProps.put("batch.size", "65536");
    kafkaProps.put("acks", "all");
    kafkaProps.put("linger.ms", "1");

I first tried fire-and-forget send, thinking it would give the best
performance. Then I tried synchronous send, and was surprised to find that it
performed better. A sketch of the send styles is below for reference.
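Concretely, the styles look roughly like this (the topic name is a
placeholder, and key/value stand in for my payload):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    KafkaProducer<Long, Object> producer = new KafkaProducer<>(kafkaProps);
    ProducerRecord<Long, Object> record =
            new ProducerRecord<>("SmartpumpCollectorVector", key, value);

    // 1. fire-and-forget: fastest, but errors only surface in logs
    producer.send(record);

    // 2. synchronous: get() blocks until the broker acks, so each thread
    //    has at most one record in flight and throughput is capped by latency
    producer.send(record).get();

    // 3. async with callback: non-blocking like (1), but still reports errors
    producer.send(record, (metadata, exception) -> {
        if (exception != null) exception.printStackTrace();
    });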

However, after 1 or 2 minutes of load testing, I start getting errors on the
synchronous send like this:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
        at org.trafodion.sql.kafka.TimeseriesEndPoint$customHandler.handle(TimeseriesEndPoint.java:315)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
        at org.eclipse.jetty.server.Server.handle(Server.java:505)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
        at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:781)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:917)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
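If I read the error right, the 120000 ms matches the producer's default
delivery.timeout.ms (2 minutes): batches that sit in the accumulator longer
than that expire before ever reaching a broker. The relevant knobs look
roughly like this (values illustrative, not a tested fix):

    // gives batches more time before expiring; this treats the symptom,
    // the real fix is draining the accumulator faster
    kafkaProps.put("delivery.timeout.ms", "300000"); // default 120000
    // must satisfy: delivery.timeout.ms >= linger.ms + request.timeout.ms
    kafkaProps.put("request.timeout.ms", "30000");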

So I suspect that I am producing too fast and the brokers cannot keep up.
I tried bumping the broker's num.io.threads from the default 8 up to 40; that
did not help.

I am getting a producing rate of only about 100 messages per second, about
1 MB per second according to Kafka metrics.
CPU utilization is barely noticeable (3%) and the network is essentially
idle. From what I have read, this is not the kind of performance I should
expect from this configuration; I was hoping for at least 10x more, if not
100x. Were my expectations too high, or am I missing something in the config
that explains these numbers?

Some details: I produce from a custom Jetty handler that I verified is very
fast when not producing (with the send() commented out), and I am using a
single producer (I also tried 2) reused across all Jetty threads, roughly as
sketched below.
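The sharing pattern, for reference (class and method names here are made up;
KafkaProducer is thread-safe, so one shared instance is the usual pattern):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SharedProducerHolder {
        // one producer shared by all Jetty request threads, so records
        // from concurrent requests can batch together
        private static final KafkaProducer<Long, Object> PRODUCER =
                new KafkaProducer<>(buildProps());

        static void sendAsync(String topic, long key, Object payload) {
            PRODUCER.send(new ProducerRecord<>(topic, key, payload),
                    (md, e) -> { if (e != null) e.printStackTrace(); });
        }

        private static Properties buildProps() {
            Properties p = new Properties();
            // ... same settings as the config shown earlier ...
            return p;
        }
    }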

Any help/clue would be much appreciated,
Thanks in advance,
Eric Owhadi
Esgyn Corporation.

