[
https://issues.apache.org/jira/browse/KAFKA-4514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16644276#comment-16644276
]
ASF GitHub Bot commented on KAFKA-4514:
---------------------------------------
hachikuji closed pull request #2267: KAFKA-4514: Add Codec for ZStandard
Compression
URL: https://github.com/apache/kafka/pull/2267
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/LICENSE b/LICENSE
index bf7fe1c487b..db706caf09e 100644
--- a/LICENSE
+++ b/LICENSE
@@ -201,6 +201,7 @@
See the License for the specific language governing permissions and
limitations under the License.
+------------------------------------------------------------------------------------
This distribution has a binary dependency on jersey, which is available under
the CDDL
License as described below.
@@ -328,3 +329,68 @@ As between Initial Developer and the Contributors, each
party is responsible for
NOTICE PURSUANT TO SECTION 9 OF THE COMMON DEVELOPMENT AND DISTRIBUTION
LICENSE (CDDL)
The code released under the CDDL shall be governed by the laws of the State of
California (excluding conflict-of-law provisions). Any litigation relating to
this License shall be subject to the jurisdiction of the Federal Courts of the
Northern District of California and the state courts of the State of
California, with venue lying in Santa Clara County, California.
+
+------------------------------------------------------------------------------------
+This distribution has a binary dependency on zstd, which is available under
the BSD 3-Clause License as described below.
+
+BSD License
+
+For Zstandard software
+
+Copyright (c) 2016-present, Facebook, Inc. All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
modification,
+are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ * Neither the name Facebook nor the names of its contributors may be used to
+ endorse or promote products derived from this software without specific
+ prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+------------------------------------------------------------------------------------
+This distribution has a binary dependency on zstd-jni, which is available
under the BSD 2-Clause License
+as described below.
+
+Zstd-jni: JNI bindings to Zstd Library
+
+Copyright (c) 2015-2016, Luben Karavelov/ All rights reserved.
+
+BSD License
+
+Redistribution and use in source and binary forms, with or without
modification,
+are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
this
+ list of conditions and the following disclaimer in the documentation and/or
+ other materials provided with the distribution.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR
+ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff --git a/build.gradle b/build.gradle
index 47fa18620fd..2651f10da65 100644
--- a/build.gradle
+++ b/build.gradle
@@ -820,6 +820,7 @@ project(':clients') {
conf2ScopeMappings.addMapping(1000, configurations.jacksonDatabindConfig,
"provided")
dependencies {
+ compile libs.zstd
compile libs.lz4
compile libs.snappy
compile libs.slf4jApi
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 1a1bab5127b..6142519c4dc 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -159,7 +159,7 @@
/** <code>compression.type</code> */
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
private static final String COMPRESSION_TYPE_DOC = "The compression type
for all data generated by the producer. The default is none (i.e. no
compression). Valid "
- + " values are
<code>none</code>, <code>gzip</code>, <code>snappy</code>, or <code>lz4</code>.
"
+ + " values are
<code>none</code>, <code>gzip</code>, <code>snappy</code>, <code>lz4</code>, or
<code>zstd</code>. "
+ "Compression is of
full batches of data, so the efficacy of batching will also impact the
compression ratio (more batching means better compression).";
/** <code>metrics.sample.window.ms</code> */
diff --git
a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index bc657ea738f..4410c971a16 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -140,7 +140,7 @@
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
public static final String COMPRESSION_TYPE_DOC = "Specify the final
compression type for a given topic. " +
- "This configuration accepts the standard compression codecs ('gzip',
'snappy', lz4). It additionally " +
+ "This configuration accepts the standard compression codecs ('gzip',
'snappy', 'lz4', 'zstd'). It additionally " +
"accepts 'uncompressed' which is equivalent to no compression; and
'producer' which means retain the " +
"original compression codec set by the producer.";
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedCompressionTypeException.java
b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedCompressionTypeException.java
new file mode 100644
index 00000000000..29ffe1b900e
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedCompressionTypeException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * The requesting client does not support the compression type of the given
partition.
+ */
+public class UnsupportedCompressionTypeException extends ApiException {
+
+ private static final long serialVersionUID = 1L;
+
+ public UnsupportedCompressionTypeException(String message) {
+ super(message);
+ }
+
+ public UnsupportedCompressionTypeException(String message, Throwable
cause) {
+ super(message, cause);
+ }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 7cf5a295084..6b54a89daf3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -91,6 +91,7 @@
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedByAuthenticationException;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -284,7 +285,9 @@
FENCED_LEADER_EPOCH(74, "The leader epoch in the request is older than the
epoch on the broker",
FencedLeaderEpochException::new),
UNKNOWN_LEADER_EPOCH(75, "The leader epoch in the request is newer than
the epoch on the broker",
- UnknownLeaderEpochException::new);
+ UnknownLeaderEpochException::new),
+ UNSUPPORTED_COMPRESSION_TYPE(76, "The requesting client does not support
the compression type of the given partition.",
+ UnsupportedCompressionTypeException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 6ac073dd298..cdf731c3bf2 100644
---
a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++
b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -322,6 +322,8 @@ private DeepRecordsIterator(AbstractLegacyRecordBatch
wrapperEntry,
throw new InvalidRecordException("Invalid wrapper magic found
in legacy deep record iterator " + wrapperMagic);
CompressionType compressionType = wrapperRecord.compressionType();
+ if (compressionType == CompressionType.ZSTD)
+ throw new InvalidRecordException("Invalid wrapper
compressionType found in legacy deep record iterator " + wrapperMagic);
ByteBuffer wrapperValue = wrapperRecord.value();
if (wrapperValue == null)
throw new InvalidRecordException("Found invalid compressed
record set with null value (magic = " +
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
index a333d2aea27..352d12d8349 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java
@@ -113,6 +113,26 @@ public InputStream wrapForInput(ByteBuffer inputBuffer,
byte messageVersion, Buf
throw new KafkaException(e);
}
}
+ },
+
+ ZSTD(4, "zstd", 1.0f) {
+ @Override
+ public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte
messageVersion) {
+ try {
+ return (OutputStream) ZstdConstructors.OUTPUT.invoke(buffer);
+ } catch (Throwable e) {
+ throw new KafkaException(e);
+ }
+ }
+
+ @Override
+ public InputStream wrapForInput(ByteBuffer buffer, byte
messageVersion, BufferSupplier decompressionBufferSupplier) {
+ try {
+ return (InputStream) ZstdConstructors.INPUT.invoke(new
ByteBufferInputStream(buffer));
+ } catch (Throwable e) {
+ throw new KafkaException(e);
+ }
+ }
};
public final int id;
@@ -156,6 +176,8 @@ public static CompressionType forId(int id) {
return SNAPPY;
case 3:
return LZ4;
+ case 4:
+ return ZSTD;
default:
throw new IllegalArgumentException("Unknown compression type
id: " + id);
}
@@ -170,6 +192,8 @@ else if (SNAPPY.name.equals(name))
return SNAPPY;
else if (LZ4.name.equals(name))
return LZ4;
+ else if (ZSTD.name.equals(name))
+ return ZSTD;
else
throw new IllegalArgumentException("Unknown compression name: " +
name);
}
@@ -177,7 +201,7 @@ else if (LZ4.name.equals(name))
// We should only have a runtime dependency on compression algorithms in
case the native libraries don't support
// some platforms.
//
- // For Snappy, we dynamically load the classes and rely on the
initialization-on-demand holder idiom to ensure
+ // For Snappy and Zstd, we dynamically load the classes and rely on the
initialization-on-demand holder idiom to ensure
// they're only loaded if used.
//
// For LZ4 we are using org.apache.kafka classes, which should always be
in the classpath, and would not trigger
@@ -190,6 +214,13 @@ else if (LZ4.name.equals(name))
MethodType.methodType(void.class, OutputStream.class));
}
+ private static class ZstdConstructors {
+ static final MethodHandle INPUT =
findConstructor("com.github.luben.zstd.ZstdInputStream",
+ MethodType.methodType(void.class, InputStream.class));
+ static final MethodHandle OUTPUT =
findConstructor("com.github.luben.zstd.ZstdOutputStream",
+ MethodType.methodType(void.class, OutputStream.class));
+ }
+
private static MethodHandle findConstructor(String className, MethodType
methodType) {
try {
return
MethodHandles.publicLookup().findConstructor(Class.forName(className),
methodType);
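
Note on the ZstdConstructors holder above: the comment in CompressionType.java says the codec classes are loaded dynamically and only when used. The following is a minimal sketch of that initialization-on-demand holder idiom, not the Kafka code itself. It assumes java.util.zip.GZIPOutputStream as a stand-in for com.github.luben.zstd.ZstdOutputStream so that it runs without zstd-jni on the classpath; the class name LazyCodecSketch and the lookups counter are hypothetical and exist only to make the laziness observable.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

final class LazyCodecSketch {
    // Hypothetical counter: incremented each time a constructor is resolved.
    static int lookups = 0;

    // Holder class: the JVM initializes it on first access to OUTPUT,
    // not when LazyCodecSketch itself is loaded.
    private static class GzipConstructors {
        static final MethodHandle OUTPUT = findConstructor(
                "java.util.zip.GZIPOutputStream", // stand-in for the zstd stream class
                MethodType.methodType(void.class, OutputStream.class));
    }

    // Resolves a public constructor by class name, so there is no
    // compile-time reference to the codec class.
    private static MethodHandle findConstructor(String className, MethodType methodType) {
        lookups++;
        try {
            return MethodHandles.publicLookup()
                    .findConstructor(Class.forName(className), methodType);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    // Mirrors the shape of wrapForOutput in the diff: invoke the cached handle.
    static OutputStream wrapForOutput(OutputStream out) {
        try {
            return (OutputStream) GzipConstructors.OUTPUT.invoke(out);
        } catch (Throwable e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("lookups before use: " + lookups);
        try (OutputStream out = wrapForOutput(new ByteArrayOutputStream())) {
            out.write("hello".getBytes());
        }
        System.out.println("lookups after use: " + lookups);
    }
}
```

The lookup runs once, on the first call to wrapForOutput, and later calls reuse the cached handle. In this sketch a missing codec class would surface as an error on first use rather than at class-load time, which appears to be the intent of keeping zstd a runtime-only dependency.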
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
index d58689de119..4e844736bbd 100644
---
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
+++
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
@@ -44,6 +44,9 @@
* @param firstOffset The starting offset for down-converted records. This
only impacts some cases. See
* {@link RecordsUtil#downConvert(Iterable, byte, long,
Time)} for an explanation.
* @param time The time instance to use
+ *
+ * @throws
org.apache.kafka.common.errors.UnsupportedCompressionTypeException If the first
batch to down-convert
+ * has a compression type which we do not support down-conversion for.
*/
public LazyDownConversionRecords(TopicPartition topicPartition, Records
records, byte toMagic, long firstOffset, Time time) {
this.topicPartition = Objects.requireNonNull(topicPartition);
@@ -150,7 +153,7 @@ protected ConvertedRecords makeNext() {
}
while (batchIterator.hasNext()) {
- List<RecordBatch> batches = new ArrayList<>();
+ final List<RecordBatch> batches = new ArrayList<>();
boolean isFirstBatch = true;
long sizeSoFar = 0;
@@ -162,6 +165,7 @@ protected ConvertedRecords makeNext() {
sizeSoFar += currentBatch.sizeInBytes();
isFirstBatch = false;
}
+
ConvertedRecords convertedRecords =
RecordsUtil.downConvert(batches, toMagic, firstOffset, time);
// During conversion, it is possible that we drop certain
batches because they do not have an equivalent
// representation in the message format we want to convert to.
For example, V0 and V1 message formats
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
index f0fab7d876d..ad1f97fa0cb 100644
---
a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
+++
b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecordsSend.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.record;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,35 +46,50 @@ public LazyDownConversionRecordsSend(String destination,
LazyDownConversionRecor
convertedRecordsIterator = records().iterator(MAX_READ_SIZE);
}
+ private MemoryRecords buildOverflowBatch(int remaining) {
+ // We do not have any records left to down-convert. Construct an
overflow message for the length remaining.
+ // This message will be ignored by the consumer because its length
will be past the length of maximum
+ // possible response size.
+ // DefaultRecordBatch =>
+ // BaseOffset => Int64
+ // Length => Int32
+ // ...
+ ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
+ Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1,
MAX_READ_SIZE)));
+ overflowMessageBatch.putLong(-1L);
+
+ // Fill in the length of the overflow batch. A valid batch must be at
least as long as the minimum batch
+ // overhead.
+ overflowMessageBatch.putInt(Math.max(remaining + 1,
DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
+ log.debug("Constructed overflow message batch for partition {} with
length={}", topicPartition(), remaining);
+ return MemoryRecords.readableRecords(overflowMessageBatch);
+ }
+
@Override
public long writeTo(GatheringByteChannel channel, long previouslyWritten,
int remaining) throws IOException {
if (convertedRecordsWriter == null ||
convertedRecordsWriter.completed()) {
MemoryRecords convertedRecords;
- // Check if we have more chunks left to down-convert
- if (convertedRecordsIterator.hasNext()) {
- // Get next chunk of down-converted messages
- ConvertedRecords<MemoryRecords> recordsAndStats =
convertedRecordsIterator.next();
- convertedRecords = recordsAndStats.records();
-
recordConversionStats.add(recordsAndStats.recordConversionStats());
- log.debug("Down-converted records for partition {} with
length={}", topicPartition(), convertedRecords.sizeInBytes());
- } else {
- // We do not have any records left to down-convert. Construct
an overflow message for the length remaining.
- // This message will be ignored by the consumer because its
length will be past the length of maximum
- // possible response size.
- // DefaultRecordBatch =>
- // BaseOffset => Int64
- // Length => Int32
- // ...
- ByteBuffer overflowMessageBatch = ByteBuffer.allocate(
- Math.max(MIN_OVERFLOW_MESSAGE_LENGTH,
Math.min(remaining + 1, MAX_READ_SIZE)));
- overflowMessageBatch.putLong(-1L);
- // Fill in the length of the overflow batch. A valid batch
must be at least as long as the minimum batch
- // overhead.
- overflowMessageBatch.putInt(Math.max(remaining + 1,
DefaultRecordBatch.RECORD_BATCH_OVERHEAD));
- convertedRecords =
MemoryRecords.readableRecords(overflowMessageBatch);
- log.debug("Constructed overflow message batch for partition {}
with length={}", topicPartition(), remaining);
+ try {
+ // Check if we have more chunks left to down-convert
+ if (convertedRecordsIterator.hasNext()) {
+ // Get next chunk of down-converted messages
+ ConvertedRecords<MemoryRecords> recordsAndStats =
convertedRecordsIterator.next();
+ convertedRecords = recordsAndStats.records();
+
recordConversionStats.add(recordsAndStats.recordConversionStats());
+ log.debug("Down-converted records for partition {} with
length={}", topicPartition(), convertedRecords.sizeInBytes());
+ } else {
+ convertedRecords = buildOverflowBatch(remaining);
+ }
+ } catch (UnsupportedCompressionTypeException e) {
+ // We have encountered a compression type which does not
support down-conversion (e.g. zstd).
+ // Since we have already sent at least one batch and we have
committed to the fetch size, we
+ // send an overflow batch. The consumer will read the first
few records and then fetch from the
+ // offset of the batch which has the unsupported compression
type. At that time, we will
+ // send back the UNSUPPORTED_COMPRESSION_TYPE error which will
allow the consumer to fail gracefully.
+ convertedRecords = buildOverflowBatch(remaining);
}
+
convertedRecordsWriter = new DefaultRecordsSend(destination(),
convertedRecords, Math.min(convertedRecords.sizeInBytes(), remaining));
}
return convertedRecordsWriter.writeTo(channel);
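
Note on buildOverflowBatch above: the arithmetic is easier to follow with concrete numbers. The sketch below reproduces only the header layout (BaseOffset as Int64, then Length as Int32) using java.nio. The three constants are assumed stand-ins for the Kafka constants of the same names; their values here are illustrative and should be checked against the Kafka source. The class name OverflowBatchSketch is hypothetical.

```java
import java.nio.ByteBuffer;

final class OverflowBatchSketch {
    // Assumed values, for illustration only.
    static final int MIN_OVERFLOW_MESSAGE_LENGTH = Long.BYTES + Integer.BYTES; // offset + length
    static final int MAX_READ_SIZE = 128 * 1024;
    static final int RECORD_BATCH_OVERHEAD = 61;

    // Builds the header of an overflow batch for `remaining` unsent bytes.
    static ByteBuffer buildOverflowHeader(int remaining) {
        // Allocate at least the header, at most MAX_READ_SIZE.
        ByteBuffer buf = ByteBuffer.allocate(
                Math.max(MIN_OVERFLOW_MESSAGE_LENGTH, Math.min(remaining + 1, MAX_READ_SIZE)));
        buf.putLong(-1L); // BaseOffset: not a real offset
        // Declared length is one byte more than what is left to send, and
        // never smaller than the minimum batch overhead.
        buf.putInt(Math.max(remaining + 1, RECORD_BATCH_OVERHEAD));
        buf.rewind(); // reset position so callers can read from the start
        return buf;
    }

    public static void main(String[] args) {
        for (int remaining : new int[] {5, 100}) {
            ByteBuffer buf = buildOverflowHeader(remaining);
            System.out.println("remaining=" + remaining
                    + " capacity=" + buf.capacity()
                    + " baseOffset=" + buf.getLong(0)
                    + " declaredLength=" + buf.getInt(8));
        }
    }
}
```

Because the declared length always exceeds the bytes that remain in the response, a reader that has only `remaining` bytes available cannot complete the batch, which is why the comment in the diff says the consumer will ignore it.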
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index 1c7a6c746df..0cc2cec31f9 100644
---
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -102,6 +102,8 @@ public MemoryRecordsBuilder(ByteBufferOutputStream
bufferStream,
throw new IllegalArgumentException("Transactional records are
not supported for magic " + magic);
if (isControlBatch)
throw new IllegalArgumentException("Control records are not
supported for magic " + magic);
+ if (compressionType == CompressionType.ZSTD)
+ throw new IllegalArgumentException("ZStandard compression is
not supported for magic " + magic);
}
this.magic = magic;
diff --git
a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java
b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java
index c9b73941317..3b0c59a24a5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsUtil.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.utils.Time;
import java.nio.ByteBuffer;
@@ -45,8 +46,14 @@
long startNanos = time.nanoseconds();
for (RecordBatch batch : batches) {
- if (toMagic < RecordBatch.MAGIC_VALUE_V2 && batch.isControlBatch())
- continue;
+ if (toMagic < RecordBatch.MAGIC_VALUE_V2) {
+ if (batch.isControlBatch())
+ continue;
+
+ if (batch.compressionType() == CompressionType.ZSTD)
+ throw new
UnsupportedCompressionTypeException("Down-conversion of zstandard-compressed
batches " +
+ "is not supported");
+ }
if (batch.magic() <= toMagic) {
totalSizeEstimate += batch.sizeInBytes();
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 32eb24d7de2..8d94bfd2134 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -191,9 +191,13 @@
FETCH_REQUEST_TOPIC_V9,
FORGOTTEN_TOPIC_DATA_V7);
+ // V10 bumped up to indicate ZStandard capability. (see KIP-110)
+ private static final Schema FETCH_REQUEST_V10 = FETCH_REQUEST_V9;
+
public static Schema[] schemaVersions() {
return new Schema[]{FETCH_REQUEST_V0, FETCH_REQUEST_V1,
FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4,
- FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7,
FETCH_REQUEST_V8, FETCH_REQUEST_V9};
+ FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7,
FETCH_REQUEST_V8, FETCH_REQUEST_V9,
+ FETCH_REQUEST_V10};
}
// default values for older versions where a request level limit did not
exist
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index f87f2ef05d3..9c29d375ccb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -61,6 +61,8 @@
* - {@link Errors#UNKNOWN_LEADER_EPOCH} If the epoch is larger than the
broker's epoch
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION} If the broker does not have
metadata for a topic or partition
* - {@link Errors#KAFKA_STORAGE_ERROR} If the log directory for one of the
requested partitions is offline
+ * - {@link Errors#UNSUPPORTED_COMPRESSION_TYPE} If a fetched topic is using a
compression type which is
+ * not supported by the fetch request version
* - {@link Errors#UNKNOWN_SERVER_ERROR} For any unexpected errors
*/
public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
@@ -180,10 +182,13 @@
// V9 adds the current leader epoch (see KIP-320)
private static final Schema FETCH_RESPONSE_V9 = FETCH_RESPONSE_V8;
+ // V10 bumped up to indicate ZStandard capability. (see KIP-110)
+ private static final Schema FETCH_RESPONSE_V10 = FETCH_RESPONSE_V9;
+
public static Schema[] schemaVersions() {
return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1,
FETCH_RESPONSE_V2,
FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5,
FETCH_RESPONSE_V6,
- FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9};
+ FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9,
FETCH_RESPONSE_V10};
}
public static final long INVALID_HIGHWATERMARK = -1L;
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 4f1d766b8bf..f87090eba6a 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -24,6 +24,7 @@
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
@@ -113,9 +114,14 @@
*/
private static final Schema PRODUCE_REQUEST_V6 = PRODUCE_REQUEST_V5;
+ /**
+ * V7 bumped up to indicate ZStandard capability. (see KIP-110)
+ */
+ private static final Schema PRODUCE_REQUEST_V7 = PRODUCE_REQUEST_V6;
+
public static Schema[] schemaVersions() {
return new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1,
PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3,
- PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6};
+ PRODUCE_REQUEST_V4, PRODUCE_REQUEST_V5, PRODUCE_REQUEST_V6,
PRODUCE_REQUEST_V7};
}
public static class Builder extends
AbstractRequest.Builder<ProduceRequest> {
@@ -151,12 +157,12 @@ public static Builder forMagic(byte magic,
return new Builder(minVersion, maxVersion, acks, timeout,
partitionRecords, transactionalId);
}
- private Builder(short minVersion,
- short maxVersion,
- short acks,
- int timeout,
- Map<TopicPartition, MemoryRecords> partitionRecords,
- String transactionalId) {
+ public Builder(short minVersion,
+ short maxVersion,
+ short acks,
+ int timeout,
+ Map<TopicPartition, MemoryRecords> partitionRecords,
+ String transactionalId) {
super(ApiKeys.PRODUCE, minVersion, maxVersion);
this.acks = acks;
this.timeout = timeout;
@@ -246,6 +252,10 @@ private void validateRecords(short version, MemoryRecords
records) {
if (entry.magic() != RecordBatch.MAGIC_VALUE_V2)
throw new InvalidRecordException("Produce requests with
version " + version + " are only allowed to " +
"contain record batches with magic version 2");
+ if (version < 7 && entry.compressionType() ==
CompressionType.ZSTD) {
+ throw new InvalidRecordException("Produce requests with
version " + version + " are not allowed to " +
+ "use ZStandard compression");
+ }
if (iterator.hasNext())
throw new InvalidRecordException("Produce requests with
version " + version + " are only allowed to " +
@@ -330,6 +340,7 @@ public ProduceResponse getErrorResponse(int throttleTimeMs,
Throwable e) {
case 4:
case 5:
case 6:
+ case 7:
return new ProduceResponse(responseMap, throttleTimeMs);
default:
throw new IllegalArgumentException(String.format("Version %d
is not valid. Valid versions for %s are 0 to %d",
@@ -400,6 +411,7 @@ public static byte requiredMagicForVersion(short
produceRequestVersion) {
case 4:
case 5:
case 6:
+ case 7:
return RecordBatch.MAGIC_VALUE_V2;
default:
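
Note on the version bumps above: the PRODUCE and FETCH schemas are unchanged in V7 and V10; the new version numbers only signal that the peer understands zstd. A rough sketch of that gating follows. It is not the Kafka implementation: the Codec enum and the class name ZstdVersionGateSketch are hypothetical, IllegalArgumentException stands in for InvalidRecordException, and the fetch-side check is an assumption inferred from the FetchResponse Javadoc, since the broker code is not part of this excerpt.

```java
final class ZstdVersionGateSketch {
    // Hypothetical stand-in for org.apache.kafka.common.record.CompressionType.
    enum Codec { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    static final short MIN_PRODUCE_VERSION_FOR_ZSTD = 7;  // from the ProduceRequest diff
    static final short MIN_FETCH_VERSION_FOR_ZSTD = 10;   // from the FetchRequest diff

    // Mirrors the check added to validateRecords: older produce versions
    // must not carry zstd-compressed batches.
    static void validateProduce(short version, Codec codec) {
        if (version < MIN_PRODUCE_VERSION_FOR_ZSTD && codec == Codec.ZSTD) {
            throw new IllegalArgumentException("Produce requests with version " + version
                    + " are not allowed to use ZStandard compression");
        }
    }

    // Assumption: a fetch request below V10 comes from a client that cannot
    // decode zstd, so the broker would answer UNSUPPORTED_COMPRESSION_TYPE.
    static boolean fetchSupportsZstd(short version) {
        return version >= MIN_FETCH_VERSION_FOR_ZSTD;
    }

    public static void main(String[] args) {
        for (short v = 6; v <= 7; v++) {
            try {
                validateProduce(v, Codec.ZSTD);
                System.out.println("produce v" + v + " with zstd: accepted");
            } catch (IllegalArgumentException e) {
                System.out.println("produce v" + v + " with zstd: rejected");
            }
        }
        System.out.println("fetch v9 supports zstd: " + fetchSupportsZstd((short) 9));
        System.out.println("fetch v10 supports zstd: " + fetchSupportsZstd((short) 10));
    }
}
```

Reusing the previous schema and bumping only the version number keeps the wire format stable while still letting each side detect whether the other can handle the new codec.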
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index fb15813ccf1..7d3e4fed362 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -144,9 +144,14 @@
*/
private static final Schema PRODUCE_RESPONSE_V6 = PRODUCE_RESPONSE_V5;
+ /**
+ * V7 bumped up to indicate ZStandard capability. (see KIP-110)
+ */
+ private static final Schema PRODUCE_RESPONSE_V7 = PRODUCE_RESPONSE_V6;
+
public static Schema[] schemaVersions() {
return new Schema[]{PRODUCE_RESPONSE_V0, PRODUCE_RESPONSE_V1,
PRODUCE_RESPONSE_V2, PRODUCE_RESPONSE_V3,
- PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6};
+ PRODUCE_RESPONSE_V4, PRODUCE_RESPONSE_V5, PRODUCE_RESPONSE_V6,
PRODUCE_RESPONSE_V7};
}
private final Map<TopicPartition, PartitionResponse> responses;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
index 6a85449a659..f9887f9033e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java
@@ -197,6 +197,9 @@ public void testSplitPreservesMagicAndCompressionType() {
if (compressionType == CompressionType.NONE && magic <
MAGIC_VALUE_V2)
continue;
+ if (compressionType == CompressionType.ZSTD && magic <
MAGIC_VALUE_V2)
+ continue;
+
MemoryRecordsBuilder builder =
MemoryRecords.builder(ByteBuffer.allocate(1024), magic,
compressionType, TimestampType.CREATE_TIME, 0L);
diff --git
a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
index 83ada730ce2..fe6ffabaf61 100644
---
a/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/record/AbstractLegacyRecordBatchTest.java
@@ -25,6 +25,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class AbstractLegacyRecordBatchTest {
@@ -208,4 +209,41 @@ public void testSetCreateTimeV1() {
assertEquals(expectedTimestamp++, record.timestamp());
}
+ @Test
+ public void testZStdCompressionTypeWithV0OrV1() {
+ SimpleRecord[] simpleRecords = new SimpleRecord[] {
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
+ };
+
+ // Check V0
+ try {
+ MemoryRecords records =
MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V0, 0L,
+ CompressionType.ZSTD, TimestampType.CREATE_TIME,
simpleRecords);
+
+ ByteBufferLegacyRecordBatch batch = new
ByteBufferLegacyRecordBatch(records.buffer());
+ batch.setLastOffset(1L);
+
+ batch.iterator();
+ fail("Can't reach here");
+ } catch (IllegalArgumentException e) {
+ assertEquals("ZStandard compression is not supported for magic 0",
e.getMessage());
+ }
+
+ // Check V1
+ try {
+ MemoryRecords records =
MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V1, 0L,
+ CompressionType.ZSTD, TimestampType.CREATE_TIME,
simpleRecords);
+
+ ByteBufferLegacyRecordBatch batch = new
ByteBufferLegacyRecordBatch(records.buffer());
+ batch.setLastOffset(1L);
+
+ batch.iterator();
+ fail("Can't reach here");
+ } catch (IllegalArgumentException e) {
+ assertEquals("ZStandard compression is not supported for magic 1",
e.getMessage());
+ }
+ }
+
}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
index 8a955972b5c..783a5b531ef 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -56,6 +56,9 @@ public FileLogInputStreamTest(byte magic, CompressionType compression) {
@Test
public void testWriteTo() throws IOException {
+ if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2)
+ return;
+
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
fileRecords.append(MemoryRecords.withRecords(magic, compression, new SimpleRecord("foo".getBytes())));
fileRecords.flush();
@@ -81,6 +84,9 @@ public void testWriteTo() throws IOException {
@Test
public void testSimpleBatchIteration() throws IOException {
+ if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2)
+ return;
+
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
SimpleRecord firstBatchRecord = new SimpleRecord(3241324L, "a".getBytes(), "foo".getBytes());
SimpleRecord secondBatchRecord = new SimpleRecord(234280L, "b".getBytes(), "bar".getBytes());
@@ -108,6 +114,9 @@ public void testBatchIterationWithMultipleRecordsPerBatch() throws IOException {
if (magic < MAGIC_VALUE_V2 && compression == CompressionType.NONE)
return;
+ if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2)
+ return;
+
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
@@ -185,6 +194,9 @@ public void testBatchIterationV2() throws IOException {
@Test
public void testBatchIterationIncompleteBatch() throws IOException {
+ if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2)
+ return;
+
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
SimpleRecord firstBatchRecord = new SimpleRecord(100L, "foo".getBytes());
SimpleRecord secondBatchRecord = new SimpleRecord(200L, "bar".getBytes());
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 5d5221eccea..552b4e6c42b 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -16,10 +16,13 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -30,6 +33,7 @@
import java.util.List;
import java.util.Random;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
import static org.apache.kafka.common.utils.Utils.utf8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -39,6 +43,8 @@
@RunWith(value = Parameterized.class)
public class MemoryRecordsBuilderTest {
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
private final CompressionType compressionType;
private final int bufferOffset;
@@ -52,6 +58,8 @@ public MemoryRecordsBuilderTest(int bufferOffset, CompressionType compressionTyp
@Test
public void testWriteEmptyRecordSet() {
+ expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.position(bufferOffset);
@@ -207,6 +215,8 @@ public void testWriteEndTxnMarkerNonControlBatch() {
@Test
public void testCompressionRateV0() {
+ expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
@@ -262,6 +272,8 @@ public void testEstimatedSizeInBytes() {
@Test
public void testCompressionRateV1() {
+ expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
+
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
@@ -293,6 +305,8 @@ public void testCompressionRateV1() {
@Test
public void buildUsingLogAppendTime() {
+ expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
+
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
@@ -322,6 +336,8 @@ public void buildUsingLogAppendTime() {
@Test
public void buildUsingCreateTime() {
+ expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
+
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
@@ -353,6 +369,8 @@ public void buildUsingCreateTime() {
@Test
public void testAppendedChecksumConsistency() {
+ expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+
ByteBuffer buffer = ByteBuffer.allocate(512);
for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) {
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType,
@@ -397,6 +415,8 @@ public void testSmallWriteLimit() {
@Test
public void writePastLimit() {
+ expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V1);
+
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.position(bufferOffset);
@@ -442,6 +462,11 @@ public void testAppendAtInvalidOffset() {
@Test
public void convertV2ToV1UsingMixedCreateAndLogAppendTime() {
+ if (compressionType == CompressionType.ZSTD) {
+ exceptionRule.expect(UnsupportedCompressionTypeException.class);
+ exceptionRule.expectMessage("Down-conversion of zstandard-compressed batches is not supported");
+ }
+
ByteBuffer buffer = ByteBuffer.allocate(512);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
compressionType, TimestampType.LOG_APPEND_TIME, 0L);
@@ -497,6 +522,8 @@ public void convertV2ToV1UsingMixedCreateAndLogAppendTime() {
@Test
public void convertToV1WithMixedV0AndV2Data() {
+ expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+
ByteBuffer buffer = ByteBuffer.allocate(512);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0,
compressionType, TimestampType.NO_TIMESTAMP_TYPE, 0L);
@@ -571,6 +598,8 @@ public void convertToV1WithMixedV0AndV2Data() {
@Test
public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exception {
+ expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.position(bufferOffset);
@@ -588,6 +617,8 @@ public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exceptio
@Test
public void shouldResetBufferToInitialPositionOnAbort() throws Exception {
+ expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.position(bufferOffset);
@@ -601,6 +632,8 @@ public void shouldResetBufferToInitialPositionOnAbort() throws Exception {
@Test
public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() throws Exception {
+ expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.position(bufferOffset);
@@ -618,6 +651,8 @@ public void shouldThrowIllegalStateExceptionOnCloseWhenAborted() throws Exceptio
@Test
public void shouldThrowIllegalStateExceptionOnAppendWhenAborted() throws Exception {
+ expectExceptionWithZStd(compressionType, RecordBatch.MAGIC_VALUE_V0);
+
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.position(bufferOffset);
@@ -699,4 +734,10 @@ else if (numRecordsConverted == numRecords)
}
}
+ private void expectExceptionWithZStd(CompressionType compressionType, byte magic) {
+ if (compressionType == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) {
+ exceptionRule.expect(IllegalArgumentException.class);
+ exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic);
+ }
+ }
}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 579fb74b44a..5f16acfb9e3 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -22,7 +22,9 @@
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -33,6 +35,7 @@
import java.util.List;
import static java.util.Arrays.asList;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -42,6 +45,8 @@
@RunWith(value = Parameterized.class)
public class MemoryRecordsTest {
+ @Rule
+ public ExpectedException exceptionRule = ExpectedException.none();
private CompressionType compression;
private byte magic;
@@ -69,6 +74,8 @@ public MemoryRecordsTest(byte magic, long firstOffset, CompressionType compressi
@Test
public void testIterator() {
+ expectExceptionWithZStd(compression, magic);
+
ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compression,
@@ -152,6 +159,8 @@ public void testIterator() {
@Test
public void testHasRoomForMethod() {
+ expectExceptionWithZStd(compression, magic);
+
MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression,
TimestampType.CREATE_TIME, 0L);
builder.append(0L, "a".getBytes(), "1".getBytes());
@@ -439,6 +448,8 @@ public void testBuildEndTxnMarker() {
@Test
public void testFilterToBatchDiscard() {
if (compression != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) {
+ expectExceptionWithZStd(compression, magic);
+
ByteBuffer buffer = ByteBuffer.allocate(2048);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
builder.append(10L, "1".getBytes(), "a".getBytes());
@@ -489,6 +500,8 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
@Test
public void testFilterToAlreadyCompactedLog() {
+ expectExceptionWithZStd(compression, magic);
+
ByteBuffer buffer = ByteBuffer.allocate(2048);
// create a batch with some offset gaps to simulate a compacted batch
@@ -629,6 +642,8 @@ public void testFilterToPreservesProducerInfo() {
@Test
public void testFilterToWithUndersizedBuffer() {
+ expectExceptionWithZStd(compression, magic);
+
ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
builder.append(10L, null, "a".getBytes());
@@ -679,6 +694,8 @@ public void testFilterToWithUndersizedBuffer() {
@Test
public void testToString() {
+ expectExceptionWithZStd(compression, magic);
+
long timestamp = 1000000;
MemoryRecords memoryRecords = MemoryRecords.withRecords(magic, compression,
new SimpleRecord(timestamp, "key1".getBytes(), "value1".getBytes()),
@@ -709,6 +726,8 @@ public void testToString() {
@Test
public void testFilterTo() {
+ expectExceptionWithZStd(compression, magic);
+
ByteBuffer buffer = ByteBuffer.allocate(2048);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
builder.append(10L, null, "a".getBytes());
@@ -822,6 +841,8 @@ public void testFilterTo() {
@Test
public void testFilterToPreservesLogAppendTime() {
+ expectExceptionWithZStd(compression, magic);
+
long logAppendTime = System.currentTimeMillis();
ByteBuffer buffer = ByteBuffer.allocate(2048);
@@ -866,6 +887,8 @@ public void testFilterToPreservesLogAppendTime() {
@Test
public void testNextBatchSize() {
+ expectExceptionWithZStd(compression, magic);
+
ByteBuffer buffer = ByteBuffer.allocate(2048);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression,
TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, pid, epoch, firstSequence);
@@ -905,6 +928,13 @@ public void testNextBatchSize() {
}
}
+ private void expectExceptionWithZStd(CompressionType compressionType, byte magic) {
+ if (compressionType == CompressionType.ZSTD && magic < MAGIC_VALUE_V2) {
+ exceptionRule.expect(IllegalArgumentException.class);
+ exceptionRule.expectMessage("ZStandard compression is not supported for magic " + magic);
+ }
+ }
+
@Parameterized.Parameters(name = "{index} magic={0}, firstOffset={1}, compressionType={2}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
index ef17c96c5ad..74e3960dd47 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java
@@ -158,6 +158,26 @@ public void testV3AndAboveCannotUseMagicV1() {
assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
}
+ @Test
+ public void testV6AndBelowCannotUseZStdCompression() {
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.ZSTD,
+ TimestampType.CREATE_TIME, 0L);
+ builder.append(10L, null, "a".getBytes());
+
+ Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+ produceData.put(new TopicPartition("test", 0), builder.build());
+
+ // Can't create ProduceRequest instance with version within [3, 7)
+ for (short version = 3; version < 7; version++) {
+ ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(version, version, (short) 1, 5000, produceData, null);
+ assertThrowsInvalidRecordExceptionForAllVersions(requestBuilder);
+ }
+
+ // Works fine with current version (>= 7)
+ ProduceRequest.Builder.forCurrentMagic((short) 1, 5000, produceData);
+ }
+
private void assertThrowsInvalidRecordExceptionForAllVersions(ProduceRequest.Builder builder) {
for (short version = builder.oldestAllowedVersion(); version < builder.latestAllowedVersion(); version++) {
assertThrowsInvalidRecordException(builder, version);
diff --git a/config/producer.properties b/config/producer.properties
index 750b95ee0ae..4786b988a29 100644
--- a/config/producer.properties
+++ b/config/producer.properties
@@ -20,7 +20,7 @@
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
-# specify the compression codec for all data generated: none, gzip, snappy, lz4
+# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none
# name of the partitioner class for partitioning events; default partition spreads data randomly
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index bc3602bd69a..e9b16fafeee 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -79,7 +79,9 @@ object ApiVersion {
// Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211)
KAFKA_2_1_IV0,
// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
- KAFKA_2_1_IV1
+ KAFKA_2_1_IV1,
+ // Support ZStandard Compression Codec (KIP-110)
+ KAFKA_2_1_IV2
)
// Map keys are the union of the short and full versions
@@ -270,6 +272,13 @@ case object KAFKA_2_1_IV1 extends DefaultApiVersion {
val id: Int = 18
}
+case object KAFKA_2_1_IV2 extends DefaultApiVersion {
+ val shortVersion: String = "2.1"
+ val subVersion = "IV2"
+ val recordVersion = RecordVersion.V2
+ val id: Int = 19
+}
+
object ApiVersionValidator extends Validator {
override def ensureValid(name: String, value: Any): Unit = {
diff --git a/core/src/main/scala/kafka/message/CompressionCodec.scala b/core/src/main/scala/kafka/message/CompressionCodec.scala
index 64e0aaa72a1..abe3694b035 100644
--- a/core/src/main/scala/kafka/message/CompressionCodec.scala
+++ b/core/src/main/scala/kafka/message/CompressionCodec.scala
@@ -28,6 +28,7 @@ object CompressionCodec {
case GZIPCompressionCodec.codec => GZIPCompressionCodec
case SnappyCompressionCodec.codec => SnappyCompressionCodec
case LZ4CompressionCodec.codec => LZ4CompressionCodec
+ case ZStdCompressionCodec.codec => ZStdCompressionCodec
case _ => throw new UnknownCodecException("%d is an unknown compression codec".format(codec))
}
}
@@ -37,6 +38,7 @@ object CompressionCodec {
case GZIPCompressionCodec.name => GZIPCompressionCodec
case SnappyCompressionCodec.name => SnappyCompressionCodec
case LZ4CompressionCodec.name => LZ4CompressionCodec
+ case ZStdCompressionCodec.name => ZStdCompressionCodec
case _ => throw new kafka.common.UnknownCodecException("%s is an unknown compression codec".format(name))
}
}
@@ -44,7 +46,7 @@ object CompressionCodec {
object BrokerCompressionCodec {
- val brokerCompressionCodecs = List(UncompressedCodec, SnappyCompressionCodec, LZ4CompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
+ val brokerCompressionCodecs = List(UncompressedCodec, ZStdCompressionCodec, LZ4CompressionCodec, SnappyCompressionCodec, GZIPCompressionCodec, ProducerCompressionCodec)
val brokerCompressionOptions = brokerCompressionCodecs.map(codec => codec.name)
def isValid(compressionType: String): Boolean = brokerCompressionOptions.contains(compressionType.toLowerCase(Locale.ROOT))
@@ -87,6 +89,11 @@ case object LZ4CompressionCodec extends CompressionCodec with BrokerCompressionC
val name = "lz4"
}
+case object ZStdCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
+ val codec = 4
+ val name = "zstd"
+}
+
case object NoCompressionCodec extends CompressionCodec with BrokerCompressionCodec {
val codec = 0
val name = "none"
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 18ab9db22ed..ecbbdb6f03f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -31,6 +31,8 @@ import kafka.common.OffsetAndMetadata
import kafka.controller.KafkaController
import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
+import kafka.log.{Log, LogManager, TimestampOffset}
+import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
import kafka.network.RequestChannel
import kafka.security.SecurityUtils
import kafka.security.auth.{Resource, _}
@@ -534,42 +536,56 @@ class KafkaApis(val requestChannel: RequestChannel,
def maybeConvertFetchedData(tp: TopicPartition,
partitionData: FetchResponse.PartitionData[Records]): FetchResponse.PartitionData[BaseRecords] = {
- // Down-conversion of the fetched records is needed when the stored magic version is
- // greater than that supported by the client (as indicated by the fetch request version). If the
- // configured magic version for the topic is less than or equal to that supported by the version of the
- // fetch request, we skip the iteration through the records in order to check the magic version since we
- // know it must be supported. However, if the magic version is changed from a higher version back to a
- // lower version, this check will no longer be valid and we will fail to down-convert the messages
- // which were written in the new format prior to the version downgrade.
- val unconvertedRecords = partitionData.records
val logConfig = replicaManager.getLogConfig(tp)
- val downConvertMagic =
- logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic =>
- if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
- Some(RecordBatch.MAGIC_VALUE_V0)
- else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
- Some(RecordBatch.MAGIC_VALUE_V1)
- else
- None
- }
- // For fetch requests from clients, check if down-conversion is disabled for the particular partition
- if (downConvertMagic.isDefined && !fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) {
- trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.")
- errorResponse(Errors.UNSUPPORTED_VERSION)
+ if (logConfig.forall(_.compressionType == ZStdCompressionCodec.name) && versionId < 10) {
+ trace(s"Fetching messages is disabled for ZStandard compressed partition $tp. Sending unsupported version response to $clientId.")
+ errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
} else {
- val convertedRecords =
- downConvertMagic.map { magic =>
- trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
- // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
- // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
- // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
- // client.
- new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time)
- }.getOrElse(unconvertedRecords)
- new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
- FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
- convertedRecords)
+ // Down-conversion of the fetched records is needed when the stored magic version is
+ // greater than that supported by the client (as indicated by the fetch request version). If the
+ // configured magic version for the topic is less than or equal to that supported by the version of the
+ // fetch request, we skip the iteration through the records in order to check the magic version since we
+ // know it must be supported. However, if the magic version is changed from a higher version back to a
+ // lower version, this check will no longer be valid and we will fail to down-convert the messages
+ // which were written in the new format prior to the version downgrade.
+ val unconvertedRecords = partitionData.records
+ val downConvertMagic =
+ logConfig.map(_.messageFormatVersion.recordVersion.value).flatMap { magic =>
+ if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
+ Some(RecordBatch.MAGIC_VALUE_V0)
+ else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !unconvertedRecords.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
+ Some(RecordBatch.MAGIC_VALUE_V1)
+ else
+ None
+ }
+
+ downConvertMagic match {
+ case Some(magic) =>
+ // For fetch requests from clients, check if down-conversion is disabled for the particular partition
+ if (!fetchRequest.isFromFollower && !logConfig.forall(_.messageDownConversionEnable)) {
+ trace(s"Conversion to message format ${downConvertMagic.get} is disabled for partition $tp. Sending unsupported version response to $clientId.")
+ errorResponse(Errors.UNSUPPORTED_VERSION)
+ } else {
+ try {
+ trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
+ // Because down-conversion is extremely memory intensive, we want to try and delay the down-conversion as much
+ // as possible. With KIP-283, we have the ability to lazily down-convert in a chunked manner. The lazy, chunked
+ // down-conversion always guarantees that at least one batch of messages is down-converted and sent out to the
+ // client.
+ new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
+ new LazyDownConversionRecords(tp, unconvertedRecords, magic, fetchContext.getFetchOffset(tp).get, time))
+ } catch {
+ case e: UnsupportedCompressionTypeException =>
+ trace("Received unsupported compression type error during down-conversion", e)
+ errorResponse(Errors.UNSUPPORTED_COMPRESSION_TYPE)
+ }
+ }
+ case None => new FetchResponse.PartitionData[BaseRecords](partitionData.error, partitionData.highWatermark,
+ FetchResponse.INVALID_LAST_STABLE_OFFSET, partitionData.logStartOffset, partitionData.abortedTransactions,
+ unconvertedRecords)
+ }
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 700f32c534f..9edda4ea7f1 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -720,7 +720,7 @@ object KafkaConfig {
val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off"
val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " +
- "('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed' which is equivalent to no compression; and " +
+ "('gzip', 'snappy', 'lz4', 'zstd'). It additionally accepts 'uncompressed' which is equivalent to no compression; and " +
"'producer' which means retain the original compression codec set by the producer."
/** ********* Kafka Metrics Configuration ***********/
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 930281a342c..aeeaf29516a 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -62,7 +62,7 @@ class ReplicaFetcherThread(name: String,
// Visible for testing
private[server] val fetchRequestVersion: Short =
- if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 9
+ if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV2) 10
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 8
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0) 7
else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5
diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
index 8d8c42d36cf..a9099954564 100644
--- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala
@@ -120,7 +120,7 @@ object ConsoleProducer {
.describedAs("broker-list")
.ofType(classOf[String])
val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
- val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', or 'lz4'." +
+ val compressionCodecOpt = parser.accepts("compression-codec", "The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'." +
"If specified without value, then it defaults to 'gzip'")
.withOptionalArg()
.describedAs("compression-codec")
diff --git a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
index bb49884686b..24193521f91 100755
--- a/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerCompressionTest.scala
@@ -113,7 +113,8 @@ object ProducerCompressionTest {
Array("none"),
Array("gzip"),
Array("snappy"),
- Array("lz4")
+ Array("lz4"),
+ Array("zstd")
).asJava
}
}
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index d2d115b2cfa..1ffa695f48c 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -80,9 +80,10 @@ class ApiVersionTest {
assertEquals(KAFKA_2_0_IV0, ApiVersion("2.0-IV0"))
assertEquals(KAFKA_2_0_IV1, ApiVersion("2.0-IV1"))
- assertEquals(KAFKA_2_1_IV1, ApiVersion("2.1"))
+ assertEquals(KAFKA_2_1_IV2, ApiVersion("2.1"))
assertEquals(KAFKA_2_1_IV0, ApiVersion("2.1-IV0"))
assertEquals(KAFKA_2_1_IV1, ApiVersion("2.1-IV1"))
+ assertEquals(KAFKA_2_1_IV2, ApiVersion("2.1-IV2"))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
index 266bb391e2e..232cfdbf1c1 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerParameterizedIntegrationTest.scala
@@ -131,6 +131,10 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A
@Test
def testCleanerWithMessageFormatV0(): Unit = {
+ // zstd compression is not supported with older message formats
+ if (codec == CompressionType.ZSTD)
+ return
+
val largeMessageKey = 20
val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.MAGIC_VALUE_V0)
val maxMessageSize = codec match {
@@ -181,6 +185,10 @@ class LogCleanerParameterizedIntegrationTest(compressionCodec: String) extends A
@Test
def testCleaningNestedMessagesWithMultipleVersions(): Unit = {
+ // zstd compression is not supported with older message formats
+ if (codec == CompressionType.ZSTD)
+ return
+
val maxMessageSize = 192
cleaner = makeCleaner(partitions = topicPartitions, maxMessageSize = maxMessageSize)
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index b4315d105de..72a28549cf5 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -22,6 +22,7 @@ import java.util.{Optional, Properties}
import kafka.api.KAFKA_0_11_0_IV2
import kafka.log.LogConfig
+import kafka.message.{GZIPCompressionCodec, ProducerCompressionCodec, ZStdCompressionCodec}
import kafka.utils.TestUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
@@ -424,6 +425,119 @@ class FetchRequestTest extends BaseRequestTest {
assertFalse(resp4.responseData().containsKey(bar0))
}
+ @Test
+ def testZStdCompressedTopic(): Unit = {
+ // ZSTD compressed topic
+ val topicConfig = Map(LogConfig.CompressionTypeProp -> ZStdCompressionCodec.name)
+ val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1, configs = topicConfig).head
+
+ // Produce messages (v2)
+ producer = TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
+ keySerializer = new StringSerializer,
+ valueSerializer = new StringSerializer)
+ producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
+ "key1", "value1")).get
+ producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
+ "key2", "value2")).get
+ producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition,
+ "key3", "value3")).get
+ producer.close()
+
+ // fetch request with version below v10: UNSUPPORTED_COMPRESSION_TYPE error occurs
+ val req0 = new FetchRequest.Builder(0, 9, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map.empty))
+ .setMaxBytes(800).build()
+
+ val res0 = sendFetchRequest(leaderId, req0)
+ val data0 = res0.responseData.get(topicPartition)
+ assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, data0.error)
+
+ // fetch request with version 10: works fine!
+ val req1= new FetchRequest.Builder(0, 10, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map.empty))
+ .setMaxBytes(800).build()
+ val res1 = sendFetchRequest(leaderId, req1)
+ val data1 = res1.responseData.get(topicPartition)
+ assertEquals(Errors.NONE, data1.error)
+ assertEquals(3, records(data1).size)
+ }
+
+ @Test
+ def testZStdCompressedRecords(): Unit = {
+ // Producer compressed topic
+ val topicConfig = Map(LogConfig.CompressionTypeProp ->
ProducerCompressionCodec.name,
+ LogConfig.MessageFormatVersionProp -> "2.0.0")
+ val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions
= 1, configs = topicConfig).head
+
+ // Produce GZIP compressed messages (v2)
+ val producer1 =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
+ compressionType = GZIPCompressionCodec.name,
+ keySerializer = new StringSerializer,
+ valueSerializer = new StringSerializer)
+ producer1.send(new ProducerRecord(topicPartition.topic,
topicPartition.partition,
+ "key1", "value1")).get
+ producer1.close()
+ // Produce ZSTD compressed messages (v2)
+ val producer2 =
TestUtils.createProducer(TestUtils.getBrokerListStrFromServers(servers),
+ compressionType = ZStdCompressionCodec.name,
+ keySerializer = new StringSerializer,
+ valueSerializer = new StringSerializer)
+ producer2.send(new ProducerRecord(topicPartition.topic,
topicPartition.partition,
+ "key2", "value2")).get
+ producer2.send(new ProducerRecord(topicPartition.topic,
topicPartition.partition,
+ "key3", "value3")).get
+ producer2.close()
+
+ // fetch request with fetch version v1 (magic 0):
+ // gzip compressed record is returned with down-conversion.
+ // zstd compressed record raises UNSUPPORTED_COMPRESSION_TYPE error.
+ val req0 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map.empty))
+ .setMaxBytes(800).build()
+
+ val res0 = sendFetchRequest(leaderId, req0)
+ val data0 = res0.responseData.get(topicPartition)
+ assertEquals(Errors.NONE, data0.error)
+ assertEquals(1, records(data0).size)
+
+ val req1 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map(topicPartition -> 1L)))
+ .setMaxBytes(800).build()
+
+ val res1 = sendFetchRequest(leaderId, req1)
+ val data1 = res1.responseData.get(topicPartition)
+ assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, data1.error)
+
+ // fetch request with fetch version v3 (magic 1):
+ // gzip compressed record is returned with down-conversion.
+ // zstd compressed record raises UNSUPPORTED_COMPRESSION_TYPE error.
+ val req2 = new FetchRequest.Builder(2, 3, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map.empty))
+ .setMaxBytes(800).build()
+
+ val res2 = sendFetchRequest(leaderId, req2)
+ val data2 = res2.responseData.get(topicPartition)
+ assertEquals(Errors.NONE, data2.error)
+ assertEquals(1, records(data2).size)
+
+ val req3 = new FetchRequest.Builder(0, 1, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map(topicPartition -> 1L)))
+ .setMaxBytes(800).build()
+
+ val res3 = sendFetchRequest(leaderId, req3)
+ val data3 = res3.responseData.get(topicPartition)
+ assertEquals(Errors.UNSUPPORTED_COMPRESSION_TYPE, data3.error)
+
+ // fetch request with version 10: works fine!
+ val req4= new FetchRequest.Builder(0, 10, -1, Int.MaxValue, 0,
+ createPartitionMap(300, Seq(topicPartition), Map.empty))
+ .setMaxBytes(800).build()
+ val res4 = sendFetchRequest(leaderId, req4)
+ val data4 = res4.responseData.get(topicPartition)
+ assertEquals(Errors.NONE, data4.error)
+ assertEquals(3, records(data4).size)
+ }
+
private def records(partitionData:
FetchResponse.PartitionData[MemoryRecords]): Seq[Record] = {
partitionData.records.records.asScala.toIndexedSeq
}
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 4e66494374f..b1f3af145b9 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -17,10 +17,14 @@
package kafka.server
+import java.util.Properties
+
+import kafka.log.LogConfig
+import kafka.message.ZStdCompressionCodec
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{CompressionType, DefaultRecordBatch,
MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
import org.junit.Assert._
import org.junit.Test
@@ -111,6 +115,31 @@ class ProduceRequestTest extends BaseRequestTest {
assertEquals(-1, partitionResponse.logAppendTime)
}
+ @Test
+ def testZSTDProduceRequest(): Unit = {
+ val topic = "topic"
+ val partition = 0
+
+ // Create a single-partition topic compressed with ZSTD
+ val topicConfig = new Properties
+ topicConfig.setProperty(LogConfig.CompressionTypeProp,
ZStdCompressionCodec.name)
+ val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1,
servers, topicConfig)
+ val leader = partitionToLeader(partition)
+ val memoryRecords = MemoryRecords.withRecords(CompressionType.ZSTD,
+ new SimpleRecord(System.currentTimeMillis(), "key".getBytes,
"value".getBytes))
+ val topicPartition = new TopicPartition("topic", partition)
+ val partitionRecords = Map(topicPartition -> memoryRecords)
+
+ // produce request with v7: works fine!
+ val res1 = sendProduceRequest(leader,
+ new ProduceRequest.Builder(7, 7, -1, 3000, partitionRecords.asJava,
null).build())
+ val (tp, partitionResponse) = res1.responses.asScala.head
+ assertEquals(topicPartition, tp)
+ assertEquals(Errors.NONE, partitionResponse.error)
+ assertEquals(0, partitionResponse.baseOffset)
+ assertEquals(-1, partitionResponse.logAppendTime)
+ }
+
private def sendProduceRequest(leaderId: Int, request: ProduceRequest):
ProduceResponse = {
val response = connectAndSend(request, ApiKeys.PRODUCE, destination =
brokerSocketServer(leaderId))
ProduceResponse.parse(response, request.version)
diff --git a/docs/design.html b/docs/design.html
index bdc7e637ea9..0061a53c49d 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -136,7 +136,7 @@ <h4><a id="design_compression"
href="#design_compression">End-to-end Batch Compr
Kafka supports this with an efficient batching format. A batch of messages
can be clumped together compressed and sent to the server in this form. This
batch of messages will be written in compressed form and will
remain compressed in the log and will only be decompressed by the consumer.
<p>
- Kafka supports GZIP, Snappy and LZ4 compression protocols. More details on
compression can be found <a
href="https://cwiki.apache.org/confluence/display/KAFKA/Compression">here</a>.
+ Kafka supports GZIP, Snappy, LZ4 and ZStandard compression protocols. More
details on compression can be found <a
href="https://cwiki.apache.org/confluence/display/KAFKA/Compression">here</a>.
<h3><a id="theproducer" href="#theproducer">4.4 The Producer</a></h3>
diff --git a/docs/implementation.html b/docs/implementation.html
index 4ecce7b4485..cc2b0f47ffe 100644
--- a/docs/implementation.html
+++ b/docs/implementation.html
@@ -44,6 +44,7 @@ <h4><a id="recordbatch" href="#recordbatch">5.3.1 Record
Batch</a></h4>
1: gzip
2: snappy
3: lz4
+ 4: zstd
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 23fc68aaba5..18a25349088 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -83,7 +83,8 @@ versions += [
slf4j: "1.7.25",
snappy: "1.1.7.2",
zkclient: "0.10",
- zookeeper: "3.4.13"
+ zookeeper: "3.4.13",
+ zstd: "1.3.5-4"
]
libs += [
@@ -141,5 +142,6 @@ libs += [
zkclient: "com.101tec:zkclient:$versions.zkclient",
zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper",
jfreechart: "jfreechart:jfreechart:$versions.jfreechart",
- mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact"
+ mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact",
+ zstd: "com.github.luben:zstd-jni:$versions.zstd",
]
diff --git a/tests/kafkatest/tests/client/compression_test.py
b/tests/kafkatest/tests/client/compression_test.py
index 165e11add19..2085d9b6259 100644
--- a/tests/kafkatest/tests/client/compression_test.py
+++ b/tests/kafkatest/tests/client/compression_test.py
@@ -54,7 +54,7 @@ def min_cluster_size(self):
return super(CompressionTest, self).min_cluster_size() +
self.num_producers + self.num_consumers
@cluster(num_nodes=7)
- @parametrize(compression_types=["snappy","gzip","lz4","none"])
+ @parametrize(compression_types=["snappy","gzip","lz4","zstd","none"])
def test_compressed_topic(self, compression_types):
"""Test produce => consume => validate for compressed topics
Setup: 1 zk, 1 kafka node, 1 topic with partitions=10,
replication-factor=1
> Add Codec for ZStandard Compression
> -----------------------------------
>
> Key: KAFKA-4514
> URL: https://issues.apache.org/jira/browse/KAFKA-4514
> Project: Kafka
> Issue Type: Improvement
> Components: compression
> Reporter: Thomas Graves
> Assignee: Lee Dongjin
> Priority: Major
>
> ZStandard (https://github.com/facebook/zstd,
> http://facebook.github.io/zstd/) has been in use for a while now, and v1.0
> was recently released. Hadoop
> (https://issues.apache.org/jira/browse/HADOOP-13578) and others are adopting
> it.
> We have done some initial trials and seen good results: Zstd appears to give
> gzip-level compression at LZ4-level CPU cost.
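
For readers following the docs/implementation.html hunk in the diff above, here is a
minimal sketch of how the record batch attributes field could be decoded once
codec id 4 (zstd) is added. The class and method names (BatchAttributes,
codecName, and so on) are hypothetical and are not part of the Kafka client
API. The sketch assumes the codec occupies bits 0~2 and that id 0 means no
compression, as in the surrounding Kafka documentation; the remaining bit
positions are taken from the hunk itself.

```java
// Sketch only: BatchAttributes and its methods are hypothetical names used
// for illustration; they are not classes from the Kafka code base.
final class BatchAttributes {
    private static final int CODEC_MASK = 0x07;         // bits 0~2: compression codec (assumed position)
    private static final int TIMESTAMP_TYPE_MASK = 0x08; // bit 3: timestampType
    private static final int TRANSACTIONAL_MASK = 0x10;  // bit 4: isTransactional
    private static final int CONTROL_MASK = 0x20;        // bit 5: isControlBatch

    private BatchAttributes() {}

    // Maps the low three bits to a codec name; ids 1 to 4 follow the diff,
    // id 0 ("none") is an assumption based on the Kafka docs.
    public static String codecName(short attributes) {
        int id = attributes & CODEC_MASK;
        switch (id) {
            case 0: return "none";
            case 1: return "gzip";
            case 2: return "snappy";
            case 3: return "lz4";
            case 4: return "zstd";
            default:
                throw new IllegalArgumentException("Unknown compression codec id: " + id);
        }
    }

    // Returns the raw timestampType bit (0 or 1) without interpreting it.
    public static int timestampTypeBit(short attributes) {
        return (attributes & TIMESTAMP_TYPE_MASK) >> 3;
    }

    public static boolean isTransactional(short attributes) {
        return (attributes & TRANSACTIONAL_MASK) != 0;
    }

    public static boolean isControlBatch(short attributes) {
        return (attributes & CONTROL_MASK) != 0;
    }

    public static void main(String[] args) {
        // A transactional batch compressed with zstd: bit 4 set, codec id 4.
        short attributes = (short) (0x10 | 4);
        System.out.println(codecName(attributes));       // prints "zstd"
        System.out.println(isTransactional(attributes)); // prints "true"
    }
}
```

Masking before the switch keeps the codec lookup independent of the flag
bits, so adding a new codec id only touches one case label.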