[jira] [Commented] (SPARK-15769) Add Encoder for input type to Aggregator

2016-10-18 Thread Koert Kuipers (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15586654#comment-15586654 ]

Koert Kuipers commented on SPARK-15769:
---

What I really want is to be able to use an Aggregator on a DataFrame without
having to resort to taking apart Row objects. That way we can develop all
general aggregation algorithms as Aggregators and use them across the board in
spark-sql.

To be specific: in RelationalGroupedDataset I want to be able to apply an
Aggregator to input specified by column names, with that input converted to
the input type of the Aggregator, just like we do for UDFs and such.

An example (taken from my pull request) where ComplexResultAgg is an
Aggregator[(String, Long), (Long, Long), (Long, Long)]:

val df3 = Seq(("a", "x", 1), ("a", "y", 3), ("b", "x", 3)).toDF("i", "j", "k")
df3.groupBy("i").agg(ComplexResultAgg("i", "k"))

This applies the Aggregator to columns "i" and "k".
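
For reference, a minimal sketch of what ComplexResultAgg could look like
(hypothetical; the real one in my pull request may differ, and note that the
ComplexResultAgg("i", "k") call form relies on the proposed input-encoder
support, not anything in current Spark):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical sketch: counts the rows and sums the Long column.
object ComplexResultAgg extends Aggregator[(String, Long), (Long, Long), (Long, Long)] {
  def zero: (Long, Long) = (0L, 0L)
  def reduce(b: (Long, Long), a: (String, Long)): (Long, Long) =
    (b._1 + 1L, b._2 + a._2)
  def merge(b1: (Long, Long), b2: (Long, Long)): (Long, Long) =
    (b1._1 + b2._1, b1._2 + b2._2)
  def finish(r: (Long, Long)): (Long, Long) = r
  def bufferEncoder: Encoder[(Long, Long)] =
    Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong)
  def outputEncoder: Encoder[(Long, Long)] =
    Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong)
}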

I found creating an inputDeserializer from the encoder to be the easiest way
to make this all work, plus my pull request removes a lot of ad-hoc stuff (all
the withInputType code), which indicates to me that it's cleaner. I also like
how this catches mistakes earlier (because you need an implicit encoder)
versus storing TypeTags etc. and creating deserializer/converter expressions
at runtime. But yeah, maybe I am misunderstanding encoders and an input data
type is all we need.
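
Roughly, the shape I have in mind is something like this (a sketch only, not
the exact code from my pull request; AggregatorWithInputEncoder is a
hypothetical name):

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical sketch of the proposal: the Aggregator carries an Encoder
// for its input type, resolved implicitly where the Aggregator is defined,
// so a missing encoder surfaces at compile time instead of at runtime.
abstract class AggregatorWithInputEncoder[IN: Encoder, BUF, OUT]
    extends Aggregator[IN, BUF, OUT] {
  // The input encoder, from which an input deserializer can be derived.
  def inputEncoder: Encoder[IN] = implicitly[Encoder[IN]]
}

From inputEncoder one could then derive the deserializer expression that maps
the selected columns to IN, which is what would make the DataFrame usage above
possible.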





> Add Encoder for input type to Aggregator
> 
>
> Key: SPARK-15769
> URL: https://issues.apache.org/jira/browse/SPARK-15769
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Reporter: koert kuipers
>Priority: Minor
>
> Currently org.apache.spark.sql.expressions.Aggregator has Encoders for its
> buffer and output type, but not for its input type. The thought is that the
> input type is known from the Dataset it operates on and hence the Encoder can
> be inserted later.
> However, I think there are compelling reasons to have Aggregator carry an
> Encoder for its input type:
> * Generally, transformations on a Dataset only require the Encoder for the
> result type, since the input type is exactly known and its Encoder is already
> available within the Dataset. However, this is not the case for an Aggregator:
> an Aggregator is defined independently of a Dataset, and I think it is
> generally desirable that an Aggregator work on any type that can safely be
> cast to the Aggregator's input type (for example, an Aggregator that has Long
> as input should work on a Dataset of Ints).
> * Aggregators should also work on DataFrames, because Aggregator is a much
> nicer API to use than UserDefinedAggregateFunction. And when operating on
> DataFrames you should not have to use Row objects, which means your input type
> is not equal to the type of the Dataset you operate on (so the Encoder of the
> Dataset that is operated on should not be used as the input Encoder for the
> Aggregator).
> * Adding an input Encoder is not a big burden, since it can typically be
> created implicitly.
> * It removes TypedColumn.withInputType and its usage in Dataset,
> KeyValueGroupedDataset and RelationalGroupedDataset, which always felt
> somewhat ad-hoc to me.
> * Once an Aggregator has an Encoder for its input type, it is a small change
> to make the Aggregator also work on a subset of columns in a DataFrame, which
> facilitates Aggregator re-use since you don't have to write a custom
> Aggregator to extract the columns from a specific DataFrame. This also enables
> a usage that is more typical within a DataFrame context, very similar to how a
> UserDefinedAggregateFunction is used (see the sketch below).
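>
> A sketch of the two usage modes, using the (hypothetical) ComplexResultAgg
> from the comments above; the Dataset form is today's API, while the DataFrame
> form with column names is the proposal and does not exist in current Spark:
>
> val ds = Seq(("a", 1L), ("b", 3L)).toDS()        // Dataset[(String, Long)]
> ds.select(ComplexResultAgg.toColumn)             // typed path, works today
>
> val df = ds.toDF("i", "k")
> df.groupBy("i").agg(ComplexResultAgg("i", "k"))  // proposed DataFrame path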






[jira] [Commented] (SPARK-15769) Add Encoder for input type to Aggregator

2016-10-18 Thread Reynold Xin (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15586577#comment-15586577 ]

Reynold Xin commented on SPARK-15769:
-

[~koert] what you want is just type conversion, isn't it? In that case don't we
just need an input data type to be specified, rather than an encoder?

An Encoder specifies the mapping from the user-visible type to the underlying
physical types -- it doesn't really make sense to me to have an "input
encoder". Maybe I'm missing something?






[jira] [Commented] (SPARK-15769) Add Encoder for input type to Aggregator

2016-10-18 Thread koert kuipers (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15585931#comment-15585931 ]

koert kuipers commented on SPARK-15769:
---

Worth pointing out:
* Adding an encoder for the input type to an Aggregator does not make it unsafe
to use in a Dataset, since a Dataset is strongly typed and does not allow one
to use an Aggregator with a different (input) type.
* Adding an encoder for the input type to an Aggregator allows it to be used in
a DataFrame with the typical type conversions, like any other UDF or UDAF-like
construct. This is no more unsafe than what we already do, and it is expected
behavior for a DataFrame. See the sketch below.
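
To illustrate the first point, a minimal sketch (SumOfLongs is hypothetical,
and spark.implicits._ is assumed to be in scope):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical minimal Aggregator, used only to illustrate type safety.
object SumOfLongs extends Aggregator[Long, Long, Long] {
  def zero: Long = 0L
  def reduce(b: Long, a: Long): Long = b + a
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(r: Long): Long = r
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

val longs = Seq(1L, 2L, 3L).toDS()   // Dataset[Long]
longs.select(SumOfLongs.toColumn)    // compiles: input types line up
// Seq("a", "b").toDS().select(SumOfLongs.toColumn)  // does not compile: String is not Long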




[jira] [Commented] (SPARK-15769) Add Encoder for input type to Aggregator

2016-06-04 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-15769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15315707#comment-15315707 ]

Apache Spark commented on SPARK-15769:
--

User 'koertkuipers' has created a pull request for this issue:
https://github.com/apache/spark/pull/13512
