[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-04-05 Thread via GitHub


ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1158577698


##
clients/src/main/java/org/apache/kafka/common/utils/BytesStreamBufferSource.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+
+/**
+ * Provides a BytesStream wrapper over a buffer.
+ */
+public class BytesStreamBufferSource implements BytesStream {
+    final private ByteBuffer buf;
+
+    public BytesStreamBufferSource(final ByteBuffer buffer) {
+        // we do not modify the markers of source buffer
+        buf = buffer.duplicate();
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        return buf.get() & 0xFF;
+    }
+
+    @Override
+    public int skipBytes(int toSkip) {
+        if (toSkip <= 0) {
+            return 0;
+        }
+
+        int avail = Math.min(toSkip, buf.remaining());
+        buf.position(buf.position() + avail);
+        return avail;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            return 0;
+        }
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buf.remaining());
+        buf.get(b, off, len);
+        return len;
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+        try {
+            return buf.get();

Review Comment:
   Would it be better to call `read` and check for `-1`? That would result in one exception (and resulting stacktrace) vs two.
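   For illustration, a minimal sketch of the suggested approach (assumed, not the PR's final code): build `readByte()` on top of `read()`, which already signals end of buffer with `-1`, so only a single `EOFException` is ever constructed instead of a `BufferUnderflowException` followed by an `EOFException`.

```java
// Hypothetical readByte() built on read(); relies on the class's existing
// read() method and the java.io.EOFException import already in the file.
@Override
public byte readByte() throws IOException {
    int value = read();
    if (value == -1) {
        // single exception on underflow instead of BufferUnderflowException + EOFException
        throw new EOFException("no more bytes available in the buffer");
    }
    return (byte) value;
}
```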



##
clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java:
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * ChunkedBytesStream is a ByteReader which reads from source stream in chunks of configurable size. The
+ * implementation of this reader is optimized to reduce the number of calls to sourceStream#read(). This works best in
+ * scenarios where sourceStream#read() call is expensive, e.g. when the call crosses JNI boundary.
+ *
+ * The functionality of this stream is a combination of DataInput and BufferedInputStream with the following
+ * differences:
+ * - Unlike BufferedInputStream.skip()

Review Comment:
   Incomplete sentence?
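   For context, a rough sketch (simplified, not the PR's ChunkedBytesStream) of the chunked-read pattern the Javadoc describes: one call to the expensive sourceStream#read() fills an intermediate chunk, and subsequent small reads are served from memory.

```java
import java.io.IOException;
import java.io.InputStream;

// Simplified illustration of chunked reading: a single sourceStream.read()
// call fills an intermediate chunk that then satisfies many small reads,
// which matters when every call into sourceStream crosses a JNI boundary.
class ChunkedReadSketch {
    private final InputStream sourceStream;
    private final byte[] chunk;
    private int pos = 0;    // next byte to hand out
    private int limit = 0;  // number of valid bytes in the chunk

    ChunkedReadSketch(InputStream sourceStream, int chunkSize) {
        this.sourceStream = sourceStream;
        this.chunk = new byte[chunkSize];
    }

    int read() throws IOException {
        if (pos >= limit) {
            int n = sourceStream.read(chunk, 0, chunk.length); // one expensive call
            if (n == -1) {
                return -1;
            }
            pos = 0;
            limit = n;
        }
        return chunk[pos++] & 0xFF;
    }
}
```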



##
clients/src/main/java/org/apache/kafka/common/utils/SkippableChunkedBytesStream.java:
##
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-23 Thread via GitHub


ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1084344515


##
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
     public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
     }
 
     private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
         final DataInputStream inputStream = recordInputStream(bufferSupplier);
 
         if (skipKeyValue) {
             // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+            final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   Since we cache buffers per thread, I think you mean we will use two buffers instead of one per thread (for the zstd case).
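   For background on the comment above, a hypothetical per-thread buffer cache keyed by requested capacity (illustrative only, not Kafka's actual BufferSupplier implementation) shows why requesting a decompression buffer and a differently sized skip buffer leaves two cached buffers behind per thread.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-thread buffer cache keyed by requested capacity: asking
// for two different sizes means two ByteBuffers end up cached per thread.
class PerThreadBufferCacheSketch {
    private final Map<Integer, Deque<ByteBuffer>> cache = new HashMap<>();

    ByteBuffer get(int capacity) {
        Deque<ByteBuffer> free = cache.get(capacity);
        ByteBuffer cached = (free == null) ? null : free.pollFirst();
        return cached != null ? cached : ByteBuffer.allocate(capacity);
    }

    void release(ByteBuffer buffer) {
        buffer.clear();
        cache.computeIfAbsent(buffer.capacity(), k -> new ArrayDeque<>()).addFirst(buffer);
    }
}
```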





[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-20 Thread GitBox


ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082623877


##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   OK, that's fine.






[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-20 Thread GitBox


ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082622801


##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   The PR says:
   
   > we pushed the skipping of key/value logic to zstd-jni implementation instead of using the one provided by BufferedInputStream






[GitHub] [kafka] ijuma commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression

2023-01-20 Thread GitBox


ijuma commented on code in PR #13135:
URL: https://github.com/apache/kafka/pull/13135#discussion_r1082588469


##
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##
@@ -62,10 +68,11 @@ public void release(ByteBuffer buffer) {
                 }
             };
 
-            // Set output buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
-            // in cases where the caller reads a small number of bytes (potentially a single byte).
-            return new BufferedInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer),
-                bufferPool), 16 * 1024);
+            // We do not use an intermediate buffer to store the decompressed data as a result of JNI read() calls using
+            // `ZstdInputStreamNoFinalizer` here. Every read() call to `DataInputStream` will be a JNI call and the
+            // caller is expected to balance the tradeoff between reading large amount of data vs. making multiple JNI
+            // calls.
+            return new DataInputStream(new ZstdInputStreamNoFinalizer(new ByteBufferInputStream(buffer), bufferPool));

Review Comment:
   2 questions:
   1. Why do we wrap into DataInputStream?
   2. Have we checked that there are no workloads where we end up doing too many JNI calls?
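   One hypothetical way to answer the second question: wrap the JNI-backed stream in a counting decorator (not part of the PR) and compare how many calls actually reach it for representative workloads before and after the change.

```java
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;

// Counts how many read() calls reach the wrapped (JNI-backed) stream for a
// given workload; purely a measurement aid, not production code.
class CountingInputStream extends FilterInputStream {
    final AtomicLong calls = new AtomicLong();

    CountingInputStream(InputStream in) {
        super(in);
    }

    @Override
    public int read() throws IOException {
        calls.incrementAndGet();
        return super.read();
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        calls.incrementAndGet();
        return super.read(b, off, len);
    }
}
```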



##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -47,6 +47,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return new ByteBufferInputStream(buffer);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 2 * 1024; // 2KB

Review Comment:
   What's the meaning of this for an uncompressed stream?



##
clients/src/main/java/org/apache/kafka/common/compress/ZstdFactory.java:
##
@@ -26,21 +26,25 @@
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 public class ZstdFactory {
+    /**
+     * Default compression level
+     */
+    private static final int DEFAULT_COMPRESSION_LEVEL = 3;

Review Comment:
   Since this is unrelated, do we have to include it as part of this PR?



##
clients/src/main/java/org/apache/kafka/common/record/CompressionType.java:
##
@@ -126,6 +144,11 @@ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVer
         public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
             return ZstdFactory.wrapForInput(buffer, messageVersion, decompressionBufferSupplier);
         }
+
+        @Override
+        public int getRecommendedDOutSize() {
+            return 16 * 1024; // 16KB

Review Comment:
   We decided not to get this info from the zstd library?
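   If the value were to come from the library instead of being hard-coded, a sketch could look like the following, assuming the bundled zstd-jni exposes ZstdInputStreamNoFinalizer.recommendedDOutSize() (a long mirroring the native ZSTD_DStreamOutSize()).

```java
// Sketch only (assumes zstd-jni exposes recommendedDOutSize()); clamps the
// library-recommended long value to an int.
@Override
public int getRecommendedDOutSize() {
    long recommended = com.github.luben.zstd.ZstdInputStreamNoFinalizer.recommendedDOutSize();
    return (int) Math.min(Integer.MAX_VALUE, recommended);
}
```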



##
clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java:
##
@@ -273,20 +272,32 @@ public int partitionLeaderEpoch() {
     public DataInputStream recordInputStream(BufferSupplier bufferSupplier) {
         final ByteBuffer buffer = this.buffer.duplicate();
         buffer.position(RECORDS_OFFSET);
-        return new DataInputStream(compressionType().wrapForInput(buffer, magic(), bufferSupplier));
+        final InputStream decompressedStream = compressionType().wrapForInput(buffer, magic(), bufferSupplier);
+        return decompressedStream instanceof DataInputStream ? (DataInputStream) decompressedStream : new DataInputStream(decompressedStream);
     }
 
     private CloseableIterator<Record> compressedIterator(BufferSupplier bufferSupplier, boolean skipKeyValue) {
         final DataInputStream inputStream = recordInputStream(bufferSupplier);
 
         if (skipKeyValue) {
             // this buffer is used to skip length delimited fields like key, value, headers
-            byte[] skipArray = new byte[MAX_SKIP_BUFFER_SIZE];
+            final ByteBuffer skipBuffer = bufferSupplier.get(compressionType().getRecommendedDOutSize());

Review Comment:
   I thought we wanted to call the underlying skipBytes API versus doing the skipping by reading into a skip buffer. I don't see that change. What am I missing?
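   To make the distinction concrete, an illustrative sketch (assumptions: `in` is the decompressed stream and `bytesToSkip` is the length-delimited field size) contrasting skipping by reading into a scratch buffer with delegating to the stream's own skip():

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

// Illustrative contrast of the two skipping strategies discussed above.
final class SkipStrategiesSketch {

    // (a) Skip by reading into a scratch buffer: bytes are decompressed AND
    //     copied into `scratch`, only to be discarded.
    static void skipByReading(InputStream in, byte[] scratch, long bytesToSkip) throws IOException {
        while (bytesToSkip > 0) {
            int read = in.read(scratch, 0, (int) Math.min(scratch.length, bytesToSkip));
            if (read == -1) {
                throw new EOFException();
            }
            bytesToSkip -= read;
        }
    }

    // (b) Delegate to the stream's own skip(): a stream whose skip() is
    //     implemented by the decompressor (as the PR proposes for zstd-jni)
    //     can discard decompressed bytes without handing them to the caller.
    static void skipDirectly(InputStream in, long bytesToSkip) throws IOException {
        while (bytesToSkip > 0) {
            long skipped = in.skip(bytesToSkip);
            if (skipped == 0) {
                // skip() may return 0 without EOF; probe with a single read
                if (in.read() == -1) {
                    throw new EOFException();
                }
                skipped = 1;
            }
            bytesToSkip -= skipped;
        }
    }
}
```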



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org