[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isinSet in DataFrame AP...
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190966442

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---

```
@@ -786,6 +787,24 @@ class Column(val expr: Expression) extends Logging {
   @scala.annotation.varargs
   def isin(list: Any*): Column = withExpr { In(expr, list.map(lit(_).expr)) }

+  /**
+   * A boolean expression that is evaluated to true if the value of this expression is contained
+   * by the provided Set.
+   *
+   * @group expr_ops
+   * @since 2.4.0
+   */
+  def isinSet(values: scala.collection.Set[_]): Column = isin(values.toSeq: _*)
```

--- End diff --

Sounds good. How do we want to handle the naming? `def isin` has a lower-case `i`, so `isInCollection` would be slightly inconsistent with it.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190507907

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---

```
@@ -786,6 +787,24 @@ class Column(val expr: Expression) extends Logging {
   @scala.annotation.varargs
   def isin(list: Any*): Column = withExpr { In(expr, list.map(lit(_).expr)) }

+  /**
+   * A boolean expression that is evaluated to true if the value of this expression is contained
+   * by the provided Set.
+   *
+   * @group expr_ops
+   * @since 2.4.0
+   */
+  def isinSet(values: scala.collection.Set[_]): Column = isin(values.toSeq: _*)
```

--- End diff --

The name can be `isInCollection`.
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190496061

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---

```
@@ -220,6 +219,7 @@ object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
       case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
```

--- End diff --

Can we update the comment to mention it?
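The rewrites under discussion can be sketched outside Catalyst on a toy expression AST; every name below (`Attr`, `Lit`, `FalseLit`, `optimizeIn`) is an illustrative stand-in, not the real Catalyst class:

```scala
// Toy model of the OptimizeIn rewrites: an empty IN list over a non-nullable
// input can never match, and a single-element IN collapses to an equality
// (which, in Spark, then qualifies for predicate pushdown).
sealed trait Expr
case class Attr(name: String, nullable: Boolean) extends Expr
case class Lit(value: Any) extends Expr
case class In(v: Expr, list: Seq[Expr]) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case object FalseLit extends Expr

def optimizeIn(e: Expr): Expr = e match {
  case In(Attr(_, false), Seq()) => FalseLit            // empty list, not nullable
  case In(v, Seq(single))        => EqualTo(v, single)  // length-1 list
  case other                     => other
}
```

The real rule applies this kind of rewrite bottom-up over every expression in the plan via `transformExpressionsDown`; the sketch only shows the per-expression case analysis.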
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190507754

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala ---

```
@@ -786,6 +787,24 @@ class Column(val expr: Expression) extends Logging {
   @scala.annotation.varargs
   def isin(list: Any*): Column = withExpr { In(expr, list.map(lit(_).expr)) }

+  /**
+   * A boolean expression that is evaluated to true if the value of this expression is contained
+   * by the provided Set.
+   *
+   * @group expr_ops
+   * @since 2.4.0
+   */
+  def isinSet(values: scala.collection.Set[_]): Column = isin(values.toSeq: _*)
```

--- End diff --

Shall we be more generic here and accept `Iterable`? Then `Set`, `Seq`, and `Array` can all be accepted.
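The genericity suggestion is easy to see in plain Scala: a parameter typed `Iterable[Any]` already accepts `Set`, `Seq`, and `Array` callers (the array via the standard wrapper conversion). `toInValues` below is a hypothetical helper, not part of the PR:

```scala
// One Iterable[Any] parameter covers Set, Seq, and Array alike,
// which is the widening suggested for the new API.
def toInValues(values: Iterable[Any]): Seq[Any] = values.toSeq

val fromSet   = toInValues(Set(1))
val fromSeq   = toInValues(Seq(1, 2, 3))
val fromArray = toInValues(Array(1, 2, 3))
```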
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190799418

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala ---

```
@@ -149,7 +149,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
         newPlan = dedupJoin(
           Join(newPlan, sub, ExistenceJoin(exists), conditions.reduceLeftOption(And)))
         exists
-      case In(value, Seq(ListQuery(sub, conditions, _, _))) =>
+      case EqualTo(value, ListQuery(sub, conditions, _, _)) =>
```

--- End diff --

@dongjoon-hyun this should fix the test. I'll add a test for this.
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190759741

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---

```
@@ -220,6 +219,7 @@ object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
       case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
```

--- End diff --

Yep. This is that one.
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190754229

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---

```
@@ -220,6 +219,7 @@ object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
       case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
```

--- End diff --

In fact, I'm debugging the StackOverflowError shown in the Hive test. Thanks for this; it helps a lot with local testing.
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190722260

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---

```
@@ -220,6 +219,7 @@ object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
       case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
```

--- End diff --

Could you add the following test case, too?

```scala
scala> sql("select * from t group by a having count(*) = (select count(*) from t)").explain
== Physical Plan ==
*(2) Project [a#2L]
+- *(2) Filter (count(1)#75L = Subquery subquery62)
   :  +- Subquery subquery62
   :     +- *(2) HashAggregate(keys=[], functions=[count(1)])
   :        +- Exchange SinglePartition
   :           +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
   :              +- *(1) Project
   :                 +- *(1) Range (0, 1, step=1, splits=8)
   +- *(2) HashAggregate(keys=[a#2L], functions=[count(1)])
      +- Exchange hashpartitioning(a#2L, 200)
         +- *(1) HashAggregate(keys=[a#2L], functions=[partial_count(1)])
            +- *(1) Project [id#0L AS a#2L]
               +- *(1) Range (0, 1, step=1, splits=8)

scala> sql("select * from t group by a having count(*) in (select count(*) from t)").explain
java.lang.StackOverflowError
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
```
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190717637

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---

```
@@ -219,7 +219,11 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
 object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.isEmpty =>
+        // When v is not nullable, the following expression will be optimized
+        // to FalseLiteral, which is tested in OptimizeInSuite.scala
+        If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
```

--- End diff --

I got it. Thanks.
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190472138

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---

```
@@ -219,7 +219,11 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
 object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.isEmpty =>
+        // When v is not nullable, the following expression will be optimized
+        // to FalseLiteral, which is tested in OptimizeInSuite.scala
+        If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
```

--- End diff --

Why does it have any implication on typecasting? With this PR, I seem to get the correct result:

```scala
== Analyzed Logical Plan ==
(CAST(1.1 AS STRING) IN (CAST(1 AS STRING))): boolean, (CAST(1.1 AS INT) = 1): boolean
Project [cast(1.1 as string) IN (cast(1 as string)) AS (CAST(1.1 AS STRING) IN (CAST(1 AS STRING)))#484,
         (cast(1.1 as int) = 1) AS (CAST(1.1 AS INT) = 1)#485]
+- OneRowRelation

== Optimized Logical Plan ==
Project [false AS (CAST(1.1 AS STRING) IN (CAST(1 AS STRING)))#484, true AS (CAST(1.1 AS INT) = 1)#485]
+- OneRowRelation

== Physical Plan ==
*(1) Project [false AS (CAST(1.1 AS STRING) IN (CAST(1 AS STRING)))#484, true AS (CAST(1.1 AS INT) = 1)#485]
+- Scan OneRowRelation[]
```
Github user dbtsai commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190470525

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ---

```
@@ -397,6 +399,68 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
     }
   }

+  test("isinSet: Scala Set") {
+    val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b")
+    checkAnswer(df.filter($"a".isinSet(Set(1, 2))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set(3, 2))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set(3, 1))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1))
+
+    // Auto casting should work with mixture of different types in Set
+    checkAnswer(df.filter($"a".isinSet(Set(1.toShort, "2"))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set("3", 2.toLong))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set(3, "1"))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1))
+
+    checkAnswer(df.filter($"b".isinSet(Set("y", "x"))),
+      df.collect().toSeq.filter(r => r.getString(1) == "y" || r.getString(1) == "x"))
+    checkAnswer(df.filter($"b".isinSet(Set("z", "x"))),
+      df.collect().toSeq.filter(r => r.getString(1) == "z" || r.getString(1) == "x"))
+    checkAnswer(df.filter($"b".isinSet(Set("z", "y"))),
+      df.collect().toSeq.filter(r => r.getString(1) == "z" || r.getString(1) == "y"))
+
+    val df2 = Seq((1, Seq(1)), (2, Seq(2)), (3, Seq(3))).toDF("a", "b")
+
+    intercept[AnalysisException] {
+      df2.filter($"a".isinSet(Set($"b")))
+    }
```

--- End diff --

Addressed.
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190407851

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---

```
@@ -219,7 +219,11 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] {
 object OptimizeIn extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral
+      case In(v, list) if list.isEmpty =>
+        // When v is not nullable, the following expression will be optimized
+        // to FalseLiteral, which is tested in OptimizeInSuite.scala
+        If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))
+      case In(v, list) if list.length == 1 => EqualTo(v, list.head)
```

--- End diff --

Ur, @dbtsai, this will cause side effects on typecasting. For example, please see the following. Could you add these kinds of test cases?

```scala
scala> sql("select '1.1' in (1), '1.1' = 1").collect()
res0: Array[org.apache.spark.sql.Row] = Array([false,true])
```
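The side effect is a type-coercion one: the `IN` comparison is done after widening both operands to string, while the `=` comparison ends up being done numerically, so the two expressions can disagree on the same inputs. A rough plain-Scala analogue of the two behaviors (illustrative only; this is not Spark's actual coercion code, and the exact casts Spark chooses are what the requested test cases would pin down):

```scala
// IN-style coercion: both sides compared as strings, so "1.1" != "1".
val inStyle = "1.1" == 1.toString
// Equality-style coercion: the string side is compared numerically after
// a narrowing cast, so it matches 1.
val eqStyle = "1.1".toDouble.toInt == 1
```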
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21416#discussion_r190399106

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala ---

```
@@ -397,6 +399,68 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext {
     }
   }

+  test("isinSet: Scala Set") {
+    val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b")
+    checkAnswer(df.filter($"a".isinSet(Set(1, 2))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set(3, 2))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set(3, 1))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1))
+
+    // Auto casting should work with mixture of different types in Set
+    checkAnswer(df.filter($"a".isinSet(Set(1.toShort, "2"))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set("3", 2.toLong))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2))
+    checkAnswer(df.filter($"a".isinSet(Set(3, "1"))),
+      df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1))
+
+    checkAnswer(df.filter($"b".isinSet(Set("y", "x"))),
+      df.collect().toSeq.filter(r => r.getString(1) == "y" || r.getString(1) == "x"))
+    checkAnswer(df.filter($"b".isinSet(Set("z", "x"))),
+      df.collect().toSeq.filter(r => r.getString(1) == "z" || r.getString(1) == "x"))
+    checkAnswer(df.filter($"b".isinSet(Set("z", "y"))),
+      df.collect().toSeq.filter(r => r.getString(1) == "z" || r.getString(1) == "y"))
+
+    val df2 = Seq((1, Seq(1)), (2, Seq(2)), (3, Seq(3))).toDF("a", "b")
+
+    intercept[AnalysisException] {
+      df2.filter($"a".isinSet(Set($"b")))
+    }
```

--- End diff --

Let's check the error message, to prevent a future regression such as a different AnalysisException being raised.
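A minimal sketch of the suggested message check, in plain Scala so it is self-contained. In the actual suite, ScalaTest's `intercept` returns the thrown exception, so its `getMessage` can be asserted on the same way; the "cannot resolve" text below is a stand-in, not the real analyzer message:

```scala
// Capture the exception and pin its message, so that a different error
// raised later would fail the test instead of silently passing.
val message =
  try {
    require(false, "cannot resolve 'b'") // stand-in for the failing analysis
    "no error raised"
  } catch {
    case e: IllegalArgumentException => e.getMessage
  }
assert(message.contains("cannot resolve"))
```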
GitHub user dbtsai opened a pull request:

https://github.com/apache/spark/pull/21416

[SPARK-24371] [SQL] Added isinSet in DataFrame API for Scala and Java.

## What changes were proposed in this pull request?

Implemented **`isinSet`** in the DataFrame API for both Scala and Java, so users can do

```scala
val profileDF = Seq(
  Some(1), Some(2), Some(3), Some(4),
  Some(5), Some(6), Some(7), None
).toDF("profileID")

val validUsers: Set[Any] = Set(6, 7.toShort, 8L, "3")

val result = profileDF.withColumn("isValid", $"profileID".isinSet(validUsers))

result.show(10)
"""
+---------+-------+
|profileID|isValid|
+---------+-------+
|        1|  false|
|        2|  false|
|        3|   true|
|        4|  false|
|        5|  false|
|        6|   true|
|        7|   true|
|     null|   null|
+---------+-------+
""".stripMargin
```

Two new rules are added to the logical plan optimizer:

1. When there is only one element in the **`Set`**, the physical plan will be optimized to **`EqualTo`**, so predicate pushdown can be used.

```scala
profileDF.filter( $"profileID".isinSet(Set(6))).explain(true)
"""
|== Physical Plan ==
|*(1) Project [profileID#0]
|+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
|   +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet,
|      PartitionFilters: [],
|      PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
|      ReadSchema: struct
""".stripMargin
```

2. When the **`Set`** is empty and the input is nullable, the logical plan will be simplified to

```scala
profileDF.filter( $"profileID".isinSet(Set())).explain(true)
"""
|== Optimized Logical Plan ==
|Filter if (isnull(profileID#0)) null else false
|+- Relation[profileID#0] parquet
""".stripMargin
```

TODO:

1. For multiple conditions with numbers less than certain thresholds, we should still allow predicate pushdown.
2. Optimize the **`In`** using **`tableswitch`** or **`lookupswitch`** when the number of categories is low and they are **`Int`** or **`Long`**.
3. The default immutable hash tree set is slow to query, so we should benchmark different set implementations for faster querying.
4. **`filter(if (condition) null else false)`** can be optimized to false.

## How was this patch tested?

Several unit tests are added.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dbtsai/spark optimize-set

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21416.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21416

commit ec91220d5e8658500be6d049f8ab3496fc8a914e
Author: DB Tsai
Date: 2018-05-17T00:21:14Z

    Added isinSet in DataFrame API for Scala and Java.
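The `null` row in the `show()` output above follows SQL's three-valued `IN` semantics: a null input yields null, not false. A minimal model in plain Scala, with `Option` standing in for a nullable SQL value (`sqlIn` is illustrative, not Spark code):

```scala
// None models SQL NULL: IN over a null input evaluates to NULL, not false.
def sqlIn(value: Option[Int], valid: Set[Int]): Option[Boolean] =
  value.map(valid.contains)

val hit     = sqlIn(Some(6), Set(3, 6, 7))
val miss    = sqlIn(Some(1), Set(3, 6, 7))
val nullRow = sqlIn(None,    Set(3, 6, 7))
```

This is why the empty-set case is rewritten to `if (isnull(profileID)) null else false` rather than plain `false` when the input column is nullable.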