spark git commit: [SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset
Repository: spark Updated Branches: refs/heads/branch-2.0 4d2672a40 -> 58655f51f [SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset ## What changes were proposed in this pull request? Like [Dataset.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156), KeyValueGroupedDataset should mark its queryExecution as transient. As mentioned in the Jira ticket, without transient we saw serialization issues like ``` Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution Serialization stack: - object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: == ``` ## How was this patch tested? Ran the query specified in the Jira ticket before and after the change: ``` val a = spark.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)] val grouped = a.groupByKey( {x:(Int,Int)=>x._1} ) val mappedGroups = grouped.mapGroups((k,x)=> {(k,1)} ) val yyy = sc.broadcast(1) val last = mappedGroups.rdd.map(xx=> { val simpley = yyy.value 1 } ) ``` Author: Ergin Seyfe Closes #15706 from seyfe/keyvaluegrouped_serialization. 
(cherry picked from commit 8a538c97b556f80f67c80519af0ce879557050d5) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/58655f51 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/58655f51 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/58655f51 Branch: refs/heads/branch-2.0 Commit: 58655f51f65d852ec65a65b54f26b3c8eac8cc60 Parents: 4d2672a Author: Ergin Seyfe Authored: Tue Nov 1 11:18:42 2016 -0700 Committer: Reynold Xin Committed: Tue Nov 1 11:18:50 2016 -0700 -- .../scala/org/apache/spark/repl/ReplSuite.scala| 17 + .../apache/spark/sql/KeyValueGroupedDataset.scala | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/58655f51/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala -- diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index f7d7a4f..8deafe3 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -473,4 +473,21 @@ class ReplSuite extends SparkFunSuite { assertDoesNotContain("AssertionError", output) assertDoesNotContain("Exception", output) } + + test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") { +val resultValue = 12345 +val output = runInterpreter("local", + s""" + |val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1) + |val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1)) + |val broadcasted = sc.broadcast($resultValue) + | + |// Using broadcast triggers serialization issue in KeyValueGroupedDataset + |val dataset = mapGroups.map(_ => broadcasted.value) + |dataset.collect() + """.stripMargin) +assertDoesNotContain("error:", output) +assertDoesNotContain("Exception", output) +assertContains(s": Array[Int] = Array($resultValue, 
$resultValue)", output) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/58655f51/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index 8eec42a..407d036 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.expressions.ReduceAggregator class KeyValueGroupedDataset[K, V] private[sql]( kEncoder: Encoder[K], vEncoder: Encoder[V], -val queryExecution: QueryExecution, +@transient val queryExecution: QueryExecution, private val dataAttributes: Seq[Attribute], private val groupingAttributes: Seq[Attribute]) extends Serializable { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset
Repository: spark Updated Branches: refs/heads/master 8cdf143f4 -> 8a538c97b [SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset ## What changes were proposed in this pull request? Like [Dataset.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156), KeyValueGroupedDataset should mark its queryExecution as transient. As mentioned in the Jira ticket, without transient we saw serialization issues like ``` Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution Serialization stack: - object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: == ``` ## How was this patch tested? Ran the query specified in the Jira ticket before and after the change: ``` val a = spark.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)] val grouped = a.groupByKey( {x:(Int,Int)=>x._1} ) val mappedGroups = grouped.mapGroups((k,x)=> {(k,1)} ) val yyy = sc.broadcast(1) val last = mappedGroups.rdd.map(xx=> { val simpley = yyy.value 1 } ) ``` Author: Ergin Seyfe Closes #15706 from seyfe/keyvaluegrouped_serialization. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a538c97 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a538c97 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a538c97 Branch: refs/heads/master Commit: 8a538c97b556f80f67c80519af0ce879557050d5 Parents: 8cdf143 Author: Ergin Seyfe Authored: Tue Nov 1 11:18:42 2016 -0700 Committer: Reynold Xin Committed: Tue Nov 1 11:18:42 2016 -0700 -- .../scala/org/apache/spark/repl/ReplSuite.scala| 17 + .../apache/spark/sql/KeyValueGroupedDataset.scala | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8a538c97/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala -- diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala index 9262e93..96d2dfc 100644 --- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -473,4 +473,21 @@ class ReplSuite extends SparkFunSuite { assertDoesNotContain("AssertionError", output) assertDoesNotContain("Exception", output) } + + test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") { +val resultValue = 12345 +val output = runInterpreter("local", + s""" + |val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1) + |val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1)) + |val broadcasted = sc.broadcast($resultValue) + | + |// Using broadcast triggers serialization issue in KeyValueGroupedDataset + |val dataset = mapGroups.map(_ => broadcasted.value) + |dataset.collect() + """.stripMargin) +assertDoesNotContain("error:", output) +assertDoesNotContain("Exception", output) +assertContains(s": Array[Int] = Array($resultValue, $resultValue)", output) + } } 
http://git-wip-us.apache.org/repos/asf/spark/blob/8a538c97/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index 4cb0313..31ce8eb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.expressions.ReduceAggregator class KeyValueGroupedDataset[K, V] private[sql]( kEncoder: Encoder[K], vEncoder: Encoder[V], -val queryExecution: QueryExecution, +@transient val queryExecution: QueryExecution, private val dataAttributes: Seq[Attribute], private val groupingAttributes: Seq[Attribute]) extends Serializable { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org