[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r231359624
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---
@@ -262,25 +262,39 @@ object AppendColumns {
   def apply[T : Encoder, U : Encoder](
       func: T => U,
       child: LogicalPlan): AppendColumns = {
+    val outputEncoder = encoderFor[U]
+    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
+      assert(outputEncoder.namedExpressions.length == 1)
+      outputEncoder.namedExpressions.map(Alias(_, "key")())
+    } else {
+      outputEncoder.namedExpressions
+    }
     new AppendColumns(
       func.asInstanceOf[Any => Any],
       implicitly[Encoder[T]].clsTag.runtimeClass,
       implicitly[Encoder[T]].schema,
       UnresolvedDeserializer(encoderFor[T].deserializer),
-      encoderFor[U].namedExpressions,
+      namedExpressions,
       child)
   }
 
   def apply[T : Encoder, U : Encoder](
       func: T => U,
       inputAttributes: Seq[Attribute],
       child: LogicalPlan): AppendColumns = {
+    val outputEncoder = encoderFor[U]
+    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
+      assert(outputEncoder.namedExpressions.length == 1)
+      outputEncoder.namedExpressions.map(Alias(_, "key")())
+    } else {
+      outputEncoder.namedExpressions
--- End diff --

OK. I will try to make a PR and see if we can have a better fix for this.
Thanks for the suggestion.





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r231358749
  

I wouldn't special-case primitive types when this is a general problem.





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r231358690
  

I wouldn't special-case primitive types when this is a general problem.





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r231350156
  

Thanks, I see. For this primitive type case, is the current fix OK? Or should
we deal with case classes together?





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r231201502
  

I mean, maybe we should just leave this problem. I'm not sure how hacky it is
to detect the `AppendColumns` in this case. Maybe we can judge with more
confidence once you have a PR ready.





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r231171593
  

Oh, I see. Do you mean we don't do such a check in `AttributeSeq.resolve`, but
leave it to `CheckAnalysis`?





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r231170974
  

Can you elaborate a bit more?





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r231159305
  

Ah, I see. Then maybe we should just leave it instead of hacking
`AttributeSeq.resolve`.





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r231129654
  

> Should we only fail the groupByKey query accessing ambiguous field names?

Yes. When we have unresolved attributes, check if the child plan is 
`AppendColumns`.
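
For illustration, a rough sketch of the kind of check being suggested
(hypothetical code, not the actual Spark rule; a real implementation would
live in `CheckAnalysis` and report the error via `failAnalysis` with an
`AnalysisException`):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.plans.logical.{AppendColumns, LogicalPlan}

// Hypothetical helper: when a node still has unresolved attributes, see
// whether a child AppendColumns appended a column that shadows the name.
def checkAppendColumnsConflict(plan: LogicalPlan): Unit = plan.foreachUp {
  case p if !p.resolved =>
    val unresolved = p.expressions.flatMap(_.collect { case u: UnresolvedAttribute => u })
    p.children.collect { case a: AppendColumns => a }.foreach { appended =>
      val appendedNames = appended.newColumns.map(_.name).toSet
      unresolved.filter(u => appendedNames.contains(u.name)).foreach { u =>
        // The real rule would raise an AnalysisException here.
        throw new IllegalStateException(
          s"Reference '${u.name}' is ambiguous because groupByKey appended a " +
            "column with the same name; add an alias before calling groupByKey.")
      }
    }
  case _ => // fully resolved nodes need no check
}
```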





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-06 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r231038560
  

I tried to add a check to `CheckAnalysis` to detect such `AppendColumns`. I
found that we have many such use cases in `DatasetSuite`, e.g. `map and group
by with class data`:

```scala
val ds: Dataset[(ClassData, Long)] = Seq(ClassData("one", 1), ClassData("two", 2)).toDS()
  .map(c => ClassData(c.a, c.b + 1))
  .groupByKey(p => p).count()
```

If users don't access the original output (as this patch's test does), it won't
cause a problem. So I'm thinking that disallowing it entirely would be a
behavior change. Should we only fail the `groupByKey` query that accesses
ambiguous field names, or should we disallow any conflicting name at all?





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r231002129
  

For Option of product, I think it is due to the typed select. I will address it
in #21732.





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r231001655
  

For primitive type and product type, it looks like it works:

```scala
test("typed aggregation on primitive data") {
  val ds = Seq(1, 2, 3).toDS()
  val agg = ds.select(expr("value").as("data").as[Int])
    .groupByKey(_ >= 2)
    .agg(sum("data").as[Long], sum($"data" + 1).as[Long])
  agg.show()
}
```
```
+-----+---------+---------------+
|value|sum(data)|sum((data + 1))|
+-----+---------+---------------+
|false|        1|              2|
| true|        5|              7|
+-----+---------+---------------+
```

```scala
test("typed aggregation on product data") {
  val ds = Seq((1, 2), (2, 3), (3, 4)).toDS()
  val agg = ds.select(expr("_1").as("a").as[Int], expr("_2").as("b").as[Int])
    .groupByKey(_._1).agg(sum("a").as[Int], sum($"b" + 1).as[Int])
  agg.show
}
```
```
[info] - typed aggregation on primitive data (192 milliseconds)
+-----+------+------------+
|value|sum(a)|sum((b + 1))|
+-----+------+------------+
|    3|     3|           5|
|    1|     1|           3|
|    2|     2|           4|
+-----+------+------------+
```





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r230997935
  

Is this a special case of Option of product? Can you try primitive type and
product type?






[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r230995240
  

I just tried to manually add an alias. It doesn't seem to work as we expect:

```scala
val ds = Seq(Some(("a", 10)), Some(("a", 20)), Some(("b", 1)), Some(("b", 2)),
  Some(("c", 1)), None).toDS()

val newDS = ds.select(expr("value").as("opt").as[Option[(String, Int)]])

// schema
root
 |-- value: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: integer (nullable = false)

// physical plan
*(1) SerializeFromObject [if (isnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))._1, true, false), _2, assertnotnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))._2) AS value#5482]
+- *(1) MapElements <function1>, obj#5481: scala.Option
   +- *(1) DeserializeToObject newInstance(class scala.Tuple1), obj#5480: scala.Tuple1
      +- *(1) Project [value#5473 AS opt#5476]
         +- LocalTableScan [value#5473]
```

So even if we add an alias before `groupByKey`, it can't change the dataset's
output names.





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r230989493
  

OK, sounds good. Let me make the change.





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r230989073
  

We can improve `CheckAnalysis` to detect this case, and improve the error
message to ask users to add an alias.





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r230986888
  

You mean we detect the conflicting case, and show an error message asking users
to do that?





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r230986226
  

If that is the case, I feel it's better to ask users to resolve the conflict
manually, by adding an alias.
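
As a concrete example of that workaround, renaming the column before
`groupByKey` avoids the conflict (a sketch, assuming a SparkSession with
`spark.implicits._` in scope; it mirrors the tests quoted earlier in this
thread):

```scala
import org.apache.spark.sql.functions.{expr, sum}

// Rename the input column so it no longer collides with the appended
// grouping column, which is also named "value" for primitive data.
val ds = Seq(1, 2, 3).toDS()
val agg = ds.select(expr("value").as("data").as[Int])
  .groupByKey(_ >= 2)
  .agg(sum("data").as[Long])
```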





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r230983077
  

Yeah, but for such cases it seems more complicated, as we can't simply create
aliases for serializer fields: in methods like `mapGroups`, we need to access
the original fields of the key.
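
A sketch of why that is (hypothetical, assuming `spark.implicits._` in scope):
the struct key is deserialized back into the case class by its original field
names, so blindly renaming the serializer fields would break the key
deserializer in `mapGroups`:

```scala
case class ClassData(a: String, b: Int)

val ds = Seq(ClassData("one", 1), ClassData("two", 2)).toDS()
val names = ds.groupByKey(c => c)     // key serialized as struct fields a, b
  .mapGroups((key, rows) => key.a)    // rebuilding `key` resolves a and b by name
```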







[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r230977212
  

So we may still fail if `T` and `U` are case classes that have conflicting
field names?





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-05 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r230781583
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---
@@ -1556,6 +1556,14 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       df.where($"city".contains(new java.lang.Character('A'))),
       Seq(Row("Amsterdam")))
   }
+
+  test("SPARK-25942: typed aggregation on primitive data") {
--- End diff --

It was not introduced recently. I think we have encoded primitive data as a
`value` field from the beginning.

`AppendColumns` takes the serializer's named expressions as `newColumns`. For
primitive data, this conflicts with the original data attribute `value`.
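
A small sketch that makes the conflict visible (hypothetical, assuming a
session with `spark.implicits._` in scope):

```scala
val ds = Seq(1, 2, 3).toDS()                // input schema: [value: int]
val counts = ds.groupByKey(_ >= 2).count()  // appends a boolean key column, also named "value"

// The analyzed plan contains an AppendColumns node, so two attributes
// named "value" are exposed below the aggregate:
println(counts.queryExecution.analyzed.numberedTreeString)
```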





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-05 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r230772018
  

How was this bug introduced?





[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...

2018-11-05 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/22944

[SPARK-25942][SQL] Fix Dataset.groupByKey to make it work on primitive data

## What changes were proposed in this pull request?

`Dataset.groupByKey` doesn't work on primitive data now. When we access `value`
in an aggregate function, it conflicts with the name of the grouping attribute.
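
A minimal reproduction of the failure (a sketch, assuming `spark.implicits._`
is in scope):

```scala
import org.apache.spark.sql.functions.sum

val ds = Seq(1, 2, 3).toDS()        // primitive input is encoded as column "value"
val agg = ds.groupByKey(_ >= 2)     // the grouping key is appended as "value" too
  .agg(sum("value").as[Long])       // before this fix: analysis fails, "value" is ambiguous
```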

## How was this patch tested?

Added test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 dataset_agg

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22944.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22944


commit 7564d98a9b1242f1eea167152ed97371416c204d
Author: Liang-Chi Hsieh 
Date:   2018-11-05T00:40:52Z

Fix groupByKey.



