Re: poor producing performance with very low CPU utilization?

2019-10-03 Thread Alexandru Ionita
This might help.

Try to replicate the configuration this guy is using for benchmarking Kafka:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

On Thu, Oct 3, 2019 at 22:45, Eric Owhadi wrote:

> There is a key piece of information that should be critical to guess where
> the problem is:
>
> When I change from acks=all to acks=1, instead of increasing messages/s,
> it actually cuts the rate in half!
>
> It is as if the problem were about how fast I produce data (given that with
> acks=1 I assume I block for less time in the synchronous send, and therefore
> my producing pump speeds up).
>
> I wonder if some sort of contention happens when the producer populates the
> 200 partition queues while the rate of production in the user thread is high?
> Eric
>
> -Original Message-
> From: Eric Owhadi 
> Sent: Thursday, October 3, 2019 1:33 PM
> To: users@kafka.apache.org
> Subject: RE: poor producing performance with very low CPU utilization?
>
> External
>
> Hi Eric,
> Thanks a lot for your answer. Please find inline responses:
>
> >>You've given hardware information about your brokers, but I don't think
> >>you've provided information about the machine your producer is running on.
> >>Have you verified that you're not reaching any caps on your producer's
> >>machine?
>
> The producer is on the same machine as the broker, and it runs very quietly,
> at 3% CPU, during my test. So no, there is no stress on the producing side.
>
> >>I also think you might be hitting the limit of what a single producer is
> >>capable of pushing through with your current setup. With a record size of
> >>~12k and the default batch size configuration of 64k, you'll only be able
> >>to send 5 records per batch. The default number of in flight batches is 5.
>
> I have 200 partitions on my topic, and the load is well balanced across all
> of them, so the math you are doing should be x200, right? In addition, I
> found that batch size had no effect; linger.ms was what triggered a buffer
> send. I adjusted batch size and the number of in-flight requests upward, and
> that had no effect.
>
> >>This means at any given time, you'll only have 25 records in flight per
> >>connection. I'm assuming your partitions are configured with at least 2
> >>replicas. Acks=all means your producer is going to wait for the records
> >>to be fully replicated before considering it complete.
>
> >>Doing the math, you have ~200 records per second, but this is split
> >>between 2 brokers. This means you're producing 100 records per second per
> >>broker. Simplifying a bit to 25 records in flight per broker, that's a
> >>latency of ~250 ms to move around 300kb. At minimum, this includes the
> >>time to [compress the batch], [send the batch over the network to the
> >>leader], [write the batch to the leader's log], [fetch the batch over the
> >>network to the replica], [write the batch to the replica's log], and all
> >>of the assorted responses to those calls.
>
> Given that everything is local (the producer runs on the same node as the
> broker) and the size of my node (80 vcores), I would hope I don't need
> 250 ms for that...
> The equivalent workload on HBase 2.0 is 10 to 20x faster (and that includes
> the same replica config, etc.).
>
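For concreteness, the producer knobs being debated above map to settings like these (a sketch; the keys are the Java client's config names, and the values are illustrative guesses for ~12 KB records, not recommendations from the thread):

```python
# Producer settings discussed in this thread. Values are illustrative only:
# a larger batch.size so more ~12 KB records fit per batch, plus the knobs
# the posters experimented with (linger.ms, acks, in-flight requests).
RECORD_BYTES = 12 * 1024

producer_overrides = {
    "acks": "all",                                # wait for full replication
    "batch.size": 256 * 1024,                     # up from the 64 KB assumed above
    "linger.ms": 5,                               # observed to trigger the send
    "max.in.flight.requests.per.connection": 5,   # client default
}

# With a bigger batch, more records fit per request:
records_per_batch = producer_overrides["batch.size"] // RECORD_BYTES
print(records_per_batch)
```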
> -Original Message-
> From: Eric Azama 
> Sent: Thursday, October 3, 2019 1:07 PM
> To: users@kafka.apache.org
> Subject: Re: poor producing performance with very low CPU utilization?
>
> External
>
> Hi Eric,
>
> You've given hardware information about your brokers, but I don't think
> you've provided information about the machine your producer is running on.
> Have you verified that you're not reaching any caps on your producer's
> machine?
>
> I also think you might be hitting the limit of what a single producer is
> capable of pushing through with your current setup. With record size of
> ~12k and the default batch size configuration of 64k, you'll only be able
> to send 5 records per batch. The default number of in flight batches is 5.
> This means at any given time, you'll only have 25 records in flight per
> connection. I'm assuming your partitions are configured with at least 2
> replicas. Acks=all means your producer is going to wait for the records to
> be fully replicated before considering it complete.
>
> Doing the math, you have ~200 records per second, but this is split between
> 2 brokers. This means you're producing 100 records per second per broker.
> Simplifying a bit to 25 records in flight per broker, that's a latency of
> ~250 ms to move around 300kb. At minimum, this includes the time to,
> [compress the batch], [send the batch over the network to the leader],
> [write the batch to the leader's log], [fetch the batch over the network to
> the replica], [write the batch to the replica's log], and all of the
> assorted responses to those calls.
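The arithmetic in the paragraph above can be written out as a quick sanity check (using the numbers as stated in this thread; the 64 KB batch size and 5 in-flight requests are assumptions from the discussion, not measurements):

```python
# Back-of-the-envelope math from the thread.
record_bytes = 12 * 1024   # ~12 KB per record
batch_bytes = 64 * 1024    # assumed batch.size
max_in_flight = 5          # default max.in.flight.requests.per.connection

records_per_batch = batch_bytes // record_bytes        # 5 records per batch
records_in_flight = records_per_batch * max_in_flight  # 25 per connection

# ~200 records/s split across 2 brokers -> 100 records/s per broker.
# Draining ~25 in-flight records at that rate implies roughly 250 ms
# per in-flight window.
records_per_sec_per_broker = 200 / 2
window_ms = records_in_flight / records_per_sec_per_broker * 1000
print(records_per_batch, records_in_flight, window_ms)
```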
>
> On Wed, Oct 2, 2019 at 8:38 PM Eric Owhadi  wrote:
>
> > Hi Jamie,
> > 

Re: Need help determining consumer group offsets

2017-03-23 Thread Alexandru Ionita
Hi Greg!!

Are you using offset auto commit or do you commit manually?

2017-03-22 22:21 GMT+01:00 Greg Lloyd :

> I have a 0.8.2.2 cluster which has been configured
> with offsets.storage=kafka. We are experiencing some issues after a few
> nodes went down and wrong nodes were brought up in their place, fortunately
> not production. I am trying to determine what the current offsets are for a
> consumer group, and can't seem to get any offsets from either
> ZooKeeper (expected) or the __consumer_offsets topic.
>
> How can I fetch the group offsets if not from that topic?
>


Re: Force producer topic metadata refresh.

2016-10-12 Thread Alexandru Ionita
OK, then my question is: why doesn't the producer try to recover from this
error by updating its topic metadata right away, instead of waiting for
"metadata.max.age.ms" to expire?
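For what it's worth, until the producer recovers on its own, one workaround is to shrink "metadata.max.age.ms" so stale partition counts expire sooner (with the Java client, calling producer.partitionsFor(topic) right after adding partitions is also sometimes suggested as a way to nudge a refresh). A config sketch, with an illustrative 30-second value:

```python
# Illustrative producer override: refresh topic metadata every 30 s instead
# of the 300 s default, so partitions added via AdminUtils become visible
# sooner. This trades extra metadata traffic for faster convergence.
DEFAULT_METADATA_MAX_AGE_MS = 300_000  # 5 minutes

producer_metadata_overrides = {
    "metadata.max.age.ms": 30_000,
}
print(producer_metadata_overrides["metadata.max.age.ms"])
```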

2016-10-12 11:43 GMT+02:00 Manikumar <manikumar.re...@gmail.com>:

> We have a similar setting, "metadata.max.age.ms", in the new producer API.
> Its default value is 300 sec.
>
> On Wed, Oct 12, 2016 at 3:04 PM, Alexandru Ionita <
> alexandru.ion...@gmail.com> wrote:
>
> > Hello kafka users!!
> >
> > I'm trying to implement/use a mechanism to make a Kafka producer
> > imperatively update its topic metadata for a particular topic.
> >
> > Here is the use case:
> >
> > we are adding partitions on topics programmatically because we want to
> > very strictly control how messages are published to particular partitions.
> >
> > We are using AdminUtils.addPartitions to achieve this.
> > We then store the ID of the newly added partition in Zookeeper so that we
> > persist a mapping to a partition ID for our particular domain key.
> >
> > The problem we are facing right now is that the Kafka producer won't
> > refresh its topic metadata for a while, which prevents the producer
> > from posting to those partitions; it throws an error:
> >
> > Caused by: java.lang.IllegalArgumentException: Invalid partition given
> > with record: 56 is not in the range [0...55].
> > at org.apache.kafka.clients.producer.KafkaProducer.partition(KafkaProducer.java:717)
> > ~[kafka-clients-0.10.0.1.jar:na]
> > at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:459)
> > ~[kafka-clients-0.10.0.1.jar:na]
> > at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
> > ~[kafka-clients-0.10.0.1.jar:na]
> > at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
> > ~[kafka-clients-0.10.0.1.jar:na]
> >
> > As I read somewhere (https://github.com/SOHU-Co/kafka-node/issues/175),
> > the producer should try to recover from such an error by pulling the
> > latest version of the topic metadata.
> >
> > This isn't happening, and I keep getting those errors for about 60
> > seconds until the producer is eventually able to publish to that
> > partition.
> >
> > In the previous version of Kafka (0.8) there was a producer setting
> > called topic.metadata.refresh.interval.ms that was intended to make the
> > producer pull that information. This is what I found related to that
> > setting in the 0.8 documentation: "The producer generally refreshes the
> > topic metadata from brokers when there is a failure (partition missing,
> > leader not available...)"
> >
> > Any ideas and comments on this are much appreciated.
> > Thanks
> >
>


Force producer topic metadata refresh.

2016-10-12 Thread Alexandru Ionita
Hello kafka users!!

I'm trying to implement/use a mechanism to make a Kafka producer imperatively
update its topic metadata for a particular topic.

Here is the use case:

we are adding partitions on topics programmatically because we want to very
strictly control how messages are published to particular partitions.

We are using AdminUtils.addPartitions to achieve this.
We then store the ID of the newly added partition in Zookeeper so that we
persist a mapping to a partition ID for our particular domain key.

The problem we are facing right now is that the Kafka producer won't
refresh its topic metadata for a while, which prevents the producer
from posting to those partitions; it throws an error:

Caused by: java.lang.IllegalArgumentException: Invalid partition given with
record: 56 is not in the range [0...55].
at org.apache.kafka.clients.producer.KafkaProducer.partition(KafkaProducer.java:717)
~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:459)
~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:430)
~[kafka-clients-0.10.0.1.jar:na]
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:353)
~[kafka-clients-0.10.0.1.jar:na]

As I read somewhere (https://github.com/SOHU-Co/kafka-node/issues/175), the
producer should try to recover from such an error by pulling the latest
version of the topic metadata.

This isn't happening, and I keep getting those errors for about 60
seconds until the producer is eventually able to publish to that
partition.

In the previous version of Kafka (0.8) there was a producer setting called
topic.metadata.refresh.interval.ms that was intended to make the producer
pull that information. This is what I found related to that setting in the
0.8 documentation: "The producer generally refreshes the topic metadata from
brokers when there is a failure (partition missing, leader not available...)"

Any ideas and comments on this are much appreciated.
Thanks