This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch ignite-13617
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 7ccf99be30ac0b352349bf83357437eb85ee11e6
Author: Alexey Goncharuk <alexey.goncha...@gmail.com>
AuthorDate: Thu Nov 12 18:49:30 2020 +0300

    IGNITE-13617 Tuple assembler no longer requires the size of the tuple to be 
known in advance
---
 .../ignite/internal/schema/AssemblyException.java} |  23 +-
 .../ignite/internal/schema/ExpandableByteBuf.java  | 253 +++++++++++++++++++++
 .../ignite/internal/schema/TupleAssembler.java     |  69 ++----
 .../ignite/internal/schema/package-info.java       |   4 +-
 .../internal/schema/ExpandableByteBufTest.java     | 153 +++++++++++++
 .../ignite/internal/schema/SchemaTestSuite.java    |   1 +
 .../apache/ignite/internal/schema/TupleTest.java   |   9 +-
 7 files changed, 447 insertions(+), 65 deletions(-)

diff --git 
a/modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java
 
b/modules/commons/src/main/java/org/apache/ignite/internal/schema/AssemblyException.java
similarity index 64%
copy from 
modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java
copy to 
modules/commons/src/main/java/org/apache/ignite/internal/schema/AssemblyException.java
index 3eded6a..8d01c91 100644
--- 
a/modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java
+++ 
b/modules/commons/src/main/java/org/apache/ignite/internal/schema/AssemblyException.java
@@ -17,19 +17,16 @@
 
 package org.apache.ignite.internal.schema;
 
-import org.junit.platform.runner.JUnitPlatform;
-import org.junit.platform.suite.api.SelectClasses;
-import org.junit.runner.RunWith;
-
 /**
- *
+ * The exception is thrown when the tuple assembler encountered an 
unrecoverable error during the field encoding.
+ * After the exception is thrown, the assembler remains in an invalid state 
and should be discarded.
  */
