This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new cbf2a53 HDDS-2721. Allow wrapping list of ByteBuffers with ChunkBuffer (#378)
cbf2a53 is described below
commit cbf2a53ddaf08541e3bb6a5ead0892d2a3a5f16e
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Jan 7 12:08:12 2020 +0100
HDDS-2721. Allow wrapping list of ByteBuffers with ChunkBuffer (#378)
---
.../apache/hadoop/ozone/common/ChunkBuffer.java | 8 +
.../common/ChunkBufferImplWithByteBuffer.java | 7 +
.../common/ChunkBufferImplWithByteBufferList.java | 215 +++++++++++++++++++++
.../ozone/common/IncrementalChunkBuffer.java | 5 +
.../hadoop/ozone/common/TestChunkBuffer.java | 61 +++++-
.../TestChunkBufferImplWithByteBufferList.java | 80 ++++++++
.../ozone/container/keyvalue/KeyValueHandler.java | 5 +-
7 files changed, 377 insertions(+), 4 deletions(-)
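
In short: the patch adds a ChunkBuffer.wrap(List<ByteBuffer>) overload plus an
asByteBufferList() accessor on the interface. A minimal usage sketch of the new
API follows; the class name, buffer sizes and printed values are illustrative,
not part of the patch:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.ozone.common.ChunkBuffer;

    public class WrapListExample {
      public static void main(String[] args) {
        // Two pre-allocated buffers, 4 + 8 = 12 bytes in total.
        List<ByteBuffer> backing =
            Arrays.asList(ByteBuffer.allocate(4), ByteBuffer.allocate(8));
        ChunkBuffer chunk = ChunkBuffer.wrap(backing);

        System.out.println(chunk.limit());       // 12, the sum of the limits
        chunk.put(ByteBuffer.wrap(new byte[6])); // spills from buffer 0 into 1
        System.out.println(chunk.position());    // 6

        // The new accessor exposes the underlying buffers again.
        System.out.println(chunk.asByteBufferList().size()); // 2
      }
    }
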
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
index 618472b..b7ba6d6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
@@ -23,6 +23,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
+import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -54,6 +55,11 @@ public interface ChunkBuffer {
return new ChunkBufferImplWithByteBuffer(buffer);
}
+ /** Wrap the given list of {@link ByteBuffer}s as a {@link ChunkBuffer}. */
+ static ChunkBuffer wrap(List<ByteBuffer> buffers) {
+ return new ChunkBufferImplWithByteBufferList(buffers);
+ }
+
/** Similar to {@link ByteBuffer#position()}. */
int position();
@@ -110,6 +116,8 @@ public interface ChunkBuffer {
*/
Iterable<ByteBuffer> iterate(int bufferSize);
+ List<ByteBuffer> asByteBufferList();
+
/**
* Write the contents of the buffer from the current position to the limit
* to {@code channel}.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
index 4c1d748..299cab8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
@@ -22,7 +22,9 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;
@@ -80,6 +82,11 @@ final class ChunkBufferImplWithByteBuffer implements ChunkBuffer {
}
@Override
+ public List<ByteBuffer> asByteBufferList() {
+ return Collections.singletonList(buffer);
+ }
+
+ @Override
public long writeTo(GatheringByteChannel channel) throws IOException {
return channel.write(buffer);
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
new file mode 100644
index 0000000..e9949cc
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * {@link ChunkBuffer} implementation using a list of {@link ByteBuffer}s.
+ * Not thread-safe.
+ */
+public class ChunkBufferImplWithByteBufferList implements ChunkBuffer {
+
+ private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
+
+ /** Buffer list backing the ChunkBuffer. */
+ private final List<ByteBuffer> buffers;
+ private final int limit;
+
+ private int limitPrecedingCurrent;
+ private int currentIndex;
+
+ ChunkBufferImplWithByteBufferList(List<ByteBuffer> buffers) {
+ Preconditions.checkArgument(buffers != null, "buffer == null");
+
+ this.buffers = !buffers.isEmpty() ? ImmutableList.copyOf(buffers) :
+ ImmutableList.of(EMPTY_BUFFER);
+ this.limit = buffers.stream().mapToInt(ByteBuffer::limit).sum();
+
+ findCurrent();
+ }
+
+ private void findCurrent() {
+ boolean found = false;
+ for (int i = 0; i < buffers.size(); i++) {
+ final ByteBuffer buf = buffers.get(i);
+ final int pos = buf.position();
+ if (found) {
+ Preconditions.checkArgument(pos == 0,
+ "all buffers after current one should have position=0");
+ } else if (pos < buf.limit()) {
+ found = true;
+ currentIndex = i;
+ } else {
+ limitPrecedingCurrent += buf.limit();
+ }
+ }
+ if (!found) {
+ currentIndex = buffers.size() - 1;
+ limitPrecedingCurrent -= current().limit();
+ }
+ }
+
+ private ByteBuffer current() {
+ return buffers.get(currentIndex);
+ }
+
+ private void advanceCurrent() {
+ if (currentIndex < buffers.size() - 1) {
+ final ByteBuffer current = buffers.get(currentIndex);
+ if (!current.hasRemaining()) {
+ currentIndex++;
+ limitPrecedingCurrent += current.limit();
+ }
+ }
+ }
+
+ private void rewindCurrent() {
+ currentIndex = 0;
+ limitPrecedingCurrent = 0;
+ }
+
+ @Override
+ public int position() {
+ return limitPrecedingCurrent + current().position();
+ }
+
+ @Override
+ public int remaining() {
+ return limit - position();
+ }
+
+ @Override
+ public int limit() {
+ return limit;
+ }
+
+ @Override
+ public ChunkBuffer rewind() {
+ buffers.forEach(ByteBuffer::rewind);
+ rewindCurrent();
+ return this;
+ }
+
+ @Override
+ public ChunkBuffer clear() {
+ buffers.forEach(ByteBuffer::clear);
+ rewindCurrent();
+ return this;
+ }
+
+ @Override
+ public ChunkBuffer put(ByteBuffer that) {
+ final int thisRemaining = remaining();
+ int thatRemaining = that.remaining();
+ if (thatRemaining > thisRemaining) {
+ final BufferOverflowException boe = new BufferOverflowException();
+ boe.initCause(new IllegalArgumentException(
+ "Failed to put since that.remaining() = " + thatRemaining
+ + " > this.remaining() = " + thisRemaining));
+ throw boe;
+ }
+
+ while (thatRemaining > 0) {
+ final ByteBuffer b = current();
+ final int bytes = Math.min(b.remaining(), thatRemaining);
+ that.limit(that.position() + bytes);
+ b.put(that);
+ thatRemaining -= bytes;
+ advanceCurrent();
+ }
+
+ return this;
+ }
+
+ @Override
+ public ChunkBuffer duplicate(int newPosition, int newLimit) {
+ Preconditions.checkArgument(newPosition >= 0);
+ Preconditions.checkArgument(newPosition <= newLimit);
+ Preconditions.checkArgument(newLimit <= limit());
+
+ final List<ByteBuffer> duplicates = new ArrayList<>(buffers.size());
+ int min = 0;
+ for (final ByteBuffer buf : buffers) {
+ final int max = min + buf.limit();
+ final int pos = relativeToRange(newPosition, min, max);
+ final int lim = relativeToRange(newLimit, min, max);
+
+ final ByteBuffer duplicate = buf.duplicate();
+ duplicate.position(pos).limit(lim);
+ duplicates.add(duplicate);
+
+ min = max;
+ }
+
+ return new ChunkBufferImplWithByteBufferList(duplicates);
+ }
+
+ @Override
+ public Iterable<ByteBuffer> iterate(int bufferSize) {
+ // currently not necessary; implement if needed
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<ByteBuffer> asByteBufferList() {
+ return buffers;
+ }
+
+ @Override
+ public long writeTo(GatheringByteChannel channel) throws IOException {
+ long bytes = channel.write(buffers.toArray(new ByteBuffer[0]));
+ findCurrent();
+ return bytes;
+ }
+
+ @Override
+ public ByteString toByteStringImpl(Function<ByteBuffer, ByteString> f) {
+ return buffers.stream().map(f).reduce(ByteString.EMPTY, ByteString::concat);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName()
+ + ":n=" + buffers.size()
+ + ":p=" + position()
+ + ":l=" + limit();
+ }
+
+ private static int relativeToRange(int value, int min, int max) {
+ final int pos;
+ if (value <= min) {
+ pos = 0;
+ } else if (value <= max) {
+ pos = value - min;
+ } else {
+ pos = max - min;
+ }
+ return pos;
+ }
+}
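
The subtlest part of the new class is duplicate(), which maps a global
[newPosition, newLimit) range onto per-buffer ranges via relativeToRange().
A small sketch of the expected behavior (class name and buffer sizes are
illustrative, not from the patch):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import org.apache.hadoop.ozone.common.ChunkBuffer;

    public class DuplicateRangeExample {
      public static void main(String[] args) {
        ByteBuffer b1 = ByteBuffer.allocate(4); // global offsets [0, 4)
        ByteBuffer b2 = ByteBuffer.allocate(8); // global offsets [4, 12)
        ChunkBuffer chunk = ChunkBuffer.wrap(Arrays.asList(b1, b2));

        // Carve out the global range [2, 10): relativeToRange() clamps the
        // endpoints per buffer, so the duplicate of b1 gets position=2,
        // limit=4 and the duplicate of b2 gets position=0, limit=6.
        ChunkBuffer slice = chunk.duplicate(2, 10);
        System.out.println(slice.position());  // 2
        System.out.println(slice.remaining()); // 8
      }
    }

Note that duplicate() shares the backing buffers (via ByteBuffer.duplicate()),
so writes through the slice are visible in the original.
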
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
index 63e6635..e94e205 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/IncrementalChunkBuffer.java
@@ -249,6 +249,11 @@ final class IncrementalChunkBuffer implements ChunkBuffer {
"Buffer size and increment mismatched: bufferSize = " + bufferSize
+ " but increment = " + increment);
}
+ return asByteBufferList();
+ }
+
+ @Override
+ public List<ByteBuffer> asByteBufferList() {
return Collections.unmodifiableList(buffers);
}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
index 1a4429e..9ca735c 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
@@ -27,7 +27,9 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
@@ -72,6 +74,39 @@ public class TestChunkBuffer {
runTestImpl(expected, increment, ChunkBuffer.allocate(n, increment));
}
+ @Test(timeout = 1_000)
+ public void testImplWithList() {
+ runTestImplWithList(4, 8);
+ runTestImplWithList(16, 1 << 10);
+ for(int i = 0; i < 10; i++) {
+ final int a = ThreadLocalRandom.current().nextInt(10) + 1;
+ final int b = ThreadLocalRandom.current().nextInt(100) + 1;
+ runTestImplWithList(Math.min(a, b), Math.max(a, b));
+ }
+ }
+
+ private static void runTestImplWithList(int count, int n) {
+ final byte[] expected = new byte[n];
+ ThreadLocalRandom.current().nextBytes(expected);
+
+ final int avg = n / count;
+ final List<ByteBuffer> buffers = new ArrayList<>(count);
+
+ int offset = 0;
+ for (int i = 0; i < count - 1; i++) {
+ final int length = ThreadLocalRandom.current().nextInt(avg) + 1;
+ buffers.add(ByteBuffer.allocate(length));
+ offset += length;
+ }
+
+ if (n > offset) {
+ buffers.add(ByteBuffer.allocate(n - offset));
+ }
+
+ ChunkBuffer impl = ChunkBuffer.wrap(buffers);
+ runTestImpl(expected, -1, impl);
+ }
+
private static void runTestImpl(byte[] expected, int bpc, ChunkBuffer impl) {
final int n = expected.length;
System.out.println("n=" + n + ", impl=" + impl);
@@ -92,7 +127,7 @@ public class TestChunkBuffer {
// test iterate
if (bpc > 0) {
assertIterate(expected, impl, bpc);
- } else {
+ } else if (bpc == 0) {
for (int d = 1; d < 5; d++) {
final int bytesPerChecksum = n/d;
if (bytesPerChecksum > 0) {
@@ -158,7 +193,7 @@ public class TestChunkBuffer {
});
Assert.assertEquals(offset, duplicated.position());
Assert.assertEquals(length, duplicated.remaining());
- Assert.assertEquals("offset=" + offset + ", length=" + length,
+ assertEquals("offset=" + offset + ", length=" + length,
ByteString.copyFrom(expected, offset, length), computed);
}
@@ -175,5 +210,27 @@ public class TestChunkBuffer {
}
Assert.assertArrayEquals(expected, output.toByteArray());
+ Assert.assertFalse(impl.hasRemaining());
+ }
+
+ private static void assertEquals(String message,
+ ByteString expected, ByteString actual) {
+ Assert.assertEquals(message,
+ toString(expected.toByteArray()),
+ toString(actual.toByteArray()));
+ }
+
+ private static String toString(byte[] arr) {
+ if (arr == null || arr.length == 0) {
+ return "";
+ }
+
+ StringBuilder sb = new StringBuilder();
+ for (byte b : arr) {
+ sb.append(Character.forDigit((b >> 4) & 0xF, 16))
+ .append(Character.forDigit((b & 0xF), 16))
+ .append(" ");
+ }
+ return sb.deleteCharAt(sb.length() - 1).toString();
}
}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
new file mode 100644
index 0000000..b867fbe
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link ChunkBufferImplWithByteBufferList}.
+ */
+public class TestChunkBufferImplWithByteBufferList {
+
+ @Test
+ public void rejectsNullList() {
+ List<ByteBuffer> list = null;
+ assertThrows(IllegalArgumentException.class, () -> ChunkBuffer.wrap(list));
+ }
+
+ @Test
+ public void acceptsEmptyList() {
+ ChunkBuffer subject = ChunkBuffer.wrap(ImmutableList.of());
+ assertEmpty(subject);
+ assertEmpty(subject.duplicate(0, 0));
+ assertThrows(BufferOverflowException.class,
+ () -> subject.put(ByteBuffer.allocate(1)));
+ }
+
+ @Test
+ public void rejectsMultipleCurrentBuffers() {
+ ByteBuffer b1 = allocate();
+ ByteBuffer b2 = allocate();
+ ByteBuffer b3 = allocate();
+ List<ByteBuffer> list = ImmutableList.of(b1, b2, b3);
+
+ // buffer with remaining immediately followed by non-zero pos
+ b1.position(b1.limit() - 1);
+ b2.position(1);
+ assertThrows(IllegalArgumentException.class, () -> ChunkBuffer.wrap(list));
+
+ // buffer with remaining followed by non-zero pos
+ b1.position(b1.limit() - 1);
+ b2.position(0);
+ b3.position(1);
+ assertThrows(IllegalArgumentException.class, () -> ChunkBuffer.wrap(list));
+ }
+
+ private static void assertEmpty(ChunkBuffer subject) {
+ assertEquals(0, subject.position());
+ assertEquals(0, subject.remaining());
+ assertEquals(0, subject.limit());
+ }
+
+ private static ByteBuffer allocate() {
+ return ByteBuffer.allocate(ThreadLocalRandom.current().nextInt(10, 20));
+ }
+
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index f0ab56e..61a3be1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -694,7 +694,8 @@ public class KeyValueHandler extends Handler {
WriteChunkStage stage = dispatcherContext.getStage();
if (stage == WriteChunkStage.WRITE_DATA ||
stage == WriteChunkStage.COMBINED) {
- data = ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBuffer());
+ data =
+ ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBufferList());
}
chunkManager
@@ -748,7 +749,7 @@ public class KeyValueHandler extends Handler {
Preconditions.checkNotNull(chunkInfo);
ChunkBuffer data = ChunkBuffer.wrap(
- putSmallFileReq.getData().asReadOnlyByteBuffer());
+ putSmallFileReq.getData().asReadOnlyByteBufferList());
if (dispatcherContext == null) {
dispatcherContext = new DispatcherContext.Builder().build();
}
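
The KeyValueHandler hunks are the motivating call sites: a ByteString coming
out of protobuf may be backed by several segments (a rope), and
asReadOnlyByteBuffer() reuses the backing array only "if possible" (per the
protobuf javadoc), while asReadOnlyByteBufferList() can expose one read-only
buffer per segment. A hedged before/after sketch (class name and string
contents are illustrative; ByteString is the shaded
org.apache.ratis.thirdparty class used in the patch):

    import org.apache.hadoop.ozone.common.ChunkBuffer;
    import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;

    public class WrapByteStringExample {
      public static void main(String[] args) {
        // A ByteString built by concatenation may be a rope of several
        // segments (small results can still be coalesced into one).
        ByteString data = ByteString.copyFromUtf8("hello ")
            .concat(ByteString.copyFromUtf8("world"));

        // Before: a single (possibly copied) read-only buffer.
        ChunkBuffer before = ChunkBuffer.wrap(data.asReadOnlyByteBuffer());

        // After: one read-only ByteBuffer per segment, no flattening.
        ChunkBuffer after = ChunkBuffer.wrap(data.asReadOnlyByteBufferList());

        System.out.println(before + " / " + after);
      }
    }
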
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]