Hi Jamie,
Thanks for the hint. I played with these parameters and found that only linger.ms 
plays a significant role for my test case.
It is very sensitive and highly non-linear.
I got these results:
linger.ms       messages per second
80              100
84              205
85              215 -> top
86              213
90              205
95              195
100             187
200             100

So as you can see, this is very sensitive and one can easily miss the peak.
However, 200 messages per second for 2 powerful nodes and relatively small 
messages (12016 bytes) is still at least 10X below what I would have hoped for. 
When I see the system resources barely moving, with CPU at 3%, I am sure 
something is not right.
Regards,
Eric

-----Original Message-----
From: Jamie <jamied...@aol.co.uk.INVALID> 
Sent: Wednesday, October 2, 2019 4:27 PM
To: users@kafka.apache.org
Subject: Re: poor producing performance with very low CPU utilization?

Hi Eric,
I found that increasing linger.ms to between 50 and 100 ms significantly increases 
performance (fewer, larger requests instead of many small ones). I'd also 
increase batch.size and buffer.memory.
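A minimal sketch of those settings, in case it helps (the hostnames, serializers, 
and exact values below are illustrative placeholders, not tuned recommendations):

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092,broker2:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    // Batch more aggressively: wait up to ~100 ms for batches to fill,
    // allow bigger batches, and give the producer more buffer memory.
    props.put("linger.ms", "100");
    props.put("batch.size", "262144");       // 256 KB, illustrative
    props.put("buffer.memory", "67108864");  // 64 MB, illustrative
    KafkaProducer<Long, byte[]> producer = new KafkaProducer<>(props);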
Thanks,
Jamie


-----Original Message-----
From: Eric Owhadi <eric.owh...@esgyn.com>
To: users@kafka.apache.org <users@kafka.apache.org>
Sent: Wed, 2 Oct 2019 16:42
Subject: poor producing performance with very low CPU utilization?

Hi Kafka users,
I am new to Kafka and am struggling to get an acceptable producing rate.
I am using a cluster of 2 nodes with 40 CPU cores (80 counting hyperthreading) 
and 256 GB of memory, on a 10 Gbit network. Kafka is installed as part of the 
Cloudera parcel, with a 5 GB Java heap.
Producer version: Kafka client 2.2.1

Wed Oct  2 07:56:59 PDT 2019
JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-0.b11.el6_9.x86_64
Using -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/kafka_kafka-KAFKA_BROKER-c1871edf37153578a6fc7f41462d01d2_pid6908.hprof -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh as CSD_JAVA_OPTS
Using /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER as conf dir
Using scripts/control.sh as process script
CONF_DIR=/var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
CMF_CONF_DIR=/etc/cloudera-scm-agent

Date: Wed Oct  2 07:56:59 PDT 2019
Host: xxxxx.esgyn.local
Pwd: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
CONF_DIR: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
KAFKA_HOME: /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka
Zookeeper Quorum: xxx.esgyn.local:2181,xxx.esgyn.local:2181,xxx.esgyn.local:2181
Zookeeper Chroot:
PORT: 9092
JMX_PORT: 9393
SSL_PORT: 9093
ENABLE_MONITORING: true
METRIC_REPORTERS: nl.techop.kafka.KafkaHttpMetricsReporter
BROKER_HEAP_SIZE: 5120
BROKER_JAVA_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
-XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC 
-Djava.awt.headless=true
BROKER_SSL_ENABLED: false
KERBEROS_AUTH_ENABLED: false
KAFKA_PRINCIPAL:
SECURITY_INTER_BROKER_PROTOCOL: INFERRED
AUTHENTICATE_ZOOKEEPER_CONNECTION: true
SUPER_USERS: kafka
Kafka version found: 2.2.1-kafka4.1.0
Sentry version found: 1.5.1-cdh5.15.0
ZK_PRINCIPAL_NAME: zookeeper
Final Zookeeper Quorum is xxx.esgyn.local:2181,xx.esgyn.local:2181,x.esgyn.local:2181
security.inter.broker.protocol inferred as PLAINTEXT 
LISTENERS=listeners=PLAINTEXT://xxxxx.esgyn.local:9092,

I am producing messages of 12016 bytes uncompressed, which are then snappy-compressed 
by Kafka.
I am using a topic with 200 partitions and a custom partitioner that I 
verified is doing a good job of spreading the load across the 2 brokers.
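For context, a custom partitioner implements the producer's Partitioner interface; 
here is a minimal hypothetical sketch of such a partitioner (not the actual 
TimeSeriesPartitioner, whose code is not shown here):

    import java.util.Map;
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    // Hypothetical example: spread records across all partitions by hashing the key.
    public class ExampleKeyHashPartitioner implements Partitioner {
        @Override
        public void configure(Map<String, ?> configs) { }

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionCountForTopic(topic);
            return Math.floorMod(key.hashCode(), numPartitions);
        }

        @Override
        public void close() { }
    }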

My producer config looks like:

    kafkaProps.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
    kafkaProps.put("value.serializer", "org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");
    kafkaProps.put("partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");
    kafkaProps.put("compression.type", "snappy");
    kafkaProps.put("batch.size", "65536");
    kafkaProps.put("acks", "all");
    kafkaProps.put("linger.ms", "1");

I first tried fire-and-forget send, thinking I would get the best performance.
Then I tried synchronous send and, surprisingly, found that I got better 
performance with the sync send.
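For reference, a minimal sketch of the send variants being compared here (the topic 
name, key, and value are placeholders):

    ProducerRecord<Long, byte[]> record = new ProducerRecord<>("my-topic", key, value);

    // Fire and forget: hand the record to the producer's buffer and move on.
    producer.send(record);

    // Synchronous: block the calling thread until the broker acknowledges the batch.
    RecordMetadata metadata = producer.send(record).get();

    // Asynchronous with a callback: non-blocking, but errors are still surfaced.
    producer.send(record, (md, exception) -> {
        if (exception != null) {
            exception.printStackTrace();
        }
    });

Note that blocking on get() after every send means each Jetty thread has at most one 
record in flight, so batching only comes from whatever the concurrent threads produce 
within the linger window.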

However, after 1 or 2 minutes of load testing, I start getting errors on the 
synchronous send like this:
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
        at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
        at org.trafodion.sql.kafka.TimeseriesEndPoint$customHandler.handle(TimeseriesEndPoint.java:315)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
        at org.eclipse.jetty.server.Server.handle(Server.java:505)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
        at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:781)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:917)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation

So I suspect that I am producing too fast and the brokers cannot catch up.
I tried bumping up the broker io threads (num.io.threads) from the default 8 to 40; 
that did not help.
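For what it's worth, the 120000 ms in that message appears to match the producer's 
delivery.timeout.ms default; one could relax it while investigating (illustrative 
values only, not a fix for the underlying throughput problem):

    // Illustrative only: give batches more time before they are expired.
    kafkaProps.put("delivery.timeout.ms", "300000");  // default is 120000 ms
    kafkaProps.put("request.timeout.ms", "60000");    // keep linger.ms + request.timeout.ms <= delivery.timeout.ms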

I am getting a producing rate of only about 100 messages per second, and about 
1 megabyte per second according to the Kafka metrics.
The CPU utilization is barely noticeable (3%), the network is essentially 
unaffected, and having googled around, this is not the kind of performance I should 
expect from my config. I was hoping for at least 10X more, if not 100X. 
Were my expectations too high, or am I missing something in the config that is 
causing these performance numbers?

Some details: I produce from a custom Jetty handler that I verified to be very fast 
when I am not producing (i.e., with the send() commented out), and I am using a 
single producer (I also tried with 2) reused across all Jetty threads.
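A hypothetical sketch of that shared-producer setup, since KafkaProducer is 
thread-safe (the handler, topic, and payload below are placeholders, not the actual 
TimeseriesEndPoint code):

    import java.io.IOException;
    import javax.servlet.ServletException;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.eclipse.jetty.server.Request;
    import org.eclipse.jetty.server.handler.AbstractHandler;

    // One KafkaProducer instance shared by all Jetty handler threads.
    public class ProducingHandler extends AbstractHandler {
        private final KafkaProducer<Long, byte[]> producer;

        public ProducingHandler(KafkaProducer<Long, byte[]> producer) {
            this.producer = producer;  // KafkaProducer is thread-safe
        }

        @Override
        public void handle(String target, Request baseRequest, HttpServletRequest request,
                           HttpServletResponse response) throws IOException, ServletException {
            byte[] payload = new byte[12016];  // placeholder payload
            producer.send(new ProducerRecord<>("my-topic", System.nanoTime(), payload));
            response.setStatus(HttpServletResponse.SC_OK);
            baseRequest.setHandled(true);
        }
    }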

Any help/clue would be much appreciated.
Thanks in advance,
Eric Owhadi
Esgyn Corporation

