This is an automated email from the ASF dual-hosted git repository. jackylk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 2c0ee8e [CARBONDATA-3653] Support huge data for complex child columns 2c0ee8e is described below commit 2c0ee8e2c0e74e9a2f1f3f62e13238a45442b012 Author: ajantha-bhat <ajanthab...@gmail.com> AuthorDate: Sat Jan 4 23:51:51 2020 +0800 [CARBONDATA-3653] Support huge data for complex child columns Why is this PR needed? Currently, the string and binary child columns of complex types are stored with a short (2-byte) length prefix, so if a value is longer than 32000 characters, data load fails for binary and long string columns. What changes were proposed in this PR? The string, binary, decimal and date child columns of complex types were stored as a byte_array page with a short length prefix; this is changed to an int length prefix. [Handling string and binary separately is hard for now; this is left for future work.] Compatibility with existing data is handled by introducing a new encoding type for complex child columns. Does this PR introduce any user interface change? No. Is any new testcase added? Yes. This closes #3562 --- .../carbondata/core/datastore/page/ColumnPage.java | 23 ++++++++---- .../core/datastore/page/LazyColumnPage.java | 2 +- .../core/datastore/page/LocalDictColumnPage.java | 7 ++-- .../datastore/page/SafeFixLengthColumnPage.java | 2 +- .../datastore/page/SafeVarLengthColumnPage.java | 10 ++++-- .../datastore/page/UnsafeFixLengthColumnPage.java | 3 +- .../datastore/page/VarLengthColumnPageBase.java | 39 +++++++++++++++----- .../datastore/page/encoding/ColumnPageEncoder.java | 5 +++ .../datastore/page/encoding/EncodingFactory.java | 7 +++- .../adaptive/AdaptiveDeltaFloatingCodec.java | 2 +- .../adaptive/AdaptiveDeltaIntegralCodec.java | 2 +- .../encoding/adaptive/AdaptiveFloatingCodec.java | 2 +- .../encoding/adaptive/AdaptiveIntegralCodec.java | 2 +- .../encoding/compress/DirectCompressCodec.java | 14 ++++++-- .../ThriftWrapperSchemaConverterImpl.java | 4 +++ .../carbondata/core/metadata/encoder/Encoding.java | 5 ++- .../core/scan/complextypes/PrimitiveQueryType.java | 13 +++++-- 
.../apache/carbondata/core/util/CarbonUtil.java | 1 + .../apache/carbondata/core/util/DataTypeUtil.java | 14 ++++++++ format/src/main/thrift/schema.thrift | 1 + .../complexType/TestComplexDataType.scala | 37 +++++++++++++++++-- .../processing/datatypes/PrimitiveDataType.java | 36 +++++++++++++++---- .../org/apache/carbondata/sdk/file/ImageTest.java | 41 ++++++++++++++++++++++ 23 files changed, 229 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java index 338f0b2..b3463f4 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java @@ -698,7 +698,8 @@ public abstract class ColumnPage { * @return * @throws IOException */ - public abstract byte[] getComplexChildrenLVFlattenedBytePage() throws IOException; + public abstract byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) + throws IOException; /** * For complex type columns @@ -746,7 +747,8 @@ public abstract class ColumnPage { return getDecimalPage().length; } else if (dataType == BYTE_ARRAY && columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_PRIMITIVE) { - return getComplexChildrenLVFlattenedBytePage().length; + return getComplexChildrenLVFlattenedBytePage( + columnPageEncoderMeta.getColumnSpec().getSchemaDataType()).length; } else if (dataType == BYTE_ARRAY && (columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_STRUCT || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_ARRAY @@ -785,7 +787,8 @@ public abstract class ColumnPage { return compressor.compressByte(getDecimalPage()); } else if (dataType == BYTE_ARRAY && columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_PRIMITIVE) { - return 
compressor.compressByte(getComplexChildrenLVFlattenedBytePage()); + return compressor.compressByte(getComplexChildrenLVFlattenedBytePage( + columnPageEncoderMeta.getColumnSpec().getSchemaDataType())); } else if (dataType == BYTE_ARRAY && (columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_STRUCT || columnPageEncoderMeta.getColumnSpec().getColumnType() == ColumnType.COMPLEX_ARRAY @@ -805,8 +808,8 @@ public abstract class ColumnPage { * Decompress data and create a column page using the decompressed data, * except for decimal page */ - public static ColumnPage decompress(ColumnPageEncoderMeta meta, byte[] compressedData, - int offset, int length, boolean isLVEncoded) + public static ColumnPage decompress(ColumnPageEncoderMeta meta, byte[] compressedData, int offset, + int length, boolean isLVEncoded, boolean isComplexPrimitiveIntLengthEncoding) throws MemoryException { Compressor compressor = CompressorFactory.getInstance().getCompressor(meta.getCompressorName()); TableSpec.ColumnSpec columnSpec = meta.getColumnSpec(); @@ -836,8 +839,14 @@ public abstract class ColumnPage { columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE || columnSpec.getColumnType() == ColumnType.PLAIN_VALUE)) { byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); - return newComplexLVBytesPage(columnSpec, lvVarBytes, - CarbonCommonConstants.SHORT_SIZE_IN_BYTE, meta.getCompressorName()); + if (isComplexPrimitiveIntLengthEncoding) { + // decode as int length + return newComplexLVBytesPage(columnSpec, lvVarBytes, + CarbonCommonConstants.INT_SIZE_IN_BYTE, meta.getCompressorName()); + } else { + return newComplexLVBytesPage(columnSpec, lvVarBytes, + CarbonCommonConstants.SHORT_SIZE_IN_BYTE, meta.getCompressorName()); + } } else if (isLVEncoded && storeDataType == BYTE_ARRAY && columnSpec.getColumnType() == ColumnType.COMPLEX_PRIMITIVE) { byte[] lvVarBytes = compressor.unCompressByte(compressedData, offset, length); diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java index d0389d3..b789e00 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LazyColumnPage.java @@ -179,7 +179,7 @@ public class LazyColumnPage extends ColumnPage { } @Override - public byte[] getComplexChildrenLVFlattenedBytePage() { + public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) { throw new UnsupportedOperationException("internal error"); } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java index f428c60..2e3a330 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/LocalDictColumnPage.java @@ -28,6 +28,7 @@ import org.apache.carbondata.core.keygenerator.factory.KeyGeneratorFactory; import org.apache.carbondata.core.localdictionary.PageLevelDictionary; import org.apache.carbondata.core.localdictionary.exception.DictionaryThresholdReachedException; import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.log4j.Logger; @@ -364,11 +365,11 @@ public class LocalDictColumnPage extends ColumnPage { } @Override - public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException { + public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) throws IOException { if (null != encodedDataColumnPage) { - return encodedDataColumnPage.getComplexChildrenLVFlattenedBytePage(); + return encodedDataColumnPage.getComplexChildrenLVFlattenedBytePage(dataType); } else { - return 
actualDataColumnPage.getComplexChildrenLVFlattenedBytePage(); + return actualDataColumnPage.getComplexChildrenLVFlattenedBytePage(dataType); } } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java index f45f482..a579bdd 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeFixLengthColumnPage.java @@ -296,7 +296,7 @@ public class SafeFixLengthColumnPage extends ColumnPage { } @Override - public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException { + public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) throws IOException { ByteArrayOutputStream stream = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(stream); for (int i = 0; i < arrayElementCount; i++) { diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java index b105239..0d34b06 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/SafeVarLengthColumnPage.java @@ -25,6 +25,8 @@ import java.util.ArrayList; import java.util.List; import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta; +import org.apache.carbondata.core.metadata.datatype.DataType; +import org.apache.carbondata.core.util.DataTypeUtil; public class SafeVarLengthColumnPage extends VarLengthColumnPageBase { @@ -88,11 +90,15 @@ public class SafeVarLengthColumnPage extends VarLengthColumnPageBase { } @Override - public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException { + public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) throws 
IOException { ByteArrayOutputStream stream = new ByteArrayOutputStream(); DataOutputStream out = new DataOutputStream(stream); for (byte[] byteArrayDatum : byteArrayData) { - out.writeShort((short)byteArrayDatum.length); + if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) { + out.writeInt(byteArrayDatum.length); + } else { + out.writeShort((short) byteArrayDatum.length); + } out.write(byteArrayDatum); } return stream.toByteArray(); diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java index 4ef5e5d..00c8ca0 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeFixLengthColumnPage.java @@ -24,6 +24,7 @@ import org.apache.carbondata.core.memory.CarbonUnsafe; import org.apache.carbondata.core.memory.MemoryBlock; import org.apache.carbondata.core.memory.MemoryException; import org.apache.carbondata.core.memory.UnsafeMemoryManager; +import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.util.ByteUtil; import org.apache.carbondata.core.util.ThreadLocalTaskInfo; @@ -396,7 +397,7 @@ public class UnsafeFixLengthColumnPage extends ColumnPage { } @Override - public byte[] getComplexChildrenLVFlattenedBytePage() { + public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) { byte[] data = new byte[totalLength]; CarbonUnsafe.getUnsafe() .copyMemory(baseAddress, baseOffset, data, CarbonUnsafe.BYTE_ARRAY_OFFSET, totalLength); diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java index 01f1d55..1381dc6 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java @@ -243,9 +243,15 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { int counter = 0; // extract Length field in input and calculate total length for (offset = 0; lvEncodedOffset < lvEncodedBytes.length; offset += length) { - length = ByteUtil.toShort(lvEncodedBytes, lvEncodedOffset); - rowOffset.putInt(counter, offset); - lvEncodedOffset += lvLength + length; + if (lvLength == CarbonCommonConstants.INT_SIZE_IN_BYTE) { + length = ByteUtil.toInt(lvEncodedBytes, lvEncodedOffset); + rowOffset.putInt(counter, offset); + lvEncodedOffset += lvLength + length; + } else { + length = ByteUtil.toShort(lvEncodedBytes, lvEncodedOffset); + rowOffset.putInt(counter, offset); + lvEncodedOffset += lvLength + length; + } rowId++; counter++; } @@ -465,15 +471,30 @@ public abstract class VarLengthColumnPageBase extends ColumnPage { } @Override - public byte[] getComplexChildrenLVFlattenedBytePage() throws IOException { + public byte[] getComplexChildrenLVFlattenedBytePage(DataType dataType) throws IOException { // output LV encoded byte array int offset = 0; - byte[] data = new byte[totalLength + ((rowOffset.getActualRowCount() - 1) * 2)]; + int outputLength; + if (dataType == DataTypes.BYTE_ARRAY) { + outputLength = totalLength + ((rowOffset.getActualRowCount() - 1) + * CarbonCommonConstants.INT_SIZE_IN_BYTE); + } else { + outputLength = totalLength + ((rowOffset.getActualRowCount() - 1) + * CarbonCommonConstants.SHORT_SIZE_IN_BYTE); + } + byte[] data = new byte[outputLength]; for (int rowId = 0; rowId < rowOffset.getActualRowCount() - 1; rowId++) { - short length = (short) (rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId)); - ByteUtil.setShort(data, offset, length); - copyBytes(rowId, data, offset + 2, length); - offset += 2 + length; + if (dataType == DataTypes.BYTE_ARRAY) 
{ + int length = rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId); + ByteUtil.setInt(data, offset, length); + copyBytes(rowId, data, offset + CarbonCommonConstants.INT_SIZE_IN_BYTE, length); + offset += CarbonCommonConstants.INT_SIZE_IN_BYTE + length; + } else { + short length = (short) (rowOffset.getInt(rowId + 1) - rowOffset.getInt(rowId)); + ByteUtil.setShort(data, offset, length); + copyBytes(rowId, data, offset + CarbonCommonConstants.SHORT_SIZE_IN_BYTE, length); + offset += CarbonCommonConstants.SHORT_SIZE_IN_BYTE + length; + } } return data; } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java index b7b2529..e63dd82 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/ColumnPageEncoder.java @@ -194,6 +194,11 @@ public abstract class ColumnPageEncoder { while (index < input.getComplexColumnIndex()) { ColumnPage subColumnPage = input.getColumnPage(index); encodedPages[index] = encodedColumn(subColumnPage); + // by default add this encoding, + // it is used for checking length of + // complex child byte array columns (short and int) + encodedPages[index].getPageMetadata().getEncoders() + .add(Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY); index++; } return encodedPages; diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java index cb39bb9..36e2bc6 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java @@ -83,6 +83,8 @@ public abstract class EncodingFactory { String compressor, boolean fullVectorFill) 
throws IOException { assert (encodings.size() >= 1); assert (encoderMetas.size() == 1); + boolean isComplexPrimitiveIntLengthEncoding = + encodings.contains(Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY); Encoding encoding = encodings.get(0); byte[] encoderMeta = encoderMetas.get(0).array(); ByteArrayInputStream stream = new ByteArrayInputStream(encoderMeta); @@ -91,7 +93,10 @@ public abstract class EncodingFactory { ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta(); metadata.setFillCompleteVector(fullVectorFill); metadata.readFields(in); - return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata); + DirectCompressCodec directCompressCodec = + new DirectCompressCodec(metadata.getStoreDataType()); + directCompressCodec.setComplexPrimitiveIntLengthEncoding(isComplexPrimitiveIntLengthEncoding); + return directCompressCodec.createDecoder(metadata); } else if (encoding == ADAPTIVE_INTEGRAL) { ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta(); metadata.setFillCompleteVector(fullVectorFill); diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java index 36f1e64..1400f25 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaFloatingCodec.java @@ -128,7 +128,7 @@ public class AdaptiveDeltaFloatingCodec extends AdaptiveCodec { @Override public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException, IOException { - ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false); + ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false, false); return LazyColumnPage.newPage(page, converter); } diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java index d0bbedb..7b1c12d 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveDeltaIntegralCodec.java @@ -139,7 +139,7 @@ public class AdaptiveDeltaIntegralCodec extends AdaptiveCodec { if (DataTypes.isDecimal(meta.getSchemaDataType())) { page = ColumnPage.decompressDecimalPage(meta, input, offset, length); } else { - page = ColumnPage.decompress(meta, input, offset, length, false); + page = ColumnPage.decompress(meta, input, offset, length, false, false); } return LazyColumnPage.newPage(page, converter); } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java index 64a0ebf..cf9b739 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveFloatingCodec.java @@ -116,7 +116,7 @@ public class AdaptiveFloatingCodec extends AdaptiveCodec { @Override public ColumnPage decode(byte[] input, int offset, int length) throws MemoryException, IOException { - ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false); + ColumnPage page = ColumnPage.decompress(meta, input, offset, length, false, false); return LazyColumnPage.newPage(page, converter); } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java 
b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java index 5651368..4638652 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/adaptive/AdaptiveIntegralCodec.java @@ -116,7 +116,7 @@ public class AdaptiveIntegralCodec extends AdaptiveCodec { if (DataTypes.isDecimal(meta.getSchemaDataType())) { page = ColumnPage.decompressDecimalPage(meta, input, offset, length); } else { - page = ColumnPage.decompress(meta, input, offset, length, false); + page = ColumnPage.decompress(meta, input, offset, length, false, false); } return LazyColumnPage.newPage(page, converter); } diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java index a378988..9683cb8 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java @@ -58,6 +58,12 @@ public class DirectCompressCodec implements ColumnPageCodec { this.dataType = dataType; } + boolean isComplexPrimitiveIntLengthEncoding = false; + + public void setComplexPrimitiveIntLengthEncoding(boolean complexPrimitiveIntLengthEncoding) { + isComplexPrimitiveIntLengthEncoding = complexPrimitiveIntLengthEncoding; + } + @Override public String getName() { return "DirectCompressCodec"; @@ -102,7 +108,8 @@ public class DirectCompressCodec implements ColumnPageCodec { if (DataTypes.isDecimal(dataType)) { decodedPage = ColumnPage.decompressDecimalPage(meta, input, offset, length); } else { - decodedPage = ColumnPage.decompress(meta, input, offset, length, false); + decodedPage = ColumnPage + .decompress(meta, input, offset, length, 
false, isComplexPrimitiveIntLengthEncoding); } return LazyColumnPage.newPage(decodedPage, converter); } @@ -150,8 +157,9 @@ public class DirectCompressCodec implements ColumnPageCodec { @Override public ColumnPage decode(byte[] input, int offset, int length, boolean isLVEncoded) throws MemoryException, IOException { - return LazyColumnPage - .newPage(ColumnPage.decompress(meta, input, offset, length, isLVEncoded), converter); + return LazyColumnPage.newPage(ColumnPage + .decompress(meta, input, offset, length, isLVEncoded, + isComplexPrimitiveIntLengthEncoding), converter); } }; } diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java index 9fd25d6..b7fabe2 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java @@ -120,6 +120,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter { return org.apache.carbondata.format.Encoding.BIT_PACKED; case DIRECT_DICTIONARY: return org.apache.carbondata.format.Encoding.DIRECT_DICTIONARY; + case INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY: + return org.apache.carbondata.format.Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY; default: return org.apache.carbondata.format.Encoding.DICTIONARY; } @@ -457,6 +459,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter { return Encoding.DIRECT_COMPRESS_VARCHAR; case BIT_PACKED: return Encoding.BIT_PACKED; + case INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY: + return Encoding.INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY; case DIRECT_DICTIONARY: return Encoding.DIRECT_DICTIONARY; default: diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java index 6e32c89..30f83a1 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java @@ -38,7 +38,8 @@ public enum Encoding { ADAPTIVE_FLOATING, BOOL_BYTE, ADAPTIVE_DELTA_FLOATING, - DIRECT_COMPRESS_VARCHAR; + DIRECT_COMPRESS_VARCHAR, + INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY; public static Encoding valueOf(int ordinal) { if (ordinal == DICTIONARY.ordinal()) { @@ -73,6 +74,8 @@ public enum Encoding { return ADAPTIVE_DELTA_FLOATING; } else if (ordinal == DIRECT_COMPRESS_VARCHAR.ordinal()) { return DIRECT_COMPRESS_VARCHAR; + } else if (ordinal == INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY.ordinal()) { + return INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY; } else { throw new RuntimeException("create Encoding with invalid ordinal: " + ordinal); } diff --git a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java index b275a27..7d25fe7 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/complextypes/PrimitiveQueryType.java @@ -107,7 +107,11 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery byte[] currentVal = copyBlockDataChunk(rawColumnChunks, dimensionColumnPages, rowNumber, pageNumber); if (!this.isDictionary && !this.isDirectDictionary) { - dataOutputStream.writeShort(currentVal.length); + if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) { + dataOutputStream.writeInt(currentVal.length); + } else { + dataOutputStream.writeShort(currentVal.length); + } } dataOutputStream.write(currentVal); } @@ -158,7 +162,12 @@ public class PrimitiveQueryType extends ComplexQueryType implements GenericQuery actualData = 
directDictionaryGenerator.getValueFromSurrogate(surrgateValue); } else if (!isDictionary) { if (size == -1) { - size = dataBuffer.getShort(); + if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) { + size = dataBuffer.getInt(); + } else { + size = dataBuffer.getShort(); + } + } byte[] value = new byte[size]; dataBuffer.get(value, 0, size); diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index b3c7093..babbafa 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -3225,6 +3225,7 @@ public final class CarbonUtil { case ADAPTIVE_DELTA_INTEGRAL: case ADAPTIVE_FLOATING: case ADAPTIVE_DELTA_FLOATING: + case INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY: return true; } } diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java index 8422c78..0f8f5ab 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java @@ -1125,4 +1125,18 @@ public final class DataTypeUtil { return false; } + /** + * utility function to check complex column child columns that can exceed 32000 length + * + * @param dataType + * @return + */ + public static boolean isByteArrayComplexChildColumn(DataType dataType) { + return ((dataType == DataTypes.STRING) || + (dataType == DataTypes.VARCHAR) || + (dataType == DataTypes.BINARY) || + (dataType == DataTypes.DATE) || + DataTypes.isDecimal(dataType) || + (dataType == DataTypes.BYTE_ARRAY)); + } } diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift index ca4bbad..f4aa9b7 100644 --- a/format/src/main/thrift/schema.thrift +++ b/format/src/main/thrift/schema.thrift @@ -62,6 +62,7 @@ enum Encoding{ BOOL_BYTE = 12; // Identifies that a 
column is encoded using BooleanPageCodec ADAPTIVE_DELTA_FLOATING = 13; // Identifies that a column is encoded using AdaptiveDeltaFloatingCodec DIRECT_COMPRESS_VARCHAR = 14; // Identifies that a columm is encoded using DirectCompressCodec, it is used for long string columns + INT_LENGTH_COMPLEX_CHILD_BYTE_ARRAY = 15; // Identifies that a complex column child stored as INT length or SHORT length } // Only NATIVE_HIVE is supported, others are deprecated since CarbonData 2.0 diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala index 32a5d92..c673e56 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestComplexDataType.scala @@ -21,6 +21,7 @@ import java.sql.Timestamp import scala.collection.mutable +import org.apache.commons.lang3.RandomStringUtils import org.apache.spark.sql.{AnalysisException, Row} import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll @@ -39,6 +40,8 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll { val badRecordAction = CarbonProperties.getInstance() .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION) + val hugeBinary = RandomStringUtils.randomAlphabetic(33000) + override def beforeAll(): Unit = { sql("DROP TABLE IF EXISTS table1") sql("DROP TABLE IF EXISTS test") @@ -969,6 +972,16 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll { sql("drop table if exists hive_table") } + test("test array of huge binary data type") { + sql("drop table if exists carbon_table") + sql("create table if not exists carbon_table(id int, label boolean, name 
string," + + "binaryField array<binary>, autoLabel boolean) stored by 'carbondata'") + sql(s"insert into carbon_table values(1,true,'abc',array('$hugeBinary'),false)") + val result = sql("SELECT binaryField[0] FROM carbon_table").collect() + assert(hugeBinary.equals(new String(result(0).get(0).asInstanceOf[Array[Byte]]))) + sql("drop table if exists carbon_table") + } + test("test struct of binary data type") { sql("drop table if exists carbon_table") sql("drop table if exists parquet_table") @@ -982,7 +995,17 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("SELECT binaryField.b FROM carbon_table"), sql("SELECT binaryField.b FROM parquet_table")) sql("drop table if exists carbon_table") - sql("drop table if exists hive_table") + sql("drop table if exists parquet_table") + } + + test("test struct of huge binary data type") { + sql("drop table if exists carbon_table") + sql("create table if not exists carbon_table(id int, label boolean, name string," + + "binaryField struct<b:binary>, autoLabel boolean) stored as carbondata ") + sql(s"insert into carbon_table values(1,true,'abc',named_struct('b','$hugeBinary'),false)") + val result = sql("SELECT binaryField.b FROM carbon_table").collect() + assert(hugeBinary.equals(new String(result(0).get(0).asInstanceOf[Array[Byte]]))) + sql("drop table if exists carbon_table") } test("test map of binary data type") { @@ -1000,6 +1023,16 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll { sql("drop table if exists hive_table") } + test("test map of huge binary data type") { + sql("drop table if exists carbon_table") + sql("create table if not exists carbon_table(id int, label boolean, name string," + + "binaryField map<int, binary>, autoLabel boolean) stored by 'carbondata'") + sql(s"insert into carbon_table values(1,true,'abc',map(1,'$hugeBinary'),false)") + val result = sql("SELECT binaryField[1] FROM carbon_table").collect() + assert(hugeBinary.equals(new 
String(result(0).get(0).asInstanceOf[Array[Byte]]))) + sql("drop table if exists carbon_table") + } + test("test map of array and struct binary data type") { sql("drop table if exists carbon_table") sql("drop table if exists parquet_table") @@ -1017,7 +1050,7 @@ class TestComplexDataType extends QueryTest with BeforeAndAfterAll { sql("SELECT binaryField1[1][1] FROM parquet_table")) checkAnswer(sql("SELECT binaryField2[1].b FROM carbon_table"), sql("SELECT binaryField2[1].b FROM parquet_table")) - sql("drop table if exists hive_table") + sql("drop table if exists parquet_table") sql("drop table if exists carbon_table") } diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java index b7a4508..fdfe9e9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java @@ -387,17 +387,29 @@ public class PrimitiveDataType implements GenericDataType<Object> { private void updateValueToByteStream(DataOutputStream dataOutputStream, byte[] value) throws IOException { - dataOutputStream.writeShort(value.length); + if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) { + dataOutputStream.writeInt(value.length); + } else { + dataOutputStream.writeShort(value.length); + } dataOutputStream.write(value); } private void updateNullValue(DataOutputStream dataOutputStream, BadRecordLogHolder logHolder) throws IOException { if (this.carbonDimension.getDataType() == DataTypes.STRING) { - dataOutputStream.writeShort(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length); + if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) { + dataOutputStream.writeInt(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length); + } else { + dataOutputStream.writeShort(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY.length); + } 
dataOutputStream.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY); } else { - dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length); + if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) { + dataOutputStream.writeInt(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length); + } else { + dataOutputStream.writeShort(CarbonCommonConstants.EMPTY_BYTE_ARRAY.length); + } dataOutputStream.write(CarbonCommonConstants.EMPTY_BYTE_ARRAY); } String message = logHolder.getColumnMessageMap().get(carbonDimension.getColName()); @@ -422,8 +434,14 @@ public class PrimitiveDataType implements GenericDataType<Object> { KeyGenerator[] generator) throws IOException, KeyGenException { if (!this.isDictionary) { - int sizeOfData = byteArrayInput.getShort(); - dataOutputStream.writeShort(sizeOfData); + int sizeOfData; + if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) { + sizeOfData = byteArrayInput.getInt(); + dataOutputStream.writeInt(sizeOfData); + } else { + sizeOfData = byteArrayInput.getShort(); + dataOutputStream.writeShort(sizeOfData); + } byte[] bb = new byte[sizeOfData]; byteArrayInput.get(bb, 0, sizeOfData); dataOutputStream.write(bb); @@ -465,7 +483,13 @@ public class PrimitiveDataType implements GenericDataType<Object> { public void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray) { if (!isDictionary) { - byte[] key = new byte[inputArray.getShort()]; + int length; + if (DataTypeUtil.isByteArrayComplexChildColumn(dataType)) { + length = inputArray.getInt(); + } else { + length = inputArray.getShort(); + } + byte[] key = new byte[length]; inputArray.get(key); columnsArray.get(outputArrayIndex).add(key); } else { diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java index 6f90155..3a64377 100644 --- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java +++ 
b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/ImageTest.java @@ -33,6 +33,7 @@ import org.apache.carbondata.util.BinaryUtil; import org.apache.commons.codec.DecoderException; import org.apache.commons.codec.binary.Hex; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.hadoop.mapreduce.InputSplit; import org.junit.Assert; import org.junit.Test; @@ -1168,4 +1169,44 @@ public class ImageTest extends TestCase { reader.close(); } + @Test public void testHugeBinaryWithComplexType() + throws IOException, InvalidLoadOptionException, InterruptedException { + int num = 1; + int rows = 1; + String path = "./target/binary"; + try { + FileUtils.deleteDirectory(new File(path)); + } catch (IOException e) { + e.printStackTrace(); + } + Field[] fields = new Field[2]; + fields[0] = new Field("arrayField", DataTypes.createArrayType(DataTypes.BINARY)); + ArrayList<StructField> structFields = new ArrayList<>(); + structFields.add(new StructField("b", DataTypes.BINARY)); + fields[1] = new Field("structField", DataTypes.createStructType(structFields)); + + String description = RandomStringUtils.randomAlphabetic(33000); + + // read and write image data + for (int j = 0; j < num; j++) { + CarbonWriter writer = CarbonWriter.builder().outputPath(path).withCsvInput(new Schema(fields)) + .writtenBy("BinaryExample").withPageSizeInMb(5).build(); + + for (int i = 0; i < rows; i++) { + // write data + writer.write(new String[] { description, description }); + } + writer.close(); + } + CarbonReader reader = CarbonReader.builder(path, "_temp").build(); + while (reader.hasNext()) { + Object[] row = (Object[]) reader.readNextRow(); + Object[] arrayResult = (Object[]) row[0]; + Object[] structResult = (Object[]) row[1]; + assert (new String((byte[]) arrayResult[0]).equalsIgnoreCase(description)); + assert (new String((byte[]) structResult[0]).equalsIgnoreCase(description)); + } + reader.close(); + } + }