This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch ignite-13617 in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 7ccf99be30ac0b352349bf83357437eb85ee11e6 Author: Alexey Goncharuk <alexey.goncha...@gmail.com> AuthorDate: Thu Nov 12 18:49:30 2020 +0300 IGNITE-13617 Tuple assembler no longer requires the size of the tuple to be known in advance --- .../ignite/internal/schema/AssemblyException.java} | 23 +- .../ignite/internal/schema/ExpandableByteBuf.java | 253 +++++++++++++++++++++ .../ignite/internal/schema/TupleAssembler.java | 69 ++---- .../ignite/internal/schema/package-info.java | 4 +- .../internal/schema/ExpandableByteBufTest.java | 153 +++++++++++++ .../ignite/internal/schema/SchemaTestSuite.java | 1 + .../apache/ignite/internal/schema/TupleTest.java | 9 +- 7 files changed, 447 insertions(+), 65 deletions(-) diff --git a/modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java b/modules/commons/src/main/java/org/apache/ignite/internal/schema/AssemblyException.java similarity index 64% copy from modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java copy to modules/commons/src/main/java/org/apache/ignite/internal/schema/AssemblyException.java index 3eded6a..8d01c91 100644 --- a/modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/schema/AssemblyException.java @@ -17,19 +17,16 @@ package org.apache.ignite.internal.schema; -import org.junit.platform.runner.JUnitPlatform; -import org.junit.platform.suite.api.SelectClasses; -import org.junit.runner.RunWith; - /** - * + * The exception is thrown when the tuple assembler encountered an unrecoverable error during the field encoding. + * After the exception is thrown, the assembler remains in an invalid state and should be discarded.
*/ -@RunWith(JUnitPlatform.class) -@SelectClasses({ - NativeTypeTest.class, - ColumnTest.class, - ColumnsTest.class, - TupleTest.class -}) -public class SchemaTestSuite { +public class AssemblyException extends RuntimeException { + /** + * @param errMsg Error message + * @param cause Cause for this error. + */ + public AssemblyException(String errMsg, Exception cause) { + super(errMsg, cause); + } } diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/schema/ExpandableByteBuf.java b/modules/commons/src/main/java/org/apache/ignite/internal/schema/ExpandableByteBuf.java new file mode 100644 index 0000000..542b539 --- /dev/null +++ b/modules/commons/src/main/java/org/apache/ignite/internal/schema/ExpandableByteBuf.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.schema; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CoderResult; +import java.util.Arrays; + +/** + * A simple byte array wrapper to allow dynamic byte array expansion during the tuple construction. 
Grows exponentially + * up to 1MB, then expands by 1 MB each time an expansion is required. Values are always written in LITTLE_ENDIAN + * format. + * <p> + * Additionally, it tracks the high watermark of the values ever written to the buffer so that only written bytes are + * returned from the {@link #toArray()} method. If the current (expanded) buffer size does not match the high watermark, + * the {@link #toArray()} method will return a smaller copy of the array to exactly match the watermark. + * <p> + * All write methods have an absolute position. The buffer will automatically expand to fit the value being written. If + * there is a gap between previously written values and the current value, it will be filled with zero bytes: + * <pre> + * ExpandableByteBuf b = new ExpandableByteBuf(1); + * b.put(0, (byte)1); // Does not expand. + * b.put(5, (byte)1); // Expands, meaningful bytes are [0..5] + * + * byte[] data = b.toArray(); // data.length == 6 + * </pre> + */ +@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType") +public class ExpandableByteBuf { + /** */ + private static final int MB = 1024 * 1024; + + /** */ + private byte[] arr; + + /** */ + private ByteBuffer buf; + + /** */ + private int len; + + /** + * @param size Start buffer size. 
+ */ + public ExpandableByteBuf(int size) { + if (size <= 0) + size = 16; + + arr = new byte[size]; + buf = ByteBuffer.wrap(arr); + buf.order(ByteOrder.LITTLE_ENDIAN); + } + + /** + * + */ + public void put(int off, byte val) { + ensureCapacity(off + 1); + + buf.put(off, val); + } + + /** + * + */ + public void putShort(int off, short val) { + ensureCapacity(off + 2); + + buf.putShort(off, val); + } + + /** + * + */ + public void putInt(int off, int val) { + ensureCapacity(off + 4); + + buf.putInt(off, val); + } + + /** + * + */ + public void putFloat(int off, float val) { + ensureCapacity(off + 4); + + buf.putFloat(off, val); + } + + /** + * + */ + public void putLong(int off, long val) { + ensureCapacity(off + 8); + + buf.putLong(off, val); + } + + /** + * + */ + public void putDouble(int off, double val) { + ensureCapacity(off + 8); + + buf.putDouble(off, val); + } + + /** + * + */ + public void putBytes(int curOff, byte[] val) { + ensureCapacity(curOff + val.length); + + buf.position(curOff); + + try { + buf.put(val); + } + finally { + buf.position(0); + } + } + + /** + * + */ + public int putString(int off, String val, CharsetEncoder encoder) throws CharacterCodingException { + ensureCapacity(off); + + encoder.reset(); + + buf.position(off); + + try { + CharBuffer valBuf = CharBuffer.wrap(val); + + while (true) { + CoderResult cr = encoder.encode(valBuf, buf, true); + + len = buf.position(); + + if (cr.isUnderflow()) + break; + + if (cr.isOverflow()) { + expand(len + 1); + + continue; + } + + if (cr.isError()) + cr.throwException(); + + } + + while (true) { + CoderResult cr = encoder.flush(buf); + + len = buf.position(); + + if (cr.isOverflow()) { + expand(len + 1); + + continue; + } + + if (cr.isUnderflow()) + break; + + if (cr.isError()) + cr.throwException(); + } + + return len - off; + } + finally { + buf.position(0); + } + } + + /** + * + */ + public byte get(int off) { + return buf.get(off); + } + + /** + * @return The byte array of all bytes written to 
this array, including gaps. + */ + public byte[] toArray() { + if (arr.length == len) + return arr; + else + return Arrays.copyOf(arr, len); + } + + /** + * If the current capacity is smaller than {@code cap}, will expand the buffer size. + * + * @param cap Target capacity. + */ + private void ensureCapacity(int cap) { + if (arr.length < cap) + expand(cap); + + if (cap > len) + len = cap; + } + + /** + * @param cap Capacity to expand. + */ + private void expand(int cap) { + int l = arr.length; + + while (l < cap) { + if (l < MB) + l *= 2; + else + l += MB; + } + + byte[] tmp = new byte[cap]; + + System.arraycopy(arr, 0, tmp, 0, arr.length); + + arr = tmp; + int oldPos = buf.position(); + buf = ByteBuffer.wrap(arr); + buf.position(oldPos); + buf.order(ByteOrder.LITTLE_ENDIAN); + } +} diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/schema/TupleAssembler.java b/modules/commons/src/main/java/org/apache/ignite/internal/schema/TupleAssembler.java index f1bf2d2..45d3982 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/schema/TupleAssembler.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/schema/TupleAssembler.java @@ -19,8 +19,8 @@ package org.apache.ignite.internal.schema; import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; -import java.nio.ByteOrder; import java.nio.CharBuffer; +import java.nio.charset.CharacterCodingException; import java.nio.charset.CharsetEncoder; import java.nio.charset.CoderResult; import java.nio.charset.StandardCharsets; @@ -40,7 +40,7 @@ public class TupleAssembler { private final int nonNullVarlenValCols; /** Target byte buffer to write to. */ - private final ByteBuffer buf; + private final ExpandableByteBuf buf; /** Current columns chunk. */ private Columns curCols; @@ -112,7 +112,8 @@ public class TupleAssembler { /** * @param schema Tuple schema. - * @param size Target tuple size. For now, must exactly match the resulting tuple size. 
+ * @param size Target tuple size. If the tuple size is known in advance, it should be provided upfront to avoid + * unnccessary arrays copy. * @param nonNullVarsizeKeyCols Number of null varlen columns in key chunk. * @param nonNullVarlenValCols Number of null varlen columns in value chunk. */ @@ -126,9 +127,7 @@ public class TupleAssembler { this.nonNullVarlenValCols = nonNullVarlenValCols; - buf = ByteBuffer.allocate(size); - - buf.order(ByteOrder.LITTLE_ENDIAN); + buf = new ExpandableByteBuf(size); curCols = schema.columns(0); @@ -242,26 +241,16 @@ public class TupleAssembler { public void appendString(String val) { checkType(NativeType.STRING); - assert buf.position() == 0; - - ByteBuffer wrapper = buf.slice(); - wrapper.position(curOff); - - CharsetEncoder encoder = encoder(); - encoder.reset(); - CoderResult cr = encoder.encode(CharBuffer.wrap(val), wrapper, true); - - if (!cr.isUnderflow()) - throw new BufferUnderflowException(); - - cr = encoder.flush(wrapper); - - if (!cr.isUnderflow()) - throw new BufferUnderflowException(); + try { + int written = buf.putString(curOff, val, encoder()); - writeOffset(curVarlenTblEntry, curOff - baseOff); + writeOffset(curVarlenTblEntry, curOff - baseOff); - shiftColumn(wrapper.position() - curOff, true); + shiftColumn(written, true); + } + catch (CharacterCodingException e) { + throw new AssemblyException("Failed to encode string", e); + } } /** @@ -269,18 +258,11 @@ public class TupleAssembler { public void appendBytes(byte[] val) { checkType(NativeType.BYTES); - buf.position(curOff); + buf.putBytes(curOff, val); - try { - buf.put(val); - - writeOffset(curVarlenTblEntry, curOff - baseOff); + writeOffset(curVarlenTblEntry, curOff - baseOff); - shiftColumn(val.length, true); - } - finally { - buf.position(0); - } + shiftColumn(val.length, true); } /** @@ -296,27 +278,20 @@ public class TupleAssembler { throw new IllegalArgumentException("Failed to set bitmask for column '" + col.name() + "' " + "(mask size exceeds 
allocated size) [mask=" + bitSet + ", maxSize=" + maskType.bits() + "]"); - buf.position(curOff); - - try { - byte[] arr = bitSet.toByteArray(); + byte[] arr = bitSet.toByteArray(); - buf.put(arr); + buf.putBytes(curOff, arr); - for (int i = 0; i < maskType.length() - arr.length; i++) - buf.put((byte)0); + for (int i = 0; i < maskType.length() - arr.length; i++) + buf.put(curOff + arr.length + i, (byte)0); - shiftColumn(maskType); - } - finally { - buf.position(0); - } + shiftColumn(maskType); } /** */ public byte[] build() { - return buf.array(); + return buf.toArray(); } /** diff --git a/modules/commons/src/main/java/org/apache/ignite/internal/schema/package-info.java b/modules/commons/src/main/java/org/apache/ignite/internal/schema/package-info.java index 3690267..110317c 100644 --- a/modules/commons/src/main/java/org/apache/ignite/internal/schema/package-info.java +++ b/modules/commons/src/main/java/org/apache/ignite/internal/schema/package-info.java @@ -55,8 +55,8 @@ * To assemble a tuple with some schema, an instance of {@link org.apache.ignite.internal.schema.TupleAssembler} * must be used which provides the low-level API for building tuples. When using the tuple assembler, the * columns must be passed to the assembler in the internal schema sort order. Additionally, when constructing - * the instance of the assembler, the user must pre-calculate the size of the tuple (using tooling provided by the - * assembler) and the number of non-null varlen columns for key and value chunks. Less restrictive building techniques + * the instance of the assembler, the user should pre-calculate the size of the tuple to avoid extra array copies, + * and the number of non-null varlen columns for key and value chunks. Less restrictive building techniques * are provided by class (de)serializers and tuple builder, which take care of sizing and column order. 
* <p> * To read column values of a tuple, one needs to construct a subclass of diff --git a/modules/commons/src/test/java/org/apache/ignite/internal/schema/ExpandableByteBufTest.java b/modules/commons/src/test/java/org/apache/ignite/internal/schema/ExpandableByteBufTest.java new file mode 100644 index 0000000..b6e2135 --- /dev/null +++ b/modules/commons/src/test/java/org/apache/ignite/internal/schema/ExpandableByteBufTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.schema; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.charset.StandardCharsets; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * + */ +public class ExpandableByteBufTest { + /** */ + private ExpandableByteBuf buf; + + /** + * + */ + @Test + public void testAllTypesDirectOrder() throws Exception { + buf = new ExpandableByteBuf(5); + + byte[] targetBytes = {1, 2, 3, 4, 5, 6, 7}; + String targetStr = "abcdefg"; + + buf.put(0, (byte)1); + buf.putShort(1, (short)2); + buf.putInt(3, 3); + buf.putLong(7, 4L); + buf.putFloat(15, 5.f); + buf.putDouble(19, 6.d); + buf.putBytes(27, targetBytes); + buf.putString(34, targetStr, StandardCharsets.UTF_8.newEncoder()); + + byte[] arr = buf.toArray(); + assertEquals(41, arr.length); + + ByteBuffer b = ByteBuffer.wrap(arr); + b.order(ByteOrder.LITTLE_ENDIAN); + + assertEquals((byte)1, b.get(0)); + assertEquals((short)2, b.getShort(1)); + assertEquals(3, b.getInt(3)); + assertEquals(4L, b.getLong(7)); + assertEquals(5.f, b.getFloat(15)); + assertEquals(6.d, b.getDouble(19)); + + byte[] bytes = new byte[7]; + b.position(27); + b.get(bytes); + + assertArrayEquals(targetBytes, bytes); + + b.position(34); + b.get(bytes); + + assertEquals(targetStr, new String(bytes, StandardCharsets.UTF_8)); + } + + /** + * + */ + @Test + public void testAllTypesReverseOrder() throws Exception { + buf = new ExpandableByteBuf(5); + + byte[] targetBytes = {1, 2, 3, 4, 5, 6, 7}; + String targetStr = "abcdefg"; + + buf.putString(34, targetStr, StandardCharsets.UTF_8.newEncoder()); + buf.putBytes(27, targetBytes); + buf.putDouble(19, 6.d); + buf.putFloat(15, 5.f); + buf.putLong(7, 4L); + buf.putInt(3, 3); + buf.putShort(1, (short)2); + buf.put(0, (byte)1); + + byte[] arr = buf.toArray(); + assertEquals(41, arr.length); + + ByteBuffer b = ByteBuffer.wrap(arr); + 
b.order(ByteOrder.LITTLE_ENDIAN); + + assertEquals((byte)1, b.get(0)); + assertEquals((short)2, b.getShort(1)); + assertEquals(3, b.getInt(3)); + assertEquals(4L, b.getLong(7)); + assertEquals(5.f, b.getFloat(15)); + assertEquals(6.d, b.getDouble(19)); + + byte[] bytes = new byte[7]; + b.position(27); + b.get(bytes); + + assertArrayEquals(targetBytes, bytes); + + b.position(34); + b.get(bytes); + + assertEquals(targetStr, new String(bytes, StandardCharsets.UTF_8)); + } + + /** + * + */ + @Test + public void testJavadocDesc() { + ExpandableByteBuf b = new ExpandableByteBuf(1); + b.put(0, (byte)1); // Does not expand. + b.put(5, (byte)1); // Expands, meaningful bytes are [0..5] + byte[] data = b.toArray(); + + assertEquals(6, data.length); + } + + /** + * + */ + @Test + public void testStringExpandMultipleTimes() throws Exception { + // Expansion chain 1->2->4->8->16->32. + buf = new ExpandableByteBuf(1); + + String str = "abcdefghijklmnopq"; + + buf.putString(0, str, StandardCharsets.UTF_8.newEncoder()); + + byte[] arr = buf.toArray(); + + assertEquals(str.length(), arr.length); + assertEquals(str, new String(arr)); + } +} diff --git a/modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java b/modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java index 3eded6a..253e5cd 100644 --- a/modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java +++ b/modules/commons/src/test/java/org/apache/ignite/internal/schema/SchemaTestSuite.java @@ -26,6 +26,7 @@ import org.junit.runner.RunWith; */ @RunWith(JUnitPlatform.class) @SelectClasses({ + ExpandableByteBufTest.class, NativeTypeTest.class, ColumnTest.class, ColumnsTest.class, diff --git a/modules/commons/src/test/java/org/apache/ignite/internal/schema/TupleTest.java b/modules/commons/src/test/java/org/apache/ignite/internal/schema/TupleTest.java index afe58c6..746e4d5 100644 --- 
a/modules/commons/src/test/java/org/apache/ignite/internal/schema/TupleTest.java +++ b/modules/commons/src/test/java/org/apache/ignite/internal/schema/TupleTest.java @@ -361,12 +361,15 @@ public class TupleTest { ByteBufferTuple tup = new ByteBufferTuple(schema, data); for (int i = 0; i < vals.length; i++) { - NativeTypeSpec type = schema.column(i).type().spec(); + Column col = schema.column(i); + + NativeTypeSpec type = col.type().spec(); if (type == NativeTypeSpec.BYTES) - assertArrayEquals((byte[])vals[i], (byte[])NativeTypeSpec.BYTES.objectValue(tup, i)); + assertArrayEquals((byte[])vals[i], (byte[])NativeTypeSpec.BYTES.objectValue(tup, i), + "Failed for column: " + col); else - assertEquals(vals[i], type.objectValue(tup, i)); + assertEquals(vals[i], type.objectValue(tup, i), "Failed for column: " + col); } }