Hi Eric, I had a similar issue in my application. One thing I noticed is that the metric `kafka_consumer_io_wait_time_avg_seconds` swings from 2 minutes to 6 hours! The average is about 1-2 hours. This metric is a built-in Micrometer metric. I'm still reading the source code behind it to understand it better.
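In case it is useful to anyone else digging into this, here is a minimal sketch of how I am dumping the raw Kafka client metrics that sit behind that Micrometer gauge (this assumes an existing KafkaConsumer; the exact metric names vary by client version, e.g. io-wait-time-avg vs io-wait-time-ns-avg, so it matches on a substring):

    import java.util.Map;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    class IoWaitDump {
        // Print every client metric whose name mentions "io-wait"
        // (these back the kafka_consumer_io_wait_time_* gauges).
        static void dump(Map<MetricName, ? extends Metric> metrics) {
            for (Map.Entry<MetricName, ? extends Metric> e : metrics.entrySet()) {
                if (e.getKey().name().contains("io-wait")) {
                    System.out.printf("%s / %s = %s%n",
                            e.getKey().group(), e.getKey().name(),
                            e.getValue().metricValue());
                }
            }
        }
    }

Called as IoWaitDump.dump(consumer.metrics()).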
Thanks

On Thu, Oct 3, 2019 at 4:54 PM Alexandru Ionita <alexandru.ion...@gmail.com> wrote:
>
> This might help.
>
> Try to replicate the configuration this guy is using for benchmarking Kafka:
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>
> On Thu, Oct 3, 2019 at 10:45 PM Eric Owhadi <eric.owh...@esgyn.com> wrote:
>
> > There is a key piece of information that should be critical to figuring out where the problem is:
> >
> > When I change from acks=all to acks=1, instead of increasing messages/s, it actually cuts the rate in half!
> >
> > It is as if the problem is how fast I produce data (since with acks=1 I assume I block for less time in the synchronous send, and therefore my producing pump speeds up).
> >
> > I wonder if some sort of contention happens when the producer populates the 200 partition queues while the production rate in the user thread is high?
> > Eric
> >
> > -----Original Message-----
> > From: Eric Owhadi <eric.owh...@esgyn.com>
> > Sent: Thursday, October 3, 2019 1:33 PM
> > To: users@kafka.apache.org
> > Subject: RE: poor producing performance with very low CPU utilization?
> >
> > External
> >
> > Hi Eric,
> > Thanks a lot for your answer. Please find inline responses:
> >
> > >> You've given hardware information about your brokers, but I don't think you've provided information about the machine your producer is running on. Have you verified that you're not reaching any caps on your producer's machine?
> >
> > The producer is on the same machine as the broker, and it runs very quiet: 3% CPU when I run my test. So no, there is no stress on the producing side.
> >
> > >> I also think you might be hitting the limit of what a single producer is capable of pushing through with your current setup. With a record size of ~12k and the configured batch size of 64k, you'll only be able to send 5 records per batch. The default number of in-flight batches is 5.
> >
> > I have 200 partitions on my topic, and the load is well balanced across all partitions, so the math you are doing should be x200, right? In addition, I found that batch size had no effect; linger.ms was the triggering factor for a buffer send. I played with batch size and the in-flight request count upward, and that had no effect.
> >
> > >> This means at any given time, you'll only have 25 records in flight per connection. I'm assuming your partitions are configured with at least 2 replicas. acks=all means your producer is going to wait for the records to be fully replicated before considering it complete.
> >
> > >> Doing the math, you have ~200 records per second, but this is split between 2 brokers. This means you're producing 100 records per second per broker. Simplifying a bit to 25 records in flight per broker, that's a latency of ~250 ms to move around 300 KB. At minimum, this includes the time to [compress the batch], [send the batch over the network to the leader], [write the batch to the leader's log], [fetch the batch over the network to the replica], [write the batch to the replica's log], and all of the assorted responses to those calls.
> >
> > Given that everything is local (the producer runs on the same node as the broker) and the size of my node (80 vcores), I hope I don't need 250 ms to do that...
> > The equivalent workload on HBase 2.0 is 10 to 20X faster (and that includes the same replica config, etc.).
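Side note on the in-flight math quoted above: the knobs it involves are batch.size, max.in.flight.requests.per.connection, and buffer.memory. A rough sketch of what raising them looks like (values are illustrative only, not tested against Eric's cluster):

    import java.util.Properties;

    class BatchingKnobs {
        static Properties batchingProps() {
            Properties p = new Properties();
            // ~40 records of ~12 KB per batch instead of ~5 with 64 KB
            p.put("batch.size", String.valueOf(512 * 1024));
            // the Kafka default; values above 5 can reorder records on retry
            p.put("max.in.flight.requests.per.connection", "5");
            // room for many partitions' worth of open batches (default 32 MB)
            p.put("buffer.memory", String.valueOf(128L * 1024 * 1024));
            return p;
        }
    }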
> >
> > -----Original Message-----
> > From: Eric Azama <eazama...@gmail.com>
> > Sent: Thursday, October 3, 2019 1:07 PM
> > To: users@kafka.apache.org
> > Subject: Re: poor producing performance with very low CPU utilization?
> >
> > External
> >
> > Hi Eric,
> >
> > You've given hardware information about your brokers, but I don't think you've provided information about the machine your producer is running on. Have you verified that you're not reaching any caps on your producer's machine?
> >
> > I also think you might be hitting the limit of what a single producer is capable of pushing through with your current setup. With a record size of ~12k and the configured batch size of 64k, you'll only be able to send 5 records per batch. The default number of in-flight batches is 5. This means at any given time, you'll only have 25 records in flight per connection. I'm assuming your partitions are configured with at least 2 replicas. acks=all means your producer is going to wait for the records to be fully replicated before considering it complete.
> >
> > Doing the math, you have ~200 records per second, but this is split between 2 brokers. This means you're producing 100 records per second per broker. Simplifying a bit to 25 records in flight per broker, that's a latency of ~250 ms to move around 300 KB. At minimum, this includes the time to [compress the batch], [send the batch over the network to the leader], [write the batch to the leader's log], [fetch the batch over the network to the replica], [write the batch to the replica's log], and all of the assorted responses to those calls.
> >
> > On Wed, Oct 2, 2019 at 8:38 PM Eric Owhadi <eric.owh...@esgyn.com> wrote:
> >
> > > Hi Jamie,
> > > Thanks for the hint. I played with these parameters and found that only linger.ms plays a significant role for my test case.
> > > It is very sensitive and highly non-linear. I get these results:
> > >
> > >     linger.ms    messages/s
> > >     80           100
> > >     84           205
> > >     85           215   <- peak
> > >     86           213
> > >     90           205
> > >     95           195
> > >     100          187
> > >     200          100
> > >
> > > So as you can see, this is very sensitive and one can easily miss the peak.
> > > However, 200 messages per second for 2 powerful nodes and relatively small messages (12,016 bytes) is still at least 10X below what I would have hoped for.
> > > When I see system resources barely moving, with CPU at 3%, I am sure something is not right.
> > > Regards,
> > > Eric
> > >
> > > -----Original Message-----
> > > From: Jamie <jamied...@aol.co.uk.INVALID>
> > > Sent: Wednesday, October 2, 2019 4:27 PM
> > > To: users@kafka.apache.org
> > > Subject: Re: poor producing performance with very low CPU utilization?
> > >
> > > External
> > >
> > > Hi Eric,
> > > I found that increasing linger.ms to between 50 and 100 ms significantly increases performance (fewer, larger requests instead of many small ones). I'd also increase the batch size and buffer.memory.
> > > Thanks,
> > > Jamie
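For anyone who wants to reproduce the linger.ms sweep Eric posted above, a rough single-threaded harness along these lines should do (the bootstrap address and topic name are placeholders, and the absolute numbers will not match his Jetty setup):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class LingerSweep {
        public static void main(String[] args) throws Exception {
            byte[] payload = new byte[12016];  // same record size as in the thread
            for (int lingerMs : new int[] {80, 84, 85, 86, 90, 95, 100, 200}) {
                Properties p = new Properties();
                p.put("bootstrap.servers", "localhost:9092");  // placeholder
                p.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
                p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
                p.put("compression.type", "snappy");
                p.put("acks", "all");
                p.put("linger.ms", String.valueOf(lingerMs));
                int n = 10_000;
                try (KafkaProducer<Long, byte[]> producer = new KafkaProducer<>(p)) {
                    long start = System.nanoTime();
                    List<Future<RecordMetadata>> futures = new ArrayList<>(n);
                    for (int i = 0; i < n; i++) {
                        futures.add(producer.send(new ProducerRecord<>("test-topic", (long) i, payload)));
                    }
                    producer.flush();                          // push out any lingering batches
                    for (Future<RecordMetadata> f : futures) {
                        f.get();                               // surface send errors
                    }
                    double secs = (System.nanoTime() - start) / 1e9;
                    System.out.printf("linger.ms=%d -> %.0f msg/s%n", lingerMs, n / secs);
                }
            }
        }
    }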
> > > -----Original Message-----
> > > From: Eric Owhadi <eric.owh...@esgyn.com>
> > > To: users@kafka.apache.org <users@kafka.apache.org>
> > > Sent: Wed, 2 Oct 2019 16:42
> > > Subject: poor producing performance with very low CPU utilization?
> > >
> > > Hi Kafka users,
> > > I am new to Kafka and am struggling to get an acceptable producing rate.
> > > I am using a cluster of 2 nodes, with 40 CPU cores each (80 counting hyperthreading) and 256 GB of memory, on a 10 Gbit network. Kafka is installed as part of a Cloudera parcel, with a 5 GB Java heap.
> > > Producer version: Kafka client 2.2.1
> > >
> > > Wed Oct 2 07:56:59 PDT 2019
> > > JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-0.b11.el6_9.x86_64
> > > Using -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/kafka_kafka-KAFKA_BROKER-c1871edf37153578a6fc7f41462d01d2_pid6908.hprof -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh as CSD_JAVA_OPTS
> > > Using /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER as conf dir
> > > Using scripts/control.sh as process script
> > > CONF_DIR=/var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> > > CMF_CONF_DIR=/etc/cloudera-scm-agent
> > >
> > > Date: Wed Oct 2 07:56:59 PDT 2019
> > > Host: xxxxx.esgyn.local
> > > Pwd: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> > > CONF_DIR: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> > > KAFKA_HOME: /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka
> > > Zookeeper Quorum: xxx.esgyn.local:2181,xxx.esgyn.local:2181,xxx.esgyn.local:2181
> > > Zookeeper Chroot:
> > > PORT: 9092
> > > JMX_PORT: 9393
> > > SSL_PORT: 9093
> > > ENABLE_MONITORING: true
> > > METRIC_REPORTERS: nl.techop.kafka.KafkaHttpMetricsReporter
> > > BROKER_HEAP_SIZE: 5120
> > > BROKER_JAVA_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true
> > > BROKER_SSL_ENABLED: false
> > > KERBEROS_AUTH_ENABLED: false
> > > KAFKA_PRINCIPAL:
> > > SECURITY_INTER_BROKER_PROTOCOL: INFERRED
> > > AUTHENTICATE_ZOOKEEPER_CONNECTION: true
> > > SUPER_USERS: kafka
> > > Kafka version found: 2.2.1-kafka4.1.0
> > > Sentry version found: 1.5.1-cdh5.15.0
> > > ZK_PRINCIPAL_NAME: zookeeper
> > > Final Zookeeper Quorum is xxx.esgyn.local:2181,xx.esgyn.local:2181,x.esgyn.local:2181
> > > security.inter.broker.protocol inferred as PLAINTEXT
> > > LISTENERS=listeners=PLAINTEXT://xxxxx.esgyn.local:9092,
> > >
> > > I am producing messages of 12,016 bytes uncompressed, which are then snappy-compressed by Kafka.
> > > I am using a topic with 200 partitions and a custom partitioner that I have verified spreads the load well across the 2 brokers.
> > >
> > > My producer config looks like:
> > >
> > > kafkaProps.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
> > > kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
> > > kafkaProps.put("value.serializer", "org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");
> > > kafkaProps.put("partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");
> > > kafkaProps.put("compression.type", "snappy");
> > > kafkaProps.put("batch.size", "65536");
> > > kafkaProps.put("acks", "all");
> > > kafkaProps.put("linger.ms", "1");
> > >
> > > I first tried fire-and-forget send, thinking I would get the best performance.
> > > Then I tried synchronous send, and surprisingly found that I got better performance with sync send.
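For reference on the fire-and-forget vs synchronous point above, the three send styles look roughly like this (a minimal sketch; "producer" and "record" stand in for the actual objects in Eric's handler):

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    class SendStyles {
        static void demo(Producer<Long, byte[]> producer,
                         ProducerRecord<Long, byte[]> record) throws Exception {
            // Fire and forget: errors surface only through a callback (or never).
            producer.send(record);

            // Synchronous: blocks until the broker acks. With acks=all this is
            // bounded by replication latency unless many threads send in parallel.
            producer.send(record).get();

            // Async with a callback: keeps batches full while still seeing failures.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) exception.printStackTrace();
            });
        }
    }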
> > >
> > > However, after 1 or 2 minutes of load testing, I start getting errors on the synchronous send like this:
> > >
> > > java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
> > >     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
> > >     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
> > >     at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
> > >     at org.trafodion.sql.kafka.TimeseriesEndPoint$customHandler.handle(TimeseriesEndPoint.java:315)
> > >     at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
> > >     at org.eclipse.jetty.server.Server.handle(Server.java:505)
> > >     at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
> > >     at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
> > >     at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
> > >     at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
> > >     at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
> > >     at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
> > >     at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
> > >     at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
> > >     at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
> > >     at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
> > >     at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:781)
> > >     at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:917)
> > >     at java.lang.Thread.run(Thread.java:748)
> > > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
> > >
> > > So I suspect that I am producing too fast and the brokers cannot keep up.
> > > I tried bumping the I/O threads up from the default of 8 to 40; that did not help.
> > >
> > > I am getting a producing rate of only about 100 messages per second, and about 1 megabyte per second according to Kafka metrics.
> > > CPU utilization is barely noticeable (3%), the network is ridiculously unaffected, and having googled around, this is not the kind of performance I should expect from my config. I was hoping for at least 10X more, if not 100X. Were my expectations too high, or am I missing something in my config that explains these numbers?
> > >
> > > Some details: I produce using a Jetty custom handler that I verified to be very fast when I am not producing (commenting out the send()), and I am using a single producer (I also tried with 2) reused across all Jetty threads.
> > >
> > > Any help or clue would be much appreciated. Thanks in advance,
> > > Eric Owhadi
> > > Esgyn Corporation
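On the "Expiring 1 record(s) ... 120000 ms has passed since batch creation" error quoted above: 120000 ms is the producer's delivery.timeout.ms default, so batches are sitting in the accumulator for two minutes before expiring. A hedged sketch of the related knobs (values are illustrative; raising them only hides the back-pressure rather than fixing the underlying throughput problem):

    import java.util.Properties;

    class TimeoutKnobs {
        static Properties timeoutProps() {
            Properties p = new Properties();
            p.put("delivery.timeout.ms", "300000"); // default 120000, the limit hit above
            p.put("request.timeout.ms", "60000");   // per-request broker wait, default 30000
            p.put("max.block.ms", "60000");         // how long send() may block on a full buffer
            return p;
        }
    }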