Re: OutOfMemoryError in mirror maker

2015-06-28 Thread tao xiao
That is so cool. Thank you

On Sun, 28 Jun 2015 at 04:29 Guozhang Wang wangg...@gmail.com wrote:

 Tao, I have added you to the contributor list of Kafka so you can assign
 tickets to yourself now.

 I will review the patch soon.

 Guozhang

 On Thu, Jun 25, 2015 at 2:54 AM, tao xiao xiaotao...@gmail.com wrote:

  Patch updated. Please review.
 
  On Mon, 22 Jun 2015 at 12:24 tao xiao xiaotao...@gmail.com wrote:
 
   Yes, you are right. Will update the patch
    On Mon, Jun 22, 2015 at 12:16 PM Jiangjie Qin j...@linkedin.com.invalid wrote:
  
    Should we still store the value bytes when logAsString is set to TRUE and only store the length when logAsString is set to FALSE?
  
   On 6/21/15, 7:29 PM, tao xiao xiaotao...@gmail.com wrote:
  
    The patch I submitted does what you suggested. It stores the size only and prints it out when an error occurs.
   
    On Mon, Jun 22, 2015 at 5:26 AM Jiangjie Qin j...@linkedin.com.invalid wrote:
   
Yes, we can expose a user callback in MM, just like we did for the rebalance listener.
I still think ErrorLoggingCallback needs some change, though. Can we only store the value bytes when logAsString is set to true? That looks more reasonable to me.
   
Jiangjie (Becket) Qin
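
For reference, a minimal sketch of the change being discussed: keep the value bytes only when logAsString is true, and otherwise retain just the length. The class name is hypothetical and this is not the actual Kafka ErrorLoggingCallback; it only illustrates the idea against the public producer Callback interface.

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.nio.charset.StandardCharsets;

    // Hypothetical callback: holds the payload only when it will actually be logged.
    public class SizeOnlyErrorLoggingCallback implements Callback {
        private final String topic;
        private final byte[] value;      // kept only if logAsString is true
        private final int valueLength;   // an int, not the payload itself
        private final boolean logAsString;

        public SizeOnlyErrorLoggingCallback(String topic, byte[] value, boolean logAsString) {
            this.topic = topic;
            this.logAsString = logAsString;
            this.value = logAsString ? value : null;
            this.valueLength = value == null ? 0 : value.length;
        }

        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                String valueDescription = logAsString
                        ? (value == null ? "null" : new String(value, StandardCharsets.UTF_8))
                        : valueLength + " bytes";
                System.err.println("Error when sending message to topic " + topic
                        + " with value " + valueDescription + ": " + exception.getMessage());
            }
        }
    }

With this shape, MirrorMakerProducerCallback (or a user-supplied callback exposed by MM) can keep logAsString=false and stop pinning every in-flight message value in the heap while batches wait in the accumulator.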
   
On 6/21/15, 3:02 AM, tao xiao xiaotao...@gmail.com wrote:
   
Yes, I agree with that. It is even better if we can supply our own callback. People who want to view the content of a message on failure can still do so.

On Sun, Jun 21, 2015 at 2:20 PM Guozhang Wang wangg...@gmail.com wrote:

 Hi Tao / Jiangjie,

 I think a better fix here may be to not let MirrorMakerProducerCallback extend ErrorLoggingCallback, rather than changing ErrorLoggingCallback itself, since that would defeat the usage of logAsString, which I think is useful for general error logging purposes. Instead we can let MirrorMakerProducerCallback take not the value bytes themselves but just the length, if people agree that for MM we are probably not interested in the message value in the callback. Thoughts?

 Guozhang

 On Wed, Jun 17, 2015 at 1:06 AM, tao xiao xiaotao...@gmail.com wrote:

  Thank you for the reply.
 
  Patch submitted
  https://issues.apache.org/jira/browse/KAFKA-2281
 
  On Mon, 15 Jun 2015 at 02:16 Jiangjie Qin j...@linkedin.com.invalid wrote:
 
   Hi Tao,

   Yes, the issue that ErrorLoggingCallback keeps the value as a local variable has been known for a while, and we should probably fix it, as the value is not used except for logging its size. Can you open a ticket and maybe also submit a patch?

   For the unreachable objects I don't think it is a memory leak. As you said, GC should take care of this. At LinkedIn we are using G1GC with some tunings made by our SREs. You can try that if interested.

   Thanks,

   Jiangjie (Becket) Qin
  
   On 6/13/15, 11:39 AM, tao xiao xiaotao...@gmail.com wrote:
  
    Hi,

    I am using mirror maker from trunk to replicate data across two data centers. While the destination broker was under heavy load and unresponsive, the send rate of mirror maker was very low and the available producer buffer quickly filled up. In the end mirror maker threw an OOME. The detailed exception can be found here:

    https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-oome-exception-L1

    I started mirror maker with 1G of memory and a 256M producer buffer. I used Eclipse MAT to analyze the heap dump and found that the retained heap size of all RecordBatch objects was more than 500MB. Half of it was used to retain data waiting to be sent to the destination broker, which makes sense to me as it is close to the 256MB producer buffer, but the other half was used by kafka.tools.MirrorMaker$MirrorMakerProducerCallback, since every producer callback in mirror maker takes the message value and holds it until the message is successfully delivered. In my case, because the destination broker was very unresponsive, the message values held by the callbacks would stay around forever, which I think is a waste and a major contributor to the OOME issue. Screenshot of MAT:

    https://gist.github.com/xiaotao183/53e1bf191c1a4d030a25#file-mat-screenshot-png

    The other interesting problem I observed is that when I turned on unreachable object parsing in MAT, more than 400MB of memory was

Re: Batch producer latencies and flush()

2015-06-28 Thread Ewen Cheslack-Postava
The logic you're requesting is basically what the new producer implements.
The first condition is the batch size limit and the second is linger.ms.
The actual logic is a bit more complicated and has some caveats dealing
with, for example, backing off after failures, but you can see in this code

https://github.com/apache/kafka/blob/0.8.2.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L222

that the two normal conditions that will trigger a send are full and
expired.

Note that increasing batch.size and linger.ms will generally *increase*
your latency: in most cases their effect is to make messages wait longer
on the client before being sent, in exchange for higher throughput.
There may be edge cases where this isn't true (e.g. high latencies to
the broker can cause a low linger.ms to have a negative effect in
combination with max.in.flight.requests.per.connection), but usually it
holds.

For the specific case you gave with increasing batch size, I would guess it
stopped having any effect because beyond 64KB you were never getting full
batches -- they were probably being sent out because linger.ms expired (with
few enough in-flight requests) before the batches filled up.
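
To make the two trigger conditions concrete, here is a hedged configuration sketch for the new (java) producer; the broker address, topic name and values are placeholders, not recommendations:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.util.Properties;

    public class BatchingConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092"); // placeholder address
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("batch.size", 65536);        // "full": send a partition's batch once it reaches 64KB
            props.put("linger.ms", 5);             // "expired": send after at most 5ms even if not full
            props.put("buffer.memory", 268435456); // total accumulator memory shared by all batches

            try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("test-topic", "hello".getBytes()));
            }
        }
    }

Whichever of the two conditions is hit first for a partition's batch makes that batch eligible to send, subject to the caveats above such as retry backoff.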

Maybe giving some more concrete numbers for the settings and some idea of
message size + message rate in specific instances would allow people to
suggest tweaks that might help?

-Ewen

On Sun, Jun 28, 2015 at 11:17 AM, Achanta Vamsi Subhash 
achanta.va...@flipkart.com wrote:

 *bump*

 On Tue, Jun 23, 2015 at 1:03 PM, Achanta Vamsi Subhash 
 achanta.va...@flipkart.com wrote:

  Hi,
 
  We are using the batch producer of 0.8.2.1 and we are getting very bad
  latencies for the topics. We have ~40K partitions now in a 20-node
 cluster.
 
  - We have many topics, each with a different publish rate. For example, some topics take 10k messages/sec and others 2000/minute.
  - We are seeing 99th percentile latencies of 2 sec and 95th percentile latencies of 1 sec.
  - The tunable parameters are batch size, buffer size and linger. We monitor the metrics for the new producer and tuned the above accordingly. Still, we are not able to get any improvement. Increasing the batch size beyond 64KB didn't seem to matter (we increased it up to 1MB).
  - We also noticed that the record queue time is high (2-3 sec). The documentation describes this as the time records wait in the accumulator before being sent (see the metrics sketch after this list).
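
Since record queue time is the number being chased here, a small hedged sketch for watching it, assuming the java producer's metrics() map and the record-queue-time-avg / record-queue-time-max metric names as documented for the new producer:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.Metric;

    public class QueueTimeMonitor {
        // Prints how long record batches are sitting in the accumulator.
        public static void printQueueTime(KafkaProducer<byte[], byte[]> producer) {
            for (Metric metric : producer.metrics().values()) {
                String name = metric.metricName().name();
                if (name.equals("record-queue-time-avg") || name.equals("record-queue-time-max")) {
                    System.out.printf("%s = %.1f ms%n", name, metric.value());
                }
            }
        }
    }

If the average stays in the 2-3 sec range while linger.ms is low, the time is most likely being spent on something other than linger, such as retries or a saturated connection.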
 
  Later, looking at the code in trunk, I see that the batch size setting is the same for all TopicPartitions and each has its own RecordBatch. Also, a flush() method has been added in the latest code.
 
  We want an upper bound on the latency of every message push irrespective of the incoming rate. Can we achieve it with the following logic:

  - Wait until X KB of batch size per topic partition is reached
  (or)
  - Wait for Y ms

  If either condition is met, flush the producer records. Can this be part of the producer code itself? It would avoid records accumulating for 2-3 sec.
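
As Ewen notes above, batch.size and linger.ms already implement exactly this "X KB or Y ms" rule inside the producer. If an application-level hard cap is still wanted on top of that, a hedged sketch using the flush() method mentioned above (available on trunk / newer clients) might look like this; the PeriodicFlusher class is hypothetical, and flush() blocks the calling thread until outstanding sends complete:

    import org.apache.kafka.clients.producer.KafkaProducer;

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical helper: forces accumulated batches out at least every maxWaitMs.
    public class PeriodicFlusher {
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        public void start(final KafkaProducer<byte[], byte[]> producer, long maxWaitMs) {
            // flush() waits for all buffered records to be sent, so each run
            // bounds how long a record can sit in the accumulator.
            scheduler.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    producer.flush();
                }
            }, maxWaitMs, maxWaitMs, TimeUnit.MILLISECONDS);
        }

        public void stop() {
            scheduler.shutdown();
        }
    }

In practice, simply lowering linger.ms achieves the same bound without an extra thread; the sketch is only meant to show where flush() fits.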
 
  Please correct me if the analysis is wrong and suggest how we can improve the latencies of the new producer. Thanks.
 
  --
  Regards
  Vamsi Subhash
 



 --
 Regards
 Vamsi Subhash




-- 
Thanks,
Ewen


Re: Batch producer latencies and flush()

2015-06-28 Thread Achanta Vamsi Subhash
*bump*

On Tue, Jun 23, 2015 at 1:03 PM, Achanta Vamsi Subhash 
achanta.va...@flipkart.com wrote:

 Hi,

 We are using the batch producer of 0.8.2.1 and we are getting very bad
 latencies for the topics. We have ~40K partitions now in a 20-node cluster.

 - We have many topics, each with a different publish rate. For example, some topics take 10k messages/sec and others 2000/minute.
 - We are seeing 99th percentile latencies of 2 sec and 95th percentile latencies of 1 sec.
 - The tunable parameters are batch size, buffer size and linger. We monitor the metrics for the new producer and tuned the above accordingly. Still, we are not able to get any improvement. Increasing the batch size beyond 64KB didn't seem to matter (we increased it up to 1MB).
 - We also noticed that the record queue time is high (2-3 sec). The documentation describes this as the time records wait in the accumulator before being sent.

 Later, looking at the code in trunk, I see that the batch size setting is the same for all TopicPartitions and each has its own RecordBatch. Also, a flush() method has been added in the latest code.

 We want an upper bound on the latency of every message push irrespective of the incoming rate. Can we achieve it with the following logic:

 - Wait until X KB of batch size per topic partition is reached
 (or)
 - Wait for Y ms

 If either condition is met, flush the producer records. Can this be part of the producer code itself? It would avoid records accumulating for 2-3 sec.

 Please correct me if the analysis is wrong and suggest how we can improve the latencies of the new producer. Thanks.

 --
 Regards
 Vamsi Subhash




-- 
Regards
Vamsi Subhash



RE: Help Us Nominate Apache Kafka for a 2015 Bossie (Best of OSS) Award - Due June 30th

2015-06-28 Thread Aditya Auradkar
Sent :)

From: Gwen Shapira [gshap...@cloudera.com]
Sent: Friday, June 26, 2015 11:53 AM
To: users@kafka.apache.org
Cc: d...@kafka.apache.org
Subject: Re: Help Us Nominate Apache Kafka for a 2015 Bossie (Best of OSS) 
Award - Due June 30th

Sent! Thanks for letting us know of this opportunity to promote our
favorite Apache project :)

For inspiration, here's what I wrote:

Apache Kafka revolutionized stream processing for big data. There has been
dramatic growth in the number and variety of data sources an organization
has to track, as well as new business requirements to process and analyze
the data in real time.
Kafka made stream processing a practical reality by integrating all data
sources into a scalable and reliable platform that provides consistent
streams of data for stream processing systems like Storm, Spark and Flink
to work with. Stream processing is probably the most exciting big data
innovation of the year, and it would not be possible without Kafka to
provide those streams of data.

Please don't copy-paste, but in case you want to help and have writer's
block, I hope you get some inspiration :)

Gwen

On Fri, Jun 26, 2015 at 9:47 AM, Neha Narkhede n...@confluent.io wrote:
 Hello Kafka community members,

 We appreciate your use and support of Kafka and all the feedback you’ve
 provided to us along the way.  If all is still going well with Kafka and
 you’re realizing great value from it, we’d like your support in nominating
 Kafka for a 2015 InfoWorld Bossie award, which is an annual award where
 InfoWorld honors the best of open source software.


 As a reminder, Kafka
 (http://www.infoworld.com/article/2688074/big-data/big-data-164727-bossie-awards-2014-the-best-open-source-big-data-tools.html#slide17)
 was selected as one of InfoWorld's top picks in distributed data processing,
 data analytics, machine learning, NoSQL databases, and the Hadoop
 ecosystem. A technology can win consecutive years, so there's nothing
 stopping Kafka from making the list again.

 Nominations for this award are very simple: just deliver an email to
 InfoWorld's executive editor Doug Dineley (doug_dine...@infoworld.com)
 with the following information:

 - The name of your software, or your use case
 - A link to Kafka's website: http://kafka.apache.org/
 - A few sentences on how you or your customers are using the software and
   why it is important and award-worthy.

 Submissions must be sent to Doug doug_dine...@infoworld.com by June 30,
 2015. Please let us know if you have any questions or if we can help in any
 way.


 Thank you for being part of the Kafka community!


 --
 Best,
 Neha