This is an automated email from the ASF dual-hosted git repository. jorgebg pushed a commit to branch TINKERPOP-2305 in repository https://gitbox.apache.org/repos/asf/tinkerpop.git
The following commit(s) were added to refs/heads/TINKERPOP-2305 by this push: new 5c2cb9f TINKERPOP-2035 Add utility methods to read/write from Netty buffer 5c2cb9f is described below commit 5c2cb9fb8c6fdba228da7b0c27e3063e6ba06862 Author: Jorge Bay Gondra <jorgebaygon...@gmail.com> AuthorDate: Mon Oct 14 12:36:55 2019 +0200 TINKERPOP-2035 Add utility methods to read/write from Netty buffer --- .../tinkerpop/gremlin/structure/io/Buffer.java | 17 +- .../tinkerpop/gremlin/driver/ser/NettyBuffer.java | 22 ++ .../gremlin/driver/ser/NettyBufferFactory.java | 73 ++++ .../gremlin/driver/ser/NettyBufferFactoryTest.java | 385 +++++++++++++++++++++ 4 files changed, 496 insertions(+), 1 deletion(-) diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Buffer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Buffer.java index 4d37bba..e370ed7 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Buffer.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Buffer.java @@ -215,11 +215,21 @@ public interface Buffer { int referenceCount(); /** - * Exposes this buffer's readable bytes as an NIO ByteBuffer's. + * Returns the maximum number of NIO {@link ByteBuffer}s that this buffer consists of. + */ + int nioBufferCount(); + + /** + * Exposes this buffer's readable bytes as NIO {@link ByteBuffer} instances. */ ByteBuffer[] nioBuffers(); /** + * Exposes this buffer's sub-region as NIO {@link ByteBuffer} instances. + */ + ByteBuffer[] nioBuffers(int index, int length); + + /** * Exposes this buffer's readable bytes as a NIO {@link ByteBuffer}. The returned buffer * either share or contains the copied content of this buffer, while changing the position * and limit of the returned NIO buffer does not affect the indexes and marks of this buffer. @@ -227,6 +237,11 @@ public interface Buffer { ByteBuffer nioBuffer(); /** + * Exposes this buffer's sub-region as an NIO {@link ByteBuffer}. 
+ */ + ByteBuffer nioBuffer(int index, int length); + + /** * Transfers this buffer's data to the specified destination starting at * the specified absolute {@code index}. * This method does not modify reader or writer indexes. diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBuffer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBuffer.java index f9de6c6..e194f1f 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBuffer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBuffer.java @@ -238,8 +238,30 @@ final class NettyBuffer implements Buffer { } @Override + public ByteBuffer nioBuffer(int index, int length) { + return this.buffer.nioBuffer(index, length); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return this.buffer.nioBuffers(index, length); + } + + @Override + public int nioBufferCount() { + return this.buffer.nioBufferCount(); + } + + @Override public Buffer getBytes(int index, byte[] dst) { this.buffer.getBytes(index, dst); return this; } + + /** + * Returns the underlying buffer. + */ + public ByteBuf getUnderlyingBuffer() { + return this.buffer; + } } diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactory.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactory.java index b1bfe1f..2be524c 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactory.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactory.java @@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.structure.io.Buffer; import org.apache.tinkerpop.gremlin.structure.io.BufferFactory; import java.nio.ByteBuffer; +import java.util.function.Consumer; /** * Represents a factory to create {@link Buffer} instances from wrapped {@link ByteBuf} instances. 
@@ -38,4 +39,76 @@ public class NettyBufferFactory implements BufferFactory<ByteBuf> { public Buffer wrap(ByteBuffer value) { return create(Unpooled.wrappedBuffer(value)); } + + private static ByteBuf getFromIndex(Buffer buffer, int index) { + if (buffer.nioBufferCount() == 1) { + // Heap and direct buffers usually take a single buffer + // It will create a new ByteBuf using the same backing byte array + return Unpooled.wrappedBuffer(buffer.nioBuffer(index, buffer.capacity() - index)); + } + + // Use a wrapper or composite buffer + return Unpooled.wrappedBuffer(buffer.nioBuffers(index, buffer.capacity() - index)); + } + + /** + * Utility method to allow reading from the underlying bytes using a Netty {@link ByteBuf} instance for + * interoperability, advancing the reader index of the {@link Buffer} after the consumer is called. + * + * Note that the {@link ByteBuf} used by the consumer should not be released by the caller. + * In case the provided {@link Buffer} instance is not a {@link NettyBuffer}, it will create a {@link ByteBuf} + * wrapper for the consumer to use, releasing it after use. + */ + public static void readRaw(Buffer buffer, Consumer<ByteBuf> consumer) { + if (buffer instanceof NettyBuffer) { + consumer.accept(((NettyBuffer)buffer).getUnderlyingBuffer()); + return; + } + + // Create a new ByteBuf as a wrapper + final int initialIndex = buffer.readerIndex(); + final ByteBuf newBuffer = getFromIndex(buffer, initialIndex); + + try { + // Invoke the consumer to read from the ByteBuf + consumer.accept(newBuffer); + + // Advance the reader index of the Buffer implementation + buffer.readerIndex(initialIndex + newBuffer.readerIndex()); + } finally { + newBuffer.release(); + } + } + + /** + * Allows writing from the underlying bytes using a Netty {@link ByteBuf} instance for interoperability, + * advancing the writer index of the {@link Buffer} after the consumer is called. 
+ * + * Note that the {@link ByteBuf} used by the consumer should not be released by the caller. + * In case the provided {@link Buffer} instance is not a {@link NettyBuffer}, it will create a {@link ByteBuf} + * wrapper for the consumer to use, releasing it after use. + */ + public static void writeRaw(Buffer buffer, Consumer<ByteBuf> consumer) { + if (buffer instanceof NettyBuffer) { + consumer.accept(((NettyBuffer)buffer).getUnderlyingBuffer()); + return; + } + + // Create a new ByteBuf as a wrapper + final int initialIndex = buffer.writerIndex(); + final ByteBuf newBuffer = getFromIndex(buffer, initialIndex); + + // Set writer index to 0 + newBuffer.writerIndex(0); + + try { + // Invoke the consumer to write to the ByteBuf + consumer.accept(newBuffer); + + // Advance the writer index of the Buffer implementation + buffer.writerIndex(initialIndex + newBuffer.writerIndex()); + } finally { + newBuffer.release(); + } + } } diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactoryTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactoryTest.java new file mode 100644 index 0000000..223b289 --- /dev/null +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactoryTest.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.driver.ser; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.util.ReferenceCounted; +import org.apache.tinkerpop.gremlin.structure.io.Buffer; +import org.junit.AfterClass; +import org.junit.Test; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +public class NettyBufferFactoryTest { + public static final NettyBufferFactory factory = new NettyBufferFactory(); + private static final ByteBufAllocator allocator = new UnpooledByteBufAllocator(false); + private static final List<ByteBuf> rawInstances = new ArrayList<>(); + + @AfterClass + public static void tearDown() { + rawInstances.forEach(ReferenceCounted::release); + } + + private static ByteBuf getRaw() { + final ByteBuf raw = allocator.buffer(256); + rawInstances.add(raw); + return raw; + } + + @Test + public void shouldReturnAWrapper() { + final ByteBuf raw = getRaw(); + final Buffer buffer = factory.create(raw); + assertEquals(raw.refCnt(), buffer.referenceCount()); + } + + @Test + public void shouldAdvanceWriterAndReaderIndex() { + final ByteBuf raw = getRaw(); + final Buffer buffer = factory.create(raw); + + final int intValue = 100; + final long longValue = 2019L; + final float 
floatValue = 10.9F; + + assertEquals(0, buffer.writerIndex()); + assertEquals(0, buffer.readerIndex()); + + buffer.writeBoolean(true); + buffer.writeInt(intValue); + buffer.writeLong(longValue); + buffer.writeFloat(floatValue); + + assertEquals(17, buffer.writerIndex()); + assertEquals(0, buffer.readerIndex()); + + assertTrue(buffer.readBoolean()); + assertEquals(intValue, buffer.readInt()); + assertEquals(longValue, buffer.readLong()); + assertEquals(floatValue, buffer.readFloat(), 0f); + + assertEquals(17, buffer.writerIndex()); + assertEquals(17, buffer.readerIndex()); + } + + @Test + public void readRawShouldAdvanceReaderIndexAndReleaseIt() { + final int intValue = 5; + final FakeBuffer fakeBuffer = new FakeBuffer(); + fakeBuffer.writeInt(intValue); + assertEquals(4, fakeBuffer.writerIndex()); + assertEquals(0, fakeBuffer.readerIndex()); + final ByteBuf[] captured = new ByteBuf[1]; + + NettyBufferFactory.readRaw(fakeBuffer, byteBuf -> { + assertEquals(intValue, byteBuf.readInt()); + assertNotSame(byteBuf, fakeBuffer.getUnderlyingRaw()); + assertEquals(1, byteBuf.refCnt()); + captured[0] = byteBuf; + }); + + assertEquals(4, fakeBuffer.writerIndex()); + + // The reader index advanced + assertEquals(4, fakeBuffer.readerIndex()); + + // Should be released afterwards + assertEquals(0, captured[0].refCnt()); + } + + @Test + public void writeRawShouldAdvanceWriterIndexAndReleaseIt() { + final int intValue1 = 314; + final int intValue2 = 314; + final FakeBuffer fakeBuffer = new FakeBuffer(); + + fakeBuffer.writeInt(intValue1); + assertEquals(4, fakeBuffer.writerIndex()); + assertEquals(0, fakeBuffer.readerIndex()); + final ByteBuf[] captured = new ByteBuf[1]; + + NettyBufferFactory.writeRaw(fakeBuffer, byteBuf -> { + byteBuf.writeInt(intValue2); + assertNotSame(byteBuf, fakeBuffer.getUnderlyingRaw()); + assertEquals(1, byteBuf.refCnt()); + captured[0] = byteBuf; + }); + + // The writer index advanced + assertEquals(8, fakeBuffer.writerIndex()); + assertEquals(0, 
fakeBuffer.readerIndex()); + + // Should have painted the underlying bytes + assertEquals(intValue1, fakeBuffer.readInt()); + assertEquals(intValue2, fakeBuffer.readInt()); + + // Should be released afterwards + assertEquals(0, captured[0].refCnt()); + } + + @Test + public void readRawShouldUseTheSameBufferWhenNettyBuffer() { + final NettyBuffer wrapperBuffer = new NettyBuffer(allocator.buffer()); + + NettyBufferFactory.readRaw(wrapperBuffer, byteBuf -> { + assertSame(byteBuf, wrapperBuffer.getUnderlyingBuffer()); + assertEquals(1, byteBuf.refCnt()); + }); + + // It shouldn't have released it + assertEquals(1, wrapperBuffer.referenceCount()); + } + + @Test + public void writeRawShouldUseTheSameBufferWhenNettyBuffer() { + final NettyBuffer wrapperBuffer = new NettyBuffer(allocator.buffer()); + + NettyBufferFactory.writeRaw(wrapperBuffer, byteBuf -> { + assertSame(byteBuf, wrapperBuffer.getUnderlyingBuffer()); + assertEquals(1, byteBuf.refCnt()); + }); + + // It shouldn't have released it + assertEquals(1, wrapperBuffer.referenceCount()); + } + + /** An incomplete implementation that allows testing */ + class FakeBuffer implements Buffer { + private final ByteBuf buffer = getRaw(); + + FakeBuffer() { + + } + + ByteBuf getUnderlyingRaw() { + return buffer; + } + + @Override + public int readableBytes() { + return buffer.readableBytes(); + } + + @Override + public int readerIndex() { + return buffer.readerIndex(); + } + + @Override + public Buffer readerIndex(int readerIndex) { + buffer.readerIndex(readerIndex); + return this; + } + + @Override + public int writerIndex() { + return buffer.writerIndex(); + } + + @Override + public Buffer writerIndex(int writerIndex) { + buffer.writerIndex(writerIndex); + return this; + } + + @Override + public Buffer markWriterIndex() { + return null; + } + + @Override + public Buffer resetWriterIndex() { + return null; + } + + @Override + public int capacity() { + return buffer.capacity(); + } + + @Override + public boolean isDirect() 
{ + return false; + } + + @Override + public boolean readBoolean() { + return false; + } + + @Override + public byte readByte() { + return 0; + } + + @Override + public short readShort() { + return 0; + } + + @Override + public int readInt() { + return buffer.readInt(); + } + + @Override + public long readLong() { + return 0; + } + + @Override + public float readFloat() { + return 0; + } + + @Override + public double readDouble() { + return 0; + } + + @Override + public Buffer readBytes(byte[] destination) { + return null; + } + + @Override + public Buffer readBytes(byte[] destination, int dstIndex, int length) { + return null; + } + + @Override + public Buffer readBytes(ByteBuffer dst) { + return null; + } + + @Override + public Buffer readBytes(OutputStream out, int length) { + return null; + } + + @Override + public Buffer writeBoolean(boolean value) { + return null; + } + + @Override + public Buffer writeByte(int value) { + return null; + } + + @Override + public Buffer writeShort(int value) { + return null; + } + + @Override + public Buffer writeInt(int value) { + buffer.writeInt(value); + return this; + } + + @Override + public Buffer writeLong(long value) { + return null; + } + + @Override + public Buffer writeFloat(float value) { + return null; + } + + @Override + public Buffer writeDouble(double value) { + return null; + } + + @Override + public Buffer writeBytes(byte[] src) { + return null; + } + + @Override + public Buffer writeBytes(ByteBuffer src) { + return null; + } + + @Override + public Buffer writeBytes(byte[] src, int srcIndex, int length) { + return null; + } + + @Override + public boolean release() { + return buffer.release(); + } + + @Override + public Buffer retain() { + buffer.retain(); + return this; + } + + @Override + public int referenceCount() { + return buffer.refCnt(); + } + + @Override + public int nioBufferCount() { + return buffer.nioBufferCount(); + } + + @Override + public ByteBuffer[] nioBuffers() { + return buffer.nioBuffers(); 
+ } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + return buffer.nioBuffers(index, length); + } + + @Override + public ByteBuffer nioBuffer() { + return buffer.nioBuffer(); + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + return buffer.nioBuffer(index, length); + } + + @Override + public Buffer getBytes(int index, byte[] dst) { + return null; + } + } +}