[jira] [Commented] (SPARK-25279) Throw exception: zzcclp java.io.NotSerializableException: org.apache.spark.sql.TypedColumn in Spark-shell when run example of doc

2018-09-05 Thread Zhichao Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16604604#comment-16604604
 ] 

Zhichao  Zhang commented on SPARK-25279:
----------------------------------------

[~viirya], Thanks. I closed this issue.

> Throw exception: zzcclp   java.io.NotSerializableException: 
> org.apache.spark.sql.TypedColumn in Spark-shell when run example of doc
> ---------------------------------------------------------------------------
>
> Key: SPARK-25279
> URL: https://issues.apache.org/jira/browse/SPARK-25279
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Shell, SQL
>Affects Versions: 2.2.1
>Reporter: Zhichao  Zhang
>Priority: Minor
>
> Hi dev: 
>   I am using spark-shell to run the example in the section 
> http://spark.apache.org/docs/2.2.2/sql-programming-guide.html#type-safe-user-defined-aggregate-functions 
> and there is an error: 
> {code:java}
> Caused by: java.io.NotSerializableException: 
> org.apache.spark.sql.TypedColumn 
> Serialization stack: 
>         - object not serializable (class: org.apache.spark.sql.TypedColumn, 
> value: 
> myaverage() AS `average_salary`) 
>         - field (class: $iw, name: averageSalary, type: class 
> org.apache.spark.sql.TypedColumn) 
>         - object (class $iw, $iw@4b2f8ae9) 
>         - field (class: MyAverage$, name: $outer, type: class $iw) 
>         - object (class MyAverage$, MyAverage$@2be41d90) 
>         - field (class: 
> org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression, 
> name: aggregator, type: class org.apache.spark.sql.expressions.Aggregator) 
>         - object (class 
> org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression, 
> MyAverage(Employee)) 
>         - field (class: 
> org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, 
> name: aggregateFunction, type: class 
> org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction) 
>         - object (class 
> org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression, 
> partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class Employee)), 
> Some(class Employee), Some(StructType(StructField(name,StringType,true), 
> StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0, 
> Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0, 
> Average, true])).count AS count#26L, newInstance(class Average), input[0, 
> double, false] AS value#24, DoubleType, false, 0, 0)) 
>         - writeObject data (class: 
> scala.collection.immutable.List$SerializationProxy) 
>         - object (class scala.collection.immutable.List$SerializationProxy, 
> scala.collection.immutable.List$SerializationProxy@5e92c46f) 
>         - writeReplace data (class: 
> scala.collection.immutable.List$SerializationProxy) 
>         - object (class scala.collection.immutable.$colon$colon, 
> List(partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class 
> Employee)), Some(class Employee), 
> Some(StructType(StructField(name,StringType,true), 
> StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0, 
> Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0, 
> Average, true])).count AS count#26L, newInstance(class Average), input[0, 
> double, false] AS value#24, DoubleType, false, 0, 0))) 
>         - field (class: 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec, name: 
> aggregateExpressions, type: interface scala.collection.Seq) 
>         - object (class 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec, 
> ObjectHashAggregate(keys=[], 
> functions=[partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class 
> Employee)), Some(class Employee), 
> Some(StructType(StructField(name,StringType,true), 
> StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0, 
> Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0, 
> Average, true])).count AS count#26L, newInstance(class Average), input[0, 
> double, false] AS value#24, DoubleType, false, 0, 0)], output=[buf#37]) 
> +- *FileScan json [name#8,salary#9L] Batched: false, Format: JSON, Location: 
> InMemoryFileIndex[file:/opt/spark2/examples/src/main/resources/employees.json],
>  
> PartitionFilters: [], PushedFilters: [], ReadSchema: 
> struct<name:string,salary:bigint> 
> ) 
>         - field (class: 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1,
>  
> name: $outer, type: class 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec) 
>         - object (class 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1,
>  
> ) 
>         - field (class: 
> org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2,
>  
> name: 

[jira] [Commented] (SPARK-25279) Throw exception: zzcclp java.io.NotSerializableException: org.apache.spark.sql.TypedColumn in Spark-shell when run example of doc

2018-09-05 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16604254#comment-16604254
 ] 

Liang-Chi Hsieh commented on SPARK-25279:
-----------------------------------------

The paste mode in the REPL wraps the pasted code into a single object, so the 
`TypedColumn` value gets wrapped together with the aggregator. `TypedColumn` is 
not serializable.
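
A minimal standalone model of that capture (the names below are illustrative 
stand-ins, not Spark's actual REPL internals): a nested object keeps an `$outer` 
reference to the wrapper generated around the pasted block, so serializing it 
walks into the wrapper and hits the non-serializable field, matching the `$iw` 
entries in the stack trace.

{code:java}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for org.apache.spark.sql.TypedColumn: a class that is not
// java.io.Serializable.
class FakeTypedColumn

// Stand-in for the single wrapper that :paste generates around the block.
class PasteWrapper extends Serializable {
  val averageSalary = new FakeTypedColumn // non-serializable field in the wrapper
  object MyAverage extends Serializable   // nested object holds an $outer ref
}

object ReplModel extends App {
  val w = new PasteWrapper
  val out = new ObjectOutputStream(new ByteArrayOutputStream)
  // Serializing the nested object pulls in $outer (the wrapper), whose
  // averageSalary field then fails to serialize, printing:
  // java.io.NotSerializableException: FakeTypedColumn
  try out.writeObject(w.MyAverage)
  catch { case e: java.io.NotSerializableException => println(e) }
}
{code}

If that is right, keeping the `TypedColumn` out of the aggregator's wrapper, 
e.g. calling `ds.select(MyAverage.toColumn.name("average_salary"))` directly 
instead of binding the column to a val in the same pasted block, should avoid 
the capture.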

[jira] [Commented] (SPARK-25279) Throw exception: zzcclp java.io.NotSerializableException: org.apache.spark.sql.TypedColumn in Spark-shell when run example of doc

2018-09-03 Thread Dilip Biswal (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602440#comment-16602440
 ] 

Dilip Biswal commented on SPARK-25279:
--------------------------------------

[~zzcclp] Hmm, I don't know what's happening in :paste mode. Let me cc the 
experts.
cc [~cloud_fan] [~viirya]
Hi Wenchen and Simon, do you know the reason for the failure?

[jira] [Commented] (SPARK-25279) Throw exception: zzcclp java.io.NotSerializableException: org.apache.spark.sql.TypedColumn in Spark-shell when run example of doc

2018-09-03 Thread Zhichao Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16602308#comment-16602308
 ] 

Zhichao  Zhang commented on SPARK-25279:
----------------------------------------

[~dkbiswal], I followed your steps and ran the code successfully, but if I 
pasted all of the code at once and then ran it, the error occurred:
scala> :paste
...copy all the example code here
Ctrl + D
What is the difference between these two modes?
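
For concreteness, a condensed sketch of the pasted block (adapted from the doc 
example; the full line-by-line transcript that works is in the 2018-08-31 
comment below). Assuming a 2.2.x spark-shell started from the distribution root 
so the JSON path resolves, the last line reportedly fails only when the whole 
block is entered through :paste:

{code:java}
// spark-shell auto-imports spark.implicits._, which .as[Employee] relies on.
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  def zero: Average = Average(0L, 0L)
  def reduce(b: Average, e: Employee): Average = { b.sum += e.salary; b.count += 1; b }
  def merge(b1: Average, b2: Average): Average = { b1.sum += b2.sum; b1.count += b2.count; b1 }
  def finish(r: Average): Double = r.sum.toDouble / r.count
  def bufferEncoder: Encoder[Average] = Encoders.product
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
// Binding the TypedColumn to a val puts it in the same :paste wrapper as MyAverage.
val averageSalary = MyAverage.toColumn.name("average_salary")
ds.select(averageSalary).show() // throws NotSerializableException in paste mode on 2.2.1
{code}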

[jira] [Commented] (SPARK-25279) Throw exception: zzcclp java.io.NotSerializableException: org.apache.spark.sql.TypedColumn in Spark-shell when run example of doc

2018-09-01 Thread Zhichao Zhang (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599574#comment-16599574
 ] 

Zhichao  Zhang commented on SPARK-25279:
----------------------------------------

[~dkbiswal], thank you. You mean that you used the latest code on branch 2.2 to 
test and it works fine?

[jira] [Commented] (SPARK-25279) Throw exception: zzcclp java.io.NotSerializableException: org.apache.spark.sql.TypedColumn in Spark-shell when run example of doc

2018-08-31 Thread Dilip Biswal (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599048#comment-16599048
 ] 

Dilip Biswal commented on SPARK-25279:
--------------------------------------

Hello,

Tried against the latest trunk. Seems to work fine.
{code:java}

scala> import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.expressions.Aggregator

scala> import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoder

scala> import org.apache.spark.sql.Encoders
import org.apache.spark.sql.Encoders

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala>

scala> case class Employee(name: String, salary: Long)
defined class Employee

scala> case class Average(var sum: Long, var count: Long)
defined class Average

scala>

scala> object MyAverage extends Aggregator[Employee, Average, Double] {
     |   // A zero value for this aggregation. Should satisfy the property that any b + zero = b
     |   def zero: Average = Average(0L, 0L)
     |   // Combine two values to produce a new value. For performance, the function may modify `buffer`
     |   // and return it instead of constructing a new object
     |   def reduce(buffer: Average, employee: Employee): Average = {
     |     buffer.sum += employee.salary
     |     buffer.count += 1
     |     buffer
     |   }
     |   // Merge two intermediate values
     |   def merge(b1: Average, b2: Average): Average = {
     |     b1.sum += b2.sum
     |     b1.count += b2.count
     |     b1
     |   }
     |   // Transform the output of the reduction
     |   def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
     |   // Specifies the Encoder for the intermediate value type
     |   def bufferEncoder: Encoder[Average] = Encoders.product
     |   // Specifies the Encoder for the final output value type
     |   def outputEncoder: Encoder[Double] = Encoders.scalaDouble
     | }
defined object MyAverage

scala>

scala> val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds: org.apache.spark.sql.Dataset[Employee] = [name: string, salary: bigint]

scala> ds.show()
+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+

scala> // +-------+------+

scala> // |   name|salary|

scala> // +-------+------+

scala> // |Michael|  3000|

scala> // |   Andy|  4500|

scala> // | Justin|  3500|

scala> // |  Berta|  4000|

scala> // +-------+------+

scala>

scala> // Convert the function to a `TypedColumn` and give it a name

scala> val averageSalary = MyAverage.toColumn.name("average_salary")
averageSalary: org.apache.spark.sql.TypedColumn[Employee,Double] = myaverage() AS `average_salary`

scala> val result = ds.select(averageSalary)
result: org.apache.spark.sql.Dataset[Double] = [average_salary: double]

scala> result.show()
+--------------+
|average_salary|
+--------------+
|        3750.0|
+--------------+
{code}
