metrics for checking whether a broker throttles requests based on its quota limits?

2019-06-20 Thread Yu Yang
Hi,

Recently we enabled Kafka quota management for our Kafka clusters. We are
looking for Kafka metrics that can be used for alerting on whether a Kafka
broker throttles requests based on quota.

There are a few throttle-related metrics in Kafka, but none of them can
tell accurately whether the broker is throttling requests. Could
anyone share insights on this?

kafka.network.produce.throttletimems.requestmetrics.95thPercentile

kafka.network.produce.throttletimems.requestmetrics.Count
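One pragmatic approach (a sketch, not an official recipe): since the `...Count` metric above is a cumulative counter, the broker is actively throttling exactly when the counter advances between scrapes, so alerting on its delta is more reliable than alerting on a percentile snapshot. A minimal illustration of that delta check, with made-up sample values:

```python
def throttling_intervals(samples, threshold=0):
    """Given successive scrapes of the cumulative throttle-time Count
    metric, return the scrape indices where the broker throttled,
    i.e. where the counter advanced by more than `threshold`."""
    alerts = []
    for i in range(1, len(samples)):
        if samples[i] - samples[i - 1] > threshold:
            alerts.append(i)
    return alerts

# The counter stays flat while nothing is throttled, then jumps:
scrapes = [120, 120, 120, 135, 135, 162]
print(throttling_intervals(scrapes))  # [3, 5]
```

The same delta trick works in most monitoring systems as a rate() over the counter.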

Thanks!

Regards,


slow log segment loading during kafka 0.10.2 -> 1.0.1 upgrade

2018-03-10 Thread Yu Yang
Hi,

We are experimenting with upgrading our kafka service from 0.10.2 to 1.0.1,
and noticed that restarting a broker after updating the kafka binary was
slow. On a test cluster that has ~120 GB of data on each broker, it took
about 20 minutes to load log segments. I am wondering if there is any
workaround to reduce the long log segment loading time. Can we manually
generate snapshot files to speed up the segment loading? Based on the
server logs, it is also not clear to me which of `loadSegments()` and
`loadProducerState(...)` took more time during the log segment loading.
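One setting worth checking for slow startup loading (hedged: verify it against your Kafka version's documentation) is `num.recovery.threads.per.data.dir`, which controls how many threads per data directory are used for log loading/recovery at startup; it defaults to 1, so a broker with many partitions loads segments largely serially:

```properties
# server.properties: more threads per data dir for log loading and
# recovery at startup, and flushing at shutdown (default: 1)
num.recovery.threads.per.data.dir=8
```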

Thanks!

Regards,
-Yu


Re: kafka controller setting for detecting broker failure and re-electing a new leader for partitions?

2018-01-25 Thread Yu Yang
Thanks for the reply, Xi! The default value of 'controller.socket.timeout.ms'
is 30000, that is, 30 seconds. What we have observed was that the controller
would not assign another replica as the leader, even after it had failed to
send updated topic metadata to the problematic broker for more than 30
minutes. Reducing controller.socket.timeout.ms will not help.

Based on the current kafka implementation, when such an exception is raised,
ControllerChannelManager will catch the exception and keep retrying.

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerChannelManager.scala#L222

<https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/controller/ControllerChannelManager.scala#L245>
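That retry behavior can be pictured with a toy model (a deliberate simplification, not the linked Scala): each failed attempt costs one socket timeout and the loop keeps retrying until the broker is reachable again, so the controller's stall tracks the length of the outage, and lowering the per-attempt timeout only increases the number of retries.

```python
def total_stall_ms(outage_ms, socket_timeout_ms):
    """Toy model of ControllerChannelManager's retry loop: attempts fail
    (each costing one socket timeout) until the broker's outage is over,
    so the stall tracks the outage length, not the timeout setting."""
    elapsed = 0
    attempts = 0
    while elapsed < outage_ms:          # broker still unreachable
        elapsed += socket_timeout_ms    # one failed connect attempt
        attempts += 1
    return elapsed, attempts

# A 30-minute outage stalls the controller ~30 minutes either way;
# a smaller timeout only means more retry attempts.
print(total_stall_ms(1_800_000, 30_000))  # (1800000, 60)
print(total_stall_ms(1_800_000, 3_000))   # (1800000, 600)
```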
On Thu, Jan 25, 2018 at 12:02 AM, Hu Xi <huxi...@hotmail.com> wrote:

> Yu Yang,
>
>
> There does exist a broker-side config named 'controller.socket.timeout.ms'.
> Decrease it to a reasonably smaller value might be a help but please use it
> with caution.
>
> ____
> From: Yu Yang <yuyan...@gmail.com>
> Sent: January 25, 2018 15:42
> To: users@kafka.apache.org
> Subject: kafka controller setting for detecting broker failure and re-electing
> a new leader for partitions?
>
> Hi everyone,
>
> Recently we had a cluster in which the controller failed to connect to a
> broker A for an extended period of time.  I had expected that the
> controller would identify the broker as a failed broker, and re-elect
> another broker as the leader for partitions that were hosted on broker A.
> However, this did not happen in that cluster. What happened was that broker
> A was still considered as the leader for some partitions, and those
> partitions are marked as under replicated partitions. Is there any
> configuration setting in kafka to speed up the broker failure detection?
>
>
> [2018-01-24 14:13:57,132] WARN [Controller-37-to-broker-4-send-thread],
> Controller 37's connection to broker testkafka04:9092 (id: 4 rack: null)
> was unsuccessful (kafka.controller.RequestSendThread)
> java.net.SocketTimeoutException: Failed to connect within 30000 ms
> at
> kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.
> scala:231)
> at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.
> scala:182)
> at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.
> scala:181)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
>
> Thanks!
>
> Regards,
> -Yu
>


kafka controller setting for detecting broker failure and re-electing a new leader for partitions?

2018-01-24 Thread Yu Yang
Hi everyone,

Recently we had a cluster in which the controller failed to connect to a
broker A for an extended period of time. I had expected that the
controller would identify the broker as a failed broker, and re-elect
another broker as the leader for the partitions that were hosted on broker
A. However, this did not happen in that cluster. Instead, broker A was
still considered the leader for some partitions, and those partitions were
marked as under-replicated. Is there any configuration setting in kafka to
speed up the broker failure detection?


[2018-01-24 14:13:57,132] WARN [Controller-37-to-broker-4-send-thread],
Controller 37's connection to broker testkafka04:9092 (id: 4 rack: null)
was unsuccessful (kafka.controller.RequestSendThread)
java.net.SocketTimeoutException: Failed to connect within 30000 ms
at
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:231)
at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:182)
at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:181)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

Thanks!

Regards,
-Yu


recommendation on kafka go client : sarama or confluent-kafka-go

2017-04-06 Thread Yu Yang
Hi all,

We need to write a service in Go to consume from kafka. There are two
popular kafka clients: sarama and confluent-kafka-go. The sarama client
is written purely in Go, which allows it to show the full Go stack trace
in case of an error; that can be helpful for debugging. Confluent-kafka-go
wraps librdkafka, which can potentially provide better performance. Any
suggestions on choosing between these two?

Thanks!

Regards,
-Yu


Re: cannot make another partition reassignment due to the previous partition reassignment failure

2015-06-15 Thread Yu Yang
Thanks, Manikumar!

On Mon, Jun 15, 2015 at 9:14 PM, Manikumar Reddy ku...@nmsworks.co.in
wrote:

 Hi,
   Just delete the /admin/reassign_partitions znode from zookeeper and
 try again.

 #sh zookeeper-shell.sh localhost:2181
   delete /admin/reassign_partitions


 Manikumar


 On Tue, Jun 16, 2015 at 8:15 AM, Yu Yang yuyan...@gmail.com wrote:

  HI,
 
  We have a kafka 0.8.1.1 cluster. Recently I did a partition reassignment
  for some topic partitions in the cluster. Due to a broker failure, the
  partition reassignment failed. I cannot start another partition
  reassignment now, and always get the errors below. How can we work around
  this? I have tried googling for answers, but did not succeed.
 
  Partitions reassignment failed due to Partition reassignment currently in
  progress for Map(). Aborting operation
  kafka.common.AdminCommandFailedException: Partition reassignment
 currently
  in progress for Map(). Aborting operation
  at
 
 
 kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:204)
  at
 
 
 kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:124)
  at
 
 
 kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:49)
  at
 
 kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)
 
 
  Thanks!
 
  Regards,
  Yu
 



cannot make another partition reassignment due to the previous partition reassignment failure

2015-06-15 Thread Yu Yang
HI,

We have a kafka 0.8.1.1 cluster. Recently I did a partition reassignment for
some topic partitions in the cluster. Due to a broker failure, the partition
reassignment failed. I cannot start another partition reassignment now, and
always get the errors below. How can we work around this? I have tried
googling for answers, but did not succeed.

Partitions reassignment failed due to Partition reassignment currently in
progress for Map(). Aborting operation
kafka.common.AdminCommandFailedException: Partition reassignment currently
in progress for Map(). Aborting operation
at
kafka.admin.ReassignPartitionsCommand.reassignPartitions(ReassignPartitionsCommand.scala:204)
at
kafka.admin.ReassignPartitionsCommand$.executeAssignment(ReassignPartitionsCommand.scala:124)
at
kafka.admin.ReassignPartitionsCommand$.main(ReassignPartitionsCommand.scala:49)
at
kafka.admin.ReassignPartitionsCommand.main(ReassignPartitionsCommand.scala)


Thanks!

Regards,
Yu


Re: Does the Kafka 0.8.2 producer have a lower throughput in sync mode, compared with 0.8.1.x?

2015-03-09 Thread Yu Yang
If a send request in the middle of the list fails, will all send requests
that follow it fail? Or only the messages that were put in the same batch
by the underlying transport layer?

On Mon, Mar 9, 2015 at 1:31 AM, Manikumar Reddy ku...@nmsworks.co.in
wrote:

 1. We can send a list of messages and wait on the returned futures:

 List<Future<RecordMetadata>> responses = new ArrayList<>();
 for (ProducerRecord<String, String> record : recordBatch)
     responses.add(producer.send(record));
 for (Future<RecordMetadata> response : responses)
     response.get();

 2. Messages will be sent in the submission order.

 On Mon, Mar 9, 2015 at 1:56 PM, Manikumar Reddy ku...@nmsworks.co.in
 wrote:

  1 .
 
 
 
  On Mon, Mar 9, 2015 at 1:03 PM, Yu Yang yuyan...@gmail.com wrote:
 
  The confluent blog
  
 http://blog.confluent.io/2014/12/02/whats-coming-in-apache-kafka-0-8-2/
  mentions
  that "the batching is done whenever possible now. The sync producer,
  under load, can get performance as good as the async producer." Does
 it
  mean that kafka 0.8.2 guarantees that  the sequence of
  broker-received-message is the same as the async-call sequence by the
  producer?  If I have code as follows to send messages to the same topic
  partition:
 
  producer.send(msg1);
   producer.send(msg2);
  producer.send(msg3);
 
  If all three calls succeed, is it possible that broker received msg3
  before
  msg2?
 
 
 
 
  On Mon, Mar 9, 2015 at 12:17 AM, Yu Yang yuyan...@gmail.com wrote:
 
   Hi,
  
   Kafka 0.8.1.1 allows us to send a list of messages in sync mode:
  
    public void send(List<KeyedMessage<K,V>> messages);
  
   I did not find a counter-part of this api in the new producer that is
   introduced in kafka 0.8.2. It seems that  we can use the following
  method
   to do sync send in kafka 0.8.2:
  
    producer.send(new ProducerRecord(...)).get();
  
   My understanding is that we can only send one message at a time in
   sync-mode. This will limit the throughput of kafka producer. Is there
 a
  way
   to send a batch of messages in sync mode using kafka 0.8.2 producer?
 Is
   there any study on the throughput of Kafka 0.8.2 producer, comparing
  with
   kafka 0.8.1.1?
  
   Thanks!
  
   Regards,
   Yu
  
  
  
 
 
 



Does the Kafka 0.8.2 producer have a lower throughput in sync mode, compared with 0.8.1.x?

2015-03-09 Thread Yu Yang
Hi,

Kafka 0.8.1.1 allows us to send a list of messages in sync mode:

public void send(List<KeyedMessage<K,V>> messages);

I did not find a counterpart of this API in the new producer that is
introduced in kafka 0.8.2. It seems that we can use the following method
to do a sync send in kafka 0.8.2:

producer.send(new ProducerRecord(...)).get();

My understanding is that we can only send one message at a time in sync
mode. This will limit the throughput of the kafka producer. Is there a way
to send a batch of messages in sync mode using the kafka 0.8.2 producer? Is
there any study on the throughput of the Kafka 0.8.2 producer, compared
with kafka 0.8.1.1?
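The pattern the new producer expects (hedged: based on `send()` returning a `Future`, as in the 0.8.2 producer API) is to submit the whole batch first and wait on all the futures afterwards, which restores batching while still blocking for acknowledgements. A language-neutral sketch of that pattern, with a stand-in asynchronous send:

```python
from concurrent.futures import ThreadPoolExecutor

def fake_send(pool, record):
    """Stand-in for producer.send(record): returns a future immediately
    instead of blocking for the broker's acknowledgement."""
    return pool.submit(lambda r: "ack:" + r, record)

def send_batch_sync(pool, records):
    # Submit the whole batch first (keeps the pipeline full), then
    # block on every acknowledgement, preserving submission order.
    futures = [fake_send(pool, r) for r in records]
    return [f.result() for f in futures]

with ThreadPoolExecutor(max_workers=4) as pool:
    print(send_batch_sync(pool, ["m1", "m2", "m3"]))
    # ['ack:m1', 'ack:m2', 'ack:m3']
```

Per-message `send(...).get()` pays one round trip per record; submit-then-wait pays roughly one round trip per batch.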

Thanks!

Regards,
Yu


Re: Best practice for upgrading Kafka cluster from 0.8.1 to 0.8.1.1

2014-12-04 Thread Yu Yang
Guozhang,

We haven't enabled message compression yet. In this case, what shall we do
when we upgrade to 0.8.2? Must we launch a new cluster, redirect the
traffic to the new cluster, and turn off the old one?

Thanks!

-Yu


On Tue, Dec 2, 2014 at 4:33 PM, Guozhang Wang wangg...@gmail.com wrote:

 Yu,

 Are you enabling message compression in 0.8.1 now? If you have already then
 upgrading to 0.8.2 will not change its behavior.

 Guozhang

 On Tue, Dec 2, 2014 at 4:21 PM, Yu Yang yuyan...@gmail.com wrote:

  Hi Neha,
 
  Thanks for the reply!  We know that Kafka 0.8.2 will be released soon. If
  we want to upgrade to Kafka 0.8.2 and enable message compression, will we
  still be able to do this in the same way, or do we need to handle it
 differently?
 
  Thanks!
 
  Regards,
  -Yu
 
  On Tue, Dec 2, 2014 at 3:11 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
   Will doing one broker at
   a time by bringing the broker down, updating the code, and restarting it
  be
   sufficient?
  
   Yes this should work for the upgrade.
  
   On Mon, Dec 1, 2014 at 10:23 PM, Yu Yang yuyan...@gmail.com wrote:
  
Hi,
   
We have a kafka cluster that runs Kafka 0.8.1 that we are considering
upgrade to 0.8.1.1. The Kafka documentation
http://kafka.apache.org/documentation.html#upgrade mentions
  upgrading
from 0.8 to 0.8.1, but not from 0.8.1 to 0.8.1.1.  Will doing one
  broker
   at
a time by bringing the broker down, updating the code, and restarting
 it
   be
sufficient? Any best practice suggestions?
   
Thanks!
   
Regards,
Yu
   
  
 



 --
 -- Guozhang



Re: Best practice for upgrading Kafka cluster from 0.8.1 to 0.8.1.1

2014-12-04 Thread Yu Yang
Thanks, Guozhang!

On Thu, Dec 4, 2014 at 9:08 AM, Guozhang Wang wangg...@gmail.com wrote:

 You can still do the in-place upgrade, and the logs on the broker will
 then be mixed with uncompressed and compressed messages. This is fine also
 since the consumers are able to de-compress dynamically based on the
 message type when consuming the data.

 Guozhang

 On Wed, Dec 3, 2014 at 11:33 AM, Yu Yang yuyan...@gmail.com wrote:

  Guozhang,
 
  We haven't enabled message compression yet. In this case, what shall we do
  when we upgrade to 0.8.2?  Must we launch a new cluster, redirect the
  traffic to the new cluster, and turn off the old one?
 
  Thanks!
 
  -Yu
 
 
  On Tue, Dec 2, 2014 at 4:33 PM, Guozhang Wang wangg...@gmail.com
 wrote:
 
   Yu,
  
   Are you enabling message compression in 0.8.1 now? If you have already
  then
   upgrading to 0.8.2 will not change its behavior.
  
   Guozhang
  
   On Tue, Dec 2, 2014 at 4:21 PM, Yu Yang yuyan...@gmail.com wrote:
  
Hi Neha,
   
Thanks for the reply!  We know that Kafka 0.8.2 will be released
 soon.
  If
we want to upgrade to Kafka 0.8.2 and enable message compression,
 will
  we
    still be able to do this in the same way, or do we need to handle it
   differently?
   
Thanks!
   
Regards,
-Yu
   
On Tue, Dec 2, 2014 at 3:11 PM, Neha Narkhede 
 neha.narkh...@gmail.com
  
wrote:
   
 Will doing one broker at
  a time by bringing the broker down, updating the code, and
 restarting
  it
be
 sufficient?

 Yes this should work for the upgrade.

 On Mon, Dec 1, 2014 at 10:23 PM, Yu Yang yuyan...@gmail.com
 wrote:

  Hi,
 
  We have a kafka cluster that runs Kafka 0.8.1 that we are
  considering
  upgrade to 0.8.1.1. The Kafka documentation
  http://kafka.apache.org/documentation.html#upgrade mentions
upgrading
  from 0.8 to 0.8.1, but not from 0.8.1 to 0.8.1.1.  Will doing one
broker
 at
   a time by bringing the broker down, updating the code, and
  restarting
   it
 be
  sufficient? Any best practice suggestions?
 
  Thanks!
 
  Regards,
  Yu
 

   
  
  
  
   --
   -- Guozhang
  
 



 --
 -- Guozhang



Best practice for upgrading Kafka cluster from 0.8.1 to 0.8.1.1

2014-12-01 Thread Yu Yang
Hi,

We have a kafka cluster that runs Kafka 0.8.1 that we are considering
upgrading to 0.8.1.1. The Kafka documentation
http://kafka.apache.org/documentation.html#upgrade mentions upgrading
from 0.8 to 0.8.1, but not from 0.8.1 to 0.8.1.1. Will doing one broker at
a time by bringing the broker down, updating the code, and restarting it be
sufficient? Any best practice suggestions?
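A rolling restart along those lines can be scripted; the sketch below shows only the control flow, and the `stop_broker`/`start_broker`/`under_replicated_partitions` helpers are hypothetical stand-ins for your deployment tooling and monitoring:

```python
import time

def rolling_upgrade(brokers, stop_broker, start_broker,
                    under_replicated_partitions, poll_s=5):
    """One broker at a time: stop it (swap the binary here), start it,
    and wait until no partitions are under-replicated before moving on."""
    for broker in brokers:
        stop_broker(broker)
        start_broker(broker)
        while under_replicated_partitions() > 0:
            time.sleep(poll_s)   # let replicas catch back up

# Dry run with stub tooling:
log = []
rolling_upgrade(["b1", "b2"],
                stop_broker=lambda b: log.append("stop " + b),
                start_broker=lambda b: log.append("start " + b),
                under_replicated_partitions=lambda: 0)
print(log)  # ['stop b1', 'start b1', 'stop b2', 'start b2']
```

Waiting for zero under-replicated partitions between brokers is what keeps the rolling restart from dropping below the replication factor.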

Thanks!

Regards,
Yu


Re: ConsumerOffsetChecker returns negative value for log lag

2014-10-19 Thread Yu Yang
Yes, our service does handle the consumer group offsets by itself. It seems
that the negative value happens because all replicas of a partition
died, Kafka created a new replica for the partition, and the high-level
consumer was not aware of the Kafka server-side change.
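If that diagnosis is right, one hedged remediation is to detect the inversion (committed offset greater than the partition's current log-end offset) and reset the committed offset, accepting either skipped or re-read messages depending on which way you reset. A sketch of that check:

```python
def repair_offset(committed, log_end, reset_to="latest"):
    """If the committed offset is beyond the partition's current log-end
    offset (i.e. lag would be negative), reset it; otherwise leave it
    alone. "latest" skips messages; "earliest" (0 here) re-reads
    everything still on the broker."""
    if committed <= log_end:
        return committed        # lag is non-negative, nothing to do
    return log_end if reset_to == "latest" else 0

print(repair_offset(202936733, 28822327))  # 28822327: offset was past the log end
print(repair_offset(100, 500))             # 100: healthy partition, untouched
```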

On Sun, Oct 19, 2014 at 8:06 PM, Jun Rao jun...@gmail.com wrote:

 Haven't seen this one before. Are you manually committing offsets yourself?

 Thanks,

 Jun

 On Fri, Oct 17, 2014 at 11:10 PM, Yu Yang yuyan...@gmail.com wrote:

  Hi,
 
  I have a Kafka 0.8.1 cluster. I used the ConsumerOffsetChecker tool to
  check the lag of consumer groups. I found that for some partition, the
 tool
  returns a negative value in the "lag" column.  Is this a known issue that
  has been seen before? I find that the negative value prevents the
 consumer
  consuming the latest events in these partitions.  How can we work around
  the problem?
 
  The following is the command:
 
  ~/kafka_2.9.2-0.8.1$ bin/kafka-run-class.sh
  kafka.tools.ConsumerOffsetChecker --group topic_partition --zkconnect
  zk001:2181 --topic the_topic | grep ' -'
 
  The following is part of the output. The topic that I am checking has 128
  partitions, and the tool returns negative value for 63 partitions.
 
  topic_partition  event  6   202936733   28822327   -174114406  topic_partition_m031_29714_20-0

  topic_partition  event  10  177322216   36578944   -140743272  topic_partition_m032_16773_16-0

  topic_partition  event  11  187891640   28999350   -158892290  topic_partition_m032_16773_17-0
 
 
 
  Thanks!
 
  -Yu
 



Re: how to do disaster recovery for a kafka 0.8 cluster with consumers that use the high-level consumer api?

2014-10-19 Thread Yu Yang
Thanks, Jun! Yes, I set the topic replication factor to 3.

On Sun, Oct 19, 2014 at 8:09 PM, Jun Rao jun...@gmail.com wrote:

 Did you set the replication factor to be more than 1?

 Thanks,

 Jun

 On Sat, Oct 18, 2014 at 2:32 AM, Yu Yang yuyan...@gmail.com wrote:

  Hi all,
 
   We have a kafka 0.8.1 cluster. We implemented a consumer for the topics
  on the Kafka 0.8 cluster using the high-level consumer API. We observed
  that if the Kafka cluster went down and got rebooted while the consumer
  was running, the consumer would fail to read a few topic partitions due
  to a negative lag value. How shall we handle disaster recovery without
  re-reading the processed messages?
 
  Thanks!
 
  -Yu
 



ConsumerOffsetChecker returns negative value for log lag

2014-10-18 Thread Yu Yang
Hi,

I have a Kafka 0.8.1 cluster. I used the ConsumerOffsetChecker tool to
check the lag of consumer groups. I found that for some partitions, the tool
returns a negative value in the "lag" column. Is this a known issue that
has been seen before? I find that the negative value prevents the consumer
from consuming the latest events in these partitions. How can we work around
the problem?

The following is the command:

~/kafka_2.9.2-0.8.1$ bin/kafka-run-class.sh
kafka.tools.ConsumerOffsetChecker --group topic_partition --zkconnect
zk001:2181 --topic the_topic | grep ' -'

The following is part of the output. The topic that I am checking has 128
partitions, and the tool returns negative value for 63 partitions.

topic_partition  event  6   202936733   28822327   -174114406  topic_partition_m031_29714_20-0

topic_partition  event  10  177322216   36578944   -140743272  topic_partition_m032_16773_16-0

topic_partition  event  11  187891640   28999350   -158892290  topic_partition_m032_16773_17-0
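For clarity, the lag column reported by ConsumerOffsetChecker is simply logSize minus the committed offset, so a negative value means the committed offset is ahead of the partition's current log-end offset (consistent with the log having been truncated or recreated underneath the consumer). Checking the first row above:

```python
def consumer_lag(log_size, committed_offset):
    """Lag as ConsumerOffsetChecker reports it: log-end offset minus
    the consumer group's committed offset."""
    return log_size - committed_offset

# First row above: committed offset = 202936733, logSize = 28822327
print(consumer_lag(28822327, 202936733))  # -174114406
```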



Thanks!

-Yu