[GitHub] spark pull request: Add range join support to spark-sql

2014-12-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/2939


[GitHub] spark pull request: Add range join support to spark-sql

2014-12-01 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/2939#issuecomment-65164578
  
Thanks for working on this, my biggest concern is adding new syntax to SQL 
since it is something we will have to support forever.  Would it instead be 
possible to express the overlap with standard predicates, and detect the 
case where a range join can be used during query planning?
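
For example, the same overlap could be written with plain comparison 
predicates. A hypothetical sketch (table and column names are invented here; 
closed intervals assumed):

    // Two intervals [start1, end1] and [start2, end2] overlap exactly when
    // each one starts before the other ends.
    sql("""SELECT *
           FROM t1 JOIN t2
           ON t1.start1 <= t2.end2 AND t2.start2 <= t1.end1""")

The planner could then recognize this pair of inequalities and pick a range 
join without any new syntax.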


[GitHub] spark pull request: Add range join support to spark-sql

2014-12-01 Thread kozanitis
Github user kozanitis commented on the pull request:

https://github.com/apache/spark/pull/2939#issuecomment-65169392
  
Yeah, I see your point... I will revise the design as soon as I find some 
time to do some more clever parsing of the existing predicates.
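
Roughly, the detection could become a planner strategy instead of new 
grammar. A hypothetical sketch against the Catalyst API of the time (a real 
rule would also have to verify that s1/e1 and s2/e2 actually come from the 
left and right sides, and sqlContext stands for whatever context handle the 
strategy has):

    import org.apache.spark.sql.catalyst.expressions.{And, LessThanOrEqual}
    import org.apache.spark.sql.catalyst.plans.Inner
    import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}

    // Hypothetical: recognize an inner join whose condition has the
    // two-inequality overlap shape and plan it as the RangeJoin node
    // from this PR.
    object RangeJoinDetection extends Strategy {
      def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case Join(left, right, Inner,
            Some(And(LessThanOrEqual(s1, e2), LessThanOrEqual(s2, e1)))) =>
          RangeJoin(planLater(left), planLater(right),
            Seq(s1, e1, s2, e2), sqlContext) :: Nil
        case _ => Nil
      }
    }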


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-28 Thread kozanitis
Github user kozanitis commented on the pull request:

https://github.com/apache/spark/pull/2939#issuecomment-60821525
  
Thanks @sarutak for the recommendations. After fixing the issues that you 
found, sbt/sbt -Phive scalastyle passed.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-28 Thread kozanitis
Github user kozanitis commented on the pull request:

https://github.com/apache/spark/pull/2939#issuecomment-60821550
  
Jenkins, can you please test again?


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392124
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
@@ -556,6 +556,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
       sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData"), Nil)
   }
 
+
+
--- End diff --

Please remove extra new lines.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392140
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoins.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable.{ArrayBuffer, BitSet}
+
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.SQLContext
+
+
+
--- End diff --

Please remove extra new lines.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392150
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoins.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable.{ArrayBuffer, BitSet}
+
+
--- End diff --

Please remove extra new lines.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392165
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoinImpl.scala ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
--- End diff --

extra new line.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392161
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoinImpl.scala ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{sql, SparkContext}
+import org.apache.spark.SparkContext._
+import scala.reflect.ClassTag
+
+object RangeJoinImpl extends Serializable{
+
+  /**
+   * Multi-joins together two RDDs that contain objects that map to reference regions.
+   * The elements from the first RDD become the key of the output RDD, and the value
+   * contains all elements from the second RDD which overlap the region of the key.
+   * This is a multi-join, so it preserves n-to-m relationships between regions.
+   *
+   * @param sc A spark context from the cluster that will perform the join
+   * @param rdd1 RDD of values on which we build an interval tree. Assume |rdd1| < |rdd2|
+   */
+  def overlapJoin(sc: SparkContext,
+                  rdd1: RDD[(Interval[Long],sql.Row)],
+                  rdd2: RDD[(Interval[Long],sql.Row)]): RDD[(sql.Row, Iterable[sql.Row])] =
+  {
+
+    val indexedRdd1 = rdd1.zipWithIndex().map(_.swap)
+
+    /*Collect only Reference regions and the index of indexedRdd1*/
+    val localIntervals = indexedRdd1.map(x => (x._2._1, x._1)).collect()
+    /*Create and broadcast an interval tree*/
+    val intervalTree = sc.broadcast(new IntervalTree[Long](localIntervals.toList))
+
+    val kvrdd2: RDD[(Long, Iterable[sql.Row])] = rdd2
+      //join entry with the intervals returned from the interval tree
+      .map(x => (intervalTree.value.getAllOverlappings(x._1), x._2))
+      .filter(x => x._1 != Nil) //filter out entries that do not join anywhere
+      .flatMap(t => t._1.map(s => (s._2, t._2))) //create pairs of (index1, rdd2Elem)
+      .groupByKey
+
+
--- End diff --

extra new line.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392217
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala ---
@@ -188,8 +191,18 @@ class SqlParser extends AbstractSparkSQLParser {
     }
   }
 
-  protected lazy val joinConditions: Parser[Expression] =
-    ON ~> expression
+   protected lazy val rangeJoinedRelation: Parser[LogicalPlan] =
+    relationFactor ~ RANGEJOIN ~ relationFactor ~ ON ~ OVERLAPS ~
--- End diff --

Wrong indentation.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392262
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoinImpl.scala ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{sql, SparkContext}
+import org.apache.spark.SparkContext._
+import scala.reflect.ClassTag
+
+object RangeJoinImpl extends Serializable{
+
+  /**
+   * Multi-joins together two RDDs that contain objects that map to reference regions.
+   * The elements from the first RDD become the key of the output RDD, and the value
+   * contains all elements from the second RDD which overlap the region of the key.
+   * This is a multi-join, so it preserves n-to-m relationships between regions.
+   *
+   * @param sc A spark context from the cluster that will perform the join
+   * @param rdd1 RDD of values on which we build an interval tree. Assume |rdd1| < |rdd2|
+   */
+  def overlapJoin(sc: SparkContext,
+                  rdd1: RDD[(Interval[Long],sql.Row)],
+                  rdd2: RDD[(Interval[Long],sql.Row)]): RDD[(sql.Row, Iterable[sql.Row])] =
+  {
+
+    val indexedRdd1 = rdd1.zipWithIndex().map(_.swap)
+
+    /*Collect only Reference regions and the index of indexedRdd1*/
--- End diff --

Please add white space after * here and other places.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392296
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoins.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable.{ArrayBuffer, BitSet}
+
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.SQLContext
+
+
+
+@DeveloperApi
+case class RangeJoin(left: SparkPlan,
+                     right: SparkPlan,
+                     condition: Seq[Expression],
+                     context: SQLContext) extends BinaryNode with Serializable{
+  def output = left.output ++ right.output
+
+  lazy val (buildPlan, streamedPlan) = (left, right)
+
+  lazy val (buildKeys, streamedKeys) = (List(condition(0),condition(1)),
+    List(condition(2), condition(3)))
+
+  @transient lazy val buildKeyGenerator = new InterpretedProjection(buildKeys, buildPlan.output)
+  @transient lazy val streamKeyGenerator = new InterpretedProjection(streamedKeys,
+    streamedPlan.output)
+
+  def execute() = {
+
+    val v1 = left.execute()
+    val v1kv = v1.map(x => {
+      val v1Key = buildKeyGenerator(x)
+      (new Interval[Long](v1Key.apply(0).asInstanceOf[Long], v1Key.apply(1).asInstanceOf[Long]),
+        x.copy() )
+    })
+
+    val v2 = right.execute()
+    val v2kv = v2.map(x => {
+      val v2Key = streamKeyGenerator(x)
+      (new Interval[Long](v2Key.apply(0).asInstanceOf[Long], v2Key.apply(1).asInstanceOf[Long]),
+        x.copy() )
+    })
+
+    /*As we are going to collect v1 and build an interval tree on its intervals,
+    make sure that its size is the smaller one.*/
+   assert(v1.count <= v2.count)
+
+
--- End diff --

extra new lines.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392285
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoins.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable.{ArrayBuffer, BitSet}
+
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.SQLContext
+
+
+
+@DeveloperApi
+case class RangeJoin(left: SparkPlan,
+                     right: SparkPlan,
+                     condition: Seq[Expression],
+                     context: SQLContext) extends BinaryNode with Serializable{
+  def output = left.output ++ right.output
+
+  lazy val (buildPlan, streamedPlan) = (left, right)
+
+  lazy val (buildKeys, streamedKeys) = (List(condition(0),condition(1)),
+    List(condition(2), condition(3)))
+
+  @transient lazy val buildKeyGenerator = new InterpretedProjection(buildKeys, buildPlan.output)
+  @transient lazy val streamKeyGenerator = new InterpretedProjection(streamedKeys,
+    streamedPlan.output)
+
+  def execute() = {
+
+    val v1 = left.execute()
+    val v1kv = v1.map(x => {
+      val v1Key = buildKeyGenerator(x)
+      (new Interval[Long](v1Key.apply(0).asInstanceOf[Long], v1Key.apply(1).asInstanceOf[Long]),
+        x.copy() )
+    })
+
+    val v2 = right.execute()
+    val v2kv = v2.map(x => {
+      val v2Key = streamKeyGenerator(x)
+      (new Interval[Long](v2Key.apply(0).asInstanceOf[Long], v2Key.apply(1).asInstanceOf[Long]),
+        x.copy() )
+    })
+
+    /*As we are going to collect v1 and build an interval tree on its intervals,
+    make sure that its size is the smaller one.*/
+   assert(v1.count <= v2.count)
+
+
+    val v3 = RangeJoinImpl.overlapJoin(context.sparkContext, v1kv, v2kv)
+      .flatMap(l => l._2.map(r => (l._1,r)))
+
+    val v4 = v3.map {
+      case (l: Row, r: Row) => new JoinedRow(l, r).withLeft(l)
+    }
+    v4
+  }
+
+
--- End diff --

extra new lines.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392318
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoins.scala ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable.{ArrayBuffer, BitSet}
+
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.SQLContext
+
+
+
+@DeveloperApi
+case class RangeJoin(left: SparkPlan,
+                     right: SparkPlan,
+                     condition: Seq[Expression],
+                     context: SQLContext) extends BinaryNode with Serializable{
+  def output = left.output ++ right.output
+
+  lazy val (buildPlan, streamedPlan) = (left, right)
+
+  lazy val (buildKeys, streamedKeys) = (List(condition(0),condition(1)),
+    List(condition(2), condition(3)))
+
+  @transient lazy val buildKeyGenerator = new InterpretedProjection(buildKeys, buildPlan.output)
+  @transient lazy val streamKeyGenerator = new InterpretedProjection(streamedKeys,
+    streamedPlan.output)
+
+  def execute() = {
+
+    val v1 = left.execute()
+    val v1kv = v1.map(x => {
+      val v1Key = buildKeyGenerator(x)
+      (new Interval[Long](v1Key.apply(0).asInstanceOf[Long], v1Key.apply(1).asInstanceOf[Long]),
+        x.copy() )
+    })
+
+    val v2 = right.execute()
+    val v2kv = v2.map(x => {
+      val v2Key = streamKeyGenerator(x)
+      (new Interval[Long](v2Key.apply(0).asInstanceOf[Long], v2Key.apply(1).asInstanceOf[Long]),
+        x.copy() )
+    })
+
+    /*As we are going to collect v1 and build an interval tree on its intervals,
+    make sure that its size is the smaller one.*/
+   assert(v1.count <= v2.count)
+
+
+    val v3 = RangeJoinImpl.overlapJoin(context.sparkContext, v1kv, v2kv)
+      .flatMap(l => l._2.map(r => (l._1,r)))
+
+    val v4 = v3.map {
+      case (l: Row, r: Row) => new JoinedRow(l, r).withLeft(l)
+    }
+    v4
+  }
+
+
+}
+
+case class Interval[T <% Long](start: T, end: T){
--- End diff --

Please add white space between ) and { here and other places.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392340
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -102,6 +102,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
     }
   }
 
+  object RangeJoin extends Strategy{
--- End diff --

Please add white space between symbol and { here and other places.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392360
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SQLRangeJoinSuite.scala ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to Big Data Genomics (BDG) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The BDG licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{SQLContext, QueryTest}
+import org.apache.spark.sql.test._
+import TestSQLContext._
+
+case class RecordData1(start1: Long, end1: Long) extends Serializable
+case class RecordData2(start2: Long, end2: Long) extends Serializable
+
+class SQLRangeJoinSuite extends QueryTest {
+
+
+  val sc = TestSQLContext.sparkContext
+  val sqlContext = new SQLContext(sc)
+  import sqlContext._
+
+  test("joining non overlappings results into no entries"){
+
+    val rdd1 = sc.parallelize(Seq((1L,5L), (2L,7L))).map(i => RecordData1(i._1, i._2))
+    val rdd2 = sc.parallelize(Seq((11L,44L), (23L, 45L))).map(i => RecordData2(i._1, i._2))
+
+    rdd1.registerTempTable("t1")
+    rdd2.registerTempTable("t2")
+    checkAnswer(
+      sql("select * from t1 RANGEJOIN t2 on OVERLAPS( (start1, end1), (start2, end2))"),
+      Nil
+    )
+
--- End diff --

extra new line.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392347
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SQLRangeJoinSuite.scala ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to Big Data Genomics (BDG) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The BDG licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{SQLContext, QueryTest}
+import org.apache.spark.sql.test._
+import TestSQLContext._
+
+case class RecordData1(start1: Long, end1: Long) extends Serializable
+case class RecordData2(start2: Long, end2: Long) extends Serializable
+
+class SQLRangeJoinSuite extends QueryTest {
+
+
+  val sc = TestSQLContext.sparkContext
+  val sqlContext = new SQLContext(sc)
+  import sqlContext._
+
+  test("joining non overlappings results into no entries"){
+
+    val rdd1 = sc.parallelize(Seq((1L,5L), (2L,7L))).map(i => RecordData1(i._1, i._2))
+    val rdd2 = sc.parallelize(Seq((11L,44L), (23L, 45L))).map(i => RecordData2(i._1, i._2))
+
+    rdd1.registerTempTable("t1")
+    rdd2.registerTempTable("t2")
+    checkAnswer(
+      sql("select * from t1 RANGEJOIN t2 on OVERLAPS( (start1, end1), (start2, end2))"),
+      Nil
+    )
+
+  }
+
+  test("basic range join"){
+    val rdd1 = sc.parallelize(Seq((100L, 199L),
+      (200L, 299L),
+      (400L, 600L),
+      (1L, 2L)))
+      .map(i => RecordData1(i._1, i._2))
+
+    val rdd2 = sc.parallelize(Seq((150L, 250L),
+      (300L, 500L),
+      (500L, 700L),
+      (22000L, 22300L)))
+      .map(i => RecordData2(i._1, i._2))
+
+    rdd1.registerTempTable("s1")
+    rdd2.registerTempTable("s2")
+
+
+    checkAnswer(
+      sql("select start1, end1, start2, end2 from s1 RANGEJOIN s2 on OVERLAPS( (start1, end1), (start2, end2))"),
+      (100L, 199L, 150L, 250L) ::
+        (200L, 299L, 150L, 250L) ::
+        (400L, 600L, 300L, 500L) ::
+        (400L, 600L, 500L, 700L) :: Nil
+    )
+
+    checkAnswer(
+      sql("select end1 from s1 RANGEJOIN s2 on OVERLAPS( (start1, end1), (start2, end2))"),
+      Seq(199L) :: Seq(299L) :: Seq(600L) :: Seq(600L) :: Nil
+    )
+  }
+
+
--- End diff --

Extra new lines.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on a diff in the pull request:

https://github.com/apache/spark/pull/2939#discussion_r19392368
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SQLRangeJoinSuite.scala ---
@@ -0,0 +1,81 @@
+/**
+ * Licensed to Big Data Genomics (BDG) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The BDG licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{SQLContext, QueryTest}
+import org.apache.spark.sql.test._
+import TestSQLContext._
+
+case class RecordData1(start1: Long, end1: Long) extends Serializable
+case class RecordData2(start2: Long, end2: Long) extends Serializable
+
+class SQLRangeJoinSuite extends QueryTest {
+
+
--- End diff --

 extra new line.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-27 Thread sarutak
Github user sarutak commented on the pull request:

https://github.com/apache/spark/pull/2939#issuecomment-60562141
  
Hi @kozanitis, did you try to run sbt/sbt -Phive scalastyle?
That command helps you by telling you which code has invalid style.
Maybe the length of some lines is over 100 characters.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2939#issuecomment-60535431
  
  [Test build #469 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/469/consoleFull) for PR 2939 at commit [`092c7c8`](https://github.com/apache/spark/commit/092c7c805353651f0da47378d92415dba7c3b591).
 * This patch merges cleanly.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-26 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2939#issuecomment-60535473
  
  [Test build #469 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/469/consoleFull) for PR 2939 at commit [`092c7c8`](https://github.com/apache/spark/commit/092c7c805353651f0da47378d92415dba7c3b591).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-26 Thread kozanitis
Github user kozanitis commented on the pull request:

https://github.com/apache/spark/pull/2939#issuecomment-60550301
  
Any idea how to deal with those formatting errors? They come from files 
that I didn't modify, and I made sure I rebased just before pushing the commit.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2939#issuecomment-60475266
  
  [Test build #434 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/434/consoleFull) for PR 2939 at commit [`092c7c8`](https://github.com/apache/spark/commit/092c7c805353651f0da47378d92415dba7c3b591).
 * This patch merges cleanly.


[GitHub] spark pull request: Add range join support to spark-sql

2014-10-25 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/2939#issuecomment-60475344
  
  [Test build #434 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/434/consoleFull) for PR 2939 at commit [`092c7c8`](https://github.com/apache/spark/commit/092c7c805353651f0da47378d92415dba7c3b591).
 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `case class RangeJoin(`
  * `class IntervalTree[T](allRegions: List[(Interval[Long], T)]) extends Serializable `
  * `  class Node(allRegions: List[(Interval[Long], T)]) extends Serializable`
  * `case class RangeJoin(left: SparkPlan,`
  * `case class Interval[T <% Long](start: T, end: T)`



[GitHub] spark pull request: Add range join support to spark-sql

2014-10-24 Thread kozanitis
GitHub user kozanitis opened a pull request:

https://github.com/apache/spark/pull/2939

Add range join support to spark-sql

This PR enables spark-sql to join two tables based on interval overlap 
between their columns. This is a very useful feature to add, as you can see 
here: https://issues.apache.org/jira/browse/HIVE-556

Syntax:
select * from table1 RANGEJOIN table2 on OVERLAPS( (t1Attribute1, t1Attribute2), (t2Attribute1, t2Attribute2))

Example:
Table1 (id: String, start1: Int, end1: Int) = ( (i1, 10, 20), (i2, 50, 60) )
Table2 (start2: Int, end2: Int) = ( (15, 70), (80, 90) )
select * from Table1 RANGEJOIN Table2 on OVERLAPS( (start1, end1), (start2, end2) ) produces the following entries:
(i1, 10, 20, 15, 70), (i2, 50, 60, 15, 70)
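
The overlap test itself is plain interval intersection; as a one-liner 
(assuming closed intervals, which matches the example above):

    // Closed intervals [s1, e1] and [s2, e2] overlap when each one
    // starts before the other ends.
    def overlaps(s1: Long, e1: Long, s2: Long, e2: Long): Boolean =
      s1 <= e2 && s2 <= e1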

Method:
Assuming rdd1 corresponds to the smaller table and rdd2 to the bigger one, 
I create an interval tree from all regions of rdd1, which I broadcast to all 
nodes, and I query the tree with all entries of rdd2.

Note that the interval tree uses metadata only, not all contents of rdd1. I 
keep track of the correspondence between the contents of the interval tree 
and rdd1 through a collection of keys that I zipped with rdd1.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kozanitis/spark rangeJoins-1.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/2939.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2939


commit 092c7c805353651f0da47378d92415dba7c3b591
Author: Kozanitis kozani...@berkeley.edu
Date:   2014-10-02T01:57:17Z

Add range join support to spark-sql




[GitHub] spark pull request: Add range join support to spark-sql

2014-10-24 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/2939#issuecomment-60469727
  
Can one of the admins verify this patch?

