This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new cbf2a53  HDDS-2721. Allow wrapping list of ByteBuffers with 
ChunkBuffer (#378)
cbf2a53 is described below

commit cbf2a53ddaf08541e3bb6a5ead0892d2a3a5f16e
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Jan 7 12:08:12 2020 +0100

    HDDS-2721. Allow wrapping list of ByteBuffers with ChunkBuffer (#378)
---
 .../apache/hadoop/ozone/common/ChunkBuffer.java    |   8 +
 .../common/ChunkBufferImplWithByteBuffer.java      |   7 +
 .../common/ChunkBufferImplWithByteBufferList.java  | 215 +++++++++++++++++++++
 .../ozone/common/IncrementalChunkBuffer.java       |   5 +
 .../hadoop/ozone/common/TestChunkBuffer.java       |  61 +++++-
 .../TestChunkBufferImplWithByteBufferList.java     |  80 ++++++++
 .../ozone/container/keyvalue/KeyValueHandler.java  |   5 +-
 7 files changed, 377 insertions(+), 4 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
index 618472b..b7ba6d6 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
@@ -23,6 +23,7 @@ import 
org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
+import java.util.List;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -54,6 +55,11 @@ public interface ChunkBuffer {
     return new ChunkBufferImplWithByteBuffer(buffer);
   }
 
+  /** Wrap the given list of {@link ByteBuffer}s as a {@link ChunkBuffer}. */
+  static ChunkBuffer wrap(List<ByteBuffer> buffers) {
+    return new ChunkBufferImplWithByteBufferList(buffers);
+  }
+
   /** Similar to {@link ByteBuffer#position()}. */
   int position();
 
@@ -110,6 +116,8 @@ public interface ChunkBuffer {
    */
   Iterable<ByteBuffer> iterate(int bufferSize);
 
+  List<ByteBuffer> asByteBufferList();
+
   /**
    * Write the contents of the buffer from the current position to the limit
    * to {@code channel}.
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
index 4c1d748..299cab8 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
@@ -22,7 +22,9 @@ import 
org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.function.Function;
@@ -80,6 +82,11 @@ final class ChunkBufferImplWithByteBuffer implements 
ChunkBuffer {
   }
 
   @Override
+  public List<ByteBuffer> asByteBufferList() {
+    return Collections.singletonList(buffer);
+  }
+
+  @Override
   public long writeTo(GatheringByteChannel channel) throws IOException {
     return channel.write(buffer);
   }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
new file mode 100644
index 0000000..e9949cc
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * {@link ChunkBuffer} implementation using a list of {@link ByteBuffer}s.
+ * Not thread-safe.
+ */
+public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
+
+  private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
+  /** Buffer list backing the ChunkBuffer. */
+  private final List<ByteBuffer> buffers;
+  private final int limit;
+
+  private int limitPrecedingCurrent;
+  private int currentIndex;
+
+  ChunkBufferImplWithByteBufferList(List<ByteBuffer> buffers) {
+    Preconditions.checkArgument(buffers != null, "buffer == null");
+
+    this.buffers = !buffers.isEmpty() ? ImmutableList.copyOf(buffers) :
+        ImmutableList.of(EMPTY_BUFFER);
+    this.limit = buffers.stream().mapToInt(ByteBuffer::limit).sum();
+
+    findCurrent();
+  }
+
+  private void findCurrent() {
+    boolean found = false;
+    for (int i = 0; i < buffers.size(); i++) {
+      final ByteBuffer buf = buffers.get(i);
+      final int pos = buf.position();
+      if (found) {
+        Preconditions.checkArgument(pos == 0,
+            "all buffers after current one should have position=0");
+      } else if (pos < buf.limit()) {
+        found = true;
+        currentIndex = i;
+      } else {
+        limitPrecedingCurrent += buf.limit();
+      }
+    }
+    if (!found) {
+      currentIndex = buffers.size() - 1;
+      limitPrecedingCurrent -= current().limit();
+    }
+  }
+
+  private ByteBuffer current() {
+    return buffers.get(currentIndex);
+  }
+
+  private void advanceCurrent() {
+    if (currentIndex < buffers.size() - 1) {
+      final ByteBuffer current = buffers.get(currentIndex);
+      if (!current.hasRemaining()) {
+        currentIndex++;
+        limitPrecedingCurrent += current.limit();
+      }
+    }
+  }
+
+  private void rewindCurrent() {
+    currentIndex = 0;
+    limitPrecedingCurrent = 0;
+  }
+
+  @Override
+  public int position() {
+    return limitPrecedingCurrent + current().position();
+  }
+
+  @Override
+  public int remaining() {
+    return limit - position();
+  }
+
+  @Override
+  public int limit() {
+    return limit;
+  }
+
+  @Override
+  public ChunkBuffer rewind() {
+    buffers.forEach(ByteBuffer::rewind);
+    rewindCurrent();
+    return this;
+  }
+
+  @Override
+  public ChunkBuffer clear() {
+    buffers.forEach(ByteBuffer::clear);
+    rewindCurrent();
+    return this;
+  }
+
+  @Override
+  public ChunkBuffer put(ByteBuffer that) {
+    final int thisRemaining = remaining();
+    int thatRemaining = that.remaining();
+    if (thatRemaining > thisRemaining) {
+      final BufferOverflowException boe = new BufferOverflowException();
+      boe.initCause(new IllegalArgumentException(
+          "Failed to put since that.remaining() = " + thatRemaining
+              + " > this.remaining() = " + thisRemaining));
+      throw boe;
+    }
+
+    while (thatRemaining > 0) {
+      final ByteBuffer b = current();
+      final int bytes = Math.min(b.remaining(), thatRemaining);
+      that.limit(that.position() + bytes);
+      b.put(that);
+      thatRemaining -= bytes;
+      advanceCurrent();
+    }
+
+    return this;
+  }
+
+  @Override
+  public ChunkBuffer duplicate(int newPosition, int newLimit) {
+    Preconditions.checkArgument(newPosition >= 0);
+    Preconditions.checkArgument(newPosition <= newLimit);
+    Preconditions.checkArgument(newLimit <= limit());
+
+    final List<ByteBuffer> duplicates = new ArrayList<>(buffers.size());
+    int min = 0;
+    for (final ByteBuffer buf : buffers) {
+      final int max = min + buf.limit();
+      final int pos = relativeToRange(newPosition, min, max);
+      final int lim = relativeToRange(newLimit, min, max);
+
+      final ByteBuffer duplicate = buf.duplicate();
+      duplicate.position(pos).limit(lim);
+      duplicates.add(duplicate);
+
+      min = max;
+    }
+
+    return new ChunkBufferImplWithByteBufferList(duplicates);
+  }
+
+  @Override
+  public Iterable<ByteBuffer> iterate(int bufferSize) {
+    // currently not necessary; implement if needed
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<ByteBuffer> asByteBufferList() {
+    return buffers;
+  }
+
+  @Override
+  public long writeTo(GatheringByteChannel channel) throws IOException {
+    long bytes = channel.write(buffers.toArray(new ByteBuffer[0]));
+    findCurrent();
+    return bytes;
+  }
+
+  @Override
+  public ByteString toByteStringImpl(Function<ByteBuffer, ByteString> f) {
+    return buffers.stream().map(f).reduce(ByteString.EMPTY, 
ByteString::concat);
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName()
+        + ":n=" + buffers.size()
+        + ":p=" + position()
+        + ":l=" + limit();
+  }
+
+  private static int relativeToRange(int value, int min, int max) {
+    final int pos;
+    if (value <= min) {
+      pos = 0;
+    } else if (value <= max) {
+      pos = value - min;
+    } else {
+      pos = max - min;
+    }
+    return pos;
+  }
+}
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index 63e6635..e94e205 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -249,6 +249,11 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
           "Buffer size and increment mismatched: bufferSize = " + bufferSize
           + " but increment = " + increment);
     }
+    return asByteBufferList();
+  }
+
+  @Override
+  public List<ByteBuffer> asByteBufferList() {
     return Collections.unmodifiableList(buffers);
   }
 
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
index 1a4429e..9ca735c 100644
--- 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
@@ -27,7 +27,9 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -72,6 +74,39 @@ public class TestChunkBuffer {
     runTestImpl(expected, increment, ChunkBuffer.allocate(n, increment));
   }
 
