This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4d047114518 KAFKA-15602: revert KAFKA-4852 (#14617)
4d047114518 is described below

commit 4d047114518ea30519a31b6297f2bf906e59841e
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Mon Oct 30 13:14:15 2023 -0700

    KAFKA-15602: revert KAFKA-4852 (#14617)
    
    This PR reverts
     - 
https://github.com/apache/kafka/commit/51dbd175b08e78aeca03d6752847aa5f23c98659
     - 
https://github.com/apache/kafka/commit/496ae054c2d43c0905167745bfb2f4a0725e9fc2
    
    Reviewers:  Philip Nee <p...@confluent.io>, Guozhang Wang 
<guozh...@confluent.io>
---
 .../common/serialization/ByteBufferSerializer.java | 34 ++++++++++------------
 .../common/serialization/SerializationTest.java    | 17 -----------
 2 files changed, 15 insertions(+), 36 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
 
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
index 06b66a62cb0..43b3d6e38a7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/serialization/ByteBufferSerializer.java
@@ -16,38 +16,34 @@
  */
 package org.apache.kafka.common.serialization;
 
-import org.apache.kafka.common.utils.Utils;
-
 import java.nio.ByteBuffer;
 
 /**
- * Do not need to flip before call <i>serialize(String, ByteBuffer)</i>. For 
example:
- *
- * <blockquote>
- * <pre>
- * ByteBufferSerializer serializer = ...; // Create Serializer
- * ByteBuffer buffer = ...;               // Allocate ByteBuffer
- * buffer.put(data);                      // Put data into buffer, do not need 
to flip
- * serializer.serialize(topic, buffer);   // Serialize buffer
- * </pre>
- * </blockquote>
+ * {@code ByteBufferSerializer} always {@link ByteBuffer#rewind() rewinds} the 
position of the input buffer to zero for
+ * serialization. A manual rewind is not necessary.
+ * <p>
+ * Note: any existing buffer position is ignored.
+ * <p>
+ * The position is also rewound back to zero before {@link #serialize(String, 
ByteBuffer)}
+ * returns.
  */
 public class ByteBufferSerializer implements Serializer<ByteBuffer> {
-
-    @Override
     public byte[] serialize(String topic, ByteBuffer data) {
-        if (data == null) {
+        if (data == null)
             return null;
-        }
+
+        data.rewind();
 
         if (data.hasArray()) {
-            final byte[] arr = data.array();
+            byte[] arr = data.array();
             if (data.arrayOffset() == 0 && arr.length == data.remaining()) {
                 return arr;
             }
         }
 
-        data.flip();
-        return Utils.toArray(data);
+        byte[] ret = new byte[data.remaining()];
+        data.get(ret, 0, ret.length);
+        data.rewind();
+        return ret;
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
 
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
index 2fd28037c81..b89768249cd 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java
@@ -406,23 +406,6 @@ public class SerializationTest {
         return Serdes.serdeFrom(serializer, deserializer);
     }
 
-    @Test
-    public void testByteBufferSerializer() {
-        final byte[] bytes = "Hello".getBytes(UTF_8);
-        final ByteBuffer heapBuffer0 = ByteBuffer.allocate(bytes.length + 
1).put(bytes);
-        final ByteBuffer heapBuffer1 = 
ByteBuffer.allocate(bytes.length).put(bytes);
-        final ByteBuffer heapBuffer2 = ByteBuffer.wrap(bytes);
-        final ByteBuffer directBuffer0 = 
ByteBuffer.allocateDirect(bytes.length + 1).put(bytes);
-        final ByteBuffer directBuffer1 = 
ByteBuffer.allocateDirect(bytes.length).put(bytes);
-        try (final ByteBufferSerializer serializer = new 
ByteBufferSerializer()) {
-            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer0));
-            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer1));
-            assertArrayEquals(bytes, serializer.serialize(topic, heapBuffer2));
-            assertArrayEquals(bytes, serializer.serialize(topic, 
directBuffer0));
-            assertArrayEquals(bytes, serializer.serialize(topic, 
directBuffer1));
-        }
-    }
-
     @ParameterizedTest
     @ValueSource(booleans = { true, false })
     public void testBooleanSerializer(Boolean dataToSerialize) {

Reply via email to