[jira] [Comment Edited] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records

2021-05-01 Thread radai rosenblatt (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17337897#comment-17337897
 ] 

radai rosenblatt edited comment on KAFKA-12605 at 5/2/21, 12:37 AM:


PR filed against trunk - [https://github.com/apache/kafka/pull/10624]


was (Author: radai):
PR files against trunk - https://github.com/apache/kafka/pull/10624

> kafka consumer churns through buffer memory iterating over records
> --
>
> Key: KAFKA-12605
> URL: https://issues.apache.org/jira/browse/KAFKA-12605
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.7.0
>Reporter: radai rosenblatt
>Priority: Major
> Attachments: Screen Shot 2021-04-01 at 3.55.47 PM.png
>
>
> we recently conducted analysis on memory allocations by the kafka consumer 
> and found a significant amount of buffers that graduate out of the young gen, 
> causing GC load.
>  
> these are the buffers used to gunzip record batches in the consumer when 
> polling. since the same iterator (and its underlying streams and buffers) is 
> likely to live through several poll() cycles, these buffers graduate out of 
> the young gen and cause issues.
>  
> see attached memory allocation flame graph:
> !Screen Shot 2021-04-01 at 3.55.47 PM.png!  
> the code causing this is in CompressionType.GZIP (taken from current trunk):
> {code:java}
> @Override
> public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
>     try {
>         // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
>         // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
>         // number of bytes (potentially a single byte)
>         return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024), 16 * 1024);
>     } catch (Exception e) {
>         throw new KafkaException(e);
>     }
> }{code}
> it allocates 2 buffers (8K and 16K) even though a BufferSupplier is available 
> to attempt re-use.
>  
> i believe it is possible to actually get both those buffers from the 
> supplier, and return them when iteration over the record batch is done. 
> doing so will require subclassing BufferedInputStream and GZIPInputStream 
> (or its parent class) to allow supplying external buffers to them. also, 
> some lifecycle hook would be needed to return said buffers to the pool when 
> iteration is done.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records

2021-05-01 Thread radai rosenblatt (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17337897#comment-17337897
 ] 

radai rosenblatt commented on KAFKA-12605:
--

PR files against trunk - https://github.com/apache/kafka/pull/10624

> kafka consumer churns through buffer memory iterating over records
> --
>
> Key: KAFKA-12605
> URL: https://issues.apache.org/jira/browse/KAFKA-12605
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.7.0
>Reporter: radai rosenblatt
>Priority: Major
> Attachments: Screen Shot 2021-04-01 at 3.55.47 PM.png
>
>
> we recently conducted analysis on memory allocations by the kafka consumer 
> and found a significant amount of buffers that graduate out of the young gen, 
> causing GC load.
>  
> these are the buffers used to gunzip record batches in the consumer when 
> polling. since the same iterator (and its underlying streams and buffers) is 
> likely to live through several poll() cycles, these buffers graduate out of 
> the young gen and cause issues.
>  
> see attached memory allocation flame graph:
> !Screen Shot 2021-04-01 at 3.55.47 PM.png!  
> the code causing this is in CompressionType.GZIP (taken from current trunk):
> {code:java}
> @Override
> public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
>     try {
>         // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
>         // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
>         // number of bytes (potentially a single byte)
>         return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024), 16 * 1024);
>     } catch (Exception e) {
>         throw new KafkaException(e);
>     }
> }{code}
> it allocates 2 buffers (8K and 16K) even though a BufferSupplier is available 
> to attempt re-use.
>  
> i believe it is possible to actually get both those buffers from the 
> supplier, and return them when iteration over the record batch is done. 
> doing so will require subclassing BufferedInputStream and GZIPInputStream 
> (or its parent class) to allow supplying external buffers to them. also, 
> some lifecycle hook would be needed to return said buffers to the pool when 
> iteration is done.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records

2021-04-02 Thread radai rosenblatt (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17314037#comment-17314037
 ] 

radai rosenblatt commented on KAFKA-12605:
--

the only compression type that uses this mechanism looks to be LZ4. a very 
similar solution can be done for gzip.

> kafka consumer churns through buffer memory iterating over records
> --
>
> Key: KAFKA-12605
> URL: https://issues.apache.org/jira/browse/KAFKA-12605
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.7.0
>Reporter: radai rosenblatt
>Priority: Major
> Attachments: Screen Shot 2021-04-01 at 3.55.47 PM.png
>
>
> we recently conducted analysis on memory allocations by the kafka consumer 
> and found a significant amount of buffers that graduate out of the young gen, 
> causing GC load.
>  
> these are the buffers used to gunzip record batches in the consumer when 
> polling. since the same iterator (and its underlying streams and buffers) is 
> likely to live through several poll() cycles, these buffers graduate out of 
> the young gen and cause issues.
>  
> see attached memory allocation flame graph:
> !Screen Shot 2021-04-01 at 3.55.47 PM.png!  
> the code causing this is in CompressionType.GZIP (taken from current trunk):
> {code:java}
> @Override
> public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
>     try {
>         // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
>         // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
>         // number of bytes (potentially a single byte)
>         return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024), 16 * 1024);
>     } catch (Exception e) {
>         throw new KafkaException(e);
>     }
> }{code}
> it allocates 2 buffers (8K and 16K) even though a BufferSupplier is available 
> to attempt re-use.
>  
> i believe it is possible to actually get both those buffers from the 
> supplier, and return them when iteration over the record batch is done. 
> doing so will require subclassing BufferedInputStream and GZIPInputStream 
> (or its parent class) to allow supplying external buffers to them. also, 
> some lifecycle hook would be needed to return said buffers to the pool when 
> iteration is done.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records

2021-04-01 Thread radai rosenblatt (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt updated KAFKA-12605:
-
Description: 
we recently conducted analysis on memory allocations by the kafka consumer and 
found a significant amount of buffers that graduate out of the young gen, 
causing GC load.

 

these are the buffers used to gunzip record batches in the consumer when 
polling. since the same iterator (and its underlying streams and buffers) is 
likely to live through several poll() cycles, these buffers graduate out of 
the young gen and cause issues.

 

see attached memory allocation flame graph:

!Screen Shot 2021-04-01 at 3.55.47 PM.png!  

the code causing this is in CompressionType.GZIP (taken from current trunk):
{code:java}
@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
    try {
        // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
        // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
        // number of bytes (potentially a single byte)
        return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024), 16 * 1024);
    } catch (Exception e) {
        throw new KafkaException(e);
    }
}{code}
it allocates 2 buffers (8K and 16K) even though a BufferSupplier is available 
to attempt re-use.

 

i believe it is possible to actually get both those buffers from the supplier, 
and return them when iteration over the record batch is done. 

doing so will require subclassing BufferedInputStream and GZIPInputStream (or 
its parent class) to allow supplying external buffers to them. also, some 
lifecycle hook would be needed to return said buffers to the pool when 
iteration is done.

 

  was:
we recently conducted analysis on memory allocations by the kafka consumer and 
found a significant amount of buffers that graduate out of the young gen, 
causing GC load.

 

these are the buffers used to gunzip record batches in the consumer when 
polling. since the same iterator (and its underlying streams and buffers) is 
likely to live through several poll() cycles, these buffers graduate out of 
the young gen and cause issues.

 

see attached memory allocation flame graph.

 

the code causing this is in CompressionType.GZIP (taken from current trunk):
{code:java}
@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
    try {
        // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
        // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
        // number of bytes (potentially a single byte)
        return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024), 16 * 1024);
    } catch (Exception e) {
        throw new KafkaException(e);
    }
}{code}
it allocates 2 buffers (8K and 16K) even though a BufferSupplier is available 
to attempt re-use.

 

i believe it is possible to actually get both those buffers from the supplier, 
and return them when iteration over the record batch is done. 

doing so will require subclassing BufferedInputStream and GZIPInputStream (or 
its parent class) to allow supplying external buffers to them. also, some 
lifecycle hook would be needed to return said buffers to the pool when 
iteration is done.

 


> kafka consumer churns through buffer memory iterating over records
> --
>
> Key: KAFKA-12605
> URL: https://issues.apache.org/jira/browse/KAFKA-12605
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.7.0
>Reporter: radai rosenblatt
>Priority: Major
> Attachments: Screen Shot 2021-04-01 at 3.55.47 PM.png
>
>
> we recently conducted analysis on memory allocations by the kafka consumer 
> and found a significant amount of buffers that graduate out of the young gen, 
> causing GC load.
>  
> these are the buffers used to gunzip record batches in the consumer when 
> polling. since the same iterator (and its underlying streams and buffers) is 
> likely to live through several poll() cycles, these buffers graduate out of 
> the young gen and cause issues.
>  
> see attached memory allocation flame graph:
> !Screen Shot 2021-04-01 at 3.55.47 PM.png!  
> the code causing this is in CompressionType.GZIP (taken from current trunk):
> {code:java}
> @Override
> public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, 
> BufferSupplier decompressionBufferSupplier) {
> try {
> // Set output buffer (uncompressed) to 16 KB (none by default) and 
> 

[jira] [Updated] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records

2021-04-01 Thread radai rosenblatt (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt updated KAFKA-12605:
-
Attachment: Screen Shot 2021-04-01 at 3.55.47 PM.png

> kafka consumer churns through buffer memory iterating over records
> --
>
> Key: KAFKA-12605
> URL: https://issues.apache.org/jira/browse/KAFKA-12605
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.7.0
>Reporter: radai rosenblatt
>Priority: Major
> Attachments: Screen Shot 2021-04-01 at 3.55.47 PM.png
>
>
> we recently conducted analysis on memory allocations by the kafka consumer 
> and found a significant amount of buffers that graduate out of the young gen, 
> causing GC load.
>  
> these are the buffers used to gunzip record batches in the consumer when 
> polling. since the same iterator (and its underlying streams and buffers) is 
> likely to live through several poll() cycles, these buffers graduate out of 
> the young gen and cause issues.
>  
> see attached memory allocation flame graph.
>  
> the code causing this is in CompressionType.GZIP (taken from current trunk):
> {code:java}
> @Override
> public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
>     try {
>         // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
>         // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
>         // number of bytes (potentially a single byte)
>         return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024), 16 * 1024);
>     } catch (Exception e) {
>         throw new KafkaException(e);
>     }
> }{code}
> it allocates 2 buffers (8K and 16K) even though a BufferSupplier is available 
> to attempt re-use.
>  
> i believe it is possible to actually get both those buffers from the 
> supplier, and return them when iteration over the record batch is done. 
> doing so will require subclassing BufferedInputStream and GZIPInputStream 
> (or its parent class) to allow supplying external buffers to them. also, 
> some lifecycle hook would be needed to return said buffers to the pool when 
> iteration is done.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12605) kafka consumer churns through buffer memory iterating over records

2021-04-01 Thread radai rosenblatt (Jira)
radai rosenblatt created KAFKA-12605:


 Summary: kafka consumer churns through buffer memory iterating 
over records
 Key: KAFKA-12605
 URL: https://issues.apache.org/jira/browse/KAFKA-12605
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.7.0
Reporter: radai rosenblatt


we recently conducted analysis on memory allocations by the kafka consumer and 
found a significant amount of buffers that graduate out of the young gen, 
causing GC load.

 

these are the buffers used to gunzip record batches in the consumer when 
polling. since the same iterator (and its underlying streams and buffers) is 
likely to live through several poll() cycles, these buffers graduate out of 
the young gen and cause issues.

 

see attached memory allocation flame graph.

 

the code causing this is in CompressionType.GZIP (taken from current trunk):
{code:java}
@Override
public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
    try {
        // Set output buffer (uncompressed) to 16 KB (none by default) and input buffer (compressed) to
        // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller reads a small
        // number of bytes (potentially a single byte)
        return new BufferedInputStream(new GZIPInputStream(new ByteBufferInputStream(buffer), 8 * 1024), 16 * 1024);
    } catch (Exception e) {
        throw new KafkaException(e);
    }
}{code}
it allocates 2 buffers (8K and 16K) even though a BufferSupplier is available 
to attempt re-use.

 

i believe it is possible to actually get both those buffers from the supplier, 
and return them when iteration over the record batch is done. 

doing so will require subclassing BufferedInputStream and GZIPInputStream (or 
its parent class) to allow supplying external buffers to them. also, some 
lifecycle hook would be needed to return said buffers to the pool when 
iteration is done.
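
A minimal sketch of the idea described above (illustrative only, not the change in the linked PR): a buffered wrapper whose backing array is borrowed from the BufferSupplier and handed back when iteration is done. The class name, and the assumption that the supplier returns heap-backed ByteBuffers, are made up for the sketch.
{code:java}
// Illustrative sketch only - not the code from the PR. A buffered wrapper whose
// backing array is borrowed from a BufferSupplier and released on close().
// Assumes the supplier hands back heap ByteBuffers (so array() is usable).
import org.apache.kafka.common.utils.BufferSupplier;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

public class PooledBufferedInputStream extends FilterInputStream {
    private final BufferSupplier supplier;
    private ByteBuffer buffer; // borrowed from the pool, not allocated per stream
    private int pos;
    private int limit;

    public PooledBufferedInputStream(InputStream in, BufferSupplier supplier, int bufferSize) {
        super(in);
        this.supplier = supplier;
        this.buffer = supplier.get(bufferSize);
    }

    @Override
    public int read() throws IOException {
        if (pos >= limit && !fill())
            return -1;
        return buffer.array()[pos++] & 0xff;
    }

    // refill the borrowed buffer from the underlying (e.g. gunzip) stream
    private boolean fill() throws IOException {
        int n = in.read(buffer.array(), 0, buffer.capacity());
        if (n <= 0)
            return false;
        pos = 0;
        limit = n;
        return true;
    }

    @Override
    public void close() throws IOException {
        // the lifecycle hook mentioned above: return the borrowed buffer to the pool
        if (buffer != null) {
            supplier.release(buffer);
            buffer = null;
        }
        super.close();
    }
}
{code}
wrapForInput could then return such a wrapper (plus an analogous arrangement for the inflater's 8K input buffer) instead of a plain BufferedInputStream, so the buffers are re-used across poll() cycles rather than re-allocated.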

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9998) KafkaProducer.close(timeout) still may block indefinitely

2020-05-19 Thread radai rosenblatt (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17111317#comment-17111317
 ] 

radai rosenblatt edited comment on KAFKA-9998 at 5/19/20, 4:23 PM:
---

i think that's wrong, for several reasons:
 # the io thread will (eventually) terminate because this.sender.forceClose() 
has been called - so there's no resource leak, just a delay in freeing resources.
 # just like the scheduler/executor service classes in the JDK, there's no need 
to make close() block until all resources have been released
 # the code already avoids joining if called from a callback handler, further 
proving that there's no potential leak risk.
 # this violates the caller thread's timeout argument, where elsewhere the 
close method seems to honor it - that's just inconsistent API behavior - either 
the producer respects the timeout or it does not. "sometimes" is hard to explain 
to users.

seems to me this part was just forgotten when timeout support was added


was (Author: radai):
i think that's wrong, for several reasons:
 # the io thread will (eventually) terminate because 
this.sender.initiateClose() has been called - so there's no resource leak, just 
a delay in freeing resources.
 # just like the scheduler/executor service classes in the JDK, there's no need 
to make close() block until all resources have been released
 # this violates the caller thread's timeout argument, where elsewhere the 
close method seems to honor it - that's just inconsistent API behavior - either 
the producer respects the timeout or it does not. "sometimes" is hard to explain 
to users.

seems to me this part was just forgotten when timeout support was added

> KafkaProducer.close(timeout) still may block indefinitely
> -
>
> Key: KAFKA-9998
> URL: https://issues.apache.org/jira/browse/KAFKA-9998
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.1
>Reporter: radai rosenblatt
>Priority: Major
>
> looking at KafkaProducer.close(timeout), we have this:
> {code:java}
> private void close(Duration timeout, boolean swallowException) {
>     long timeoutMs = timeout.toMillis();
>     if (timeoutMs < 0)
>         throw new IllegalArgumentException("The timeout cannot be negative.");
>     log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
>     // this will keep track of the first encountered exception
>     AtomicReference<Throwable> firstException = new AtomicReference<>();
>     boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
>     if (timeoutMs > 0) {
>         if (invokedFromCallback) {
>             log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
>                     "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
>                     timeoutMs);
>         } else {
>             // Try to close gracefully.
>             if (this.sender != null)
>                 this.sender.initiateClose();
>             if (this.ioThread != null) {
>                 try {
>                     this.ioThread.join(timeoutMs);   // <----- GRACEFUL JOIN
>                 } catch (InterruptedException t) {
>                     firstException.compareAndSet(null, new InterruptException(t));
>                     log.error("Interrupted while joining ioThread", t);
>                 }
>             }
>         }
>     }
>     if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
>         log.info("Proceeding to force close the producer since pending requests could not be completed " +
>                 "within timeout {} ms.", timeoutMs);
>         this.sender.forceClose();
>         // Only join the sender thread when not calling from callback.
>         if (!invokedFromCallback) {
>             try {
>                 this.ioThread.join();   // <----- UNBOUNDED JOIN
>             } catch (InterruptedException e) {
>                 firstException.compareAndSet(null, new InterruptException(e));
>             }
>         }
>     }
>     ...
> }
> {code}
> specifically in our case the ioThread was running a (very) long-running 
> user-provided callback which was preventing the producer from closing within 
> the given timeout.
>  
> I think the 2nd join() call should either be _VERY_ short (since we're 
> already past the timeout at that stage) or should not happen at all.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9998) KafkaProducer.close(timeout) still may block indefinitely

2020-05-19 Thread radai rosenblatt (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17111317#comment-17111317
 ] 

radai rosenblatt commented on KAFKA-9998:
-

i think that's wrong, for several reasons:
 # the io thread will (eventually) terminate because 
this.sender.initiateClose() has been called - so there's no resource leak, just 
a delay in freeing resources.
 # just like the scheduler/executor service classes in the JDK, there's no need 
to make close() block until all resources have been released
 # this violates the caller thread's timeout argument, where elsewhere the 
close method seems to honor it - that's just inconsistent API behavior - either 
the producer respects the timeout or it does not. "sometimes" is hard to explain 
to users.

seems to me this part was just forgotten when timeout support was added

> KafkaProducer.close(timeout) still may block indefinitely
> -
>
> Key: KAFKA-9998
> URL: https://issues.apache.org/jira/browse/KAFKA-9998
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.1
>Reporter: radai rosenblatt
>Priority: Major
>
> looking at KafkaProducer.close(timeout), we have this:
> {code:java}
> private void close(Duration timeout, boolean swallowException) {
>     long timeoutMs = timeout.toMillis();
>     if (timeoutMs < 0)
>         throw new IllegalArgumentException("The timeout cannot be negative.");
>     log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
>     // this will keep track of the first encountered exception
>     AtomicReference<Throwable> firstException = new AtomicReference<>();
>     boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
>     if (timeoutMs > 0) {
>         if (invokedFromCallback) {
>             log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
>                     "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
>                     timeoutMs);
>         } else {
>             // Try to close gracefully.
>             if (this.sender != null)
>                 this.sender.initiateClose();
>             if (this.ioThread != null) {
>                 try {
>                     this.ioThread.join(timeoutMs);   // <----- GRACEFUL JOIN
>                 } catch (InterruptedException t) {
>                     firstException.compareAndSet(null, new InterruptException(t));
>                     log.error("Interrupted while joining ioThread", t);
>                 }
>             }
>         }
>     }
>     if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
>         log.info("Proceeding to force close the producer since pending requests could not be completed " +
>                 "within timeout {} ms.", timeoutMs);
>         this.sender.forceClose();
>         // Only join the sender thread when not calling from callback.
>         if (!invokedFromCallback) {
>             try {
>                 this.ioThread.join();   // <----- UNBOUNDED JOIN
>             } catch (InterruptedException e) {
>                 firstException.compareAndSet(null, new InterruptException(e));
>             }
>         }
>     }
>     ...
> }
> {code}
> specifically in our case the ioThread was running a (very) long-running 
> user-provided callback which was preventing the producer from closing within 
> the given timeout.
>  
> I think the 2nd join() call should either be _VERY_ short (since we're 
> already past the timeout at that stage) or should not happen at all.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9998) KafkaProducer.close(timeout) still may block indefinitely

2020-05-14 Thread radai rosenblatt (Jira)
radai rosenblatt created KAFKA-9998:
---

 Summary: KafkaProducer.close(timeout) still may block indefinitely
 Key: KAFKA-9998
 URL: https://issues.apache.org/jira/browse/KAFKA-9998
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.1
Reporter: radai rosenblatt


looking at KafkaProducer.close(timeout), we have this:
{code:java}
private void close(Duration timeout, boolean swallowException) {
    long timeoutMs = timeout.toMillis();
    if (timeoutMs < 0)
        throw new IllegalArgumentException("The timeout cannot be negative.");
    log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);

    // this will keep track of the first encountered exception
    AtomicReference<Throwable> firstException = new AtomicReference<>();
    boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
    if (timeoutMs > 0) {
        if (invokedFromCallback) {
            log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
                    "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.",
                    timeoutMs);
        } else {
            // Try to close gracefully.
            if (this.sender != null)
                this.sender.initiateClose();
            if (this.ioThread != null) {
                try {
                    this.ioThread.join(timeoutMs);   // <----- GRACEFUL JOIN
                } catch (InterruptedException t) {
                    firstException.compareAndSet(null, new InterruptException(t));
                    log.error("Interrupted while joining ioThread", t);
                }
            }
        }
    }

    if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
        log.info("Proceeding to force close the producer since pending requests could not be completed " +
                "within timeout {} ms.", timeoutMs);
        this.sender.forceClose();
        // Only join the sender thread when not calling from callback.
        if (!invokedFromCallback) {
            try {
                this.ioThread.join();   // <----- UNBOUNDED JOIN
            } catch (InterruptedException e) {
                firstException.compareAndSet(null, new InterruptException(e));
            }
        }
    }
    ...
}
{code}
specifically in our case the ioThread was running a (very) long-running 
user-provided callback which was preventing the producer from closing within 
the given timeout.

 

I think the 2nd join() call should either be _VERY_ short (since we're already 
past the timeout at that stage) or should not happen at all.
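
A hedged sketch of one possible fix (a fragment of the close() method above, not a tested patch): compute a deadline once and bound the second join by whatever is left of the caller's timeout. `deadlineMs` and `remainingMs` are locals invented for the illustration.
{code:java}
// sketch only: compute a deadline up front, e.g. right after timeoutMs is validated
long deadlineMs = System.currentTimeMillis() + timeoutMs;
...
if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
    this.sender.forceClose();
    // Only join the sender thread when not calling from callback.
    if (!invokedFromCallback) {
        try {
            long remainingMs = Math.max(1, deadlineMs - System.currentTimeMillis());
            this.ioThread.join(remainingMs);   // bounded: never blocks past the caller's timeout
        } catch (InterruptedException e) {
            firstException.compareAndSet(null, new InterruptException(e));
        }
    }
}
{code}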



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9855) dont waste memory allocating Struct and values objects for Schemas with no fields

2020-04-12 Thread radai rosenblatt (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt reassigned KAFKA-9855:
---

Assignee: (was: radai rosenblatt)

> dont waste memory allocating Struct and values objects for Schemas with no 
> fields
> -
>
> Key: KAFKA-9855
> URL: https://issues.apache.org/jira/browse/KAFKA-9855
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.0, 2.4.1
>Reporter: radai rosenblatt
>Priority: Major
>
> at the time of this writing there are 6 schemas in kafka APIs with no fields 
> - 3 versions each of LIST_GROUPS and API_VERSIONS.
> under some workloads this may result in the creation of a lot of Struct 
> objects with an Object[0] for values when deserializing those requests from 
> the wire.
> in one particular heap dump we've found a significant amount of heap space 
> wasted on creating such objects.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9855) dont waste memory allocating Struct and values objects for Schemas with no fields

2020-04-12 Thread radai rosenblatt (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt reassigned KAFKA-9855:
---

Assignee: radai rosenblatt

> dont waste memory allocating Struct and values objects for Schemas with no 
> fields
> -
>
> Key: KAFKA-9855
> URL: https://issues.apache.org/jira/browse/KAFKA-9855
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.0, 2.4.1
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
>
> at the time of this writing there are 6 schemas in kafka APIs with no fields 
> - 3 versions each of LIST_GROUPS and API_VERSIONS.
> under some workloads this may result in the creation of a lot of Struct 
> objects with an Object[0] for values when deserializing those requests from 
> the wire.
> in one particular heap dump we've found a significant amount of heap space 
> wasted on creating such objects.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9855) dont waste memory allocating Struct and values objects for Schemas with no fields

2020-04-12 Thread radai rosenblatt (Jira)
radai rosenblatt created KAFKA-9855:
---

 Summary: dont waste memory allocating Struct and values objects 
for Schemas with no fields
 Key: KAFKA-9855
 URL: https://issues.apache.org/jira/browse/KAFKA-9855
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.4.1, 2.4.0
Reporter: radai rosenblatt
Assignee: radai rosenblatt


at the time of this writing there are 6 schemas in kafka APIs with no fields - 
3 versions each of LIST_GROUPS and API_VERSIONS.

under some workloads this may result in the creation of a lot of Struct objects 
with an Object[0] for values when deserializing those requests from the wire.

in one particular heap dump we've found a significant amount of heap space 
wasted on creating such objects.
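
A hedged sketch of the idea (a fragment of a Struct-like constructor, not the actual fix); `numFields()` is assumed here to report the schema's field count:
{code:java}
// illustrative fragment only: share one Object[0] across all Structs built from
// field-less schemas (e.g. the LIST_GROUPS / API_VERSIONS request versions)
private static final Object[] NO_VALUES = new Object[0];

public Struct(Schema schema) {
    this.schema = schema;
    this.values = schema.numFields() == 0 ? NO_VALUES : new Object[schema.numFields()];
}
{code}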



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric

2019-05-14 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-8366:
---

 Summary: partitions of topics being deleted show up in the offline 
partitions metric
 Key: KAFKA-8366
 URL: https://issues.apache.org/jira/browse/KAFKA-8366
 Project: Kafka
  Issue Type: Improvement
Reporter: radai rosenblatt


i believe this is a bug
offline partitions is a metric that indicates an error condition - lack of 
kafka availability.
as an artifact of how deletion is implemented, the partitions of a topic 
undergoing deletion show up as offline, which just creates false-positive 
alerts.

if needed, maybe there should exist a separate "partitions to be deleted" 
sensor.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3539) KafkaProducer.send() may block even though it returns the Future

2019-03-25 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16800714#comment-16800714
 ] 

radai rosenblatt commented on KAFKA-3539:
-

IIUC, the root of the problem is that the kafka producer stores compressed 
batches of msgs in a map keyed by the partition those msgs are intended for.
since without metadata there's no knowing the layout of a topic, the producer 
can't tell where to "place" a msg, which is why it blocks when metadata is missing.
one possible solution would be to have an "unknown" msg bucket (with some 
finite capacity) where msgs of unknown destination go. the biggest issue with 
this is that those msgs cannot be compressed (as kafka compresses batches, not 
individual msgs, and there's no guarantee that everything in the unknown bucket 
will go into the same batch).
once metadata is obtained, the "unknown bucket" would need to be iterated over 
and the msgs deposited (and compressed) into the correct queues. this would 
need to happen when metadata arrives and before any new msgs are allowed into 
the producer (to not violate ordering).
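
Purely to illustrate the proposal above, a sketch of such a bucket; the class and method names are invented and nothing like this exists in the producer today:
{code:java}
// hypothetical sketch of the "unknown bucket" idea - not an existing producer class
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.ArrayDeque;
import java.util.Deque;

final class UnknownDestinationBucket<K, V> {
    private final int maxBytes; // finite capacity, as described above
    private int usedBytes;
    private final Deque<ProducerRecord<K, V>> pending = new ArrayDeque<>();

    UnknownDestinationBucket(int maxBytes) {
        this.maxBytes = maxBytes;
    }

    /** false means the bucket is full and the caller must block (or fail) as it does today. */
    synchronized boolean offer(ProducerRecord<K, V> record, int estimatedBytes) {
        if (usedBytes + estimatedBytes > maxBytes)
            return false;
        pending.addLast(record);
        usedBytes += estimatedBytes;
        return true;
    }

    /**
     * drained in order into the real per-partition (compressed) batches once metadata
     * arrives, before any new records are accepted - so ordering is not violated.
     */
    synchronized Deque<ProducerRecord<K, V>> drain() {
        Deque<ProducerRecord<K, V>> out = new ArrayDeque<>(pending);
        pending.clear();
        usedBytes = 0;
        return out;
    }
}
{code}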

> KafkaProducer.send() may block even though it returns the Future
> 
>
> Key: KAFKA-3539
> URL: https://issues.apache.org/jira/browse/KAFKA-3539
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Reporter: Oleg Zhurakousky
>Priority: Critical
>
> You can get more details from the us...@kafka.apache.org by searching on the 
> thread with the subject "KafkaProducer block on send".
> The bottom line is that method that returns Future must never block, since it 
> essentially violates the Future contract as it was specifically designed to 
> return immediately passing control back to the user to check for completion, 
> cancel etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7475) print the actual cluster bootstrap address on authentication failures

2018-10-11 Thread radai rosenblatt (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt resolved KAFKA-7475.
-
   Resolution: Fixed
 Reviewer: Rajini Sivaram
Fix Version/s: 2.1.0

> print the actual cluster bootstrap address on authentication failures
> -
>
> Key: KAFKA-7475
> URL: https://issues.apache.org/jira/browse/KAFKA-7475
> Project: Kafka
>  Issue Type: Improvement
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 2.1.0
>
>
> currently when a kafka client fails to connect to a cluster, users see 
> something like this:
> {code}
> Connection to node -1 terminated during authentication. This may indicate 
> that authentication failed due to invalid credentials. 
> {code}
> that log line is mostly useless in identifying which (of potentially many) 
> kafka clients is having issues and which kafka cluster it is having issues 
> with.
> would be nice to record the remote host/port



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7475) print the actual cluster bootstrap address on authentication failures

2018-10-02 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-7475:
---

 Summary: print the actual cluster bootstrap address on 
authentication failures
 Key: KAFKA-7475
 URL: https://issues.apache.org/jira/browse/KAFKA-7475
 Project: Kafka
  Issue Type: Improvement
Reporter: radai rosenblatt


currently when a kafka client fails to connect to a cluster, users see 
something like this:
{code}
Connection to node -1 terminated during authentication. This may indicate that 
authentication failed due to invalid credentials. 
{code}

that log line is mostly useless in identifying which (of potentially many) 
kafka clients is having issues and which kafka cluster it is having issues with.

would be nice to record the remote host/port
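
For illustration, the log line could look something like the following; the call site and variable names are assumptions, not the actual patch:
{code:java}
// hypothetical sketch of a more useful log line that includes the remote address
log.warn("Connection to node {} ({}:{}) terminated during authentication. " +
        "This may indicate that authentication failed due to invalid credentials.",
        nodeId, node.host(), node.port());
{code}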



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7473) allow configuring kafka client configs to not warn for unknown config properties

2018-10-02 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-7473:
---

 Summary: allow configuring kafka client configs to not warn for 
unknown config properties
 Key: KAFKA-7473
 URL: https://issues.apache.org/jira/browse/KAFKA-7473
 Project: Kafka
  Issue Type: Improvement
Reporter: radai rosenblatt


as the config handed to a client may contain config keys for use by either 
modular code in the client (serializers, deserializers, interceptors) or the 
subclasses of the client class, having "unknown" (to the vanilla client) 
configs logged as a warning is an annoyance.

it would be nice to have a constructor parameter that controls this behavior 
(just like there's already a flag for `boolean doLog`)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-15 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513037#comment-16513037
 ] 

radai rosenblatt edited comment on KAFKA-7012 at 6/15/18 8:48 PM:
--

i don't have the time to pick this up right now.

IIRC the original PR (https://github.com/apache/kafka/pull/2330) had a more 
complicated condition for when a key (channel) gets picked into 
`keysWithBufferedRead`. the condition is currently
{code}
if (channel.hasBytesBuffered()) {
    //this channel has bytes enqueued in intermediary buffers that we could not read
    //(possibly because no memory). it may be the case that the underlying socket will
    //not come up in the next poll() and so we need to remember this channel for the
    //next poll call otherwise data may be stuck in said buffers forever.
    keysWithBufferedRead.add(key);
}
{code}
which results in lots of "false positives" - keys that have something left over 
in ssl buffers (likely, since request sizes are rarely a multiple of the ssl 
cipher block size) that cause the next poll() cycle to be inefficient.

the condition needs to check if a channel has something left *that could not 
be read out due to memory pressure*.

alternatively - the doomsday scenario this is meant to handle is pretty rare: 
if a channel has a request fully inside the ssl buffers that cannot be read due 
to memory pressure, *and* the underlying channel will never have any more 
incoming bytes (so will never come back from select), the request will sit there 
and rot, resulting in a client timeout.

the alternative to making the condition more complicated is not treating this 
case at all and suffering the (rare?) timeout?


was (Author: radai):
i don't have the time to pick this up right now.

IIRC the original PR (https://github.com/apache/kafka/pull/2330) had a more 
complicated condition for when a key (channel) gets picked into 
`keysWithBufferedRead`. the condition is currently
{code}
if (channel.hasBytesBuffered()) {
    //this channel has bytes enqueued in intermediary buffers that we could not read
    //(possibly because no memory). it may be the case that the underlying socket will
    //not come up in the next poll() and so we need to remember this channel for the
    //next poll call otherwise data may be stuck in said buffers forever.
    keysWithBufferedRead.add(key);
}
{code}
which results in lots of "false positives" - keys that have something left over 
in ssl buffers (likely, since request sizes are rarely a multiple of the ssl 
cipher block size) that cause the next poll() cycle to be inefficient.

the condition needs to check if a channel has something left *that could not 
be read out due to memory pressure*.

alternatively - the doomsday scenario this is meant to handle is pretty rare: 
if a channel has a request fully inside the ssl buffers that cannot be read due 
to memory pressure, *and* the underlying channel will never have any more 
incoming bytes (so will never come back from select), the request will sit there 
and rot, resulting in a client timeout.

the alternative to making the condition more complicated is not treating this 
case at all and suffering the (rare?) timeout?

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
>  Labels: regression
> Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, 
> Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, 
> Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, 
> Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node on the cluster, we notice that network threads use 
> most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% 
> vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is 
> high depending on the number of network threads used. If networks threads is 
> set to 8, then the cpu usage is around 850%(9 vcpus) and if it is set to 4 
> then the cpu usage is around 450%(5 vcpus). Using the same kafka 
> server.properties for both.
> Did further analysis with git bisect, couple of build and deploys, traced the 
> issue to commit 

[jira] [Comment Edited] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-14 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513037#comment-16513037
 ] 

radai rosenblatt edited comment on KAFKA-7012 at 6/14/18 9:46 PM:
--

i don't have the time to pick this up right now.

IIRC the original PR (https://github.com/apache/kafka/pull/2330) had a more 
complicated condition for when a key (channel) gets picked into 
`keysWithBufferedRead`. the condition is currently
{code}
if (channel.hasBytesBuffered()) {
    //this channel has bytes enqueued in intermediary buffers that we could not read
    //(possibly because no memory). it may be the case that the underlying socket will
    //not come up in the next poll() and so we need to remember this channel for the
    //next poll call otherwise data may be stuck in said buffers forever.
    keysWithBufferedRead.add(key);
}
{code}
which results in lots of "false positives" - keys that have something left over 
in ssl buffers (likely, since request sizes are rarely a multiple of the ssl 
cipher block size) that cause the next poll() cycle to be inefficient.

the condition needs to check if a channel has something left *that could not 
be read out due to memory pressure*.

alternatively - the doomsday scenario this is meant to handle is pretty rare: 
if a channel has a request fully inside the ssl buffers that cannot be read due 
to memory pressure, *and* the underlying channel will never have any more 
incoming bytes (so will never come back from select), the request will sit there 
and rot, resulting in a client timeout.

the alternative to making the condition more complicated is not treating this 
case at all and suffering the (rare?) timeout?


was (Author: radai):
i don't have the time to pick this up right now.

IIRC the original PR (https://github.com/apache/kafka/pull/2330) had a more 
complicated condition for when a key (channel) gets picked into 
`keysWithBufferedRead`. the condition is currently
```java
if (channel.hasBytesBuffered()) {
    //this channel has bytes enqueued in intermediary buffers that we could not read
    //(possibly because no memory). it may be the case that the underlying socket will
    //not come up in the next poll() and so we need to remember this channel for the
    //next poll call otherwise data may be stuck in said buffers forever.
    keysWithBufferedRead.add(key);
}
```
which results in lots of "false positives" - keys that have something left over 
in ssl buffers (likely, since request sizes are rarely a multiple of the ssl 
cipher block size) that cause the next poll() cycle to be inefficient.

the condition needs to check if a channel has something left *that could not 
be read out due to memory pressure*.

alternatively - the doomsday scenario this is meant to handle is pretty rare: 
if a channel has a request fully inside the ssl buffers that cannot be read due 
to memory pressure, *and* the underlying channel will never have any more 
incoming bytes (so will never come back from select), the request will sit there 
and rot, resulting in a client timeout.

the alternative to making the condition more complicated is not treating this 
case at all and suffering the (rare?) timeout?

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
> Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, 
> Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, 
> Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, 
> Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node on the cluster, we notice that network threads use 
> most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% 
> vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is 
> high depending on the number of network threads used. If networks threads is 
> set to 8, then the cpu usage is around 850%(9 vcpus) and if it is set to 4 
> then the cpu usage is around 450%(5 vcpus). Using the same kafka 
> server.properties for both.
> Did further analysis with git bisect, couple of build and deploys, traced the 
> issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU 

[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-14 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16513037#comment-16513037
 ] 

radai rosenblatt commented on KAFKA-7012:
-

i don't have the time to pick this up right now.

IIRC the original PR (https://github.com/apache/kafka/pull/2330) had a more 
complicated condition for when a key (channel) gets picked into 
`keysWithBufferedRead`. the condition is currently
```java
if (channel.hasBytesBuffered()) {
    //this channel has bytes enqueued in intermediary buffers that we could not read
    //(possibly because no memory). it may be the case that the underlying socket will
    //not come up in the next poll() and so we need to remember this channel for the
    //next poll call otherwise data may be stuck in said buffers forever.
    keysWithBufferedRead.add(key);
}
```
which results in lots of "false positives" - keys that have something left over 
in ssl buffers (likely, since request sizes are rarely a multiple of the ssl 
cipher block size) that cause the next poll() cycle to be inefficient.

the condition needs to check if a channel has something left *that could not 
be read out due to memory pressure*.

alternatively - the doomsday scenario this is meant to handle is pretty rare: 
if a channel has a request fully inside the ssl buffers that cannot be read due 
to memory pressure, *and* the underlying channel will never have any more 
incoming bytes (so will never come back from select), the request will sit there 
and rot, resulting in a client timeout.

the alternative to making the condition more complicated is not treating this 
case at all and suffering the (rare?) timeout?

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
> Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, 
> Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, 
> Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, 
> Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node on the cluster, we notice that network threads use 
> most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% 
> vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is 
> high depending on the number of network threads used. If networks threads is 
> set to 8, then the cpu usage is around 850%(9 vcpus) and if it is set to 4 
> then the cpu usage is around 450%(5 vcpus). Using the same kafka 
> server.properties for both.
> Did further analysis with git bisect, couple of build and deploys, traced the 
> issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine 
> for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit 
> 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. Have 
> attached screenshots of profiling done with both the commits. Screenshot 
> Commit-f15cdbc91b-profile shows less cpu usage by network threads and 
> Screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show 
> higher cpu usage(almost entire cpu usage) by network threads. Also noticed 
> that kafka.network.Processor.poll() method is invoked 10 times more with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need the issue to be resolved to upgrade the cluster. Please let me know 
> if you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-12 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510122#comment-16510122
 ] 

radai rosenblatt commented on KAFKA-7012:
-

in light of that i think the safe change to make would be to only record keys 
with bytes still buffered if memory pressure prevented anything from being read 
out this round (as opposed to always, as is current behaviour).
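
A hedged sketch of that narrower condition; `readBlockedOnMemory` is an invented flag that the change would have to track, used here only to illustrate the idea:
{code}
if (channel.hasBytesBuffered() && readBlockedOnMemory) {
    // only remember this channel if memory pressure (rather than ordinary SSL
    // leftovers) is what prevented draining its intermediary buffers this round
    keysWithBufferedRead.add(key);
}
{code}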

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
> Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, 
> Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, 
> Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, 
> Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node on the cluster, we notice that network threads use 
> most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% 
> vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is 
> high depending on the number of network threads used. If networks threads is 
> set to 8, then the cpu usage is around 850%(9 vcpus) and if it is set to 4 
> then the cpu usage is around 450%(5 vcpus). Using the same kafka 
> server.properties for both.
> Did further analysis with git bisect, couple of build and deploys, traced the 
> issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine 
> for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit 
> 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. Have 
> attached screenshots of profiling done with both the commits. Screenshot 
> Commit-f15cdbc91b-profile shows less cpu usage by network threads and 
> Screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show 
> higher cpu usage(almost entire cpu usage) by network threads. Also noticed 
> that kafka.network.Processor.poll() method is invoked 10 times more with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need the issue to be resolved to upgrade the cluster. Please let me know 
> if you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-09 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16506994#comment-16506994
 ] 

radai rosenblatt commented on KAFKA-7012:
-

that line is meant to handle a (hypothetical) scenario where socket data is 
available in the SSL ciphertext buffer but no data will ever be read from the 
socket itself again.
i guess we'll need to add some code to only run this path every-so-often as the 
SSL code is apparently likely to leave things behind.
did commenting this out affect function/correctness? missing msgs? higher 99th 
percentile latencies?

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
> Attachments: Commit-47ee8e954-0607-bufferkeys-nopoll-profile.png, 
> Commit-47ee8e954-0607-memory.png, Commit-47ee8e954-0607-profile.png, 
> Commit-47ee8e954-profile.png, Commit-47ee8e954-profile2.png, 
> Commit-f15cdbc91b-profile.png, Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node on the cluster, we notice that network threads use 
> most of the cpu. It is a 3 node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1 typical usage of the servers is around 50 to 60% 
> vcpu(using less than 1 vcpu). After upgrade we are noticing that cpu usage is 
> high depending on the number of network threads used. If networks threads is 
> set to 8, then the cpu usage is around 850%(9 vcpus) and if it is set to 4 
> then the cpu usage is around 450%(5 vcpus). Using the same kafka 
> server.properties for both.
> Did further analysis with git bisect, couple of build and deploys, traced the 
> issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU usage is fine 
> for commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d. But with commit 
> 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. Have 
> attached screenshots of profiling done with both the commits. Screenshot 
> Commit-f15cdbc91b-profile shows less cpu usage by network threads and 
> Screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show 
> higher cpu usage(almost entire cpu usage) by network threads. Also noticed 
> that kafka.network.Processor.poll() method is invoked 10 times more with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need the issue to be resolved to upgrade the cluster. Please let me know 
> if you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-08 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16506787#comment-16506787
 ] 

radai rosenblatt edited comment on KAFKA-7012 at 6/9/18 2:06 AM:
-

no, not all of poll(), just this part:
{code}
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers) {
    keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
    Set<SelectionKey> toPoll = keysWithBufferedRead;
    keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
    pollSelectionKeys(toPoll, false, endSelect);
}
{code}

just comment out the call to pollSelectionKeys() at the end of the condition


was (Author: radai):
no, not all of poll(), just this part:
{code}
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers) {
    keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
    Set<SelectionKey> toPoll = keysWithBufferedRead;
    keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
    pollSelectionKeys(toPoll, false, endSelect);
}
{code}

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
> Attachments: Commit-47ee8e954-0607-memory.png, 
> Commit-47ee8e954-0607-profile.png, Commit-47ee8e954-profile.png, 
> Commit-47ee8e954-profile2.png, Commit-f15cdbc91b-profile.png, 
> Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade a Kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node of the cluster, we notice that the network threads use 
> most of the cpu. It is a 3-node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1, typical usage of the servers is around 50 to 60% of a 
> vcpu (less than 1 vcpu). After the upgrade, cpu usage is high and depends on 
> the number of network threads used: if network threads are set to 8, cpu 
> usage is around 850% (9 vcpus), and if set to 4 it is around 450% (5 vcpus). 
> The same kafka server.properties is used for both versions.
> Did further analysis with git bisect and a couple of builds and deploys, and 
> traced the issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU 
> usage is fine at commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d, but with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. 
> Screenshots of profiling with both commits are attached: screenshot 
> Commit-f15cdbc91b-profile shows low cpu usage by the network threads, while 
> screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show the 
> network threads consuming almost the entire cpu. Also noticed that the 
> kafka.network.Processor.poll() method is invoked 10 times more often with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need this issue resolved to upgrade the cluster. Please let me know if 
> you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-08 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16506787#comment-16506787
 ] 

radai rosenblatt edited comment on KAFKA-7012 at 6/9/18 2:06 AM:
-

no, not all of poll(), just this part:
{code}
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers) {
    keysWithBufferedRead.removeAll(readyKeys); // so no channel gets polled twice
    Set<SelectionKey> toPoll = keysWithBufferedRead;
    keysWithBufferedRead = new HashSet<>(); // poll() calls will repopulate if needed
    pollSelectionKeys(toPoll, false, endSelect);   // <=== COMMENT THIS OUT
}
{code}

just comment out the call to pollSelectionKeys() at the end of the condition


was (Author: radai):
no, not all of poll(), just this part:
{code}
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers) {
    keysWithBufferedRead.removeAll(readyKeys); // so no channel gets polled twice
    Set<SelectionKey> toPoll = keysWithBufferedRead;
    keysWithBufferedRead = new HashSet<>(); // poll() calls will repopulate if needed
    pollSelectionKeys(toPoll, false, endSelect);
}
{code}

just comment out the call to pollSelectionKeys() at the end of the condition

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
> Attachments: Commit-47ee8e954-0607-memory.png, 
> Commit-47ee8e954-0607-profile.png, Commit-47ee8e954-profile.png, 
> Commit-47ee8e954-profile2.png, Commit-f15cdbc91b-profile.png, 
> Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade a Kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node of the cluster, we notice that the network threads use 
> most of the cpu. It is a 3-node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1, typical usage of the servers is around 50 to 60% of a 
> vcpu (less than 1 vcpu). After the upgrade, cpu usage is high and depends on 
> the number of network threads used: if network threads are set to 8, cpu 
> usage is around 850% (9 vcpus), and if set to 4 it is around 450% (5 vcpus). 
> The same kafka server.properties is used for both versions.
> Did further analysis with git bisect and a couple of builds and deploys, and 
> traced the issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU 
> usage is fine at commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d, but with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. 
> Screenshots of profiling with both commits are attached: screenshot 
> Commit-f15cdbc91b-profile shows low cpu usage by the network threads, while 
> screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show the 
> network threads consuming almost the entire cpu. Also noticed that the 
> kafka.network.Processor.poll() method is invoked 10 times more often with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need this issue resolved to upgrade the cluster. Please let me know if 
> you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-08 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16506787#comment-16506787
 ] 

radai rosenblatt commented on KAFKA-7012:
-

no, not all of poll(), just this part:
{code}
// Poll from channels that have buffered data (but nothing more from the underlying socket)
if (dataInBuffers) {
    keysWithBufferedRead.removeAll(readyKeys); // so no channel gets polled twice
    Set<SelectionKey> toPoll = keysWithBufferedRead;
    keysWithBufferedRead = new HashSet<>(); // poll() calls will repopulate if needed
    pollSelectionKeys(toPoll, false, endSelect);
}
{code}

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
> Attachments: Commit-47ee8e954-0607-memory.png, 
> Commit-47ee8e954-0607-profile.png, Commit-47ee8e954-profile.png, 
> Commit-47ee8e954-profile2.png, Commit-f15cdbc91b-profile.png, 
> Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade a Kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node of the cluster, we notice that the network threads use 
> most of the cpu. It is a 3-node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1, typical usage of the servers is around 50 to 60% of a 
> vcpu (less than 1 vcpu). After the upgrade, cpu usage is high and depends on 
> the number of network threads used: if network threads are set to 8, cpu 
> usage is around 850% (9 vcpus), and if set to 4 it is around 450% (5 vcpus). 
> The same kafka server.properties is used for both versions.
> Did further analysis with git bisect and a couple of builds and deploys, and 
> traced the issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU 
> usage is fine at commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d, but with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. 
> Screenshots of profiling with both commits are attached: screenshot 
> Commit-f15cdbc91b-profile shows low cpu usage by the network threads, while 
> screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show the 
> network threads consuming almost the entire cpu. Also noticed that the 
> kafka.network.Processor.poll() method is invoked 10 times more often with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need this issue resolved to upgrade the cluster. Please let me know if 
> you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-07 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16505583#comment-16505583
 ] 

radai rosenblatt commented on KAFKA-7012:
-

my only current hypothesis is that `keysWithBufferedRead` is non-empty, causing 
lots of very short poll() calls for those channels.
could you try commenting out that poll() and re-running the experiment?
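
A minimal sketch of one way to test this hypothesis, assuming one is willing to 
patch Selector.poll() locally. The bufferedPollCount field and the log line are 
made-up instrumentation added for the experiment, not existing Kafka code:

{code:java}
// illustrative instrumentation only - bufferedPollCount is a hypothetical field
if (dataInBuffers) {
    bufferedPollCount++;
    log.debug("buffered-data poll #{}: {} channels still have staged data",
              bufferedPollCount, keysWithBufferedRead.size());
    keysWithBufferedRead.removeAll(readyKeys);  // so no channel gets polled twice
    Set<SelectionKey> toPoll = keysWithBufferedRead;
    keysWithBufferedRead = new HashSet<>();     // poll() calls will repopulate if needed
    pollSelectionKeys(toPoll, false, endSelect);
}
{code}

If the log line fires on nearly every poll(), that would account for the extra 
kafka.network.Processor.poll() invocations seen in the profiles.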

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
> Attachments: Commit-47ee8e954-0607-memory.png, 
> Commit-47ee8e954-0607-profile.png, Commit-47ee8e954-profile.png, 
> Commit-47ee8e954-profile2.png, Commit-f15cdbc91b-profile.png, 
> Commit-f15cdbc91b-profile2.png
>
>
> We are trying to upgrade a Kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node of the cluster, we notice that the network threads use 
> most of the cpu. It is a 3-node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1, typical usage of the servers is around 50 to 60% of a 
> vcpu (less than 1 vcpu). After the upgrade, cpu usage is high and depends on 
> the number of network threads used: if network threads are set to 8, cpu 
> usage is around 850% (9 vcpus), and if set to 4 it is around 450% (5 vcpus). 
> The same kafka server.properties is used for both versions.
> Did further analysis with git bisect and a couple of builds and deploys, and 
> traced the issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU 
> usage is fine at commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d, but with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. 
> Screenshots of profiling with both commits are attached: screenshot 
> Commit-f15cdbc91b-profile shows low cpu usage by the network threads, while 
> screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show the 
> network threads consuming almost the entire cpu. Also noticed that the 
> kafka.network.Processor.poll() method is invoked 10 times more often with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need this issue resolved to upgrade the cluster. Please let me know if 
> you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7012) Performance issue upgrading to kafka 1.0.1 or 1.1

2018-06-07 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7012?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504672#comment-16504672
 ] 

radai rosenblatt commented on KAFKA-7012:
-

the attached profile screenshot shows only 210ms (out of >650k ms) of self time 
in pool.tryAllocate().
a possible time drain with the patch is determineHandlingOrder() - it may 
shuffle the sockets in order to guarantee fairness under low-memory conditions 
- but it doesn't appear in the screenshot.
there's also no equivalent screenshot of a detailed profile breakdown for the 
results without the patch, so there is no baseline to compare against.
was the profiler using cpu time or wall-clock time for the measurements?
were the ssl settings (cipher suite used etc.) the same across measurements?

> Performance issue upgrading to kafka 1.0.1 or 1.1
> -
>
> Key: KAFKA-7012
> URL: https://issues.apache.org/jira/browse/KAFKA-7012
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: rajadayalan perumalsamy
>Assignee: praveen
>Priority: Major
> Attachments: Commit-47ee8e954-profile.png, 
> Commit-47ee8e954-profile2.png, Commit-f15cdbc91b-profile.png
>
>
> We are trying to upgrade a Kafka cluster from Kafka 0.11.0.1 to Kafka 1.0.1. 
> After upgrading 1 node of the cluster, we notice that the network threads use 
> most of the cpu. It is a 3-node cluster with 15k messages/sec on each node. 
> With Kafka 0.11.0.1, typical usage of the servers is around 50 to 60% of a 
> vcpu (less than 1 vcpu). After the upgrade, cpu usage is high and depends on 
> the number of network threads used: if network threads are set to 8, cpu 
> usage is around 850% (9 vcpus), and if set to 4 it is around 450% (5 vcpus). 
> The same kafka server.properties is used for both versions.
> Did further analysis with git bisect and a couple of builds and deploys, and 
> traced the issue to commit 47ee8e954df62b9a79099e944ec4be29afe046f6. CPU 
> usage is fine at commit f15cdbc91b240e656d9a2aeb6877e94624b21f8d, but with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6 cpu usage has increased. 
> Screenshots of profiling with both commits are attached: screenshot 
> Commit-f15cdbc91b-profile shows low cpu usage by the network threads, while 
> screenshots Commit-47ee8e954-profile and Commit-47ee8e954-profile2 show the 
> network threads consuming almost the entire cpu. Also noticed that the 
> kafka.network.Processor.poll() method is invoked 10 times more often with 
> commit 47ee8e954df62b9a79099e944ec4be29afe046f6.
> We need this issue resolved to upgrade the cluster. Please let me know if 
> you need any additional information.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all

2018-06-06 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504111#comment-16504111
 ] 

radai rosenblatt commented on KAFKA-6648:
-

[~rsivaram] - PR has been updated with tests (and rebased)

> Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
> -
>
> Key: KAFKA-6648
> URL: https://issues.apache.org/jira/browse/KAFKA-6648
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 2.0.0
>
>
> {code}
> if (!shouldRetry) {
>     HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
>     for (String topic : cluster.topics())
>         topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
>     return topicsPartitionInfos;
> }
> {code}
> this leads to inconsistent behavior upstream, for example in 
> KafkaConsumer.partitionsFor(): if there is valid metadata, all partitions 
> are returned, whereas if the metadata doesn't exist (or has expired), only a 
> subset of the partitions (the healthy ones) is returned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all

2018-06-06 Thread radai rosenblatt (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503665#comment-16503665
 ] 

radai rosenblatt commented on KAFKA-6648:
-

thanks for the ping, i had completely forgotten i was trying to upstream this 
fix.

> Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
> -
>
> Key: KAFKA-6648
> URL: https://issues.apache.org/jira/browse/KAFKA-6648
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 2.0.0
>
>
> {code}
> if (!shouldRetry) {
>     HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
>     for (String topic : cluster.topics())
>         topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
>     return topicsPartitionInfos;
> }
> {code}
> this leads to inconsistent behavior upstream, for example in 
> KafkaConsumer.partitionsFor(): if there is valid metadata, all partitions 
> are returned, whereas if the metadata doesn't exist (or has expired), only a 
> subset of the partitions (the healthy ones) is returned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all

2018-03-13 Thread radai rosenblatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16397306#comment-16397306
 ] 

radai rosenblatt commented on KAFKA-6648:
-

PR is https://github.com/apache/kafka/pull/4679

> Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
> -
>
> Key: KAFKA-6648
> URL: https://issues.apache.org/jira/browse/KAFKA-6648
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 2.0.0
>
>
> {code}
> if (!shouldRetry) {
>     HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
>     for (String topic : cluster.topics())
>         topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
>     return topicsPartitionInfos;
> }
> {code}
> this leads to inconsistent behavior upstream, for example in 
> KafkaConsumer.partitionsFor(): if there is valid metadata, all partitions 
> are returned, whereas if the metadata doesn't exist (or has expired), only a 
> subset of the partitions (the healthy ones) is returned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all

2018-03-13 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt updated KAFKA-6648:

Fix Version/s: 2.0.0

> Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
> -
>
> Key: KAFKA-6648
> URL: https://issues.apache.org/jira/browse/KAFKA-6648
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 2.0.0
>
>
> {code}
> if (!shouldRetry) {
>     HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
>     for (String topic : cluster.topics())
>         topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
>     return topicsPartitionInfos;
> }
> {code}
> this leads to inconsistent behavior upstream, for example in 
> KafkaConsumer.partitionsFor(): if there is valid metadata, all partitions 
> are returned, whereas if the metadata doesn't exist (or has expired), only a 
> subset of the partitions (the healthy ones) is returned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all

2018-03-13 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt updated KAFKA-6648:

Affects Version/s: (was: 1.0.1)
   1.0.0
   0.11.0.2

> Fetcher.getTopicMetadata() only returns "healthy" partitions, not all
> -
>
> Key: KAFKA-6648
> URL: https://issues.apache.org/jira/browse/KAFKA-6648
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 2.0.0
>
>
> {code}
> if (!shouldRetry) {
>     HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
>     for (String topic : cluster.topics())
>         topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
>     return topicsPartitionInfos;
> }
> {code}
> this leads to inconsistent behavior upstream, for example in 
> KafkaConsumer.partitionsFor(): if there is valid metadata, all partitions 
> are returned, whereas if the metadata doesn't exist (or has expired), only a 
> subset of the partitions (the healthy ones) is returned.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6648) Fetcher.getTopicMetadata() only returns "healthy" partitions, not all

2018-03-13 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-6648:
---

 Summary: Fetcher.getTopicMetadata() only returns "healthy" 
partitions, not all
 Key: KAFKA-6648
 URL: https://issues.apache.org/jira/browse/KAFKA-6648
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.0.1
Reporter: radai rosenblatt
Assignee: radai rosenblatt


{code}
if (!shouldRetry) {
    HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
    for (String topic : cluster.topics())
        topicsPartitionInfos.put(topic, cluster.availablePartitionsForTopic(topic));
    return topicsPartitionInfos;
}
{code}

this leads to inconsistent behavior upstream, for example in 
KafkaConsumer.partitionsFor(): if there is valid metadata, all partitions are 
returned, whereas if the metadata doesn't exist (or has expired), only a subset 
of the partitions (the healthy ones) is returned.
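
A minimal sketch of the direction a fix could take, assuming the intent is to 
return metadata for all known partitions rather than only the currently 
available ones (the PR referenced above contains the actual change):

{code:java}
// hedged sketch, not necessarily the merged change:
// partitionsForTopic() returns every known partition, while
// availablePartitionsForTopic() only returns partitions that currently have a leader.
if (!shouldRetry) {
    HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
    for (String topic : cluster.topics())
        topicsPartitionInfos.put(topic, cluster.partitionsForTopic(topic));
    return topicsPartitionInfos;
}
{code}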



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6622) GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly

2018-03-07 Thread radai rosenblatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16390502#comment-16390502
 ] 

radai rosenblatt commented on KAFKA-6622:
-

see PR - https://github.com/apache/kafka/pull/4661

> GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly
> --
>
> Key: KAFKA-6622
> URL: https://issues.apache.org/jira/browse/KAFKA-6622
> Project: Kafka
>  Issue Type: Bug
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Attachments: kafka batch iteration funtime.png
>
>
> when reading records from a consumer offsets batch, the entire batch is 
> decompressed multiple times (per record) as part of calling 
> `batch.baseOffset`. this is a very expensive operation being called in a loop 
> for no reason:
> !kafka batch iteration funtime.png!



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6622) GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly

2018-03-07 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-6622:
---

 Summary: GroupMetadataManager.loadGroupsAndOffsets decompresses 
record batch needlessly
 Key: KAFKA-6622
 URL: https://issues.apache.org/jira/browse/KAFKA-6622
 Project: Kafka
  Issue Type: Bug
Reporter: radai rosenblatt
Assignee: radai rosenblatt
 Attachments: kafka batch iteration funtime.png

when reading records from a consumer offsets batch, the entire batch is 
decompressed multiple times (per record) as part of calling `batch.baseOffset`. 
this is a very expensive operation being called in a loop for no reason:
!kafka batch iteration funtime.png!
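
A hedged, Java-style sketch of the general remedy (the actual loading code in 
GroupMetadataManager is Scala, and handle() below is a hypothetical placeholder 
for the per-record processing): call batch.baseOffset() once per batch and 
reuse the value inside the record loop.

{code:java}
// illustrative only: pay the baseOffset() cost once per batch, not once per record
for (RecordBatch batch : records.batches()) {
    long baseOffset = batch.baseOffset();  // may decompress the batch - do it exactly once
    for (Record record : batch) {
        handle(record, baseOffset);        // hypothetical per-record processing
    }
}
{code}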



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read

2018-02-08 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt updated KAFKA-6345:

Fix Version/s: 1.1.0

> NetworkClient.inFlightRequestCount() is not thread safe, causing 
> ConcurrentModificationExceptions when sensors are read
> ---
>
> Key: KAFKA-6345
> URL: https://issues.apache.org/jira/browse/KAFKA-6345
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
>Reporter: radai rosenblatt
>Assignee: Sean McCauliff
>Priority: Major
> Fix For: 1.1.0
>
>
> example stack trace (code is ~0.10.2.*)
> {code}
> java.util.ConcurrentModificationException: 
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>   at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>   at 
> org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109)
>   at 
> org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
> {code}
> looking at latest trunk, the code is still vulnerable:
> # NetworkClient.inFlightRequestCount() eventually iterates over 
> InFlightRequests.requests.values(), which is backed by a (non-thread-safe) 
> HashMap
> # this will be called from the "requests-in-flight" sensor's measure() method 
> (Sender.java line ~765, in the SenderMetrics ctor), which would be driven by 
> some thread reading JMX values
> # the HashMap in question would also be updated by a client io thread calling 
> NetworkClient.doSend() - which calls into InFlightRequests.add()
> i guess the only upside is that this exception will always happen on the 
> thread reading the JMX values and never on the actual client io thread ...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read

2018-02-08 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt resolved KAFKA-6345.
-
Resolution: Fixed

> NetworkClient.inFlightRequestCount() is not thread safe, causing 
> ConcurrentModificationExceptions when sensors are read
> ---
>
> Key: KAFKA-6345
> URL: https://issues.apache.org/jira/browse/KAFKA-6345
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
>Reporter: radai rosenblatt
>Assignee: Sean McCauliff
>Priority: Major
> Fix For: 1.1.0
>
>
> example stack trace (code is ~0.10.2.*)
> {code}
> java.util.ConcurrentModificationException: 
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>   at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>   at 
> org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109)
>   at 
> org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
> {code}
> looking at latest trunk, the code is still vulnerable:
> # NetworkClient.inFlightRequestCount() eventually iterates over 
> InFlightRequests.requests.values(), which is backed by a (non-thread-safe) 
> HashMap
> # this will be called from the "requests-in-flight" sensor's measure() method 
> (Sender.java line ~765, in the SenderMetrics ctor), which would be driven by 
> some thread reading JMX values
> # the HashMap in question would also be updated by a client io thread calling 
> NetworkClient.doSend() - which calls into InFlightRequests.add()
> i guess the only upside is that this exception will always happen on the 
> thread reading the JMX values and never on the actual client io thread ...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read

2018-02-08 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6345?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt reassigned KAFKA-6345:
---

Assignee: Sean McCauliff

> NetworkClient.inFlightRequestCount() is not thread safe, causing 
> ConcurrentModificationExceptions when sensors are read
> ---
>
> Key: KAFKA-6345
> URL: https://issues.apache.org/jira/browse/KAFKA-6345
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.0
>Reporter: radai rosenblatt
>Assignee: Sean McCauliff
>Priority: Major
>
> example stack trace (code is ~0.10.2.*)
> {code}
> java.util.ConcurrentModificationException: 
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>   at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>   at 
> org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109)
>   at 
> org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382)
>   at 
> org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>   at 
> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183)
>   at 
> org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193)
>   at 
> com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
>   at 
> com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
> {code}
> looking at latest trunk, the code is still vulnerable:
> # NetworkClient.inFlightRequestCount() eventually iterates over 
> InFlightRequests.requests.values(), which is backed by a (non-thread-safe) 
> HashMap
> # this will be called from the "requests-in-flight" sensor's measure() method 
> (Sender.java line ~765, in the SenderMetrics ctor), which would be driven by 
> some thread reading JMX values
> # the HashMap in question would also be updated by a client io thread calling 
> NetworkClient.doSend() - which calls into InFlightRequests.add()
> i guess the only upside is that this exception will always happen on the 
> thread reading the JMX values and never on the actual client io thread ...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6345) NetworkClient.inFlightRequestCount() is not thread safe, causing ConcurrentModificationExceptions when sensors are read

2017-12-11 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-6345:
---

 Summary: NetworkClient.inFlightRequestCount() is not thread safe, 
causing ConcurrentModificationExceptions when sensors are read
 Key: KAFKA-6345
 URL: https://issues.apache.org/jira/browse/KAFKA-6345
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.0.0
Reporter: radai rosenblatt


example stack trace (code is ~0.10.2.*)
{code}
java.util.ConcurrentModificationException: 
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
at 
org.apache.kafka.clients.InFlightRequests.inFlightRequestCount(InFlightRequests.java:109)
at 
org.apache.kafka.clients.NetworkClient.inFlightRequestCount(NetworkClient.java:382)
at 
org.apache.kafka.clients.producer.internals.Sender$SenderMetrics$1.measure(Sender.java:480)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
at 
org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
at 
org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttribute(JmxReporter.java:183)
at 
org.apache.kafka.common.metrics.JmxReporter$KafkaMbean.getAttributes(JmxReporter.java:193)
at 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.getAttributes(DefaultMBeanServerInterceptor.java:709)
at 
com.sun.jmx.mbeanserver.JmxMBeanServer.getAttributes(JmxMBeanServer.java:705)
{code}

looking at latest trunk, the code is still vulnerable:
# NetworkClient.inFlightRequestCount() eventually iterates over 
InFlightRequests.requests.values(), which is backed by a (non-thread-safe) 
HashMap
# this will be called from the "requests-in-flight" sensor's measure() method 
(Sender.java line ~765, in the SenderMetrics ctor), which would be driven by 
some thread reading JMX values
# the HashMap in question would also be updated by a client io thread calling 
NetworkClient.doSend() - which calls into InFlightRequests.add()

i guess the only upside is that this exception will always happen on the thread 
reading the JMX values and never on the actual client io thread ...
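
A self-contained, hedged sketch of one way to make the count safe to read from 
a metrics/JMX thread (the names below are illustrative and not Kafka's actual 
internals): keep the HashMap confined to the io thread and expose the count 
through an AtomicInteger.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// the map is only ever touched by the io thread, while count() can be
// called from any thread (e.g. a JMX sensor) without iterating the HashMap
final class InFlightTracker<R> {
    private final Map<String, Deque<R>> requestsByNode = new HashMap<>();
    private final AtomicInteger inFlight = new AtomicInteger();

    void add(String node, R request) {            // io thread only
        requestsByNode.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
        inFlight.incrementAndGet();
    }

    R completeNext(String node) {                 // io thread only
        R request = requestsByNode.get(node).pollLast();
        inFlight.decrementAndGet();
        return request;
    }

    int count() {                                 // safe from any thread
        return inFlight.get();
    }
}
{code}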



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6216) kafka logs for misconfigured ssl clients are unhelpful

2017-11-17 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt updated KAFKA-6216:

Affects Version/s: (was: 1.0.0)
   0.10.2.1

> kafka logs for misconfigured ssl clients are unhelpful
> --
>
> Key: KAFKA-6216
> URL: https://issues.apache.org/jira/browse/KAFKA-6216
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1
>Reporter: radai rosenblatt
>
> if you misconfigure the keystores on an ssl client, you will currently get a 
> log full of these:
> {code}
> java.io.IOException: Connection reset by peer
>   at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>   at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>   at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>   at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>   at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>   at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>   at org.apache.kafka.common.network.Selector.close(Selector.java:531)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
> these are caught and printed as part of the client Selector trying to close 
> the channel after having caught an IOException (let's call that the root 
> issue).
> the root issue itself is only logged at debug, which is not on 99% of the 
> time, leaving users with very little clue as to what's gone wrong.
> after turning on debug log, the root issue clearly indicated what the problem 
> was in our case:
> {code}
> javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1409)
>   at 
> sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
>   at 
> sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
>   at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
>   at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:382)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:243)
>   at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:69)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:350)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>   at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
>   at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1478)
>   at 
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:212)
>   at sun.security.ssl.Handshaker.processLoop(Handshaker.java:957)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:897)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:894)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1347)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:336)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:417)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:270)
>   ... 7 more
> Caused by: 

[jira] [Updated] (KAFKA-6216) kafka logs for misconfigured ssl clients are unhelpful

2017-11-15 Thread radai rosenblatt (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

radai rosenblatt updated KAFKA-6216:

Description: 
if you misconfigure the keystores on an ssl client, you will currently get a 
log full of these:
{code}
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
at org.apache.kafka.common.network.Selector.close(Selector.java:531)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
at java.lang.Thread.run(Thread.java:745)
{code}
these are caught and printed as part of the client Selector trying to close the 
channel after having caught an IOException (let's call that the root issue).

the root issue itself is only logged at debug, which is not on 99% of the time, 
leaving users with very little clue as to what's gone wrong.

after turning on debug log, the root issue clearly indicated what the problem 
was in our case:
{code}
javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1409)
at 
sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
at 
sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:382)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:243)
at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:69)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:350)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
at java.lang.Thread.run(Thread.java:745)
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
at 
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1478)
at 
sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:212)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:957)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:897)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:894)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1347)
at 
org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:336)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:417)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:270)
... 7 more
Caused by: sun.security.validator.ValidatorException: No trusted certificate 
found
at 
sun.security.validator.SimpleValidator.buildTrustedChain(SimpleValidator.java:384)
at 
sun.security.validator.SimpleValidator.engineValidate(SimpleValidator.java:133)
at sun.security.validator.Validator.validate(Validator.java:260)
at 
sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:324)
at 

[jira] [Commented] (KAFKA-6216) kafka logs for misconfigured ssl clients are unhelpful

2017-11-15 Thread radai rosenblatt (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16254473#comment-16254473
 ] 

radai rosenblatt commented on KAFKA-6216:
-

i have a proposed fix to address this here - 
https://github.com/apache/kafka/pull/4223
the current PR carries along the root cause, which gets printed in case of any 
further exceptions closing the channel.
an alternative is to log SSL exceptions at a level other than debug.
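
A hedged sketch of the "carry the root cause along" idea described above (the 
variable names are illustrative, not the PR's actual code): remember the 
handshake failure and attach it to any later close failure so the real reason 
surfaces at default log levels.

{code:java}
// illustrative only: handshakeFailure is whatever exception failed the handshake earlier
try {
    channel.close();
} catch (IOException closeFailure) {
    if (handshakeFailure != null)
        closeFailure.addSuppressed(handshakeFailure);  // keep the real reason attached
    log.warn("Failed to close channel cleanly", closeFailure);
}
{code}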

> kafka logs for misconfigured ssl clients are unhelpful
> --
>
> Key: KAFKA-6216
> URL: https://issues.apache.org/jira/browse/KAFKA-6216
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: radai rosenblatt
>
> if you misconfigure the keystores on an ssl client, you will currently get a 
> log full of these:
> ```
> java.io.IOException: Connection reset by peer
>   at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>   at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>   at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>   at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>   at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
>   at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
>   at 
> org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
>   at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
>   at org.apache.kafka.common.network.Selector.close(Selector.java:531)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
>   at java.lang.Thread.run(Thread.java:745)
> ```
> these are caught and printed as part of the client Selector trying to close 
> the channel after having caught an IOException (let's call that the root 
> issue).
> the root issue itself is only logged at debug, which is not on 99% of the 
> time, leaving users with very little clue as to what's gone wrong.
> after turning on debug log, the root issue clearly indicated what the problem 
> was in our case:
> ```
> javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1409)
>   at 
> sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
>   at 
> sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
>   at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
>   at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:382)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:243)
>   at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:69)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:350)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
>   at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
>   at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
>   at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
>   at 
> sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1478)
>   at 
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:212)
>   at sun.security.ssl.Handshaker.processLoop(Handshaker.java:957)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:897)
>   at sun.security.ssl.Handshaker$1.run(Handshaker.java:894)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1347)
>   at 
> org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:336)
>   at 
> 

[jira] [Created] (KAFKA-6216) kafka logs for misconfigured ssl clients are unhelpful

2017-11-15 Thread radai rosenblatt (JIRA)
radai rosenblatt created KAFKA-6216:
---

 Summary: kafka logs for misconfigured ssl clients are unhelpful
 Key: KAFKA-6216
 URL: https://issues.apache.org/jira/browse/KAFKA-6216
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: radai rosenblatt


if you misconfigure the keystores on an ssl client, you will currently get a 
log full of these:
```
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
at 
org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
at 
org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:731)
at 
org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:54)
at org.apache.kafka.common.network.Selector.doClose(Selector.java:540)
at org.apache.kafka.common.network.Selector.close(Selector.java:531)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:378)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
at java.lang.Thread.run(Thread.java:745)
```
these are caught and printed as part of the client Selector trying to close the 
channel after having caught an IOException (let's call that the root issue).

the root issue itself is only logged at debug, which is not on 99% of the time, 
leaving users with very little clue as to what's gone wrong.

after turning on debug log, the root issue clearly indicated what the problem 
was in our case:
```
javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1409)
at 
sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
at 
sun.security.ssl.SSLEngineImpl.writeAppRecord(SSLEngineImpl.java:1214)
at sun.security.ssl.SSLEngineImpl.wrap(SSLEngineImpl.java:1186)
at javax.net.ssl.SSLEngine.wrap(SSLEngine.java:469)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeWrap(SslTransportLayer.java:382)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:243)
at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:69)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:350)
at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:233)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:131)
at java.lang.Thread.run(Thread.java:745)
Caused by: javax.net.ssl.SSLHandshakeException: General SSLEngine problem
at sun.security.ssl.Alerts.getSSLException(Alerts.java:192)
at sun.security.ssl.SSLEngineImpl.fatal(SSLEngineImpl.java:1728)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:304)
at sun.security.ssl.Handshaker.fatalSE(Handshaker.java:296)
at 
sun.security.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1478)
at 
sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:212)
at sun.security.ssl.Handshaker.processLoop(Handshaker.java:957)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:897)
at sun.security.ssl.Handshaker$1.run(Handshaker.java:894)
at java.security.AccessController.doPrivileged(Native Method)
at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1347)
at 
org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:336)
at 
org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:417)
at 
org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:270)
... 7 more
Caused by: sun.security.validator.ValidatorException: No trusted certificate 
found
at 
sun.security.validator.SimpleValidator.buildTrustedChain(SimpleValidator.java:384)
at 
sun.security.validator.SimpleValidator.engineValidate(SimpleValidator.java:133)
at sun.security.validator.Validator.validate(Validator.java:260)
at