[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r231359624

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---

```diff
@@ -262,25 +262,39 @@ object AppendColumns {

   def apply[T : Encoder, U : Encoder](
       func: T => U,
       child: LogicalPlan): AppendColumns = {
+    val outputEncoder = encoderFor[U]
+    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
+      assert(outputEncoder.namedExpressions.length == 1)
+      outputEncoder.namedExpressions.map(Alias(_, "key")())
+    } else {
+      outputEncoder.namedExpressions
+    }
     new AppendColumns(
       func.asInstanceOf[Any => Any],
       implicitly[Encoder[T]].clsTag.runtimeClass,
       implicitly[Encoder[T]].schema,
       UnresolvedDeserializer(encoderFor[T].deserializer),
-      encoderFor[U].namedExpressions,
+      namedExpressions,
       child)
   }

   def apply[T : Encoder, U : Encoder](
       func: T => U,
       inputAttributes: Seq[Attribute],
       child: LogicalPlan): AppendColumns = {
+    val outputEncoder = encoderFor[U]
+    val namedExpressions = if (!outputEncoder.isSerializedAsStruct) {
+      assert(outputEncoder.namedExpressions.length == 1)
+      outputEncoder.namedExpressions.map(Alias(_, "key")())
+    } else {
+      outputEncoder.namedExpressions
```

--- End diff --

Ok. I will try to make a PR and see if we can have a better fix for this. Thanks for the suggestion.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
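The patch quoted above aliases the lone serializer column of a flat (non-struct) key encoder to `key`, so it can no longer shadow the child plan's own `value` column. A minimal, language-agnostic sketch of that rule, assuming the function and parameter names below (they are illustrative, not Spark API):

```python
def key_output_names(serializer_names, is_serialized_as_struct):
    """Mirror the patch's branch: a flat key encoder emits exactly one
    column (conventionally named 'value'), which gets aliased to 'key';
    a struct key keeps its per-field names unchanged."""
    if not is_serialized_as_struct:
        # Same invariant the patch asserts on outputEncoder.namedExpressions.
        assert len(serializer_names) == 1
        return ["key"]
    return list(serializer_names)

# A flat Int key no longer collides with the input's "value" column:
print(key_output_names(["value"], False))
# A case-class key keeps its field names (and may still collide):
print(key_output_names(["a", "b"], True))
```

This captures why the fix helps primitive keys only: struct keys pass through unaliased, which is exactly the remaining conflict the reviewers discuss below.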
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r231358749

I wouldn't special-case primitive types while this is a general problem.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r231350156

Thanks, I see. For this primitive type case, is the current fix OK? Or should we deal with case classes together?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r231201502

I mean, maybe we should just leave this problem. I'm not sure how hacky it is to detect the `AppendColumns` in this case. Maybe we can have more confidence once you have a PR ready.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r231171593

Oh, I see. Do you mean we don't do such a check in `AttributeSeq.resolve` but leave it to `CheckAnalysis`?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r231170974

Can you elaborate more?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r231159305

Ah, I see. Then maybe we should just leave it instead of hacking `AttributeSeq.resolve`.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r231129654

> Should we only fail the groupByKey query accessing ambiguous field names?

Yes. When we have unresolved attributes, check if the child plan is `AppendColumns`.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r231038560

I tried to add a check to `CheckAnalysis` to detect such `AppendColumns`. I found that we have many such use cases in `DatasetSuite`, e.g. `map and group by with class data`:

```scala
val ds: Dataset[(ClassData, Long)] = Seq(ClassData("one", 1), ClassData("two", 2)).toDS()
  .map(c => ClassData(c.a, c.b + 1))
  .groupByKey(p => p).count()
```

If users don't access the original output like this patch's test does, it won't cause a problem. So disallowing it entirely would be a behavior change. Should we only fail the `groupByKey` query that accesses ambiguous field names? Or should we disallow it entirely whenever there is any conflicting name?
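The narrower option raised here (fail only when an ambiguous name is actually referenced, rather than rejecting every `AppendColumns` with a name conflict) can be sketched as follows. This is an illustration of the proposed behavior with hypothetical names, not the actual `CheckAnalysis` code:

```python
def referenced_ambiguous_names(referenced, child_output, appended_key_cols):
    """Report only names that are (a) referenced by the query and
    (b) present both in the child's output and in the key columns that
    AppendColumns adds -- so queries that never touch the conflicting
    name keep working."""
    conflicts = set(child_output) & set(appended_key_cols)
    return sorted(name for name in referenced if name in conflicts)

# groupByKey on a primitive Dataset: both sides expose "value".
print(referenced_ambiguous_names(["value"], ["value"], ["value"]))  # ambiguous
print(referenced_ambiguous_names(["count"], ["value"], ["value"]))  # fine
```

Under this rule the `DatasetSuite` example above stays legal, because it only references `count()` results, never the duplicated field names.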
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r231002129

For option of product, I think it is due to typed select. I will address it at #21732.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r231001655

For primitive type and product type, it looks like it works:

```scala
test("typed aggregation on primitive data") {
  val ds = Seq(1, 2, 3).toDS()
  val agg = ds.select(expr("value").as("data").as[Int])
    .groupByKey(_ >= 2)
    .agg(sum("data").as[Long], sum($"data" + 1).as[Long])
  agg.show()
}
```

```
+-----+---------+---------------+
|value|sum(data)|sum((data + 1))|
+-----+---------+---------------+
|false|        1|              2|
| true|        5|              7|
+-----+---------+---------------+
```

```scala
test("typed aggregation on product data") {
  val ds = Seq((1, 2), (2, 3), (3, 4)).toDS()
  val agg = ds.select(expr("_1").as("a").as[Int], expr("_2").as("b").as[Int])
    .groupByKey(_._1).agg(sum("a").as[Int], sum($"b" + 1).as[Int])
  agg.show()
}
```

```
+-----+------+------------+
|value|sum(a)|sum((b + 1))|
+-----+------+------------+
|    3|     3|           5|
|    1|     1|           3|
|    2|     2|           4|
+-----+------+------------+
```
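The two result tables above can be re-checked without Spark. This plain-Python model of "group by a key function, then aggregate each group" is only a sanity check of the arithmetic, not Spark code:

```python
from itertools import groupby

def typed_agg(rows, key_fn, aggs):
    """Group rows by key_fn and apply each aggregate function per group,
    mimicking groupByKey(...).agg(...) at the value level."""
    result = {}
    for key, group in groupby(sorted(rows, key=key_fn), key=key_fn):
        group = list(group)
        result[key] = tuple(agg(group) for agg in aggs)
    return result

# Primitive data keyed by (_ >= 2): matches the first table above.
print(typed_agg([1, 2, 3], lambda v: v >= 2,
                [sum, lambda g: sum(x + 1 for x in g)]))
# Product data keyed by _1: matches the second table above.
print(typed_agg([(1, 2), (2, 3), (3, 4)], lambda r: r[0],
                [lambda g: sum(r[0] for r in g),
                 lambda g: sum(r[1] + 1 for r in g)]))
```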
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r230997935

Is this a special case of option of product? Can you try primitive type and product type?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r230995240

I just tried to manually add an alias. It doesn't seem to work as we expect:

```scala
val ds = Seq(Some(("a", 10)), Some(("a", 20)), Some(("b", 1)), Some(("b", 2)), Some(("c", 1)), None).toDS()
val newDS = ds.select(expr("value").as("opt").as[Option[(String, Int)]])
```

```
// schema
root
 |-- value: struct (nullable = true)
 |    |-- _1: string (nullable = true)
 |    |-- _2: integer (nullable = false)

// physical plan
*(1) SerializeFromObject [if (isnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))) null else named_struct(_1, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))._1, true, false), _2, assertnotnull(unwrapoption(ObjectType(class scala.Tuple2), input[0, scala.Option, true]))._2) AS value#5482]
+- *(1) MapElements <function1>, obj#5481: scala.Option
   +- *(1) DeserializeToObject newInstance(class scala.Tuple1), obj#5480: scala.Tuple1
      +- *(1) Project [value#5473 AS opt#5476]
         +- LocalTableScan [value#5473]
```

So even if we add an alias before `groupByKey`, it can't change the dataset's output names.
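The plan above shows why the manual alias is lost: `DeserializeToObject` turns rows into JVM objects, where column names no longer exist, and `SerializeFromObject` then writes the encoder's own output name (`value`) regardless of the upstream `Project` alias. A toy model of that name flow (the stage names are illustrative only):

```python
def output_names(stages, columns):
    """Trace column names through a simplified plan: 'project' renames,
    'deserialize' erases names (rows become objects), and 'serialize'
    re-emits whatever names the encoder itself defines."""
    for op, payload in stages:
        if op == "project":
            columns = [payload.get(c, c) for c in columns]
        elif op == "deserialize":
            columns = None  # objects carry no column names
        elif op == "serialize":
            columns = list(payload)
    return columns

plan = [
    ("project", {"value": "opt"}),   # ds.select(expr("value").as("opt")...)
    ("deserialize", None),           # DeserializeToObject
    ("serialize", ["value"]),        # SerializeFromObject: encoder's name wins
]
print(output_names(plan, ["value"]))  # the "opt" alias did not survive
```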
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r230989493

Ok, sounds good. Let me make the change.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r230989073

We can improve `CheckAnalysis` to detect this case, and improve the error message to ask users to add an alias.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r230986888

You mean we detect the conflicting case and show an error message asking users to do that?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r230986226

If that is the case, I feel it's better to ask users to resolve the conflict manually, by adding an alias.
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22944#discussion_r230983077

Yeah, but such cases seem more complicated, as we can't simply create aliases for the serializer fields: in methods like `mapGroups`, we need to access the original fields of the key.
[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r230977212

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala ---

(same hunk in `AppendColumns.apply` as quoted above)

--- End diff --

So we may still fail if `T` and `U` are case classes that have conflicting field names?
[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r230781583

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---

@@ -1556,6 +1556,14 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       df.where($"city".contains(new java.lang.Character('A'))),
       Seq(Row("Amsterdam")))
   }
+
+  test("SPARK-25942: typed aggregation on primitive data") {

--- End diff --

It was not introduced recently; I think we have encoded primitive data as a `value` field from the beginning. `AppendColumns` takes the serializer's named expressions as `newColumns`, and for primitive data this conflicts with the original data attribute, which is also named `value`.
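The conflict described above can be sketched with a minimal repro along these lines. This is a hypothetical snippet based on the description in the thread, assuming a local `SparkSession` with `spark.implicits._` imported:

```scala
// Hypothetical repro sketch (assumes a SparkSession with implicits in scope).
import org.apache.spark.sql.functions.sum

val ds = Seq(1, 2, 3).toDS()    // input schema has a single column: value: int
// groupByKey appends the serialized grouping key as a new column. A primitive
// key is also encoded as a field named "value", so referencing $"value" in the
// aggregate below is ambiguous before this fix.
ds.groupByKey(_ >= 2).agg(sum($"value").as[Long]).collect()
```

Before the patch, resolving `$"value"` would hit both the original input attribute and the appended key column of the same name.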
[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22944#discussion_r230772018

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala ---

(same hunk as quoted above)

--- End diff --

How was this bug introduced?
[GitHub] spark pull request #22944: [SPARK-25942][SQL] Fix Dataset.groupByKey to make...
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/22944

[SPARK-25942][SQL] Fix Dataset.groupByKey to make it work on primitive data

## What changes were proposed in this pull request?

`Dataset.groupByKey` currently does not work on primitive data: when we access `value` in an aggregate function, it conflicts with the name of the grouping attribute.

## How was this patch tested?

Added a test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/viirya/spark-1 dataset_agg

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22944.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22944

commit 7564d98a9b1242f1eea167152ed97371416c204d
Author: Liang-Chi Hsieh
Date: 2018-11-05T00:40:52Z

    Fix groupByKey.
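The aliasing rule the patch adds to `AppendColumns` can be illustrated with a plain-Scala simulation. Names such as `NamedExpr` and `keyColumns` are made up for illustration; the real code operates on Catalyst `NamedExpression`s inside `AppendColumns.apply`.

```scala
// Plain-Scala sketch of the patch's decision. For a key type that is NOT
// serialized as a struct (e.g. a primitive), there is exactly one serializer
// expression, conventionally named "value"; re-alias it to "key" so it cannot
// clash with an input attribute of the same name. Struct (e.g. case class)
// keys keep their field names. NamedExpr is a stand-in for NamedExpression.
final case class NamedExpr(name: String)

def keyColumns(isSerializedAsStruct: Boolean, exprs: Seq[NamedExpr]): Seq[NamedExpr] =
  if (!isSerializedAsStruct) {
    assert(exprs.length == 1)
    exprs.map(_ => NamedExpr("key"))
  } else {
    exprs
  }

// Primitive key: the single "value" expression is renamed to "key".
println(keyColumns(isSerializedAsStruct = false, Seq(NamedExpr("value"))))
// Struct key: field names are kept as-is.
println(keyColumns(isSerializedAsStruct = true, Seq(NamedExpr("a"), NamedExpr("b"))))
```

As the review comments note, this handles the primitive case but not two case classes with conflicting field names, which is why the thread discusses a more general fix.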