[jira] [Commented] (SPARK-19136) Aggregator with case class as output type fails with ClassCastException
[ https://issues.apache.org/jira/browse/SPARK-19136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15937680#comment-15937680 ]

Hyukjin Kwon commented on SPARK-19136:
--------------------------------------

[~a1ray], do you think this JIRA is resolvable?

> Aggregator with case class as output type fails with ClassCastException
> -----------------------------------------------------------------------
>
>                 Key: SPARK-19136
>                 URL: https://issues.apache.org/jira/browse/SPARK-19136
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.2, 2.1.0
>            Reporter: Mathieu D
>            Priority: Minor
>
> {{Aggregator}} with a case-class as output type returns a Row that cannot be
> cast back to this type, it fails with {{ClassCastException}}.
> Here is a dummy example to reproduce the problem
> {code}
> import org.apache.spark.sql._
> import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
> import org.apache.spark.sql.expressions.Aggregator
> import spark.implicits._
> case class MinMax(min: Int, max: Int)
> case class MinMaxAgg() extends Aggregator[Row, (Int, Int), MinMax] with
> Serializable {
>   def zero: (Int, Int) = (Int.MaxValue, Int.MinValue)
>   def reduce(b: (Int, Int), a: Row): (Int, Int) = (Math.min(b._1,
> a.getAs[Int](0)), Math.max(b._2, a.getAs[Int](0)))
>   def finish(r: (Int, Int)): MinMax = MinMax(r._1, r._2)
>   def merge(b1: (Int, Int), b2: (Int, Int)): (Int, Int) = (Math.min(b1._1,
> b2._1), Math.max(b1._2, b2._2))
>   def bufferEncoder: Encoder[(Int, Int)] = ExpressionEncoder()
>   def outputEncoder: Encoder[MinMax] = ExpressionEncoder()
> }
> val ds = Seq(1, 2, 3, 4).toDF("col1")
> val agg = ds.agg(MinMaxAgg().toColumn.alias("minmax"))
> {code}
> bq. {code}
> ds: org.apache.spark.sql.DataFrame = [col1: int]
> agg: org.apache.spark.sql.DataFrame = [minmax: struct]
> {code}
> {code}agg.printSchema(){code}
> bq. {code}
> root
>  |-- minmax: struct (nullable = true)
>  |    |-- min: integer (nullable = false)
>  |    |-- max: integer (nullable = false)
> {code}
> {code}agg.head(){code}
> bq. {code}
> res1: org.apache.spark.sql.Row = [[1,4]]
> {code}
> {code}agg.head().getAs[MinMax](0){code}
> bq. {code}
> java.lang.ClassCastException:
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast
> to line4c81e18af34342cda654c381ee91139525.$read$$iw$$iw$$iw$$iw$MinMax
> [...]
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
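One way to read the exception in the report: as far as I know, {{Row.getAs[T](i)}} is an unchecked cast of whatever object sits at that position, and a struct column is stored as a nested Row (here a {{GenericRowWithSchema}}), not as the case class. The plain-Scala sketch below imitates that behaviour without Spark. {{FakeRow}} and {{CastDemo}} are hypothetical stand-ins invented for this illustration, not Spark classes.

```scala
// Hypothetical stand-ins for illustration only; none of these are Spark classes.
final case class MinMax(min: Int, max: Int)

final class FakeRow(values: Seq[Any]) {
  def get(i: Int): Any = values(i)
  // Because of type erasure this cast is unchecked here; the real check
  // happens at the call site, where the result is used as a T.
  def getAs[T](i: Int): T = get(i).asInstanceOf[T]
}

object CastDemo {
  // An outer row whose only column is a nested row (the "struct") holding 1 and 4.
  private val outer = new FakeRow(Seq(new FakeRow(Seq(1, 4))))

  // Asking for the case class directly fails: the stored object is a FakeRow.
  def castFails: Boolean =
    try {
      val m: MinMax = outer.getAs[MinMax](0) // ClassCastException is thrown here
      println(m)                              // not reached
      false
    } catch {
      case _: ClassCastException => true
    }

  // Reading the nested row and building the case class by hand works.
  def manual: MinMax = {
    val inner = outer.getAs[FakeRow](0)
    MinMax(inner.getAs[Int](0), inner.getAs[Int](1))
  }

  def main(args: Array[String]): Unit = {
    println(s"direct cast fails: $castFails")
    println(s"manual conversion: $manual")
  }
}
```

In this toy model the direct cast fails and the manual conversion yields MinMax(1,4), which mirrors the {{[[1,4]]}} row shown in the report. Whether Spark should convert the struct automatically is the open question of the issue; the sketch only shows why a bare cast cannot do it.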
[jira] [Commented] (SPARK-19136) Aggregator with case class as output type fails with ClassCastException
[ https://issues.apache.org/jira/browse/SPARK-19136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15821868#comment-15821868 ]

Andrew Ray commented on SPARK-19136:
------------------------------------

I forgot you can also just do:
{code}
ds.select(MinMaxAgg().toColumn)
{code}
[jira] [Commented] (SPARK-19136) Aggregator with case class as output type fails with ClassCastException
[ https://issues.apache.org/jira/browse/SPARK-19136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15816422#comment-15816422 ]

Mathieu D commented on SPARK-19136:
-----------------------------------

And... an RDD version based on treeAggregate is even quicker :-/
At least for that dummy min/max.
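For readers wondering what an RDD version based on treeAggregate might look like, here is a minimal sketch. It is my own guess at the shape of such code, not the code that was actually timed; the object name {{TreeAggMinMax}}, the helper {{minMax}} and the local master setting are assumptions made for the example.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper for illustration; not the code that was benchmarked in this thread.
object TreeAggMinMax {
  def minMax(spark: SparkSession, values: Seq[Int]): (Int, Int) = {
    import spark.implicits._
    val df = values.toDF("col1")

    df.rdd
      .map(_.getInt(0)) // drop down from Row to a plain Int
      .treeAggregate((Int.MaxValue, Int.MinValue))(
        // seqOp: fold one value into the (min, max) buffer of a partition
        (acc: (Int, Int), v: Int) => (math.min(acc._1, v), math.max(acc._2, v)),
        // combOp: merge the buffers of two partitions
        (a: (Int, Int), b: (Int, Int)) => (math.min(a._1, b._1), math.max(a._2, b._2))
      )
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for a quick check.
    val spark = SparkSession.builder().master("local[*]").appName("treeAggregate-minmax").getOrCreate()
    println(minMax(spark, Seq(1, 2, 3, 4)))
    spark.stop()
  }
}
```

The zero value and the two functions mirror {{zero}}, {{reduce}} and {{merge}} of the aggregator in the report. The result is a plain tuple, so no encoder is involved.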
[jira] [Commented] (SPARK-19136) Aggregator with case class as output type fails with ClassCastException
[ https://issues.apache.org/jira/browse/SPARK-19136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15816366#comment-15816366 ]

Mathieu D commented on SPARK-19136:
-----------------------------------

The two queries are not equivalent: the dummy group generates a shuffle + repartition.
12 s vs. almost 2 min for 1M rows on my laptop, so the second one is definitely not acceptable.
I'll go with the first one, thanks for the hint!
[jira] [Commented] (SPARK-19136) Aggregator with case class as output type fails with ClassCastException
[ https://issues.apache.org/jira/browse/SPARK-19136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15815460#comment-15815460 ]

Andrew Ray commented on SPARK-19136:
------------------------------------

You did not do a _typed_ aggregation, so your result is a DataFrame. It can be converted back into a Dataset like this:
{code}
agg.select("minmax.*").as[MinMax]
{code}
Or you can do a typed aggregation to get back a typed result, although there is currently no method for a global typed aggregate, so a dummy group has to be created:
{code}
ds.groupByKey(_ => 1).agg(MinMaxAgg().toColumn)
{code}
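The routes to a typed result mentioned in this thread can be put side by side in one small program. This is a sketch under a few assumptions of mine: a local SparkSession, an {{object}} instead of the case class for the aggregator, and the public {{Encoders.product}} in place of {{ExpressionEncoder()}}. The name {{TypedResultDemo}} is made up for the example.

```scala
import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Top-level case class so that Spark can derive an encoder for it.
case class MinMax(min: Int, max: Int)

// Same logic as the aggregator in the report. Encoders.product is my
// substitution for ExpressionEncoder(); it is not what the report used.
object MinMaxAgg extends Aggregator[Row, (Int, Int), MinMax] {
  def zero: (Int, Int) = (Int.MaxValue, Int.MinValue)
  def reduce(b: (Int, Int), a: Row): (Int, Int) = {
    val v = a.getInt(0)
    (math.min(b._1, v), math.max(b._2, v))
  }
  def merge(b1: (Int, Int), b2: (Int, Int)): (Int, Int) =
    (math.min(b1._1, b2._1), math.max(b1._2, b2._2))
  def finish(r: (Int, Int)): MinMax = MinMax(r._1, r._2)
  def bufferEncoder: Encoder[(Int, Int)] = Encoders.product[(Int, Int)]
  def outputEncoder: Encoder[MinMax] = Encoders.product[MinMax]
}

// Hypothetical driver object for illustration.
object TypedResultDemo {
  def routes(spark: SparkSession): Seq[MinMax] = {
    import spark.implicits._
    val ds = Seq(1, 2, 3, 4).toDF("col1")

    // Route 1: untyped aggregation, then flatten the struct and convert with as[MinMax].
    val viaStruct = ds.agg(MinMaxAgg.toColumn.alias("minmax"))
      .select("minmax.*").as[MinMax].head()

    // Route 2: typed aggregation over a dummy group; the key is discarded afterwards.
    val viaGroup = ds.groupByKey((_: Row) => 1).agg(MinMaxAgg.toColumn).head()._2

    // Route 3: typed select with the aggregator column.
    val viaSelect = ds.select(MinMaxAgg.toColumn).head()

    Seq(viaStruct, viaGroup, viaSelect)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for a quick check.
    val spark = SparkSession.builder().master("local[*]").appName("typed-minmax").getOrCreate()
    routes(spark).foreach(println)
    spark.stop()
  }
}
```

All three routes should produce MinMax(1,4) for this input. As reported elsewhere in this thread, the dummy-group route adds a shuffle and was much slower on 1M rows, so routes 1 and 3 look like the better default.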