git commit: [SPARK-3320][SQL] Made batched in-memory column buffer building work for SchemaRDDs with empty partitions
Repository: spark Updated Branches: refs/heads/master 13901764f -> 32b18dd52 [SPARK-3320][SQL] Made batched in-memory column buffer building work for SchemaRDDs with empty partitions Author: Cheng Lian lian.cs@gmail.com Closes #2213 from liancheng/spark-3320 and squashes the following commits: 45a0139 [Cheng Lian] Fixed typo in InMemoryColumnarQuerySuite f67067d [Cheng Lian] Fixed SPARK-3320 Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32b18dd5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32b18dd5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32b18dd5 Branch: refs/heads/master Commit: 32b18dd52cf8920903819f23e406271ecd8ac6bb Parents: 1390176 Author: Cheng Lian lian.cs@gmail.com Authored: Fri Aug 29 18:16:47 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Aug 29 18:16:47 2014 -0700 -- .../columnar/InMemoryColumnarTableScan.scala| 49 .../scala/org/apache/spark/sql/TestData.scala | 5 ++ .../columnar/InMemoryColumnarQuerySuite.scala | 19 ++-- 3 files changed, 39 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/32b18dd5/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index bc36bac..cb055cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -104,40 +104,29 @@ private[sql] case class InMemoryColumnarTableScan( override def execute() = { relation.cachedColumnBuffers.mapPartitions { iterator => // Find the ordinals of the requested columns. If none are requested, use the first. 
- val requestedColumns = -if (attributes.isEmpty) { - Seq(0) -} else { - attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) -} - - new Iterator[Row] { -private[this] var columnBuffers: Array[ByteBuffer] = null -private[this] var columnAccessors: Seq[ColumnAccessor] = null -nextBatch() - -private[this] val nextRow = new GenericMutableRow(columnAccessors.length) - -def nextBatch() = { - columnBuffers = iterator.next() - columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_)) -} + val requestedColumns = if (attributes.isEmpty) { +Seq(0) + } else { +attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) + } -override def next() = { - if (!columnAccessors.head.hasNext) { -nextBatch() - } + iterator +.map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_))) +.flatMap { columnAccessors => + val nextRow = new GenericMutableRow(columnAccessors.length) + new Iterator[Row] { +override def next() = { + var i = 0 + while (i < nextRow.length) { +columnAccessors(i).extractTo(nextRow, i) +i += 1 + } + nextRow +} - var i = 0 - while (i < nextRow.length) { -columnAccessors(i).extractTo(nextRow, i) -i += 1 +override def hasNext = columnAccessors.head.hasNext } - nextRow } - -override def hasNext = columnAccessors.head.hasNext || iterator.hasNext - } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/32b18dd5/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index c3ec82f..eb33a61 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -151,4 +151,9 @@ object TestData { TimestampField(new Timestamp(i)) }) timestamps.registerTempTable("timestamps") + + case class IntField(i: Int) + // An RDD with 4 elements and 8 partitions + val withEmptyParts = TestSQLContext.sparkContext.parallelize((1 to 
4).map(IntField), 8) + withEmptyParts.registerTempTable("withEmptyParts") } http://git-wip-us.apache.org/repos/asf/spark/blob/32b18dd5/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
git commit: [SPARK-3320][SQL] Made batched in-memory column buffer building work for SchemaRDDs with empty partitions
Repository: spark Updated Branches: refs/heads/branch-1.1 b0facb590 -> aa9364a03 [SPARK-3320][SQL] Made batched in-memory column buffer building work for SchemaRDDs with empty partitions Author: Cheng Lian lian.cs@gmail.com Closes #2213 from liancheng/spark-3320 and squashes the following commits: 45a0139 [Cheng Lian] Fixed typo in InMemoryColumnarQuerySuite f67067d [Cheng Lian] Fixed SPARK-3320 (cherry picked from commit 32b18dd52cf8920903819f23e406271ecd8ac6bb) Signed-off-by: Michael Armbrust mich...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa9364a0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa9364a0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa9364a0 Branch: refs/heads/branch-1.1 Commit: aa9364a03ddc793fd2c94981fb168ef8100a507c Parents: b0facb5 Author: Cheng Lian lian.cs@gmail.com Authored: Fri Aug 29 18:16:47 2014 -0700 Committer: Michael Armbrust mich...@databricks.com Committed: Fri Aug 29 18:16:58 2014 -0700 -- .../columnar/InMemoryColumnarTableScan.scala| 49 .../scala/org/apache/spark/sql/TestData.scala | 5 ++ .../columnar/InMemoryColumnarQuerySuite.scala | 19 ++-- 3 files changed, 39 insertions(+), 34 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aa9364a0/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index bc36bac..cb055cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -104,40 +104,29 @@ private[sql] case class InMemoryColumnarTableScan( override def execute() = { relation.cachedColumnBuffers.mapPartitions { iterator => // Find the ordinals of the 
requested columns. If none are requested, use the first. - val requestedColumns = -if (attributes.isEmpty) { - Seq(0) -} else { - attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) -} - - new Iterator[Row] { -private[this] var columnBuffers: Array[ByteBuffer] = null -private[this] var columnAccessors: Seq[ColumnAccessor] = null -nextBatch() - -private[this] val nextRow = new GenericMutableRow(columnAccessors.length) - -def nextBatch() = { - columnBuffers = iterator.next() - columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_)) -} + val requestedColumns = if (attributes.isEmpty) { +Seq(0) + } else { +attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId)) + } -override def next() = { - if (!columnAccessors.head.hasNext) { -nextBatch() - } + iterator +.map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_))) +.flatMap { columnAccessors => + val nextRow = new GenericMutableRow(columnAccessors.length) + new Iterator[Row] { +override def next() = { + var i = 0 + while (i < nextRow.length) { +columnAccessors(i).extractTo(nextRow, i) +i += 1 + } + nextRow +} - var i = 0 - while (i < nextRow.length) { -columnAccessors(i).extractTo(nextRow, i) -i += 1 +override def hasNext = columnAccessors.head.hasNext } - nextRow } - -override def hasNext = columnAccessors.head.hasNext || iterator.hasNext - } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/aa9364a0/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index c3ec82f..eb33a61 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -151,4 +151,9 @@ object TestData { TimestampField(new Timestamp(i)) }) timestamps.registerTempTable("timestamps") + + case class IntField(i: Int) + // An RDD with 4 elements and 8 partitions + val 
withEmptyParts = TestSQLContext.sparkContext.parallelize((1 to 4).map(IntField), 8) + withEmptyParts.registerTempTable("withEmptyParts") }