Hi Eric,

I found that increasing linger.ms to between 50 and 100 ms significantly increases performance (fewer, larger requests instead of many small ones). I'd also increase the batch size and buffer.memory.

Thanks,
Jamie
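For illustration, a minimal sketch of Jamie's suggestion applied to the producer properties from the original mail below; the specific numbers are assumptions for the example, not values taken from this thread:

import java.util.Properties;

// Illustrative values only: a longer linger, larger batches, and more buffer
// memory so the producer sends fewer, larger requests.
Properties kafkaProps = new Properties();
kafkaProps.put("linger.ms", "50");            // was 1 ms in the original config
kafkaProps.put("batch.size", "262144");       // 256 KB per-partition batch; was 65536
kafkaProps.put("buffer.memory", "134217728"); // 128 MB producer buffer; the default is 32 MB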
-----Original Message-----
From: Eric Owhadi <eric.owh...@esgyn.com>
To: users@kafka.apache.org <users@kafka.apache.org>
Sent: Wed, 2 Oct 2019 16:42
Subject: poor producing performance with very low CPU utilization?

Hi Kafka users,
I am new to Kafka and am struggling to get an acceptable producing rate. I am using a cluster of 2 nodes, each with 40 CPU cores (80 with hyperthreading) and 256 GB of memory, on a 10 Gbit network. Kafka is installed as part of the Cloudera parcel, with a 5 GB Java heap. Producer version: Kafka client 2.2.1.

Broker startup environment:

Wed Oct 2 07:56:59 PDT 2019
JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-0.b11.el6_9.x86_64
Using -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/kafka_kafka-KAFKA_BROKER-c1871edf37153578a6fc7f41462d01d2_pid6908.hprof -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh as CSD_JAVA_OPTS
Using /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER as conf dir
Using scripts/control.sh as process script
CONF_DIR=/var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
CMF_CONF_DIR=/etc/cloudera-scm-agent
Date: Wed Oct 2 07:56:59 PDT 2019
Host: xxxxx.esgyn.local
Pwd: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
CONF_DIR: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
KAFKA_HOME: /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka
Zookeeper Quorum: xxx.esgyn.local:2181,xxx.esgyn.local:2181,xxx.esgyn.local:2181
Zookeeper Chroot:
PORT: 9092
JMX_PORT: 9393
SSL_PORT: 9093
ENABLE_MONITORING: true
METRIC_REPORTERS: nl.techop.kafka.KafkaHttpMetricsReporter
BROKER_HEAP_SIZE: 5120
BROKER_JAVA_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
BROKER_SSL_ENABLED: false
KERBEROS_AUTH_ENABLED: false
KAFKA_PRINCIPAL:
SECURITY_INTER_BROKER_PROTOCOL: INFERRED
AUTHENTICATE_ZOOKEEPER_CONNECTION: true
SUPER_USERS: kafka
Kafka version found: 2.2.1-kafka4.1.0
Sentry version found: 1.5.1-cdh5.15.0
ZK_PRINCIPAL_NAME: zookeeper
Final Zookeeper Quorum is xxx.esgyn.local:2181,xx.esgyn.local:2181,x.esgyn.local:2181
security.inter.broker.protocol inferred as PLAINTEXT
LISTENERS=listeners=PLAINTEXT://xxxxx.esgyn.local:9092,

I am producing messages of 12016 bytes uncompressed, which are then snappy-compressed by Kafka. I am using a topic with 200 partitions and a custom partitioner that I verified is doing a good job of spreading the load across the 2 brokers. My producer config looks like:

kafkaProps.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
kafkaProps.put("value.serializer", "org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");
kafkaProps.put("partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");
kafkaProps.put("compression.type", "snappy");
kafkaProps.put("batch.size", "65536");
kafkaProps.put("acks", "all");
kafkaProps.put("linger.ms", "1");

I tried fire-and-forget send first, thinking I would get the best performance. Then I tried synchronous send, and surprisingly found that I got better performance with the sync send.
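For readers comparing the approaches, a rough sketch of the three send styles in question (fire-and-forget, synchronous, and asynchronous with a callback); the class name, topic, key, and payload below are placeholders standing in for the custom SmartpumpCollectorVector serializer and partitioner:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendStyles {
    public static void main(String[] args) throws Exception {
        // Placeholder setup: byte[] values stand in for the custom serializer.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        KafkaProducer<Long, byte[]> producer = new KafkaProducer<>(props);
        ProducerRecord<Long, byte[]> record =
                new ProducerRecord<>("TIMESERIES", 42L, new byte[12016]);

        // 1) Fire and forget: enqueue into the producer's buffer and return immediately.
        producer.send(record);

        // 2) Synchronous: block the calling (Jetty) thread until the broker acks the batch.
        RecordMetadata meta = producer.send(record).get();
        System.out.println("acked at offset " + meta.offset());

        // 3) Asynchronous with callback: keep the handler thread free but still observe errors.
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace(); // e.g. the TimeoutException shown below
            }
        });

        producer.flush();
        producer.close();
    }
}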
However, after 1 or 2 minutes of load testing, I start getting errors on the synchronous send like this:

java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
    at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
    at org.trafodion.sql.kafka.TimeseriesEndPoint$customHandler.handle(TimeseriesEndPoint.java:315)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:505)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:781)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:917)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation

So I suspect that I am producing too fast and the brokers cannot catch up. I tried bumping the number of I/O threads up from the default 8 to 40; that did not help.

I am getting a producing rate of only about 100 messages per second, and about 1 megabyte per second according to the Kafka metrics. CPU utilization is barely noticeable (3%), and the network is ridiculously unaffected. Having googled around, this is not the kind of performance I should expect out of my config; I was hoping for at least 10X more, if not 100X.

Were my expectations too high, or am I missing something in the config that is causing these performance numbers?

Some details: I produce using a Jetty custom handler that I verified to be super fast when I am not producing (commenting out the send()), and I am using a single producer (I also tried with 2) reused across all Jetty threads.

Any help/clue would be much appreciated.
Thanks in advance,
Eric Owhadi
Esgyn Corporation.
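A side note on the stack trace above: the 120000 ms in the message corresponds to the producer's delivery.timeout.ms, which defaults to 120000 ms in this client version and bounds how long a batch may wait between send() and acknowledgement. Below is a sketch of loosening the related timeouts while the root cause is investigated; the values are illustrative only, and raising them hides the symptom rather than fixing the throughput problem:

kafkaProps.put("delivery.timeout.ms", "300000"); // default is 120000 ms, the limit the expired batches hit
kafkaProps.put("request.timeout.ms", "60000");   // per-request timeout; keep delivery.timeout.ms >= linger.ms + request.timeout.ms
kafkaProps.put("max.block.ms", "60000");         // how long send() may block when the buffer.memory pool is full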