This is an automated email from the ASF dual-hosted git repository.

jorgebg pushed a commit to branch TINKERPOP-2305
in repository https://gitbox.apache.org/repos/asf/tinkerpop.git


The following commit(s) were added to refs/heads/TINKERPOP-2305 by this push:
     new 5c2cb9f  TINKERPOP-2035 Add utility methods to read/write from Netty 
buffer
5c2cb9f is described below

commit 5c2cb9fb8c6fdba228da7b0c27e3063e6ba06862
Author: Jorge Bay Gondra <jorgebaygon...@gmail.com>
AuthorDate: Mon Oct 14 12:36:55 2019 +0200

    TINKERPOP-2035 Add utility methods to read/write from Netty buffer
---
 .../tinkerpop/gremlin/structure/io/Buffer.java     |  17 +-
 .../tinkerpop/gremlin/driver/ser/NettyBuffer.java  |  22 ++
 .../gremlin/driver/ser/NettyBufferFactory.java     |  73 ++++
 .../gremlin/driver/ser/NettyBufferFactoryTest.java | 385 +++++++++++++++++++++
 4 files changed, 496 insertions(+), 1 deletion(-)

diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Buffer.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Buffer.java
index 4d37bba..e370ed7 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Buffer.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Buffer.java
@@ -215,11 +215,21 @@ public interface Buffer {
     int referenceCount();
 
     /**
-     * Exposes this buffer's readable bytes as an NIO ByteBuffer's.
+     * Returns the maximum number of NIO {@link ByteBuffer}s that make up this 
buffer.
+     */
+    int nioBufferCount();
+
+    /**
+     * Exposes this buffer's readable bytes as NIO {@link ByteBuffer} instances.
      */
     ByteBuffer[] nioBuffers();
 
     /**
+     * Exposes this buffer's sub-region as NIO {@link ByteBuffer} instances.
+     */
+    ByteBuffer[] nioBuffers(int index, int length);
+
+    /**
      * Exposes this buffer's readable bytes as a NIO {@link ByteBuffer}. The 
returned buffer
      * either share or contains the copied content of this buffer, while 
changing the position
      * and limit of the returned NIO buffer does not affect the indexes and 
marks of this buffer.
@@ -227,6 +237,11 @@ public interface Buffer {
     ByteBuffer nioBuffer();
 
     /**
+     * Exposes this buffer's sub-region as an NIO {@link ByteBuffer}.
+     */
+    ByteBuffer nioBuffer(int index, int length);
+
+    /**
      * Transfers this buffer's data to the specified destination starting at
      * the specified absolute {@code index}.
      * This method does not modify reader or writer indexes.
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBuffer.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBuffer.java
index f9de6c6..e194f1f 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBuffer.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBuffer.java
@@ -238,8 +238,30 @@ final class NettyBuffer implements Buffer {
     }
 
     @Override
+    public ByteBuffer nioBuffer(int index, int length) {
+        return this.buffer.nioBuffer(index, length);
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers(int index, int length) {
+        return this.buffer.nioBuffers(index, length);
+    }
+
+    @Override
+    public int nioBufferCount() {
+        return this.buffer.nioBufferCount();
+    }
+
+    @Override
     public Buffer getBytes(int index, byte[] dst) {
         this.buffer.getBytes(index, dst);
         return this;
     }
+
+    /**
+     * Returns the underlying buffer.
+     */
+    public ByteBuf getUnderlyingBuffer() {
+        return this.buffer;
+    }
 }
diff --git 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactory.java
 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactory.java
index b1bfe1f..2be524c 100644
--- 
a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactory.java
+++ 
b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactory.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.structure.io.Buffer;
 import org.apache.tinkerpop.gremlin.structure.io.BufferFactory;
 
 import java.nio.ByteBuffer;
