[GitHub] spark pull request #21747: [SPARK-24165][SQL][branch-2.3] Fixing conditional...

2018-09-24 Thread mn-mikke
Github user mn-mikke closed the pull request at:

https://github.com/apache/spark/pull/21747


---




[GitHub] spark issue #21747: [SPARK-24165][SQL][branch-2.3] Fixing conditional expres...

2018-09-24 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21747
  
If nobody has any objections, I'm happy to close this PR.


---




[GitHub] spark issue #22471: [SPARK-25470][SQL][Performance] Concat.eval should use p...

2018-09-20 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/22471
  
If you don't mind, I will include fixes for ```Reverse``` and ```ElementAt``` in this PR.


---




[GitHub] spark pull request #20858: [SPARK-23736][SQL] Extending the concat function ...

2018-09-19 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/20858#discussion_r218917303
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -665,3 +667,219 @@ case class ElementAt(left: Expression, right: Expression) extends GetMapValueUtil
 
   override def prettyName: String = "element_at"
 }
+
+/**
+ * Concatenates multiple input columns together into a single column.
+ * The function works with strings, binary and compatible array columns.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(col1, col2, ..., colN) - Returns the concatenation of col1, col2, ..., colN.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_('Spark', 'SQL');
+       SparkSQL
+      > SELECT _FUNC_(array(1, 2, 3), array(4, 5), array(6));
+       [1,2,3,4,5,6]
+  """)
+case class Concat(children: Seq[Expression]) extends Expression {
+
+  private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  val allowedTypes = Seq(StringType, BinaryType, ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (children.isEmpty) {
+      TypeCheckResult.TypeCheckSuccess
+    } else {
+      val childTypes = children.map(_.dataType)
+      if (childTypes.exists(tpe => !allowedTypes.exists(_.acceptsType(tpe)))) {
+        return TypeCheckResult.TypeCheckFailure(
+          s"input to function $prettyName should have been StringType, BinaryType or ArrayType," +
+            s" but it's " + childTypes.map(_.simpleString).mkString("[", ", ", "]"))
+      }
+      TypeUtils.checkForSameTypeInputExpr(childTypes, s"function $prettyName")
+    }
+  }
+
+  override def dataType: DataType = children.map(_.dataType).headOption.getOrElse(StringType)
+
+  lazy val javaType: String = CodeGenerator.javaType(dataType)
+
+  override def nullable: Boolean = children.exists(_.nullable)
+
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  override def eval(input: InternalRow): Any = dataType match {
--- End diff --

Thanks! I've created #22471 so that the pattern matching is performed only once.

WDYT about [Reverse](https://github.com/apache/spark/pull/21034/files#diff-9853dcf5ce3d2ac1e94d473197ff5768R240)? It looks like a similar problem.


---




[GitHub] spark issue #22471: [SPARK-25470][SQL][Performance] Concat.eval should use p...

2018-09-19 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/22471
  
cc @rxin @ueshin 


---




[GitHub] spark pull request #22471: [SPARK-25470][SQL][Performance] Concat.eval shoul...

2018-09-19 Thread mn-mikke
GitHub user mn-mikke opened a pull request:

https://github.com/apache/spark/pull/22471

[SPARK-25470][SQL][Performance] Concat.eval should use pattern matching 
only once

## What changes were proposed in this pull request?

This PR proposes a solution to the problem described [here](https://github.com/apache/spark/pull/20858#discussion_r218677837).
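
A minimal sketch of the idea in plain Scala, not the actual Spark patch (```NaiveConcat``` and ```FixedConcat``` are hypothetical names): the type dispatch is resolved once into a function, instead of re-matching on ```dataType``` for every call to ```eval```.

```
trait Expr { def eval(input: Any): Any }

// anti-pattern: the match on dataType runs on every call to eval
case class NaiveConcat(children: Seq[Expr], dataType: String) extends Expr {
  override def eval(input: Any): Any = dataType match {
    case "string" => children.map(_.eval(input).asInstanceOf[String]).mkString
    case "array"  => children.flatMap(_.eval(input).asInstanceOf[Seq[Any]])
  }
}

// fix: the match runs once, when the lazy val is first accessed
case class FixedConcat(children: Seq[Expr], dataType: String) extends Expr {
  @transient private lazy val concatFn: Any => Any = dataType match {
    case "string" => in => children.map(_.eval(in).asInstanceOf[String]).mkString
    case "array"  => in => children.flatMap(_.eval(in).asInstanceOf[Seq[Any]])
  }
  override def eval(input: Any): Any = concatFn(input)
}
```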

## How was this patch tested?

Ran the existing tests for the Concat expression.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mn-mikke/spark SPARK-25470

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22471.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22471


commit 9051c235eb2e4a62293ea6bd55e9d7aa1cb096fe
Author: Marek Novotny 
Date:   2018-09-19T17:49:15Z

[SPARK-25470][SQL][Performance] Concat.eval will use pattern matching only 
once




---




[GitHub] spark pull request #22243: [MINOR] Avoid code duplication for nullable in Hi...

2018-08-27 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22243#discussion_r213022884
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -155,6 +155,8 @@ trait HigherOrderFunction extends Expression with 
ExpectsInputTypes {
  */
 trait SimpleHigherOrderFunction extends HigherOrderFunction  {
 
+  override def nullable: Boolean = argument.nullable
--- End diff --

If we moved the definition of ```nullable``` straight to ```HigherOrderFunction``` as ```arguments.exists(_.nullable)```, we could also avoid the duplication in ```ZipWith``` and ```MapZipWith```. WDYT?
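
A hedged sketch of the suggestion with heavily simplified traits (not the actual Spark hierarchy): defining ```nullable``` once in the parent trait lets the subclasses drop their individual overrides.

```
trait Expression { def nullable: Boolean }

trait HigherOrderFunction extends Expression {
  def arguments: Seq[Expression]
  // defined once here, so ZipWith, MapZipWith, SimpleHigherOrderFunction, ...
  // no longer need their own nullable overrides
  override def nullable: Boolean = arguments.exists(_.nullable)
}
```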


---




[GitHub] spark issue #22131: [SPARK-25141][SQL][TEST] Modify tests for higher-order f...

2018-08-17 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/22131
  
LGTM too


---




[GitHub] spark pull request #22126: [SPARK-23938][SQL][FOLLOW-UP][TEST] Nullabilities...

2018-08-16 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22126#discussion_r210724650
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala ---
@@ -363,9 +363,9 @@ class HigherOrderFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper
         left: Expression,
         right: Expression,
         f: (Expression, Expression, Expression) => Expression): Expression = {
-      val MapType(kt, vt1, vcn1) = left.dataType.asInstanceOf[MapType]
-      val MapType(_, vt2, vcn2) = right.dataType.asInstanceOf[MapType]
-      MapZipWith(left, right, createLambda(kt, false, vt1, vcn1, vt2, vcn2, f))
+      val MapType(kt, vt1, _) = left.dataType.asInstanceOf[MapType]
--- End diff --

Optional suggestion: maybe we could remove ```asInstanceOf[MapType]``` here?


---




[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...

2018-08-16 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22110#discussion_r210563516
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala 
---
@@ -73,4 +73,14 @@ object TypeUtils {
 }
 x.length - y.length
   }
+
+  /**
+   * Returns true if elements of the data type could be used as items of a 
hash set or as keys
+   * of a hash map.
+   */
+  def typeCanBeHashed(dataType: DataType): Boolean = dataType match {
--- End diff --

Ok, thanks!


---




[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...

2018-08-16 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22045#discussion_r210561102
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -497,6 +497,53 @@ case class ArrayAggregate(
   override def prettyName: String = "aggregate"
 }
 
+/**
+ * Returns a map that applies the function to each value of the map.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms values in the map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> v + 
1);
+map(array(1, 2, 3), array(2, 3, 4))
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 
v);
+map(array(1, 2, 3), array(2, 4, 6))
+  """,
+  since = "2.4.0")
+case class TransformValues(
+argument: Expression,
+function: Expression)
+  extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = argument.nullable
+
+  @transient lazy val MapType(keyType, valueType, valueContainsNull) = 
argument.dataType
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
valueContainsNull)
--- End diff --

Shouldn't the ```dataType``` be defined as ```MapType(keyType, 
function.dataType, function.nullable)```?
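
A small self-contained illustration of the concern, using a hypothetical ```SimpleMapType``` stand-in rather than Spark's types: the lambda can introduce nulls even when the input map contains none, so the result's value nullability should track the lambda.

```
final case class SimpleMapType(keyType: String, valueType: String, valueContainsNull: Boolean)

val inputMap       = SimpleMapType("int", "int", valueContainsNull = false)
val lambdaNullable = true  // e.g. the lambda is (k, v) -> if(v > 1, null, v)

// as written: nullability is copied from the input map, so nulls produced by the lambda are not reflected
val current   = SimpleMapType(inputMap.keyType, "int", inputMap.valueContainsNull)
// as suggested: nullability is taken from the lambda itself
val suggested = SimpleMapType(inputMap.keyType, "int", lambdaNullable)
```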


---




[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22110#discussion_r210493260
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala 
---
@@ -73,4 +73,14 @@ object TypeUtils {
 }
 x.length - y.length
   }
+
+  /**
+   * Returns true if elements of the data type could be used as items of a 
hash set or as keys
+   * of a hash map.
+   */
+  def typeCanBeHashed(dataType: DataType): Boolean = dataType match {
--- End diff --

I will change it :)

Just one question about ```hashCode```. If ```case class```es are used, ```equals``` and ```hashCode``` are generated by the compiler. But if we define ```equals``` manually, shouldn't ```a.equals(b) == true``` also imply ```a.hashCode == b.hashCode```?
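
An illustrative (non-Spark) example of the contract in question: once ```equals``` is written by hand, ```hashCode``` must be kept consistent with it, or hash-based collections misbehave.

```
final class Key(val id: Int) {
  override def equals(other: Any): Boolean = other match {
    case that: Key => this.id == that.id
    case _         => false
  }
  // required so that a.equals(b) implies a.hashCode == b.hashCode;
  // without it, Set(new Key(1)).contains(new Key(1)) may return false
  override def hashCode: Int = id.hashCode
}
```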


---




[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r210373675
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -2302,6 +2302,97 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
 assert(ex5.getMessage.contains("function map_zip_with does not support 
ordering on type map"))
   }
 
+  test("transform keys function - test various primitive data types 
combinations") {
+val dfExample1 = Seq(
+  Map[Int, Int](1 -> 1, 9 -> 9, 8 -> 8, 7 -> 7)
+).toDF("i")
+
+val dfExample2 = Seq(
+  Map[Int, Double](1 -> 1.0E0, 2 -> 1.4E0, 3 -> 1.7E0)
+).toDF("j")
+
+val dfExample3 = Seq(
+  Map[Int, Boolean](25 -> true, 26 -> false)
+).toDF("x")
+
+val dfExample4 = Seq(
+  Map[Array[Int], Boolean](Array(1, 2) -> false)
+).toDF("y")
+
+
+def testMapOfPrimitiveTypesCombination(): Unit = {
+  checkAnswer(dfExample1.selectExpr("transform_keys(i, (k, v) -> k + 
v)"),
+Seq(Row(Map(2 -> 1, 18 -> 9, 16 -> 8, 14 -> 7
+
+  checkAnswer(dfExample2.selectExpr("transform_keys(j, " +
+"(k, v) -> map_from_arrays(ARRAY(1, 2, 3), ARRAY('one', 'two', 
'three'))[k])"),
+Seq(Row(Map("one" -> 1.0, "two" -> 1.4, "three" -> 1.7
+
+  checkAnswer(dfExample2.selectExpr("transform_keys(j, (k, v) -> 
CAST(v * 2 AS BIGINT) + k)"),
+Seq(Row(Map(3 -> 1.0, 4 -> 1.4, 6 -> 1.7
+
+  checkAnswer(dfExample2.selectExpr("transform_keys(j, (k, v) -> k + 
v)"),
+Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7
+
+  checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) ->  k % 
2 = 0 OR v)"),
+Seq(Row(Map(true -> true, true -> false
+
+  checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> if(v, 
2 * k, 3 * k))"),
+Seq(Row(Map(50 -> true, 78 -> false
+
+  checkAnswer(dfExample3.selectExpr("transform_keys(x, (k, v) -> if(v, 
2 * k, 3 * k))"),
+Seq(Row(Map(50 -> true, 78 -> false
+
+  checkAnswer(dfExample4.selectExpr("transform_keys(y, (k, v) -> 
array_contains(k, 3) AND v)"),
+Seq(Row(Map(false -> false
+}
+// Test with local relation, the Project will be evaluated without 
codegen
+testMapOfPrimitiveTypesCombination()
+dfExample1.cache()
+dfExample2.cache()
+dfExample3.cache()
+dfExample4.cache()
+// Test with cached relation, the Project will be evaluated with 
codegen
+testMapOfPrimitiveTypesCombination()
+  }
+
+  test("transform keys function - Invalid lambda functions and 
exceptions") {
+val dfExample1 = Seq(
+  Map[Int, Int](1 -> 1, 9 -> 9, 8 -> 8, 7 -> 7)
+).toDF("i")
+
+val dfExample2 = Seq(
+  Map[String, String]("a" -> "b")
+).toDF("j")
+
+val dfExample3 = Seq(
+  Map[String, String]("a" -> null)
+).toDF("x")
+
+def testInvalidLambdaFunctions(): Unit = {
+  val ex1 = intercept[AnalysisException] {
+dfExample1.selectExpr("transform_keys(i, k -> k )")
+  }
+  assert(ex1.getMessage.contains("The number of lambda function 
arguments '1' does not match"))
+
+  val ex2 = intercept[AnalysisException] {
+dfExample2.selectExpr("transform_keys(j, (k, v, x) -> k + 1)")
+  }
+  assert(ex2.getMessage.contains(
+  "The number of lambda function arguments '3' does not match"))
+
+  val ex3 = intercept[RuntimeException] {
+dfExample3.selectExpr("transform_keys(x, (k, v) -> v)").show()
+  }
+  assert(ex3.getMessage.contains("Cannot use null as map key!"))
+}
+
+testInvalidLambdaFunctions()
+dfExample1.cache()
+dfExample2.cache()
+testInvalidLambdaFunctions()
--- End diff --

@ueshin I would like to ask you a generic question regarding higher-order functions. Is it necessary to perform checks with codegen paths if all the newly added functions extend ```CodegenFallback```? Also, is there a plan to add codegen for these functions in the future?


---




[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r210366383
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -497,6 +497,62 @@ case class ArrayAggregate(
   override def prettyName: String = "aggregate"
 }
 
+/**
+ * Transform Keys for every entry of the map by applying the 
transform_keys function.
+ * Returns map with transformed key entries
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms elements in a map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 
1);
+   map(array(2, 3, 4), array(1, 2, 3))
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 
v);
+   map(array(2, 4, 6), array(1, 2, 3))
+  """,
+  since = "2.4.0")
+case class TransformKeys(
+argument: Expression,
+function: Expression)
+  extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = argument.nullable
+
+  @transient lazy val MapType(keyType, valueType, valueContainsNull) = 
argument.dataType
+
+  override def dataType: DataType = {
+MapType(function.dataType, valueType, valueContainsNull)
--- End diff --

nit: could this just be one line?


---




[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r210368929
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -497,6 +497,62 @@ case class ArrayAggregate(
   override def prettyName: String = "aggregate"
 }
 
+/**
+ * Transform Keys for every entry of the map by applying the 
transform_keys function.
+ * Returns map with transformed key entries
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms elements in a map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 
1);
+   map(array(2, 3, 4), array(1, 2, 3))
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3)), (k, v) -> k + 
v);
+   map(array(2, 4, 6), array(1, 2, 3))
+  """,
+  since = "2.4.0")
+case class TransformKeys(
+argument: Expression,
+function: Expression)
+  extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = argument.nullable
+
+  @transient lazy val MapType(keyType, valueType, valueContainsNull) = 
argument.dataType
+
+  override def dataType: DataType = {
+MapType(function.dataType, valueType, valueContainsNull)
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): TransformKeys = {
+copy(function = f(function, (keyType, false) :: (valueType, 
valueContainsNull) :: Nil))
+  }
+
+  @transient lazy val LambdaFunction(
+  _, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: 
Nil, _) = function
+
+
+  override def nullSafeEval(inputRow: InternalRow, argumentValue: Any): 
Any = {
+val map = argumentValue.asInstanceOf[MapData]
+val f = functionForEval
--- End diff --

Can't we use ```functionForEval``` directly?


---




[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22110#discussion_r210357474
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala 
---
@@ -73,4 +73,14 @@ object TypeUtils {
 }
 x.length - y.length
   }
+
+  /**
+   * Returns true if elements of the data type could be used as items of a 
hash set or as keys
--- End diff --

What about changing the name of the method to ```typeCanBeHashed```?


---




[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22110#discussion_r210350632
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala 
---
@@ -73,4 +73,14 @@ object TypeUtils {
 }
 x.length - y.length
   }
+
+  /**
+   * Returns true if elements of the data type could be used as items of a 
hash set or as keys
--- End diff --

I'm open to any changes :) But if you want to explicitly mention the ```equals``` method, I would also mention ```hashCode```, which is generally needed for usage in "hash" collections. But then this is not 100% true for Spark's specialized ```OpenHashSet``` and ```OpenHashMap``` since they calculate hashes by themselves. WDYT?


---




[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22110#discussion_r210212586
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/types/AbstractDataType.scala 
---
@@ -115,6 +115,8 @@ protected[sql] abstract class AtomicType extends 
DataType {
   private[sql] type InternalType
   private[sql] val tag: TypeTag[InternalType]
   private[sql] val ordering: Ordering[InternalType]
+
+  private[spark] override def supportsEquals: Boolean = true
--- End diff --

Not all of the expressions utilize ```OpenHashSet``` or ```OpenHashMap```. What about ```TypeUtils```, which contains methods like ```getInterpretedOrdering```?


---




[GitHub] spark issue #22110: [SPARK-25122][SQL] Deduplication of supports equals code

2018-08-15 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/22110
  
cc @ueshin @cloud-fan 


---




[GitHub] spark pull request #22110: [SPARK-25122][SQL] Deduplication of supports equa...

2018-08-15 Thread mn-mikke
GitHub user mn-mikke opened a pull request:

https://github.com/apache/spark/pull/22110

[SPARK-25122][SQL] Deduplication of supports equals code

## What changes were proposed in this pull request?

The method ```*supportEquals```, which determines whether elements of a data type can be used as items in a hash set or as keys in a hash map, is duplicated across multiple collection and higher-order functions.

This PR deduplicates the method.
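
A hedged sketch of the kind of predicate being deduplicated, using simplified stand-in types rather than Spark's ```DataType``` hierarchy:

```
sealed trait DType
case object IntType                   extends DType
case object StringType                extends DType
case object BinaryType                extends DType
final case class ArrType(elem: DType) extends DType

// roughly what each expression used to re-implement: can values of this type be
// used directly as hash set items / hash map keys?
def typeCanBeHashed(dt: DType): Boolean = dt match {
  case BinaryType => false  // byte arrays compare by reference
  case ArrType(_) => false  // complex types need structural comparison
  case _          => true   // atomic types rely on equals/hashCode
}
```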

## How was this patch tested?

Ran the tests in:
- DataFrameFunctionsSuite
- CollectionExpressionsSuite
- HigherOrderExpressionsSuite


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mn-mikke/spark SPARK-25122

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22110.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22110


commit dd292e8cf3ed1788793e626da3a136e9acb9d81c
Author: Marek Novotny 
Date:   2018-08-15T08:18:05Z

[SPARK-25122][SQL] Deduplication of supports equals code




---




[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...

2018-08-14 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22031#discussion_r209920407
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,91 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(left, right, func) - Merges the two given arrays, 
element-wise, into a single array using function. If one array is shorter, 
nulls are appended at the end to match the length of the longer array, before 
applying function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, 
x));
+   array(('a', 1), ('b', 3), ('c', 5))
+  > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y));
+   array(4, 6)
+  > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) 
-> concat(x, y));
+   array('ad', 'be', 'cf')
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ArraysZipWith(
+left: Expression,
+right: Expression,
+function: Expression)
+  extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes {
+
+  override def inputs: Seq[Expression] = List(left, right)
+
+  override def functions: Seq[Expression] = List(function)
+
+  def expectingFunctionType: AbstractDataType = AnyDataType
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType, expectingFunctionType)
+
+  override def nullable: Boolean = inputs.exists(_.nullable)
+
+  override def dataType: ArrayType = ArrayType(function.dataType, 
function.nullable)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArraysZipWith = {
+val (leftElementType, leftContainsNull) = left.dataType match {
--- End diff --

This comment is not valid anymore. The method has been removed by 
[#22075](https://github.com/apache/spark/pull/22075).


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-14 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r209876913
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -496,3 +496,194 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  def functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val MapType(leftKeyType, leftValueType, 
leftValueContainsNull) = left.dataType
+
+  @transient lazy val MapType(rightKeyType, rightValueType, 
rightValueContainsNull) = right.dataType
+
+  @transient lazy val keyType =
+TypeCoercion.findCommonTypeDifferentOnlyInNullFlags(leftKeyType, 
rightKeyType).get
--- End diff --

If ```leftKeyType``` is ```ArrayType(IntegerType, false)``` and ```rightKeyType``` is ```ArrayType(IntegerType, true)```, for instance, the coercion rule is not executed since ```leftKeyType.sameType(rightKeyType) == true```.

An array with nulls seems to be a valid key:
```
scala> spark.range(1).selectExpr("map(array(1, 2, null), 12)").show()
+---------------------------------------+
|map(array(1, 2, CAST(NULL AS INT)), 12)|
+---------------------------------------+
|                        [[1, 2,] -> 12]|
+---------------------------------------+
```
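
A minimal self-contained sketch of the point (not Spark's ```DataType``` API): a ```sameType```-style comparison ignores nullability flags, so two array types that differ only in ```containsNull``` still match and the guarded coercion rule never fires.

```
sealed trait DType
case object IntType extends DType
final case class ArrType(element: DType, containsNull: Boolean) extends DType

def sameTypeIgnoringNullability(a: DType, b: DType): Boolean = (a, b) match {
  case (ArrType(e1, _), ArrType(e2, _)) => sameTypeIgnoringNullability(e1, e2)
  case _                                => a == b
}

val left  = ArrType(IntType, containsNull = false)
val right = ArrType(IntType, containsNull = true)
println(sameTypeIgnoringNullability(left, right))  // true
println(left == right)                             // false
```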


---




[GitHub] spark pull request #22075: [SPARK-23908][SQL][FOLLOW-UP] Rename inputs to ar...

2018-08-13 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22075#discussion_r209643649
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -422,45 +425,49 @@ case class ArrayExists(
   """,
   since = "2.4.0")
 case class ArrayAggregate(
-input: Expression,
+argument: Expression,
 zero: Expression,
 merge: Expression,
 finish: Expression)
   extends HigherOrderFunction with CodegenFallback {
 
-  def this(input: Expression, zero: Expression, merge: Expression) = {
-this(input, zero, merge, LambdaFunction.identity)
+  def this(argument: Expression, zero: Expression, merge: Expression) = {
+this(argument, zero, merge, LambdaFunction.identity)
   }
 
-  override def inputs: Seq[Expression] = input :: zero :: Nil
+  override def arguments: Seq[Expression] = argument :: zero :: Nil
+
+  override def argumentTypes: Seq[AbstractDataType] = ArrayType :: 
AnyDataType :: Nil
 
   override def functions: Seq[Expression] = merge :: finish :: Nil
 
-  override def nullable: Boolean = input.nullable || finish.nullable
+  override def functionTypes: Seq[AbstractDataType] = zero.dataType :: 
AnyDataType :: Nil
+
+  override def nullable: Boolean = argument.nullable || finish.nullable
 
   override def dataType: DataType = finish.dataType
 
   override def checkInputDataTypes(): TypeCheckResult = {
-if (!ArrayType.acceptsType(input.dataType)) {
-  TypeCheckResult.TypeCheckFailure(
-s"argument 1 requires ${ArrayType.simpleString} type, " +
-  s"however, '${input.sql}' is of ${input.dataType.catalogString} 
type.")
-} else if (!DataType.equalsStructurally(
-zero.dataType, merge.dataType, ignoreNullability = true)) {
-  TypeCheckResult.TypeCheckFailure(
-s"argument 3 requires ${zero.dataType.simpleString} type, " +
-  s"however, '${merge.sql}' is of ${merge.dataType.catalogString} 
type.")
-} else {
-  TypeCheckResult.TypeCheckSuccess
+checkArgumentDataTypes() match {
--- End diff --

Just a quick question: isn't the call to ```checkArgumentDataTypes``` redundant here if ```checkArgumentDataTypes``` is already called before ```checkInputDataTypes```?


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-13 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r209533017
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,186 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (rightKeyType, rightValueType, 
rightValueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val keyType =
+TypeCoercion.findTightestCommonType(leftKeyType, 
rightKeyType).getOrElse(NullType)
--- End diff --

Oh, I see. We also need to check the output data type of lambda functions for expressions like ```ArrayFilter```.
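
A hedged sketch of why the lambda's output type also needs checking (simplified stand-ins, not the Spark expressions): a filter-style higher-order function only makes sense when the lambda returns a boolean.

```
sealed trait DType
case object BooleanType extends DType
case object IntType     extends DType

final case class Lambda(dataType: DType)

def checkFilterLambda(function: Lambda): Either[String, Unit] =
  if (function.dataType == BooleanType) Right(())
  else Left(s"filter predicate must return a boolean, got ${function.dataType}")

println(checkFilterLambda(Lambda(IntType)))      // Left(...)
println(checkFilterLambda(Lambda(BooleanType)))  // Right(())
```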


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-13 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r209515431
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -231,6 +231,15 @@ object TypeCoercion {
   })
   }
 
+  /**
+   * Similar to [[findTightestCommonType]] but with string promotion.
+   */
+  def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): 
Option[DataType] = {
--- End diff --

Thanks for both your PRs! I will submit changes once they get in.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-13 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r209514957
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,186 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (rightKeyType, rightValueType, 
rightValueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val keyType =
+TypeCoercion.findTightestCommonType(leftKeyType, 
rightKeyType).getOrElse(NullType)
--- End diff --

IMHO, if ```checkInputDataTypes``` were executed before ```bind```, ```findTightestCommonType``` could play the same role. But yeah, ```findCommonTypeDifferentOnlyInNullFlags``` will be semantically more accurate. Thanks!


---




[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-10 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/22017
  
retest this please


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-10 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r209311502
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 ---
@@ -231,6 +231,15 @@ object TypeCoercion {
   })
   }
 
+  /**
+   * Similar to [[findTightestCommonType]] but with string promotion.
+   */
+  def findWiderTypeForTwoExceptDecimals(t1: DataType, t2: DataType): 
Option[DataType] = {
--- End diff --

If we have maps with decimals of different precision as keys, ```Cast``` will fail in the analysis phase since it can't cast a key to a nullable type (potential loss of precision). IMHO, the type mismatch exception from this function will be more accurate. WDYT?


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-10 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r209188342
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,186 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (leftKeyType, leftValueType, leftValueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (rightKeyType, rightValueType, 
rightValueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val keyType =
+TypeCoercion.findTightestCommonType(leftKeyType, 
rightKeyType).getOrElse(NullType)
--- End diff --

Even though there is a coercion rule for unification of key types, the key types may still differ in nullability flags if they are complex. In theory, we could use ```==``` and ```findTightestCommonType``` in the coercion rule since there is no codegen to be optimized for ```null``` checks. But unfortunately, ```bind``` gets called once before the coercion rules are executed, so ```findTightestCommonType``` is important for setting up a correct input type for the lambda function.

Maybe we could play with the order of analysis rules, but I'm not sure about all the consequences. @ueshin, could you shed some light on analysis rule ordering?


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-09 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208941728
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

I like this idea, thanks!
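
For context, a hedged sketch of the idea discussed above, written with plain Scala collections instead of Spark's ```ArrayData```: for each distinct key, remember the index of its first occurrence in each input, so the lambda later receives the matching values (or ```None```).

```
import scala.collection.mutable

def keysWithValueIndexes[K](keys1: Seq[K], keys2: Seq[K]): Seq[(K, Array[Option[Int]])] = {
  val indexes = mutable.LinkedHashMap.empty[K, Array[Option[Int]]]
  for ((keys, side) <- Seq(keys1 -> 0, keys2 -> 1); (k, i) <- keys.zipWithIndex) {
    val slots = indexes.getOrElseUpdate(k, Array[Option[Int]](None, None))
    if (slots(side).isEmpty) slots(side) = Some(i)  // only the first duplicated key wins
  }
  indexes.toSeq
}

// keysWithValueIndexes(Seq(1, 2), Seq(2, 3))
// => Seq((1, Array(Some(0), None)), (2, Array(Some(1), Some(0))), (3, Array(None, Some(1))))
```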


---




[GitHub] spark pull request #21121: [SPARK-24042][SQL] Collection function: zip_with_...

2018-08-09 Thread mn-mikke
Github user mn-mikke closed the pull request at:

https://github.com/apache/spark/pull/21121


---




[GitHub] spark issue #21121: [SPARK-24042][SQL] Collection function: zip_with_index

2018-08-09 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21121
  
Sure, closing ...


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-09 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208872928
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

@mgaido91 Are you comfortable with reverting to the previous version?


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-09 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208871779
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,184 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
--- End diff --

You are right, thanks!

WDYT about introducing a coercion rule that handles different key types? It might be handy for cases like (```IntType```, ```LongType```).
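
A hedged sketch of such a coercion rule with simplified stand-in types (not Spark's ```TypeCoercion```): widen both key types to the tightest common type before binding the lambda.

```
sealed trait DType
case object IntType  extends DType
case object LongType extends DType

def tightestCommonKeyType(a: DType, b: DType): Option[DType] = (a, b) match {
  case (x, y) if x == y                          => Some(x)
  case (IntType, LongType) | (LongType, IntType) => Some(LongType)
  case _                                         => None
}

println(tightestCommonKeyType(IntType, LongType))  // Some(LongType)
```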


---




[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-09 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208868838
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
 ---
@@ -225,7 +264,9 @@ trait ExpressionEvalHelper extends 
GeneratorDrivenPropertyChecks with PlanTestBa
   val lit = InternalRow(expected, expected)
   val expectedRow =
 UnsafeProjection.create(Array(expression.dataType, 
expression.dataType)).apply(lit)
-  if (unsafeRow != expectedRow) {
+  val field = StructField("field", expression.dataType)
+  val dataType = StructType(field :: field :: Nil)
+  if (!checkResult(unsafeRow, expectedRow, dataType)) {
--- End diff --

```UnsafeRow```s are compared based on the equality of their backing arrays. This approach doesn't work well when we want to ignore the entry order in the unsafe representation of maps.

---




[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22045#discussion_r208749197
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,61 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Values for every entry of the map by applying 
transform_values function.
+ * Returns map wth transformed values
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Transforms values in the map using the 
function.",
+examples = """
+Examples:
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
+map(array(1, 2, 3), array(2, 3, 4))
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + 
v);
+map(array(1, 2, 3), array(2, 4, 6))
+  """,
+since = "2.4.0")
+case class TransformValues(
+input: Expression,
+function: Expression)
+  extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = {
+val map = input.dataType.asInstanceOf[MapType]
+MapType(map.keyType, function.dataType, map.valueContainsNull)
--- End diff --

```map.valueContainsNull``` -> ```function.nullable```?


---




[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22045#discussion_r208750446
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,61 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Values for every entry of the map by applying 
transform_values function.
+ * Returns map wth transformed values
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Transforms values in the map using the 
function.",
+examples = """
+Examples:
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
+map(array(1, 2, 3), array(2, 3, 4))
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + 
v);
+map(array(1, 2, 3), array(2, 4, 6))
+  """,
+since = "2.4.0")
+case class TransformValues(
+input: Expression,
+function: Expression)
+  extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = {
+val map = input.dataType.asInstanceOf[MapType]
+MapType(map.keyType, function.dataType, map.valueContainsNull)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(MapType, 
expectingFunctionType)
--- End diff --

This is already specified by ```MapBasedSimpleHigherOrderFunction```.


---




[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22045#discussion_r208751629
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,61 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Values for every entry of the map by applying 
transform_values function.
+ * Returns map wth transformed values
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Transforms values in the map using the 
function.",
+examples = """
+Examples:
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
+map(array(1, 2, 3), array(2, 3, 4))
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + 
v);
+map(array(1, 2, 3), array(2, 4, 6))
+  """,
+since = "2.4.0")
+case class TransformValues(
+input: Expression,
+function: Expression)
+  extends MapBasedSimpleHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = {
+val map = input.dataType.asInstanceOf[MapType]
+MapType(map.keyType, function.dataType, map.valueContainsNull)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(MapType, 
expectingFunctionType)
+
+  @transient val (keyType, valueType, valueContainsNull) =
+HigherOrderFunction.mapKeyValueArgumentType(input.dataType)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction):
--- End diff --

nit: formatting


---




[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22045#discussion_r208747953
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,61 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Values for every entry of the map by applying 
transform_values function.
+ * Returns map wth transformed values
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Transforms values in the map using the 
function.",
+examples = """
+Examples:
+   > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
--- End diff --

nit: ```(k, v)```, and maybe I would use ```v + 1``` instead of ```k + 1```.


---




[GitHub] spark pull request #22045: [SPARK-23940][SQL] Add transform_values SQL funct...

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22045#discussion_r208746631
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,61 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Values for every entry of the map by applying 
transform_values function.
+ * Returns map wth transformed values
--- End diff --

typos: Transforms values; with
Maybe you can think of a better comment?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208675350
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

This is a valid argument, it's a rare edge case. One last question before I 
change it: WDYT about the performance of a mutable array vs. ```oldTuple.copy(_2 = 
newValue)```?
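
For concreteness, a tiny sketch of the two options I have in mind (plain Scala, illustrative names only):
```
val i = 3  // some index being recorded

// Option A: mutable array of indexes, updated in place (current approach)
val indexes: Array[Option[Int]] = Array(None, None)
indexes(1) = Some(i)                       // in-place write, no extra allocation

// Option B: immutable tuple, replaced via copy
var pair: (Option[Int], Option[Int]) = (None, None)
pair = pair.copy(_2 = Some(i))             // allocates a new Tuple2 on every update
```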


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208647186
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

```indexes(z).isEmpty``` ensures that we always insert the first 
occurrence of the key, which follows the behavior of ```GetMapValue```. If we 
didn't perform such a check, the last occurrence would end up in the result.
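
Just to illustrate the guard outside of the expression code (simplified sketch, single map, Int keys):
```
import scala.collection.mutable

// Record only the index of the FIRST occurrence of each key, like GetMapValue does.
val keys = Array(1, 2, 1, 3)                               // key 1 is duplicated
val indexesByKey = new mutable.HashMap[Int, Option[Int]]
keys.zipWithIndex.foreach { case (key, i) =>
  val current = indexesByKey.getOrElseUpdate(key, None)
  if (current.isEmpty) {                                   // skip later duplicates
    indexesByKey(key) = Some(i)
  }
}
// indexesByKey(1) == Some(0): the duplicate at index 2 was ignored
```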


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208605882
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

> you don't need to check neither whether the key is there nor the size of 
the output array, you just need to add them.

What about duplicated keys? They can be created with other map functions.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208559241
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

Yeah, but my point is how to create a new tuple from an old one without using 
```_1```, ```_2```?
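
The only accessor-free way I can see is destructuring and rebuilding, e.g. (sketch):
```
val old: (Option[Int], Option[Int]) = (Some(1), None)

// Rebuild via pattern matching instead of the _1/_2 accessors
val updated = old match {
  case (first, _) => (first, Some(2))
}
// updated == (Some(1), Some(2))
```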


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208550479
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

> I really don't think so, it would be the same as now I think

Let's assume that ```indexes``` is a tuple for now. ```indexes(z).isEmpty``` 
could be replaced with ```indexes

[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208527423
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
+val hashMap = new mutable.OpenHashMap[Any, Array[Option[Int]]]
+val keys = Array(keys1, keys2)
+var z = 0
+while(z < 2) {
+  var i = 0
+  val array = keys(z)
+  while (i < array.numElements()) {
+val

[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-08 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208519941
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,191 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val (keyType, leftValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(left.dataType)
+
+  @transient lazy val (_, rightValueType, _) =
+HigherOrderFunction.mapKeyValueArgumentType(right.dataType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def keyTypeSupportsEquals = keyType match {
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  @transient private lazy val getKeysWithValueIndexes:
+  (ArrayData, ArrayData) => Seq[(Any, Array[Option[Int]])] = {
+if (keyTypeSupportsEquals) {
+  getKeysWithIndexesFast
+} else {
+  getKeysWithIndexesBruteForce
+}
+  }
+
+  private def assertSizeOfArrayBuffer(size: Int): Unit = {
+if (size > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
+  throw new RuntimeException(s"Unsuccessful try to zip maps with $size 
" +
+s"unique keys due to exceeding the array size limit " +
+s"${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.")
+}
+  }
+
+  private def getKeysWithIndexesFast(keys1: ArrayData, keys2: ArrayData) = 
{
+val arrayBuffer = new mutable.ArrayBuffer[(Any, Array[Option[Int]])]
--- End diff --

If we changed it to ```(Option[Int], Option[Int])```, wouldn't we need two 
similar ```i``` loops instead of one?

My motivation for also using the ```ArrayBuffer``` is to preserve the order of 
keys. A random order would break map comparison 

[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22031#discussion_r208399620
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,91 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(left, right, func) - Merges the two given arrays, 
element-wise, into a single array using function. If one array is shorter, 
nulls are appended at the end to match the length of the longer array, before 
applying function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, 
x));
+   array(('a', 1), ('b', 3), ('c', 5))
+  > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y));
+   array(4, 6)
+  > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) 
-> concat(x, y));
+   array('ad', 'be', 'cf')
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ArraysZipWith(
+left: Expression,
+right: Expression,
+function: Expression)
+  extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes {
+
+  override def inputs: Seq[Expression] = List(left, right)
+
+  override def functions: Seq[Expression] = List(function)
+
+  def expectingFunctionType: AbstractDataType = AnyDataType
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType, expectingFunctionType)
+
+  override def nullable: Boolean = inputs.exists(_.nullable)
+
+  override def dataType: ArrayType = ArrayType(function.dataType, 
function.nullable)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArraysZipWith = {
+val (leftElementType, leftContainsNull) = left.dataType match {
+  case ArrayType(elementType, containsNull) => (elementType, 
containsNull)
+  case _ =>
+val ArrayType(elementType, containsNull) = 
ArrayType.defaultConcreteType
+(elementType, containsNull)
+}
+val (rightElementType, rightContainsNull) = right.dataType match {
+  case ArrayType(elementType, containsNull) => (elementType, 
containsNull)
+  case _ =>
+val ArrayType(elementType, containsNull) = 
ArrayType.defaultConcreteType
+(elementType, containsNull)
+}
+copy(function = f(function,
+  (leftElementType, leftContainsNull) :: (rightElementType, 
rightContainsNull) :: Nil))
+  }
+
+  @transient lazy val (arr1Var, arr2Var) = {
+val LambdaFunction(_,
+  (arr1Var: NamedLambdaVariable):: (arr2Var: NamedLambdaVariable) :: 
Nil, _) = function
+(arr1Var, arr2Var)
+  }
+
+  override def eval(input: InternalRow): Any = {
+val leftArr = left.eval(input).asInstanceOf[ArrayData]
+val rightArr = right.eval(input).asInstanceOf[ArrayData]
+
+if (leftArr == null || rightArr == null) {
--- End diff --

If ```leftArr``` is ```null```, ```right``` doesn't have to be evaluated.
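
I.e. something along these lines (sketch only, mirroring the ```MapZipWith.eval``` pattern quoted in the other PR, so ```right``` is never touched when ```left``` evaluates to ```null```):
```
override def eval(input: InternalRow): Any = {
  val leftArr = left.eval(input).asInstanceOf[ArrayData]
  if (leftArr == null) {
    null
  } else {
    val rightArr = right.eval(input).asInstanceOf[ArrayData]
    if (rightArr == null) {
      null
    } else {
      // ... zip leftArr and rightArr with the lambda here ...
    }
  }
}
```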


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22031#discussion_r208403145
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,91 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(left, right, func) - Merges the two given arrays, 
element-wise, into a single array using function. If one array is shorter, 
nulls are appended at the end to match the length of the longer array, before 
applying function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, 
x));
+   array(('a', 1), ('b', 3), ('c', 5))
+  > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y));
+   array(4, 6)
+  > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) 
-> concat(x, y));
+   array('ad', 'be', 'cf')
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ArraysZipWith(
+left: Expression,
+right: Expression,
+function: Expression)
+  extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes {
+
+  override def inputs: Seq[Expression] = List(left, right)
+
+  override def functions: Seq[Expression] = List(function)
+
+  def expectingFunctionType: AbstractDataType = AnyDataType
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType, expectingFunctionType)
+
+  override def nullable: Boolean = inputs.exists(_.nullable)
+
+  override def dataType: ArrayType = ArrayType(function.dataType, 
function.nullable)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArraysZipWith = {
+val (leftElementType, leftContainsNull) = left.dataType match {
+  case ArrayType(elementType, containsNull) => (elementType, 
containsNull)
+  case _ =>
+val ArrayType(elementType, containsNull) = 
ArrayType.defaultConcreteType
+(elementType, containsNull)
+}
+val (rightElementType, rightContainsNull) = right.dataType match {
+  case ArrayType(elementType, containsNull) => (elementType, 
containsNull)
+  case _ =>
+val ArrayType(elementType, containsNull) = 
ArrayType.defaultConcreteType
+(elementType, containsNull)
+}
+copy(function = f(function,
+  (leftElementType, leftContainsNull) :: (rightElementType, 
rightContainsNull) :: Nil))
--- End diff --

If you want to support input arrays of different sizes (the Jira ticket 
says: _"Both arrays must be the same length."_), what about the scenario when 
one array is empty and the second has elements? Shouldn't we use ```true``` 
instead of ```leftContainsNull``` and ```rightContainsNull```?
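
Plain Scala illustration of why the padded side needs ```containsNull = true``` (not Spark code):
```
// Neither input contains null, but padding the shorter array introduces nulls.
val left  = Seq("a", "b", "c")   // containsNull = false
val right = Seq(1)               // containsNull = false
val maxLen = math.max(left.length, right.length)
val pairs = (0 until maxLen).map { i =>
  (left.lift(i).orNull, right.lift(i).map(Int.box).orNull)
}
// pairs == Vector(("a", 1), ("b", null), ("c", null))
```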


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22031: [TODO][SPARK-23932][SQL] Higher order function zi...

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22031#discussion_r208398313
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -442,3 +442,91 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(left, right, func) - Merges the two given arrays, 
element-wise, into a single array using function. If one array is shorter, 
nulls are appended at the end to match the length of the longer array, before 
applying function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, 
x));
+   array(('a', 1), ('b', 3), ('c', 5))
+  > SELECT _FUNC_(array(1, 2), array(3, 4), (x, y) -> x + y));
+   array(4, 6)
+  > SELECT _FUNC_(array('a', 'b', 'c'), array('d', 'e', 'f'), (x, y) 
-> concat(x, y));
+   array('ad', 'be', 'cf')
+  """,
+  since = "2.4.0")
+// scalastyle:on line.size.limit
+case class ArraysZipWith(
+left: Expression,
+right: Expression,
+function: Expression)
+  extends HigherOrderFunction with CodegenFallback with ExpectsInputTypes {
+
+  override def inputs: Seq[Expression] = List(left, right)
+
+  override def functions: Seq[Expression] = List(function)
+
+  def expectingFunctionType: AbstractDataType = AnyDataType
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType, expectingFunctionType)
+
+  override def nullable: Boolean = inputs.exists(_.nullable)
+
+  override def dataType: ArrayType = ArrayType(function.dataType, 
function.nullable)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): ArraysZipWith = {
+val (leftElementType, leftContainsNull) = left.dataType match {
--- End diff --

You can utilize ```HigherOrderFunction.arrayArgumentType```.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208280351
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +364,101 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
+
+  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
+
+  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
--- End diff --

The ```nullable``` flag is rather related to the cases when the whole map is 
```null```. The case that you are referring to is handled by the 
```valueContainsNull``` flag of ```MapType``` (see line 
[423](https://github.com/apache/spark/pull/22017/files/ec583eb29ba6fdb79d0b85cbecb3f709e6648b25#diff-ef52827ed9b41efc1fbd056a06ef7c6aR423)).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208211250
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +364,101 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
+
+  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
+
+  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  private def getMapType(expr: Expression) = expr.dataType match {
+case m: MapType => m
+case _ => MapType.defaultConcreteType
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: 
Any): Any = {
+val mapData1 = value1.asInstanceOf[MapData]
+val mapData2 = value2.asInstanceOf[MapData]
+val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray())
+val values = new GenericArrayData(new Array[Any](keys.numElements()))
+keys.foreach(keyType, (idx: Int, key: Any) => {
+  val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, 
leftValueType, ordering)
--- End diff --

Ok, I will change it. Thanks a lot!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22017#discussion_r208204796
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +364,101 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Merges two given maps into a single map by applying function to the 
pair of values with
+ * the same key.
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(map1, map2, function) - Merges two given maps into a single 
map by applying
+  function to the pair of values with the same key. For keys only 
presented in one map,
+  NULL will be passed as the value for the missing key. If an input 
map contains duplicated
+  keys, only the first entry of the duplicated key is passed into the 
lambda function.
+""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 'a', 2, 'b'), map(1, 'x', 2, 'y'), (k, v1, 
v2) -> concat(v1, v2));
+   {1:"ax",2:"by"}
+  """,
+  since = "2.4.0")
+case class MapZipWith(left: Expression, right: Expression, function: 
Expression)
+  extends HigherOrderFunction with CodegenFallback {
+
+  @transient lazy val functionForEval: Expression = functionsForEval.head
+
+  @transient lazy val MapType(keyType, leftValueType, _) = getMapType(left)
+
+  @transient lazy val MapType(_, rightValueType, _) = getMapType(right)
+
+  @transient lazy val arrayDataUnion = new ArrayDataUnion(keyType)
+
+  @transient lazy val ordering = TypeUtils.getInterpretedOrdering(keyType)
+
+  override def inputs: Seq[Expression] = left :: right :: Nil
+
+  override def functions: Seq[Expression] = function :: Nil
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def dataType: DataType = MapType(keyType, function.dataType, 
function.nullable)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (MapType(k1, _, _), MapType(k2, _, _)) if k1.sameType(k2) =>
+TypeUtils.checkForOrderingExpr(k1, s"function $prettyName")
+  case _ => TypeCheckResult.TypeCheckFailure(s"The input to function 
$prettyName should have " +
+s"been two ${MapType.simpleString}s with the same key type, but 
it's " +
+s"[${left.dataType.catalogString}, 
${right.dataType.catalogString}].")
+}
+  }
+
+  private def getMapType(expr: Expression) = expr.dataType match {
+case m: MapType => m
+case _ => MapType.defaultConcreteType
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapZipWith = {
+val arguments = Seq((keyType, false), (leftValueType, true), 
(rightValueType, true))
+copy(function = f(function, arguments))
+  }
+
+  override def eval(input: InternalRow): Any = {
+val value1 = left.eval(input)
+if (value1 == null) {
+  null
+} else {
+  val value2 = right.eval(input)
+  if (value2 == null) {
+null
+  } else {
+nullSafeEval(input, value1, value2)
+  }
+}
+  }
+
+  @transient lazy val LambdaFunction(_, Seq(
+keyVar: NamedLambdaVariable,
+value1Var: NamedLambdaVariable,
+value2Var: NamedLambdaVariable),
+_) = function
+
+  private def nullSafeEval(inputRow: InternalRow, value1: Any, value2: 
Any): Any = {
+val mapData1 = value1.asInstanceOf[MapData]
+val mapData2 = value2.asInstanceOf[MapData]
+val keys = arrayDataUnion(mapData1.keyArray(), mapData2.keyArray())
+val values = new GenericArrayData(new Array[Any](keys.numElements()))
+keys.foreach(keyType, (idx: Int, key: Any) => {
+  val v1 = GetMapValueUtil.getValueEval(mapData1, key, keyType, 
leftValueType, ordering)
--- End diff --

Thanks for mentioning this! I'm not happy with the current complexity 
either. I've assumed that the implementation of maps will change into something 
with O(1) element access in the future. By then, the complexity would be O(N) for 
types supporting equals as well, and we would save a portion of duplicated code.

If you think that maps will remain like this for a long time, I really like 
your suggestion with indexes.

@ueshin What's your view on that?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208190999
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +365,69 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Keys in a map using the transform_keys function.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms elements in a map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
+   map(array(2, 3, 4), array(1, 2, 3))
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v);
+   map(array(2, 4, 6), array(1, 2, 3))
+  """,
+  since = "2.4.0")
+case class TransformKeys(
+input: Expression,
+function: Expression)
+  extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = {
+val valueType = input.dataType.asInstanceOf[MapType].valueType
+MapType(function.dataType, valueType, input.nullable)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(MapType, 
expectingFunctionType)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction):
+  TransformKeys = {
+val (keyElementType, valueElementType, containsNull) = input.dataType 
match {
+  case MapType(keyType, valueType, containsNullValue) =>
+(keyType, valueType, containsNullValue)
+  case _ =>
+val MapType(keyType, valueType, containsNullValue) = 
MapType.defaultConcreteType
+(keyType, valueType, containsNullValue)
+}
+copy(function = f(function, (keyElementType, false) :: 
(valueElementType, containsNull) :: Nil))
+  }
+
+  @transient lazy val (keyVar, valueVar) = {
+val LambdaFunction(
+_, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: 
Nil, _) = function
+(keyVar, valueVar)
+  }
+
+  override def eval(input: InternalRow): Any = {
+val arr = this.input.eval(input).asInstanceOf[MapData]
+if (arr == null) {
+  null
+} else {
+  val f = functionForEval
+  val resultKeys = new GenericArrayData(new 
Array[Any](arr.numElements))
+  var i = 0
+  while (i < arr.numElements) {
+keyVar.value.set(arr.keyArray().get(i, keyVar.dataType))
+valueVar.value.set(arr.valueArray().get(i, valueVar.dataType))
+resultKeys.update(i, f.eval(input))
--- End diff --

I'm not a fan of duplicated keys either, but other functions transforming 
maps have the same problem. See the discussions 
[here](https://github.com/apache/spark/pull/21282#discussion_r187234431) and 
[here](https://github.com/apache/spark/pull/21258#discussion_r186410527).

Example:
```
scala> spark.range(1).selectExpr("map(0,1,0,2)").show()
+----------------+
| map(0, 1, 0, 2)|
+----------------+
|[0 -> 1, 0 -> 2]|
+----------------+
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208167785
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +365,69 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Keys in a map using the transform_keys function.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms elements in a map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
+   map(array(2, 3, 4), array(1, 2, 3))
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v);
+   map(array(2, 4, 6), array(1, 2, 3))
+  """,
+  since = "2.4.0")
+case class TransformKeys(
+input: Expression,
+function: Expression)
+  extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = {
+val valueType = input.dataType.asInstanceOf[MapType].valueType
+MapType(function.dataType, valueType, input.nullable)
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(MapType, 
expectingFunctionType)
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction):
+  TransformKeys = {
+val (keyElementType, valueElementType, containsNull) = input.dataType 
match {
+  case MapType(keyType, valueType, containsNullValue) =>
+(keyType, valueType, containsNullValue)
+  case _ =>
+val MapType(keyType, valueType, containsNullValue) = 
MapType.defaultConcreteType
+(keyType, valueType, containsNullValue)
+}
+copy(function = f(function, (keyElementType, false) :: 
(valueElementType, containsNull) :: Nil))
+  }
+
+  @transient lazy val (keyVar, valueVar) = {
+val LambdaFunction(
+_, (keyVar: NamedLambdaVariable) :: (valueVar: NamedLambdaVariable) :: 
Nil, _) = function
+(keyVar, valueVar)
+  }
+
+  override def eval(input: InternalRow): Any = {
+val arr = this.input.eval(input).asInstanceOf[MapData]
+if (arr == null) {
+  null
+} else {
+  val f = functionForEval
+  val resultKeys = new GenericArrayData(new 
Array[Any](arr.numElements))
+  var i = 0
+  while (i < arr.numElements) {
+keyVar.value.set(arr.keyArray().get(i, keyVar.dataType))
+valueVar.value.set(arr.valueArray().get(i, valueVar.dataType))
+resultKeys.update(i, f.eval(input))
--- End diff --

Maybe I'm missing something, but couldn't ```f.eval(input)``` be evaluated 
to ```null```? Keys are not allowed to be ```null```. Other functions usually 
have a ```null``` check and throw a ```RuntimeException``` for such cases.
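
E.g. a guard along these lines (sketch; the exact message is just illustrative):
```
// Reject a null result before using it as a map key
def checkedKey(transformed: Any): Any = {
  if (transformed == null) {
    throw new RuntimeException("Cannot use null as map key!")
  }
  transformed
}
```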


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208161643
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +365,69 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Keys in a map using the transform_keys function.
--- End diff --

maybe a better comment?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208169969
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/HigherOrderFunctionsSuite.scala
 ---
@@ -181,4 +187,46 @@ class HigherOrderFunctionsSuite extends SparkFunSuite 
with ExpressionEvalHelper
 (acc, array) => coalesce(aggregate(array, acc, (acc, elem) => acc 
+ elem), acc)),
   15)
   }
+
+  test("TransformKeys") {
+val ai0 = Literal.create(
+  Map(1 -> 1, 2 -> 2, 3 -> 3),
--- End diff --

Maybe it's irrelevant, but WDYT about adding test cases with ```null``` 
values?
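
For example, an extra input like this (sketch only, following the suite's existing ```Literal.create``` style; the explicit ```MapType``` keeps the ```null``` value typed, and the name is just illustrative):
```
val aiNull = Literal.create(
  Map(1 -> 1, 2 -> null, 3 -> 3),
  MapType(IntegerType, IntegerType, valueContainsNull = true))
```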


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208164141
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +365,69 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Keys in a map using the transform_keys function.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms elements in a map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
+   map(array(2, 3, 4), array(1, 2, 3))
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k, v) -> k + v);
+   map(array(2, 4, 6), array(1, 2, 3))
+  """,
+  since = "2.4.0")
+case class TransformKeys(
+input: Expression,
+function: Expression)
+  extends ArrayBasedHigherOrderFunction with CodegenFallback {
+
+  override def nullable: Boolean = input.nullable
+
+  override def dataType: DataType = {
+val valueType = input.dataType.asInstanceOf[MapType].valueType
+MapType(function.dataType, valueType, input.nullable)
--- End diff --

Is there any reason for changing the ```valueContainsNull``` flag if the 
function transforms just the keys? WDYT about:
```
val MapType(_, valueType, valueContainsNull) = 
input.dataType.asInstanceOf[MapType]
MapType(function.dataType, valueType, valueContainsNull)
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208169140
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -2071,6 +2071,158 @@ class DataFrameFunctionsSuite extends QueryTest 
with SharedSQLContext {
 assert(ex4.getMessage.contains("data type mismatch: argument 3 
requires int type"))
   }
 
+  test("transform keys function - test various primitive data types 
combinations") {
+val dfExample1 = Seq(
+  Map[Int, Int](1 -> 1, 9 -> 9, 8 -> 8, 7 -> 7)
+).toDF("i")
+
+val dfExample2 = Seq(
+  Map[Int, String](1 -> "a", 2 -> "b", 3 -> "c")
+).toDF("x")
+
+val dfExample3 = Seq(
+  Map[String, Int]("a" -> 1, "b" -> 2, "c" -> 3)
+).toDF("y")
+
+val dfExample4 = Seq(
+  Map[Int, Double](1 -> 1.0E0, 2 -> 1.4E0, 3 -> 1.7E0)
+).toDF("z")
+
+val dfExample5 = Seq(
+  Map[Int, Boolean](25 -> true, 26 -> false)
+).toDF("a")
+
+val dfExample6 = Seq(
+  Map[Int, String](25 -> "ab", 26 -> "cd")
+).toDF("b")
+
+val dfExample7 = Seq(
+  Map[Array[Int], Boolean](Array(1, 2) -> false)
+).toDF("c")
+
+
+def testMapOfPrimitiveTypesCombination(): Unit = {
+  checkAnswer(dfExample1.selectExpr("transform_keys(i, (k, v) -> k + 
v)"),
+Seq(Row(Map(2 -> 1, 18 -> 9, 16 -> 8, 14 -> 7
+
+  checkAnswer(dfExample2.selectExpr("transform_keys(x, (k, v) -> k + 
1)"),
+Seq(Row(Map(2 -> "a", 3 -> "b", 4 -> "c"
+
+  checkAnswer(dfExample3.selectExpr("transform_keys(y, (k, v) -> v * 
v)"),
+Seq(Row(Map(1 -> 1, 4 -> 2, 9 -> 3
+
+  checkAnswer(dfExample3.selectExpr("transform_keys(y, (k, v) -> 
length(k) + v)"),
+Seq(Row(Map(2 -> 1, 3 -> 2, 4 -> 3
+
+  checkAnswer(
+dfExample3.selectExpr("transform_keys(y, (k, v) -> concat(k, 
cast(v as String)))"),
+Seq(Row(Map("a1" -> 1, "b2" -> 2, "c3" -> 3
+
+  checkAnswer(dfExample4.selectExpr("transform_keys(z, " +
+"(k, v) -> map_from_arrays(ARRAY(1, 2, 3), ARRAY('one', 'two', 
'three'))[k])"),
+Seq(Row(Map("one" -> 1.0, "two" -> 1.4, "three" -> 1.7
+
+  checkAnswer(dfExample4.selectExpr("transform_keys(z, (k, v) -> 
CAST(v * 2 AS BIGINT) + k)"),
+Seq(Row(Map(3 -> 1.0, 4 -> 1.4, 6 -> 1.7
+
+  checkAnswer(dfExample4.selectExpr("transform_keys(z, (k, v) -> k + 
v)"),
+Seq(Row(Map(2.0 -> 1.0, 3.4 -> 1.4, 4.7 -> 1.7
+
+  checkAnswer(dfExample5.selectExpr("transform_keys(a, (k, v) ->  k % 
2 = 0 OR v)"),
+Seq(Row(Map(true -> true, true -> false
+
+  checkAnswer(dfExample5.selectExpr("transform_keys(a, (k, v) -> if(v, 
2 * k, 3 * k))"),
+Seq(Row(Map(50 -> true, 78 -> false
+
+  checkAnswer(dfExample5.selectExpr("transform_keys(a, (k, v) -> if(v, 
2 * k, 3 * k))"),
+Seq(Row(Map(50 -> true, 78 -> false
+
+  checkAnswer(dfExample6.selectExpr(
+"transform_keys(b, (k, v) ->  concat(conv(k, 10, 16) , substr(v, 
1, 1)))"),
+Seq(Row(Map("19a" -> "ab", "1Ac" -> "cd"
+
+  checkAnswer(dfExample7.selectExpr("transform_keys(c, (k, v) -> 
array_contains(k, 3) AND v)"),
+Seq(Row(Map(false -> false
+}
+// Test with local relation, the Project will be evaluated without 
codegen
+testMapOfPrimitiveTypesCombination()
+dfExample1.cache()
+dfExample2.cache()
+dfExample3.cache()
+dfExample4.cache()
+dfExample5.cache()
+dfExample6.cache()
+// Test with cached relation, the Project will be evaluated with 
codegen
+testMapOfPrimitiveTypesCombination()
--- End diff --

Do we have to do that if the expression implements ```CodegenFallback```?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22013: [SPARK-23939][SQL] Add transform_keys function

2018-08-07 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/22013#discussion_r208160707
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -365,3 +365,69 @@ case class ArrayAggregate(
 
   override def prettyName: String = "aggregate"
 }
+
+/**
+ * Transform Keys in a map using the transform_keys function.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(expr, func) - Transforms elements in a map using the 
function.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(map(array(1, 2, 3), array(1, 2, 3), (k,v) -> k + 1);
--- End diff --

nit: missing space -> ```k, v```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-06 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/22017
  
cc @ueshin @mgaido91 @hvanhovell 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22017: [SPARK-23938][SQL] Add map_zip_with function

2018-08-06 Thread mn-mikke
GitHub user mn-mikke opened a pull request:

https://github.com/apache/spark/pull/22017

[SPARK-23938][SQL] Add map_zip_with function

## What changes were proposed in this pull request?

This PR adds a new SQL function called ```map_zip_with```. It merges the 
two given maps into a single map by applying the function to the pair of values 
with the same key. 

## How was this patch tested?

Added new tests into:
- DataFrameFunctionsSuite.scala
- HigherOrderFunctionsSuite.scala


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mn-mikke/spark SPARK-23938

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22017.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22017


commit ef56011f03d8bae4634e5d3108e4d6502482383c
Author: Marek Novotny 
Date:   2018-08-06T23:42:45Z

[SPARK-23938][SQL] Add map_zip_with function




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21986: [SPARK-23937][SQL] Add map_filter SQL function

2018-08-06 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21986#discussion_r207908454
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala
 ---
@@ -205,29 +230,85 @@ case class ArrayTransform(
 (elementVar, indexVar)
   }
 
-  override def eval(input: InternalRow): Any = {
-val arr = this.input.eval(input).asInstanceOf[ArrayData]
-if (arr == null) {
-  null
-} else {
-  val f = functionForEval
-  val result = new GenericArrayData(new Array[Any](arr.numElements))
-  var i = 0
-  while (i < arr.numElements) {
-elementVar.value.set(arr.get(i, elementVar.dataType))
-if (indexVar.isDefined) {
-  indexVar.get.value.set(i)
-}
-result.update(i, f.eval(input))
-i += 1
+  override def nullSafeEval(inputRow: InternalRow, inputValue: Any): Any = 
{
+val arr = inputValue.asInstanceOf[ArrayData]
+val f = functionForEval
+val result = new GenericArrayData(new Array[Any](arr.numElements))
+var i = 0
+while (i < arr.numElements) {
+  elementVar.value.set(arr.get(i, elementVar.dataType))
+  if (indexVar.isDefined) {
+indexVar.get.value.set(i)
   }
-  result
+  result.update(i, f.eval(inputRow))
+  i += 1
 }
+result
   }
 
   override def prettyName: String = "transform"
 }
 
+/**
+ * Filters entries in a map using the provided function.
+ */
+@ExpressionDescription(
+usage = "_FUNC_(expr, func) - Filters entries in a map using the 
function.",
+examples = """
+Examples:
+  > SELECT _FUNC_(map(1, 0, 2, 2, 3, -1), (k, v) -> k > v);
+   [1 -> 0, 3 -> -1]
+  """,
+since = "2.4.0")
+case class MapFilter(
+input: Expression,
+function: Expression)
+  extends MapBasedUnaryHigherOrderFunction with CodegenFallback {
+
+  @transient val (keyType, valueType, valueContainsNull) = input.dataType 
match {
+case MapType(kType, vType, vContainsNull) => (kType, vType, 
vContainsNull)
+case _ =>
+  val MapType(kType, vType, vContainsNull) = 
MapType.defaultConcreteType
+  (kType, vType, vContainsNull)
+  }
+
+  @transient lazy val (keyVar, valueVar) = {
+val args = function.asInstanceOf[LambdaFunction].arguments
+(args.head.asInstanceOf[NamedLambdaVariable], 
args.tail.head.asInstanceOf[NamedLambdaVariable])
+  }
+
+  override def bind(f: (Expression, Seq[(DataType, Boolean)]) => 
LambdaFunction): MapFilter = {
+function match {
+  case LambdaFunction(_, _, _) =>
--- End diff --

Is this pattern matching necessary? If so, shouldn't ```ArrayFilter``` use 
it as well?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21982: [SPARK-23909][SQL] Add aggregate function.

2018-08-03 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21982
  
Isn't this PR related to the Jira ticket 
[SPARK-23911](https://issues.apache.org/jira/browse/SPARK-23911)?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21236: [SPARK-23935][SQL] Adding map_entries function

2018-08-02 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21236#discussion_r207148777
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala
 ---
@@ -98,6 +98,9 @@ trait ExpressionEvalHelper extends 
GeneratorDrivenPropertyChecks {
 if (expected.isNaN) result.isNaN else expected == result
   case (result: Float, expected: Float) =>
 if (expected.isNaN) result.isNaN else expected == result
+  case (result: UnsafeRow, expected: GenericInternalRow) =>
--- End diff --

Hi @srowen,
the ```(InternalRow, InternalRow)``` case was introduced later in 
[21838](https://github.com/apache/spark/pull/21838) and covers the logic of the 
```UnsafeRow``` case, so we can just remove the unreachable piece of code.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-30 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r206228544
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3651,14 +3651,9 @@ case class ArrayDistinct(child: Expression)
 }
 
 /**
- * Will become common base class for [[ArrayUnion]], ArrayIntersect, and 
ArrayExcept.
+ * Will become common base class for [[ArrayUnion]], ArrayIntersect, and 
[[ArrayExcept]].
  */
 abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
--- End diff --

I'm not sure to what extent Presto is considered a reference. Just FYI, 
these operations in Presto can accept arrays of different types.
```
presto:default> SELECT array_except(ARRAY[5, 1, 7], ARRAY[7.0, 1.0, 3.0]);
 _col0 
---
 [5.0] 
(1 row)
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205847180
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,317 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike
+with ComplexTypeMergingExpression {
--- End diff --

nit: indentation


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205848094
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,317 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike
+with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
--- End diff --

Why do we need this check (see the question 
[here](https://github.com/apache/spark/pull/21103#discussion_r205806876))?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-27 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205806876
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,233 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
--- End diff --

Maybe I'm missing something, but why do we need to apply these checks if no 
```null``` flag merging is performed?

If ```left.dataType``` and ```right.dataType``` are different, they will be 
cast according to the ```ImplicitTypeCasts``` coercion rule. If they differ 
only in ```null``` flags, ```left.dataType``` could be returned directly since 
no array elements from ```right``` will be present in the result.
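i.e. something along these lines (just a sketch of what I mean):
```
// the result can only contain elements coming from the left child,
// so the left child's containsNull flag is sufficient
override def dataType: DataType = left.dataType
```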


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21103
  
Added a few minor comments, but nothing serious to be solved right now.

LGTM



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205395435
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
+  // allocate result array
+  hsInt = new OpenHashSet[Int]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+IntegerType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
+  }
+  // assign elements into the result array
+  evalIntLongPrimitiveType(array1, array2, resultArray, false)
+  resultArray
+case LongType =>
+  // avoid boxing of primitive long array elements
+  // calculate result array size
+  hsLong = new OpenHashSet[Long]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
true)
+  // allocate result array
+  hsLong = new OpenHashSet[Long]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+LongType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiv

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205397226
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
+  // allocate result array
+  hsInt = new OpenHashSet[Int]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+IntegerType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
+  }
+  // assign elements into the result array
+  evalIntLongPrimitiveType(array1, array2, resultArray, false)
+  resultArray
+case LongType =>
+  // avoid boxing of primitive long array elements
+  // calculate result array size
+  hsLong = new OpenHashSet[Long]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
true)
+  // allocate result array
+  hsLong = new OpenHashSet[Long]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+LongType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiv

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205392257
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
+  // allocate result array
+  hsInt = new OpenHashSet[Int]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+IntegerType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiveArray(
+  Platform.INT_ARRAY_OFFSET, elements, IntegerType.defaultSize)
+  }
+  // assign elements into the result array
+  evalIntLongPrimitiveType(array1, array2, resultArray, false)
+  resultArray
+case LongType =>
+  // avoid boxing of primitive long array elements
+  // calculate result array size
+  hsLong = new OpenHashSet[Long]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
true)
+  // allocate result array
+  hsLong = new OpenHashSet[Long]
+  val resultArray = if (UnsafeArrayData.shouldUseGenericArrayData(
+LongType.defaultSize, elements)) {
+new GenericArrayData(new Array[Any](elements))
+  } else {
+UnsafeArrayData.forPrimitiv

[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-26 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r205397802
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,330 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = left.dataType
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getInt(idx)
+if (!hsInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: 
Int): Boolean = {
+val elem = array.getLong(idx)
+if (!hsLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  isLongType: Boolean): Int = {
+// store elements into resultArray
+var notFoundNullElement = true
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+notFoundNullElement = false
+  } else {
+val assigned = if (!isLongType) {
+  hsInt.add(array2.getInt(i))
+} else {
+  hsLong.add(array2.getLong(i))
+}
+  }
+  i += 1
+}
+var pos = 0
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (notFoundNullElement) {
+  if (resultArray != null) {
+resultArray.setNullAt(pos)
+  }
+  pos += 1
+  notFoundNullElement = false
+}
+  } else {
+val assigned = if (!isLongType) {
+  assignInt(array1, i, resultArray, pos)
+} else {
+  assignLong(array1, i, resultArray, pos)
+}
+if (assigned) {
+  pos += 1
+}
+  }
+  i += 1
+}
+pos
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+if (elementTypeSupportEquals) {
+  elementType match {
+case IntegerType =>
+  // avoid boxing of primitive int array elements
+  // calculate result array size
+  hsInt = new OpenHashSet[Int]
+  val elements = evalIntLongPrimitiveType(array1, array2, null, 
false)
--- End diff --

WDYT about using an ```Option``` instead of a ```null``` value?
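Roughly (a sketch of the relevant parts only):
```
def assignInt(array: ArrayData, idx: Int, resultArray: Option[ArrayData], pos: Int): Boolean = {
  val elem = array.getInt(idx)
  if (!hsInt.contains(elem)) {
    // None makes the "count only" pass explicit instead of passing null
    resultArray.foreach(_.setInt(pos, elem))
    hsInt.add(elem)
    true
  } else {
    false
  }
}

// counting pass:   evalIntLongPrimitiveType(array1, array2, None, false)
// assignment pass: evalIntLongPrimitiveType(array1, array2, Some(resultArray), false)
```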


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21103: [SPARK-23915][SQL] Add array_except function

2018-07-24 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21103
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-23 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21102
  
Sure. Also cc @gatorsmile, who created the Jira ticket.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-23 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21102
  
It seems that Presto returns the result in ascending order.
```
presto> SELECT array_intersect(ARRAY[5, 8, null, 1], ARRAY[8, null, 1, 5]);
  _col0  
-
 [null, 1, 5, 8] 
(1 row)
```
Shouldn't we follow the same behavior if Presto is used as a reference?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-23 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r204349890
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3801,339 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the 
intersection of array1 and
+array2, without duplicates.
+  """,
+  examples = """
+Examples:Fun
--- End diff --

Just ```Examples:```?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21687: [SPARK-24165][SQL] Fixing conditional expressions...

2018-07-23 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21687#discussion_r204340969
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 ---
@@ -695,6 +695,41 @@ abstract class TernaryExpression extends Expression {
   }
 }
 
+/**
+ * A trait resolving nullable, containsNull, valueContainsNull flags of 
the output date type.
+ * This logic is usually utilized by expressions combining data from 
multiple child expressions
+ * of non-primitive types (e.g. [[CaseWhen]]).
+ */
+trait ComplexTypeMergingExpression extends Expression {
+
+  /**
+   * A collection of data types used for resolution the output type of the 
expression. By default,
+   * data types of all child expressions. The collection must not be empty.
+   */
+  @transient
+  lazy val inputTypesForMerging: Seq[DataType] = children.map(_.dataType)
+
+  /**
+   * A method determining whether the input types are equal ignoring 
nullable, containsNull and
+   * valueContainsNull flags and thus convenient for resolution of the 
final data type.
+   */
+  def areInputTypesForMergingEqual: Boolean = {
+inputTypesForMerging.length <= 1 || inputTypesForMerging.sliding(2, 
1).forall {
+  case Seq(dt1, dt2) => dt1.sameType(dt2)
+}
+  }
+
+  override def dataType: DataType = {
+require(
+  inputTypesForMerging.nonEmpty,
+  "The collection of input data types must not be empty.")
+require(
+  areInputTypesForMergingEqual,
+  "All input types must be the same except nullable, containsNull, 
valueContainsNull flags.")
--- End diff --

```NullType``` is ```sameType```-equal only with itself, so it's up to the 
coercion rules to cast ```NullType``` to a common type with the other children. 
All the rules transitively utilize ```TypeCoercion.findTightestCommonType```, 
which does that job (```case (NullType, t1) => Some(t1)```, ```case (t1, 
NullType) => Some(t1)```).

If I'm not missing anything, this behavior was the same before this PR. I can 
add more tests to cover this scenario if you wish. :-)
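For instance, something like this (just a sketch of the kind of DataFrame-level 
test I have in mind):
```
// the NULL literal branch gets coerced to the array type of the other branch
checkAnswer(
  spark.range(1).selectExpr("if(id = 0, array('a', 'b'), null) AS res"),
  Row(Seq("a", "b")) :: Nil)
```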


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21830: [SPARK-24878][SQL] Fix reverse function for array type o...

2018-07-20 Thread mn-mikke
Github user mn-mikke commented on the issue:

https://github.com/apache/spark/pull/21830
  
Thanks @ueshin for this PR! Good to know about the re-assignments.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-20 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r203964623
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,332 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:Fun
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = ArrayType(elementType,
--- End diff --

Yeah, this case is valid. But the ```containsNull``` flag is defined for the 
whole column (across multiple rows). Since this flag could cause removal of the 
null-safe check in expressions that use ```ArrayExcept``` as a child, it could 
lead to ```NullPointerException``` failures for cases like the second row of 
the example dataset. WDYT?
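To make it concrete (just a sketch; ```df```, ```a1``` and ```a2``` refer to 
the columns of the example dataset):
```
// row 2: array_except([1, null, 3], [1, 2]) evaluates to [null, 3]
// if containsNull were reported as false, a parent expression could drop its null check here
df.selectExpr("element_at(array_except(a1, a2), 1) + 1")
```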


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-19 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r203746577
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,332 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:Fun
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(2)
+  """,
+  since = "2.4.0")
+case class ArrayExcept(left: Expression, right: Expression) extends 
ArraySetLike {
+  override def dataType: DataType = ArrayType(elementType,
--- End diff --

What about ```ArrayType(elementType, 
left.dataType.asInstanceOf[ArrayType].containsNull)```?

Example df:

left | right | result
--- | --- | ---
[1,2] | [1, null] | [2]
[1, null, 3] | [1,2] | **[null, 3]**


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21103: [SPARK-23915][SQL] Add array_except function

2018-07-19 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21103#discussion_r203741325
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3799,332 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without 
duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in array1 but 
not in array2,
+without duplicates.
+  """,
+  examples = """
+Examples:Fun
--- End diff --

just "Examples:"?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...

2018-07-19 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21774#discussion_r203693021
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala 
---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.avro.generic.GenericDatumReader
+import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
+
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters, 
SerializableSchema}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
+
+case class AvroDataToCatalyst(child: Expression, avroType: 
SerializableSchema)
+  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
+
+  override lazy val dataType: DataType =
--- End diff --

Since the schema could be quite complex, I am wondering about sending the 
same schema to executors twice (```dataType```, ```avroType```). But yeah, 
transmission of an extra payload and deserialization of ```dataType``` might be 
faster than the schema conversion.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...

2018-07-19 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21774#discussion_r203678944
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala ---
@@ -36,4 +40,27 @@ package object avro {
 @scala.annotation.varargs
 def avro(sources: String*): DataFrame = 
reader.format("avro").load(sources: _*)
   }
+
--- End diff --

Thanks guys for your explanation!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...

2018-07-18 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21774#discussion_r203422909
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala 
---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+
+import org.apache.avro.generic.GenericDatumWriter
+import org.apache.avro.io.{BinaryEncoder, EncoderFactory}
+
+import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.{BinaryType, DataType}
+
--- End diff --

```ExpressionDescription``` and javadoc?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...

2018-07-18 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21774#discussion_r203424033
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/package.scala ---
@@ -36,4 +40,27 @@ package object avro {
 @scala.annotation.varargs
 def avro(sources: String*): DataFrame = 
reader.format("avro").load(sources: _*)
   }
+
--- End diff --

Why are these two functions not part of 
```org.apache.spark.sql.functions```?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...

2018-07-18 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21774#discussion_r203418945
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala 
---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.avro.generic.GenericDatumReader
+import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
+
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters, 
SerializableSchema}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
+
+case class AvroDataToCatalyst(child: Expression, avroType: 
SerializableSchema)
+  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
--- End diff --

see the 
[comment](https://github.com/apache/spark/pull/21061#discussion_r181399858) 
about ```CodegenFallback```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...

2018-07-18 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21774#discussion_r203416936
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala 
---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.avro.generic.GenericDatumReader
+import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
+
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters, 
SerializableSchema}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
+
+case class AvroDataToCatalyst(child: Expression, avroType: 
SerializableSchema)
+  extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(BinaryType)
+
+  override lazy val dataType: DataType =
--- End diff --

```@transient```?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...

2018-07-18 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21774#discussion_r203422417
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala 
---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.ByteArrayOutputStream
+
+import org.apache.avro.generic.GenericDatumWriter
+import org.apache.avro.io.{BinaryEncoder, EncoderFactory}
+
+import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.{BinaryType, DataType}
+
+case class CatalystDataToAvro(child: Expression) extends UnaryExpression 
with CodegenFallback {
+
+  override lazy val dataType: DataType = BinaryType
--- End diff --

just ```def```?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21774: [SPARK-24811][SQL]Avro: add new function from_avr...

2018-07-18 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21774#discussion_r203415956
  
--- Diff: 
external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala 
---
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.avro.generic.GenericDatumReader
+import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
+
+import org.apache.spark.sql.avro.{AvroDeserializer, SchemaConverters, 
SerializableSchema}
+import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType}
+
--- End diff --

I would add ```ExpressionDescription``` and javadoc here.
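e.g. something along these lines (the wording is just a placeholder):
```
/**
 * Converts a binary column in Avro format into its corresponding Catalyst value.
 */
@ExpressionDescription(
  usage = "_FUNC_(child, avroType) - Decodes Avro binary data into a Catalyst value.",
  since = "2.4.0")
```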


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21802: [SPARK-23928][SQL] Add shuffle collection functio...

2018-07-18 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21802#discussion_r203388798
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2382,6 +2382,20 @@ def array_sort(col):
 return Column(sc._jvm.functions.array_sort(_to_java_column(col)))
 
 
+@since(2.4)
+def shuffle(col):
+"""
+Collection function: Generates a random permutation of the given array.
+
+.. note:: The function is non-deterministic because its results 
depends on order of rows which
--- End diff --

Isn't it non-deterministic rather because the permutation is determined 
randomly?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21802: [SPARK-23928][SQL] Add shuffle collection functio...

2018-07-18 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21802#discussion_r203407122
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -1444,6 +1444,51 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
 )
   }
 
+  test("shuffle function") {
+// Shuffle expressions should produce same results at retries in the 
same DataFrame.
+def checkResult(df: DataFrame): Unit = {
+  checkAnswer(df, df.collect())
+}
+
+// primitive-type elements
+val idf = Seq(
+  Seq(1, 9, 8, 7),
+  Seq(5, 8, 9, 7, 2),
+  Seq.empty,
+  null
+).toDF("i")
+
+def checkResult1(): Unit = {
--- End diff --

Maybe a different name for the method?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21795: [SPARK-24840][SQL] do not use dummy filter to swi...

2018-07-18 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21795#discussion_r203379244
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -1147,65 +1149,66 @@ class DataFrameFunctionsSuite extends QueryTest 
with SharedSQLContext {
 val nseqi : Seq[Int] = null
 val nseqs : Seq[String] = null
 val df = Seq(
-
   (Seq(1), Seq(2, 3), Seq(5L, 6L), nseqi, Seq("a", "b", "c"), Seq("d", 
"e"), Seq("f"), nseqs),
   (Seq(1, 0), Seq.empty[Int], Seq(2L), nseqi, Seq("a"), 
Seq.empty[String], Seq(null), nseqs)
 ).toDF("i1", "i2", "i3", "in", "s1", "s2", "s3", "sn")
 
-val dummyFilter = (c: Column) => c.isNull || c.isNotNull // switch 
codeGen on
-
 // Simple test cases
-checkAnswer(
--- End diff --

Good catch!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21795: [SPARK-24840][SQL] do not use dummy filter to swi...

2018-07-18 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21795#discussion_r203378508
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -924,26 +926,26 @@ class DataFrameFunctionsSuite extends QueryTest with 
SharedSQLContext {
   null
 ).toDF("i")
 
-checkAnswer(
-  idf.select(reverse('i)),
-  Seq(Row(Seq(7, 8, 9, 1)), Row(Seq(2, 7, 9, 8, 5)), Row(Seq.empty), 
Row(null))
-)
-checkAnswer(
-  idf.filter(dummyFilter('i)).select(reverse('i)),
-  Seq(Row(Seq(7, 8, 9, 1)), Row(Seq(2, 7, 9, 8, 5)), Row(Seq.empty), 
Row(null))
-)
-checkAnswer(
-  idf.selectExpr("reverse(i)"),
-  Seq(Row(Seq(7, 8, 9, 1)), Row(Seq(2, 7, 9, 8, 5)), Row(Seq.empty), 
Row(null))
-)
-checkAnswer(
-  oneRowDF.selectExpr("reverse(array(1, null, 2, null))"),
-  Seq(Row(Seq(null, 2, null, 1)))
-)
-checkAnswer(
-  oneRowDF.filter(dummyFilter('i)).selectExpr("reverse(array(1, null, 
2, null))"),
-  Seq(Row(Seq(null, 2, null, 1)))
-)
+def checkResult2(): Unit = {
--- End diff --

What about using more specific names for the functions ```checkResult2```, 
```checkResult3```, etc.? Maybe ```checkStringTestCases```, 
```checkCasesWithArraysOfComplexTypes```, or something like that?
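
For example (purely a sketch — the bodies are elided and both the helper and DataFrame names are only suggestions, not code from the PR):

```scala
// Illustrative only: the same helpers as in the PR, just with more descriptive names.
def checkPrimitiveTypeTestCases(): Unit = { /* assertions on idf */ }
def checkStringTestCases(): Unit = { /* assertions on the string-typed DataFrame */ }
def checkCasesWithArraysOfComplexTypes(): Unit = { /* assertions on the complex-typed DataFrames */ }
```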


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21795: [SPARK-24840][SQL] do not use dummy filter to swi...

2018-07-18 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21795#discussion_r203305670
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
---
@@ -2336,46 +2336,40 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
 
 val sourceDF = spark.createDataFrame(rows, schema)
 
-val structWhenDF = sourceDF
+def structWhenDF: DataFrame = sourceDF
   .select(when('cond, struct(lit("a").as("val1"), 
lit(10).as("val2"))).otherwise('s) as "res")
   .select('res.getField("val1"))
-val arrayWhenDF = sourceDF
+def arrayWhenDF: DataFrame = sourceDF
   .select(when('cond, array(lit("a"), lit("b"))).otherwise('a) as 
"res")
   .select('res.getItem(0))
-val mapWhenDF = sourceDF
+def mapWhenDF: DataFrame = sourceDF
   .select(when('cond, map(lit(0), lit("a"))).otherwise('m) as "res")
   .select('res.getItem(0))
 
-val structIfDF = sourceDF
+def structIfDF: DataFrame = sourceDF
   .select(expr("if(cond, struct('a' as val1, 10 as val2), s)") as 
"res")
   .select('res.getField("val1"))
-val arrayIfDF = sourceDF
+def arrayIfDF: DataFrame = sourceDF
   .select(expr("if(cond, array('a', 'b'), a)") as "res")
   .select('res.getItem(0))
-val mapIfDF = sourceDF
+def mapIfDF: DataFrame = sourceDF
   .select(expr("if(cond, map(0, 'a'), m)") as "res")
   .select('res.getItem(0))
 
-def checkResult(df: DataFrame, codegenExpected: Boolean): Unit = {
-  
assert(df.queryExecution.executedPlan.isInstanceOf[WholeStageCodegenExec] == 
codegenExpected)
-  checkAnswer(df, Seq(Row("a"), Row(null)))
+def checkResult(): Unit = {
+  checkAnswer(structWhenDF, Seq(Row("a"), Row(null)))
+  checkAnswer(arrayWhenDF, Seq(Row("a"), Row(null)))
+  checkAnswer(mapWhenDF, Seq(Row("a"), Row(null)))
+  checkAnswer(structIfDF, Seq(Row("a"), Row(null)))
+  checkAnswer(arrayIfDF, Seq(Row("a"), Row(null)))
+  checkAnswer(mapIfDF, Seq(Row("a"), Row(null)))
 }
 
-// without codegen
-checkResult(structWhenDF, false)
-checkResult(arrayWhenDF, false)
-checkResult(mapWhenDF, false)
-checkResult(structIfDF, false)
-checkResult(arrayIfDF, false)
-checkResult(mapIfDF, false)
-
-// with codegen
-checkResult(structWhenDF.filter('cond.isNotNull), true)
--- End diff --

@cloud-fan Thanks for the clarification and this PR!

Btw, there are many tests in ```DataFrameFunctionsSuite``` that test only 
the scenarios without codegen. WDYT about adding a generic ```checkAnswer``` 
method to ```QueryTest``` that would evaluate a DataFrame for both cases, 
similarly to how ```ExpressionEvalHelper.checkEvaluation``` does for expressions? 
If it's possible, of course.
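
Something like the following is what I have in mind — just a sketch, under the assumption that toggling ```spark.sql.codegen.wholeStage``` via ```withSQLConf``` is enough to exercise both paths; the method name is made up and this is not an existing ```QueryTest``` API:

```scala
// Hypothetical helper (sketch only): runs the same check with whole-stage codegen
// enabled and disabled. Assumes the suite mixes in SQLTestUtils so that withSQLConf
// is available; `df` is by-name so the plan is rebuilt for each configuration.
protected def checkAnswerWithAndWithoutCodegen(df: => DataFrame, expected: Seq[Row]): Unit = {
  Seq("true", "false").foreach { codegenEnabled =>
    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> codegenEnabled) {
      checkAnswer(df, expected)
    }
  }
}
```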


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21795: [SPARK-24165][SQL][followup] Fixing conditional e...

2018-07-17 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21795#discussion_r203064514
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
---
@@ -2336,46 +2336,40 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
 
 val sourceDF = spark.createDataFrame(rows, schema)
 
-val structWhenDF = sourceDF
+def structWhenDF: DataFrame = sourceDF
   .select(when('cond, struct(lit("a").as("val1"), 
lit(10).as("val2"))).otherwise('s) as "res")
   .select('res.getField("val1"))
-val arrayWhenDF = sourceDF
+def arrayWhenDF: DataFrame = sourceDF
   .select(when('cond, array(lit("a"), lit("b"))).otherwise('a) as 
"res")
   .select('res.getItem(0))
-val mapWhenDF = sourceDF
+def mapWhenDF: DataFrame = sourceDF
   .select(when('cond, map(lit(0), lit("a"))).otherwise('m) as "res")
   .select('res.getItem(0))
 
-val structIfDF = sourceDF
+def structIfDF: DataFrame = sourceDF
   .select(expr("if(cond, struct('a' as val1, 10 as val2), s)") as 
"res")
   .select('res.getField("val1"))
-val arrayIfDF = sourceDF
+def arrayIfDF: DataFrame = sourceDF
   .select(expr("if(cond, array('a', 'b'), a)") as "res")
   .select('res.getItem(0))
-val mapIfDF = sourceDF
+def mapIfDF: DataFrame = sourceDF
   .select(expr("if(cond, map(0, 'a'), m)") as "res")
   .select('res.getItem(0))
 
-def checkResult(df: DataFrame, codegenExpected: Boolean): Unit = {
-  
assert(df.queryExecution.executedPlan.isInstanceOf[WholeStageCodegenExec] == 
codegenExpected)
-  checkAnswer(df, Seq(Row("a"), Row(null)))
+def checkResult(): Unit = {
+  checkAnswer(structWhenDF, Seq(Row("a"), Row(null)))
+  checkAnswer(arrayWhenDF, Seq(Row("a"), Row(null)))
+  checkAnswer(mapWhenDF, Seq(Row("a"), Row(null)))
+  checkAnswer(structIfDF, Seq(Row("a"), Row(null)))
+  checkAnswer(arrayIfDF, Seq(Row("a"), Row(null)))
+  checkAnswer(mapIfDF, Seq(Row("a"), Row(null)))
 }
 
-// without codegen
-checkResult(structWhenDF, false)
-checkResult(arrayWhenDF, false)
-checkResult(mapWhenDF, false)
-checkResult(structIfDF, false)
-checkResult(arrayIfDF, false)
-checkResult(mapIfDF, false)
-
-// with codegen
-checkResult(structWhenDF.filter('cond.isNotNull), true)
--- End diff --

If that's the case, why didn't the 
[assert](https://github.com/apache/spark/pull/21795/files/d1bc6124628808444a4ab5bed6bfc6f897d76e03#diff-5d2ebf4e9ca5a990136b276859769289L2360)
 fail?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21352: [SPARK-24305][SQL][FOLLOWUP] Avoid serialization ...

2018-07-17 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21352#discussion_r203017442
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -737,21 +733,22 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
   since = "2.4.0")
 case class MapFromEntries(child: Expression) extends UnaryExpression {
 
-  @transient
-  private lazy val dataTypeDetails: Option[(MapType, Boolean, Boolean)] = 
child.dataType match {
-case ArrayType(
-  StructType(Array(
-StructField(_, keyType, keyNullable, _),
-StructField(_, valueType, valueNullable, _))),
-  containsNull) => Some((MapType(keyType, valueType, valueNullable), 
keyNullable, containsNull))
-case _ => None
+  @transient private lazy val dataTypeDetails: Option[(MapType, Boolean, 
Boolean)] = {
+child.dataType match {
+  case ArrayType(
+StructType(Array(
+  StructField(_, kt, kn, _),
--- End diff --

The motivation is described 
[here](https://github.com/apache/spark/pull/21352/files#r202983734). I will 
revert this piece of code shortly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21352: [SPARK-24305][SQL][FOLLOWUP] Avoid serialization ...

2018-07-17 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21352#discussion_r202983734
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -737,21 +733,22 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
   since = "2.4.0")
 case class MapFromEntries(child: Expression) extends UnaryExpression {
 
-  @transient
-  private lazy val dataTypeDetails: Option[(MapType, Boolean, Boolean)] = 
child.dataType match {
-case ArrayType(
-  StructType(Array(
-StructField(_, keyType, keyNullable, _),
-StructField(_, valueType, valueNullable, _))),
-  containsNull) => Some((MapType(keyType, valueType, valueNullable), 
keyNullable, containsNull))
-case _ => None
+  @transient private lazy val dataTypeDetails: Option[(MapType, Boolean, 
Boolean)] = {
--- End diff --

Here I wanted to be consistent in terms of formatting (```@transient``` on the 
same line as ```private lazy val dataTypeDetails```). After that change, two 
lines exceeded 100 characters.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21352: [SPARK-24305][SQL][FOLLOWUP] Avoid serialization ...

2018-07-17 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21352#discussion_r202982740
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -360,7 +356,7 @@ case class MapEntries(child: Expression) extends 
UnaryExpression with ExpectsInp
 
   override def inputTypes: Seq[AbstractDataType] = Seq(MapType)
 
-  lazy val childDataType: MapType = child.dataType.asInstanceOf[MapType]
+  private def childDataType: MapType = child.dataType.asInstanceOf[MapType]
--- End diff --

I missed that one. Thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


