This might help.

Try to replicate the configuration this guy is using for benchmarking Kafka:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
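
For reference, that benchmark is driven by Kafka's stock producer performance
tool, so a quick baseline that bypasses your own producer code is something
roughly like this (topic name, record size and bootstrap server taken from the
thread below; the other settings are just illustrative):

  bin/kafka-producer-perf-test.sh \
    --topic DEFAULT-.TIMESERIES.SmartpumpCollectorVector \
    --num-records 100000 \
    --record-size 12016 \
    --throughput -1 \
    --producer-props bootstrap.servers=nap052.esgyn.local:9092 \
        acks=all compression.type=snappy batch.size=65536 linger.ms=85

If that tool also tops out around 100-200 records/s, the limit is on the
broker/config side; if it goes much faster, the bottleneck is in the producing
application.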

On Thu, Oct 3, 2019 at 10:45 PM Eric Owhadi <eric.owh...@esgyn.com> wrote:

> There is a key piece of information that should help pinpoint where
> the problem is:
>
> When I change from acks=all to acks=1, instead of increasing messages/s,
> it actually cuts them in half!
>
> It is as if the problem were about how fast I produce data (with acks=1 I
> assume I block for less time in the synchronous send, and therefore my
> producing rate increases).
>
> I wonder if some sort of contention happens when the producer populates the
> 200 partition queues while the rate of production in the user thread is high?
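>
> One way to check where the time goes is to dump a few producer metrics after
> the test run. A rough sketch against the plain kafka-clients 2.2.1 API
> (assuming the usual imports: java.util.Map, org.apache.kafka.common.Metric,
> org.apache.kafka.common.MetricName):
>
>     // record-queue-time-avg = how long batches sit in the partition queues;
>     // request-latency-avg   = how long the broker round trip takes.
>     for (Map.Entry<MetricName, ? extends Metric> e : producer.metrics().entrySet()) {
>         String name = e.getKey().name();
>         if (name.equals("record-queue-time-avg")
>                 || name.equals("request-latency-avg")
>                 || name.equals("batch-size-avg")
>                 || name.equals("records-per-request-avg")) {
>             System.out.println(e.getKey().group() + " " + name + " = "
>                     + e.getValue().metricValue());
>         }
>     }
>
> If record-queue-time-avg dominates, the problem is on the batching/contention
> side before the wire; if request-latency-avg dominates, it is the broker
> round trip.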
> Eric
>
> -----Original Message-----
> From: Eric Owhadi <eric.owh...@esgyn.com>
> Sent: Thursday, October 3, 2019 1:33 PM
> To: users@kafka.apache.org
> Subject: RE: poor producing performance with very low CPU utilization?
>
> Hi Eric,
> Thanks a lot for your answer. Please find inline responses:
>
> >>You've given hardware information about your brokers, but I don't think
> you've provided information about the machine your producer is running on.
> >>Have you verified that you're not reaching any caps on your producer's
> machine?
>
> The producer is on the same machine as the broker. It is running very quietly,
> at 3% CPU when I run my test. So no, there is no stress on the producing side.
>
> >>I also think you might be hitting the limit of what a single producer is
> capable of pushing through with your current setup. With record size of
> ~12k and the default batch size configuration of 64k, you'll only be able
> to send 5 records per batch. The default number of in flight batches is 5.
>
> I have 200 partitions on my topic, and the load is well balanced across all
> partitions. So the math you are doing should be x200, right? In addition, I
> found that batch size had no effect, and that linger.ms was the triggering
> factor that caused a buffer send. I tuned batch size and the number of
> in-flight requests upward, and that had no effect.
>
> >>This means at any given time, you'll only have 25 records in flight per
> connection. I'm assuming your partitions are configured with at least 2
> replicas. Acks=all means your producer is going to wait for the records
> to be fully replicated before considering it complete.
>
> >>Doing the math, you have ~200 records per second, but this is split between
> 2 brokers. This means you're producing 100 records per second per broker.
> Simplifying a bit to 25 records in flight per broker, that's a latency of
> ~250 ms to move around 300kb. At minimum, this includes the time to,
> [compress the batch], [send the batch over the network to the leader],
> [write the batch to the leader's log], [fetch the batch over the network
> to the replica], [write the batch to the replica's log], and all of the
> assorted responses to those calls.
>
> Given that all is local (the producer running on the same node as the broker),
> and the size of my node (80 vcores), I would hope I don't need 250 ms to do that...
> The equivalent workload on HBase 2.0 is 10 to 20x faster (and that includes
> the same replica config, etc.).
>
> -----Original Message-----
> From: Eric Azama <eazama...@gmail.com>
> Sent: Thursday, October 3, 2019 1:07 PM
> To: users@kafka.apache.org
> Subject: Re: poor producing performance with very low CPU utilization?
>
> Hi Eric,
>
> You've given hardware information about your brokers, but I don't think
> you've provided information about the machine your producer is running on.
> Have you verified that you're not reaching any caps on your producer's
> machine?
>
> I also think you might be hitting the limit of what a single producer is
> capable of pushing through with your current setup. With record size of
> ~12k and the default batch size configuration of 64k, you'll only be able
> to send 5 records per batch. The default number of in flight batches is 5.
> This means at any given time, you'll only have 25 records in flight per
> connection. I'm assuming your partitions are configured with at least 2
> replicas. Acks=all means your producer is going to wait for the records to
> be fully replicated before considering it complete.
>
> Doing the math, you have ~200 records per second, but this is split between
> 2 brokers. This means you're producing 100 records per second per broker.
> Simplifying a bit to 25 records in flight per broker, that's a latency of
> ~250 ms to move around 300kb. At minimum, this includes the time to,
> [compress the batch], [send the batch over the network to the leader],
> [write the batch to the leader's log], [fetch the batch over the network to
> the replica], [write the batch to the replica's log], and all of the
> assorted responses to those calls.
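>
> (Spelling that arithmetic out: ~200 records/s split over 2 brokers is ~100
> records/s per broker; with ~25 records in flight per broker, each record
> spends about 25 / 100 = 0.25 s in flight, i.e. ~250 ms; and 25 records of
> ~12 kB is roughly 300 kB in flight per broker.)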
>
> On Wed, Oct 2, 2019 at 8:38 PM Eric Owhadi <eric.owh...@esgyn.com> wrote:
>
> > Hi Jamie,
> > Thanks for the hint. I played with these parameters, and found only
> > linger.ms is playing a significant role for my test case.
> > It is very sensitive and highly non linear.
> > I get these results:
> > linger.ms       messages per second
> > 80              100
> > 84              205
> > 85              215  <- peak
> > 86              213
> > 90              205
> > 95              195
> > 100             187
> > 200             100
> >
> > So as you can see, this is very sensitive and one can easily miss the peak.
> > However, 200 messages per second for 2 powerful nodes and relatively
> > small messages (12016 bytes) is still at least 10x below what I would
> > have hoped for.
> > When I see system resources barely moving, with CPU at 3%,
> > I am sure something is not right.
> > Regards,
> > Eric
> >
> > -----Original Message-----
> > From: Jamie <jamied...@aol.co.uk.INVALID>
> > Sent: Wednesday, October 2, 2019 4:27 PM
> > To: users@kafka.apache.org
> > Subject: Re: poor producing performance with very low CPU utilization?
> >
> > Hi Eric,
> > I found increasing linger.ms to between 50 and 100 ms significantly
> > increases performance (fewer, larger requests instead of many small
> > ones); I'd also increase the batch size and buffer.memory.
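> >
> > For concreteness, a minimal sketch of those settings in the producer-config
> > style used further down in this thread (the values are only illustrative
> > starting points, not tested recommendations):
> >
> >     kafkaProps.put("linger.ms", "50");            // 50-100 ms: let batches fill up
> >     kafkaProps.put("batch.size", "262144");       // 256 KB, room for ~20 records of ~12 KB
> >     kafkaProps.put("buffer.memory", "134217728"); // 128 MB of total producer buffering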
> > Thanks,
> > Jamie
> >
> >
> > -----Original Message-----
> > From: Eric Owhadi <eric.owh...@esgyn.com>
> > To: users@kafka.apache.org <users@kafka.apache.org>
> > Sent: Wed, 2 Oct 2019 16:42
> > Subject: poor producing performance with very low CPU utilization?
> >
> > Hi Kafka users,
> > I am new to Kafka and am struggling to get an acceptable producing rate.
> > I am using a cluster of 2 nodes, each with 40 CPU cores (80 counting
> > hyperthreading) and 256 GB of memory, on a 10 Gbit network. Kafka is
> > installed as part of a Cloudera parcel, with a 5 GB Java heap.
> > Producer version: Kafka client 2.2.1
> >
> > Wed Oct  2 07:56:59 PDT 2019
> > JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-0.b11.el6_9.x86_64
> > Using -XX:+HeapDumpOnOutOfMemoryError
> > -XX:HeapDumpPath=/tmp/kafka_kafka-KAFKA_BROKER-c1871edf37153578a6fc7f4
> > 1462d01d2_pid6908.hprof
> > -XX:OnOutOfMemoryError=/usr/lib64/cmf/service/common/killparent.sh as
> > CSD_JAVA_OPTS Using
> > /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER as conf
> > dir Using scripts/control.sh as process script
> > CONF_DIR=/var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> > CMF_CONF_DIR=/etc/cloudera-scm-agent
> >
> > Date: Wed Oct  2 07:56:59 PDT 2019
> > Host: xxxxx.esgyn.local
> > Pwd: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> > CONF_DIR: /var/run/cloudera-scm-agent/process/33853-kafka-KAFKA_BROKER
> > KAFKA_HOME: /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka
> > Zookeeper Quorum:
> > xxx.esgyn.local:2181,xxx.esgyn.local:2181,xxx.esgyn.local:2181
> > Zookeeper Chroot:
> > PORT: 9092
> > JMX_PORT: 9393
> > SSL_PORT: 9093
> > ENABLE_MONITORING: true
> > METRIC_REPORTERS: nl.techop.kafka.KafkaHttpMetricsReporter
> > BROKER_HEAP_SIZE: 5120
> > BROKER_JAVA_OPTS: -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20
> > -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC
> > -Djava.awt.headless=true
> > BROKER_SSL_ENABLED: false
> > KERBEROS_AUTH_ENABLED: false
> > KAFKA_PRINCIPAL:
> > SECURITY_INTER_BROKER_PROTOCOL: INFERRED
> > AUTHENTICATE_ZOOKEEPER_CONNECTION: true
> > SUPER_USERS: kafka
> > Kafka version found: 2.2.1-kafka4.1.0
> > Sentry version found: 1.5.1-cdh5.15.0
> > ZK_PRINCIPAL_NAME: zookeeper
> > Final Zookeeper Quorum is
> > xxx.esgyn.local:2181,xx.esgyn.local:2181,x.esgyn.local:2181
> > security.inter.broker.protocol inferred as PLAINTEXT
> > LISTENERS=listeners=PLAINTEXT://xxxxx.esgyn.local:9092,
> >
> > I am producing messages of 12016 bytes uncompressed, which are then
> > Snappy-compressed by Kafka.
> > I am using a topic with 200 partitions, and a custom partitioner that
> > I verified is doing a good job of spreading the load across the 2 brokers.
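> >
> > (For reference, the general shape such a partitioner takes against the 2.2
> > client API; this is a generic hash-based sketch, not the actual
> > TimeSeriesPartitioner, and the class name is made up:)
> >
> >     import java.util.Map;
> >     import org.apache.kafka.clients.producer.Partitioner;
> >     import org.apache.kafka.common.Cluster;
> >     import org.apache.kafka.common.utils.Utils;
> >
> >     public class ExampleSpreadingPartitioner implements Partitioner {
> >         @Override
> >         public int partition(String topic, Object key, byte[] keyBytes,
> >                              Object value, byte[] valueBytes, Cluster cluster) {
> >             int numPartitions = cluster.partitionsForTopic(topic).size();
> >             // spread records evenly by hashing the serialized key (assumes a non-null key)
> >             return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
> >         }
> >         @Override public void configure(Map<String, ?> configs) { }
> >         @Override public void close() { }
> >     }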
> >
> > My producer config looks like:
> >
> >     kafkaProps.put("bootstrap.servers", "nap052.esgyn.local:9092,localhost:9092");
> >     kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer");
> >     kafkaProps.put("value.serializer", "org.trafodion.sql.kafka.SmartpumpCollectorVectorSerializer");
> >     kafkaProps.put("partitioner.class", "org.trafodion.sql.kafka.TimeSeriesPartitioner");
> >     kafkaProps.put("compression.type", "snappy");
> >     kafkaProps.put("batch.size", "65536");
> >     kafkaProps.put("acks", "all");
> >     kafkaProps.put("linger.ms", "1");
> >
> > I first tried doing fire-and-forget sends, thinking I would get the best
> > performance.
> > Then I tried synchronous sends, and was amazed to find that I got
> > better performance with sync send.
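> >
> > (For comparison, the usual non-blocking pattern is to pass a callback to
> > send() instead of blocking on the returned future per record; a minimal
> > sketch, where record is the ProducerRecord being sent:)
> >
> >     producer.send(record, (RecordMetadata metadata, Exception exception) -> {
> >         if (exception != null) {
> >             // the send failed (e.g. the batch expired); log/count it here
> >             exception.printStackTrace();
> >         }
> >     });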
> >
> > However, after 1 or 2 minutes of load test, I start getting errors on
> > the synchronous send like this:
> > java.util.concurrent.ExecutionException:
> > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for
> > DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed
> > since batch creation
> >         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:98)
> >         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:67)
> >         at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:30)
> >         at org.trafodion.sql.kafka.TimeseriesEndPoint$customHandler.handle(TimeseriesEndPoint.java:315)
> >         at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
> >         at org.eclipse.jetty.server.Server.handle(Server.java:505)
> >         at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:370)
> >         at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
> >         at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
> >         at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
> >         at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
> >         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
> >         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
> >         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
> >         at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
> >         at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
> >         at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:781)
> >         at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:917)
> >         at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s)
> > for DEFAULT-.TIMESERIES.SmartpumpCollectorVector--112:120000 ms has passed
> > since batch creation
> >
> > So I suspect that I am producing too fast and the brokers cannot catch up.
> > I tried bumping up the broker I/O threads from the default 8 to 40; that
> > did not help.
> >
> > I am getting a producing rate of only about 100 messages per second,
> > and about 1 megabyte per second according to Kafka metrics.
> > The CPU utilization is barely noticeable (3%), the network is practically
> > unaffected, and having googled around, this is not the kind of performance I
> > should expect out of my config. I was hoping for at least 10x if
> > not 100x better. Were my expectations too high, or am I missing
> > something in the config that is causing these performance numbers?
> >
> > Some details: I produce from a custom Jetty handler that I verified
> > to be super fast when I am not producing (commenting out the send()),
> > and I am using a single producer (I also tried with 2) reused across all
> > Jetty threads.
> >
> > Any help/clue would be much appreciated. Thanks in advance,
> > Eric Owhadi, Esgyn Corporation.
> >
> >
> >
>
