This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 51dbd175b08 KAFKA-4852: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not compatible with offsets (#12683)
51dbd175b08 is described below

commit 51dbd175b08e78aeca03d6752847aa5f23c98659
Author: LinShunKang <linshunkang....@gmail.com>
AuthorDate: Fri Sep 30 01:59:47 2022 +0800

    KAFKA-4852: Fix ByteBufferSerializer#serialize(String, ByteBuffer) not compatible with offsets (#12683)

    Reviewers: Guozhang Wang <wangg...@gmail.com>
---
 .../common/serialization/ByteBufferSerializer.java | 31 ++++++++++++++++------
 .../common/serialization/SerializationTest.java    | 19 +++++++++++++
 2 files changed, 42 insertions(+), 8 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
index 9fb12544e0f..5987688759e 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
@@ -16,25 +16,40 @@
  */
 package org.apache.kafka.common.serialization;
 
+import org.apache.kafka.common.utils.Utils;
+
 import java.nio.ByteBuffer;
 
+/**
+ * ByteBufferSerializer will not change ByteBuffer's mark, position and limit.
+ * And do not need to flip before call <i>serialize(String, ByteBuffer)</i>. For example:
+ *
+ * <blockquote>
+ * <pre>
+ * ByteBufferSerializer serializer = ...; // Create Serializer
+ * ByteBuffer buffer = ...; // Allocate ByteBuffer
+ * buffer.put(data); // Put data into buffer, do not need to flip
+ * serializer.serialize(topic, buffer); // Serialize buffer
+ * </pre>
+ * </blockquote>
+ */
 public class ByteBufferSerializer implements Serializer<ByteBuffer> {
+
+    @Override
     public byte[] serialize(String topic, ByteBuffer data) {
-        if (data == null)
+        if (data == null) {
             return null;
-
-        data.rewind();
+        }
 
         if (data.hasArray()) {
-            byte[] arr = data.array();
+            final byte[] arr = data.array();
             if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
                 return arr;
             }
         }
 
-        byte[] ret = new byte[data.remaining()];
-        data.get(ret, 0, ret.length);
-        data.rewind();
-        return ret;
+        final ByteBuffer copyData = data.asReadOnlyBuffer();
+        copyData.flip();
+        return Utils.toArray(copyData);
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 85c09dd17ae..eb1fee3943f 100644
--- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -31,6 +31,8 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Stack;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -368,4 +370,21 @@ public class SerializationTest {
 
         return Serdes.serdeFrom(serializer, deserializer);
     }
+
+    @Test
+    public void testByteBufferSerializer() {
+        final byte[] bytes = "Hello".getBytes(UTF_8);
+        final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 1).put(bytes);
+        final ByteBuffer heapBuffer1 = ByteBuffer.allocate(bytes.length).put(bytes);
+        final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
+        final ByteBuffer directBuffer0 = ByteBuffer.allocateDirect(bytes.length + 1).put(bytes);
+        final ByteBuffer directBuffer1 = ByteBuffer.allocateDirect(bytes.length).put(bytes);
+        try (final ByteBufferSerializer serializer = new ByteBufferSerializer()) {
+            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0));
+            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1));
+            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2));
+            assertArrayEquals(bytes, serializer.serialize(topic, directBuffer0));
+            assertArrayEquals(bytes, serializer.serialize(topic, directBuffer1));
+        }
+    }
 }