spark git commit: [SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts struct and array data

2017-09-23 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 c0a34a9ff -> 1a829df94


[SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts 
struct and array data

`OffHeapColumnVector.reserveInternal()` only copies already-inserted values 
during reallocation if `data != null` (that is, if the `data` address is not 
`0L`). In vectors containing arrays or structs this is incorrect, since the 
`data` field is not used at all there. We need to check `nulls` instead.

Adds new tests to `ColumnVectorSuite` that reproduce the errors.

Author: Ala Luszczak 

Closes #19323 from ala/port-vector-realloc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a829df9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a829df9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a829df9

Branch: refs/heads/branch-2.2
Commit: 1a829df94a9cfee4395353b0f93fb5bcd628dce4
Parents: c0a34a9
Author: Ala Luszczak 
Authored: Sat Sep 23 16:09:47 2017 +0200
Committer: Herman van Hovell 
Committed: Sat Sep 23 16:09:47 2017 +0200

--
 .../vectorized/OffHeapColumnVector.java |   2 +-
 .../vectorized/ColumnVectorSuite.scala  | 227 +++
 2 files changed, 228 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1a829df9/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index a7d3744..cda7f2f 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -436,7 +436,7 @@ public final class OffHeapColumnVector extends ColumnVector 
{
   // Split out the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {
-int oldCapacity = (this.data == 0L) ? 0 : capacity;
+int oldCapacity = (nulls == 0L) ? 0 : capacity;
 if (this.resultArray != null) {
   this.lengthData =
   Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 
4);

http://git-wip-us.apache.org/repos/asf/spark/blob/1a829df9/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
new file mode 100644
index 000..19b93c9
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized
+
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach {
+
+  var testVector: ColumnVector = _
+
+  private def allocate(capacity: Int, dt: DataType): ColumnVector = {
+new OnHeapColumnVector(capacity, dt)
+  }
+
+  override def afterEach(): Unit = {
+testVector.close()
+  }
+
+  test("boolean") {
+testVector = allocate(10, BooleanType)
+(0 until 10).foreach { i =>
+  testVector.appendBoolean(i % 2 == 0)
+}
+
+val array = new ColumnVector.Array(testVector)
+
+(0 until 10).foreach { i =>
+  assert(array.getBoolean(i) === (i % 2 == 0))
+}
+  }
+
+  test("byte") {
+testVector = allocate(10, ByteType)
+(0 until 10).foreach { i =>
+  testVector.appendByte(i.toByte)
+}
+
+val array = 

spark git commit: [SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts struct and array data

2017-09-22 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 10e37f6eb -> d2b2932d8


[SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts 
struct and array data

## What changes were proposed in this pull request?

`OffHeapColumnVector.reserveInternal()` only copies already-inserted values 
during reallocation if `data != null` (that is, if the `data` address is not 
`0L`). In vectors containing arrays or structs this is incorrect, since the 
`data` field is not used at all there. We need to check `nulls` instead.

## How was this patch tested?

Adds new tests to `ColumnVectorSuite` that reproduce the errors.

Author: Ala Luszczak 

Closes #19308 from ala/vector-realloc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2b2932d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2b2932d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2b2932d

Branch: refs/heads/master
Commit: d2b2932d8be01dee31983121f6fffd16177bf48a
Parents: 10e37f6
Author: Ala Luszczak 
Authored: Fri Sep 22 15:31:43 2017 +0200
Committer: Herman van Hovell 
Committed: Fri Sep 22 15:31:43 2017 +0200

--
 .../vectorized/OffHeapColumnVector.java |  2 +-
 .../vectorized/ColumnVectorSuite.scala  | 26 
 2 files changed, 27 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d2b2932d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
--
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 3568275..e1d3685 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -515,7 +515,7 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   // Split out the slow path.
   @Override
   protected void reserveInternal(int newCapacity) {
-int oldCapacity = (this.data == 0L) ? 0 : capacity;
+int oldCapacity = (nulls == 0L) ? 0 : capacity;
 if (this.resultArray != null) {
   this.lengthData =
   Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 
4);

http://git-wip-us.apache.org/repos/asf/spark/blob/d2b2932d/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
index 998067a..f7b06c9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala
@@ -198,4 +198,30 @@ class ColumnVectorSuite extends SparkFunSuite with 
BeforeAndAfterEach {
 assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(0, 
IntegerType) === 456)
 assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(1, 
DoubleType) === 5.67)
   }
+
+  test("[SPARK-22092] off-heap column vector reallocation corrupts array 
data") {
+val arrayType = ArrayType(IntegerType, true)
+testVector = new OffHeapColumnVector(8, arrayType)
+
+val data = testVector.arrayData()
+(0 until 8).foreach(i => data.putInt(i, i))
+(0 until 8).foreach(i => testVector.putArray(i, i, 1))
+
+// Increase vector's capacity and reallocate the data to new bigger 
buffers.
+testVector.reserve(16)
+
+// Check that none of the values got lost/overwritten.
+val array = new ColumnVector.Array(testVector)
+(0 until 8).foreach { i =>
+  assert(array.get(i, arrayType).asInstanceOf[ArrayData].toIntArray() === 
Array(i))
+}
+  }
+
+  test("[SPARK-22092] off-heap column vector reallocation corrupts struct 
nullability") {
+val structType = new StructType().add("int", IntegerType).add("double", 
DoubleType)
+testVector = new OffHeapColumnVector(8, structType)
+(0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else 
testVector.putNotNull(i))
+testVector.reserve(16)
+(0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0)))
+  }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org