-@RunWith(JUnitPlatform.class)
-@SelectClasses({
-    NativeTypeTest.class,
-    ColumnTest.class,
-    ColumnsTest.class,
-    TupleTest.class
-})
-public class SchemaTestSuite {
+public class AssemblyException extends RuntimeException {
+    /**
+     * Creates an exception with the given message and the underlying failure as its cause.
+     *
+     * @param errMsg Error message.
+     * @param cause Cause for this error.
+     */
+    public AssemblyException(String errMsg, Exception cause) {
+        super(errMsg, cause);
+    }
 }
diff --git 
a/modules/commons/src/main/java/org/apache/ignite/internal/schema/ExpandableByteBuf.java
 
b/modules/commons/src/main/java/org/apache/ignite/internal/schema/ExpandableByteBuf.java
new file mode 100644
index 0000000..542b539
--- /dev/null
+++ 
b/modules/commons/src/main/java/org/apache/ignite/internal/schema/ExpandableByteBuf.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CoderResult;
+import java.util.Arrays;
+
+/**
+ * A simple byte array wrapper to allow dynamic byte array expansion during the tuple construction. Grows exponentially
+ * up to 1MB, then expands by 1 MB each time an expansion is required. Values are always written in LITTLE_ENDIAN
+ * format.
+ * <p>
+ * Additionally, it tracks the high watermark of the values ever written to the buffer so that only written bytes are
+ * returned from the {@link #toArray()} method. If the current (expanded) buffer size does not match the high watermark,
+ * the {@link #toArray()} method will return a smaller copy of the array to exactly match the watermark.
+ * <p>
+ * All write methods have an absolute position. The buffer will automatically expand to fit the value being written. If
+ * there is a gap between previously written values and the current value, it will be filled with zero bytes:
+ * <pre>
+ * ExpandableByteBuf b = new ExpandableByteBuf(1);
+ * b.put(0, (byte)1); // Does not expand.
+ * b.put(5, (byte)1); // Expands, meaningful bytes are [0..5]
+ *
+ * byte[] data = b.toArray(); // data.length == 6
+ * </pre>
+ */
+@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
+public class ExpandableByteBuf {
+    /** One megabyte: the size at which growth switches from doubling to fixed 1 MB steps. */
+    private static final int MB = 1024 * 1024;
+
+    /** Backing array; replaced by a larger copy on every expansion. */
+    private byte[] arr;
+
+    /** Little-endian view over {@link #arr}. Its position is reset to 0 after every relative write. */
+    private ByteBuffer buf;
+
+    /** High watermark: number of leading bytes of {@link #arr} returned by {@link #toArray()}. */
+    private int len;
+
+    /**
+     * @param size Start buffer size. Non-positive values are replaced with a default of 16 bytes.
+     */
+    public ExpandableByteBuf(int size) {
+        if (size <= 0)
+            size = 16;
+
+        arr = new byte[size];
+        buf = ByteBuffer.wrap(arr);
+        buf.order(ByteOrder.LITTLE_ENDIAN);
+    }
+
+    /**
+     * Writes a single byte at the given absolute offset, expanding the buffer if needed.
+     *
+     * @param off Absolute offset to write at.
+     * @param val Value to write.
+     */
+    public void put(int off, byte val) {
+        ensureCapacity(off + 1);
+
+        buf.put(off, val);
+    }
+
+    /**
+     * Writes a 2-byte value at the given absolute offset, expanding the buffer if needed.
+     *
+     * @param off Absolute offset to write at.
+     * @param val Value to write.
+     */
+    public void putShort(int off, short val) {
+        ensureCapacity(off + 2);
+
+        buf.putShort(off, val);
+    }
+
+    /**
+     * Writes a 4-byte integer at the given absolute offset, expanding the buffer if needed.
+     *
+     * @param off Absolute offset to write at.
+     * @param val Value to write.
+     */
+    public void putInt(int off, int val) {
+        ensureCapacity(off + 4);
+
+        buf.putInt(off, val);
+    }
+
+    /**
+     * Writes a 4-byte float at the given absolute offset, expanding the buffer if needed.
+     *
+     * @param off Absolute offset to write at.
+     * @param val Value to write.
+     */
+    public void putFloat(int off, float val) {
+        ensureCapacity(off + 4);
+
+        buf.putFloat(off, val);
+    }
+
+    /**
+     * Writes an 8-byte integer at the given absolute offset, expanding the buffer if needed.
+     *
+     * @param off Absolute offset to write at.
+     * @param val Value to write.
+     */
+    public void putLong(int off, long val) {
+        ensureCapacity(off + 8);
+
+        buf.putLong(off, val);
+    }
+
+    /**
+     * Writes an 8-byte double at the given absolute offset, expanding the buffer if needed.
+     *
+     * @param off Absolute offset to write at.
+     * @param val Value to write.
+     */
+    public void putDouble(int off, double val) {
+        ensureCapacity(off + 8);
+
+        buf.putDouble(off, val);
+    }
+
+    /**
+     * Copies the whole array into the buffer starting at the given absolute offset, expanding the buffer if needed.
+     *
+     * @param curOff Absolute offset to write at.
+     * @param val Bytes to write.
+     */
+    public void putBytes(int curOff, byte[] val) {
+        ensureCapacity(curOff + val.length);
+
+        buf.position(curOff);
+
+        try {
+            buf.put(val);
+        }
+        finally {
+            buf.position(0);
+        }
+    }
+
+    /**
+     * Encodes the string with the given encoder and writes the resulting bytes starting at the given absolute offset.
+     * The encoded size is not known in advance, so the buffer is expanded each time the encoder reports an overflow.
+     *
+     * @param off Absolute offset to write at.
+     * @param val String to encode.
+     * @param encoder Encoder to use; it is reset before encoding.
+     * @return Number of bytes written.
+     * @throws CharacterCodingException If the encoder reported a malformed-input or unmappable-character error.
+     */
+    public int putString(int off, String val, CharsetEncoder encoder) throws CharacterCodingException {
+        ensureCapacity(off);
+
+        encoder.reset();
+
+        buf.position(off);
+
+        try {
+            CharBuffer valBuf = CharBuffer.wrap(val);
+
+            while (true) {
+                CoderResult cr = encoder.encode(valBuf, buf, true);
+
+                if (cr.isUnderflow())
+                    break;
+
+                // An overflow may be reported while a few bytes are still free (a multi-byte character does not fit),
+                // so the buffer must grow beyond its current capacity rather than beyond the current position.
+                if (cr.isOverflow())
+                    expand(arr.length + 1);
+                else
+                    cr.throwException();
+            }
+
+            while (true) {
+                CoderResult cr = encoder.flush(buf);
+
+                if (cr.isUnderflow())
+                    break;
+
+                if (cr.isOverflow())
+                    expand(arr.length + 1);
+                else
+                    cr.throwException();
+            }
+
+            int end = buf.position();
+
+            // Only ever raise the watermark: the string may be written before previously written data.
+            if (end > len)
+                len = end;
+
+            return end - off;
+        }
+        finally {
+            buf.position(0);
+        }
+    }
+
+    /**
+     * Reads a single byte at the given absolute offset.
+     *
+     * @param off Absolute offset to read from.
+     * @return Byte at the given offset.
+     */
+    public byte get(int off) {
+        return buf.get(off);
+    }
+
+    /**
+     * @return The byte array of all bytes written to this array, including gaps. The internal array itself is
+     *      returned when its size exactly matches the high watermark, otherwise a trimmed copy is returned.
+     */
+    public byte[] toArray() {
+        if (arr.length == len)
+            return arr;
+        else
+            return Arrays.copyOf(arr, len);
+    }
+
+    /**
+     * If the current capacity is smaller than {@code cap}, will expand the buffer size. Also raises the high
+     * watermark to {@code cap} if it is currently lower.
+     *
+     * @param cap Target capacity.
+     */
+    private void ensureCapacity(int cap) {
+        if (arr.length < cap)
+            expand(cap);
+
+        if (cap > len)
+            len = cap;
+    }
+
+    /**
+     * Grows the backing array so that it holds at least {@code cap} bytes. The new size is found by doubling the
+     * current size while it is below 1 MB and adding 1 MB afterwards. Existing content and the current buffer
+     * position are preserved.
+     *
+     * @param cap Minimum capacity required after the expansion.
+     */
+    private void expand(int cap) {
+        int newLen = arr.length;
+
+        while (newLen < cap) {
+            if (newLen < MB)
+                newLen *= 2;
+            else if (newLen <= Integer.MAX_VALUE - MB)
+                newLen += MB;
+            else
+                newLen = cap; // Another full 1 MB step would overflow int; take exactly what is required.
+        }
+
+        arr = Arrays.copyOf(arr, newLen);
+
+        int oldPos = buf.position();
+
+        buf = ByteBuffer.wrap(arr);
+        buf.position(oldPos);
+        buf.order(ByteOrder.LITTLE_ENDIAN);
+    }
+}
diff --git 
a/modules/commons/src/main/java/org/apache/ignite/internal/schema/TupleAssembler.java
 
b/modules/commons/src/main/java/org/apache/ignite/internal/schema/TupleAssembler.java
index f1bf2d2..45d3982 100644
--- 
a/modules/commons/src/main/java/org/apache/ignite/internal/schema/TupleAssembler.java
+++ 
b/modules/commons/src/main/java/org/apache/ignite/internal/schema/TupleAssembler.java
@@ -19,8 +19,8 @@ package org.apache.ignite.internal.schema;
 
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
 import java.nio.charset.CharsetEncoder;
 import java.nio.charset.CoderResult;
 import java.nio.charset.StandardCharsets;
@@ -40,7 +40,7 @@ public class TupleAssembler {
     private final int nonNullVarlenValCols;
 
     /** Target byte buffer to write to. */
-    private final ByteBuffer buf;
+    private final ExpandableByteBuf buf;
 
     /** Current columns chunk. */
     private Columns curCols;
@@ -112,7 +112,8 @@ public class TupleAssembler {
 
     /**
      * @param schema Tuple schema.
-     * @param size Target tuple size. For now, must exactly match the 
resulting tuple size.
+     * @param size Target tuple size. If the tuple size is known in advance, 
it should be provided upfront to avoid
+     *      unnecessary array copies.
      * @param nonNullVarsizeKeyCols Number of null varlen columns in key chunk.
      * @param nonNullVarlenValCols Number of null varlen columns in value 
chunk.
      */
@@ -126,9 +127,7 @@ public class TupleAssembler {
 
         this.nonNullVarlenValCols = nonNullVarlenValCols;
 
-        buf = ByteBuffer.allocate(size);
-
-        buf.order(ByteOrder.LITTLE_ENDIAN);
+        buf = new ExpandableByteBuf(size);
 
         curCols = schema.columns(0);
 
@@ -242,26 +241,16 @@ public class TupleAssembler {
     public void appendString(String val) {
         checkType(NativeType.STRING);
 
-        assert buf.position() == 0;
-
-        ByteBuffer wrapper = buf.slice();
-        wrapper.position(curOff);
-
-        CharsetEncoder encoder = encoder();
-        encoder.reset();
-        CoderResult cr = encoder.encode(CharBuffer.wrap(val), wrapper, true);
-
-        if (!cr.isUnderflow())
-            throw new BufferUnderflowException();
-
-        cr = encoder.flush(wrapper);
-
-        if (!cr.isUnderflow())
-            throw new BufferUnderflowException();
+        try {
+            int written = buf.putString(curOff, val, encoder());
 
-        writeOffset(curVarlenTblEntry, curOff - baseOff);
+            writeOffset(curVarlenTblEntry, curOff - baseOff);
 
-        shiftColumn(wrapper.position() - curOff, true);
+            shiftColumn(written, true);
+        }
+        catch (CharacterCodingException e) {
+            throw new AssemblyException("Failed to encode string", e);
+        }
     }
 
     /**
@@ -269,18 +258,11 @@ public class TupleAssembler {
     public void appendBytes(byte[] val) {
         checkType(NativeType.BYTES);
 
-        buf.position(curOff);
+        buf.putBytes(curOff, val);
 
-        try {
-            buf.put(val);
-
-            writeOffset(curVarlenTblEntry, curOff - baseOff);
+        writeOffset(curVarlenTblEntry, curOff - baseOff);
 
-            shiftColumn(val.length, true);
-        }
-        finally {
-            buf.position(0);
-        }
+        shiftColumn(val.length, true);
     }
 
     /**
@@ -296,27 +278,20 @@ public class TupleAssembler {
             throw new IllegalArgumentException("Failed to set bitmask for 
column '" + col.name() + "' " +
                 "(mask size exceeds allocated size) [mask=" + bitSet + ", 
maxSize=" + maskType.bits() + "]");
 
-        buf.position(curOff);
-
-        try {
-            byte[] arr = bitSet.toByteArray();
+        byte[] arr = bitSet.toByteArray();
 
-            buf.put(arr);
+        buf.putBytes(curOff, arr);
 
-            for (int i = 0; i < maskType.length() - arr.length; i++)
-                buf.put((byte)0);
+        for (int i = 0; i < maskType.length() - arr.length; i++)
+            buf.put(curOff + arr.length + i, (byte)0);
 
-            shiftColumn(maskType);
-        }
-        finally {
-            buf.position(0);
-        }
+        shiftColumn(maskType);
     }
 
     /**
      */
     public byte[] build() {
-        return buf.array();
+        return buf.toArray();
     }
 
     /**
diff --git 
a/modules/commons/src/main/java/org/apache/ignite/internal/schema/package-info.java
 
b/modules/commons/src/main/java/org/apache/ignite/internal/schema/package-info.java
index 3690267..110317c 100644
--- 
a/modules/commons/src/main/java/org/apache/ignite/internal/schema/package-info.java
+++ 
b/modules/commons/src/main/java/org/apache/ignite/internal/schema/package-info.java
@@ -55,8 +55,8 @@
  * To assemble a tuple with some schema, an instance of {@link 
org.apache.ignite.internal.schema.TupleAssembler}
  * must be used which provides the low-level API for building tuples. When 
using the tuple assembler, the
  * columns must be passed to the assembler in the internal schema sort order. 
Additionally, when constructing
- * the instance of the assembler, the user must pre-calculate the size of the 
tuple (using tooling provided by the
- * assembler) and the number of non-null varlen columns for key and value 
chunks. Less restrictive building techniques
+ * the instance of the assembler, the user should pre-calculate the size of 
the tuple to avoid extra array copies,
+ * and the number of non-null varlen columns for key and value chunks. Less 
restrictive building techniques
  * are provided by class (de)serializers and tuple builder, which take care of 
sizing and column order.
  * <p>
  * To read column values of a tuple, one needs to construct a subclass of
diff --git 
a/modules/commons/src/test/java/org/apache/ignite/internal/schema/ExpandableByteBufTest.java
 
b/modules/commons/src/test/java/org/apache/ignite/internal/schema/ExpandableByteBufTest.java
new file mode 100644
index 0000000..b6e2135
--- /dev/null
+++ 
b/modules/commons/src/test/java/org/apache/ignite/internal/schema/ExpandableByteBufTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for {@link ExpandableByteBuf}: absolute writes of every supported type, gap filling and string-driven
+ * expansion.
+ */
+public class ExpandableByteBufTest {
+    /** Buffer under test; re-created by each test that uses it. */
+    private ExpandableByteBuf buf;
+
+    /**
+     * Writes one value of every supported type at increasing offsets and verifies the resulting array.
+     */
+    @Test
+    public void testAllTypesDirectOrder() throws Exception {
+        buf = new ExpandableByteBuf(5);
+
+        byte[] targetBytes = {1, 2, 3, 4, 5, 6, 7};
+        String targetStr = "abcdefg";
+
+        buf.put(0, (byte)1);
+        buf.putShort(1, (short)2);
+        buf.putInt(3, 3);
+        buf.putLong(7, 4L);
+        buf.putFloat(15, 5.f);
+        buf.putDouble(19, 6.d);
+        buf.putBytes(27, targetBytes);
+        buf.putString(34, targetStr, StandardCharsets.UTF_8.newEncoder());
+
+        assertAllTypes(buf.toArray(), targetBytes, targetStr);
+    }
+
+    /**
+     * Writes the same values as {@link #testAllTypesDirectOrder()} but from the highest offset down to the lowest,
+     * and verifies the resulting array is the same.
+     */
+    @Test
+    public void testAllTypesReverseOrder() throws Exception {
+        buf = new ExpandableByteBuf(5);
+
+        byte[] targetBytes = {1, 2, 3, 4, 5, 6, 7};
+        String targetStr = "abcdefg";
+
+        buf.putString(34, targetStr, StandardCharsets.UTF_8.newEncoder());
+        buf.putBytes(27, targetBytes);
+        buf.putDouble(19, 6.d);
+        buf.putFloat(15, 5.f);
+        buf.putLong(7, 4L);
+        buf.putInt(3, 3);
+        buf.putShort(1, (short)2);
+        buf.put(0, (byte)1);
+
+        assertAllTypes(buf.toArray(), targetBytes, targetStr);
+    }
+
+    /**
+     * Verifies the example given in the {@link ExpandableByteBuf} class Javadoc.
+     */
+    @Test
+    public void testJavadocDesc() {
+        ExpandableByteBuf b = new ExpandableByteBuf(1);
+        b.put(0, (byte)1); // Does not expand.
+        b.put(5, (byte)1); // Expands, meaningful bytes are [0..5]
+        byte[] data = b.toArray();
+
+        assertEquals(6, data.length);
+    }
+
+    /**
+     * Starts from a 1-byte buffer so that encoding a single string forces the buffer to expand several times.
+     */
+    @Test
+    public void testStringExpandMultipleTimes() throws Exception {
+        buf = new ExpandableByteBuf(1);
+
+        String str = "abcdefghijklmnopq";
+
+        buf.putString(0, str, StandardCharsets.UTF_8.newEncoder());
+
+        byte[] arr = buf.toArray();
+
+        assertEquals(str.length(), arr.length);
+        // Decode with the same charset that was used for encoding instead of the platform default.
+        assertEquals(str, new String(arr, StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Checks the layout produced by the "all types" tests: a byte at 0, a short at 1, an int at 3, a long at 7,
+     * a float at 15, a double at 19, seven raw bytes at 27 and a seven-byte UTF-8 string at 34.
+     *
+     * @param arr Array returned by the buffer under test.
+     * @param targetBytes Expected raw bytes at offset 27.
+     * @param targetStr Expected string at offset 34.
+     */
+    private static void assertAllTypes(byte[] arr, byte[] targetBytes, String targetStr) {
+        assertEquals(41, arr.length);
+
+        ByteBuffer b = ByteBuffer.wrap(arr);
+        b.order(ByteOrder.LITTLE_ENDIAN);
+
+        assertEquals((byte)1, b.get(0));
+        assertEquals((short)2, b.getShort(1));
+        assertEquals(3, b.getInt(3));
+        assertEquals(4L, b.getLong(7));
+        assertEquals(5.f, b.getFloat(15));
+        assertEquals(6.d, b.getDouble(19));
+
+        byte[] bytes = new byte[7];
+        b.position(27);
+        b.get(bytes);
+
+        assertArrayEquals(targetBytes, bytes);
+
+        b.position(34);
+        b.get(bytes);
+
+        assertEquals(targetStr, new String(bytes, StandardCharsets.UTF_8));
+    }
+}
diff --git 
a/modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java
 
b/modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java
index 3eded6a..253e5cd 100644
--- 
a/modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java
+++ 
b/modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java
@@ -26,6 +26,7 @@ import org.junit.runner.RunWith;
  */
 @RunWith(JUnitPlatform.class)
 @SelectClasses({
+    ExpandableByteBufTest.class,
     NativeTypeTest.class,
     ColumnTest.class,
     ColumnsTest.class,
diff --git 
a/modules/commons/src/test/java/org/apache/ignite/internal/schema/TupleTest.java
 
b/modules/commons/src/test/java/org/apache/ignite/internal/schema/TupleTest.java
index afe58c6..746e4d5 100644
--- 
a/modules/commons/src/test/java/org/apache/ignite/internal/schema/TupleTest.java
+++ 
b/modules/commons/src/test/java/org/apache/ignite/internal/schema/TupleTest.java
@@ -361,12 +361,15 @@ public class TupleTest {
         ByteBufferTuple tup = new ByteBufferTuple(schema, data);
 
         for (int i = 0; i < vals.length; i++) {
-            NativeTypeSpec type = schema.column(i).type().spec();
+            Column col = schema.column(i);
+
+            NativeTypeSpec type = col.type().spec();
 
             if (type == NativeTypeSpec.BYTES)
-                assertArrayEquals((byte[])vals[i], 
(byte[])NativeTypeSpec.BYTES.objectValue(tup, i));
+                assertArrayEquals((byte[])vals[i], 
(byte[])NativeTypeSpec.BYTES.objectValue(tup, i),
+                    "Failed for column: " + col);
             else
-                assertEquals(vals[i], type.objectValue(tup, i));
+                assertEquals(vals[i], type.objectValue(tup, i), "Failed for 
column: " + col);
         }
     }
 

Reply via email to