spark git commit: [SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts struct and array data
Repository: spark Updated Branches: refs/heads/branch-2.2 c0a34a9ff -> 1a829df94 [SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts struct and array data `OffHeapColumnVector.reserveInternal()` only copies already-inserted values during reallocation if `data` has been allocated (`data != 0L`). In vectors containing arrays or structs this is incorrect, since the `data` field is not used at all for those types. We need to check `nulls` instead. Adds new tests to `ColumnVectorSuite` that reproduce the errors. Author: Ala Luszczak. Closes #19323 from ala/port-vector-realloc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a829df9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a829df9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a829df9 Branch: refs/heads/branch-2.2 Commit: 1a829df94a9cfee4395353b0f93fb5bcd628dce4 Parents: c0a34a9 Author: Ala Luszczak Authored: Sat Sep 23 16:09:47 2017 +0200 Committer: Herman van Hovell Committed: Sat Sep 23 16:09:47 2017 +0200 -- .../vectorized/OffHeapColumnVector.java | 2 +- .../vectorized/ColumnVectorSuite.scala | 227 +++ 2 files changed, 228 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1a829df9/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index a7d3744..cda7f2f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -436,7 +436,7 @@ public final class OffHeapColumnVector extends ColumnVector { // Split out the slow path. @Override protected void reserveInternal(int newCapacity) { -int oldCapacity = (this.data == 0L) ? 
0 : capacity; +int oldCapacity = (nulls == 0L) ? 0 : capacity; if (this.resultArray != null) { this.lengthData = Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4); http://git-wip-us.apache.org/repos/asf/spark/blob/1a829df9/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala new file mode 100644 index 000..19b93c9 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.vectorized + +import org.scalatest.BeforeAndAfterEach + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.ArrayData +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { + + var testVector: ColumnVector = _ + + private def allocate(capacity: Int, dt: DataType): ColumnVector = { +new OnHeapColumnVector(capacity, dt) + } + + override def afterEach(): Unit = { +testVector.close() + } + + test("boolean") { +testVector = allocate(10, BooleanType) +(0 until 10).foreach { i => + testVector.appendBoolean(i % 2 == 0) +} + +val array = new ColumnVector.Array(testVector) + +(0 until 10).foreach { i => + assert(array.getBoolean(i) === (i % 2 == 0)) +} + } + + test("byte") { +testVector = allocate(10, ByteType) +(0 until 10).foreach { i => + testVector.appendByte(i.toByte) +} + +val array =
spark git commit: [SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts struct and array data
Repository: spark Updated Branches: refs/heads/master 10e37f6eb -> d2b2932d8 [SPARK-22092] Reallocation in OffHeapColumnVector.reserveInternal corrupts struct and array data ## What changes were proposed in this pull request? `OffHeapColumnVector.reserveInternal()` only copies already-inserted values during reallocation if `data` has been allocated (`data != 0L`). In vectors containing arrays or structs this is incorrect, since the `data` field is not used at all for those types. We need to check `nulls` instead. ## How was this patch tested? Adds new tests to `ColumnVectorSuite` that reproduce the errors. Author: Ala Luszczak. Closes #19308 from ala/vector-realloc. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2b2932d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2b2932d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2b2932d Branch: refs/heads/master Commit: d2b2932d8be01dee31983121f6fffd16177bf48a Parents: 10e37f6 Author: Ala Luszczak Authored: Fri Sep 22 15:31:43 2017 +0200 Committer: Herman van Hovell Committed: Fri Sep 22 15:31:43 2017 +0200 -- .../vectorized/OffHeapColumnVector.java | 2 +- .../vectorized/ColumnVectorSuite.scala | 26 2 files changed, 27 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d2b2932d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java -- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 3568275..e1d3685 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -515,7 +515,7 @@ public final class OffHeapColumnVector extends WritableColumnVector { // Split out the slow path.
@Override protected void reserveInternal(int newCapacity) { -int oldCapacity = (this.data == 0L) ? 0 : capacity; +int oldCapacity = (nulls == 0L) ? 0 : capacity; if (this.resultArray != null) { this.lengthData = Platform.reallocateMemory(lengthData, oldCapacity * 4, newCapacity * 4); http://git-wip-us.apache.org/repos/asf/spark/blob/d2b2932d/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala index 998067a..f7b06c9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnVectorSuite.scala @@ -198,4 +198,30 @@ class ColumnVectorSuite extends SparkFunSuite with BeforeAndAfterEach { assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(0, IntegerType) === 456) assert(array.get(1, schema).asInstanceOf[ColumnarBatch.Row].get(1, DoubleType) === 5.67) } + + test("[SPARK-22092] off-heap column vector reallocation corrupts array data") { +val arrayType = ArrayType(IntegerType, true) +testVector = new OffHeapColumnVector(8, arrayType) + +val data = testVector.arrayData() +(0 until 8).foreach(i => data.putInt(i, i)) +(0 until 8).foreach(i => testVector.putArray(i, i, 1)) + +// Increase vector's capacity and reallocate the data to new bigger buffers. +testVector.reserve(16) + +// Check that none of the values got lost/overwritten. 
+val array = new ColumnVector.Array(testVector) +(0 until 8).foreach { i => + assert(array.get(i, arrayType).asInstanceOf[ArrayData].toIntArray() === Array(i)) +} + } + + test("[SPARK-22092] off-heap column vector reallocation corrupts struct nullability") { +val structType = new StructType().add("int", IntegerType).add("double", DoubleType) +testVector = new OffHeapColumnVector(8, structType) +(0 until 8).foreach(i => if (i % 2 == 0) testVector.putNull(i) else testVector.putNotNull(i)) +testVector.reserve(16) +(0 until 8).foreach(i => assert(testVector.isNullAt(i) == (i % 2 == 0))) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org