Re: [Java New Producer] Snappy NPE Issue

2014-10-21 Thread Bhavesh Mistry
Hi Ewen,

It seems Leo has fixed the snappy lib for this issue. Here are the changes:
https://github.com/xerial/snappy-java/commit/7b86642f75c280debf3c1983053ea7f8635b48a5


Here is the jar with the fix:
https://oss.sonatype.org/content/repositories/snapshots/org/xerial/snappy/snappy-java/1.1.1.4-SNAPSHOT/


I will try it this afternoon. If it works, would you be able to upgrade
Kafka trunk to this version?

Thanks,

Bhavesh

On Mon, Oct 20, 2014 at 9:53 PM, Bhavesh Mistry mistry.p.bhav...@gmail.com
wrote:

 Hi Ewen,

 Thanks for doing the deep analysis on this issue. I have filed this issue
 with the Snappy project and linked the Kafka issue. Here are the details of
 the GitHub issue: https://github.com/xerial/snappy-java/issues/88

 I will follow up with the snappy guys to figure out how to solve this
 problem. For us, this is a typical use case: a J2EE web app running in a
 container with a thread pool and recycled threads.

 Thanks,

 Bhavesh

 On Mon, Oct 20, 2014 at 6:56 PM, Ewen Cheslack-Postava m...@ewencp.org
 wrote:

 Also, filed https://issues.apache.org/jira/browse/KAFKA-1721 for this
 since it either requires an updated version of the upstream library, a
 workaround by us, or at a bare minimum clear documentation of the issue.

 On Mon, Oct 20, 2014, at 06:23 PM, Ewen Cheslack-Postava wrote:
  I took a quick look at this since I noticed the same issue when testing
  your code for the issues you filed. I think the issue is that you're
  using all your producers across a thread pool and the snappy library
  uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
  they may be allocated from the same thread (e.g. one of your MyProducer
  classes calls Producer.send() on multiple producers from the same
  thread) and therefore use the same BufferRecycler. Eventually you hit
  the code in the stacktrace, and if two producer send threads hit it
  concurrently they improperly share the unsynchronized BufferRecycler.
 
  This seems like a pain to fix -- it's really a deficiency of the snappy
  library and as far as I can see there's no external control over
  BufferRecycler in their API. One possibility is to record the thread ID
  when we generate a new stream in Compressor and use that to synchronize
  access to ensure no concurrent BufferRecycler access. That could be made
  specific to snappy so it wouldn't impact other codecs. Not exactly
  ideal, but it would work. Unfortunately I can't think of any way for you
  to protect against this in your own code since the problem arises in the
  producer send thread, which your code should never know about.
 
  Another option would be to set up your producers differently to avoid the
  possibility of unsynchronized access from multiple threads (i.e. don't
  use the same thread pool approach), but whether you can do that will
  depend on your use case.
 
  -Ewen
 
  On Mon, Oct 20, 2014, at 12:19 PM, Bhavesh Mistry wrote:
   Hi Kafka Dev,
  
   I am getting the following issue with the Snappy library. I checked the code
   for the Snappy lib and it seems to be fine. Have you guys seen this issue?
  
   2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
   java.lang.NullPointerException
   at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
   at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
   at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
   at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
   at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
   at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
   at java.lang.Thread.run(Thread.java:744)
  
  
   Here is the code for Snappy (snappy-java 1.1.1.3, BufferRecycler.java line 153):
   http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153

   153  if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
  
  
   Thanks,
  
   Bhavesh





[Java New Producer] Snappy NPE Issue

2014-10-20 Thread Bhavesh Mistry
Hi Kafka Dev,

I am getting the following issue with the Snappy library. I checked the code
for the Snappy lib and it seems to be fine. Have you guys seen this issue?

2014-10-20 18:55:21.841 [kafka-producer-network-thread] ERROR org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka producer I/O thread:
java.lang.NullPointerException
at org.xerial.snappy.BufferRecycler.releaseInputBuffer(BufferRecycler.java:153)
at org.xerial.snappy.SnappyOutputStream.close(SnappyOutputStream.java:317)
at java.io.FilterOutputStream.close(FilterOutputStream.java:160)
at org.apache.kafka.common.record.Compressor.close(Compressor.java:94)
at org.apache.kafka.common.record.MemoryRecords.close(MemoryRecords.java:119)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.drain(RecordAccumulator.java:285)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
at java.lang.Thread.run(Thread.java:744)


Here is the code for Snappy (snappy-java 1.1.1.3, BufferRecycler.java line 153):
http://grepcode.com/file/repo1.maven.org/maven2/org.xerial.snappy/snappy-java/1.1.1.3/org/xerial/snappy/BufferRecycler.java#153

153  if (inputBuffer == null || (buffer != null && buffer.length > inputBuffer.length)) {
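
Line 153 reads the field inputBuffer twice: once in the null check and again in inputBuffer.length. If another thread sets the field to null between those two reads, the second read would throw the NullPointerException seen in the stack trace. The following is a minimal sketch of that interleaving. RecyclerSketch is a hypothetical, simplified model, not the snappy-java class. A Runnable stands in for the other thread so the failure is deterministic. clear() assumes that the recycler's allocation path hands the cached buffer out by nulling the field.

```java
// Simplified, hypothetical model of the check at BufferRecycler line 153.
// Not the snappy-java source: it only reproduces the "read the field twice" shape.
class RecyclerSketch {
    private byte[] inputBuffer;

    // Same decision as line 153, split so that a Runnable can run between the
    // null check and the second read of the field. The Runnable stands in for
    // another thread that empties the field at the wrong moment.
    void releaseUnsafe(byte[] buffer, Runnable interleaved) {
        if (inputBuffer == null) {               // first read of the field
            inputBuffer = buffer;
            return;
        }
        interleaved.run();
        if (buffer != null && buffer.length > inputBuffer.length) { // second read: may now be null
            inputBuffer = buffer;
        }
    }

    // Safer variant: read the field once into a local and serialize callers.
    synchronized void releaseSafe(byte[] buffer) {
        byte[] current = inputBuffer;
        if (current == null || (buffer != null && buffer.length > current.length)) {
            inputBuffer = buffer;
        }
    }

    // Assumption: models an allocation path that hands the cached buffer out.
    synchronized void clear() {
        inputBuffer = null;
    }

    synchronized int cachedLength() {
        return inputBuffer == null ? -1 : inputBuffer.length;
    }

    public static void main(String[] args) {
        RecyclerSketch r = new RecyclerSketch();
        r.releaseSafe(new byte[8]);
        try {
            r.releaseUnsafe(new byte[16], r::clear);
        } catch (NullPointerException e) {
            System.out.println("second read saw null: " + e);
        }
    }
}
```

Reading the field once into a local removes the double read. Synchronizing releaseSafe only helps if every other method that touches the field takes the same lock.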


Thanks,

Bhavesh


Re: [Java New Producer] Snappy NPE Issue

2014-10-20 Thread Ewen Cheslack-Postava
I took a quick look at this since I noticed the same issue when testing
your code for the issues you filed. I think the issue is that you're
using all your producers across a thread pool and the snappy library
uses ThreadLocal BufferRecyclers. When new Snappy streams are allocated,
they may be allocated from the same thread (e.g. one of your MyProducer
classes calls Producer.send() on multiple producers from the same
thread) and therefore use the same BufferRecycler. Eventually you hit
the code in the stacktrace, and if two producer send threads hit it
concurrently they improperly share the unsynchronized BufferRecycler.
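
Here is a hypothetical model of this mechanism. Recycler and Stream stand in for snappy's BufferRecycler and SnappyOutputStream. The model assumes, as described here, that a stream captures the ThreadLocal recycler of the thread that constructs it. Two streams built on one application thread then share one recycler, even when they are later handed to two different I/O threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical model: not snappy-java or Kafka code.
class ThreadLocalLeakSketch {
    static final class Recycler { } // stands in for a per-thread BufferRecycler

    static final ThreadLocal<Recycler> RECYCLERS = ThreadLocal.withInitial(Recycler::new);

    static final class Stream {
        // Bound to whichever thread runs the constructor, not the thread that later closes it.
        final Recycler recycler = RECYCLERS.get();
    }

    public static void main(String[] args) throws Exception {
        // One application thread creates streams on behalf of two different producers.
        Stream forProducerA = new Stream();
        Stream forProducerB = new Stream();

        // Each producer later touches its stream from its own I/O thread.
        ExecutorService ioA = Executors.newSingleThreadExecutor();
        ExecutorService ioB = Executors.newSingleThreadExecutor();
        Future<Recycler> seenByA = ioA.submit(() -> forProducerA.recycler);
        Future<Recycler> seenByB = ioB.submit(() -> forProducerB.recycler);
        System.out.println("same recycler on two I/O threads: " + (seenByA.get() == seenByB.get()));
        ioA.shutdown();
        ioB.shutdown();
    }
}
```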

This seems like a pain to fix -- it's really a deficiency of the snappy
library and as far as I can see there's no external control over
BufferRecycler in their API. One possibility is to record the thread ID
when we generate a new stream in Compressor and use that to synchronize
access to ensure no concurrent BufferRecycler access. That could be made
specific to snappy so it wouldn't impact other codecs. Not exactly
ideal, but it would work. Unfortunately I can't think of any way for you
to protect against this in your own code since the problem arises in the
producer send thread, which your code should never know about.
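
Here is a rough sketch of the thread-ID idea. It assumes the wrapped stream touches its ThreadLocal recycler only while being constructed and in close(). GuardedStream, open() and lockFor() are hypothetical names, not Kafka or snappy-java APIs. close() takes the lock belonging to the thread that created the stream, whichever thread ends up calling it.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical wrapper sketching "record the creating thread and synchronize on it".
// Assumption: the wrapped stream uses its thread-local recycler only in its
// constructor and in close(), so plain writes are passed through unguarded.
final class GuardedStream extends FilterOutputStream {
    // One lock per creating thread ID; entries are never evicted in this sketch.
    private static final ConcurrentMap<Long, Object> LOCKS = new ConcurrentHashMap<>();

    static Object lockFor(long threadId) {
        return LOCKS.computeIfAbsent(threadId, id -> new Object());
    }

    private final Object creatorLock;

    private GuardedStream(OutputStream inner, Object creatorLock) {
        super(inner);
        this.creatorLock = creatorLock;
    }

    // Builds the underlying stream on the calling thread while holding that thread's lock.
    static GuardedStream open(Supplier<? extends OutputStream> factory) {
        Object lock = lockFor(Thread.currentThread().getId());
        OutputStream inner;
        synchronized (lock) {
            inner = factory.get();
        }
        return new GuardedStream(inner, lock);
    }

    // Pass bulk writes straight through (FilterOutputStream would otherwise write byte by byte).
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
    }

    // close() may run on a different thread (e.g. a producer I/O thread), so it
    // serializes on the creating thread's lock rather than its own.
    @Override
    public void close() throws IOException {
        synchronized (creatorLock) {
            super.close();
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (GuardedStream s = GuardedStream.open(() -> sink)) {
            s.write("hello".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println(sink.size()); // prints 5
    }
}
```

In a Compressor-like class, the factory would construct the Snappy stream itself. The lock map is never pruned here. That is tolerable with a fixed pool of recycled threads, but it would grow with unbounded thread churn.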

Another option would be to set up your producers differently to avoid the
possibility of unsynchronized access from multiple threads (i.e. don't
use the same thread pool approach), but whether you can do that will
depend on your use case.
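
One way to restructure is sketched below, with a hypothetical MessageSink standing in for a producer (it is not the Kafka API). Each producer gets its own single-threaded lane, so only that lane ever calls send() on it. This removes the cross-producer sharing in the MyProducer example. However, the lane thread and that producer's I/O thread can still both touch the lane's recycler, so treat this as a mitigation to test rather than a guaranteed fix.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical router: every send() for a given producer runs on that producer's
// own single thread, instead of on whichever pooled worker happens to be free.
final class PinnedProducers implements AutoCloseable {
    // Stand-in for a producer; NOT the Kafka producer API.
    interface MessageSink {
        void send(String message);
    }

    private final Map<String, MessageSink> sinks;
    private final Map<String, ExecutorService> lanes = new HashMap<>();

    PinnedProducers(Map<String, MessageSink> sinks) {
        this.sinks = new HashMap<>(sinks);
        for (String name : this.sinks.keySet()) {
            lanes.put(name, Executors.newSingleThreadExecutor()); // one dedicated thread per producer
        }
    }

    Future<?> send(String producerName, String message) {
        MessageSink sink = sinks.get(producerName);
        if (sink == null) {
            throw new IllegalArgumentException("unknown producer: " + producerName);
        }
        return lanes.get(producerName).submit(() -> sink.send(message));
    }

    @Override
    public void close() {
        lanes.values().forEach(ExecutorService::shutdown);
    }

    public static void main(String[] args) throws Exception {
        Map<String, MessageSink> sinks = new HashMap<>();
        sinks.put("orders", m -> System.out.println(Thread.currentThread().getName() + " orders <- " + m));
        sinks.put("clicks", m -> System.out.println(Thread.currentThread().getName() + " clicks <- " + m));
        try (PinnedProducers router = new PinnedProducers(sinks)) {
            router.send("orders", "o1").get();
            router.send("clicks", "c1").get();
        }
    }
}
```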

-Ewen



Re: [Java New Producer] Snappy NPE Issue

2014-10-20 Thread Ewen Cheslack-Postava
Also, filed https://issues.apache.org/jira/browse/KAFKA-1721 for this
since it either requires an updated version of the upstream library, a
workaround by us, or at a bare minimum clear documentation of the issue.
