[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779000#comment-17779000 ]

Divij Vaidya edited comment on KAFKA-15653 at 10/24/23 9:36 AM:

Thanks [~jolshan]. I will add my thoughts on how to prevent this in future on the new Jira you started. As a summary, I think we might want to start working towards a "debug" mode in the broker which will enable assertions for different invariants in Kafka. Invariants could be derived from the formal verification that Jack and others have shared with the community earlier, OR from tribal knowledge in the community, such as "network threads should not perform any storage IO". The release qualification would run the broker in "debug" mode and validate these assertions while running different series of tests.

EDIT - I started a thread on the dev mailing list to solicit ideas on detecting & preventing hard bugs: [https://lists.apache.org/thread/zjcyp4h9kkl3gjfblgcwodf2y8oyy0hj]

> NPE in ChunkedByteStream
> ------------------------
>
>                 Key: KAFKA-15653
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15653
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 3.6.0
>         Environment: Docker container on a Linux laptop, using the latest release.
>            Reporter: Travis Bischel
>            Assignee: Justine Olshan
>            Priority: Major
>         Attachments: repro.sh
>
> When looping franz-go integration tests, I received an UNKNOWN_SERVER_ERROR from producing. The broker logs for the failing request:
>
> {noformat}
> [2023-10-19 22:29:58,160] ERROR [ReplicaManager broker=2] Error processing append operation on partition 2fa8995d8002fbfe68a96d783f26aa2c5efc15368bf44ed8f2ab7e24b41b9879-24 (kafka.server.ReplicaManager)
> java.lang.NullPointerException
> 	at org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
> 	at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
> 	at org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
> 	at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
> 	at org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
> 	at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
> 	at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
> 	at kafka.log.UnifiedLog.append(UnifiedLog.scala:805)
> 	at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
> 	at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
> 	at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
> 	at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
> 	at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
> 	at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
> 	at scala.collection.mutable.HashMap.map(HashMap.scala:35)
> 	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
> 	at kafka.server.ReplicaManager.appendEntries$1(ReplicaManager.scala:754)
> 	at kafka.server.ReplicaManager.$anonfun$appendRecords$18(ReplicaManager.scala:874)
> 	at kafka.server.ReplicaManager.$anonfun$appendRecords$18$adapted(ReplicaManager.scala:874)
> 	at kafka.server.KafkaRequestHandler$.$anonfun$wrap$3(KafkaRequestHandler.scala:73)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:130)
> 	at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
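The "debug mode" invariant idea above can be sketched as a small thread-ownership guard. This is a hypothetical illustration only (the class and method names are invented, not Kafka's API): a request-local resource records its owner thread at construction and, when the debug flag is on, rejects access from any other thread.

```java
// Hypothetical debug-mode invariant guard (names invented, not Kafka's API).
public final class ThreadOwnershipGuard {
    private final Thread owner = Thread.currentThread();
    private final boolean debugMode;

    public ThreadOwnershipGuard(boolean debugMode) {
        this.debugMode = debugMode;
    }

    // Throws if debug mode is on and a thread other than the owner calls in.
    public void checkOwner() {
        if (debugMode && Thread.currentThread() != owner) {
            throw new IllegalStateException("Invariant violated: resource owned by "
                + owner.getName() + " accessed from " + Thread.currentThread().getName());
        }
    }
}
```

During release qualification such a guard could be checked on every access to a thread-confined resource (e.g. a buffer supplier) and disabled in production builds.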
[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778655#comment-17778655 ]

Ismael Juma edited comment on KAFKA-15653 at 10/23/23 1:09 PM:

Good catch. I think we should review the design of this change in closer detail to understand the full implications - is the buffer supplier the only issue, or are there others? The assumption is that anything executed on a different thread has to ensure thread safety, etc.
[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17778603#comment-17778603 ]

Divij Vaidya edited comment on KAFKA-15653 at 10/23/23 10:33 AM:

I think I found a quite fundamental problem here.

By design, BufferPool is not thread safe in Kafka [1]. That is acceptable because the buffer pool instance is local to a request handler thread [2]; hence we assume that a particular buffer pool will always be accessed only by its owner thread.

However, unfortunately, it seems this assumption does not hold. I have observed the same BufferPool being accessed by two different request handler threads. The first access is legitimate, while trying to process an API request at [3]. The second access comes from a change introduced in [https://github.com/apache/kafka/commit/56dcb837a2f1c1d8c016cfccf8268a910bb77a36], where we pass the stateful variables of a request (including the buffer supplier) to a callback which could be executed by a different request handler thread [4]. Hence, we end up in a situation where stateful members such as the requestLocal of one thread are accessed by a different thread. This is a bug.

*Impact of the bug*

This bug has been present since 3.5, but the impact is visible in 3.6 because in 3.6 we expanded the use of the request-local buffer pool to perform decompression. Earlier, the buffer pool was only used in the read path and by the log cleaner. The callback mentioned above calls appendToLocalLog, and prior to 3.6 this code path wasn't using requestLocal. All 3.6 operations which call the following line in ReplicaManager are impacted, which is basically the transactions use case, and specifically the AddPartitionsToTxnManager API calls.

{code:scala}
addPartitionsToTxnManager.foreach(_.addTxnData(node, notYetVerifiedTransaction, KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))))
{code}

*Possible solutions*

[~jolshan] can you please take a look and see if we can avoid leaking the requestLocal (buffer pool) belonging to one thread to another thread? One way to fix this is to store the buffer pool reference in a ThreadLocal [5] and, instead of passing the reference around, ask the executing thread for it whenever we want to use the buffer pool. This ensures that a thread always uses its own instance of the buffer pool. [~ijuma] since you wrote the original requestLocal buffer pool, what do you think about this solution?

Another option is to extract the function `appendEntries` out of `appendRecords`, since `appendEntries` is invoked as a callback and shouldn't rely on local variables/state of `appendRecords`.

[1] [https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java#L27]
[2] [https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L96]
[3] [https://github.com/apache/kafka/blob/9c77c17c4eae19af743e551e8e7d8b49b07c4e99/core/src/main/scala/kafka/server/KafkaRequestHandler.scala#L154]
[4] [https://github.com/apache/kafka/blob/526d0f63b5d82f8fb50c97aea9c61f8f85467e92/core/src/main/scala/kafka/server/ReplicaManager.scala#L874]
[5] [https://docs.oracle.com/javase/8/docs/api/java/lang/ThreadLocal.html]
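The ThreadLocal-based proposal above can be sketched in a few lines. This is hypothetical illustration code, not the actual Kafka fix: each thread lazily owns its own queue of reusable buffers, so a callback that ends up on a different request handler thread transparently uses that thread's pool instead of a reference leaked from the original thread.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical sketch of the ThreadLocal proposal; not the actual Kafka fix.
public final class ThreadLocalBufferSupplier {
    // Each thread lazily gets its own queue of reusable buffers, so a callback
    // running on a different request handler thread uses that thread's pool.
    private static final ThreadLocal<ArrayDeque<ByteBuffer>> POOL =
            ThreadLocal.withInitial(ArrayDeque::new);

    public static ByteBuffer get(int capacity) {
        ByteBuffer buf = POOL.get().pollFirst();
        // pollFirst() can only return null when this thread's own queue is
        // empty; no other thread can drain it behind our back.
        if (buf != null && buf.capacity() >= capacity) {
            return buf;
        }
        return ByteBuffer.allocate(capacity);
    }

    public static void release(ByteBuffer buffer) {
        buffer.clear();
        POOL.get().addFirst(buffer);
    }
}
```

The trade-off of this design is that buffers released on a thread other than the one that allocated them simply migrate to the releasing thread's pool, which stays correct because no queue is ever shared.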
[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1664#comment-1664 ]

Divij Vaidya edited comment on KAFKA-15653 at 10/20/23 10:12 AM:

I think the potential bug could be in how we are closing the ChunkedBytesStream and returning the buffer for re-use later.

{code:java}
@Override
public void close() throws IOException {
    byte[] mybuf = intermediateBuf;
    intermediateBuf = null;
    InputStream input = in;
    in = null;
    if (mybuf != null)
        bufferSupplier.release(intermediateBufRef);
    if (input != null)
        input.close();
}
{code}

I -am suspecting this because we are setting the underlying buffer behind the ByteBuffer to null, which will allow it to be garbage collected. But we don't want it to be GC'ed, because we simply want to return it to the pool so that it can be used again later.- -I will verify this tomorrow with a test and have a fix out soon.-

Nevermind, it will not be GC'ed because we keep a reference to it via mybuf. I verified this using a test.

{code:java}
@Test
public void testValidReturnToBufferSupplierOnClose() throws IOException, InterruptedException {
    final BufferSupplier threadSpecificSupplier = BufferSupplier.create();
    final boolean delegateSkipToSourceStream = false;
    final ByteBuffer inputBuffer = ByteBuffer.allocate(SIZE_LITTLE_LARGE_THAN_INTERMEDIATE_BUFFER_SIZE);
    final ByteBuffer originalIntermediateBufferRef;
    try (ChunkedBytesStream is = new ChunkedBytesStream(new ByteBufferInputStream(inputBuffer.duplicate()),
            threadSpecificSupplier, DEFAULT_INTERMEDIATE_BUFFER_SIZE, delegateSkipToSourceStream)) {
        assertNotNull(is.intermediateBufRef);
        // read everything to verify sanity
        Utils.readFully(is, ByteBuffer.allocate(inputBuffer.capacity()));
        // store reference of intermediate buffer provided by buffer supplier
        originalIntermediateBufferRef = is.intermediateBufRef;
    }
    // The intermediate buffer should have been returned to the buffer supplier.
    // Force GC to rule out any lingering references.
    // Notably this is just a hint and may not cause actual GC.
    System.gc();
    // Wait for GC to do its magic.
    Thread.sleep(2000);
    try (ChunkedBytesStream is = new ChunkedBytesStream(new ByteBufferInputStream(inputBuffer.duplicate()),
            threadSpecificSupplier, DEFAULT_INTERMEDIATE_BUFFER_SIZE, delegateSkipToSourceStream)) {
        assertNotNull(is.intermediateBufRef);
        assertEquals(originalIntermediateBufferRef, is.intermediateBufRef);
    }
}
{code}
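The close() pattern examined above can be reduced to a small sketch (hypothetical names, heavily simplified from ChunkedBytesStream): the fields are nulled first so a second close() is a no-op, while the local variables keep the objects reachable until the buffer is handed back to the pool rather than left for GC.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

// Hypothetical, simplified sketch of the close() pattern; not ChunkedBytesStream itself.
final class ChunkedStreamSketch implements Closeable {
    interface Pool {
        void release(ByteBuffer buf);
    }

    private ByteBuffer intermediateBufRef;
    private InputStream in;
    private final Pool pool;

    ChunkedStreamSketch(InputStream in, Pool pool, int bufSize) {
        this.in = in;
        this.pool = pool;
        this.intermediateBufRef = ByteBuffer.allocate(bufSize);
    }

    @Override
    public void close() throws IOException {
        ByteBuffer buf = intermediateBufRef; // local keeps the buffer reachable
        intermediateBufRef = null;           // second close() becomes a no-op
        InputStream input = in;
        in = null;
        if (buf != null) {
            pool.release(buf);               // returned to the pool, not left for GC
        }
        if (input != null) {
            input.close();
        }
    }
}
```

Nulling before releasing is what makes the method safe to call twice: the second call sees null fields and does nothing.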
[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1647#comment-1647 ]

Divij Vaidya edited comment on KAFKA-15653 at 10/20/23 9:31 AM:

In 3.6, we started using a buffer pool local to each request handler thread to perform decompression. The above stack trace indicates a null when we ask for a buffer from the buffer pool, i.e. the buffer returned by bufferQueue.pollFirst() at [https://github.com/apache/kafka/blob/c81a7252195261f649faba166ee723552bed4d81/clients/src/main/java/org/apache/kafka/common/utils/BufferSupplier.java#L76]

Ideally that should not be possible: bufferQueue.pollFirst() returns null only when bufferQueue is empty, and we already check for bufferQueue being empty in the line above. Also, this function is not thread safe. It doesn't need to be thread safe, because a particular instance of the buffer pool (DefaultSupplier) is associated with a single thread (the request handler thread), and only one thread should be accessing it at a time. Either we are incorrectly accessing DefaultBufferSupplier.get() from two threads and causing a race condition, OR somehow in the same thread the reference is being set to null / garbage collected?!

I will try to eyeball the code here to see if I can find something. But practically, [~twmb] it would be greatly useful if you can share your integration/unit test with us so that we can find a deterministic way to reproduce it.
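The check-then-poll logic described above can be sketched as follows (a hypothetical simplification, not Kafka's DefaultSupplier): within a single owner thread the null return is impossible, but if two threads ever share the instance, one can drain the queue between the other's isEmpty() check and its pollFirst(), producing exactly the null seen in the stack trace.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simplification of a per-thread buffer supplier. Safe only when
// exactly one thread uses the instance, which is the documented assumption.
final class SingleThreadBufferPool {
    private final Deque<ByteBuffer> bufferQueue = new ArrayDeque<>();

    ByteBuffer get(int size) {
        if (bufferQueue.isEmpty()) {
            return ByteBuffer.allocate(size);
        }
        // Race window: a second thread calling get() concurrently could drain
        // the queue here, making pollFirst() return null despite the check.
        return bufferQueue.pollFirst();
    }

    void release(ByteBuffer buf) {
        buf.clear();
        bufferQueue.addFirst(buf);
    }
}
```

Used from one thread, get() after release() always hands back the pooled buffer and never null; the bug requires a second thread in the picture.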
[jira] [Comment Edited] (KAFKA-15653) NPE in ChunkedByteStream
[ https://issues.apache.org/jira/browse/KAFKA-15653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1542#comment-1542 ]

Travis Bischel edited comment on KAFKA-15653 at 10/20/23 2:55 AM:

{noformat}
[2023-10-20 02:31:00,204] ERROR [ReplicaManager broker=1] Error processing append operation on partition 2c69b88eab8670ef1fd0e55b81b9e000995386afd8756ea342494d36911e6f01-29 (kafka.server.ReplicaManager)
java.lang.NullPointerException: Cannot invoke "java.nio.ByteBuffer.hasArray()" because "this.intermediateBufRef" is null
	at org.apache.kafka.common.utils.ChunkedBytesStream.<init>(ChunkedBytesStream.java:89)
	at org.apache.kafka.common.record.CompressionType$3.wrapForInput(CompressionType.java:105)
	at org.apache.kafka.common.record.DefaultRecordBatch.recordInputStream(DefaultRecordBatch.java:273)
	at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:277)
	at org.apache.kafka.common.record.DefaultRecordBatch.skipKeyValueIterator(DefaultRecordBatch.java:352)
	at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:358)
	at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
	at kafka.log.UnifiedLog.$anonfun$append$2(UnifiedLog.scala:805)
	at kafka.log.UnifiedLog.append(UnifiedLog.scala:1845)
	at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:719)
	at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1313)
	at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1301)
	at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1210)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
	at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1198)
	at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:754)
	at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:686)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:180)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:149)
	at java.base/java.lang.Thread.run(Thread.java:833)
{noformat}