spark git commit: [SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4d2672a40 -> 58655f51f


[SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset

## What changes were proposed in this pull request?
Like [Dataset.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156), KeyValueGroupedDataset should mark its `queryExecution` field as `@transient`.

As mentioned in the Jira ticket, without `@transient` we saw serialization failures like:

```
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution
Serialization stack:
- object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: ==
```
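
For context, `@transient` works here because Java serialization skips transient fields when writing an object graph, so the non-serializable `QueryExecution` is simply never pulled into the serialized closure. A minimal sketch of that mechanism (the `HeavyDriverState` and `Holder` names below are hypothetical stand-ins, not Spark classes):

```
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for QueryExecution: deliberately not Serializable.
class HeavyDriverState(val plan: String)

// The @transient field is skipped by Java serialization, so Holder
// can be serialized even though HeavyDriverState cannot.
class Holder(@transient val state: HeavyDriverState, val key: Int) extends Serializable

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val holder = new Holder(new HeavyDriverState("== Physical Plan =="), 42)
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(holder) // succeeds: the transient field is not written
    println(s"serialized, key = ${holder.key}")
  }
}
```

Dropping `@transient` from `state` reproduces the `NotSerializableException` shown above.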

## How was this patch tested?

Ran the query specified in the Jira ticket before and after the change:
```
val a = spark.createDataFrame(sc.parallelize(Seq((1, 2), (3, 4)))).as[(Int, Int)]
val grouped = a.groupByKey { x: (Int, Int) => x._1 }
val mappedGroups = grouped.mapGroups((k, x) => (k, 1))
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map { xx =>
  val simpley = yyy.value
  1
}
```
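
A quick driver-side check for this class of regression is to round-trip the object through Java serialization; the helper below is only an illustrative sketch, not part of the patch or the test suite:

```
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Throws java.io.NotSerializableException if `obj` still references
// non-transient, non-serializable state; otherwise returns the byte count.
def serializedSize(obj: AnyRef): Int = {
  val buffer = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(buffer)
  out.writeObject(obj)
  out.close()
  buffer.size()
}

// In spark-shell, using `grouped` from the snippet above: before the fix
// this fails on QueryExecution; after the fix it should succeed.
// serializedSize(grouped)
```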

Author: Ergin Seyfe 

Closes #15706 from seyfe/keyvaluegrouped_serialization.

(cherry picked from commit 8a538c97b556f80f67c80519af0ce879557050d5)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/58655f51
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/58655f51
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/58655f51

Branch: refs/heads/branch-2.0
Commit: 58655f51f65d852ec65a65b54f26b3c8eac8cc60
Parents: 4d2672a
Author: Ergin Seyfe 
Authored: Tue Nov 1 11:18:42 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 11:18:50 2016 -0700

--
 .../scala/org/apache/spark/repl/ReplSuite.scala| 17 +
 .../apache/spark/sql/KeyValueGroupedDataset.scala  |  2 +-
 2 files changed, 18 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/58655f51/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
--
diff --git 
a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index f7d7a4f..8deafe3 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -473,4 +473,21 @@ class ReplSuite extends SparkFunSuite {
 assertDoesNotContain("AssertionError", output)
 assertDoesNotContain("Exception", output)
   }
+
+  test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
+val resultValue = 12345
+val output = runInterpreter("local",
+  s"""
+ |val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
+ |val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1))
+ |val broadcasted = sc.broadcast($resultValue)
+ |
+ |// Using broadcast triggers serialization issue in KeyValueGroupedDataset
+ |val dataset = mapGroups.map(_ => broadcasted.value)
+ |dataset.collect()
+  """.stripMargin)
+assertDoesNotContain("error:", output)
+assertDoesNotContain("Exception", output)
+assertContains(s": Array[Int] = Array($resultValue, $resultValue)", output)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/58655f51/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 8eec42a..407d036 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.expressions.ReduceAggregator
 class KeyValueGroupedDataset[K, V] private[sql](
 kEncoder: Encoder[K],
 vEncoder: Encoder[V],
-val queryExecution: QueryExecution,
+@transient val queryExecution: QueryExecution,
 private val dataAttributes: Seq[Attribute],
 private val groupingAttributes: Seq[Attribute]) extends Serializable {
 




spark git commit: [SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 8cdf143f4 -> 8a538c97b


[SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset

## What changes were proposed in this pull request?
Like [Dataset.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156), KeyValueGroupedDataset should mark its `queryExecution` field as `@transient`.

As mentioned in the Jira ticket, without `@transient` we saw serialization failures like:

```
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.QueryExecution
Serialization stack:
- object not serializable (class: org.apache.spark.sql.execution.QueryExecution, value: ==
```

## How was this patch tested?

Ran the query specified in the Jira ticket before and after the change:
```
val a = spark.createDataFrame(sc.parallelize(Seq((1, 2), (3, 4)))).as[(Int, Int)]
val grouped = a.groupByKey { x: (Int, Int) => x._1 }
val mappedGroups = grouped.mapGroups((k, x) => (k, 1))
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map { xx =>
  val simpley = yyy.value
  1
}
```

Author: Ergin Seyfe 

Closes #15706 from seyfe/keyvaluegrouped_serialization.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a538c97
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a538c97
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a538c97

Branch: refs/heads/master
Commit: 8a538c97b556f80f67c80519af0ce879557050d5
Parents: 8cdf143
Author: Ergin Seyfe 
Authored: Tue Nov 1 11:18:42 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 11:18:42 2016 -0700

--
 .../scala/org/apache/spark/repl/ReplSuite.scala| 17 +
 .../apache/spark/sql/KeyValueGroupedDataset.scala  |  2 +-
 2 files changed, 18 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8a538c97/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
--
diff --git 
a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 9262e93..96d2dfc 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -473,4 +473,21 @@ class ReplSuite extends SparkFunSuite {
 assertDoesNotContain("AssertionError", output)
 assertDoesNotContain("Exception", output)
   }
+
+  test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
+val resultValue = 12345
+val output = runInterpreter("local",
+  s"""
+ |val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
+ |val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1))
+ |val broadcasted = sc.broadcast($resultValue)
+ |
+ |// Using broadcast triggers serialization issue in KeyValueGroupedDataset
+ |val dataset = mapGroups.map(_ => broadcasted.value)
+ |dataset.collect()
+  """.stripMargin)
+assertDoesNotContain("error:", output)
+assertDoesNotContain("Exception", output)
+assertContains(s": Array[Int] = Array($resultValue, $resultValue)", output)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8a538c97/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 4cb0313..31ce8eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.expressions.ReduceAggregator
 class KeyValueGroupedDataset[K, V] private[sql](
 kEncoder: Encoder[K],
 vEncoder: Encoder[V],
-val queryExecution: QueryExecution,
+@transient val queryExecution: QueryExecution,
 private val dataAttributes: Seq[Attribute],
 private val groupingAttributes: Seq[Attribute]) extends Serializable {
 