+import java.util.function.Consumer;
 
 /**
  * Represents a factory to create {@link Buffer} instances from wrapped {@link 
ByteBuf} instances.
@@ -38,4 +39,76 @@ public class NettyBufferFactory implements 
BufferFactory<ByteBuf> {
     public Buffer wrap(ByteBuffer value) {
         return create(Unpooled.wrappedBuffer(value));
     }
+
+    private static ByteBuf getFromIndex(Buffer buffer, int index) {
+        if (buffer.nioBufferCount() == 1) {
+            // Heap and direct buffers usually consist of a single NIO buffer
+            // It will create a new ByteBuf that wraps the same underlying memory
+            return Unpooled.wrappedBuffer(buffer.nioBuffer(index, 
buffer.capacity() - index));
+        }
+
+        // Use a wrapper or composite buffer
+        return Unpooled.wrappedBuffer(buffer.nioBuffers(index, 
buffer.capacity() - index));
+    }
+
+    /**
+     * Utility method to allow reading from the underlying bytes using a Netty 
{@link ByteBuf} instance for
+     * interoperability, advancing the reader index of the {@link Buffer} 
after the consumer is called.
+     *
+     * Note that the {@link ByteBuf} used by the consumer should not be 
released by the caller.
+     * In case the provided {@link Buffer} instance is not a {@link 
NettyBuffer}, it will create a {@link ByteBuf}
+     * wrapper for the consumer to use, releasing it after use.
+     */
+    public static void readRaw(Buffer buffer, Consumer<ByteBuf> consumer) {
+        if (buffer instanceof NettyBuffer) {
+            consumer.accept(((NettyBuffer)buffer).getUnderlyingBuffer());
+            return;
+        }
+
+        // Create a new ByteBuf as a wrapper
+        final int initialIndex = buffer.readerIndex();
+        final ByteBuf newBuffer = getFromIndex(buffer, initialIndex);
+
+        try {
+            // Invoke the consumer to read from the ByteBuf
+            consumer.accept(newBuffer);
+
+            // Advance the reader index of the Buffer implementation
+            buffer.readerIndex(initialIndex + newBuffer.readerIndex());
+        } finally {
+            newBuffer.release();
+        }
+    }
+
+    /**
+     * Allows writing to the underlying bytes using a Netty {@link ByteBuf} 
instance for interoperability,
+     * advancing the writer index of the {@link Buffer} after the consumer is 
called.
+     *
+     * Note that the {@link ByteBuf} used by the consumer should not be 
released by the caller.
+     * In case the provided {@link Buffer} instance is not a {@link 
NettyBuffer}, it will create a {@link ByteBuf}
+     * wrapper for the consumer to use, releasing it after use.
+     */
+    public static void writeRaw(Buffer buffer, Consumer<ByteBuf> consumer) {
+        if (buffer instanceof NettyBuffer) {
+            consumer.accept(((NettyBuffer)buffer).getUnderlyingBuffer());
+            return;
+        }
+
+        // Create a new ByteBuf as a wrapper
+        final int initialIndex = buffer.writerIndex();
+        final ByteBuf newBuffer = getFromIndex(buffer, initialIndex);
+
+        // Set writer index to 0
+        newBuffer.writerIndex(0);
+
+        try {
+            // Invoke the consumer to write to the ByteBuf
+            consumer.accept(newBuffer);
+
+            // Advance the writer index of the Buffer implementation
+            buffer.writerIndex(initialIndex + newBuffer.writerIndex());
+        } finally {
+            newBuffer.release();
+        }
+    }
 }
diff --git 
a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactoryTest.java
 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactoryTest.java