+  @Test(timeout = 1_000)
+  public void testImplWithList() {
+    runTestImplWithList(4, 8);
+    runTestImplWithList(16, 1 << 10);
+    for(int i = 0; i < 10; i++) {
+      final int a = ThreadLocalRandom.current().nextInt(10) + 1;
+      final int b = ThreadLocalRandom.current().nextInt(100) + 1;
+      runTestImplWithList(Math.min(a, b), Math.max(a, b));
+    }
+  }
+
+  private static void runTestImplWithList(int count, int n) {
+    final byte[] expected = new byte[n];
+    ThreadLocalRandom.current().nextBytes(expected);
+
+    final int avg = n / count;
+    final List<ByteBuffer> buffers = new ArrayList<>(count);
+
+    int offset = 0;
+    for (int i = 0; i < count - 1; i++) {
+      final int length = ThreadLocalRandom.current().nextInt(avg) + 1;
+      buffers.add(ByteBuffer.allocate(length));
+      offset += length;
+    }
+
+    if (n > offset) {
+      buffers.add(ByteBuffer.allocate(n - offset));
+    }
+
+    ChunkBuffer impl = ChunkBuffer.wrap(buffers);
+    runTestImpl(expected, -1, impl);
+  }
+
   private static void runTestImpl(byte[] expected, int bpc, ChunkBuffer impl) {
     final int n = expected.length;
     System.out.println("n=" + n + ", impl=" + impl);
@@ -92,7 +127,7 @@ public class TestChunkBuffer {
     // test iterate
     if (bpc > 0) {
       assertIterate(expected, impl, bpc);
-    } else {
+    } else if (bpc == 0) {
       for (int d = 1; d < 5; d++) {
         final int bytesPerChecksum = n/d;
         if (bytesPerChecksum > 0) {
@@ -158,7 +193,7 @@ public class TestChunkBuffer {
     });
     Assert.assertEquals(offset, duplicated.position());
     Assert.assertEquals(length, duplicated.remaining());
-    Assert.assertEquals("offset=" + offset + ", length=" + length,
+    assertEquals("offset=" + offset + ", length=" + length,
         ByteString.copyFrom(expected, offset, length), computed);
   }
 
@@ -175,5 +210,27 @@ public class TestChunkBuffer {
     }
 
     Assert.assertArrayEquals(expected, output.toByteArray());
+    Assert.assertFalse(impl.hasRemaining());
+  }
+
+  private static void assertEquals(String message,
+      ByteString expected, ByteString actual) {
+    Assert.assertEquals(message,
+        toString(expected.toByteArray()),
+        toString(actual.toByteArray()));
+  }
+
+  private static String toString(byte[] arr) {
+    if (arr == null || arr.length == 0) {
+      return "";
+    }
+
+    StringBuilder sb = new StringBuilder();
+    for (byte b : arr) {
+      sb.append(Character.forDigit((b >> 4) & 0xF, 16))
+          .append(Character.forDigit((b & 0xF), 16))
+          .append(" ");
+    }
+    return sb.deleteCharAt(sb.length() - 1).toString();
   }
 }
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
new file mode 100644
index 0000000..b867fbe
--- /dev/null
+++ 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link ChunkBufferImplWithByteBufferList}.
+ */
+public class TestChunkBufferImplWithByteBufferList {
+
+  @Test
+  public void rejectsNullList() {
+    List<ByteBuffer> list = null;
+    assertThrows(IllegalArgumentException.class, () -> ChunkBuffer.wrap(list));
+  }
+
+  @Test
+  public void acceptsEmptyList() {
+    ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of());
+    assertEmpty(subject);
+    assertEmpty(subject.duplicate(0, 0));
+    assertThrows(BufferOverflowException.class,
+        () -> subject.put(ByteBuffer.allocate(1)));
+  }
+
+  @Test
+  public void rejectsMultipleCurrentBuffers() {
+    ByteBuffer b1 = allocate();
+    ByteBuffer b2 = allocate();
+    ByteBuffer b3 = allocate();
+    List<ByteBuffer> list = ImmutableList.of(b1, b2, b3);
+
+    // buffer with remaining immediately followed by non-zero pos
+    b1.position(b1.limit() - 1);
+    b2.position(1);
+    assertThrows(IllegalArgumentException.class, () -> ChunkBuffer.wrap(list));
+
+    // buffer with remaining followed by non-zero pos
+    b1.position(b1.limit() - 1);
+    b2.position(0);
+    b3.position(1);
+    assertThrows(IllegalArgumentException.class, () -> ChunkBuffer.wrap(list));
+  }
+
+  private static void assertEmpty(ChunkBuffer subject) {
+    assertEquals(0, subject.position());
+    assertEquals(0, subject.remaining());
+    assertEquals(0, subject.limit());
+  }
+
+  private static ByteBuffer allocate() {
+    return ByteBuffer.allocate(ThreadLocalRandom.current().nextInt(10, 20));
+  }
+
+}
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index f0ab56e..61a3be1 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -694,7 +694,8 @@ public class KeyValueHandler extends Handler {
       WriteChunkStage stage = dispatcherContext.getStage();
       if (stage == WriteChunkStage.WRITE_DATA ||
           stage == WriteChunkStage.COMBINED) {
-        data = ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBuffer());
+        data =
+            ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBufferList());
       }
 
       chunkManager
@@ -748,7 +749,7 @@ public class KeyValueHandler extends Handler {
       Preconditions.checkNotNull(chunkInfo);
 
       ChunkBuffer data = ChunkBuffer.wrap(
-          putSmallFileReq.getData().asReadOnlyByteBuffer());
+          putSmallFileReq.getData().asReadOnlyByteBufferList());
       if (dispatcherContext == null) {
         dispatcherContext = new DispatcherContext.Builder().build();
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to