Hi Eric,

I had a similar issue with my application. One thing I noticed is that the
metric `kafka_consumer_io_wait_time_avg_seconds` swings from 2 minutes to
6 hours! The average is about 1-2 hours. This is a built-in Micrometer
metric. I'm still digging through its source code to understand it
better.

Thanks

On Thu, Oct 3, 2019 at 4:54 PM Alexandru Ionita
<alexandru.ion...@gmail.com> wrote:
>
> This might help.
>
> Try to replicate the configuration used in this Kafka benchmark:
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>
> On Thu, Oct 3, 2019 at 22:45, Eric Owhadi <eric.owh...@esgyn.com> wrote:
>
> > There is a key piece of information that should be critical in guessing
> > where the problem is:
> >
> > When I change from acks=all to acks=1, instead of increasing messages/s,
> > it actually cuts it in half!
> >
> > It is as if the problem is about how fast I produce data (given that with
> > acks=1 I assume I block for less time in the synchronous send, and
> > therefore my producing rate increases).
> >
> > I wonder if some sort of contention happens when the producer populates the
> > 200 partition queues while the production rate is high in the user thread?
> > Eric
> >
> > -----Original Message-----
> > From: Eric Owhadi <eric.owh...@esgyn.com>
> > Sent: Thursday, October 3, 2019 1:33 PM
> > To: users@kafka.apache.org
> > Subject: RE: poor producing performance with very low CPU utilization?
> >
> > Hi Eric,
> > Thanks a lot for your answer. Please find inline responses:
> >
> > >> You've given hardware information about your brokers, but I don't think
> > >> you've provided information about the machine your producer is running on.
> > >> Have you verified that you're not reaching any caps on your producer's
> > >> machine?
> >
> > The producer is on the same machine as the broker. It is running very
> > quietly, at 3% CPU when I run my test. So no, there is no stress on the
> > producing side.
> >
> > >> I also think you might be hitting the limit of what a single producer is
> > >> capable of pushing through with your current setup. With record size of
> > >> ~12k and the default batch size configuration of 64k, you'll only be able
> > >> to send 5 records per batch. The default number of in flight batches is 5.
> >
> > I have 200 partitions on my topic, and the load is well balanced across all
> > partitions. So the math you are doing should be multiplied by 200, right? In
> > addition, I found that batch size had no effect, and that linger.ms was the
> > triggering factor that caused a buffer send. I increased the batch size and
> > the number of in-flight requests, and that had no effect.
> >
> > >> This means at any given time, you'll only have 25 records in flight per
> > >> connection. I'm assuming your partitions are configured with at least 2
> > >> replicas. Acks=all means your producer is going to wait for the records
> > >> to be fully replicated before considering it complete.
> > >>
> > >> Doing the math, you have ~200 records per second, but this is split
> > >> between 2 brokers. This means you're producing 100 records per second per
> > >> broker. Simplifying a bit to 25 records in flight per broker, that's a
> > >> latency of ~250 ms to move around 300kb. At minimum, this includes the
> > >> time to [compress the batch], [send the batch over the network to the
> > >> leader], [write the batch to the leader's log], [fetch the batch over the
> > >> network to the replica], [write the batch to the replica's log], and all
> > >> of the assorted responses to those calls.
> >
> > Given that everything is local (the producer runs on the same node as the
> > broker), and given the size of my node (80 vcores), I hope I don't need
> > 250 ms to do that...
> > The equivalent workload on HBase 2.0 is 10 to 20X faster (and that includes
> > the same replica config etc.).
> >
> >
> > -----Original Message-----
> > From: Eric Azama <eazama...@gmail.com>
> > Sent: Thursday, October 3, 2019 1:07 PM
> > To: users@kafka.apache.org
> > Subject: Re: poor producing performance with very low CPU utilization?
> >
> > Hi Eric,
> >
> > You've given hardware information about your brokers, but I don't think
> > you've provided information about the machine your producer is running on.
> > Have you verified that you're not reaching any caps on your producer's
> > machine?
> >
> > I also think you might be hitting the limit of what a single producer is
> > capable of pushing through with your current setup. With record size of
> > ~12k and the default batch size configuration of 64k, you'll only be able
> > to send 5 records per batch. The default number of in flight batches is 5.
> > This means at any given time, you'll only have 25 records in flight per
> > connection. I'm assuming your partitions are configured with at least 2
> > replicas. Acks=all means your producer is going to wait for the records to
> > be fully replicated before considering it complete.
> >
> > Doing the math, you have ~200 records per second, but this is split between
> > 2 brokers. This means you're producing 100 records per second per broker.
> > Simplifying a bit to 25 records in flight per broker, that's a latency of
> > ~250 ms to move around 300kb. At minimum, this includes the time to,
> > [compress the batch], [send the batch over the network to the leader],
> > [write the batch to the leader's log], [fetch the batch over the network to
> > the replica], [write the batch to the replica's log], and all of the
> > assorted responses to those calls.
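
The estimate above can be reproduced with a few lines of arithmetic. This is
only a rough sketch: it assumes the uncompressed record size of 12016 bytes
given elsewhere in the thread, ignores compression and batch header overhead,
and uses Little's law (time in system = items in flight / throughput). The
class and method names are made up for illustration.

```java
// Back-of-the-envelope check of the in-flight / latency estimate.
// All names here are illustrative; nothing below calls the Kafka client.
final class ProducerEstimate {

    /** Whole records that fit in one batch, ignoring compression and overhead. */
    static int recordsPerBatch(int batchSizeBytes, int recordSizeBytes) {
        return batchSizeBytes / recordSizeBytes; // integer division rounds down
    }

    /** Records in flight on one connection when every batch is full. */
    static int recordsInFlight(int recordsPerBatch, int maxInFlightBatches) {
        return recordsPerBatch * maxInFlightBatches;
    }

    /** Little's law: latency = records in flight / records per second. */
    static double impliedLatencyMs(int recordsInFlight, double recordsPerSecond) {
        return 1000.0 * recordsInFlight / recordsPerSecond;
    }

    public static void main(String[] args) {
        int perBatch = recordsPerBatch(65536, 12016);   // 5
        int inFlight = recordsInFlight(perBatch, 5);    // 25
        double perBroker = 200.0 / 2;                   // 100 records/s per broker
        System.out.println("records per batch:  " + perBatch);
        System.out.println("records in flight:  " + inFlight);
        System.out.println("bytes in flight:    " + (inFlight * 12016)); // 300400
        System.out.println("implied latency ms: " + impliedLatencyMs(inFlight, perBroker));
    }
}
```

With the numbers from the thread this gives 5 records per batch, 25 records
(about 300 kB) in flight, and an implied round trip of 250 ms per broker.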
> >
> > On Wed, Oct 2, 2019 at 8:38 PM Eric Owhadi <eric.owh...@esgyn.com> wrote:
> >
> > > Hi Jamie,
> > > Thanks for the hint. I played with these parameters, and found only
> > > linger.ms is playing a significant role for my test case.
> > > It is very sensitive and highly non linear.
> > > I get these results:
> > > linger.ms (ms)  messages per second
> > > 80              100
> > > 84              205
> > > 85              215 (peak)
> > > 86              213
> > > 90              205
> > > 95              195
> > > 100             187
> > > 200             100
> > >
> > > So as you can see, this is very sensitive and one can easily miss the
> > > peak.
> > > However, 200 messages per second for 2 powerful nodes and relatively
> > > small messages (12016 bytes) is still at least 10X below what I would
> > > have hoped for.
> > > When I see system resources barely moving, with CPU at 3%,
> > > I am sure something is not right.
> > > Regards,
> > > Eric
> > >
> > > -----Original Message-----
> > > From: Jamie <jamied...@aol.co.uk.INVALID>
> > > Sent: Wednesday, October 2, 2019 4:27 PM
> > > To: users@kafka.apache.org
> > > Subject: Re: poor producing performance with very low CPU utilization?
> > >
> > > Hi Eric,
> > > I found that increasing linger.ms to between 50 and 100 ms significantly
> > > increases performance (fewer, larger requests instead of many small
> > > ones). I'd also increase the batch size and the buffer.memory.
> > > Thanks,
> > > Jamie
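
Jamie's suggestion could be sketched as producer properties like the ones
below. The three keys are standard producer settings; linger.ms=85 is taken
from the peak in Eric's table, while the batch.size and buffer.memory values
are illustrative guesses (roughly double the 65536 used in the thread and
double the 32 MB default), not tested recommendations. The class name is
hypothetical.

```java
import java.util.Properties;

// Illustrative only: builds the tuning-related subset of a producer config.
// Serializers, bootstrap servers, etc. would still have to be added.
final class TunedProducerProps {

    static Properties build() {
        Properties props = new Properties();
        // Wait up to 85 ms for more records before sending a batch.
        props.put("linger.ms", "85");
        // Assumed value: 128 KiB per-partition batch.
        props.put("batch.size", "131072");
        // Assumed value: 64 MiB of total buffering for unsent records.
        props.put("buffer.memory", "67108864");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```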
> > >
> > >
> > > -----Original Message-----
> > > From: Eric Owhadi <eric.owh...@esgyn.com>
> > > To: users@kafka.apache.org <users@kafka.apache.org>
> > > Sent: Wed, 2 Oct 2019 16:42
> > > Subject: poor producing performance with very low CPU utilization?
> > >
> > > Hi Kafka users,
> > > I am new to Kafka and am struggling to get an acceptable producing
> > > rate.
> > > I am using a cluster of 2 nodes, with 40 CPU cores (80 if counting
> > > hyperthreading) and 256GB memory, on a 10Gbit network. Kafka is installed
> > > as part of a Cloudera parcel, with a 5GB Java heap.
> > > Producer version: Kafka client 2.2.1
> > >
> > > Wed Oct  2 07:56:59 PDT 2019
> > > JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-0.b11.el6_9.x86_64
> > > Using -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/kafka_kafka-KAFKA_BROKER-c1871edf37153578a6fc7f41462d01d2_pid6908.hprof -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh as CSD_JAVA_OPTS
> > > Using /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER as conf dir
> > > Using scripts/control.sh as process script
> > > CONF_DIR=/var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> > > CMF_CONF_DIR=/etc/cloudera-scm-agent
> > >
> > > Date: Wed Oct  2 07:56:59 PDT 2019
> > > Host: xxxxx.esgyn.local
> > > Pwd: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> > > CONF_DIR: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> > > KAFKA_HOME: /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka
> > > Zookeeper Quorum:
> > > xxx.esgyn.local:2181,xxx.esgyn.local:2181,xxx.esgyn.local:2181
> > > Zookeeper Chroot:
> > > PORT: 9092
> > > JMX_PORT: 9393
> > > SSL_PORT: 9093
> > > ENABLE_MONITORING: true
> > > METRIC_REPORTERS: nl.techop.kafka.KafkaHttpMetricsReporter
> > > BROKER_HEAP_SIZE: 5120
> > > BROKER_JAVA_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20
> > > -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC
> > > -Djava.awt.headless=true
> > > BROKER_SSL_ENABLED: false
> > > KERBEROS_AUTH_ENABLED: false
> > > KAFKA_PRINCIPAL:
> > > SECURITY_INTER_BROKER_PROTOCOL: INFERRED
> > > AUTHENTICATE_ZOOKEEPER_CONNECTION: true
> > > SUPER_USERS: kafka
> > > Kafka version found: 2.2.1-kafka4.1.0
> > > Sentry version found: 1.5.1-cdh5.15.0
> > > ZK_PRINCIPAL_NAME: zookeeper
> > > Final Zookeeper Quorum is
> > > xxx.esgyn.local:2181,xx.esgyn.local:2181,x.esgyn.local:2181
> > > security.inter.broker.protocol inferred as PLAINTEXT
> > > LISTENERS=listeners=PLAINTEXT://xxxxx.esgyn.local:9092,
> > >
> > > I am producing messages of 12016 bytes uncompressed, which are then
> > > snappy-compressed by Kafka.
> > > I am using a topic with 200 partitions, and a custom partitioner that
> > > I verified is doing a good job of spreading the load across the 2 brokers.
> > >
> > > My producer config looks like:
> > >
> > > kafkaProps.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
> > > kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
> > > kafkaProps.put("value.serializer", "org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");
> > > kafkaProps.put("partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");
> > > kafkaProps.put("compression.type", "snappy");
> > > kafkaProps.put("batch.size", "65536");
> > > kafkaProps.put("acks", "all");
> > > kafkaProps.put("linger.ms", "1");
> > >
> > > I first tried fire-and-forget sends, thinking I would get the best
> > > performance.
> > > Then I tried synchronous sends, and amazingly found that I got
> > > better performance with them.
> > >
> > > However, after 1 or 2 minutes of load testing, I start getting errors on
> > > the synchronous send like this:
> > > java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
> > >         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
> > >         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
> > >         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
> > >         at org.trafodion.sql.kafka.TimeseriesEndPoint$customHandler.handle(TimeseriesEndPoint.java:315)
> > >         at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
> > >         at org.eclipse.jetty.server.Server.handle(Server.java:505)
> > >         at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
> > >         at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
> > >         at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
> > >         at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
> > >         at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
> > >         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
> > >         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
> > >         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
> > >         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
> > >         at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
> > >         at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:781)
> > >         at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:917)
> > >         at java.lang.Thread.run(Thread.java:748)
> > > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed since batch creation
> > >
> > > So I suspect that I am producing too fast, and the brokers cannot
> > > keep up.
> > > I tried bumping up the I/O threads from the default 8 to 40; that did not
> > > help.
> > >
> > > I am getting a producing rate of only about 100 messages per second,
> > > and about 1 megabyte per second according to Kafka metrics.
> > > The CPU utilization is barely noticeable (3%), the network is barely
> > > affected, and from what I have found googling around, this is not the kind
> > > of performance I should expect from my config. I was hoping for at least
> > > 10X more, if not 100X. Were my expectations too high, or am I missing
> > > something in the config that is causing these performance numbers?
> > >
> > > Some details: I produce using a custom Jetty handler that I verified
> > > to be super-fast when I am not producing (commenting out the send()),
> > > and I am using a single producer (I also tried with 2) reused on all
> > > Jetty threads.
> > >
> > > Any help/clue would be much appreciated. Thanks in advance,
> > > Eric Owhadi
> > > Esgyn Corporation
> > >
> > >
> > >
> >
