Re: Aggregating over sorted data

2016-12-22 Thread Koert Kuipers
yes it's less optimal because an abstraction is missing and with mapPartitions it is done without optimizations. but aggregator is not the right abstraction to begin with, it assumes a monoid, which means no ordering guarantees. you need a fold operation. On Dec 22, 2016 02:20, "Liang-Chi Hsieh" w
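
To make the distinction concrete, here is a small illustrative sketch (not code from the thread; SumAgg and ema are made-up names): a Dataset Aggregator has to implement merge on partial buffers, which Spark may combine in any order, whereas an order-sensitive computation such as an exponential moving average only makes sense as a sequential fold over the values in their sorted order.

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// An Aggregator must define merge(b1, b2); Spark may combine partial
// buffers in any order, so the operation has to be order-insensitive.
object SumAgg extends Aggregator[Double, Double, Double] {
  def zero: Double = 0.0
  def reduce(b: Double, a: Double): Double = b + a
  def merge(b1: Double, b2: Double): Double = b1 + b2 // order does not matter
  def finish(b: Double): Double = b
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// An order-sensitive computation (here an exponential moving average)
// has no meaningful merge; it needs a sequential fold over the values
// in their sorted order.
def ema(values: Iterator[Double], alpha: Double = 0.5): Double =
  values.foldLeft(0.0)((acc, v) => alpha * v + (1 - alpha) * acc)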

Re: Aggregating over sorted data

2016-12-22 Thread trsell
> I was hoping to find something like: Efficient sortByKey to work with… > *From:* Koert Kuipers [via Apache Spark Developers List] > *Sent:* Thursday, Decembe

RE: Aggregating over sorted data

2016-12-22 Thread assaf.mendelson
Sent: Thursday, December 22, 2016 7:14 AM To: Mendelson, Assaf Subject: Re: Aggregating over sorted data it can also be done with repartition + sortWithinPartitions + mapPartitions. perhaps not as convenient but it does not rely on undocumented behavior. i used this approach

Re: Aggregating over sorted data

2016-12-22 Thread Liang-Chi Hsieh
You can't use existing aggregation functions with that. Besides, the execution plan of `mapPartitions` doesn't support whole-stage codegen. Without that and some optimizations around aggregation, there could be a performance degradation. Also, when you have more than one key in a partition, yo

Re: Aggregating over sorted data

2016-12-21 Thread Koert Kuipers
it can also be done with repartition + sortWithinPartitions + mapPartitions. perhaps not as convenient but it does not rely on undocumented behavior. i used this approach in spark-sorted. see here: https://github.com/tresata/spark-sorted/blob/master/src/main/scala/com/tresata/spark/sorted/sql/Group
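
Below is a minimal sketch of what that approach could look like. It follows the spirit of the idea rather than the spark-sorted code itself; the Event schema and the per-key fold are assumptions made for the example. Note that a partition can hold several keys, so the per-partition pass folds over consecutive runs of the same key.

import org.apache.spark.sql.{Dataset, SparkSession}

case class Event(key: String, time: Long, value: Double)

val spark = SparkSession.builder.master("local[*]").appName("sorted-agg").getOrCreate()
import spark.implicits._

val events: Dataset[Event] = Seq(
  Event("a", 2L, 1.0), Event("a", 1L, 2.0), Event("b", 1L, 3.0)
).toDS()

val perKey: Dataset[(String, Double)] = events
  .repartition($"key")                    // co-locate each key in a single partition
  .sortWithinPartitions($"key", $"time")  // rows of a key are now contiguous and time-ordered
  .mapPartitions { rows =>
    // a partition may hold several keys, so fold over consecutive runs
    // of the same key; the fold sees the values in time order
    val buffered = rows.buffered
    new Iterator[(String, Double)] {
      def hasNext: Boolean = buffered.hasNext
      def next(): (String, Double) = {
        val key = buffered.head.key
        var acc = 0.0
        while (buffered.hasNext && buffered.head.key == key)
          acc = 0.5 * buffered.next().value + 0.5 * acc  // order-sensitive fold step
        (key, acc)
      }
    }
  }

perKey.show()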

Re: Aggregating over sorted data

2016-12-21 Thread Liang-Chi Hsieh
I agree that to make sure this works, you might need to know the Spark internal implementation of APIs such as `groupBy`. But without any more changes to the current Spark implementation, I think this is one possible way to achieve the required function to aggregate on sorted data per key.

Re: Aggregating over sorted data

2016-12-21 Thread Koert Kuipers
i think this works but it relies on groupBy and agg respecting the sorting. the api provides no such guarantee, so this could break in future versions. i would not rely on this i think... On Dec 20, 2016 18:58, "Liang-Chi Hsieh" wrote: Hi, Can you try the combination of `repartition` + `sortWit

Re: Aggregating over sorted data

2016-12-20 Thread Liang-Chi Hsieh
Hi, Can you try the combination of `repartition` + `sortWithinPartitions` on the dataset? E.g., val df = Seq((2, "b c a"), (1, "c a b"), (3, "a c b")).toDF("number", "letters") val df2 = df.explode('letters) { case Row(letters: String) => letters.split(" ").map(Tuple1(_)).t
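
The rest of that message is cut off in the archive. A hedged reconstruction of the same idea, using the split and explode functions instead of the (deprecated) Dataset.explode from the quoted snippet, might look like this; everything past the quoted code is an assumption rather than the original mail.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, split}

val spark = SparkSession.builder.master("local[*]").appName("letters").getOrCreate()
import spark.implicits._

val df = Seq((2, "b c a"), (1, "c a b"), (3, "a c b")).toDF("number", "letters")

// one row per letter, keeping the owning number
val df2 = df.select($"number", explode(split($"letters", " ")).as("letter"))

// co-locate each number in one partition and sort within it, so a
// downstream per-partition pass sees the letters in order per number
val sorted = df2
  .repartition($"number")
  .sortWithinPartitions($"number", $"letter")

sorted.show()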

Re: Aggregating over sorted data

2016-12-19 Thread Koert Kuipers
take a look at: https://issues.apache.org/jira/browse/SPARK-15798 On Dec 19, 2016 00:17, "Robin East" wrote: This is also a feature we need for our time-series processing > On 19 Dec 2016, at 04:07, Liang-Chi Hsieh wrote: > > > Hi, > > As I know, Spark SQL doesn't provide native support for

Re: Aggregating over sorted data

2016-12-19 Thread Robin East
This is also a feature we need for our time-series processing > On 19 Dec 2016, at 04:07, Liang-Chi Hsieh wrote: > > > Hi, > > As I know, Spark SQL doesn't provide native support for this feature now. > After searching, I found only few database systems support it, e.g., > PostgreSQL. > >

Re: Aggregating over sorted data

2016-12-18 Thread Liang-Chi Hsieh
Hi, As far as I know, Spark SQL doesn't provide native support for this feature now. After searching, I found that only a few database systems support it, e.g., PostgreSQL. Actually, based on Spark SQL's aggregate system, I think it is not very difficult to add support for this feature. The problem is

Re: Aggregating over sorted data

2016-12-12 Thread nsyca
Hi, SPARK-18591 might be a solution to your problem, but making assumptions in your UDAF logic about how Spark will process the aggregation is really a risky thing. Is there a way to do it using a window function with an ORDER BY clause to enforce the pro
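
For reference, a possible shape of the window-function idea, sketched under assumptions (column names, the collect_list step, and the sample data are all illustrative, not from the thread): partition the window by the key, order it by the timestamp, and collect the values over an unbounded frame so each key ends up with its values in time order.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, first}

val spark = SparkSession.builder.master("local[*]").appName("window-order").getOrCreate()
import spark.implicits._

val df = Seq(("a", 2L, 1.0), ("a", 1L, 2.0), ("b", 1L, 3.0))
  .toDF("key", "time", "value")

// window ordered by time within each key; the unbounded frame makes
// collect_list see every row of the key, in time order
val w = Window
  .partitionBy($"key")
  .orderBy($"time")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

// every row of a key now carries the full time-ordered list, so take one per key
val perKey = df
  .withColumn("values_in_order", collect_list($"value").over(w))
  .groupBy($"key")
  .agg(first($"values_in_order").as("values_in_order"))

perKey.show(truncate = false)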