[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-08 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21070


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186793493
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,157 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putInt(rowId + i, buffer.getInt());
+  }
+}
   }
 
   @Override
   public final void readLongs(int total, WritableColumnVector c, int 
rowId) {
-c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putLong(rowId + i, buffer.getLong());
+  }
+}
   }
 
   @Override
   public final void readFloats(int total, WritableColumnVector c, int 
rowId) {
-c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putFloats(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putFloat(rowId + i, buffer.getFloat());
+  }
+}
   }
 
   @Override
   public final void readDoubles(int total, WritableColumnVector c, int 
rowId) {
-c.putDoubles(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putDoubles(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putDouble(rowId + i, buffer.getDouble());
+  }
+}
   }
 
   @Override
   public final void readBytes(int total, WritableColumnVector c, int 
rowId) {
-for (int i = 0; i < total; i++) {
-  // Bytes are stored as a 4-byte little endian int. Just read the 
first byte.
-  // TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
-  c.putByte(rowId + i, Platform.getByte(buffer, offset));
-  offset += 4;
+// Bytes are stored as a 4-byte little endian int. Just read the first 
byte.
+// TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
+  c.putByte(rowId + i, buffer.get());
+  // skip the next 3 bytes
+  buffer.position(buffer.position() + 3);
 }
   }
 
   @Override
   public final boolean readBoolean() {
-byte b = Platform.getByte(buffer, offset);
-boolean v = (b & (1 << bitOffset)) != 0;
+// TODO: vectorize decoding and keep boolean[] instead of currentByte
+if (bitOffset == 0) {
+  try {
+currentByte = (byte) in.read();
+  } catch (IOException e) {
+throw new ParquetDecodingException("Failed to read a byte", e);
+  }
+}
+
+boolean v = (currentByte & (1 << bitOffset)) != 0;
 bitOffset += 1;
 if (bitOffset == 8) {
   bitOffset = 0;
-  offset++;
 }
 return v;
   }
 
   @Override
   public final int readInteger() {
-int v = Platform.getInt(buffer, offset);
-if (bigEndianPlatform) 

[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186791067
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,157 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putInt(rowId + i, buffer.getInt());
+  }
+}
   }
 
   @Override
   public final void readLongs(int total, WritableColumnVector c, int 
rowId) {
-c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putLong(rowId + i, buffer.getLong());
+  }
+}
   }
 
   @Override
   public final void readFloats(int total, WritableColumnVector c, int 
rowId) {
-c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putFloats(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putFloat(rowId + i, buffer.getFloat());
+  }
+}
   }
 
   @Override
   public final void readDoubles(int total, WritableColumnVector c, int 
rowId) {
-c.putDoubles(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putDoubles(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putDouble(rowId + i, buffer.getDouble());
+  }
+}
   }
 
   @Override
   public final void readBytes(int total, WritableColumnVector c, int 
rowId) {
-for (int i = 0; i < total; i++) {
-  // Bytes are stored as a 4-byte little endian int. Just read the 
first byte.
-  // TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
-  c.putByte(rowId + i, Platform.getByte(buffer, offset));
-  offset += 4;
+// Bytes are stored as a 4-byte little endian int. Just read the first 
byte.
+// TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
+  c.putByte(rowId + i, buffer.get());
+  // skip the next 3 bytes
+  buffer.position(buffer.position() + 3);
 }
   }
 
   @Override
   public final boolean readBoolean() {
-byte b = Platform.getByte(buffer, offset);
-boolean v = (b & (1 << bitOffset)) != 0;
+// TODO: vectorize decoding and keep boolean[] instead of currentByte
+if (bitOffset == 0) {
+  try {
+currentByte = (byte) in.read();
+  } catch (IOException e) {
+throw new ParquetDecodingException("Failed to read a byte", e);
+  }
+}
+
+boolean v = (currentByte & (1 << bitOffset)) != 0;
 bitOffset += 1;
 if (bitOffset == 8) {
   bitOffset = 0;
-  offset++;
 }
 return v;
   }
 
   @Override
   public final int readInteger() {
-int v = Platform.getInt(buffer, offset);
-if 

[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186789552
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ---
@@ -619,32 +608,37 @@ private int ceil8(int value) {
   /**
* Reads the next group.
*/
-  private void readNextGroup()  {
-int header = readUnsignedVarInt();
-this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
-switch (mode) {
-  case RLE:
-this.currentCount = header >>> 1;
-this.currentValue = readIntLittleEndianPaddedOnBitWidth();
-return;
-  case PACKED:
-int numGroups = header >>> 1;
-this.currentCount = numGroups * 8;
-int bytesToRead = ceil8(this.currentCount * this.bitWidth);
-
-if (this.currentBuffer.length < this.currentCount) {
-  this.currentBuffer = new int[this.currentCount];
-}
-currentBufferIdx = 0;
-int valueIndex = 0;
-for (int byteIndex = offset; valueIndex < this.currentCount; 
byteIndex += this.bitWidth) {
-  this.packer.unpack8Values(in, byteIndex, this.currentBuffer, 
valueIndex);
-  valueIndex += 8;
-}
-offset += bytesToRead;
-return;
-  default:
-throw new ParquetDecodingException("not a valid mode " + 
this.mode);
+  private void readNextGroup() {
+try {
+  int header = readUnsignedVarInt();
+  this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+  switch (mode) {
+case RLE:
+  this.currentCount = header >>> 1;
+  this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+  return;
+case PACKED:
+  int numGroups = header >>> 1;
+  this.currentCount = numGroups * 8;
+
+  if (this.currentBuffer.length < this.currentCount) {
+this.currentBuffer = new int[this.currentCount];
+  }
+  currentBufferIdx = 0;
+  int valueIndex = 0;
+  while (valueIndex < this.currentCount) {
+// values are bit packed 8 at a time, so reading bitWidth will 
always work
+ByteBuffer buffer = in.slice(bitWidth);
+this.packer.unpack8Values(
+buffer, buffer.arrayOffset() + buffer.position(), 
this.currentBuffer, valueIndex);
--- End diff --

Good catch. Fixed to remove the call to `arrayOffset`. It should work with 
both on- and off-heap buffers now.
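
A minimal stand-alone sketch (plain java.nio, not the Spark/Parquet code) of why dropping `arrayOffset()` matters: `array()` and `arrayOffset()` are only defined for heap-backed buffers, while position-relative reads work for heap and direct buffers alike.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical demo: position-relative reads work for both heap and direct buffers,
// whereas array()/arrayOffset() throw for a direct (off-heap) buffer.
public final class HeapVsDirectBuffers {
  static int readFirstIntLE(ByteBuffer buffer) {
    // Read relative to the buffer's position; valid for heap and direct buffers alike.
    return buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN).getInt();
  }

  public static void main(String[] args) {
    ByteBuffer heap = ByteBuffer.wrap(new byte[] {1, 0, 0, 0});
    ByteBuffer direct = ByteBuffer.allocateDirect(4);
    direct.put(new byte[] {1, 0, 0, 0});
    direct.flip();

    System.out.println(readFirstIntLE(heap));    // 1
    System.out.println(readFirstIntLE(direct));  // 1
    System.out.println(heap.hasArray());         // true
    System.out.println(direct.hasArray());       // false
    try {
      direct.array();                            // a direct buffer has no backing array
    } catch (UnsupportedOperationException e) {
      System.out.println("direct.array() is unsupported");
    }
  }
}
```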


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-08 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186785996
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,157 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putInt(rowId + i, buffer.getInt());
+  }
+}
   }
 
   @Override
   public final void readLongs(int total, WritableColumnVector c, int 
rowId) {
-c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putLong(rowId + i, buffer.getLong());
+  }
+}
   }
 
   @Override
   public final void readFloats(int total, WritableColumnVector c, int 
rowId) {
-c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putFloats(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putFloat(rowId + i, buffer.getFloat());
+  }
+}
   }
 
   @Override
   public final void readDoubles(int total, WritableColumnVector c, int 
rowId) {
-c.putDoubles(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putDoubles(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putDouble(rowId + i, buffer.getDouble());
+  }
+}
   }
 
   @Override
   public final void readBytes(int total, WritableColumnVector c, int 
rowId) {
-for (int i = 0; i < total; i++) {
-  // Bytes are stored as a 4-byte little endian int. Just read the 
first byte.
-  // TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
-  c.putByte(rowId + i, Platform.getByte(buffer, offset));
-  offset += 4;
+// Bytes are stored as a 4-byte little endian int. Just read the first 
byte.
+// TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
+  c.putByte(rowId + i, buffer.get());
+  // skip the next 3 bytes
+  buffer.position(buffer.position() + 3);
 }
   }
 
   @Override
   public final boolean readBoolean() {
-byte b = Platform.getByte(buffer, offset);
-boolean v = (b & (1 << bitOffset)) != 0;
+// TODO: vectorize decoding and keep boolean[] instead of currentByte
+if (bitOffset == 0) {
+  try {
+currentByte = (byte) in.read();
+  } catch (IOException e) {
+throw new ParquetDecodingException("Failed to read a byte", e);
+  }
+}
+
+boolean v = (currentByte & (1 << bitOffset)) != 0;
 bitOffset += 1;
 if (bitOffset == 8) {
   bitOffset = 0;
-  offset++;
 }
 return v;
   }
 
   @Override
   public final int readInteger() {
-int v = Platform.getInt(buffer, offset);
-if (bigEndianPlatform) 

[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186604021
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,157 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putInt(rowId + i, buffer.getInt());
+  }
+}
   }
 
   @Override
   public final void readLongs(int total, WritableColumnVector c, int 
rowId) {
-c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putLong(rowId + i, buffer.getLong());
+  }
+}
   }
 
   @Override
   public final void readFloats(int total, WritableColumnVector c, int 
rowId) {
-c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putFloats(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putFloat(rowId + i, buffer.getFloat());
+  }
+}
   }
 
   @Override
   public final void readDoubles(int total, WritableColumnVector c, int 
rowId) {
-c.putDoubles(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putDoubles(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putDouble(rowId + i, buffer.getDouble());
+  }
+}
   }
 
   @Override
   public final void readBytes(int total, WritableColumnVector c, int 
rowId) {
-for (int i = 0; i < total; i++) {
-  // Bytes are stored as a 4-byte little endian int. Just read the 
first byte.
-  // TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
-  c.putByte(rowId + i, Platform.getByte(buffer, offset));
-  offset += 4;
+// Bytes are stored as a 4-byte little endian int. Just read the first 
byte.
+// TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
+  c.putByte(rowId + i, buffer.get());
+  // skip the next 3 bytes
+  buffer.position(buffer.position() + 3);
 }
   }
 
   @Override
   public final boolean readBoolean() {
-byte b = Platform.getByte(buffer, offset);
-boolean v = (b & (1 << bitOffset)) != 0;
+// TODO: vectorize decoding and keep boolean[] instead of currentByte
+if (bitOffset == 0) {
+  try {
+currentByte = (byte) in.read();
+  } catch (IOException e) {
+throw new ParquetDecodingException("Failed to read a byte", e);
+  }
+}
+
+boolean v = (currentByte & (1 << bitOffset)) != 0;
 bitOffset += 1;
 if (bitOffset == 8) {
   bitOffset = 0;
-  offset++;
 }
 return v;
   }
 
   @Override
   public final int readInteger() {
-int v = Platform.getInt(buffer, offset);
-if 

[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186603708
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java ---
@@ -619,32 +608,37 @@ private int ceil8(int value) {
   /**
* Reads the next group.
*/
-  private void readNextGroup()  {
-int header = readUnsignedVarInt();
-this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
-switch (mode) {
-  case RLE:
-this.currentCount = header >>> 1;
-this.currentValue = readIntLittleEndianPaddedOnBitWidth();
-return;
-  case PACKED:
-int numGroups = header >>> 1;
-this.currentCount = numGroups * 8;
-int bytesToRead = ceil8(this.currentCount * this.bitWidth);
-
-if (this.currentBuffer.length < this.currentCount) {
-  this.currentBuffer = new int[this.currentCount];
-}
-currentBufferIdx = 0;
-int valueIndex = 0;
-for (int byteIndex = offset; valueIndex < this.currentCount; 
byteIndex += this.bitWidth) {
-  this.packer.unpack8Values(in, byteIndex, this.currentBuffer, 
valueIndex);
-  valueIndex += 8;
-}
-offset += bytesToRead;
-return;
-  default:
-throw new ParquetDecodingException("not a valid mode " + 
this.mode);
+  private void readNextGroup() {
+try {
+  int header = readUnsignedVarInt();
+  this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+  switch (mode) {
+case RLE:
+  this.currentCount = header >>> 1;
+  this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+  return;
+case PACKED:
+  int numGroups = header >>> 1;
+  this.currentCount = numGroups * 8;
+
+  if (this.currentBuffer.length < this.currentCount) {
+this.currentBuffer = new int[this.currentCount];
+  }
+  currentBufferIdx = 0;
+  int valueIndex = 0;
+  while (valueIndex < this.currentCount) {
+// values are bit packed 8 at a time, so reading bitWidth will 
always work
+ByteBuffer buffer = in.slice(bitWidth);
+this.packer.unpack8Values(
+buffer, buffer.arrayOffset() + buffer.position(), 
this.currentBuffer, valueIndex);
--- End diff --

shall we assume the `ByteBuffer` may not be on-heap?


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186603010
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,157 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putInt(rowId + i, buffer.getInt());
+  }
+}
   }
 
   @Override
   public final void readLongs(int total, WritableColumnVector c, int 
rowId) {
-c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putLong(rowId + i, buffer.getLong());
+  }
+}
   }
 
   @Override
   public final void readFloats(int total, WritableColumnVector c, int 
rowId) {
-c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putFloats(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putFloat(rowId + i, buffer.getFloat());
+  }
+}
   }
 
   @Override
   public final void readDoubles(int total, WritableColumnVector c, int 
rowId) {
-c.putDoubles(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putDoubles(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putDouble(rowId + i, buffer.getDouble());
+  }
+}
   }
 
   @Override
   public final void readBytes(int total, WritableColumnVector c, int 
rowId) {
-for (int i = 0; i < total; i++) {
-  // Bytes are stored as a 4-byte little endian int. Just read the 
first byte.
-  // TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
-  c.putByte(rowId + i, Platform.getByte(buffer, offset));
-  offset += 4;
+// Bytes are stored as a 4-byte little endian int. Just read the first 
byte.
+// TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
+  c.putByte(rowId + i, buffer.get());
+  // skip the next 3 bytes
+  buffer.position(buffer.position() + 3);
 }
   }
 
   @Override
   public final boolean readBoolean() {
-byte b = Platform.getByte(buffer, offset);
-boolean v = (b & (1 << bitOffset)) != 0;
+// TODO: vectorize decoding and keep boolean[] instead of currentByte
+if (bitOffset == 0) {
+  try {
+currentByte = (byte) in.read();
+  } catch (IOException e) {
+throw new ParquetDecodingException("Failed to read a byte", e);
+  }
+}
+
+boolean v = (currentByte & (1 << bitOffset)) != 0;
 bitOffset += 1;
 if (bitOffset == 8) {
   bitOffset = 0;
-  offset++;
 }
 return v;
   }
 
   @Override
   public final int readInteger() {
-int v = Platform.getInt(buffer, offset);
-if 

[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186602846
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,157 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putInt(rowId + i, buffer.getInt());
+  }
+}
   }
 
   @Override
   public final void readLongs(int total, WritableColumnVector c, int 
rowId) {
-c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putLong(rowId + i, buffer.getLong());
+  }
+}
   }
 
   @Override
   public final void readFloats(int total, WritableColumnVector c, int 
rowId) {
-c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putFloats(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putFloat(rowId + i, buffer.getFloat());
+  }
+}
   }
 
   @Override
   public final void readDoubles(int total, WritableColumnVector c, int 
rowId) {
-c.putDoubles(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putDoubles(rowId, total, buffer.array(), offset);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putDouble(rowId + i, buffer.getDouble());
+  }
+}
   }
 
   @Override
   public final void readBytes(int total, WritableColumnVector c, int 
rowId) {
-for (int i = 0; i < total; i++) {
-  // Bytes are stored as a 4-byte little endian int. Just read the 
first byte.
-  // TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
-  c.putByte(rowId + i, Platform.getByte(buffer, offset));
-  offset += 4;
+// Bytes are stored as a 4-byte little endian int. Just read the first 
byte.
+// TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
+  c.putByte(rowId + i, buffer.get());
+  // skip the next 3 bytes
+  buffer.position(buffer.position() + 3);
 }
   }
 
   @Override
   public final boolean readBoolean() {
-byte b = Platform.getByte(buffer, offset);
-boolean v = (b & (1 << bitOffset)) != 0;
+// TODO: vectorize decoding and keep boolean[] instead of currentByte
+if (bitOffset == 0) {
+  try {
+currentByte = (byte) in.read();
+  } catch (IOException e) {
+throw new ParquetDecodingException("Failed to read a byte", e);
+  }
+}
+
+boolean v = (currentByte & (1 << bitOffset)) != 0;
 bitOffset += 1;
 if (bitOffset == 8) {
   bitOffset = 0;
-  offset++;
 }
 return v;
   }
 
   @Override
   public final int readInteger() {
-int v = Platform.getInt(buffer, offset);
-if 

[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186469029
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -345,7 +345,7 @@ object SQLConf {
   "snappy, gzip, lzo.")
 .stringConf
 .transform(_.toLowerCase(Locale.ROOT))
-.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
+.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", 
"lz4", "brotli", "zstd"))
--- End diff --

Done.


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186468896
  
--- Diff: dev/deps/spark-deps-hadoop-2.7 ---
@@ -163,13 +163,13 @@ orc-mapreduce-1.4.3-nohive.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
--- End diff --

Done.


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186464674
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,157 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
--- End diff --

No, there is no guarantee that the buffer from Parquet is on the heap.
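
That is also why the new code branches on `hasArray()`. A small self-contained sketch of the same pattern (class and method names here are illustrative, not Spark APIs): take the bulk array-based path only when the buffer is heap-backed, otherwise fall back to reads through the buffer API.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical demo of the hasArray() branching pattern used in the diff above.
public final class HasArrayBranch {
  static byte[] copyBytes(ByteBuffer buffer, int length) {
    byte[] out = new byte[length];
    if (buffer.hasArray()) {
      // Heap buffer: one bulk copy straight out of the backing array.
      int offset = buffer.arrayOffset() + buffer.position();
      System.arraycopy(buffer.array(), offset, out, 0, length);
      buffer.position(buffer.position() + length);
    } else {
      // Direct buffer: no backing array, so read through the buffer API instead.
      buffer.get(out, 0, length);
    }
    return out;
  }

  public static void main(String[] args) {
    byte[] data = {1, 2, 3, 4, 5, 6, 7, 8};
    ByteBuffer heap = ByteBuffer.wrap(data);
    ByteBuffer direct = ByteBuffer.allocateDirect(8);
    direct.put(data);
    direct.flip();
    System.out.println(Arrays.toString(copyBytes(heap, 8)));    // [1, 2, 3, 4, 5, 6, 7, 8]
    System.out.println(Arrays.toString(copyBytes(direct, 8)));  // [1, 2, 3, 4, 5, 6, 7, 8]
  }
}
```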


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-07 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186464557
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,157 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
--- End diff --

No, `slice` doesn't copy. That's why we're using `ByteBuffer` now: to avoid copy operations.

Always setting the byte order to `LITTLE_ENDIAN` is correct because the order describes how the buffer's contents are interpreted, and Parquet stores values in little endian: https://github.com/apache/parquet-format/blob/master/Encodings.md.
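
A quick stand-alone illustration of both points with plain `java.nio` (note: the `in.slice(length)` above is Parquet's `ByteBufferInputStream`, not this API): a slice shares storage with its parent rather than copying it, and `order()` only changes how the same bytes are interpreted.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical demo: slices are views, not copies, and byte order is per-buffer interpretation.
public final class SliceDemo {
  public static void main(String[] args) {
    ByteBuffer parent = ByteBuffer.allocate(4);
    ByteBuffer slice = parent.slice().order(ByteOrder.LITTLE_ENDIAN);

    parent.put(0, (byte) 0x01);           // mutate the parent after slicing
    System.out.println(slice.getInt(0));  // 1: the slice sees the change, so no copy was made

    // The same four bytes read as big-endian give a different value.
    System.out.println(slice.order(ByteOrder.BIG_ENDIAN).getInt(0));  // 16777216
  }
}
```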




---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186358096
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,157 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
--- End diff --

Previously we only called `.order(ByteOrder.LITTLE_ENDIAN)` on a big-endian platform. Is it OK to always call it?


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186357714
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,157 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
--- End diff --

shall we assert `buffer.hasArray()` is always true?


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-07 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186357371
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,157 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
--- End diff --

Does `in.slice(length)` make a copy?


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-06 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186325093
  
--- Diff: dev/deps/spark-deps-hadoop-2.7 ---
@@ -163,13 +163,13 @@ orc-mapreduce-1.4.3-nohive.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.8.jar
-parquet-column-1.8.2.jar
-parquet-common-1.8.2.jar
-parquet-encoding-1.8.2.jar
-parquet-format-2.3.1.jar
-parquet-hadoop-1.8.2.jar
+parquet-column-1.10.0.jar
+parquet-common-1.10.0.jar
+parquet-encoding-1.10.0.jar
+parquet-format-2.4.0.jar
+parquet-hadoop-1.10.0.jar
 parquet-hadoop-bundle-1.6.0.jar
-parquet-jackson-1.8.2.jar
+parquet-jackson-1.10.0.jar
--- End diff --

(btw, don't forget to fix https://github.com/apache/spark/blob/ce7ba2e98e0a3b038e881c271b5905058c43155b/dev/deps/spark-deps-hadoop-3.1#L184-L190 too)


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-05-05 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r186276440
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -345,7 +345,7 @@ object SQLConf {
   "snappy, gzip, lzo.")
 .stringConf
 .transform(_.toLowerCase(Locale.ROOT))
-.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo"))
+.checkValues(Set("none", "uncompressed", "snappy", "gzip", "lzo", 
"lz4", "brotli", "zstd"))
--- End diff --

Could you update [sql-programming-guide.md](https://github.com/apache/spark/blame/master/docs/sql-programming-guide.md#L967) as well?


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r184156731
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -503,7 +503,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
 case plan: InMemoryRelation => plan
   }.head
   // InMemoryRelation's stats is file size before the underlying 
RDD is materialized
-  assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+  assert(inMemoryRelation.computeStats().sizeInBytes === 800)
--- End diff --

This is data dependent, so it is hard to estimate. We write the stats for older readers only when the type uses a signed sort order, so they are limited to mostly primitive types and won't be written for byte arrays or UTF-8 data. That limits the overhead to 16 bytes plus Thrift overhead per page, and you might have about 100 pages per row group: roughly 1.5 KB per 128 MB row group, which is about 0.001%.
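
A back-of-the-envelope check of those numbers (illustrative values only; Thrift overhead ignored):

```java
// Hypothetical sanity check of the estimate above; the constants mirror the comment, not measurements.
public final class StatsOverheadEstimate {
  public static void main(String[] args) {
    long rowGroupBytes = 128L * 1024 * 1024;  // 128 MB row group
    int pagesPerRowGroup = 100;               // "about 100 pages per row group"
    int extraBytesPerPage = 16;               // old-style min/max values, 8 bytes each
    long extraBytes = (long) pagesPerRowGroup * extraBytesPerPage;  // 1600 bytes, ~1.5 KB
    System.out.printf("extra ~%d bytes = %.4f%% of the row group%n",
        extraBytes, 100.0 * extraBytes / rowGroupBytes);            // ~0.001%
  }
}
```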


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-25 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r184144930
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -503,7 +503,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
 case plan: InMemoryRelation => plan
   }.head
   // InMemoryRelation's stats is file size before the underlying 
RDD is materialized
-  assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+  assert(inMemoryRelation.computeStats().sizeInBytes === 800)
--- End diff --

Then it looks fine, but are the new max/min metadata fields added to the file metadata, column metadata, and page header metadata? Is there a formula we can use to calculate the size increase?


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-25 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r184139736
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -503,7 +503,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
 case plan: InMemoryRelation => plan
   }.head
   // InMemoryRelation's stats is file size before the underlying 
RDD is materialized
-  assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+  assert(inMemoryRelation.computeStats().sizeInBytes === 800)
--- End diff --

Parquet fixed a problem with value ordering in statistics, which required 
adding new metadata min and max fields. For older readers, Parquet also writes 
the old values when it makes sense to. This is a slight increase in overhead, 
which is more noticeable when files contain just a few records.

Don't be alarmed by the percentage difference here; it is just a small file. Parquet isn't increasing file sizes by 8%; that would be silly.


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-25 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r184116235
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala ---
@@ -503,7 +503,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with 
SharedSQLContext {
 case plan: InMemoryRelation => plan
   }.head
   // InMemoryRelation's stats is file size before the underlying 
RDD is materialized
-  assert(inMemoryRelation.computeStats().sizeInBytes === 740)
+  assert(inMemoryRelation.computeStats().sizeInBytes === 800)
--- End diff --

Our optimizer uses the statistics to decide on plans (e.g., in join algorithm selection). Thus, the plans could be completely different if the file size increases by 8 percent. Could you give us more context? cc @rdblue


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-18 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r182563063
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,159 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putIntsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putInt(rowId + i, buffer.getInt());
+  }
+}
   }
 
   @Override
   public final void readLongs(int total, WritableColumnVector c, int 
rowId) {
-c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putLongsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putLong(rowId + i, buffer.getLong());
+  }
+}
   }
 
   @Override
   public final void readFloats(int total, WritableColumnVector c, int 
rowId) {
-c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putFloats(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putFloat(rowId + i, buffer.getFloat());
+  }
+}
   }
 
   @Override
   public final void readDoubles(int total, WritableColumnVector c, int 
rowId) {
-c.putDoubles(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putDoubles(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putDouble(rowId + i, buffer.getDouble());
+  }
+}
+  }
+
+  private byte getByte() {
+try {
+  return (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
   }
 
   @Override
   public final void readBytes(int total, WritableColumnVector c, int 
rowId) {
-for (int i = 0; i < total; i++) {
-  // Bytes are stored as a 4-byte little endian int. Just read the 
first byte.
-  // TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
-  c.putByte(rowId + i, Platform.getByte(buffer, offset));
-  offset += 4;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
+  c.putByte(rowId + i, buffer.get());
+  // skip the next 3 bytes
+  buffer.position(buffer.position() + 3);
 }
   }
 
   @Override
   public final boolean readBoolean() {
-byte b = Platform.getByte(buffer, offset);
-boolean v = (b & (1 << bitOffset)) != 0;
+// TODO: vectorize decoding and keep boolean[] instead of currentByte
+if (bitOffset == 0) {
+  currentByte = getByte();
+}
+
+boolean v = (currentByte & (1 << bitOffset)) != 0;
 bitOffset += 1;
 if (bitOffset == 8) {
   bitOffset = 0;
-  offset++;
 }
 return v;
   }
 
   @Override
   public final int readInteger() {
-int v = Platform.getInt(buffer, offset);
-if 

[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-17 Thread scottcarey
Github user scottcarey commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r182161734
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,159 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putIntsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putInt(rowId + i, buffer.getInt());
+  }
+}
   }
 
   @Override
   public final void readLongs(int total, WritableColumnVector c, int 
rowId) {
-c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putLongsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putLong(rowId + i, buffer.getLong());
+  }
+}
   }
 
   @Override
   public final void readFloats(int total, WritableColumnVector c, int 
rowId) {
-c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putFloats(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putFloat(rowId + i, buffer.getFloat());
+  }
+}
   }
 
   @Override
   public final void readDoubles(int total, WritableColumnVector c, int 
rowId) {
-c.putDoubles(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putDoubles(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putDouble(rowId + i, buffer.getDouble());
+  }
+}
+  }
+
+  private byte getByte() {
+try {
+  return (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
   }
 
   @Override
   public final void readBytes(int total, WritableColumnVector c, int 
rowId) {
-for (int i = 0; i < total; i++) {
-  // Bytes are stored as a 4-byte little endian int. Just read the 
first byte.
-  // TODO: consider pushing this in ColumnVector by adding a readBytes 
with a stride.
-  c.putByte(rowId + i, Platform.getByte(buffer, offset));
-  offset += 4;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
+  c.putByte(rowId + i, buffer.get());
+  // skip the next 3 bytes
+  buffer.position(buffer.position() + 3);
 }
   }
 
   @Override
   public final boolean readBoolean() {
-byte b = Platform.getByte(buffer, offset);
-boolean v = (b & (1 << bitOffset)) != 0;
+// TODO: vectorize decoding and keep boolean[] instead of currentByte
+if (bitOffset == 0) {
+  currentByte = getByte();
+}
+
+boolean v = (currentByte & (1 << bitOffset)) != 0;
 bitOffset += 1;
 if (bitOffset == 8) {
   bitOffset = 0;
-  offset++;
 }
 return v;
   }
 
   @Override
   public final int readInteger() {
-int v = Platform.getInt(buffer, offset);
-if 

[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181901031
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,159 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putIntsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putInt(rowId + i, buffer.getInt());
+  }
+}
   }
 
   @Override
   public final void readLongs(int total, WritableColumnVector c, int 
rowId) {
-c.putLongsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putLongsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putLong(rowId + i, buffer.getLong());
+  }
+}
   }
 
   @Override
   public final void readFloats(int total, WritableColumnVector c, int 
rowId) {
-c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putFloats(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putFloat(rowId + i, buffer.getFloat());
+  }
+}
   }
 
   @Override
   public final void readDoubles(int total, WritableColumnVector c, int 
rowId) {
-c.putDoubles(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putDoubles(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putDouble(rowId + i, buffer.getDouble());
+  }
+}
+  }
+
+  private byte getByte() {
--- End diff --

Is this used anywhere other than line 154? If not, it can be inlined.


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181902045
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +59,159 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putIntsLittleEndian(rowId, total, buffer.array(), offset - 
Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putInt(rowId + i, buffer.getInt());
+  }
+}
   }
 
   @Override
  public final void readLongs(int total, WritableColumnVector c, int rowId) {
-c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putLongsLittleEndian(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putLong(rowId + i, buffer.getLong());
+  }
+}
   }
 
   @Override
  public final void readFloats(int total, WritableColumnVector c, int rowId) {
-c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putFloats(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putFloat(rowId + i, buffer.getFloat());
+  }
+}
   }
 
   @Override
  public final void readDoubles(int total, WritableColumnVector c, int rowId) {
-c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 8 * total;
+int requiredBytes = total * 8;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+if (buffer.hasArray()) {
+  int offset = buffer.arrayOffset() + buffer.position();
+  c.putDoubles(rowId, total, buffer.array(), offset - Platform.BYTE_ARRAY_OFFSET);
+} else {
+  for (int i = 0; i < total; i += 1) {
+c.putDouble(rowId + i, buffer.getDouble());
+  }
+}
+  }
+
+  private byte getByte() {
+try {
+  return (byte) in.read();
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read a byte", e);
+}
   }
 
   @Override
  public final void readBytes(int total, WritableColumnVector c, int rowId) {
-for (int i = 0; i < total; i++) {
-  // Bytes are stored as a 4-byte little endian int. Just read the first byte.
-  // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
-  c.putByte(rowId + i, Platform.getByte(buffer, offset));
-  offset += 4;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
+  c.putByte(rowId + i, buffer.get());
--- End diff --

could you preserve the comment about "Bytes are stored as 4-byte little 
endian int. Just read the first byte."?
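
To make that concrete, a sketch of the updated readBytes with the comments carried over (the tail of the loop is cut off in this excerpt, so the 3-byte skip shown here is assumed rather than quoted):

    @Override
    public final void readBytes(int total, WritableColumnVector c, int rowId) {
      // Bytes are stored as a 4-byte little endian int. Just read the first byte.
      // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
      int requiredBytes = total * 4;
      ByteBuffer buffer = getBuffer(requiredBytes);

      for (int i = 0; i < total; i += 1) {
        c.putByte(rowId + i, buffer.get());
        // assumed: skip the remaining 3 bytes of each little-endian int
        buffer.position(buffer.position() + 3);
      }
    }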


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181899509
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+}
+  }
+
   @Override
  public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

Updated.


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181889287
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+}
+  }
+
   @Override
  public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

It seems worth it to me, to be defensive against performance changes - but 
feel free to punt it to me as a follow-on patch if you'd rather.


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181883674
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+}
+  }
+
   @Override
  public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

Yeah, exactly. We can detect that the buffer is backed by an array and use 
the other call. If we think this is worth it as a short-term fix, I'll update 
this PR.


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181882476
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+}
+  }
+
   @Override
  public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

Isn't that what `hasArray()` is for though? If the buffers are backed by a 
byte array, `hasArray()` returns true and accessing the byte array via 
`array()` should be 0 cost. (If `array()` actually copies any data, that would 
invalidate this line of reasoning but would also be unexpected).

So for example, here you'd have:

    public final void readIntegers(int total, WritableColumnVector c, int rowId) {
      int requiredBytes = total * 4;
      ByteBuffer buffer = getBuffer(requiredBytes);
      if (buffer.hasArray()) {
        c.putIntsLittleEndian(rowId, total, buffer.array(), 0);
      } else {
        for (int i = 0; i < total; i += 1) {
          c.putInt(rowId + i, buffer.getInt());
        }
      }
    }

This seems to be the same pattern that's in `readBinary()`, below. Let me 
know if I'm missing something!



---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181864492
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+}
+  }
+
   @Override
  public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

The reason why I moved the loop out was that I didn't think that using the 
byte[] API would actually be better. Parquet doesn't guarantee that these byte 
buffers are on the heap and backed by byte arrays, so we would need to copy the 
bytes out of the buffer into an array only to copy them again in the column 
vector call. Two copies (and possibly allocation) seemed worse than moving the 
loop.

We could catch the case where the buffers are on-heap and make the bulk 
call. The drawback is that this will be temporary and will be removed when 
ColumnVector supports ByteBuffer. And, it only works/matters when Parquet uses 
on-heap buffers and Spark uses off-heap buffers. Is that worth the change to 
this PR? I can take a shot at it if you think it is. I'd rather update 
ColumnVector and then follow up though.
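
To make the two-copies point concrete, a sketch of what the bulk byte[] path would cost when the slice is not heap-backed, i.e. when hasArray() is false (names follow the readIntegers diff above; the temporary array is purely illustrative):

    // Inside readIntegers, hypothetically forcing the bulk byte[] API on a
    // direct (off-heap) slice: the data is copied twice and an array is
    // allocated on every call.
    int requiredBytes = total * 4;
    ByteBuffer buffer = getBuffer(requiredBytes);
    if (!buffer.hasArray()) {
      byte[] tmp = new byte[requiredBytes];        // extra allocation
      buffer.get(tmp);                             // copy #1: buffer -> temp array
      c.putIntsLittleEndian(rowId, total, tmp, 0); // copy #2: temp array -> column vector
    }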


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181846514
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, 
WritableColumnVector c, int rowId) {
 }
   }
 
+  private ByteBuffer getBuffer(int length) {
+try {
+  return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+} catch (IOException e) {
+  throw new ParquetDecodingException("Failed to read " + length + " 
bytes", e);
+}
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int 
rowId) {
-c.putIntsLittleEndian(rowId, total, buffer, offset - 
Platform.BYTE_ARRAY_OFFSET);
-offset += 4 * total;
+int requiredBytes = total * 4;
+ByteBuffer buffer = getBuffer(requiredBytes);
+
+for (int i = 0; i < total; i += 1) {
--- End diff --

Agreed that fixing the `ByteBuffer` / `ColumnVector` interaction should be 
dealt with elsewhere. I'm just raising the possibility of _regressing_ the read 
path here because the copies are less efficient. Since it's going to be a while 
before 2.4.0, that might be ok if we commit to fixing it - but it superficially 
seems like a manageable change to the PR since the code to call the bulk APIs 
is already there.  What do you think?


---




[GitHub] spark pull request #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10....

2018-04-16 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181826671
  
--- Diff: pom.xml ---
@@ -129,7 +129,7 @@
 
     <hive.version.short>1.2.1</hive.version.short>
     <derby.version>10.12.1.1</derby.version>
-    <parquet.version>1.8.2</parquet.version>
+    <parquet.version>1.10.0</parquet.version>
--- End diff --

I excluded the commons-pool dependency from parquet-hadoop to avoid this. I 
also tested the latest Parquet release with commons-pool 1.5.4 and everything 
passes. I don't think it actually requires 1.6.


---
