[GitHub] spark pull request #21747: [SPARK-24165][SQL][branch-2.3] Fixing conditional...
Github user mn-mikke closed the pull request at: https://github.com/apache/spark/pull/21747 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21747: [SPARK-24165][SQL][branch-2.3] Fixing conditional expres...
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21747 If nobody has any objections, I'm happy to close this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22471: [SPARK-25470][SQL][Performance] Concat.eval should use p...
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/22471 If you don't mind, I will include fixes for ```Reverse``` and ```ElementAt``` in this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20858: [SPARK-23736][SQL] Extending the concat function ...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/20858#discussion_r218917303 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -665,3 +667,219 @@ case class ElementAt(left: Expression, right: Expression) extends GetMapValueUti override def prettyName: String = "element_at" } + +/** + * Concatenates multiple input columns together into a single column. + * The function works with strings, binary and compatible array columns. + */ +@ExpressionDescription( + usage = "_FUNC_(col1, col2, ..., colN) - Returns the concatenation of col1, col2, ..., colN.", + examples = """ +Examples: + > SELECT _FUNC_('Spark', 'SQL'); + SparkSQL + > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6)); + | [1,2,3,4,5,6] + """) +case class Concat(children: Seq[Expression]) extends Expression { + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + val allowedTypes = Seq(StringType, BinaryType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +if (children.isEmpty) { + TypeCheckResult.TypeCheckSuccess +} else { + val childTypes = children.map(_.dataType) + if (childTypes.exists(tpe => !allowedTypes.exists(_.acceptsType(tpe)))) { +return TypeCheckResult.TypeCheckFailure( + s"input to function $prettyName should have been StringType, BinaryType or ArrayType," + +s" but it's " + childTypes.map(_.simpleString).mkString("[", ", ", "]")) + } + TypeUtils.checkForSameTypeInputExpr(childTypes, s"function $prettyName") +} + } + + override def dataType: DataType = children.map(_.dataType).headOption.getOrElse(StringType) + + lazy val javaType: String = CodeGenerator.javaType(dataType) + + override def nullable: Boolean = children.exists(_.nullable) + + override def foldable: Boolean = children.forall(_.foldable) + + override def eval(input: InternalRow): Any = dataType match { --- End diff -- Thanks! I've created #22471 to call the pattern matching only once. WDYT about [Reverse](https://github.com/apache/spark/pull/21034/files#diff-9853dcf5ce3d2ac1e94d473197ff5768R240)? It looks like a similar problem. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22471: [SPARK-25470][SQL][Performance] Concat.eval should use p...
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/22471 cc @rxin @ueshin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22471: [SPARK-25470][SQL][Performance] Concat.eval shoul...
GitHub user mn-mikke opened a pull request: https://github.com/apache/spark/pull/22471 [SPARK-25470][SQL][Performance] Concat.eval should use pattern matching only once ## What changes were proposed in this pull request? The PR proposes a solution for a problem described [here](https://github.com/apache/spark/pull/20858#discussion_r218677837). ## How was this patch tested? Run the existing tests for Concat expression. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mn-mikke/spark SPARK-25470 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22471.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22471 commit 9051c235eb2e4a62293ea6bd55e9d7aa1cb096fe Author: Marek Novotny Date: 2018-09-19T17:49:15Z [SPARK-25470][SQL][Performance] Concat.eval will use pattern matching only once --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
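For readers skimming the thread, the gist of the proposed change can be illustrated with a small, self-contained sketch (plain Scala with made-up names such as ```ConcatOnceSketch```; this is only an illustration of the "resolve once, reuse per row" pattern, not the actual patch):

```
// Sketch: pick the type-specific concatenation function once per instance,
// instead of pattern matching on the element type for every eval() call.
object ConcatOnceSketch {
  sealed trait ElemType
  case object StringElem extends ElemType
  case object ArrayElem extends ElemType

  final class Concatenator(elemType: ElemType) {
    // Resolved a single time when the Concatenator is created.
    private val doConcat: Seq[Any] => Any = elemType match {
      case StringElem => inputs => inputs.mkString
      case ArrayElem  => inputs => inputs.asInstanceOf[Seq[Seq[Any]]].flatten
    }

    // Called once per row; no repeated pattern matching here.
    def eval(inputs: Seq[Any]): Any = doConcat(inputs)
  }

  def main(args: Array[String]): Unit = {
    println(new Concatenator(StringElem).eval(Seq("Spark", "SQL")))          // SparkSQL
    println(new Concatenator(ArrayElem).eval(Seq(Seq(1, 2, 3), Seq(4, 5))))  // List(1, 2, 3, 4, 5)
  }
}
```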
[GitHub] spark pull request #22243: [MINOR] Avoid code duplication for nullable in Hi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22243#discussion_r213022884 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -155,6 +155,8 @@ trait HigherOrderFunction extends Expression with ExpectsInputTypes { */ trait SimpleHigherOrderFunction extends HigherOrderFunction { + override def nullable: Boolean = argument.nullable --- End diff -- If we moved the definition of ```nullable``` straight to ```HigherOrderFunction``` as ```arguments.exists(_.nullable)```, we could also avoid the duplication in ```ZipWith``` and ```MapZipWith```. WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
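A rough sketch of the suggested refactoring, using simplified stand-in types (```Expr```, ```HigherOrderFunctionSketch``` and ```MapZipWithSketch``` are illustrative names, not Spark's classes):

```
// Sketch: define nullable once on the base trait so that single-argument functions,
// ZipWith and MapZipWith no longer need their own overrides.
trait Expr { def nullable: Boolean }

trait HigherOrderFunctionSketch extends Expr {
  def arguments: Seq[Expr]
  override def nullable: Boolean = arguments.exists(_.nullable)
}

// A ZipWith/MapZipWith-style expression then inherits nullable for free.
case class MapZipWithSketch(left: Expr, right: Expr, function: Expr)
  extends HigherOrderFunctionSketch {
  override def arguments: Seq[Expr] = Seq(left, right)
}
```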
[GitHub] spark issue #22131: [SPARK-25141][SQL][TEST] Modify tests for higher-order f...
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/22131 LGTM too --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22126: [SPARK-23938][SQL][FOLLOW-UP][TEST] Nullabilities...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22126#discussion_r210724650 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala --- @@ -363,9 +363,9 @@ class HigherOrderFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper left: Expression, right: Expression, f: (Expression, Expression, Expression) => Expression): Expression = { - val MapType(kt, vt1, vcn1) = left.dataType.asInstanceOf[MapType] - val MapType(_, vt2, vcn2) = right.dataType.asInstanceOf[MapType] - MapZipWith(left, right, createLambda(kt, false, vt1, vcn1, vt2, vcn2, f)) + val MapType(kt, vt1, _) = left.dataType --- End diff -- Optional suggestion: Maybe we could remove ```asInstanceOf[MapType]``` here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
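A small, self-contained illustration of why the cast is redundant (mock ```MapType```, not Spark's): a constructor pattern on the left-hand side of a ```val``` already narrows the type, at the cost of a ```MatchError``` at runtime if the value is not a ```MapType```.

```
object MapTypePatternSketch {
  sealed trait DataType
  case class MapType(keyType: String, valueType: String, valueContainsNull: Boolean) extends DataType

  def main(args: Array[String]): Unit = {
    val dt: DataType = MapType("int", "string", valueContainsNull = true)

    // Current style: cast first, then destructure.
    val MapType(kt1, vt1, _) = dt.asInstanceOf[MapType]
    // Suggested style: the pattern alone does the narrowing.
    val MapType(kt2, vt2, _) = dt

    println((kt1, vt1, kt2, vt2))
  }
}
```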
[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22110#discussion_r210563516 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala --- @@ -73,4 +73,14 @@ object TypeUtils { } x.length - y.length } + + /** + * Returns true if elements of the data type could be used as items of a hash set or as keys + * of a hash map. + */ + def typeCanBeHashed(dataType: DataType): Boolean = dataType match { --- End diff -- Ok, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22045#discussion_r210561102 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -497,6 +497,53 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } +/** + * Returns a map that applies the function to each value of the map. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms values in the map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> v + 1); +map(array(1, 2, 3), array(2, 3, 4)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + v); +map(array(1, 2, 3), array(2, 4, 6)) + """, + since = "2.4.0") +case class TransformValues( +argument: Expression, +function: Expression) + extends MapBasedSimpleHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = argument.nullable + + @transient lazy val MapType(keyType, valueType, valueContainsNull) = argument.dataType + + override def dataType: DataType = MapType(keyType, function.dataType, valueContainsNull) --- End diff -- Shouldn't the ```dataType``` be defined as ```MapType(keyType, function.dataType, function.nullable)```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22110#discussion_r210493260 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala --- @@ -73,4 +73,14 @@ object TypeUtils { } x.length - y.length } + + /** + * Returns true if elements of the data type could be used as items of a hash set or as keys + * of a hash map. + */ + def typeCanBeHashed(dataType: DataType): Boolean = dataType match { --- End diff -- I will change it :) Just one question about ```hashCode```. If ```case classes``` are used, ```equals``` and ```hashCode``` are generated by the compiler. But if we define ```equals``` manually, shouldn't ```a.equals(b) == true``` => ```a.hashCode == b.hashCode``` also hold? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
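The contract being asked about, shown with a standalone example (```Point``` and ```EqualsHashCodeDemo``` are made-up names): whenever ```equals``` is written by hand, ```hashCode``` must be kept consistent with it, otherwise equal values can land in different buckets of hash-based collections.

```
import scala.collection.mutable

// Hand-written equals, so hashCode is overridden to stay consistent with it.
final class Point(val x: Int, val y: Int) {
  override def equals(other: Any): Boolean = other match {
    case p: Point => p.x == x && p.y == y
    case _ => false
  }
  override def hashCode: Int = 31 * x + y
}

object EqualsHashCodeDemo {
  def main(args: Array[String]): Unit = {
    val a = new Point(1, 2)
    val b = new Point(1, 2)
    assert(a == b && a.hashCode == b.hashCode) // the contract: equal => same hash
    assert(mutable.HashSet(a).contains(b))     // hash collections depend on it
  }
}
```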
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r210373675 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -2302,6 +2302,97 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { assert(ex5.getMessage.contains("function map_zip_with does not support ordering on type map")) } + test("transform keys function - test various primitive data types combinations") { +val dfExample1 = Seq( + Map[Int, Int](1 -> 1, 9 -> 9, 8 -> 8, 7 -> 7) +).toDF("i") + +val dfExample2 = Seq( + Map[Int, Double](1 -> 1.0E0, 2 -> 1.4E0, 3 -> 1.7E0) +).toDF("j") + +val dfExample3 = Seq( + Map[Int, Boolean](25 -> true, 26 -> false) +).toDF("x") + +val dfExample4 = Seq( + Map[Array[Int], Boolean](Array(1, 2) -> false) +).toDF("y") + + +def testMapOfPrimitiveTypesCombination(): Unit = { + checkAnswer(dfExample1.selectExpr("transform_keys(i, (k, v) -> k + v)"), +Seq(Row(Map(2 -> 1, 18 -> 9, 16 -> 8, 14 -> 7 + + checkAnswer(dfExample2.selectExpr("transform_keys(j, " + +"(k, v) -> map_from_arrays(ARRAY(1, 2, 3), ARRAY('one', 'two', 'three'))[k])"), +Seq(Row(Map("one" -> 1.0, "two" -> 1.4, "three" -> 1.7 + + checkAnswer(dfExample2.selectExpr("transform_keys(j, (k, v) -> CAST(v * 2 AS BIGINT) + k)"), +Seq(Row(Map(3 -> 1.0, 4 -> 1.4, 6 -> 1.7 + + checkAnswer(dfExample2.selectExpr("transform_keys(j, (k, v) -> k + v)"), +Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7 + + checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> k % 2 = 0 OR v)"), +Seq(Row(Map(true -> true, true -> false + + checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> if(v, 2 * k, 3 * k))"), +Seq(Row(Map(50 -> true, 78 -> false + + checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> if(v, 2 * k, 3 * k))"), +Seq(Row(Map(50 -> true, 78 -> false + + checkAnswer(dfExample4.selectExpr("transform_keys(y, (k, v) -> array_contains(k, 3) AND v)"), +Seq(Row(Map(false -> false +} +// Test with local relation, the Project will be evaluated without codegen +testMapOfPrimitiveTypesCombination() +dfExample1.cache() +dfExample2.cache() +dfExample3.cache() +dfExample4.cache() +// Test with cached relation, the Project will be evaluated with codegen +testMapOfPrimitiveTypesCombination() + } + + test("transform keys function - Invalid lambda functions and exceptions") { +val dfExample1 = Seq( + Map[Int, Int](1 -> 1, 9 -> 9, 8 -> 8, 7 -> 7) +).toDF("i") + +val dfExample2 = Seq( + Map[String, String]("a" -> "b") +).toDF("j") + +val dfExample3 = Seq( + Map[String, String]("a" -> null) +).toDF("x") + +def testInvalidLambdaFunctions(): Unit = { + val ex1 = intercept[AnalysisException] { +dfExample1.selectExpr("transform_keys(i, k -> k )") + } + assert(ex1.getMessage.contains("The number of lambda function arguments '1' does not match")) + + val ex2 = intercept[AnalysisException] { +dfExample2.selectExpr("transform_keys(j, (k, v, x) -> k + 1)") + } + assert(ex2.getMessage.contains( + "The number of lambda function arguments '3' does not match")) + + val ex3 = intercept[RuntimeException] { +dfExample3.selectExpr("transform_keys(x, (k, v) -> v)").show() + } + assert(ex3.getMessage.contains("Cannot use null as map key!")) +} + +testInvalidLambdaFunctions() +dfExample1.cache() +dfExample2.cache() +testInvalidLambdaFunctions() --- End diff -- @ueshin I would like to ask you a generic question regarding higher-order functions. 
Is it necessary to perform checks with codegen paths if all the newly added functions extend from ```CodegenFallback```? Alternatively, is there a plan to add codegen for these functions in the future? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r210366383 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -497,6 +497,62 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } +/** + * Transform Keys for every entry of the map by applying the transform_keys function. + * Returns map with transformed key entries + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 1); + map(array(2, 3, 4), array(1, 2, 3)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + v); + map(array(2, 4, 6), array(1, 2, 3)) + """, + since = "2.4.0") +case class TransformKeys( +argument: Expression, +function: Expression) + extends MapBasedSimpleHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = argument.nullable + + @transient lazy val MapType(keyType, valueType, valueContainsNull) = argument.dataType + + override def dataType: DataType = { +MapType(function.dataType, valueType, valueContainsNull) --- End diff -- nit: just in one line? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r210368929 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -497,6 +497,62 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } +/** + * Transform Keys for every entry of the map by applying the transform_keys function. + * Returns map with transformed key entries + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 1); + map(array(2, 3, 4), array(1, 2, 3)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + v); + map(array(2, 4, 6), array(1, 2, 3)) + """, + since = "2.4.0") +case class TransformKeys( +argument: Expression, +function: Expression) + extends MapBasedSimpleHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = argument.nullable + + @transient lazy val MapType(keyType, valueType, valueContainsNull) = argument.dataType + + override def dataType: DataType = { +MapType(function.dataType, valueType, valueContainsNull) + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): TransformKeys = { +copy(function = f(function, (keyType, false) :: (valueType, valueContainsNull) :: Nil)) + } + + @transient lazy val LambdaFunction( + _, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: Nil, _) = function + + + override def nullSafeEval(inputRow: InternalRow, argumentValue: Any): Any = { +val map = argumentValue.asInstanceOf[MapData] +val f = functionForEval --- End diff -- Can't we use ```functionForEval``` directly? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22110#discussion_r210357474 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala --- @@ -73,4 +73,14 @@ object TypeUtils { } x.length - y.length } + + /** + * Returns true if elements of the data type could be used as items of a hash set or as keys --- End diff -- What about changing the name of the method to ```typeCanBeHashed```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22110#discussion_r210350632 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala --- @@ -73,4 +73,14 @@ object TypeUtils { } x.length - y.length } + + /** + * Returns true if elements of the data type could be used as items of a hash set or as keys --- End diff -- I'm open to any changes :) But if you want to explicitly mention the ```equals``` method, I would also mention ```hashCode```, which is generally needed for usage in "hash" collections. But then this is not 100% true for Spark's specialized ```OpenHashSets``` and ```OpenHashMaps``` since they calculate hashes by themselves. WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22110#discussion_r210212586 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala --- @@ -115,6 +115,8 @@ protected[sql] abstract class AtomicType extends DataType { private[sql] type InternalType private[sql] val tag: TypeTag[InternalType] private[sql] val ordering: Ordering[InternalType] + + private[spark] override def supportsEquals: Boolean = true --- End diff -- Not all of the expressions utilize ```OpenHashSet``` or ```OpenHashMap```. What about ```TypeUtils``` that contains methods like ```getInterpretedOrdering```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22110: [SPARK-25122][SQL] Deduplication of supports equals code
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/22110 cc @ueshin @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...
GitHub user mn-mikke opened a pull request: https://github.com/apache/spark/pull/22110 [SPARK-25122][SQL] Deduplication of supports equals code ## What changes were proposed in this pull request? The method ```*supportEquals``` determining whether elements of a data type could be used as items in a hash set or as keys in a hash map is duplicated across multiple collection and higher-order functions. This PR suggests to deduplicate the method. ## How was this patch tested? Run tests in: - DataFrameFunctionsSuite - CollectionExpressionsSuite - HigherOrderExpressionsSuite You can merge this pull request into a Git repository by running: $ git pull https://github.com/mn-mikke/spark SPARK-25122 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22110.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22110 commit dd292e8cf3ed1788793e626da3a136e9acb9d81c Author: Marek Novotny Date: 2018-08-15T08:18:05Z [SPARK-25122][SQL] Deduplication of supports equals code --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
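As a sketch of what the deduplicated helper might look like: the match body mirrors the duplicated ```keyTypeSupportsEquals``` helpers quoted in the MapZipWith diffs elsewhere in this thread, while the mock ```DataType``` hierarchy and the object name are stand-ins rather than Spark code (the final method name was still being discussed in review).

```
// Mock type hierarchy standing in for Spark's DataType/AtomicType/BinaryType.
sealed trait DataType
trait AtomicType extends DataType
case object BinaryType extends AtomicType
case object IntegerType extends AtomicType
case class ArrayType(elementType: DataType) extends DataType

object TypeUtilsSketch {
  /** Returns true if values of the type can be used as hash-set items or hash-map keys. */
  def typeCanBeHashed(dataType: DataType): Boolean = dataType match {
    case BinaryType => false
    case _: AtomicType => true
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    println(typeCanBeHashed(IntegerType))            // true
    println(typeCanBeHashed(BinaryType))             // false
    println(typeCanBeHashed(ArrayType(IntegerType))) // false
  }
}
```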
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r209920407 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,91 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)); + array(('a', 1), ('b', 3), ('c', 5)) + > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y)); + array(4, 6) + > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y)); + array('ad', 'be', 'cf') + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ArraysZipWith( +left: Expression, +right: Expression, +function: Expression) + extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes { + + override def inputs: Seq[Expression] = List(left, right) + + override def functions: Seq[Expression] = List(function) + + def expectingFunctionType: AbstractDataType = AnyDataType + @transient lazy val functionForEval: Expression = functionsForEval.head + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType, expectingFunctionType) + + override def nullable: Boolean = inputs.exists(_.nullable) + + override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArraysZipWith = { +val (leftElementType, leftContainsNull) = left.dataType match { --- End diff -- This comment is not valid anymore. The method has been removed by [#22075](https://github.com/apache/spark/pull/22075). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r209876913 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -496,3 +496,194 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + def functionForEval: Expression = functionsForEval.head + + @transient lazy val MapType(leftKeyType, leftValueType, leftValueContainsNull) = left.dataType + + @transient lazy val MapType(rightKeyType, rightValueType, rightValueContainsNull) = right.dataType + + @transient lazy val keyType = +TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(leftKeyType, rightKeyType).get --- End diff -- If ```leftKeyType``` is ```ArrayType(IntegerType, false)``` and ```rightKeyType``` is ```ArrayType(IntegerType, true)``` for instance, the coercion rule is not executed since ```leftKeyType.sameType(rightKeyType) == true```. An array with nulls seems to be a valid key:
```
scala> spark.range(1).selectExpr("map(array(1, 2, null), 12)").show()
+---+
|map(array(1, 2, CAST(NULL AS INT)), 12)|
+---+
|[[1, 2,] -> 12]|
+---+
```
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to ar...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22075#discussion_r209643649 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -422,45 +425,49 @@ case class ArrayExists( """, since = "2.4.0") case class ArrayAggregate( -input: Expression, +argument: Expression, zero: Expression, merge: Expression, finish: Expression) extends HigherOrderFunction with CodegenFallback { - def this(input: Expression, zero: Expression, merge: Expression) = { -this(input, zero, merge, LambdaFunction.identity) + def this(argument: Expression, zero: Expression, merge: Expression) = { +this(argument, zero, merge, LambdaFunction.identity) } - override def inputs: Seq[Expression] = input :: zero :: Nil + override def arguments: Seq[Expression] = argument :: zero :: Nil + + override def argumentTypes: Seq[AbstractDataType] = ArrayType :: AnyDataType :: Nil override def functions: Seq[Expression] = merge :: finish :: Nil - override def nullable: Boolean = input.nullable || finish.nullable + override def functionTypes: Seq[AbstractDataType] = zero.dataType :: AnyDataType :: Nil + + override def nullable: Boolean = argument.nullable || finish.nullable override def dataType: DataType = finish.dataType override def checkInputDataTypes(): TypeCheckResult = { -if (!ArrayType.acceptsType(input.dataType)) { - TypeCheckResult.TypeCheckFailure( -s"argument 1 requires ${ArrayType.simpleString} type, " + - s"however, '${input.sql}' is of ${input.dataType.catalogString} type.") -} else if (!DataType.equalsStructurally( -zero.dataType, merge.dataType, ignoreNullability = true)) { - TypeCheckResult.TypeCheckFailure( -s"argument 3 requires ${zero.dataType.simpleString} type, " + - s"however, '${merge.sql}' is of ${merge.dataType.catalogString} type.") -} else { - TypeCheckResult.TypeCheckSuccess +checkArgumentDataTypes() match { --- End diff -- just a quick question: Isn't calling of ```checkArgumentDataTypes``` extra here if ```checkArgumentDataTypes``` is called as such before ```checkInputDataTypes```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r209533017 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,186 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val keyType = +TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType) --- End diff -- Oh, I see. We also need to check the output data type of lambda functions for the expressions like ```ArrayFilter```. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r209515431 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -231,6 +231,15 @@ object TypeCoercion { }) } + /** + * Similar to [[findTightestCommonType]] but with string promotion. + */ + def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = { --- End diff -- Thanks for both your PRs! I will submit changes once they get in. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r209514957 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,186 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val keyType = +TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType) --- End diff -- IMHO, if ```checkInputDataTypes``` was executed before ```bind```, ```findTightestCommonType``` could play the same role. But yeah, ```findCommonTypeDifferentOnlyInNullFlags``` will be semantically more accurate. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/22017 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r209311502 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -231,6 +231,15 @@ object TypeCoercion { }) } + /** + * Similar to [[findTightestCommonType]] but with string promotion. + */ + def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): Option[DataType] = { --- End diff -- If we have maps with decimals of different precision as keys, ```Cast``` will fail in the analysis phase since it can't cast a key to nullable (potential loss of precision). IMHO, the type mismatch exception from this function will be more accurate. WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r209188342 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,186 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (rightKeyType, rightValueType, rightValueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val keyType = +TypeCoercion.findTightestCommonType(leftKeyType, rightKeyType).getOrElse(NullType) --- End diff -- Even though there is a coercion rule for unification of key types, the key types may differ in nullability flags if they are complex. In theory, we could use ```==``` and ```findTightestCommonType``` in the coercion rule since there is no codegen to be optimized for ```null``` checks. But unfortunately, ```bind``` gets called once before execution of coercion rules, so ```findTightestCommonType``` is important for setting up a correct input type for the lambda function. Maybe we could play with the order of analysis rules, but I'm not sure about all the consequences. @ueshin could you shed some light on analysis rule ordering? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208941728 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => +TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + 
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- Like this idea, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...
Github user mn-mikke closed the pull request at: https://github.com/apache/spark/pull/21121 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21121: [SPARK-24042][SQL] Collection function: zip_with_index
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21121 Sure, closing ... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208872928 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => +TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + 
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- @mgaido91 Are you comfortable with reverting back to the previous version? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208871779 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,184 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = --- End diff -- You are right, thanks! WDYT about introducing a coercion rule handling different key types? For cases like (```IntType```, ```LongType```) might be handy. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208868838 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala --- @@ -225,7 +264,9 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa val lit = InternalRow(expected, expected) val expectedRow = UnsafeProjection.create(Array(expression.dataType, expression.dataType)).apply(lit) - if (unsafeRow != expectedRow) { + val field = StructField("field", expression.dataType) + val dataType = StructType(field :: field :: Nil) + if (!checkResult(unsafeRow, expectedRow, dataType)) { --- End diff -- ```UnsafeRow```s are compared based on equality of backing arrays. This approach doesn't work well when the order of map entries in the unsafe representation should be ignored. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
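A plain-Scala analogy for the problem (standard library only, no Spark internals): two semantically equal maps can lay their entries out in different orders, so comparing any flattened or serialized view byte-for-byte reports a spurious difference, which is what the switch to ```checkResult``` in the diff above addresses.

```
object MapOrderSketch {
  def main(args: Array[String]): Unit = {
    val m1 = Map(1 -> "a", 2 -> "b")
    val m2 = Map(2 -> "b", 1 -> "a")
    assert(m1 == m2)             // equal as maps
    assert(m1.toSeq != m2.toSeq) // but a flattened view of the entries differs
  }
}
```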
[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22045#discussion_r208749197 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,61 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Values for every entry of the map by applying transform_values function. + * Returns map wth transformed values + */ +@ExpressionDescription( +usage = "_FUNC_(expr, func) - Transforms values in the map using the function.", +examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); +map(array(1, 2, 3), array(2, 3, 4)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); +map(array(1, 2, 3), array(2, 4, 6)) + """, +since = "2.4.0") +case class TransformValues( +input: Expression, +function: Expression) + extends MapBasedSimpleHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = { +val map = input.dataType.asInstanceOf[MapType] +MapType(map.keyType, function.dataType, map.valueContainsNull) --- End diff -- ```map.valueContainsNull``` -> ```function.nullable```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22045#discussion_r208750446 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,61 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Values for every entry of the map by applying transform_values function. + * Returns map wth transformed values + */ +@ExpressionDescription( +usage = "_FUNC_(expr, func) - Transforms values in the map using the function.", +examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); +map(array(1, 2, 3), array(2, 3, 4)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); +map(array(1, 2, 3), array(2, 4, 6)) + """, +since = "2.4.0") +case class TransformValues( +input: Expression, +function: Expression) + extends MapBasedSimpleHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = { +val map = input.dataType.asInstanceOf[MapType] +MapType(map.keyType, function.dataType, map.valueContainsNull) + } + + override def inputTypes: Seq[AbstractDataType] = Seq(MapType, expectingFunctionType) --- End diff -- This is already specified by ```MapBasedSimpleHigherOrderFunction```. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22045#discussion_r208751629 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,61 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Values for every entry of the map by applying transform_values function. + * Returns map wth transformed values + */ +@ExpressionDescription( +usage = "_FUNC_(expr, func) - Transforms values in the map using the function.", +examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); +map(array(1, 2, 3), array(2, 3, 4)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); +map(array(1, 2, 3), array(2, 4, 6)) + """, +since = "2.4.0") +case class TransformValues( +input: Expression, +function: Expression) + extends MapBasedSimpleHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = { +val map = input.dataType.asInstanceOf[MapType] +MapType(map.keyType, function.dataType, map.valueContainsNull) + } + + override def inputTypes: Seq[AbstractDataType] = Seq(MapType, expectingFunctionType) + + @transient val (keyType, valueType, valueContainsNull) = +HigherOrderFunction.mapKeyValueArgumentType(input.dataType) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): --- End diff -- nit: formatting --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22045#discussion_r208747953 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,61 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Values for every entry of the map by applying transform_values function. + * Returns map wth transformed values + */ +@ExpressionDescription( +usage = "_FUNC_(expr, func) - Transforms values in the map using the function.", +examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); --- End diff -- nit:```(k, v)``` and maybe I would use ```v + 1``` instead of ```k + 1```. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22045#discussion_r208746631 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,61 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Values for every entry of the map by applying transform_values function. + * Returns map wth transformed values --- End diff -- typos: ```Transforms values```; ```with```. Also, maybe you can think of a better comment? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208675350 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => +TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + 
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- This is valid argument, it's a rare edge case. The last question before I change it. WDYT about performance a mutable array vs. ```oldTuple.copy(_2 = newValue)```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208647186 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => +TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + 
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- ```indexes(z).isEmpty``` ensures that we insert always the the first occurrence of the key, which follows behavior of ```GetMapValue```. If we didn't perform such a check the last occurrence would ended up in result. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208605882 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => +TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + 
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- > you don't need to check neither whether the key is there nor the size of the output array, you just need to add them. What about duplicated keys? They can be created with other map functions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208559241 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => +TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + 
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- Yeah, but my point is how to crate a new tuple from a old one without using ```_1```, ```_2```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208550479 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => +TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + 
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- > I really don't think so, it would be the same as now I think Let's assume that ```indexes``` are tuple for now. ```indexes(z).isEmpty``` could replace with ```indexes
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208527423 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => +TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + 
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] +val hashMap = new mutable.OpenHashMap[Any, Array[Option[Int]]] +val keys = Array(keys1, keys2) +var z = 0 +while(z < 2) { + var i = 0 + val array = keys(z) + while (i < array.numElements()) { +val
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208519941 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,191 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val (keyType, leftValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(left.dataType) + + @transient lazy val (_, rightValueType, _) = +HigherOrderFunction.mapKeyValueArgumentType(right.dataType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => +TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def keyTypeSupportsEquals = keyType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + @transient private lazy val getKeysWithValueIndexes: + (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = { +if (keyTypeSupportsEquals) { + getKeysWithIndexesFast +} else { + getKeysWithIndexesBruteForce +} + } + + private def assertSizeOfArrayBuffer(size: Int): Unit = { +if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { + throw new RuntimeException(s"Unsuccessful try to zip maps with $size " + +s"unique keys due to exceeding the array size limit " + 
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.") +} + } + + private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = { +val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])] --- End diff -- If we changed it to ```(Option[Int], Option[Int])```, wouldn't we need two similar ```i``` loops instead of one? My motivation for using also the ```ArrayBuffer``` is preserve the order of keys. A random order would break map comparison
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208399620 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,91 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)); + array(('a', 1), ('b', 3), ('c', 5)) + > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y)); + array(4, 6) + > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y)); + array('ad', 'be', 'cf') + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ArraysZipWith( +left: Expression, +right: Expression, +function: Expression) + extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes { + + override def inputs: Seq[Expression] = List(left, right) + + override def functions: Seq[Expression] = List(function) + + def expectingFunctionType: AbstractDataType = AnyDataType + @transient lazy val functionForEval: Expression = functionsForEval.head + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType, expectingFunctionType) + + override def nullable: Boolean = inputs.exists(_.nullable) + + override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArraysZipWith = { +val (leftElementType, leftContainsNull) = left.dataType match { + case ArrayType(elementType, containsNull) => (elementType, containsNull) + case _ => +val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType +(elementType, containsNull) +} +val (rightElementType, rightContainsNull) = right.dataType match { + case ArrayType(elementType, containsNull) => (elementType, containsNull) + case _ => +val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType +(elementType, containsNull) +} +copy(function = f(function, + (leftElementType, leftContainsNull) :: (rightElementType, rightContainsNull) :: Nil)) + } + + @transient lazy val (arr1Var, arr2Var) = { +val LambdaFunction(_, + (arr1Var: NamedLambdaVariable):: (arr2Var: NamedLambdaVariable) :: Nil, _) = function +(arr1Var, arr2Var) + } + + override def eval(input: InternalRow): Any = { +val leftArr = left.eval(input).asInstanceOf[ArrayData] +val rightArr = right.eval(input).asInstanceOf[ArrayData] + +if (leftArr == null || rightArr == null) { --- End diff -- If ```leftArr``` is ```null```, ```right``` doesn't have to be evaluated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
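A minimal sketch of the short-circuit being requested, with made-up names; the only point is that the right-hand child is evaluated lazily, after the left-hand value has been checked for null:

```scala
// Simplified stand-in for Expression.eval; the thunks represent child evaluation.
def evalZipWith(evalLeft: () => AnyRef, evalRight: () => AnyRef)
               (merge: (AnyRef, AnyRef) => AnyRef): AnyRef = {
  val leftArr = evalLeft()
  if (leftArr == null) {
    null                              // evalRight() is never called
  } else {
    val rightArr = evalRight()
    if (rightArr == null) null else merge(leftArr, rightArr)
  }
}
```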
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208403145 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,91 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)); + array(('a', 1), ('b', 3), ('c', 5)) + > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y)); + array(4, 6) + > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y)); + array('ad', 'be', 'cf') + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ArraysZipWith( +left: Expression, +right: Expression, +function: Expression) + extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes { + + override def inputs: Seq[Expression] = List(left, right) + + override def functions: Seq[Expression] = List(function) + + def expectingFunctionType: AbstractDataType = AnyDataType + @transient lazy val functionForEval: Expression = functionsForEval.head + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType, expectingFunctionType) + + override def nullable: Boolean = inputs.exists(_.nullable) + + override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArraysZipWith = { +val (leftElementType, leftContainsNull) = left.dataType match { + case ArrayType(elementType, containsNull) => (elementType, containsNull) + case _ => +val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType +(elementType, containsNull) +} +val (rightElementType, rightContainsNull) = right.dataType match { + case ArrayType(elementType, containsNull) => (elementType, containsNull) + case _ => +val ArrayType(elementType, containsNull) = ArrayType.defaultConcreteType +(elementType, containsNull) +} +copy(function = f(function, + (leftElementType, leftContainsNull) :: (rightElementType, rightContainsNull) :: Nil)) --- End diff -- If you want to support different size of input arrays (The jira ticket says: _"Both arrays must be the same length."_), what about the scenario when one array is empty and the second has elements? Shouldn't we use ```true``` instead of ```leftContainsNull``` and ```rightContainsNull```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
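The nullability concern can be seen with plain Scala collections: once the shorter side is padded, the lambda can receive a missing value even though neither input array contains nulls, which is why ```true``` (rather than the children's own flags) looks safer here. A small sketch, using ```Option``` in place of SQL NULL:

```scala
val left = Seq(1, 2, 3)
val right = Seq("a")

// Pad the shorter side so both line up, as zip_with is described to do.
val padded = left.map(Option(_)).zipAll(right.map(Option(_)), None, None)
println(padded)  // List((Some(1),Some(a)), (Some(2),None), (Some(3),None))
```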
[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22031#discussion_r208398313 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -442,3 +442,91 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(left, right, func) - Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)); + array(('a', 1), ('b', 3), ('c', 5)) + > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y)); + array(4, 6) + > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) -> concat(x, y)); + array('ad', 'be', 'cf') + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ArraysZipWith( +left: Expression, +right: Expression, +function: Expression) + extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes { + + override def inputs: Seq[Expression] = List(left, right) + + override def functions: Seq[Expression] = List(function) + + def expectingFunctionType: AbstractDataType = AnyDataType + @transient lazy val functionForEval: Expression = functionsForEval.head + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType, expectingFunctionType) + + override def nullable: Boolean = inputs.exists(_.nullable) + + override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArraysZipWith = { +val (leftElementType, leftContainsNull) = left.dataType match { --- End diff -- You can utilize ```HigherOrderFunction.arrayArgumentType```. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208280351 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +364,101 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left) + + @transient lazy val MapType(_, rightValueType, _) = getMapType(right) + + @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable --- End diff -- ```nullable``` flag is rather related to the cases when the whole map is ```null```. The case that you are referring to is handled by ```valueContainsNull``` flag of ```MapType``` (see the line [423](https://github.com/apache/spark/pull/22017/files/ec583eb29ba6fdb79d0b85cbecb3f709e6648b25#diff-ef52827ed9b41efc1fbd056a06ef7c6aR423)). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
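To restate the distinction with the public type API (illustrative only): ```valueContainsNull``` lives on the ```MapType``` and governs individual values, while ```nullable``` governs whether the whole map can be NULL.

```scala
import org.apache.spark.sql.types._

val resultType = MapType(IntegerType, StringType, valueContainsNull = true)
println(resultType.valueContainsNull)  // true  -> individual values may be NULL

// The column holding this map can still be declared non-nullable as a whole:
val field = StructField("merged", resultType, nullable = false)
println(field.nullable)                // false -> the map itself is never NULL
```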
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208211250 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +364,101 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left) + + @transient lazy val MapType(_, rightValueType, _) = getMapType(right) + + @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => +TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + private def getMapType(expr: Expression) = expr.dataType match { +case m: MapType => m +case _ => MapType.defaultConcreteType + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: Any): Any = { +val mapData1 = value1.asInstanceOf[MapData] +val mapData2 = value2.asInstanceOf[MapData] +val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray()) +val values = new GenericArrayData(new Array[Any](keys.numElements())) +keys.foreach(keyType, (idx: Int, key: Any) => { + val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, leftValueType, ordering) --- End diff -- Ok, I will change 
it. Thanks a lot! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22017#discussion_r208204796 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +364,101 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Merges two given maps into a single map by applying function to the pair of values with + * the same key. + */ +@ExpressionDescription( + usage = +""" + _FUNC_(map1, map2, function) - Merges two given maps into a single map by applying + function to the pair of values with the same key. For keys only presented in one map, + NULL will be passed as the value for the missing key. If an input map contains duplicated + keys, only the first entry of the duplicated key is passed into the lambda function. +""", + examples = """ +Examples: + > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, v2) -> concat(v1, v2)); + {1:"ax",2:"by"} + """, + since = "2.4.0") +case class MapZipWith(left: Expression, right: Expression, function: Expression) + extends HigherOrderFunction with CodegenFallback { + + @transient lazy val functionForEval: Expression = functionsForEval.head + + @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left) + + @transient lazy val MapType(_, rightValueType, _) = getMapType(right) + + @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType) + + @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType) + + override def inputs: Seq[Expression] = left :: right :: Nil + + override def functions: Seq[Expression] = function :: Nil + + override def nullable: Boolean = left.nullable || right.nullable + + override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) => +TypeUtils.checkForOrderingExpr(k1, s"function $prettyName") + case _ => TypeCheckResult.TypeCheckFailure(s"The input to function $prettyName should have " + +s"been two ${MapType.simpleString}s with the same key type, but it's " + +s"[${left.dataType.catalogString}, ${right.dataType.catalogString}].") +} + } + + private def getMapType(expr: Expression) = expr.dataType match { +case m: MapType => m +case _ => MapType.defaultConcreteType + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { +val arguments = Seq((keyType, false), (leftValueType, true), (rightValueType, true)) +copy(function = f(function, arguments)) + } + + override def eval(input: InternalRow): Any = { +val value1 = left.eval(input) +if (value1 == null) { + null +} else { + val value2 = right.eval(input) + if (value2 == null) { +null + } else { +nullSafeEval(input, value1, value2) + } +} + } + + @transient lazy val LambdaFunction(_, Seq( +keyVar: NamedLambdaVariable, +value1Var: NamedLambdaVariable, +value2Var: NamedLambdaVariable), +_) = function + + private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: Any): Any = { +val mapData1 = value1.asInstanceOf[MapData] +val mapData2 = value2.asInstanceOf[MapData] +val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray()) +val values = new GenericArrayData(new Array[Any](keys.numElements())) +keys.foreach(keyType, (idx: Int, key: Any) => { + val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, leftValueType, ordering) --- End diff -- Thanks for 
mentioning this! I'm not happy with the current complexity either. I've assumed that the implementation of maps will change into something with O(1) element access in the future. At that point, the complexity would be O(N) for types supporting equals as well, and we would save a portion of duplicated code. If you think that maps will remain like this for a long time, I really like your suggestion with indexes. @ueshin What's your view on that? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208190999 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +365,69 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Keys in a map using the transform_keys function. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); + map(array(2, 3, 4), array(1, 2, 3)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); + map(array(2, 4, 6), array(1, 2, 3)) + """, + since = "2.4.0") +case class TransformKeys( +input: Expression, +function: Expression) + extends ArrayBasedHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = { +val valueType = input.dataType.asInstanceOf[MapType].valueType +MapType(function.dataType, valueType, input.nullable) + } + + override def inputTypes: Seq[AbstractDataType] = Seq(MapType, expectingFunctionType) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): + TransformKeys = { +val (keyElementType, valueElementType, containsNull) = input.dataType match { + case MapType(keyType, valueType, containsNullValue) => +(keyType, valueType, containsNullValue) + case _ => +val MapType(keyType, valueType, containsNullValue) = MapType.defaultConcreteType +(keyType, valueType, containsNullValue) +} +copy(function = f(function, (keyElementType, false) :: (valueElementType, containsNull) :: Nil)) + } + + @transient lazy val (keyVar, valueVar) = { +val LambdaFunction( +_, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: Nil, _) = function +(keyVar, valueVar) + } + + override def eval(input: InternalRow): Any = { +val arr = this.input.eval(input).asInstanceOf[MapData] +if (arr == null) { + null +} else { + val f = functionForEval + val resultKeys = new GenericArrayData(new Array[Any](arr.numElements)) + var i = 0 + while (i < arr.numElements) { +keyVar.value.set(arr.keyArray().get(i, keyVar.dataType)) +valueVar.value.set(arr.valueArray().get(i, valueVar.dataType)) +resultKeys.update(i, f.eval(input)) --- End diff -- I'm not a fun of duplicated keys either, but other functions transforming maps have the same problem. See the discussions [here](https://github.com/apache/spark/pull/21282#discussion_r187234431) and [here](https://github.com/apache/spark/pull/21258#discussion_r186410527). Example: ``` scala> spark.range(1).selectExpr("map(0,1,0,2)").show() ++ | map(0, 1, 0, 2)| ++ |[0 -> 1, 0 -> 2]| ++ ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208167785 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +365,69 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Keys in a map using the transform_keys function. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); + map(array(2, 3, 4), array(1, 2, 3)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); + map(array(2, 4, 6), array(1, 2, 3)) + """, + since = "2.4.0") +case class TransformKeys( +input: Expression, +function: Expression) + extends ArrayBasedHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = { +val valueType = input.dataType.asInstanceOf[MapType].valueType +MapType(function.dataType, valueType, input.nullable) + } + + override def inputTypes: Seq[AbstractDataType] = Seq(MapType, expectingFunctionType) + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): + TransformKeys = { +val (keyElementType, valueElementType, containsNull) = input.dataType match { + case MapType(keyType, valueType, containsNullValue) => +(keyType, valueType, containsNullValue) + case _ => +val MapType(keyType, valueType, containsNullValue) = MapType.defaultConcreteType +(keyType, valueType, containsNullValue) +} +copy(function = f(function, (keyElementType, false) :: (valueElementType, containsNull) :: Nil)) + } + + @transient lazy val (keyVar, valueVar) = { +val LambdaFunction( +_, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: Nil, _) = function +(keyVar, valueVar) + } + + override def eval(input: InternalRow): Any = { +val arr = this.input.eval(input).asInstanceOf[MapData] +if (arr == null) { + null +} else { + val f = functionForEval + val resultKeys = new GenericArrayData(new Array[Any](arr.numElements)) + var i = 0 + while (i < arr.numElements) { +keyVar.value.set(arr.keyArray().get(i, keyVar.dataType)) +valueVar.value.set(arr.valueArray().get(i, valueVar.dataType)) +resultKeys.update(i, f.eval(input)) --- End diff -- Maybe I'm missing something, but couldn't ```f.eval(input)``` be evaluated to ```null```? Keys are not allowed to be```null```. Other functions have usually a ```null``` check and throw ```RuntimeException``` for such cases. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
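What is being suggested is a guard along these lines (a hypothetical sketch; the exception type and message are assumptions modeled on the checks other map-producing expressions perform, not the actual patch):

```scala
// Hypothetical guard: fail fast when the key-transforming lambda evaluates to null,
// since map keys are not allowed to be null.
def checkedKey(newKey: Any): Any = {
  if (newKey == null) {
    throw new RuntimeException("Cannot use null as map key!")
  }
  newKey
}
```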
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208161643 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +365,69 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Keys in a map using the transform_keys function. --- End diff -- maybe a better comment? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208169969 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala --- @@ -181,4 +187,46 @@ class HigherOrderFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper (acc, array) => coalesce(aggregate(array, acc, (acc, elem) => acc + elem), acc)), 15) } + + test("TransformKeys") { +val ai0 = Literal.create( + Map(1 -> 1, 2 -> 2, 3 -> 3), --- End diff -- It's maybe irrelevant but WDYT about adding test cases with ```null``` values? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208164141 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +365,69 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Keys in a map using the transform_keys function. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); + map(array(2, 3, 4), array(1, 2, 3)) + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v); + map(array(2, 4, 6), array(1, 2, 3)) + """, + since = "2.4.0") +case class TransformKeys( +input: Expression, +function: Expression) + extends ArrayBasedHigherOrderFunction with CodegenFallback { + + override def nullable: Boolean = input.nullable + + override def dataType: DataType = { +val valueType = input.dataType.asInstanceOf[MapType].valueType +MapType(function.dataType, valueType, input.nullable) --- End diff -- Is there any reason for changing ```valueContainsNull``` flag if the function transforms just keys? WDYT about: ``` val MapType(_, valueType, valueContainsNull) = input.dataType.asInstanceOf[MapType] MapType(function.dataType, valueType, valueContainsNull) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208169140 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -2071,6 +2071,158 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { assert(ex4.getMessage.contains("data type mismatch: argument 3 requires int type")) } + test("transform keys function - test various primitive data types combinations") { +val dfExample1 = Seq( + Map[Int, Int](1 -> 1, 9 -> 9, 8 -> 8, 7 -> 7) +).toDF("i") + +val dfExample2 = Seq( + Map[Int, String](1 -> "a", 2 -> "b", 3 -> "c") +).toDF("x") + +val dfExample3 = Seq( + Map[String, Int]("a" -> 1, "b" -> 2, "c" -> 3) +).toDF("y") + +val dfExample4 = Seq( + Map[Int, Double](1 -> 1.0E0, 2 -> 1.4E0, 3 -> 1.7E0) +).toDF("z") + +val dfExample5 = Seq( + Map[Int, Boolean](25 -> true, 26 -> false) +).toDF("a") + +val dfExample6 = Seq( + Map[Int, String](25 -> "ab", 26 -> "cd") +).toDF("b") + +val dfExample7 = Seq( + Map[Array[Int], Boolean](Array(1, 2) -> false) +).toDF("c") + + +def testMapOfPrimitiveTypesCombination(): Unit = { + checkAnswer(dfExample1.selectExpr("transform_keys(i, (k, v) -> k + v)"), +Seq(Row(Map(2 -> 1, 18 -> 9, 16 -> 8, 14 -> 7 + + checkAnswer(dfExample2.selectExpr("transform_keys(x, (k, v) -> k + 1)"), +Seq(Row(Map(2 -> "a", 3 -> "b", 4 -> "c" + + checkAnswer(dfExample3.selectExpr("transform_keys(y, (k, v) -> v * v)"), +Seq(Row(Map(1 -> 1, 4 -> 2, 9 -> 3 + + checkAnswer(dfExample3.selectExpr("transform_keys(y, (k, v) -> length(k) + v)"), +Seq(Row(Map(2 -> 1, 3 -> 2, 4 -> 3 + + checkAnswer( +dfExample3.selectExpr("transform_keys(y, (k, v) -> concat(k, cast(v as String)))"), +Seq(Row(Map("a1" -> 1, "b2" -> 2, "c3" -> 3 + + checkAnswer(dfExample4.selectExpr("transform_keys(z, " + +"(k, v) -> map_from_arrays(ARRAY(1, 2, 3), ARRAY('one', 'two', 'three'))[k])"), +Seq(Row(Map("one" -> 1.0, "two" -> 1.4, "three" -> 1.7 + + checkAnswer(dfExample4.selectExpr("transform_keys(z, (k, v) -> CAST(v * 2 AS BIGINT) + k)"), +Seq(Row(Map(3 -> 1.0, 4 -> 1.4, 6 -> 1.7 + + checkAnswer(dfExample4.selectExpr("transform_keys(z, (k, v) -> k + v)"), +Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7 + + checkAnswer(dfExample5.selectExpr("transform_keys(a, (k, v) -> k % 2 = 0 OR v)"), +Seq(Row(Map(true -> true, true -> false + + checkAnswer(dfExample5.selectExpr("transform_keys(a, (k, v) -> if(v, 2 * k, 3 * k))"), +Seq(Row(Map(50 -> true, 78 -> false + + checkAnswer(dfExample5.selectExpr("transform_keys(a, (k, v) -> if(v, 2 * k, 3 * k))"), +Seq(Row(Map(50 -> true, 78 -> false + + checkAnswer(dfExample6.selectExpr( +"transform_keys(b, (k, v) -> concat(conv(k, 10, 16) , substr(v, 1, 1)))"), +Seq(Row(Map("19a" -> "ab", "1Ac" -> "cd" + + checkAnswer(dfExample7.selectExpr("transform_keys(c, (k, v) -> array_contains(k, 3) AND v)"), +Seq(Row(Map(false -> false +} +// Test with local relation, the Project will be evaluated without codegen +testMapOfPrimitiveTypesCombination() +dfExample1.cache() +dfExample2.cache() +dfExample3.cache() +dfExample4.cache() +dfExample5.cache() +dfExample6.cache() +// Test with cached relation, the Project will be evaluated with codegen +testMapOfPrimitiveTypesCombination() --- End diff -- Do we have do that if the expression implements ```CodegenFallback```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/22013#discussion_r208160707 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -365,3 +365,69 @@ case class ArrayAggregate( override def prettyName: String = "aggregate" } + +/** + * Transform Keys in a map using the transform_keys function. + */ +@ExpressionDescription( + usage = "_FUNC_(expr, func) - Transforms elements in a map using the function.", + examples = """ +Examples: + > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1); --- End diff -- nit: missing space -> ```k, v``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/22017 cc @ueshin @mgaido91 @hvanhovell --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function
GitHub user mn-mikke opened a pull request: https://github.com/apache/spark/pull/22017 [SPARK-23938][SQL] Add map_zip_with function ## What changes were proposed in this pull request? This PR adds a new SQL function called ```map_zip_with```. It merges the two given maps into a single map by applying the function to the pair of values with the same key. ## How was this patch tested? Added new tests to: - DataFrameFunctionsSuite.scala - HigherOrderFunctionsSuite.scala You can merge this pull request into a Git repository by running: $ git pull https://github.com/mn-mikke/spark SPARK-23938 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22017.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22017 commit ef56011f03d8bae4634e5d3108e4d6502482383c Author: Marek Novotny Date: 2018-08-06T23:42:45Z [SPARK-23938][SQL] Add map_zip_with function --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
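[Editor's note] A minimal usage sketch of the merge-by-key semantics described above (my own illustration, not code from the PR; it assumes an active SparkSession named `spark` and made-up column names):
```
import spark.implicits._

val df = Seq((Map(1 -> "a", 2 -> "b"), Map(1 -> "x", 2 -> "y"))).toDF("m1", "m2")
// the lambda receives the key plus the value from each map for that key
df.selectExpr("map_zip_with(m1, m2, (k, v1, v2) -> concat(v1, v2))").show(false)
// expected, given the semantics described above: Map(1 -> "ax", 2 -> "by")
```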
[GitHub] spark pull request #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21986#discussion_r207908454 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -205,29 +230,85 @@ case class ArrayTransform( (elementVar, indexVar) } - override def eval(input: InternalRow): Any = { -val arr = this.input.eval(input).asInstanceOf[ArrayData] -if (arr == null) { - null -} else { - val f = functionForEval - val result = new GenericArrayData(new Array[Any](arr.numElements)) - var i = 0 - while (i < arr.numElements) { -elementVar.value.set(arr.get(i, elementVar.dataType)) -if (indexVar.isDefined) { - indexVar.get.value.set(i) -} -result.update(i, f.eval(input)) -i += 1 + override def nullSafeEval(inputRow: InternalRow, inputValue: Any): Any = { +val arr = inputValue.asInstanceOf[ArrayData] +val f = functionForEval +val result = new GenericArrayData(new Array[Any](arr.numElements)) +var i = 0 +while (i < arr.numElements) { + elementVar.value.set(arr.get(i, elementVar.dataType)) + if (indexVar.isDefined) { +indexVar.get.value.set(i) } - result + result.update(i, f.eval(inputRow)) + i += 1 } +result } override def prettyName: String = "transform" } +/** + * Filters entries in a map using the provided function. + */ +@ExpressionDescription( +usage = "_FUNC_(expr, func) - Filters entries in a map using the function.", +examples = """ +Examples: + > SELECT _FUNC_(map(1, 0, 2, 2, 3, -1), (k, v) -> k > v); + [1 -> 0, 3 -> -1] + """, +since = "2.4.0") +case class MapFilter( +input: Expression, +function: Expression) + extends MapBasedUnaryHigherOrderFunction with CodegenFallback { + + @transient val (keyType, valueType, valueContainsNull) = input.dataType match { +case MapType(kType, vType, vContainsNull) => (kType, vType, vContainsNull) +case _ => + val MapType(kType, vType, vContainsNull) = MapType.defaultConcreteType + (kType, vType, vContainsNull) + } + + @transient lazy val (keyVar, valueVar) = { +val args = function.asInstanceOf[LambdaFunction].arguments +(args.head.asInstanceOf[NamedLambdaVariable], args.tail.head.asInstanceOf[NamedLambdaVariable]) + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapFilter = { +function match { + case LambdaFunction(_, _, _) => --- End diff -- Is this pattern matching necessary? If so, shouldn't ```ArrayFilter``` use it as well? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
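[Editor's note] A minimal usage sketch of the `map_filter` expression introduced in the quoted diff (my own illustration, assuming an active SparkSession named `spark` and a made-up column name):
```
import spark.implicits._

val df = Seq(Map(1 -> 0, 2 -> 2, 3 -> -1)).toDF("m")
// keep only the entries whose key is greater than its value
df.selectExpr("map_filter(m, (k, v) -> k > v)").show(false)
// expected, matching the ExpressionDescription example quoted above: Map(1 -> 0, 3 -> -1)
```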
[GitHub] spark issue #21982: [SPARK-23909][SQL] Add aggregate function.
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21982 Isn't this PR related to the Jira ticket [SPARK-23911](https://issues.apache.org/jira/browse/SPARK-23911)? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21236: [SPARK-23935][SQL] Adding map_entries function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21236#discussion_r207148777 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala --- @@ -98,6 +98,9 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks { if (expected.isNaN) result.isNaN else expected == result case (result: Float, expected: Float) => if (expected.isNaN) result.isNaN else expected == result + case (result: UnsafeRow, expected: GenericInternalRow) => --- End diff -- Hi @srowen, ```(InternalRow, InternalRow)``` case was introduced later in [21838](https://github.com/apache/spark/pull/21838) and covers the logic of the case with ```UnsafeRow```. So we can just remove the unreachable piece of code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r206228544 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3651,14 +3651,9 @@ case class ArrayDistinct(child: Expression) } /** - * Will become common base class for [[ArrayUnion]], ArrayIntersect, and ArrayExcept. + * Will become common base class for [[ArrayUnion]], ArrayIntersect, and [[ArrayExcept]]. */ abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { --- End diff -- I'm not sure to what level Presto is considered as a reference. Just FYI these operations in Presto can accept arrays of different types. ``` presto:default> SELECT array_except(ARRAY[5, 1, 7], ARRAY[7.0, 1.0, 3.0]); _col0 --- [5.0] (1 row) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205847180 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3968,3 +3964,317 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike +with ComplexTypeMergingExpression { --- End diff -- nit: indentation --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205848094 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3968,3 +3964,317 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike +with ComplexTypeMergingExpression { + override def dataType: DataType = { +dataTypeCheck --- End diff -- Why do we need this check (see the question [here](https://github.com/apache/spark/pull/21103#discussion_r205806876))? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205806876 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,233 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { --- End diff -- Maybe I'm missing something, but why we need to apply these checks if there won't be any ```null``` flag merging performed? If ```left.dataType``` and ```right.dataType``` are different, will be casted according to the ```ImplicitTypeCasts``` coercion rule. If they differ only in ```null``` flags, ```left.dataType``` could be directly returned since there won't be any array elements from ```right``` present in the result. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21103 Added a few minor comments, but nothing serious that needs to be solved right now. LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205395435 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,330 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = left.dataType + + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + if (resultArray != null) { +resultArray.setInt(pos, elem) + } + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + if (resultArray != null) { +resultArray.setLong(pos, elem) + } + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + resultArray: ArrayData, + isLongType: Boolean): Int = { +// store elements into resultArray +var notFoundNullElement = true +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +notFoundNullElement = false + } else { +val assigned = if (!isLongType) { + hsInt.add(array2.getInt(i)) +} else { + hsLong.add(array2.getLong(i)) +} + } + i += 1 +} +var pos = 0 +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (notFoundNullElement) { + if (resultArray != null) { +resultArray.setNullAt(pos) + } + pos += 1 + notFoundNullElement = false +} + } else { +val assigned = if (!isLongType) { + assignInt(array1, i, resultArray, pos) +} else { + assignLong(array1, i, resultArray, pos) +} +if (assigned) { + pos += 1 +} + } + i += 1 +} +pos + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + hsInt = new OpenHashSet[Int] + val elements = evalIntLongPrimitiveType(array1, array2, null, false) + // allocate result array + hsInt = new OpenHashSet[Int] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +IntegerType.defaultSize, elements)) { +new GenericArrayData(new Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize) + } + // assign elements into the result array + evalIntLongPrimitiveType(array1, array2, resultArray, false) + resultArray +case LongType => + // avoid boxing of primitive long array elements + // calculate result array size + hsLong = new OpenHashSet[Long] + val elements = evalIntLongPrimitiveType(array1, array2, null, true) + // allocate result array + hsLong = new OpenHashSet[Long] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +LongType.defaultSize, elements)) { +new GenericArrayData(new 
Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiv
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205397226 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,330 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = left.dataType + + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + if (resultArray != null) { +resultArray.setInt(pos, elem) + } + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + if (resultArray != null) { +resultArray.setLong(pos, elem) + } + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + resultArray: ArrayData, + isLongType: Boolean): Int = { +// store elements into resultArray +var notFoundNullElement = true +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +notFoundNullElement = false + } else { +val assigned = if (!isLongType) { + hsInt.add(array2.getInt(i)) +} else { + hsLong.add(array2.getLong(i)) +} + } + i += 1 +} +var pos = 0 +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (notFoundNullElement) { + if (resultArray != null) { +resultArray.setNullAt(pos) + } + pos += 1 + notFoundNullElement = false +} + } else { +val assigned = if (!isLongType) { + assignInt(array1, i, resultArray, pos) +} else { + assignLong(array1, i, resultArray, pos) +} +if (assigned) { + pos += 1 +} + } + i += 1 +} +pos + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + hsInt = new OpenHashSet[Int] + val elements = evalIntLongPrimitiveType(array1, array2, null, false) + // allocate result array + hsInt = new OpenHashSet[Int] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +IntegerType.defaultSize, elements)) { +new GenericArrayData(new Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize) + } + // assign elements into the result array + evalIntLongPrimitiveType(array1, array2, resultArray, false) + resultArray +case LongType => + // avoid boxing of primitive long array elements + // calculate result array size + hsLong = new OpenHashSet[Long] + val elements = evalIntLongPrimitiveType(array1, array2, null, true) + // allocate result array + hsLong = new OpenHashSet[Long] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +LongType.defaultSize, elements)) { +new GenericArrayData(new 
Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiv
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205392257 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,330 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = left.dataType + + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + if (resultArray != null) { +resultArray.setInt(pos, elem) + } + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + if (resultArray != null) { +resultArray.setLong(pos, elem) + } + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + resultArray: ArrayData, + isLongType: Boolean): Int = { +// store elements into resultArray +var notFoundNullElement = true +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +notFoundNullElement = false + } else { +val assigned = if (!isLongType) { + hsInt.add(array2.getInt(i)) +} else { + hsLong.add(array2.getLong(i)) +} + } + i += 1 +} +var pos = 0 +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (notFoundNullElement) { + if (resultArray != null) { +resultArray.setNullAt(pos) + } + pos += 1 + notFoundNullElement = false +} + } else { +val assigned = if (!isLongType) { + assignInt(array1, i, resultArray, pos) +} else { + assignLong(array1, i, resultArray, pos) +} +if (assigned) { + pos += 1 +} + } + i += 1 +} +pos + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + hsInt = new OpenHashSet[Int] + val elements = evalIntLongPrimitiveType(array1, array2, null, false) + // allocate result array + hsInt = new OpenHashSet[Int] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +IntegerType.defaultSize, elements)) { +new GenericArrayData(new Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiveArray( + Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize) + } + // assign elements into the result array + evalIntLongPrimitiveType(array1, array2, resultArray, false) + resultArray +case LongType => + // avoid boxing of primitive long array elements + // calculate result array size + hsLong = new OpenHashSet[Long] + val elements = evalIntLongPrimitiveType(array1, array2, null, true) + // allocate result array + hsLong = new OpenHashSet[Long] + val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData( +LongType.defaultSize, elements)) { +new GenericArrayData(new 
Array[Any](elements)) + } else { +UnsafeArrayData.forPrimitiv
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r205397802 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,330 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = left.dataType + + var hsInt: OpenHashSet[Int] = _ + var hsLong: OpenHashSet[Long] = _ + + def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getInt(idx) +if (!hsInt.contains(elem)) { + if (resultArray != null) { +resultArray.setInt(pos, elem) + } + hsInt.add(elem) + true +} else { + false +} + } + + def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = { +val elem = array.getLong(idx) +if (!hsLong.contains(elem)) { + if (resultArray != null) { +resultArray.setLong(pos, elem) + } + hsLong.add(elem) + true +} else { + false +} + } + + def evalIntLongPrimitiveType( + array1: ArrayData, + array2: ArrayData, + resultArray: ArrayData, + isLongType: Boolean): Int = { +// store elements into resultArray +var notFoundNullElement = true +var i = 0 +while (i < array2.numElements()) { + if (array2.isNullAt(i)) { +notFoundNullElement = false + } else { +val assigned = if (!isLongType) { + hsInt.add(array2.getInt(i)) +} else { + hsLong.add(array2.getLong(i)) +} + } + i += 1 +} +var pos = 0 +i = 0 +while (i < array1.numElements()) { + if (array1.isNullAt(i)) { +if (notFoundNullElement) { + if (resultArray != null) { +resultArray.setNullAt(pos) + } + pos += 1 + notFoundNullElement = false +} + } else { +val assigned = if (!isLongType) { + assignInt(array1, i, resultArray, pos) +} else { + assignLong(array1, i, resultArray, pos) +} +if (assigned) { + pos += 1 +} + } + i += 1 +} +pos + } + + override def nullSafeEval(input1: Any, input2: Any): Any = { +val array1 = input1.asInstanceOf[ArrayData] +val array2 = input2.asInstanceOf[ArrayData] + +if (elementTypeSupportEquals) { + elementType match { +case IntegerType => + // avoid boxing of primitive int array elements + // calculate result array size + hsInt = new OpenHashSet[Int] + val elements = evalIntLongPrimitiveType(array1, array2, null, false) --- End diff -- WDYT about using an ```Option``` instead of ```null``` value? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
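[Editor's note] To make the `Option` suggestion concrete, one possible shape of the quoted `assignInt` helper (my own rewrite for illustration, reusing the `hsInt` hash set and `ArrayData` types from the quoted diff; not code from the PR):
```
def assignInt(array: ArrayData, idx: Int, resultArray: Option[ArrayData], pos: Int): Boolean = {
  val elem = array.getInt(idx)
  if (!hsInt.contains(elem)) {
    // None during the size-calculation pass, Some(result) during the fill pass
    resultArray.foreach(_.setInt(pos, elem))
    hsInt.add(elem)
    true
  } else {
    false
  }
}
```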
[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21103 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21102 Sure. Also cc @gatorsmile, who created the Jira ticket. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21102 It seems that Presto returns the result in ascending order. ``` presto> SELECT array_intersect(ARRAY[5, 8, null, 1], ARRAY[8, null, 1, 5]); _col0 - [null, 1, 5, 8] (1 row) ``` Shouldn't we follow the same behavior if Presto is used as a reference? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21102#discussion_r204349890 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3801,339 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and +array2, without duplicates. + """, + examples = """ +Examples:Fun --- End diff -- Just ```Examples:```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21687#discussion_r204340969 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala --- @@ -695,6 +695,41 @@ abstract class TernaryExpression extends Expression { } } +/** + * A trait resolving nullable, containsNull, valueContainsNull flags of the output date type. + * This logic is usually utilized by expressions combining data from multiple child expressions + * of non-primitive types (e.g. [[CaseWhen]]). + */ +trait ComplexTypeMergingExpression extends Expression { + + /** + * A collection of data types used for resolution the output type of the expression. By default, + * data types of all child expressions. The collection must not be empty. + */ + @transient + lazy val inputTypesForMerging: Seq[DataType] = children.map(_.dataType) + + /** + * A method determining whether the input types are equal ignoring nullable, containsNull and + * valueContainsNull flags and thus convenient for resolution of the final data type. + */ + def areInputTypesForMergingEqual: Boolean = { +inputTypesForMerging.length <= 1 || inputTypesForMerging.sliding(2, 1).forall { + case Seq(dt1, dt2) => dt1.sameType(dt2) +} + } + + override def dataType: DataType = { +require( + inputTypesForMerging.nonEmpty, + "The collection of input data types must not be empty.") +require( + areInputTypesForMergingEqual, + "All input types must be the same except nullable, containsNull, valueContainsNull flags.") --- End diff -- ```NullType``` is ```sameType``` equal only with itself. So it's up to coercion rules to cast ```NullType``` to a common type of other children. All the rules transitively utilize ```TypeCoercion.findTightestCommonType``` that does that job. (```case (NullType, t1) => Some(t1)```, ```case (t1, NullType) => Some(t1)```) If I don't miss anything, this behavior was the same before this PR. I can add more tests to cover this scenario if you wish. :-) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
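[Editor's note] A small illustration of the coercion behaviour described above (my own example, assuming an active SparkSession named `spark`):
```
// The ELSE branch starts out as NullType; the coercion rules cast it to the type of the
// THEN branch, so dataType resolution only ever sees matching input types afterwards.
spark.range(2)
  .selectExpr("CASE WHEN id > 0 THEN array(1, 2) ELSE NULL END AS res")
  .printSchema()
// res is reported as array<int> (nullable), i.e. the NullType branch was cast up
```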
[GitHub] spark issue #21830: [SPARK-24878][SQL] Fix reverse function for array type o...
Github user mn-mikke commented on the issue: https://github.com/apache/spark/pull/21830 Thanks @ueshin for this PR! Good to know about the re-assignments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r203964623 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,332 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples:Fun + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = ArrayType(elementType, --- End diff -- Yeah, this case is valid. But the ```containsNull``` flag is defined for the whole column (across multiple rows). Since this flag could cause removal of the null-safe check in expressions that use ```ArrayExcept``` as a child, it could lead to ```NullPointerException``` failures for cases like the second row of the example dataset. WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r203746577 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,332 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples:Fun + > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5)); + array(2) + """, + since = "2.4.0") +case class ArrayExcept(left: Expression, right: Expression) extends ArraySetLike { + override def dataType: DataType = ArrayType(elementType, --- End diff -- What about ```ArrayType(elementType, left.dataType.asInstanceOf[ArrayType].containsNull)```? Example df:

left | right | result
--- | --- | ---
[1,2] | [1, null] | [2]
[1, null, 3] | [1,2] | **[null, 3]**

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21103#discussion_r203741325 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -3805,3 +3799,332 @@ object ArrayUnion { new GenericArrayData(arrayBuffer) } } + +/** + * Returns an array of the elements in the intersect of x and y, without duplicates + */ +@ExpressionDescription( + usage = """ + _FUNC_(array1, array2) - Returns an array of the elements in array1 but not in array2, +without duplicates. + """, + examples = """ +Examples:Fun --- End diff -- just "Examples:"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21774#discussion_r203693021 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.avro.generic.GenericDatumReader +import org.apache.avro.io.{BinaryDecoder, DecoderFactory} + +import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters, SerializableSchema} +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType} + +case class AvroDataToCatalyst(child: Expression, avroType: SerializableSchema) + extends UnaryExpression with CodegenFallback with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) + + override lazy val dataType: DataType = --- End diff -- Since the schema could be quite complex, I am wondering about sending the same schema to executors twice (```dataType```, ```avroType```). But yeah, transmission of an extra payload and deserialization of ```dataType``` might be faster than the schema conversion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
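[Editor's note] For context, the trade-off being discussed looks roughly like the following sketch. It is only an illustration: it assumes `SerializableSchema` exposes the wrapped Avro schema via a `value` field (which may not match the PR) and that the conversion goes through `SchemaConverters.toSqlType` as elsewhere in the module.
```
// Deriving the Catalyst type lazily on each executor avoids shipping the converted type
// alongside the Avro schema, at the cost of redoing the schema conversion after the
// expression is deserialized.
@transient override lazy val dataType: DataType =
  SchemaConverters.toSqlType(avroType.value).dataType
```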
[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21774#discussion_r203678944 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala --- @@ -36,4 +40,27 @@ package object avro { @scala.annotation.varargs def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*) } + --- End diff -- Thanks guys for your explanation! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21774#discussion_r203422909 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.ByteArrayOutputStream + +import org.apache.avro.generic.GenericDatumWriter +import org.apache.avro.io.{BinaryEncoder, EncoderFactory} + +import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters} +import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.types.{BinaryType, DataType} + --- End diff -- ```ExpressionDescription``` and javadoc? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21774#discussion_r203424033 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala --- @@ -36,4 +40,27 @@ package object avro { @scala.annotation.varargs def avro(sources: String*): DataFrame = reader.format("avro").load(sources: _*) } + --- End diff -- Why these two functions are not a part of ```org.apache.spark.sql.functions```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21774#discussion_r203418945 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.avro.generic.GenericDatumReader +import org.apache.avro.io.{BinaryDecoder, DecoderFactory} + +import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters, SerializableSchema} +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType} + +case class AvroDataToCatalyst(child: Expression, avroType: SerializableSchema) + extends UnaryExpression with CodegenFallback with ExpectsInputTypes { --- End diff -- see the [comment](https://github.com/apache/spark/pull/21061#discussion_r181399858) about ```CodegenFallback``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21774#discussion_r203416936 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.avro.generic.GenericDatumReader +import org.apache.avro.io.{BinaryDecoder, DecoderFactory} + +import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters, SerializableSchema} +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType} + +case class AvroDataToCatalyst(child: Expression, avroType: SerializableSchema) + extends UnaryExpression with CodegenFallback with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType) + + override lazy val dataType: DataType = --- End diff -- ```@transient```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21774#discussion_r203422417 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import java.io.ByteArrayOutputStream + +import org.apache.avro.generic.GenericDatumWriter +import org.apache.avro.io.{BinaryEncoder, EncoderFactory} + +import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters} +import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.types.{BinaryType, DataType} + +case class CatalystDataToAvro(child: Expression) extends UnaryExpression with CodegenFallback { + + override lazy val dataType: DataType = BinaryType --- End diff -- just ```def```? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21774#discussion_r203415956 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala --- @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql + +import org.apache.avro.generic.GenericDatumReader +import org.apache.avro.io.{BinaryDecoder, DecoderFactory} + +import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters, SerializableSchema} +import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, UnaryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType} + --- End diff -- I would add ```ExpressionDescription``` and javadoc here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21802: [SPARK-23928][SQL] Add shuffle collection functio...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21802#discussion_r203388798 --- Diff: python/pyspark/sql/functions.py --- @@ -2382,6 +2382,20 @@ def array_sort(col): return Column(sc._jvm.functions.array_sort(_to_java_column(col))) +@since(2.4) +def shuffle(col): +""" +Collection function: Generates a random permutation of the given array. + +.. note:: The function is non-deterministic because its results depends on order of rows which --- End diff -- Isn't it non-deterministic rather because the permutation is determined randomly? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
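[Editor's note] A tiny illustration of the point (my own example with made-up values, assuming an active SparkSession named `spark`):
```
import org.apache.spark.sql.functions.shuffle
import spark.implicits._

val df = Seq(Seq(1, 9, 8, 7)).toDF("i")
df.select(shuffle($"i")).show(false)  // might print [7, 1, 9, 8]
df.select(shuffle($"i")).show(false)  // a second run can print a different permutation,
                                      // independently of the order of the input rows
```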
[GitHub] spark pull request #21802: [SPARK-23928][SQL] Add shuffle collection functio...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21802#discussion_r203407122 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -1444,6 +1444,51 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { ) } + test("shuffle function") { +// Shuffle expressions should produce same results at retries in the same DataFrame. +def checkResult(df: DataFrame): Unit = { + checkAnswer(df, df.collect()) +} + +// primitive-type elements +val idf = Seq( + Seq(1, 9, 8, 7), + Seq(5, 8, 9, 7, 2), + Seq.empty, + null +).toDF("i") + +def checkResult1(): Unit = { --- End diff -- Maybe a different name for the method? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21795: [SPARK-24840][SQL] do not use dummy filter to swi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21795#discussion_r203379244 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -1147,65 +1149,66 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { val nseqi : Seq[Int] = null val nseqs : Seq[String] = null val df = Seq( - (Seq(1), Seq(2, 3), Seq(5L, 6L), nseqi, Seq("a", "b", "c"), Seq("d", "e"), Seq("f"), nseqs), (Seq(1, 0), Seq.empty[Int], Seq(2L), nseqi, Seq("a"), Seq.empty[String], Seq(null), nseqs) ).toDF("i1", "i2", "i3", "in", "s1", "s2", "s3", "sn") -val dummyFilter = (c: Column) => c.isNull || c.isNotNull // switch codeGen on - // Simple test cases -checkAnswer( --- End diff -- Good catch! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21795: [SPARK-24840][SQL] do not use dummy filter to swi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21795#discussion_r203378508 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala --- @@ -924,26 +926,26 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext { null ).toDF("i") -checkAnswer( - idf.select(reverse('i)), - Seq(Row(Seq(7, 8, 9, 1)), Row(Seq(2, 7, 9, 8, 5)), Row(Seq.empty), Row(null)) -) -checkAnswer( - idf.filter(dummyFilter('i)).select(reverse('i)), - Seq(Row(Seq(7, 8, 9, 1)), Row(Seq(2, 7, 9, 8, 5)), Row(Seq.empty), Row(null)) -) -checkAnswer( - idf.selectExpr("reverse(i)"), - Seq(Row(Seq(7, 8, 9, 1)), Row(Seq(2, 7, 9, 8, 5)), Row(Seq.empty), Row(null)) -) -checkAnswer( - oneRowDF.selectExpr("reverse(array(1, null, 2, null))"), - Seq(Row(Seq(null, 2, null, 1))) -) -checkAnswer( - oneRowDF.filter(dummyFilter('i)).selectExpr("reverse(array(1, null, 2, null))"), - Seq(Row(Seq(null, 2, null, 1))) -) +def checkResult2(): Unit = { --- End diff -- What about using more specific names for functions ```checkResult2```, ```checkResult3``` etc.? Maybe ```checkStringTestCases```, ```checkCasesWithArraysOfComplexTypes``` or something like that? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21795: [SPARK-24840][SQL] do not use dummy filter to swi...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21795#discussion_r203305670 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2336,46 +2336,40 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val sourceDF = spark.createDataFrame(rows, schema) -val structWhenDF = sourceDF +def structWhenDF: DataFrame = sourceDF .select(when('cond, struct(lit("a").as("val1"), lit(10).as("val2"))).otherwise('s) as "res") .select('res.getField("val1")) -val arrayWhenDF = sourceDF +def arrayWhenDF: DataFrame = sourceDF .select(when('cond, array(lit("a"), lit("b"))).otherwise('a) as "res") .select('res.getItem(0)) -val mapWhenDF = sourceDF +def mapWhenDF: DataFrame = sourceDF .select(when('cond, map(lit(0), lit("a"))).otherwise('m) as "res") .select('res.getItem(0)) -val structIfDF = sourceDF +def structIfDF: DataFrame = sourceDF .select(expr("if(cond, struct('a' as val1, 10 as val2), s)") as "res") .select('res.getField("val1")) -val arrayIfDF = sourceDF +def arrayIfDF: DataFrame = sourceDF .select(expr("if(cond, array('a', 'b'), a)") as "res") .select('res.getItem(0)) -val mapIfDF = sourceDF +def mapIfDF: DataFrame = sourceDF .select(expr("if(cond, map(0, 'a'), m)") as "res") .select('res.getItem(0)) -def checkResult(df: DataFrame, codegenExpected: Boolean): Unit = { - assert(df.queryExecution.executedPlan.isInstanceOf[WholeStageCodegenExec] == codegenExpected) - checkAnswer(df, Seq(Row("a"), Row(null))) +def checkResult(): Unit = { + checkAnswer(structWhenDF, Seq(Row("a"), Row(null))) + checkAnswer(arrayWhenDF, Seq(Row("a"), Row(null))) + checkAnswer(mapWhenDF, Seq(Row("a"), Row(null))) + checkAnswer(structIfDF, Seq(Row("a"), Row(null))) + checkAnswer(arrayIfDF, Seq(Row("a"), Row(null))) + checkAnswer(mapIfDF, Seq(Row("a"), Row(null))) } -// without codegen -checkResult(structWhenDF, false) -checkResult(arrayWhenDF, false) -checkResult(mapWhenDF, false) -checkResult(structIfDF, false) -checkResult(arrayIfDF, false) -checkResult(mapIfDF, false) - -// with codegen -checkResult(structWhenDF.filter('cond.isNotNull), true) --- End diff -- @cloud-fan Thanks for the clarification and this PR! Btw, there are many tests in ```DataFrameFunctionsSuite``` that test only the scenarios without codegen. WDYT about adding a generic ```checkAnswer``` method to ```QueryTest``` that would evaluate a DataFrame for both cases, similarly to how ```ExpressionEvalHelper.checkEvaluation``` does for expressions? If it's possible, of course. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
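[Editor's note] A rough sketch of what such a helper might look like (entirely hypothetical; no such method exists in `QueryTest`, and the name and signature are mine):
```
// Run the same query once against the local relation (Project evaluated without codegen)
// and once against the cached relation (Project evaluated with whole-stage codegen),
// mirroring the cache-and-retest pattern already used in DataFrameFunctionsSuite.
def checkAnswerWithAndWithoutCodegen(
    source: DataFrame,
    query: DataFrame => DataFrame,
    expected: Seq[Row]): Unit = {
  checkAnswer(query(source), expected)       // local relation -> interpreted path
  source.cache()
  try checkAnswer(query(source), expected)   // cached relation -> codegen path
  finally source.unpersist()
}
```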
[GitHub] spark pull request #21795: [SPARK-24165][SQL][followup] Fixing conditional e...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21795#discussion_r203064514 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala --- @@ -2336,46 +2336,40 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val sourceDF = spark.createDataFrame(rows, schema) -val structWhenDF = sourceDF +def structWhenDF: DataFrame = sourceDF .select(when('cond, struct(lit("a").as("val1"), lit(10).as("val2"))).otherwise('s) as "res") .select('res.getField("val1")) -val arrayWhenDF = sourceDF +def arrayWhenDF: DataFrame = sourceDF .select(when('cond, array(lit("a"), lit("b"))).otherwise('a) as "res") .select('res.getItem(0)) -val mapWhenDF = sourceDF +def mapWhenDF: DataFrame = sourceDF .select(when('cond, map(lit(0), lit("a"))).otherwise('m) as "res") .select('res.getItem(0)) -val structIfDF = sourceDF +def structIfDF: DataFrame = sourceDF .select(expr("if(cond, struct('a' as val1, 10 as val2), s)") as "res") .select('res.getField("val1")) -val arrayIfDF = sourceDF +def arrayIfDF: DataFrame = sourceDF .select(expr("if(cond, array('a', 'b'), a)") as "res") .select('res.getItem(0)) -val mapIfDF = sourceDF +def mapIfDF: DataFrame = sourceDF .select(expr("if(cond, map(0, 'a'), m)") as "res") .select('res.getItem(0)) -def checkResult(df: DataFrame, codegenExpected: Boolean): Unit = { - assert(df.queryExecution.executedPlan.isInstanceOf[WholeStageCodegenExec] == codegenExpected) - checkAnswer(df, Seq(Row("a"), Row(null))) +def checkResult(): Unit = { + checkAnswer(structWhenDF, Seq(Row("a"), Row(null))) + checkAnswer(arrayWhenDF, Seq(Row("a"), Row(null))) + checkAnswer(mapWhenDF, Seq(Row("a"), Row(null))) + checkAnswer(structIfDF, Seq(Row("a"), Row(null))) + checkAnswer(arrayIfDF, Seq(Row("a"), Row(null))) + checkAnswer(mapIfDF, Seq(Row("a"), Row(null))) } -// without codegen -checkResult(structWhenDF, false) -checkResult(arrayWhenDF, false) -checkResult(mapWhenDF, false) -checkResult(structIfDF, false) -checkResult(arrayIfDF, false) -checkResult(mapIfDF, false) - -// with codegen -checkResult(structWhenDF.filter('cond.isNotNull), true) --- End diff -- If it's the case why the [assert](https://github.com/apache/spark/pull/21795/files/d1bc6124628808444a4ab5bed6bfc6f897d76e03#diff-5d2ebf4e9ca5a990136b276859769289L2360) didn't fail? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21352: [SPARK-24305][SQL][FOLLOWUP] Avoid serialization ...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21352#discussion_r203017442 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -737,21 +733,22 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres since = "2.4.0") case class MapFromEntries(child: Expression) extends UnaryExpression { - @transient - private lazy val dataTypeDetails: Option[(MapType, Boolean, Boolean)] = child.dataType match { -case ArrayType( - StructType(Array( -StructField(_, keyType, keyNullable, _), -StructField(_, valueType, valueNullable, _))), - containsNull) => Some((MapType(keyType, valueType, valueNullable), keyNullable, containsNull)) -case _ => None + @transient private lazy val dataTypeDetails: Option[(MapType, Boolean, Boolean)] = { +child.dataType match { + case ArrayType( +StructType(Array( + StructField(_, kt, kn, _), --- End diff -- the motivation is described [here](https://github.com/apache/spark/pull/21352/files#r202983734). I will revert this piece of code shortly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21352: [SPARK-24305][SQL][FOLLOWUP] Avoid serialization ...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21352#discussion_r202983734 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -737,21 +733,22 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres since = "2.4.0") case class MapFromEntries(child: Expression) extends UnaryExpression { - @transient - private lazy val dataTypeDetails: Option[(MapType, Boolean, Boolean)] = child.dataType match { -case ArrayType( - StructType(Array( -StructField(_, keyType, keyNullable, _), -StructField(_, valueType, valueNullable, _))), - containsNull) => Some((MapType(keyType, valueType, valueNullable), keyNullable, containsNull)) -case _ => None + @transient private lazy val dataTypeDetails: Option[(MapType, Boolean, Boolean)] = { --- End diff -- Here I wanted to be consistent in terms of formatting. (```@transient``` to be on the same line as ```private lazy val dataTypeDetails```) After the change, two lines were exceeding 100 characters. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21352: [SPARK-24305][SQL][FOLLOWUP] Avoid serialization ...
Github user mn-mikke commented on a diff in the pull request: https://github.com/apache/spark/pull/21352#discussion_r202982740 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -360,7 +356,7 @@ case class MapEntries(child: Expression) extends UnaryExpression with ExpectsInp override def inputTypes: Seq[AbstractDataType] = Seq(MapType) - lazy val childDataType: MapType = child.dataType.asInstanceOf[MapType] + private def childDataType: MapType = child.dataType.asInstanceOf[MapType] --- End diff -- I missed that one. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org