[
https://issues.apache.org/jira/browse/KAFKA-15057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856158#comment-17856158
]
Josep Prat commented on KAFKA-15057:
------------------------------------
Changing target fix version to 3.9 since this is not a blocker and we are past
code freeze.
> Use new interface ZstdBufferDecompressingStreamNoFinalizer from zstd-jni
> ------------------------------------------------------------------------
>
> Key: KAFKA-15057
> URL: https://issues.apache.org/jira/browse/KAFKA-15057
> Project: Kafka
> Issue Type: Sub-task
> Components: core
> Affects Versions: 3.6.0
> Reporter: Divij Vaidya
> Assignee: Divij Vaidya
> Priority: Major
> Fix For: 3.8.0
>
> Attachments: zstd-upgrade.xlsx
>
>
> h1. Background
> In Kafka's code, every batch of records is stored in an in-memory byte buffer.
> For compressed workloads, this buffer contains the data in compressed form.
> Before writing it to the log, Kafka performs some validations, such as
> ensuring that offsets are monotonically increasing. To perform this
> validation, Kafka needs to decompress the data stored in the byte buffer.
> For zstd-compressed batches, Kafka uses the ZstdInputStreamNoFinalizer
> interface provided by the downstream zstd-jni library to perform
> decompression. ZstdInputStreamNoFinalizer takes an InputStream as input and
> provides an InputStream as output. Since Kafka stores the entire batch in a
> ByteBuffer, it wraps the ByteBuffer in an InputStream to satisfy the input
> contract of ZstdInputStreamNoFinalizer.
> h1. Problem
> ZstdInputStreamNoFinalizer is not a good fit for our use case because we
> already have the entire compressed data stored in a buffer. We have no need
> for an interface which takes an InputStream as input. Our requirement is
> for an interface which takes a ByteBuffer as input and provides a stream
> of uncompressed data as output. Prior to zstd-jni 1.5.5, no such interface
> existed. Hence, we were forced to use ZstdInputStreamNoFinalizer.
> Usage of ZstdInputStreamNoFinalizer has the following problems:
> 1. When decompression of a batch is complete, we try to read one more byte to
> check whether the actual batch size is equal to the declared batch size. This
> is done in RecordIterator#next(). With the existing interface, this extra
> read results in a JNI call.
> 2. Since this interface requires an InputStream as input, we take the
> ByteBuffer containing the compressed batch and convert it into an InputStream.
> The interface internally uses an intermediate buffer to read data from this
> InputStream in chunks. The chunk size is determined by the underlying zstd
> library, and hence we allocate a new buffer with every batch. This leads
> to the following transformation: ByteBuffer (compressed batch) -> InputStream
> (compressed batch) -> data copy to intermediate ByteBuffer (chunk of
> compressed batch) -> send chunk to zstd library for decompression -> refill
> the intermediate buffer by copying the data to intermediate ByteBuffer (next
> chunk of compressed batch).
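
To make the copy chain in problem 2 concrete, here is a rough stdlib-only sketch. It does not call zstd-jni at all: `ChunkedCopySketch`, `wrap`, `drain`, and `CHUNK_SIZE` are hypothetical names, and the 4-byte chunk is an arbitrary stand-in for whatever chunk size the zstd library would pick. It only counts how many bytes are copied into a per-batch intermediate buffer and how many reads are issued, including the final read that merely reports end of stream.

```java
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;

final class ChunkedCopySketch {
    // Arbitrary stand-in for the chunk size chosen by the native library.
    static final int CHUNK_SIZE = 4;

    // Expose the remaining bytes of a heap ByteBuffer as an InputStream.
    static ByteArrayInputStream wrap(ByteBuffer buffer) {
        return new ByteArrayInputStream(
                buffer.array(),
                buffer.arrayOffset() + buffer.position(),
                buffer.remaining());
    }

    // Returns {bytesCopied, readCalls} for one batch.
    static long[] drain(ByteBuffer compressedBatch) {
        ByteArrayInputStream in = wrap(compressedBatch);
        byte[] intermediate = new byte[CHUNK_SIZE]; // new buffer for every batch
        long copied = 0;
        long reads = 0;
        while (true) {
            int n = in.read(intermediate, 0, intermediate.length);
            reads++;
            if (n < 0) {
                break; // this last read only discovers end of stream
            }
            copied += n;
            // A real decompressor would now receive intermediate[0..n).
        }
        return new long[] {copied, reads};
    }

    public static void main(String[] args) {
        long[] result = drain(ByteBuffer.wrap(new byte[10]));
        System.out.println("copied=" + result[0] + " reads=" + result[1]);
    }
}
```

Every byte of the batch is copied once more before it reaches the decompressor, and one read is spent only on discovering that the stream has ended.
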
> h1. Solution
> I have extended an interface in the downstream library zstd-jni to suit
> Kafka's use case. The new interface is called
> ZstdBufferDecompressingStreamNoFinalizer. It takes a ByteBuffer containing
> compressed data as input and provides an InputStream as output. It solves the
> above problems as follows:
> 1. When we read the final decompressed frame, this interface sets a flag to
> mark that all uncompressed data has been consumed. When RecordIterator#next()
> tries to determine if the stream has ended, we simply read the flag and
> hence, do not have to make a JNI call.
> 2. It does not require any buffer allocation for input. It takes the input
> buffer and passes it across the JNI boundary without any intermediate
> copying. Hence, we don't perform any buffer allocation.
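
The two points above can be illustrated with a small simulation. This is not the zstd-jni API: `EndOfStreamFlagSketch`, `read`, `isFinished`, and `simulatedNativeCalls` are hypothetical names, and "decompression" here is just a byte-for-byte move so the sketch runs with the JDK alone. It assumes that the stream keeps a reference to the caller's buffer instead of copying it, and that the end-of-stream check is a plain field read that never counts as a native call.

```java
import java.nio.ByteBuffer;

final class EndOfStreamFlagSketch {
    private final ByteBuffer source; // caller's buffer, used as-is (no input copy)
    private boolean finished;        // set once all source bytes are consumed
    int simulatedNativeCalls;        // calls that would cross the JNI boundary

    EndOfStreamFlagSketch(ByteBuffer source) {
        this.source = source;
        this.finished = !source.hasRemaining();
    }

    // Stand-in for decompression: moves bytes from source to target unchanged.
    int read(ByteBuffer target) {
        simulatedNativeCalls++;
        int n = Math.min(source.remaining(), target.remaining());
        for (int i = 0; i < n; i++) {
            target.put(source.get());
        }
        if (!source.hasRemaining()) {
            finished = true; // flag is set while producing the last bytes
        }
        return n;
    }

    // End-of-stream check: reads the flag, no simulated native call.
    boolean isFinished() {
        return finished;
    }

    public static void main(String[] args) {
        EndOfStreamFlagSketch stream =
                new EndOfStreamFlagSketch(ByteBuffer.wrap(new byte[10]));
        ByteBuffer target = ByteBuffer.allocate(4);
        while (!stream.isFinished()) {
            target.clear();
            stream.read(target);
        }
        System.out.println("native calls=" + stream.simulatedNativeCalls);
    }
}
```

In this simulation the loop ends without issuing an extra read to detect end of stream, which is the behaviour point 1 describes for RecordIterator#next().
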
> h1. References
> h2. Changes in downstream zstd-jni
> Add new interface -
> [https://github.com/luben/zstd-jni/commit/d65490e8b8aadc4b59545755e55f7dd368fe8aa5]
> Bug fixes in new interface -
> [https://github.com/luben/zstd-jni/commit/8bf8066438785ce55b62fc7e6816faafe1e3b39e]
>
> [https://github.com/luben/zstd-jni/commit/100c434dfcec17a865ca2c2b844afe1046ce1b10]
> [https://github.com/luben/zstd-jni/commit/355b8511a2967d097a619047a579930cac2ccd9d]
>