[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-24 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779000#comment-17779000
 ] 

Divij Vaidya edited comment on KAFKA-15653 at 10/24/23 9:36 AM:


Thanks [~jolshan] . I will add my thoughts on how to prevent this in future the 
new Jira you started. As a summary, I think we might want to start working 
towards a "debug" mode in the broker which will enable assertions for different 
invariants in Kafka. Invariants could be derived from formal verification that 
Jack and others have shared with the community earlier OR from tribal knowledge 
in the community such as network threads should not perform any storage IO. The 
release qualification will run the broker in "debug" mode and will validate 
these assertions while running different series of tests. 

EDIT - I started a thread in dev mailing list to solicit ideas on detecting & 
preventing hard bugs 
[https://lists.apache.org/thread/zjcyp4h9kkl3gjfblgcwodf2y8oyy0hj] 


was (Author: divijvaidya):
Thanks [~jolshan] . I will add my thoughts on how to prevent this in future the 
new Jira you started. As a summary, I think we might want to start working 
towards a "debug" mode in the broker which will enable assertions for different 
invariants in Kafka. Invariants could be derived from formal verification that 
Jack and others have shared with the community earlier OR from tribal knowledge 
in the community such as network threads should not perform any storage IO. The 
release qualification will run the broker in "debug" mode and will validate 
these assertions while running different series of tests. 

> NPE in ChunkedByteStream
> 
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest 
> release.
>Reporter: Travis Bischel
>Assignee: Justine Olshan
>Priority: Major
> Attachments: repro.sh
>
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>   at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>   at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>   at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
>   at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-23 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778655#comment-17778655
 ] 

Ismael Juma edited comment on KAFKA-15653 at 10/23/23 1:09 PM:
---

Good catch. I think we should review the design of this change in closer detail 
to understand the full implications - is the buffer supplier the only issue or 
are there others? The assumption is that anything that is executed in a 
different thread would have to ensure thread-safety, etc.


was (Author: ijuma):
Good catch. I think we should review the design of this change in closer detail 
to understand the full implications - is the buffer supplier the only issue or 
are there others?

> NPE in ChunkedByteStream
> 
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest 
> release.
>Reporter: Travis Bischel
>Assignee: Divij Vaidya
>Priority: Major
> Attachments: repro.sh
>
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>   at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>   at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>   at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
>   at 
> kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-23 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778603#comment-17778603
 ] 

Divij Vaidya edited comment on KAFKA-15653 at 10/23/23 10:33 AM:
-

I think I found a quite fundamental problem here.

By design, BufferPool is not thread safe in Kafka [1]. And that is acceptable 
because the buffer pool instance is local to a request handler thread [2]. 
Hence we assume that a particular buffer pool will always be accessed only by 
it's owner thread.

However, unfortunately, seems like that this assumption is not true!

