spark git commit: [SPARK-11362] [SQL] Use Spark BitSet in BroadcastNestedLoopJoin

2015-11-09 Thread andrewor14
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 fc2942d12 -> 2946c85f5


[SPARK-11362] [SQL] Use Spark BitSet in BroadcastNestedLoopJoin

JIRA: https://issues.apache.org/jira/browse/SPARK-11362

We currently use scala.collection.mutable.BitSet in BroadcastNestedLoopJoin; we should use Spark's BitSet instead.

Author: Liang-Chi Hsieh 

Closes #9316 from viirya/use-spark-bitset.
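For reference, a minimal standalone sketch of the API difference behind this change (the object name and bit indices are made up for illustration, and it assumes spark-core's org.apache.spark.util.collection.BitSet is on the classpath): the Scala collection is element-oriented (+=, contains, ++), while Spark's fixed-size BitSet works on bit indices (set, get, |).

import org.apache.spark.util.collection.BitSet

object BitSetApiSketch {
  def main(args: Array[String]): Unit = {
    // scala.collection.mutable.BitSet: growable, element-oriented API.
    val scalaBits = new scala.collection.mutable.BitSet(8)
    scalaBits += 3
    val scalaUnion = scalaBits ++ scala.collection.mutable.BitSet(5)
    println(s"scala: ${scalaUnion.contains(3)} ${scalaUnion.contains(5)}")  // true true

    // org.apache.spark.util.collection.BitSet: fixed capacity (8 bits here; the
    // broadcast relation size in the join) and no bounds checking on set/get.
    val sparkBits = new BitSet(8)
    sparkBits.set(3)
    val other = new BitSet(8)
    other.set(5)
    val sparkUnion = sparkBits | other
    println(s"spark: ${sparkUnion.get(3)} ${sparkUnion.get(5)}")  // true true
  }
}

The fixed capacity is not a limitation here: the broadcast relation's size is known before the streamed side is scanned.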


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2946c85f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2946c85f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2946c85f

Branch: refs/heads/branch-1.6
Commit: 2946c85f5f48516637a6ce52ba9e31caf3c8ee3a
Parents: fc2942d
Author: Liang-Chi Hsieh 
Authored: Sat Nov 7 19:44:45 2015 -0800
Committer: Andrew Or 
Committed: Mon Nov 9 10:02:46 2015 -0800

----------------------------------------------------------------------
 .../execution/joins/BroadcastNestedLoopJoin.scala | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2946c85f/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 05d20f5..aab177b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.collection.{BitSet, CompactBuffer}
 
 
 case class BroadcastNestedLoopJoin(
@@ -95,9 +95,7 @@ case class BroadcastNestedLoopJoin(
     /** All rows that either match both-way, or rows from streamed joined with nulls. */
     val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter =>
       val matchedRows = new CompactBuffer[InternalRow]
-      // TODO: Use Spark's BitSet.
-      val includedBroadcastTuples =
-        new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
+      val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size)
       val joinedRow = new JoinedRow
 
       val leftNulls = new GenericMutableRow(left.output.size)
@@ -115,11 +113,11 @@ case class BroadcastNestedLoopJoin(
             case BuildRight if boundCondition(joinedRow(streamedRow, broadcastedRow)) =>
               matchedRows += resultProj(joinedRow(streamedRow, broadcastedRow)).copy()
               streamRowMatched = true
-              includedBroadcastTuples += i
+              includedBroadcastTuples.set(i)
             case BuildLeft if boundCondition(joinedRow(broadcastedRow, streamedRow)) =>
               matchedRows += resultProj(joinedRow(broadcastedRow, streamedRow)).copy()
               streamRowMatched = true
-              includedBroadcastTuples += i
+              includedBroadcastTuples.set(i)
             case _ =>
           }
           i += 1
@@ -138,8 +136,8 @@ case class BroadcastNestedLoopJoin(
 
     val includedBroadcastTuples = matchesOrStreamedRowsWithNulls.map(_._2)
     val allIncludedBroadcastTuples = includedBroadcastTuples.fold(
-      new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
-    )(_ ++ _)
+      new BitSet(broadcastedRelation.value.size)
+    )(_ | _)
 
     val leftNulls = new GenericMutableRow(left.output.size)
     val rightNulls = new GenericMutableRow(right.output.size)
@@ -155,7 +153,7 @@ case class BroadcastNestedLoopJoin(
           val joinedRow = new JoinedRow
           joinedRow.withLeft(leftNulls)
           while (i < rel.length) {
-            if (!allIncludedBroadcastTuples.contains(i)) {
+            if (!allIncludedBroadcastTuples.get(i)) {
               buf += resultProj(joinedRow.withRight(rel(i))).copy()
             }
             i += 1
@@ -164,7 +162,7 @@ case class BroadcastNestedLoopJoin(
           val joinedRow = new JoinedRow
           joinedRow.withRight(rightNulls)
           while (i < rel.length) {
-            if (!allIncludedBroadcastTuples.contains(i)) {
+            if (!allIncludedBroadcastTuples.get(i)) {
              buf += resultProj(joinedRow.withLeft(rel(i))).copy()
             }
             i += 1



spark git commit: [SPARK-11362] [SQL] Use Spark BitSet in BroadcastNestedLoopJoin

2015-11-07 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master ef362846e -> 4b69a42ed


[SPARK-11362] [SQL] Use Spark BitSet in BroadcastNestedLoopJoin

JIRA: https://issues.apache.org/jira/browse/SPARK-11362

We currently use scala.collection.mutable.BitSet in BroadcastNestedLoopJoin; we should use Spark's BitSet instead.

Author: Liang-Chi Hsieh 

Closes #9316 from viirya/use-spark-bitset.
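As context for the fold this patch rewrites: every streamed partition builds a BitSet marking which broadcast-row indices matched, and those per-partition sets are OR-ed together so that still-unset bits identify broadcast rows that must be emitted with nulls on the outer side. Below is a minimal local sketch of that combine step (a plain Seq stands in for the RDD; the object name, partition contents and size of 16 are made up):

import org.apache.spark.util.collection.BitSet

object BitSetFoldSketch {
  def main(args: Array[String]): Unit = {
    val broadcastSize = 16

    // One bit set per (hypothetical) streamed partition, marking the broadcast
    // rows that found a match in that partition.
    val perPartition = Seq(
      { val b = new BitSet(broadcastSize); b.set(1); b.set(7); b },
      { val b = new BitSet(broadcastSize); b.set(7); b.set(12); b }
    )

    // Same shape as the patched code, which does
    // includedBroadcastTuples.fold(new BitSet(...))(_ | _) on an RDD.
    val all = perPartition.fold(new BitSet(broadcastSize))(_ | _)

    // Bits that are still unset correspond to broadcast rows with no match anywhere.
    val matched = (0 until broadcastSize).filter(all.get)
    val unmatched = (0 until broadcastSize).filterNot(all.get)
    println(s"matched:   ${matched.mkString(", ")}")    // 1, 7, 12
    println(s"unmatched: ${unmatched.mkString(", ")}")
  }
}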


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b69a42e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b69a42e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b69a42e

Branch: refs/heads/master
Commit: 4b69a42eda3aff08eb7437c353fe2cc87ed67181
Parents: ef36284
Author: Liang-Chi Hsieh 
Authored: Sat Nov 7 19:44:45 2015 -0800
Committer: Davies Liu 
Committed: Sat Nov 7 19:44:45 2015 -0800

----------------------------------------------------------------------
 .../execution/joins/BroadcastNestedLoopJoin.scala | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4b69a42e/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
index 05d20f5..aab177b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.collection.{BitSet, CompactBuffer}
 
 
 case class BroadcastNestedLoopJoin(
@@ -95,9 +95,7 @@ case class BroadcastNestedLoopJoin(
     /** All rows that either match both-way, or rows from streamed joined with nulls. */
     val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter =>
       val matchedRows = new CompactBuffer[InternalRow]
-      // TODO: Use Spark's BitSet.
-      val includedBroadcastTuples =
-        new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
+      val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size)
       val joinedRow = new JoinedRow
 
       val leftNulls = new GenericMutableRow(left.output.size)
@@ -115,11 +113,11 @@ case class BroadcastNestedLoopJoin(
             case BuildRight if boundCondition(joinedRow(streamedRow, broadcastedRow)) =>
               matchedRows += resultProj(joinedRow(streamedRow, broadcastedRow)).copy()
               streamRowMatched = true
-              includedBroadcastTuples += i
+              includedBroadcastTuples.set(i)
             case BuildLeft if boundCondition(joinedRow(broadcastedRow, streamedRow)) =>
               matchedRows += resultProj(joinedRow(broadcastedRow, streamedRow)).copy()
               streamRowMatched = true
-              includedBroadcastTuples += i
+              includedBroadcastTuples.set(i)
             case _ =>
           }
           i += 1
@@ -138,8 +136,8 @@ case class BroadcastNestedLoopJoin(
 
     val includedBroadcastTuples = matchesOrStreamedRowsWithNulls.map(_._2)
     val allIncludedBroadcastTuples = includedBroadcastTuples.fold(
-      new scala.collection.mutable.BitSet(broadcastedRelation.value.size)
-    )(_ ++ _)
+      new BitSet(broadcastedRelation.value.size)
+    )(_ | _)
 
     val leftNulls = new GenericMutableRow(left.output.size)
     val rightNulls = new GenericMutableRow(right.output.size)
@@ -155,7 +153,7 @@ case class BroadcastNestedLoopJoin(
           val joinedRow = new JoinedRow
           joinedRow.withLeft(leftNulls)
           while (i < rel.length) {
-            if (!allIncludedBroadcastTuples.contains(i)) {
+            if (!allIncludedBroadcastTuples.get(i)) {
              buf += resultProj(joinedRow.withRight(rel(i))).copy()
             }
             i += 1
@@ -164,7 +162,7 @@ case class BroadcastNestedLoopJoin(
           val joinedRow = new JoinedRow
           joinedRow.withRight(rightNulls)
           while (i < rel.length) {
-            if (!allIncludedBroadcastTuples.contains(i)) {
+            if (!allIncludedBroadcastTuples.get(i)) {
              buf += resultProj(joinedRow.withLeft(rel(i))).copy()
             }
             i += 1

