[GitHub] spark pull request: Add range join support to spark-sql
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/2939 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org

Github user marmbrus commented on the pull request:

    https://github.com/apache/spark/pull/2939#issuecomment-65164578

    Thanks for working on this. My biggest concern is adding new syntax to SQL, since it is something we will have to support forever. Would it instead be possible to express the overlap using standard predicates, and detect the case when a range join can be used during query planning?
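For illustration, a minimal sketch of the planner-side alternative suggested above: instead of new syntax, a rule recognises the ordinary overlap predicate start1 <= end2 AND start2 <= end1 (assuming closed intervals) and extracts the two (start, end) key pairs. The classes Col, LessOrEqual, And, RangeKeys and the extractor OverlapPattern are hypothetical stand-ins, not Catalyst's API; a real rule would match Catalyst expressions inside a planning strategy.

```scala
// Hypothetical, minimal expression tree (NOT Catalyst's classes), used only to show
// how a planner rule could spot an overlap condition written with ordinary predicates.
sealed trait Expr
final case class Col(table: String, name: String) extends Expr
final case class LessOrEqual(left: Expr, right: Expr) extends Expr
final case class And(left: Expr, right: Expr) extends Expr

// The (start, end) columns of one side of a range join.
final case class RangeKeys(start: Col, end: Col)

object OverlapPattern {
  // Closed intervals [s1, e1] and [s2, e2] overlap iff s1 <= e2 && s2 <= e1.
  def overlaps(s1: Long, e1: Long, s2: Long, e2: Long): Boolean =
    s1 <= e2 && s2 <= e1

  // Matches `a.start <= b.end AND b.start <= a.end` where a and b are different
  // tables and returns (keys of a, keys of b). Anything else yields None, so the
  // planner would fall back to its ordinary join strategies.
  def unapply(cond: Expr): Option[(RangeKeys, RangeKeys)] = cond match {
    case And(LessOrEqual(s1: Col, e2: Col), LessOrEqual(s2: Col, e1: Col))
        if s1.table == e1.table && s2.table == e2.table && s1.table != s2.table =>
      Some((RangeKeys(s1, e1), RangeKeys(s2, e2)))
    case _ => None
  }
}
```

A strategy could then write cond match { case OverlapPattern(l, r) => ... } to plan a range join from l and r, and leave every other condition to the existing join strategies.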

Github user kozanitis commented on the pull request:

    https://github.com/apache/spark/pull/2939#issuecomment-65169392

    Yeah, I see your point... I will revise the design as soon as I find some time to do some more clever parsing of the existing predicates.

Github user kozanitis commented on the pull request:

    https://github.com/apache/spark/pull/2939#issuecomment-60821525

    Thanks @sarutak for the recommendations. After fixing the issues that you found, sbt/sbt -Phive scalastyle passed.

Github user kozanitis commented on the pull request:

    https://github.com/apache/spark/pull/2939#issuecomment-60821550

    Jenkins, can you please test again?

Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2939#discussion_r19392124

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---
    @@ -556,6 +556,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
           sql("SELECT * FROM lowerCaseData INTERSECT SELECT * FROM upperCaseData"),
           Nil)
       }
    +
    +
    --- End diff --

    Please remove extra new lines.

Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2939#discussion_r19392140

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoins.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution
    +
    +import scala.collection.mutable.{ArrayBuffer, BitSet}
    +
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.SQLContext
    +
    +
    +
    --- End diff --

    Please remove extra new lines.

Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2939#discussion_r19392150

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoins.scala ---
    @@ -0,0 +1,83 @@
    +package org.apache.spark.sql.execution
    +
    +import scala.collection.mutable.{ArrayBuffer, BitSet}
    +
    +
    --- End diff --

    Please remove extra new lines.

Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2939#discussion_r19392165

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoinImpl.scala ---
    @@ -0,0 +1,64 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    --- End diff --

    extra new line.

Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2939#discussion_r19392161

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoinImpl.scala ---
    @@ -0,0 +1,64 @@
    +package org.apache.spark.sql.execution
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.{sql, SparkContext}
    +import org.apache.spark.SparkContext._
    +import scala.reflect.ClassTag
    +
    +object RangeJoinImpl extends Serializable{
    +
    +  /**
    +   * Multi-joins together two RDDs that contain objects that map to reference regions.
    +   * The elements from the first RDD become the key of the output RDD, and the value
    +   * contains all elements from the second RDD which overlap the region of the key.
    +   * This is a multi-join, so it preserves n-to-m relationships between regions.
    +   *
    +   * @param sc A spark context from the cluster that will perform the join
    +   * @param rdd1 RDD of values on which we build an interval tree. Assume |rdd1| < |rdd2|
    +   */
    +  def overlapJoin(sc: SparkContext,
    +    rdd1: RDD[(Interval[Long],sql.Row)],
    +    rdd2: RDD[(Interval[Long],sql.Row)]): RDD[(sql.Row, Iterable[sql.Row])] =
    +  {
    +
    +    val indexedRdd1 = rdd1.zipWithIndex().map(_.swap)
    +
    +    /*Collect only Reference regions and the index of indexedRdd1*/
    +    val localIntervals = indexedRdd1.map(x => (x._2._1, x._1)).collect()
    +    /*Create and broadcast an interval tree*/
    +    val intervalTree = sc.broadcast(new IntervalTree[Long](localIntervals.toList))
    +
    +    val kvrdd2: RDD[(Long, Iterable[sql.Row])] = rdd2
    +      //join entry with the intervals returned from the interval tree
    +      .map(x => (intervalTree.value.getAllOverlappings(x._1), x._2))
    +      .filter(x => x._1 != Nil) //filter out entries that do not join anywhere
    +      .flatMap(t => t._1.map(s => (s._2, t._2))) //create pairs of (index1, rdd2Elem)
    +      .groupByKey
    +
    +
    --- End diff --

    extra new line.
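The overlapJoin above collects rdd1's intervals to the driver, broadcasts an interval tree built on them, probes that tree with every rdd2 element, and groups the matches by rdd1's index. The PR's IntervalTree class is not shown in this thread, so the following is only a local (no Spark) sketch of the same flow under stated assumptions: intervals are closed and Long-valued, and SimpleIntervalTree and LocalOverlapJoin are hypothetical names. The tree is a balanced BST keyed on interval start, where each node also stores the largest end in its subtree so whole subtrees can be skipped.

```scala
// Closed interval [start, end]; a stand-in for the PR's Interval[Long].
final case class Interval(start: Long, end: Long) {
  def overlaps(o: Interval): Boolean = start <= o.end && o.start <= end
}

// Hypothetical augmented interval tree (not the PR's IntervalTree): nodes are ordered
// by start, and maxEnd is the largest end anywhere in the node's subtree.
final class SimpleIntervalTree[V](items: Seq[(Interval, V)]) {
  private final case class Node(iv: Interval, value: V, maxEnd: Long,
                                left: Option[Node], right: Option[Node])

  // Build a balanced tree from items already sorted by start.
  private def build(sorted: IndexedSeq[(Interval, V)]): Option[Node] =
    if (sorted.isEmpty) None
    else {
      val mid = sorted.length / 2
      val (iv, v) = sorted(mid)
      val l = build(sorted.take(mid))
      val r = build(sorted.drop(mid + 1))
      val maxEnd = (iv.end :: l.map(_.maxEnd).toList ::: r.map(_.maxEnd).toList).max
      Some(Node(iv, v, maxEnd, l, r))
    }

  private val root: Option[Node] = build(items.sortBy(_._1.start).toIndexedSeq)

  // Values of all stored intervals that overlap q (in no particular order).
  def overlapping(q: Interval): List[V] = {
    def go(n: Option[Node], acc: List[V]): List[V] = n match {
      case None => acc
      case Some(node) if node.maxEnd < q.start => acc // every interval here ends before q
      case Some(node) =>
        val afterLeft = go(node.left, acc)
        val afterSelf = if (node.iv.overlaps(q)) node.value :: afterLeft else afterLeft
        // The right subtree starts at >= node.iv.start, so skip it once that is past q.end.
        if (node.iv.start > q.end) afterSelf else go(node.right, afterSelf)
    }
    go(root, Nil)
  }
}

object LocalOverlapJoin {
  // Local analogue of overlapJoin: index the build side, store (interval -> index) in
  // the tree, probe with every element of the other side, then group hits by index.
  def overlapJoin[A, B](build: Seq[(Interval, A)],
                        probe: Seq[(Interval, B)]): Seq[(A, Seq[B])] = {
    val indexed = build.toIndexedSeq
    val tree = new SimpleIntervalTree[Int](indexed.map(_._1).zipWithIndex)
    val hits = probe.flatMap { case (iv, b) => tree.overlapping(iv).map(i => (i, b)) }
    hits.groupBy(_._1).toSeq.sortBy(_._1).map { case (i, ps) => (indexed(i)._2, ps.map(_._2)) }
  }
}
```

Flattening each (key, values) group into pairs, as RangeJoin.execute does with flatMap, yields one output row per overlapping pair; on the "basic range join" test data this gives the four pairs the test expects.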

Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2939#discussion_r19392217

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala ---
    @@ -188,8 +191,18 @@ class SqlParser extends AbstractSparkSQLParser {
           }
         }
    -  protected lazy val joinConditions: Parser[Expression] =
    -    ON ~> expression
    +  protected lazy val rangeJoinedRelation: Parser[LogicalPlan] =
    +    relationFactor ~ RANGEJOIN ~ relationFactor ~ ON ~ OVERLAPS ~
    --- End diff --

    Wrong indentation.

Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2939#discussion_r19392262

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoinImpl.scala ---
    @@ -0,0 +1,64 @@
    +package org.apache.spark.sql.execution
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.{sql, SparkContext}
    +import org.apache.spark.SparkContext._
    +import scala.reflect.ClassTag
    +
    +object RangeJoinImpl extends Serializable{
    +
    +  /**
    +   * Multi-joins together two RDDs that contain objects that map to reference regions.
    +   * The elements from the first RDD become the key of the output RDD, and the value
    +   * contains all elements from the second RDD which overlap the region of the key.
    +   * This is a multi-join, so it preserves n-to-m relationships between regions.
    +   *
    +   * @param sc A spark context from the cluster that will perform the join
    +   * @param rdd1 RDD of values on which we build an interval tree. Assume |rdd1| < |rdd2|
    +   */
    +  def overlapJoin(sc: SparkContext,
    +    rdd1: RDD[(Interval[Long],sql.Row)],
    +    rdd2: RDD[(Interval[Long],sql.Row)]): RDD[(sql.Row, Iterable[sql.Row])] =
    +  {
    +
    +    val indexedRdd1 = rdd1.zipWithIndex().map(_.swap)
    +
    +    /*Collect only Reference regions and the index of indexedRdd1*/
    --- End diff --

    Please add white space after * here and other places.

Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2939#discussion_r19392296

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoins.scala ---
    @@ -0,0 +1,83 @@
    +package org.apache.spark.sql.execution
    +
    +import scala.collection.mutable.{ArrayBuffer, BitSet}
    +
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.SQLContext
    +
    +
    +
    +@DeveloperApi
    +case class RangeJoin(left: SparkPlan,
    +    right: SparkPlan,
    +    condition: Seq[Expression],
    +    context: SQLContext) extends BinaryNode with Serializable{
    +  def output = left.output ++ right.output
    +
    +  lazy val (buildPlan, streamedPlan) = (left, right)
    +
    +  lazy val (buildKeys, streamedKeys) = (List(condition(0),condition(1)),
    +    List(condition(2), condition(3)))
    +
    +  @transient lazy val buildKeyGenerator = new InterpretedProjection(buildKeys, buildPlan.output)
    +  @transient lazy val streamKeyGenerator = new InterpretedProjection(streamedKeys,
    +    streamedPlan.output)
    +
    +  def execute() = {
    +
    +    val v1 = left.execute()
    +    val v1kv = v1.map(x => {
    +      val v1Key = buildKeyGenerator(x)
    +      (new Interval[Long](v1Key.apply(0).asInstanceOf[Long], v1Key.apply(1).asInstanceOf[Long]),
    +        x.copy() )
    +    })
    +
    +    val v2 = right.execute()
    +    val v2kv = v2.map(x => {
    +      val v2Key = streamKeyGenerator(x)
    +      (new Interval[Long](v2Key.apply(0).asInstanceOf[Long], v2Key.apply(1).asInstanceOf[Long]),
    +        x.copy() )
    +    })
    +
    +    /*As we are going to collect v1 and build an interval tree on its intervals,
    +    make sure that its size is the smaller one.*/
    +    assert(v1.count <= v2.count)
    +
    +
    --- End diff --

    extra new lines.
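The assert above makes the left input the build side and fails when it is the larger one; since RDD.count() is an action, it also launches a separate job over each input. A hypothetical alternative, sketched locally under stated assumptions (plain Scala collections, closed Long intervals, a brute-force stand-in for RangeJoinImpl.overlapJoin, and the made-up name SmallerSideBuild): build on whichever side is smaller, then flip each result pair so the output is always (leftRow, rightRow), consistent with output = left.output ++ right.output.

```scala
// Closed interval; stand-in for the PR's Interval[Long].
final case class Interval(start: Long, end: Long) {
  def overlaps(o: Interval): Boolean = start <= o.end && o.start <= end
}

object SmallerSideBuild {
  // Stand-in for RangeJoinImpl.overlapJoin with the same output shape
  // (build value -> all overlapping probe values). Brute force here for brevity;
  // the PR probes a broadcast interval tree instead.
  def overlapJoin[A, B](build: Seq[(Interval, A)],
                        probe: Seq[(Interval, B)]): Seq[(A, Seq[B])] =
    build
      .map { case (bi, a) => (a, probe.collect { case (pi, b) if bi.overlaps(pi) => b }) }
      .filter(_._2.nonEmpty)

  // Hypothetical replacement for assert(v1.count <= v2.count): pick the smaller side
  // as the build side, then flip pairs so results are always (left, right).
  def rangeJoin[L, R](left: Seq[(Interval, L)], right: Seq[(Interval, R)]): Seq[(L, R)] =
    if (left.size <= right.size)
      overlapJoin(left, right).flatMap { case (l, rs) => rs.map(r => (l, r)) }
    else
      overlapJoin(right, left).flatMap { case (r, ls) => ls.map(l => (l, r)) }
}
```

Choosing by exact counts still pays for the counts; a planner could instead pick the build side from size estimates, as Spark SQL already does when deciding whether to broadcast one side of an equi-join.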

Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2939#discussion_r19392285

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoins.scala ---
    @@ -0,0 +1,83 @@
    +package org.apache.spark.sql.execution
    +
    +import scala.collection.mutable.{ArrayBuffer, BitSet}
    +
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.SQLContext
    +
    +
    +
    +@DeveloperApi
    +case class RangeJoin(left: SparkPlan,
    +    right: SparkPlan,
    +    condition: Seq[Expression],
    +    context: SQLContext) extends BinaryNode with Serializable{
    +  def output = left.output ++ right.output
    +
    +  lazy val (buildPlan, streamedPlan) = (left, right)
    +
    +  lazy val (buildKeys, streamedKeys) = (List(condition(0),condition(1)),
    +    List(condition(2), condition(3)))
    +
    +  @transient lazy val buildKeyGenerator = new InterpretedProjection(buildKeys, buildPlan.output)
    +  @transient lazy val streamKeyGenerator = new InterpretedProjection(streamedKeys,
    +    streamedPlan.output)
    +
    +  def execute() = {
    +
    +    val v1 = left.execute()
    +    val v1kv = v1.map(x => {
    +      val v1Key = buildKeyGenerator(x)
    +      (new Interval[Long](v1Key.apply(0).asInstanceOf[Long], v1Key.apply(1).asInstanceOf[Long]),
    +        x.copy() )
    +    })
    +
    +    val v2 = right.execute()
    +    val v2kv = v2.map(x => {
    +      val v2Key = streamKeyGenerator(x)
    +      (new Interval[Long](v2Key.apply(0).asInstanceOf[Long], v2Key.apply(1).asInstanceOf[Long]),
    +        x.copy() )
    +    })
    +
    +    /*As we are going to collect v1 and build an interval tree on its intervals,
    +    make sure that its size is the smaller one.*/
    +    assert(v1.count <= v2.count)
    +
    +
    +    val v3 = RangeJoinImpl.overlapJoin(context.sparkContext, v1kv, v2kv)
    +      .flatMap(l => l._2.map(r => (l._1,r)))
    +
    +    val v4 = v3.map {
    +      case (l: Row, r: Row) => new JoinedRow(l, r).withLeft(l)
    +    }
    +    v4
    +  }
    +
    +
    --- End diff --

    extra new lines.

Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2939#discussion_r19392318

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/RangeJoins.scala ---
    @@ -0,0 +1,83 @@
    +package org.apache.spark.sql.execution
    +
    +import scala.collection.mutable.{ArrayBuffer, BitSet}
    +
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.SQLContext
    +
    +
    +
    +@DeveloperApi
    +case class RangeJoin(left: SparkPlan,
    +    right: SparkPlan,
    +    condition: Seq[Expression],
    +    context: SQLContext) extends BinaryNode with Serializable{
    +  def output = left.output ++ right.output
    +
    +  lazy val (buildPlan, streamedPlan) = (left, right)
    +
    +  lazy val (buildKeys, streamedKeys) = (List(condition(0),condition(1)),
    +    List(condition(2), condition(3)))
    +
    +  @transient lazy val buildKeyGenerator = new InterpretedProjection(buildKeys, buildPlan.output)
    +  @transient lazy val streamKeyGenerator = new InterpretedProjection(streamedKeys,
    +    streamedPlan.output)
    +
    +  def execute() = {
    +
    +    val v1 = left.execute()
    +    val v1kv = v1.map(x => {
    +      val v1Key = buildKeyGenerator(x)
    +      (new Interval[Long](v1Key.apply(0).asInstanceOf[Long], v1Key.apply(1).asInstanceOf[Long]),
    +        x.copy() )
    +    })
    +
    +    val v2 = right.execute()
    +    val v2kv = v2.map(x => {
    +      val v2Key = streamKeyGenerator(x)
    +      (new Interval[Long](v2Key.apply(0).asInstanceOf[Long], v2Key.apply(1).asInstanceOf[Long]),
    +        x.copy() )
    +    })
    +
    +    /*As we are going to collect v1 and build an interval tree on its intervals,
    +    make sure that its size is the smaller one.*/
    +    assert(v1.count <= v2.count)
    +
    +
    +    val v3 = RangeJoinImpl.overlapJoin(context.sparkContext, v1kv, v2kv)
    +      .flatMap(l => l._2.map(r => (l._1,r)))
    +
    +    val v4 = v3.map {
    +      case (l: Row, r: Row) => new JoinedRow(l, r).withLeft(l)
    +    }
    +    v4
    +  }
    +
    +
    +}
    +
    +case class Interval[T <% Long](start: T, end: T){
    --- End diff --

    Please add white space between ) and { here and other places.
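The bound in Interval[T <% Long] is a Scala view bound, which is shorthand for an extra implicit parameter that converts T to Long. The PR's Interval body is not shown in this thread, so here is only a minimal sketch with that implicit parameter written out; IntervalV and its overlaps method are hypothetical, and closed intervals are assumed.

```scala
// Hypothetical: Interval[T <% Long] written with its desugared implicit parameter.
final case class IntervalV[T](start: T, end: T)(implicit toLong: T => Long) {
  def startL: Long = toLong(start)
  def endL: Long = toLong(end)
  // Closed-interval overlap, compared after converting both ends to Long.
  def overlaps(o: IntervalV[T]): Boolean = startL <= o.endL && o.startL <= endL
}
```

For T = Long the identity conversion from Predef satisfies the implicit; a conversion can also be passed explicitly, e.g. IntervalV(1, 5)((i: Int) => i.toLong).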

Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2939#discussion_r19392340

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
    @@ -102,6 +102,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           }
         }
    +  object RangeJoin extends Strategy{
    --- End diff --

    Please add white space between symbol and { here and other places.

Github user sarutak commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2939#discussion_r19392360

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SQLRangeJoinSuite.scala ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to Big Data Genomics (BDG) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The BDG licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.{SQLContext, QueryTest}
    +import org.apache.spark.sql.test._
    +import TestSQLContext._
    +
    +case class RecordData1(start1: Long, end1: Long) extends Serializable
    +case class RecordData2(start2: Long, end2: Long) extends Serializable
    +
    +class SQLRangeJoinSuite extends QueryTest {
    +
    +
    +  val sc = TestSQLContext.sparkContext
    +  val sqlContext = new SQLContext(sc)
    +  import sqlContext._
    +
    +  test("joining non overlappings results into no entries"){
    +
    +    val rdd1 = sc.parallelize(Seq((1L,5L), (2L,7L))).map(i => RecordData1(i._1, i._2))
    +    val rdd2 = sc.parallelize(Seq((11L,44L), (23L, 45L))).map(i => RecordData2(i._1, i._2))
    +
    +    rdd1.registerTempTable("t1")
    +    rdd2.registerTempTable("t2")
    +    checkAnswer(
    +      sql("select * from t1 RANGEJOIN t2 on OVERLAPS( (start1, end1), (start2, end2))"),
    +      Nil
    +    )
    +
    --- End diff --

    extra new line.
[GitHub] spark pull request: Add range join support to spark-sql
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/2939#discussion_r19392347

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SQLRangeJoinSuite.scala ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to Big Data Genomics (BDG) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The BDG licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.{SQLContext, QueryTest}
    +import org.apache.spark.sql.test._
    +import TestSQLContext._
    +
    +case class RecordData1(start1: Long, end1: Long) extends Serializable
    +case class RecordData2(start2: Long, end2: Long) extends Serializable
    +
    +class SQLRangeJoinSuite extends QueryTest {
    +
    +
    +  val sc = TestSQLContext.sparkContext
    +  val sqlContext = new SQLContext(sc)
    +  import sqlContext._
    +
    +  test("joining non overlappings results into no entries"){
    +
    +    val rdd1 = sc.parallelize(Seq((1L,5L), (2L,7L))).map(i => RecordData1(i._1, i._2))
    +    val rdd2 = sc.parallelize(Seq((11L,44L), (23L, 45L))).map(i => RecordData2(i._1, i._2))
    +
    +    rdd1.registerTempTable("t1")
    +    rdd2.registerTempTable("t2")
    +    checkAnswer(
    +      sql("select * from t1 RANGEJOIN t2 on OVERLAPS( (start1, end1), (start2, end2))"),
    +      Nil
    +    )
    +
    +  }
    +
    +  test("basic range join"){
    +    val rdd1 = sc.parallelize(Seq((100L, 199L),
    +      (200L, 299L),
    +      (400L, 600L),
    +      (1L, 2L)))
    +      .map(i => RecordData1(i._1, i._2))
    +
    +    val rdd2 = sc.parallelize(Seq((150L, 250L),
    +      (300L, 500L),
    +      (500L, 700L),
    +      (22000L, 22300L)))
    +      .map(i => RecordData2(i._1, i._2))
    +
    +    rdd1.registerTempTable("s1")
    +    rdd2.registerTempTable("s2")
    +
    +
    +    checkAnswer(
    +      sql("select start1, end1, start2, end2 from s1 RANGEJOIN s2 on OVERLAPS( (start1, end1), (start2, end2))"),
    +      (100L, 199L, 150L, 250L) ::
    +      (200L, 299L, 150L, 250L) ::
    +      (400L, 600L, 300L, 500L) ::
    +      (400L, 600L, 500L, 700L) :: Nil
    +    )
    +
    +    checkAnswer(
    +      sql("select end1 from s1 RANGEJOIN s2 on OVERLAPS( (start1, end1), (start2, end2))"),
    +      Seq(199L) :: Seq(299L) :: Seq(600L) :: Seq(600L) :: Nil
    +    )
    +  }
    +
    +
    --- End diff --

Extra new lines.
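As a sanity check on the expected rows in `basic range join`, the overlap condition can be evaluated by hand. The plain-Scala script below (no Spark; `overlaps` and `rangeJoin` are hypothetical helpers, not code from the PR) runs a naive nested loop over the same data. It assumes `OVERLAPS` treats intervals as closed, i.e. `start1 <= end2 && start2 <= end1`; no interval in either test merely touches another at an endpoint, so the expected rows would be the same under a half-open definition.

```scala
// Closed-interval overlap (assumption about OVERLAPS semantics):
// [start1, end1] and [start2, end2] share at least one point.
def overlaps(start1: Long, end1: Long, start2: Long, end2: Long): Boolean =
  start1 <= end2 && start2 <= end1

// Naive nested-loop range join: every (left, right) pair that overlaps,
// in left-then-right iteration order.
def rangeJoin(
    left: Seq[(Long, Long)],
    right: Seq[(Long, Long)]): Seq[(Long, Long, Long, Long)] =
  for {
    (start1, end1) <- left
    (start2, end2) <- right
    if overlaps(start1, end1, start2, end2)
  } yield (start1, end1, start2, end2)

// Same data as tables s1 and s2 in the "basic range join" test.
val s1 = Seq((100L, 199L), (200L, 299L), (400L, 600L), (1L, 2L))
val s2 = Seq((150L, 250L), (300L, 500L), (500L, 700L), (22000L, 22300L))

rangeJoin(s1, s2).foreach(println)
// (100,199,150,250)
// (200,299,150,250)
// (400,600,300,500)
// (400,600,500,700)

// Same data as tables t1 and t2 in the non-overlapping test.
println(rangeJoin(Seq((1L, 5L), (2L, 7L)), Seq((11L, 44L), (23L, 45L))).isEmpty) // true
```

The nested loop costs one comparison per pair of rows, which is exactly what the interval-tree approach in this PR is meant to avoid on large inputs.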
[GitHub] spark pull request: Add range join support to spark-sql
Github user sarutak commented on a diff in the pull request: https://github.com/apache/spark/pull/2939#discussion_r19392368

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SQLRangeJoinSuite.scala ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to Big Data Genomics (BDG) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The BDG licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.{SQLContext, QueryTest}
    +import org.apache.spark.sql.test._
    +import TestSQLContext._
    +
    +case class RecordData1(start1: Long, end1: Long) extends Serializable
    +case class RecordData2(start2: Long, end2: Long) extends Serializable
    +
    +class SQLRangeJoinSuite extends QueryTest {
    +
    +
    --- End diff --

Extra new line.
[GitHub] spark pull request: Add range join support to spark-sql
Github user sarutak commented on the pull request: https://github.com/apache/spark/pull/2939#issuecomment-60562141

Hi @kozanitis, did you try running `sbt/sbt -Phive scalastyle`? That command tells you which code violates the style rules. Perhaps some lines are longer than 100 characters.
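`scalastyle` is what actually decides whether the build fails. Purely to illustrate the 100-character rule mentioned above, a hypothetical helper in plain Scala (not part of Spark or scalastyle) can list the offending lines of a file. It counts characters as they appear, whereas scalastyle's line-length check may expand tabs first, so treat its output as a rough hint only.

```scala
import scala.io.Source

// Hypothetical helper: returns (1-based line number, length) for every
// line longer than maxLen characters.
def longLines(lines: Seq[String], maxLen: Int = 100): Seq[(Int, Int)] =
  lines.zipWithIndex.collect {
    case (line, i) if line.length > maxLen => (i + 1, line.length)
  }

// Reads a source file and applies longLines; closes the file afterwards.
def longLinesInFile(path: String, maxLen: Int = 100): Seq[(Int, Int)] = {
  val src = Source.fromFile(path)
  try longLines(src.getLines().toList, maxLen)
  finally src.close()
}
```

For example, `longLinesInFile("sql/core/src/test/scala/org/apache/spark/sql/execution/SQLRangeJoinSuite.scala")` would point at the long `sql(...)` lines in the test suite quoted above, if they exceed the limit.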
[GitHub] spark pull request: Add range join support to spark-sql
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2939#issuecomment-60535431

[Test build #469 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/469/consoleFull) for PR 2939 at commit [`092c7c8`](https://github.com/apache/spark/commit/092c7c805353651f0da47378d92415dba7c3b591).

 * This patch merges cleanly.
[GitHub] spark pull request: Add range join support to spark-sql
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2939#issuecomment-60535473

[Test build #469 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/469/consoleFull) for PR 2939 at commit [`092c7c8`](https://github.com/apache/spark/commit/092c7c805353651f0da47378d92415dba7c3b591).

 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request: Add range join support to spark-sql
Github user kozanitis commented on the pull request: https://github.com/apache/spark/pull/2939#issuecomment-60550301

Any idea how to deal with those formatting errors? They come from files that I didn't modify, and I made sure to rebase just before pushing the commit.
[GitHub] spark pull request: Add range join support to spark-sql
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2939#issuecomment-60475266

[Test build #434 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/434/consoleFull) for PR 2939 at commit [`092c7c8`](https://github.com/apache/spark/commit/092c7c805353651f0da47378d92415dba7c3b591).

 * This patch merges cleanly.
[GitHub] spark pull request: Add range join support to spark-sql
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2939#issuecomment-60475344

[Test build #434 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/434/consoleFull) for PR 2939 at commit [`092c7c8`](https://github.com/apache/spark/commit/092c7c805353651f0da47378d92415dba7c3b591).

 * This patch **fails Scala style tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
   * `case class RangeJoin(`
   * `class IntervalTree[T](allRegions: List[(Interval[Long], T)]) extends Serializable`
   * `class Node(allRegions: List[(Interval[Long], T)]) extends Serializable`
   * `case class RangeJoin(left: SparkPlan,`
   * `case class Interval[T <% Long](start: T, end: T)`
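For readers unfamiliar with the data structure behind `IntervalTree` and `Node`, here is a minimal centered interval tree in plain Scala, written as a script. The names mirror the class list above, but the code is a hypothetical sketch, not the PR's implementation: it uses a plain `Long`-based `Interval` instead of the `T <% Long` view bound, and it assumes closed intervals. Each node stores the intervals that contain its center point; intervals entirely below or above the center go to the left or right subtree.

```scala
// Hypothetical closed interval [start, end]; not the PR's Interval class.
final case class Interval(start: Long, end: Long) {
  require(start <= end, s"invalid interval [$start, $end]")
  def overlaps(other: Interval): Boolean =
    start <= other.end && other.start <= end
}

// Hypothetical centered interval tree mapping intervals to payloads of type T.
final class IntervalTree[T](regions: Seq[(Interval, T)]) extends Serializable {
  private class Node(
      val center: Long,
      val here: Seq[(Interval, T)],  // intervals that contain `center`
      val left: Option[Node],        // intervals ending before `center`
      val right: Option[Node])       // intervals starting after `center`
    extends Serializable

  private def build(rs: Seq[(Interval, T)]): Option[Node] =
    if (rs.isEmpty) None
    else {
      // The median start is the start of some interval, so `here` is never
      // empty and every recursive call works on a strictly smaller set.
      val starts = rs.map(_._1.start).sorted
      val center = starts(starts.length / 2)
      val (below, rest) = rs.partition(_._1.end < center)
      val (above, here) = rest.partition(_._1.start > center)
      Some(new Node(center, here, build(below), build(above)))
    }

  private val root: Option[Node] = build(regions)

  /** Payloads of all stored intervals that overlap `q` (in no particular order). */
  def query(q: Interval): Seq[T] = {
    def go(n: Option[Node]): Seq[T] = n match {
      case None => Seq.empty
      case Some(node) =>
        val hits = node.here.collect { case (iv, v) if iv.overlaps(q) => v }
        // Left-subtree intervals end before center: only reachable if q starts before it.
        val fromLeft = if (q.start < node.center) go(node.left) else Seq.empty
        // Right-subtree intervals start after center: only reachable if q ends after it.
        val fromRight = if (q.end > node.center) go(node.right) else Seq.empty
        hits ++ fromLeft ++ fromRight
    }
    go(root)
  }
}

val tree = new IntervalTree(Seq(
  Interval(150L, 250L) -> "a", Interval(300L, 500L) -> "b",
  Interval(500L, 700L) -> "c", Interval(22000L, 22300L) -> "d"))
println(tree.query(Interval(400L, 600L)).sorted) // List(b, c)
```

A query only descends into subtrees that can still contain an overlapping interval, which is what makes probing a broadcast tree cheaper than scanning every region of the smaller table. This sketch scans each node's `here` list linearly for brevity; textbook versions keep those lists sorted by start and by end to prune further.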
[GitHub] spark pull request: Add range join support to spark-sql
GitHub user kozanitis opened a pull request: https://github.com/apache/spark/pull/2939

Add range join support to spark-sql

This PR enables spark-sql to join two tables based on whether intervals stored in their columns overlap. This is a useful feature, as the request in https://issues.apache.org/jira/browse/HIVE-556 shows.

Syntax:

    select * from table1 RANGEJOIN table2 on OVERLAPS( (t1Attribute1, t1Attribute2), (t2Attribute1, t2Attribute2))

Example:

    Table1(id: String, start1: Int, end1: Int) = ( ("i1", 10, 20), ("i2", 50, 60) )
    Table2(start2: Int, end2: Int) = ( (15, 70), (80, 90) )

    select * from Table1 RANGEJOIN Table2 on OVERLAPS( (start1, end1), (start2, end2) )

produces the following entries:

    (i1, 10, 20, 15, 70), (i2, 50, 60, 15, 70)

Method: assuming rdd1 corresponds to the smaller table and rdd2 to the bigger one, I build an interval tree from all regions of rdd1, broadcast it to all nodes, and query the tree with every entry of rdd2. Note that the interval tree holds only metadata (the intervals), not the full contents of rdd1. I keep track of the correspondence between the tree's contents and the rows of rdd1 through a collection of keys that I zip with rdd1.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kozanitis/spark rangeJoins-1.2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2939.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2939

commit 092c7c805353651f0da47378d92415dba7c3b591
Author: Kozanitis kozani...@berkeley.edu
Date: 2014-10-02T01:57:17Z

    Add range join support to spark-sql
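The data flow described under "Method" can be traced on the example tables with plain Scala collections, written as a script. This is a hypothetical sketch, not the PR's code: local `Seq`s stand in for RDDs, the index is an ordinary local value rather than a broadcast variable, and a linear scan (`lookup`) stands in for the interval tree. `Seq.zipWithIndex` plays the role that `RDD.zipWithIndex` (which yields `Long` indices) would play in Spark. Overlap is assumed to be inclusive at both endpoints.

```scala
// Smaller table: (id, start1, end1); bigger table: (start2, end2). Example data from the PR.
val table1: Seq[(String, Int, Int)] = Seq(("i1", 10, 20), ("i2", 50, 60))
val table2: Seq[(Int, Int)] = Seq((15, 70), (80, 90))

// Assumed closed-interval overlap test.
def overlaps(s1: Int, e1: Int, s2: Int, e2: Int): Boolean =
  s1 <= e2 && s2 <= e1

// Step 1: zip a key onto every row of the smaller table.
val keyed: Seq[((String, Int, Int), Long)] =
  table1.zipWithIndex.map { case (row, i) => (row, i.toLong) }

// Step 2: the lookup structure keeps only metadata (start, end, key), not full rows.
// In Spark this is what would be broadcast; here a linear scan replaces the tree.
val metadata: Seq[(Int, Int, Long)] =
  keyed.map { case ((_, start1, end1), key) => (start1, end1, key) }

def lookup(start2: Int, end2: Int): Seq[Long] =
  metadata.collect { case (start1, end1, key) if overlaps(start1, end1, start2, end2) => key }

// Step 3: probe with every row of the bigger table, producing (key, bigRow) pairs.
val matches: Seq[(Long, (Int, Int))] =
  for {
    (start2, end2) <- table2
    key <- lookup(start2, end2)
  } yield (key, (start2, end2))

// Step 4: resolve keys back to the full rows of the smaller table.
val rowsByKey: Map[Long, (String, Int, Int)] = keyed.map(_.swap).toMap
val result: Seq[(String, Int, Int, Int, Int)] =
  matches.map { case (key, (start2, end2)) =>
    val (id, start1, end1) = rowsByKey(key)
    (id, start1, end1, start2, end2)
  }

result.foreach(println)
// (i1,10,20,15,70)
// (i2,50,60,15,70)
```

Keeping only `(start, end, key)` in the shared structure is what keeps the broadcast small; how the PR actually resolves keys back to rdd1 rows at scale is not shown here.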
[GitHub] spark pull request: Add range join support to spark-sql
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2939#issuecomment-60469727 Can one of the admins verify this patch?