I have same BufferPool being accessed by two different request handler threads. 
The first access is legitimate while trying to process an API request at [3]. 
The second access comes from a change introduced in 
[https://github.com/apache/kafka/commit/56dcb837a2f1c1d8c016cfccf8268a910bb77a36]
 where we are passing the stateful variable of a request (including the buffer 
supplier) to a callback which could be executed by a different request handler 
thread [4] . Hence, we will end up in a situation where stateful members such 
as requestLocal of one thread is being accessed by another different thread. 
This is a bug.

*Impact of the bug*

This bug has been present since 3.5 but the impact is visible in 3.6 because in 
3.6 we expanded the use of request local buffer pool to perform decompression. 
Earlier the buffer pool was only being used in read path and by log cleaner. 
The callback mentioned above calls appendToLocalLog and prior to 3.6 this code 
path wasn't using requestLocal.

All 3.6 operations which call the following line in ReplicaManager are impacted 
which is basically use case of transactions and also specifically to 
AddPartitionsToTxnManager API calls.

```
addPartitionsToTxnManager.foreach({_}.addTxnData(node, 
notYetVerifiedTransaction, 
KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)({_}
```

*Possible solutions*
[~jolshan] can you please take a look and see if we can avoid leaking the 
requestLocal (bufferpool) belonging to one thread to the other thread?

One way we can fix this is to store the bufferpool reference in ThreadLocal and 
instead of passing the reference around, whenever we want to use bufferpool, we 
will directly ask the executing thread for it. This will ensure that a thread 
is always using it's own instance of the buffer pool. [~ijuma] since you wrote 
the original requestLocal buffer pool, what do you think about this solution?

Another option is to extract out function `appendEntries` outside of 
`appendRecords` since it is `appendEntries` is invoked as a callback and 
shouldn't rely on local variable/state of `appendRecords`.



[1] 
[https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java#L27]
 

[2] 
[https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L96]
 

[3] 
[https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L154]
 

[4] 
[https://github.com/apache/kafka/blob/526d0f63b5d82f8fb50c97aea9c61f8f85467e92/core/src/main/scala/kafka/server/ReplicaManager.scala#L874]
 

[5] [https://docs.oracle.com/javase/8/docs/api/java/lang/ThreadLocal.html] 


was (Author: divijvaidya):
I think I found a quite fundamental problem here.

By design, BufferPool is not thread safe in Kafka [1]. And that is acceptable 
because the buffer pool instance is local to a request handler thread [2]. 
Hence we assume that a particular buffer pool will always be accessed only by 
it's owner thread.

However, unfortunately, seems like that this assumption is not true!

I have same BufferPool being accessed by two different request handler threads. 
The first access is legitimate while trying to process an API request at [3]. 
The second access comes from a change introduced in 
[https://github.com/apache/kafka/commit/56dcb837a2f1c1d8c016cfccf8268a910bb77a36]
 where we are passing the stateful variable of a request (including the buffer 
supplier) to a callback which could be executed by a different request handler 
thread [4] . Hence, we will end up in a situation where stateful members such 
as requestLocal of one thread is being accessed by another different thread. 
This is a bug.

*Impact of the bug*

This bug has been present since 3.5 but the impact is visible in 3.6 because in 
3.6 we expanded the use of request local buffer pool to perform decompression. 
Earlier the buffer pool was only being used in read path and by log cleaner. 
The callback mentioned above calls appendToLocalLog and prior to 3.6 this code 
path wasn't using requestLocal.

All 3.6 operations which call the following line in ReplicaManager are impacted 
which is basically use case of 

[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-23 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778603#comment-17778603
 ] 

Divij Vaidya edited comment on KAFKA-15653 at 10/23/23 10:23 AM:
-

I think I found a quite fundamental problem here.

By design, BufferPool is not thread safe in Kafka [1]. And that is acceptable 
because the buffer pool instance is local to a request handler thread [2]. 
Hence we assume that a particular buffer pool will always be accessed only by 
it's owner thread.

However, unfortunately, seems like that this assumption is not true!

I have same BufferPool being accessed by two different request handler threads. 
The first access is legitimate while trying to process an API request at [3]. 
The second access comes from a change introduced in 
[https://github.com/apache/kafka/commit/56dcb837a2f1c1d8c016cfccf8268a910bb77a36]
 where we are passing the stateful variable of a request (including the buffer 
supplier) to a callback which could be executed by a different request handler 
thread [4] . Hence, we will end up in a situation where stateful members such 
as requestLocal of one thread is being accessed by another different thread. 
This is a bug.

*Impact of the bug*

This bug has been present since 3.5 but the impact is visible in 3.6 because in 
3.6 we expanded the use of request local buffer pool to perform decompression. 
Earlier the buffer pool was only being used in read path and by log cleaner. 
The callback mentioned above calls appendToLocalLog and prior to 3.6 this code 
path wasn't using requestLocal.

All 3.6 operations which call the following line in ReplicaManager are impacted 
which is basically use case of transactions and also specifically to 
AddPartitionsToTxnManager API calls.

```
addPartitionsToTxnManager.foreach(_.addTxnData(node, notYetVerifiedTransaction, 
KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_
```

*Possible solutions*
[~jolshan] can you please take a look and see if we can avoid leaking the 
requestLocal (bufferpool) belonging to one thread to the other thread?

One way we can fix this is to store the bufferpool reference in ThreadLocal and 
instead of passing the reference around, whenever we want to use bufferpool, we 
will directly ask the executing thread for it. This will ensure that a thread 
is always using it's own instance of the buffer pool. [~ijuma] since you wrote 
the original requestLocal buffer pool, what do you think about this solution?

[1] 
[https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java#L27]
 

[2] 
[https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L96]
 

[3] 
[https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L154]
 

[4] 
[https://github.com/apache/kafka/blob/526d0f63b5d82f8fb50c97aea9c61f8f85467e92/core/src/main/scala/kafka/server/ReplicaManager.scala#L874]
 

[5] [https://docs.oracle.com/javase/8/docs/api/java/lang/ThreadLocal.html] 


was (Author: divijvaidya):
I think I found a quite fundamental problem here.

By design, BufferPool is not thread safe in Kafka [1]. And that is acceptable 
because the buffer pool instance is local to a request handler thread [2]. 
Hence we assume that a particular buffer pool will always be accessed only by 
it's owner thread.

However, unfortunately, seems like that this assumption is not true!

I have same BufferPool being accessed by two different request handler threads. 
The first access is legitimate while trying to process an API request at [3]. 
The second access comes from a change introduced in 
[https://github.com/apache/kafka/commit/56dcb837a2f1c1d8c016cfccf8268a910bb77a36]
 where we are passing the stateful variable of a request (including the buffer 
supplier) to a callback which could be executed by a different request handler 
thread [4] . Hence, we will end up in a situation where stateful members such 
as requestLocal of one thread is being accessed by another different thread. 
This is a bug. This has been present since 3.5 but the impact is more visible 
in 3.6 because in 3.6 we expanded the use of request local buffer pool to 
perform decompression. I am yet to see what other scenarios it can impact but 
anywhere we are using bufferpool might be at risk

[~jolshan] can you please take a look and see if we can avoid leaking the 
requestLocal (bufferpool) belonging to one thread to the other thread? 

One way we can fix this is to store the bufferpool reference in ThreadLocal and 
instead of passing the reference around, whenever we want to use bufferpool, we 
will directly ask the executing thread for it. This will ensure that a thread 
is always using it's own instance of 

[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-20 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1664#comment-1664
 ] 

Divij Vaidya edited comment on KAFKA-15653 at 10/20/23 10:12 AM:
-

I think the potential bug could be in how we are closing the ChunkedBytesStream 
and returning the buffer for re-use later.
{code:java}
@Override
public void close() throws IOException {
byte[] mybuf = intermediateBuf;
intermediateBuf = null;

InputStream input = in;
in = null;

if (mybuf != null)
bufferSupplier.release(intermediateBufRef);
if (input != null)
input.close();
} {code}
I -am suspecting this because, we are setting underlying buffer behind the 
ByteBuffer to be null, which will allow it to be garbage collected. But we 
don't want it to be GC'ed because we simply want to return it to the pool so 
that it can be used later again.-

-I will verify this tomorrow with a test and have a fix out soon.-
Nevermind, it will not be GC'ed because we have a reference to it via mybuf. I 
verified it using a test.


{code:java}
@Test
public void testValidReturnToBufferSupplierOnClose() throws IOException, 
InterruptedException {
final BufferSupplier threadSpecificSupplier = BufferSupplier.create();
final boolean delegateSkipToSourceStream = false;

final ByteBuffer inputBuffer = 
ByteBuffer.allocate(SIZE_LITTLE_LARGE_THAN_INTERMEDIATE_BUFFER_SIZE);
final ByteBuffer originalIntermediateBufferRef;

try (ChunkedBytesStream is = new ChunkedBytesStream(new 
ByteBufferInputStream(inputBuffer.duplicate()), threadSpecificSupplier, 
DEFAULT_INTERMEDIATE_BUFFER_SIZE, delegateSkipToSourceStream)) {
assertNotNull(is.intermediateBufRef);
// read everything to verify sanity
Utils.readFully(is, ByteBuffer.allocate(inputBuffer.capacity()));
// store reference of intermediate buffer provided by buffer supplier
originalIntermediateBufferRef = is.intermediateBufRef;
}

// The intermediate buffer should have been returned to buffer supplier.
// Force GC to rule out any lingering references. Notably this is just a 
hint and may not cause actual GC.
System.gc();
// Wait to GC to do it's magic.
Thread.sleep(2000);

try (ChunkedBytesStream is = new ChunkedBytesStream(new 
ByteBufferInputStream(inputBuffer.duplicate()), threadSpecificSupplier, 
DEFAULT_INTERMEDIATE_BUFFER_SIZE, delegateSkipToSourceStream)) {
assertNotNull(is.intermediateBufRef);
assertEquals(originalIntermediateBufferRef, is.intermediateBufRef);
}
} {code}


was (Author: divijvaidya):
I think the potential bug could be in how we are closing the ChunkedBytesStream 
and returning the buffer for re-use later.
{code:java}

@Override
public void close() throws IOException {
byte[] mybuf = intermediateBuf;
intermediateBuf = null;

InputStream input = in;
in = null;

if (mybuf != null)
bufferSupplier.release(intermediateBufRef);
if (input != null)
input.close();
} {code}

I am suspecting this because, we are setting underlying buffer behind the 
ByteBuffer to be null, which will allow it to be garbage collected. But we 
don't want it to be GC'ed because we simply want to return it to the pool so 
that it can be used later again.

I will verify this tomorrow with a test and have a fix out soon.

> NPE in ChunkedByteStream
> 
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest 
> release.
>Reporter: Travis Bischel
>Assignee: Divij Vaidya
>Priority: Major
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> 

[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-20 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1647#comment-1647
 ] 

Divij Vaidya edited comment on KAFKA-15653 at 10/20/23 9:31 AM:


In 3.6, we started using Buffer pool local to each request handler thread to 
perform decompression. The above stack trace indicates a null when we ask for a 
buffer from the buffer pool, returned by bufferQueue.pollfirst() at 
[https://github.com/apache/kafka/blob/c81a7252195261f649faba166ee723552bed4d81/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java#L76]
 

Ideally that should not be possible bufferQueue.pollfirst() returns null only 
when bufferQueue is empty. And we already check fr bufferQueue being empty in 
the line above. Also this function is not thread safe. It doesn't need to be 
thread safe because a particular instance of buffer pool (DefaultSupplier) is 
associated with single thread (the request handler thread) and only one thread 
should be accessing it at one time. 

Either we are incorrectly accessing DefaultBufferSupplier.get() from two 
threads and causing race condition OR somehow in the same thread reference is 
being set to null/or garbage collected?!



I will try to eyeball to code here to see if I can find something. But 
practically [~twmb] it would be greatly useful if you can share your 
integration/unit test with us so that we can find a deterministic way to 
reproduce it.


was (Author: divijvaidya):
In 3.6, we started using Buffer pool local to each request handler thread to 
perform decompression. The above stack trace indicates a null when we ask for a 
buffer from the buffer pool, returned by bufferQueue.pollfirst() at 
[https://github.com/apache/kafka/blob/c81a7252195261f649faba166ee723552bed4d81/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java#L76]
 

Ideally that should not be possible bufferQueue.pollfirst() returns null only 
when bufferQueue is empty. And we already check fr bufferQueue being empty in 
the line above. But this function is not thread safe. It doesn't need to be 
because a particular instance of buffer pool (DefaultSupplier) is associated 
with single thread (the request handler thread) and only one thread should be 
accessing it at one time. 

I will try to eyeball to code here to see if I can find something. But 
practically [~twmb] it would be greatly useful if you can share your 
integration/unit test with us so that we can find a deterministic way to 
reproduce it.

> NPE in ChunkedByteStream
> 
>
> Key: KAFKA-15653
> URL: https://issues.apache.org/jira/browse/KAFKA-15653
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.0
> Environment: Docker container on a Linux laptop, using the latest 
> release.
>Reporter: Travis Bischel
>Assignee: Divij Vaidya
>Priority: Major
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR 
> from producing. The broker logs for the failing request:
>  
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing 
> append operation on partition 
> 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 
> (kafka.server.ReplicaManager)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.utils.ChunkedBytesStream.(ChunkedBytesStream.java:89)
>   at 
> org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
>   at 
> org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
>   at 
> org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
>   at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
>   at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
>   at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>   at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>   at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
>  

[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream

2023-10-19 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1542#comment-1542
 ] 

Travis Bischel edited comment on KAFKA-15653 at 10/20/23 2:55 AM:
--

{noformat}
[2023-10-20 02:31:00,204] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition 
2c69b88eab8670ef1fd0e55b81b9e000995386afd8756ea342494d36911e6f01-29 
(kafka.server.ReplicaManager)
java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.hasArray()" 
because "this.intermediateBufRef" is null 
at 
org.apache.kafka.common.utils.ChunkedBytesStream.(ChunkedBytesStream.java:89)
at 
org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
at 
org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
at 
org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
at 
org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
at 
org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
at 
org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805)
at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845)
at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
at 
kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at 
kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:754)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:686)
at kafka.server.KafkaApis.handle(KafkaApis.scala:180)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:149)
at java.base/java.lang.Thread.run(Thread.java:833)
{noformat}


was (Author: twmb):
Not just :
{noformat}
[2023-10-20 02:31:00,204] ERROR [ReplicaManager broker=1] Error processing 
append operation on partition 
2c69b88eab8670ef1fd0e55b81b9e000995386afd8756ea342494d36911e6f01-29 
(kafka.server.ReplicaManager)
java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.hasArray()" 
because "this.intermediateBufRef" is null 
at 
org.apache.kafka.common.utils.ChunkedBytesStream.(ChunkedBytesStream.java:89)
at 
org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
at 
org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
at 
org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
at 
org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
at 
org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
at 
org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805)
at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845)
at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
at 
kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at 
scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at 
scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)