Github user rdblue commented on a diff in the pull request:
https://github.com/apache/spark/pull/21070#discussion_r182563063
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,159 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) {
     }
   }

+  private ByteBuffer getBuffer(int length) {
+    try {
+      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putIntsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putInt(rowId + i, buffer.getInt());
+      }
+    }
   }

   @Override
   public final void readLongs(int total, WritableColumnVector c, int rowId) {
-    c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 8 * total;
+    int requiredBytes = total * 8;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putLongsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putLong(rowId + i, buffer.getLong());
+      }
+    }
   }

   @Override
   public final void readFloats(int total, WritableColumnVector c, int rowId) {
-    c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putFloats(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putFloat(rowId + i, buffer.getFloat());
+      }
+    }
   }

   @Override
   public final void readDoubles(int total, WritableColumnVector c, int rowId) {
-    c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 8 * total;
+    int requiredBytes = total * 8;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    if (buffer.hasArray()) {
+      int offset = buffer.arrayOffset() + buffer.position();
+      c.putDoubles(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+    } else {
+      for (int i = 0; i < total; i += 1) {
+        c.putDouble(rowId + i, buffer.getDouble());
+      }
+    }
+  }
+
+  private byte getByte() {
+    try {
+      return (byte) in.read();
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read a byte", e);
+    }
   }

   @Override
   public final void readBytes(int total, WritableColumnVector c, int rowId) {
-    for (int i = 0; i < total; i++) {
-      // Bytes are stored as a 4-byte little endian int. Just read the first byte.
-      // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
-      c.putByte(rowId + i, Platform.getByte(buffer, offset));
-      offset += 4;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    for (int i = 0; i < total; i += 1) {
+      c.putByte(rowId + i, buffer.get());
+      // skip the next 3 bytes
+      buffer.position(buffer.position() + 3);
     }
   }

   @Override
   public final boolean readBoolean() {
-    byte b = Platform.getByte(buffer, offset);
-    boolean v = (b & (1 << bitOffset)) != 0;
+    // TODO: vectorize decoding and keep boolean[] instead of currentByte
+    if (bitOffset == 0) {
+      currentByte = getByte();
+    }
+
+    boolean v = (currentByte & (1 << bitOffset)) != 0;
     bitOffset += 1;
     if (bitOffset == 8) {
       bitOffset = 0;
-      offset++;
     }
     return v;
   }

   @Override
   public final int readInteger() {
-    int v = Platform.getInt(buffer, offset);
-    if (bigEndianPlatform) {
-      v = java.lang.Integer.reverseBytes(v);
-    }
-    offset += 4;
-    return v;
+    return getBuffer(4).getInt();
   }

   @Override
   public final long readLong() {
-    long v = Platform.getLong(buffer, offset);
-    if (bigEndianPlatform) {
-      v = java.lang.Long.reverseBytes(v);
-    }
-    offset += 8;
-    return v;
+    return getBuffer(8).getLong();
   }

   @Override
   public final byte readByte() {
-    return (byte)readInteger();
+    return (byte) readInteger();
   }

   @Override
   public final float readFloat() {
-    float v;
-    if (!bigEndianPlatform) {
-      v = Platform.getFloat(buffer, offset);
-    } else {
-      v = byteBuffer.getFloat(offset - Platform.BYTE_ARRAY_OFFSET);
-    }
-    offset += 4;
-    return v;
+    return getBuffer(4).getFloat();
   }

   @Override
   public final double readDouble() {
-    double v;
-    if (!bigEndianPlatform) {
-      v = Platform.getDouble(buffer, offset);
-    } else {
-      v = byteBuffer.getDouble(offset - Platform.BYTE_ARRAY_OFFSET);
-    }
-    offset += 8;
-    return v;
+    return getBuffer(8).getDouble();
--- End diff ---
Yeah, that's a good point. It isn't read this way in Parquet, but it would
be useful for libraries that use this deep integration. Feel free to submit a
PR on Parquet and I'll review it there.
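
For readers following along: the `getBuffer` helper in the diff leans on Parquet's `ByteBufferInputStream.slice(length)`, which returns a buffer view over the next `length` bytes and advances the stream. Here is a minimal standalone sketch of that pattern, assuming Parquet 1.10+ (`parquet-common`) on the classpath; the class name `SliceExample` and the sample values are made up for illustration:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.apache.parquet.bytes.ByteBufferInputStream;

public class SliceExample {
  public static void main(String[] args) throws IOException {
    // Two little-endian ints, laid out as Parquet's PLAIN encoding stores them.
    ByteBuffer data = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
    data.putInt(7);
    data.putInt(42);
    data.flip();

    ByteBufferInputStream in = ByteBufferInputStream.wrap(data);
    // slice(n) returns a view over the next n bytes and advances the stream.
    // It throws EOFException (an IOException) if fewer than n bytes remain,
    // which is why getBuffer() rethrows as ParquetDecodingException.
    ByteBuffer slice = in.slice(8).order(ByteOrder.LITTLE_ENDIAN);
    System.out.println(slice.getInt()); // 7
    System.out.println(slice.getInt()); // 42
  }
}
```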
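The `hasArray()` branch repeated in `readIntegers`/`readLongs`/`readFloats`/`readDoubles` exists because the slice may be either a heap buffer, where bulk copies from the backing array are possible, or a direct buffer, which has no backing array and must be read value by value. A rough sketch of the same dispatch outside Spark; `sumInts` is a hypothetical helper, not part of this patch:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class DispatchExample {
  static long sumInts(ByteBuffer buffer, int total) {
    long sum = 0;
    if (buffer.hasArray()) {
      // Heap buffer: a backing byte[] exists; arrayOffset() + position()
      // converts the buffer's position into an index into that array.
      // (The slice is throwaway, so this branch need not advance position.)
      byte[] bytes = buffer.array();
      int offset = buffer.arrayOffset() + buffer.position();
      for (int i = 0; i < total; i += 1) {
        int base = offset + i * 4;
        // Assemble a little-endian int by hand.
        sum += (bytes[base] & 0xFF)
            | (bytes[base + 1] & 0xFF) << 8
            | (bytes[base + 2] & 0xFF) << 16
            | (bytes[base + 3] & 0xFF) << 24;
      }
    } else {
      // Direct buffer: no backing array, so fall back to relative reads.
      for (int i = 0; i < total; i += 1) {
        sum += buffer.getInt();
      }
    }
    return sum;
  }

  public static void main(String[] args) {
    ByteBuffer b = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
    b.putInt(1);
    b.putInt(2);
    b.flip();
    System.out.println(sumInts(b, 2)); // prints 3
  }
}
```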
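And for the `readBoolean` path: PLAIN booleans are bit-packed eight to a byte, least-significant bit first, which is exactly what `(currentByte & (1 << bitOffset))` tests. A sketch of the per-byte `boolean[]` decoding that the TODO in the diff hints at; `unpack` is illustrative only, not Spark or Parquet API:

```java
public class BooleanBitsExample {
  // Decode one bit-packed byte into eight booleans, LSB first.
  static boolean[] unpack(byte packed) {
    boolean[] out = new boolean[8];
    for (int bitOffset = 0; bitOffset < 8; bitOffset += 1) {
      out[bitOffset] = (packed & (1 << bitOffset)) != 0;
    }
    return out;
  }

  public static void main(String[] args) {
    // 0b00000101 encodes {true, false, true, false, ...}
    boolean[] values = unpack((byte) 0b00000101);
    System.out.println(values[0]); // true
    System.out.println(values[1]); // false
    System.out.println(values[2]); // true
  }
}
```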