[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.

2018-04-15 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181589911
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) {
     }
   }
 
+  private ByteBuffer getBuffer(int length) {
+    try {
+      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    for (int i = 0; i < total; i += 1) {
--- End diff --

I think that better integration between `ByteBuffer` and `ColumnVector` should be addressed in another PR. Since tableCache also uses `ByteBuffer`, it would be good to discuss it there.
cc: @cloud-fan 


---




[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.

2018-04-14 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181564129
  
--- Diff: pom.xml ---
@@ -129,7 +129,7 @@
     <hive.version.short>1.2.1</hive.version.short>
     <derby.version>10.12.1.1</derby.version>
-    <parquet.version>1.8.2</parquet.version>
+    <parquet.version>1.10.0</parquet.version>
--- End diff --

Unlike [last time](https://github.com/apache/spark/commit/26a4cba3ffaadf382ca14980378965704ccef9ab), it seems that this PR also touches the `commons-pool` dependency. Can we avoid this?
```
-commons-pool-1.5.4.jar
+commons-pool-1.6.jar
```


---




[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.

2018-04-14 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181563672
  
--- Diff: pom.xml ---
@@ -129,7 +129,7 @@
     <hive.version.short>1.2.1</hive.version.short>
     <derby.version>10.12.1.1</derby.version>
-    <parquet.version>1.8.2</parquet.version>
+    <parquet.version>1.10.0</parquet.version>
--- End diff --

@rdblue, to see the Jenkins result, could you resolve the dependency check failure with the following?
```
./dev/test-dependencies.sh --replace-manifest
```


---




[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.

2018-04-13 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181540952
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) {
     }
   }
 
+  private ByteBuffer getBuffer(int length) {
+    try {
+      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    for (int i = 0; i < total; i += 1) {
--- End diff --

Couldn't it also be writing to an `OffHeapColumnVector`? 
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java#L199

If so, I think the copy is 1MB at a time: 
https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L189

I agree that `ByteBuffer` support shouldn't be added in this PR. But there's an opportunity to use the bulk copy APIs (see the sketch below), which would benefit from any future optimization that happens. Plus, even if the copy does eventually become a loop inside the column vector implementation, there's a better chance of the JIT unrolling the loop since it's smaller.
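
[Editor's sketch, not code from the PR: the bulk alternative being described, reusing the names from the diff above (`buffer`, `requiredBytes`, `c`, `rowId`, `total`); `scratch` is a hypothetical reusable array.]

```
// One bulk get() out of the (possibly direct) slice, then one bulk
// little-endian write into the vector, instead of a per-value loop.
byte[] scratch = new byte[requiredBytes]; // hypothetical; could be reused across calls
buffer.get(scratch, 0, requiredBytes);    // ByteBuffer's bulk copy API
c.putIntsLittleEndian(rowId, total, scratch, 0);
```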


---




[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.

2018-04-13 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181535144
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) {
     }
   }
 
+  private ByteBuffer getBuffer(int length) {
+    try {
+      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    for (int i = 0; i < total; i += 1) {
--- End diff --

It looks that way, but it actually replaces a similar loop: 
https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java#L283-L291

The main problem is that `ByteBuffer` isn't supported in the column vectors. That seems beyond the scope of this PR (a hypothetical integration is sketched below).
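
[Editor's sketch of the kind of integration being discussed; this overload does not exist in `WritableColumnVector` and is purely hypothetical. The byte[]-based `putIntsLittleEndian` and `putInt` it delegates to are existing methods.]

```
// Hypothetical WritableColumnVector overload: accept a little-endian
// ByteBuffer directly and pick the copy strategy internally.
public void putIntsLittleEndian(int rowId, int count, ByteBuffer src) {
  if (src.hasArray()) {
    // Heap-backed buffer: reuse the existing bulk byte[] path.
    putIntsLittleEndian(rowId, count, src.array(), src.arrayOffset() + src.position());
  } else {
    // Direct buffer: per-value fallback (src must be little-endian ordered).
    for (int i = 0; i < count; i += 1) {
      putInt(rowId + i, src.getInt());
    }
  }
}
```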


---




[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.

2018-04-13 Thread henryr
Github user henryr commented on a diff in the pull request:

https://github.com/apache/spark/pull/21070#discussion_r181528729
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java ---
@@ -63,115 +58,139 @@ public final void readBooleans(int total, WritableColumnVector c, int rowId) {
     }
   }
 
+  private ByteBuffer getBuffer(int length) {
+    try {
+      return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
+    }
+  }
+
   @Override
   public final void readIntegers(int total, WritableColumnVector c, int rowId) {
-    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
-    offset += 4 * total;
+    int requiredBytes = total * 4;
+    ByteBuffer buffer = getBuffer(requiredBytes);
+
+    for (int i = 0; i < total; i += 1) {
--- End diff --

Here and elsewhere a bulk copy has been replaced by many smaller copies. It would be better to be able to use the bulk version. I think it would be preferable to at least have:

if (buffer.hasArray()) {
  c.putIntsLittleEndian(rowId, total, buffer.array(), 0);
} else {
  for (int i = 0; i < total; i += 1) { // ... etc
}
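
[Editor's sketch filling in the fragment above; an assumption, not the PR's final code. Note that for a sliced heap buffer the source index should be `arrayOffset() + position()` rather than `0`.]

```
int requiredBytes = total * 4;
ByteBuffer buffer = getBuffer(requiredBytes);

if (buffer.hasArray()) {
  // Bulk path: copy straight out of the backing array.
  c.putIntsLittleEndian(rowId, total, buffer.array(),
      buffer.arrayOffset() + buffer.position());
} else {
  // Fallback for direct buffers; the slice is little-endian ordered by getBuffer().
  for (int i = 0; i < total; i += 1) {
    c.putInt(rowId + i, buffer.getInt());
  }
}
```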


---




[GitHub] spark pull request #21070: SPARK-23972: Update Parquet to 1.10.0.

2018-04-13 Thread rdblue
GitHub user rdblue opened a pull request:

https://github.com/apache/spark/pull/21070

SPARK-23972: Update Parquet to 1.10.0.

## What changes were proposed in this pull request?

This updates Parquet to 1.10.0 and updates the vectorized path for buffer 
management changes. Parquet 1.10.0 uses ByteBufferInputStream instead of byte 
arrays in encoders. This allows Parquet to break allocations into smaller 
chunks that are better for garbage collection.
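
[Editor's sketch, assuming Parquet 1.10.0's `ByteBufferInputStream` API (`wrap`, `slice`); not code from this PR.]

```
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.parquet.bytes.ByteBufferInputStream;

public class SliceExample {
  public static void main(String[] args) throws IOException {
    // The stream can be backed by several small chunks instead of one
    // large contiguous byte array, which is friendlier to the GC.
    ByteBufferInputStream in = ByteBufferInputStream.wrap(
        ByteBuffer.wrap(new byte[] {1, 2, 3, 4}),
        ByteBuffer.wrap(new byte[] {5, 6, 7, 8}));

    // slice(n) returns the next n bytes as a ByteBuffer view, avoiding
    // a copy when the range falls inside a single chunk.
    ByteBuffer slice = in.slice(4);
    System.out.println(slice.remaining()); // prints 4
  }
}
```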

## How was this patch tested?

Existing Parquet tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rdblue/spark SPARK-23972-update-parquet-to-1.10.0

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21070.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21070


commit 4df17a6e9726cb22e499d479a9ab48f5db18a538
Author: Ryan Blue 
Date:   2017-12-01T01:25:53Z

SPARK-23972: Update Parquet to 1.10.0.

This updates the vectorized path for changes in Parquet 1.10.0, which
uses ByteBufferInputStream instead of byte arrays in encoders. This
allows Parquet to break allocations into smaller chunks that are better
for garbage collection.




---
