git commit: [SPARK-3320][SQL] Made batched in-memory column buffer building work for SchemaRDDs with empty partitions

2014-08-29 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 13901764f -> 32b18dd52


[SPARK-3320][SQL] Made batched in-memory column buffer building work for SchemaRDDs with empty partitions

Author: Cheng Lian <lian.cs@gmail.com>

Closes #2213 from liancheng/spark-3320 and squashes the following commits:

45a0139 [Cheng Lian] Fixed typo in InMemoryColumnarQuerySuite
f67067d [Cheng Lian] Fixed SPARK-3320
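
For readers skimming the archive: the pre-patch scan built its per-partition row iterator by calling iterator.next() eagerly in the Iterator's constructor (the removed nextBatch() call in the diff below), which throws as soon as a partition contains no column batches. A minimal, self-contained Scala sketch of that failure mode, using simplified stand-in types rather than the real ByteBuffer batches:

    // Mimics the old code path: nextBatch() runs at construction time,
    // so an empty batch iterator fails before a single row is requested.
    object EagerBatchDemo {
      def rowsFromBatches(batches: Iterator[Array[Int]]): Iterator[Int] =
        new Iterator[Int] {
          private[this] var current: Iterator[Int] = _
          nextBatch() // eager: calls batches.next() even when batches is empty

          def nextBatch(): Unit = { current = batches.next().iterator }

          override def hasNext = current.hasNext || batches.hasNext
          override def next() = {
            if (!current.hasNext) nextBatch()
            current.next()
          }
        }

      def main(args: Array[String]): Unit = {
        println(rowsFromBatches(Iterator(Array(1, 2))).toList) // List(1, 2)
        rowsFromBatches(Iterator.empty) // NoSuchElementException: next on empty iterator
      }
    }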


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32b18dd5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32b18dd5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32b18dd5

Branch: refs/heads/master
Commit: 32b18dd52cf8920903819f23e406271ecd8ac6bb
Parents: 1390176
Author: Cheng Lian <lian.cs@gmail.com>
Authored: Fri Aug 29 18:16:47 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Fri Aug 29 18:16:47 2014 -0700

--
 .../columnar/InMemoryColumnarTableScan.scala| 49 
 .../scala/org/apache/spark/sql/TestData.scala   |  5 ++
 .../columnar/InMemoryColumnarQuerySuite.scala   | 19 ++--
 3 files changed, 39 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/32b18dd5/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index bc36bac..cb055cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -104,40 +104,29 @@ private[sql] case class InMemoryColumnarTableScan(
   override def execute() = {
     relation.cachedColumnBuffers.mapPartitions { iterator =>
       // Find the ordinals of the requested columns.  If none are requested, use the first.
-      val requestedColumns =
-        if (attributes.isEmpty) {
-          Seq(0)
-        } else {
-          attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
-        }
-
-      new Iterator[Row] {
-        private[this] var columnBuffers: Array[ByteBuffer] = null
-        private[this] var columnAccessors: Seq[ColumnAccessor] = null
-        nextBatch()
-
-        private[this] val nextRow = new GenericMutableRow(columnAccessors.length)
-
-        def nextBatch() = {
-          columnBuffers = iterator.next()
-          columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
-        }
+      val requestedColumns = if (attributes.isEmpty) {
+        Seq(0)
+      } else {
+        attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
+      }
 
-        override def next() = {
-          if (!columnAccessors.head.hasNext) {
-            nextBatch()
-          }
+      iterator
+        .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))
+        .flatMap { columnAccessors =>
+          val nextRow = new GenericMutableRow(columnAccessors.length)
+          new Iterator[Row] {
+            override def next() = {
+              var i = 0
+              while (i < nextRow.length) {
+                columnAccessors(i).extractTo(nextRow, i)
+                i += 1
+              }
+              nextRow
+            }
 
-          var i = 0
-          while (i < nextRow.length) {
-            columnAccessors(i).extractTo(nextRow, i)
-            i += 1
+            override def hasNext = columnAccessors.head.hasNext
           }
-          nextRow
         }
-
-        override def hasNext = columnAccessors.head.hasNext || iterator.hasNext
-      }
     }
   }
 }
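
The rewritten execute() above avoids any constructor-time next(): each batch is mapped to its column accessors and the per-batch row iterators are flatMapped together, so an empty partition simply produces an empty iterator. The same shape in miniature (again with stand-in types, not the real ColumnAccessor machinery):

    // flatMap over zero batches yields zero rows; no eager next() call.
    def rowsFromBatches(batches: Iterator[Array[Int]]): Iterator[Int] =
      batches.flatMap(batch => batch.iterator)

    assert(rowsFromBatches(Iterator.empty).isEmpty)  // empty partition is fine
    assert(rowsFromBatches(Iterator(Array(1, 2), Array(3))).toList == List(1, 2, 3))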

http://git-wip-us.apache.org/repos/asf/spark/blob/32b18dd5/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index c3ec82f..eb33a61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -151,4 +151,9 @@ object TestData {
     TimestampField(new Timestamp(i))
   })
   timestamps.registerTempTable("timestamps")
+
+  case class IntField(i: Int)
+  // An RDD with 4 elements and 8 partitions
+  val withEmptyParts = TestSQLContext.sparkContext.parallelize((1 to 4).map(IntField), 8)
+  withEmptyParts.registerTempTable("withEmptyParts")
 }
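
The new withEmptyParts fixture works because parallelizing 4 elements into 8 slices necessarily leaves at least 4 partitions empty, which is exactly the shape that tripped the old scan. A quick spark-shell check (assuming the usual sc handle; the exact element placement is an implementation detail):

    // Count elements per partition; with 4 elements in 8 partitions,
    // at least four of the counts must be zero.
    val perPartition = sc.parallelize(1 to 4, 8)
      .mapPartitions(it => Iterator(it.size))
      .collect()
    println(perPartition.mkString(", ")) // e.g. 0, 1, 0, 1, 0, 1, 0, 1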

http://git-wip-us.apache.org/repos/asf/spark/blob/32b18dd5/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala

git commit: [SPARK-3320][SQL] Made batched in-memory column buffer building work for SchemaRDDs with empty partitions

2014-08-29 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 b0facb590 -> aa9364a03


[SPARK-3320][SQL] Made batched in-memory column buffer building work for SchemaRDDs with empty partitions

Author: Cheng Lian <lian.cs@gmail.com>

Closes #2213 from liancheng/spark-3320 and squashes the following commits:

45a0139 [Cheng Lian] Fixed typo in InMemoryColumnarQuerySuite
f67067d [Cheng Lian] Fixed SPARK-3320

(cherry picked from commit 32b18dd52cf8920903819f23e406271ecd8ac6bb)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa9364a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa9364a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa9364a0

Branch: refs/heads/branch-1.1
Commit: aa9364a03ddc793fd2c94981fb168ef8100a507c
Parents: b0facb5
Author: Cheng Lian <lian.cs@gmail.com>
Authored: Fri Aug 29 18:16:47 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Fri Aug 29 18:16:58 2014 -0700

--
 .../columnar/InMemoryColumnarTableScan.scala| 49 
 .../scala/org/apache/spark/sql/TestData.scala   |  5 ++
 .../columnar/InMemoryColumnarQuerySuite.scala   | 19 ++--
 3 files changed, 39 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aa9364a0/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index bc36bac..cb055cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -104,40 +104,29 @@ private[sql] case class InMemoryColumnarTableScan(
   override def execute() = {
     relation.cachedColumnBuffers.mapPartitions { iterator =>
       // Find the ordinals of the requested columns.  If none are requested, use the first.
-      val requestedColumns =
-        if (attributes.isEmpty) {
-          Seq(0)
-        } else {
-          attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
-        }
-
-      new Iterator[Row] {
-        private[this] var columnBuffers: Array[ByteBuffer] = null
-        private[this] var columnAccessors: Seq[ColumnAccessor] = null
-        nextBatch()
-
-        private[this] val nextRow = new GenericMutableRow(columnAccessors.length)
-
-        def nextBatch() = {
-          columnBuffers = iterator.next()
-          columnAccessors = requestedColumns.map(columnBuffers(_)).map(ColumnAccessor(_))
-        }
+      val requestedColumns = if (attributes.isEmpty) {
+        Seq(0)
+      } else {
+        attributes.map(a => relation.output.indexWhere(_.exprId == a.exprId))
+      }
 
-        override def next() = {
-          if (!columnAccessors.head.hasNext) {
-            nextBatch()
-          }
+      iterator
+        .map(batch => requestedColumns.map(batch(_)).map(ColumnAccessor(_)))
+        .flatMap { columnAccessors =>
+          val nextRow = new GenericMutableRow(columnAccessors.length)
+          new Iterator[Row] {
+            override def next() = {
+              var i = 0
+              while (i < nextRow.length) {
+                columnAccessors(i).extractTo(nextRow, i)
+                i += 1
+              }
+              nextRow
+            }
 
-          var i = 0
-          while (i < nextRow.length) {
-            columnAccessors(i).extractTo(nextRow, i)
-            i += 1
+            override def hasNext = columnAccessors.head.hasNext
           }
-          nextRow
         }
-
-        override def hasNext = columnAccessors.head.hasNext || iterator.hasNext
-      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/aa9364a0/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index c3ec82f..eb33a61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -151,4 +151,9 @@ object TestData {
     TimestampField(new Timestamp(i))
   })
   timestamps.registerTempTable("timestamps")
+
+  case class IntField(i: Int)
+  // An RDD with 4 elements and 8 partitions
+  val withEmptyParts = TestSQLContext.sparkContext.parallelize((1 to 4).map(IntField), 8)
+  withEmptyParts.registerTempTable("withEmptyParts")
 }