spark git commit: [SPARK-11362] [SQL] Use Spark BitSet in BroadcastNestedLoopJoin
Repository: spark Updated Branches: refs/heads/branch-1.6 fc2942d12 -> 2946c85f5 [SPARK-11362] [SQL] Use Spark BitSet in BroadcastNestedLoopJoin JIRA: https://issues.apache.org/jira/browse/SPARK-11362 We use scala.collection.mutable.BitSet in BroadcastNestedLoopJoin now. We should use Spark's BitSet. Author: Liang-Chi Hsieh Closes #9316 from viirya/use-spark-bitset. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2946c85f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2946c85f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2946c85f Branch: refs/heads/branch-1.6 Commit: 2946c85f5f48516637a6ce52ba9e31caf3c8ee3a Parents: fc2942d Author: Liang-Chi Hsieh Authored: Sat Nov 7 19:44:45 2015 -0800 Committer: Andrew Or Committed: Mon Nov 9 10:02:46 2015 -0800 -- .../execution/joins/BroadcastNestedLoopJoin.scala | 18 -- 1 file changed, 8 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2946c85f/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala index 05d20f5..aab177b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.util.collection.CompactBuffer +import org.apache.spark.util.collection.{BitSet, CompactBuffer} case class BroadcastNestedLoopJoin( @@ -95,9 +95,7 @@ case class BroadcastNestedLoopJoin( /** All rows that either match both-way, or rows from streamed joined with nulls. */ val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter => val matchedRows = new CompactBuffer[InternalRow] - // TODO: Use Spark's BitSet. - val includedBroadcastTuples = -new scala.collection.mutable.BitSet(broadcastedRelation.value.size) + val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size) val joinedRow = new JoinedRow val leftNulls = new GenericMutableRow(left.output.size) @@ -115,11 +113,11 @@ case class BroadcastNestedLoopJoin( case BuildRight if boundCondition(joinedRow(streamedRow, broadcastedRow)) => matchedRows += resultProj(joinedRow(streamedRow, broadcastedRow)).copy() streamRowMatched = true - includedBroadcastTuples += i + includedBroadcastTuples.set(i) case BuildLeft if boundCondition(joinedRow(broadcastedRow, streamedRow)) => matchedRows += resultProj(joinedRow(broadcastedRow, streamedRow)).copy() streamRowMatched = true - includedBroadcastTuples += i + includedBroadcastTuples.set(i) case _ => } i += 1 @@ -138,8 +136,8 @@ case class BroadcastNestedLoopJoin( val includedBroadcastTuples = matchesOrStreamedRowsWithNulls.map(_._2) val allIncludedBroadcastTuples = includedBroadcastTuples.fold( - new scala.collection.mutable.BitSet(broadcastedRelation.value.size) -)(_ ++ _) + new BitSet(broadcastedRelation.value.size) +)(_ | _) val leftNulls = new GenericMutableRow(left.output.size) val rightNulls = new GenericMutableRow(right.output.size) @@ -155,7 +153,7 @@ case class BroadcastNestedLoopJoin( val joinedRow = new JoinedRow joinedRow.withLeft(leftNulls) while (i < rel.length) { -if (!allIncludedBroadcastTuples.contains(i)) { +if (!allIncludedBroadcastTuples.get(i)) { buf += resultProj(joinedRow.withRight(rel(i))).copy() } i += 1 @@ -164,7 +162,7 @@ case class BroadcastNestedLoopJoin( val joinedRow = new JoinedRow joinedRow.withRight(rightNulls) while (i < rel.length) { -if (!allIncludedBroadcastTuples.contains(i)) { +if (!allIncludedBroadcastTuples.get(i)) { buf += resultProj(joinedRow.withLeft(rel(i))).copy() } i += 1 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spa
spark git commit: [SPARK-11362] [SQL] Use Spark BitSet in BroadcastNestedLoopJoin
Repository: spark Updated Branches: refs/heads/master ef362846e -> 4b69a42ed [SPARK-11362] [SQL] Use Spark BitSet in BroadcastNestedLoopJoin JIRA: https://issues.apache.org/jira/browse/SPARK-11362 We use scala.collection.mutable.BitSet in BroadcastNestedLoopJoin now. We should use Spark's BitSet. Author: Liang-Chi Hsieh Closes #9316 from viirya/use-spark-bitset. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b69a42e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b69a42e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b69a42e Branch: refs/heads/master Commit: 4b69a42eda3aff08eb7437c353fe2cc87ed67181 Parents: ef36284 Author: Liang-Chi Hsieh Authored: Sat Nov 7 19:44:45 2015 -0800 Committer: Davies Liu Committed: Sat Nov 7 19:44:45 2015 -0800 -- .../execution/joins/BroadcastNestedLoopJoin.scala | 18 -- 1 file changed, 8 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4b69a42e/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala index 05d20f5..aab177b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoin.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.util.collection.CompactBuffer +import org.apache.spark.util.collection.{BitSet, CompactBuffer} case class BroadcastNestedLoopJoin( @@ -95,9 +95,7 @@ case class BroadcastNestedLoopJoin( /** All rows that either match both-way, or rows from streamed joined with nulls. */ val matchesOrStreamedRowsWithNulls = streamed.execute().mapPartitions { streamedIter => val matchedRows = new CompactBuffer[InternalRow] - // TODO: Use Spark's BitSet. - val includedBroadcastTuples = -new scala.collection.mutable.BitSet(broadcastedRelation.value.size) + val includedBroadcastTuples = new BitSet(broadcastedRelation.value.size) val joinedRow = new JoinedRow val leftNulls = new GenericMutableRow(left.output.size) @@ -115,11 +113,11 @@ case class BroadcastNestedLoopJoin( case BuildRight if boundCondition(joinedRow(streamedRow, broadcastedRow)) => matchedRows += resultProj(joinedRow(streamedRow, broadcastedRow)).copy() streamRowMatched = true - includedBroadcastTuples += i + includedBroadcastTuples.set(i) case BuildLeft if boundCondition(joinedRow(broadcastedRow, streamedRow)) => matchedRows += resultProj(joinedRow(broadcastedRow, streamedRow)).copy() streamRowMatched = true - includedBroadcastTuples += i + includedBroadcastTuples.set(i) case _ => } i += 1 @@ -138,8 +136,8 @@ case class BroadcastNestedLoopJoin( val includedBroadcastTuples = matchesOrStreamedRowsWithNulls.map(_._2) val allIncludedBroadcastTuples = includedBroadcastTuples.fold( - new scala.collection.mutable.BitSet(broadcastedRelation.value.size) -)(_ ++ _) + new BitSet(broadcastedRelation.value.size) +)(_ | _) val leftNulls = new GenericMutableRow(left.output.size) val rightNulls = new GenericMutableRow(right.output.size) @@ -155,7 +153,7 @@ case class BroadcastNestedLoopJoin( val joinedRow = new JoinedRow joinedRow.withLeft(leftNulls) while (i < rel.length) { -if (!allIncludedBroadcastTuples.contains(i)) { +if (!allIncludedBroadcastTuples.get(i)) { buf += resultProj(joinedRow.withRight(rel(i))).copy() } i += 1 @@ -164,7 +162,7 @@ case class BroadcastNestedLoopJoin( val joinedRow = new JoinedRow joinedRow.withRight(rightNulls) while (i < rel.length) { -if (!allIncludedBroadcastTuples.contains(i)) { +if (!allIncludedBroadcastTuples.get(i)) { buf += resultProj(joinedRow.withLeft(rel(i))).copy() } i += 1 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apac