junrao commented on code in PR #15516:
URL: https://github.com/apache/kafka/pull/15516#discussion_r1586920863


##########
clients/src/main/java/org/apache/kafka/common/compress/NoCompression.java:
##########
@@ -14,37 +14,48 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.common.compress;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.xerial.snappy.SnappyInputStream;
-import org.xerial.snappy.SnappyOutputStream;
 
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-public class SnappyFactory {
+public class NoCompression implements Compression {
 
-    private SnappyFactory() { }
+    @Override
+    public CompressionType type() {
+        return CompressionType.NONE;
+    }
 
-    public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
-        try {
-            return new SnappyOutputStream(buffer);
-        } catch (Throwable e) {
-            throw new KafkaException(e);
-        }
+    @Override
+    public OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion) {
+        return bufferStream;
     }
 
-    public static InputStream wrapForInput(ByteBuffer buffer) {
-        try {
-            return new SnappyInputStream(new ByteBufferInputStream(buffer));
-        } catch (Throwable e) {
-            throw new KafkaException(e);
-        }
+    @Override
+    public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+        return new ByteBufferInputStream(buffer);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return o instanceof NoCompression;
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();

Review Comment:
   Hmm, we redefine `equals()` such that all objects of type NoCompression are equal, yet they have different hashcodes?
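   A minimal sketch of the fix the reviewer is pointing at, using a hypothetical stand-in class (not the Kafka code): when `equals()` makes all instances equal, `hashCode()` must return the same value for all of them, so `super.hashCode()` (identity hash) violates the contract.

```java
// Hypothetical stand-in for a stateless class where all instances are equal.
class NoOpCodec {
    @Override
    public boolean equals(Object o) {
        return o instanceof NoOpCodec;
    }

    @Override
    public int hashCode() {
        // A constant shared by every instance keeps equals/hashCode consistent;
        // super.hashCode() would give each instance a different identity hash.
        return NoOpCodec.class.hashCode();
    }
}

public class EqualsContractDemo {
    public static void main(String[] args) {
        NoOpCodec a = new NoOpCodec();
        NoOpCodec b = new NoOpCodec();
        // Equal objects now report equal hash codes, as the contract requires.
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode());
    }
}
```

   Without this, equal `NoCompression` instances could land in different `HashMap` buckets.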



##########
clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryUtils.java:
##########
@@ -201,8 +203,10 @@ public static byte[] compress(byte[] raw, CompressionType compressionType) throw
 
     public static ByteBuffer decompress(byte[] metrics, CompressionType compressionType) {
         ByteBuffer data = ByteBuffer.wrap(metrics);
-        try (InputStream in = compressionType.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
+        Compression compression = Compression.of(compressionType).build();
+        try (InputStream in = compression.wrapForInput(data, RecordBatch.CURRENT_MAGIC_VALUE, BufferSupplier.create());
             ByteBufferOutputStream out = new ByteBufferOutputStream(512)) {
+

Review Comment:
   extra newline



##########
clients/src/main/java/org/apache/kafka/common/compress/Lz4Compression.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ChunkedBytesStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class Lz4Compression implements Compression {
+
+    public static final int MIN_LEVEL = 1;
+    public static final int MAX_LEVEL = 17;
+    public static final int DEFAULT_LEVEL = 9;

Review Comment:
   So, every time we update the Lz4 library, we may need to update the above values? We probably want to add a note here.



##########
clients/src/main/java/org/apache/kafka/common/compress/Compression.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface Compression {
+
+    /**
+     * The compression type for this compression codec
+     */
+    CompressionType type();
+
+    /**
+     * Wrap bufferStream with an OutputStream that will compress data with this CompressionType.

Review Comment:
   Could we add the javadoc for messageVersion?
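   A sketch of what that javadoc could say. This is only a suggestion: the parameter wording is an assumption, and `java.io.OutputStream` stands in for Kafka's `ByteBufferOutputStream` so the snippet is self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;

// Hypothetical sketch of the requested javadoc on a stand-in interface.
interface CompressionSketch {
    /**
     * Wrap bufferStream with an OutputStream that compresses data written to it.
     *
     * @param bufferStream   the stream to which compressed bytes are written
     * @param messageVersion the record format ("magic") version of the batch being
     *                       written; implementations may choose version-specific
     *                       framing based on it
     */
    OutputStream wrapForOutput(OutputStream bufferStream, byte messageVersion);
}

public class JavadocSketch {
    public static void main(String[] args) {
        // No-op implementation just to show the signature compiles and runs.
        CompressionSketch none = (stream, version) -> stream;
        System.out.println(none.wrapForOutput(new ByteArrayOutputStream(), (byte) 2) != null);
    }
}
```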



##########
clients/src/main/java/org/apache/kafka/common/compress/GzipCompression.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ChunkedBytesStream;
+
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.zip.Deflater;
+import java.util.zip.GZIPInputStream;
+
+public class GzipCompression implements Compression {
+
+    public static final int MIN_LEVEL = Deflater.BEST_SPEED;
+    public static final int MAX_LEVEL = Deflater.BEST_COMPRESSION;
+    public static final int DEFAULT_LEVEL = Deflater.DEFAULT_COMPRESSION;
+
+    private final int level;
+
+    private GzipCompression(int level) {
+        this.level = level;
+    }
+
+    @Override
+    public CompressionType type() {
+        return CompressionType.GZIP;
+    }
+
+    @Override
+    public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion) {
+        try {
+            // Set input buffer (uncompressed) to 16 KB (none by default) and output buffer (compressed) to
+            // 8 KB (0.5 KB by default) to ensure reasonable performance in cases where the caller passes a small
+            // number of bytes to write (potentially a single byte)
+            return new BufferedOutputStream(new GzipOutputStream(buffer, 8 * 1024, this.level), 16 * 1024);

Review Comment:
   `this.level` => `level`. Ditto in a few other classes.



##########
clients/src/main/java/org/apache/kafka/common/compress/NoCompression.java:
##########
@@ -14,37 +14,48 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.common.compress;
 
-import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteBufferInputStream;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.xerial.snappy.SnappyInputStream;
-import org.xerial.snappy.SnappyOutputStream;
 
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
-public class SnappyFactory {
+public class NoCompression implements Compression {
 
-    private SnappyFactory() { }
+    @Override
+    public CompressionType type() {
+        return CompressionType.NONE;
+    }
 
-    public static OutputStream wrapForOutput(ByteBufferOutputStream buffer) {
-        try {
-            return new SnappyOutputStream(buffer);
-        } catch (Throwable e) {
-            throw new KafkaException(e);
-        }
+    @Override
+    public OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion) {
+        return bufferStream;
     }
 
-    public static InputStream wrapForInput(ByteBuffer buffer) {
-        try {
-            return new SnappyInputStream(new ByteBufferInputStream(buffer));
-        } catch (Throwable e) {
-            throw new KafkaException(e);
-        }
+    @Override
+    public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+        return new ByteBufferInputStream(buffer);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        return o instanceof NoCompression;
+    }
+
+    @Override
+    public int hashCode() {
+        return super.hashCode();
     }
 
+    public static class Builder implements Compression.Builder<NoCompression> {

Review Comment:
   Should we make the constructor of NoCompression private so that only the builder can be used? Ditto for other compression classes.
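   A minimal sketch of that suggestion on a hypothetical stand-in class: with a private constructor, the nested `Builder` becomes the only way to obtain an instance, because a nested class may still call the outer class's private constructor.

```java
// Hypothetical stand-in, not the Kafka class.
class NoOpCompression {
    private NoOpCompression() { }   // only code inside this class (incl. Builder) can call this

    public static class Builder {
        public NoOpCompression build() {
            return new NoOpCompression();   // nested class may access the private ctor
        }
    }
}

public class BuilderDemo {
    public static void main(String[] args) {
        // `new NoOpCompression()` would not compile here; the builder is the only path.
        NoOpCompression c = new NoOpCompression.Builder().build();
        System.out.println(c != null);
    }
}
```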



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -388,6 +402,32 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
         remoteLogConfig = new RemoteLogConfig(this);
     }
 
+    private Optional<Compression> getCompression() {
+        BrokerCompressionType brokerCompressionType = BrokerCompressionType.forName(getString(TopicConfig.COMPRESSION_TYPE_CONFIG));

Review Comment:
   Could we just use `compressionType`, which is already set earlier?



##########
clients/src/main/java/org/apache/kafka/common/compress/ZstdCompression.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.compress;
+
+import com.github.luben.zstd.BufferPool;
+import com.github.luben.zstd.RecyclingBufferPool;
+import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdInputStreamNoFinalizer;
+import com.github.luben.zstd.ZstdOutputStreamNoFinalizer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferInputStream;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.ChunkedBytesStream;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+public class ZstdCompression implements Compression {
+
+    public static final int MIN_LEVEL = Zstd.minCompressionLevel();
+    public static final int MAX_LEVEL = Zstd.maxCompressionLevel();
+    public static final int DEFAULT_LEVEL = Zstd.defaultCompressionLevel();
+
+    private final int level;
+
+    private ZstdCompression(int level) {
+        this.level = level;
+    }
+
+    @Override
+    public CompressionType type() {
+        return CompressionType.ZSTD;
+    }
+
+    @Override
+    public OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion) {
+        try {
+            // Set input buffer (uncompressed) to 16 KB (none by default) to ensure reasonable performance
+            // in cases where the caller passes a small number of bytes to write (potentially a single byte).
+            return new BufferedOutputStream(new ZstdOutputStreamNoFinalizer(bufferStream, RecyclingBufferPool.INSTANCE, level), 16 * 1024);
+        } catch (Throwable e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    @Override
+    public InputStream wrapForInput(ByteBuffer buffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
+        try {
+            return new ChunkedBytesStream(wrapForZstdInput(buffer, decompressionBufferSupplier),
+                    decompressionBufferSupplier,
+                    decompressionOutputSize(),
+                    false);
+        } catch (Throwable e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    // visible for testing
+    public ZstdInputStreamNoFinalizer wrapForZstdInput(ByteBuffer buffer, BufferSupplier decompressionBufferSupplier) throws IOException {

Review Comment:
   Should we make this static?
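   The point behind the question, illustrated with a hypothetical stand-in helper: a method that reads only its parameters and touches no instance fields can be declared `static`, so callers (and tests) don't need an instance.

```java
import java.nio.ByteBuffer;

public class StaticHelperDemo {
    // Stand-in for a helper like wrapForZstdInput: it uses only its
    // arguments, never `this`, so `static` is the natural declaration.
    static int remainingBytes(ByteBuffer buffer) {
        return buffer.remaining();
    }

    public static void main(String[] args) {
        // Callable without constructing any object.
        System.out.println(remainingBytes(ByteBuffer.allocate(8)));
    }
}
```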



##########
clients/src/main/java/org/apache/kafka/common/compress/Compression.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.compress;
+
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface Compression {
+
+    /**
+     * The compression type for this compression codec
+     */
+    CompressionType type();
+
+    /**
+     * Wrap bufferStream with an OutputStream that will compress data with this CompressionType.
+     * Note: Unlike {@link #wrapForInput}, this cannot take {@link ByteBuffer}s directly.
+     * Currently, MemoryRecordsBuilder writes to the underlying buffer in the given {@link ByteBufferOutputStream} after the compressed data has been written.
+     * In the event that the buffer needs to be expanded while writing the data, access to the underlying buffer needs to be preserved.
+     */
+    OutputStream wrapForOutput(ByteBufferOutputStream bufferStream, byte messageVersion);
+
+    /**
+     * Wrap buffer with an InputStream that will decompress data with this CompressionType.
+     *
+     * @param buffer The {@link ByteBuffer} instance holding the data to decompress.
+     * @param messageVersion The record format version to use.
+     * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported.
+     * For small record batches, allocating a potentially large buffer (64 KB for LZ4)

Review Comment:
   indentation



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