new file mode 100644
index 0000000..223b289
--- /dev/null
+++ 
b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/ser/NettyBufferFactoryTest.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.driver.ser;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.ReferenceCounted;
+import org.apache.tinkerpop.gremlin.structure.io.Buffer;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class NettyBufferFactoryTest {
+    public static final NettyBufferFactory factory = new NettyBufferFactory();
+    private static final ByteBufAllocator allocator = new 
UnpooledByteBufAllocator(false);
+    private static final List<ByteBuf> rawInstances = new ArrayList<>();
+
+    @AfterClass
+    public static void tearDown() {
+        rawInstances.forEach(ReferenceCounted::release);
+    }
+
+    private static ByteBuf getRaw() {
+        final ByteBuf raw = allocator.buffer(256);
+        rawInstances.add(raw);
+        return raw;
+    }
+
+    @Test
+    public void shouldReturnAWrapper() {
+        final ByteBuf raw = getRaw();
+        final Buffer buffer = factory.create(raw);
+        assertEquals(raw.refCnt(), buffer.referenceCount());
+    }
+
+    @Test
+    public void shouldAdvanceWriterAndReaderIndex() {
+        final ByteBuf raw = getRaw();
+        final Buffer buffer = factory.create(raw);
+
+        final int intValue = 100;
+        final long longValue = 2019L;
+        final float floatValue = 10.9F;
+
+        assertEquals(0, buffer.writerIndex());
+        assertEquals(0, buffer.readerIndex());
+
+        buffer.writeBoolean(true);
+        buffer.writeInt(intValue);
+        buffer.writeLong(longValue);
+        buffer.writeFloat(floatValue);
+
+        assertEquals(17, buffer.writerIndex());
+        assertEquals(0, buffer.readerIndex());
+
+        assertTrue(buffer.readBoolean());
+        assertEquals(intValue, buffer.readInt());
+        assertEquals(longValue, buffer.readLong());
+        assertEquals(floatValue, buffer.readFloat(), 0f);
+
+        assertEquals(17, buffer.writerIndex());
+        assertEquals(17, buffer.readerIndex());
+    }
+
+    @Test
+    public void readRawShouldAdvanceReaderIndexAndReleaseIt() {
+        final int intValue = 5;
+        final FakeBuffer fakeBuffer = new FakeBuffer();
+        fakeBuffer.writeInt(intValue);
+        assertEquals(4, fakeBuffer.writerIndex());
+        assertEquals(0, fakeBuffer.readerIndex());
+        final ByteBuf[] captured = new ByteBuf[1];
+
+        NettyBufferFactory.readRaw(fakeBuffer, byteBuf -> {
+            assertEquals(intValue, byteBuf.readInt());
+            assertNotSame(byteBuf, fakeBuffer.getUnderlyingRaw());
+            assertEquals(1, byteBuf.refCnt());
+            captured[0] = byteBuf;
+        });
+
+        assertEquals(4, fakeBuffer.writerIndex());
+
+        // The reader index advanced
+        assertEquals(4, fakeBuffer.readerIndex());
+
+        // Should be released afterwards
+        assertEquals(0, captured[0].refCnt());
+    }
+
+    @Test
+    public void writeRawShouldAdvanceWriterIndexAndReleaseIt() {
+        final int intValue1 = 314;
+        final int intValue2 = 314;
+        final FakeBuffer fakeBuffer = new FakeBuffer();
+
+        fakeBuffer.writeInt(intValue1);
+        assertEquals(4, fakeBuffer.writerIndex());
+        assertEquals(0, fakeBuffer.readerIndex());
+        final ByteBuf[] captured = new ByteBuf[1];
+
+        NettyBufferFactory.writeRaw(fakeBuffer, byteBuf -> {
+            byteBuf.writeInt(intValue2);
+            assertNotSame(byteBuf, fakeBuffer.getUnderlyingRaw());
+            assertEquals(1, byteBuf.refCnt());
+            captured[0] = byteBuf;
+        });
+
+        // The writer index advanced
+        assertEquals(8, fakeBuffer.writerIndex());
+        assertEquals(0, fakeBuffer.readerIndex());
+
+        // Should have painted the underlying bytes
+        assertEquals(intValue1, fakeBuffer.readInt());
+        assertEquals(intValue2, fakeBuffer.readInt());
+
+        // Should be released afterwards
+        assertEquals(0, captured[0].refCnt());
+    }
+
+    @Test
+    public void readRawShouldUseTheSameBufferWhenNettyBuffer() {
+        final NettyBuffer wrapperBuffer = new NettyBuffer(allocator.buffer());
+
+        NettyBufferFactory.readRaw(wrapperBuffer, byteBuf -> {
+            assertSame(byteBuf, wrapperBuffer.getUnderlyingBuffer());
+            assertEquals(1, byteBuf.refCnt());
+        });
+
+        // It shouldn't have released it
+        assertEquals(1, wrapperBuffer.referenceCount());
+    }
+
+    @Test
+    public void writeRawShouldUseTheSameBufferWhenNettyBuffer() {
+        final NettyBuffer wrapperBuffer = new NettyBuffer(allocator.buffer());
+
+        NettyBufferFactory.writeRaw(wrapperBuffer, byteBuf -> {
+            assertSame(byteBuf, wrapperBuffer.getUnderlyingBuffer());
+            assertEquals(1, byteBuf.refCnt());
+        });
+
+        // It shouldn't have released it
+        assertEquals(1, wrapperBuffer.referenceCount());
+    }
+
+    /** An incomplete implementation that allows testing */
+    class FakeBuffer implements Buffer {
+        private final ByteBuf buffer = getRaw();
+
+        FakeBuffer() {
+
+        }
+
+        ByteBuf getUnderlyingRaw() {
+            return buffer;
+        }
+
+        @Override
+        public int readableBytes() {
+            return buffer.readableBytes();
+        }
+
+        @Override
+        public int readerIndex() {
+            return buffer.readerIndex();
+        }
+
+        @Override
+        public Buffer readerIndex(int readerIndex) {
+            buffer.readerIndex(readerIndex);
+            return this;
+        }
+
+        @Override
+        public int writerIndex() {
+            return buffer.writerIndex();
+        }
+
+        @Override
+        public Buffer writerIndex(int writerIndex) {
+            buffer.writerIndex(writerIndex);
+            return this;
+        }
+
+        @Override
+        public Buffer markWriterIndex() {
+            return null;
+        }
+
+        @Override
+        public Buffer resetWriterIndex() {
+            return null;
+        }
+
+        @Override
+        public int capacity() {
+            return buffer.capacity();
+        }
+
+        @Override
+        public boolean isDirect() {
+            return false;
+        }
+
+        @Override
+        public boolean readBoolean() {
+            return false;
+        }
+
+        @Override
+        public byte readByte() {
+            return 0;
+        }
+
+        @Override
+        public short readShort() {
+            return 0;
+        }
+
+        @Override
+        public int readInt() {
+            return buffer.readInt();
+        }
+
+        @Override
+        public long readLong() {
+            return 0;
+        }
+
+        @Override
+        public float readFloat() {
+            return 0;
+        }
+
+        @Override
+        public double readDouble() {
+            return 0;
+        }
+
+        @Override
+        public Buffer readBytes(byte[] destination) {
+            return null;
+        }
+
+        @Override
+        public Buffer readBytes(byte[] destination, int dstIndex, int length) {
+            return null;
+        }
+
+        @Override
+        public Buffer readBytes(ByteBuffer dst) {
+            return null;
+        }
+
+        @Override
+        public Buffer readBytes(OutputStream out, int length) {
+            return null;
+        }
+
+        @Override
+        public Buffer writeBoolean(boolean value) {
+            return null;
+        }
+
+        @Override
+        public Buffer writeByte(int value) {
+            return null;
+        }
+
+        @Override
+        public Buffer writeShort(int value) {
+            return null;
+        }
+
+        @Override
+        public Buffer writeInt(int value) {
+            buffer.writeInt(value);
+            return this;
+        }
+
+        @Override
+        public Buffer writeLong(long value) {
+            return null;
+        }
+
+        @Override
+        public Buffer writeFloat(float value) {
+            return null;
+        }
+
+        @Override
+        public Buffer writeDouble(double value) {
+            return null;
+        }
+
+        @Override
+        public Buffer writeBytes(byte[] src) {
+            return null;
+        }
+
+        @Override
+        public Buffer writeBytes(ByteBuffer src) {
+            return null;
+        }
+
+        @Override
+        public Buffer writeBytes(byte[] src, int srcIndex, int length) {
+            return null;
+        }
+
+        @Override
+        public boolean release() {
+            return buffer.release();
+        }
+
+        @Override
+        public Buffer retain() {
+            buffer.retain();
+            return this;
+        }
+
+        @Override
+        public int referenceCount() {
+            return buffer.refCnt();
+        }
+
+        @Override
+        public int nioBufferCount() {
+            return buffer.nioBufferCount();
+        }
+
+        @Override
+        public ByteBuffer[] nioBuffers() {
+            return buffer.nioBuffers();
+        }
+
+        @Override
+        public ByteBuffer[] nioBuffers(int index, int length) {
+            return buffer.nioBuffers(index, length);
+        }
+
+        @Override
+        public ByteBuffer nioBuffer() {
+            return buffer.nioBuffer();
+        }
+
+        @Override
+        public ByteBuffer nioBuffer(int index, int length) {
+            return buffer.nioBuffer(index, length);
+        }
+
+        @Override
+        public Buffer getBytes(int index, byte[] dst) {
+            return null;
+        }
+    }
+}

Reply via email to