Re: Creating a memory-efficient AggregateFunction to calculate Median

2021-12-15 Thread Nicholas Chammas
Thanks for the suggestions. I suppose I should share a bit more about what
I tried/learned, so others who come later can understand why a
memory-efficient, exact median is not in Spark.

Spark's own ApproximatePercentile also uses QuantileSummaries internally.
QuantileSummaries is a helper class for computing approximate quantiles
with a single pass over the data. I don't think I can use it to compute an
exact median.

Spark does already have code to compute an exact median: Percentile.
Since it works like other Catalyst expressions, it computes the median with
a single pass over the data. It does that by loading all the data into a
buffer and sorting it in memory.
This is why the leading comment on Percentile warns that too much data will
cause GC pauses and OOMs.
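
To make the memory cost concrete, here is a rough Python sketch of a
single-pass, buffer-everything aggregate. It is not Spark's Percentile code:
the class name `ExactPercentileBuffer`, its `update`/`merge`/`evaluate`
methods, the per-value counting, and the linear interpolation between
neighbouring values are all assumptions made for illustration. The point is
only that the buffer grows with the number of distinct values and has to be
sorted in memory at the end.

```python
import math
from collections import Counter
from typing import List, Optional, Tuple


class ExactPercentileBuffer:
    """Hypothetical stand-in for a single-pass aggregation buffer."""

    def __init__(self) -> None:
        # One entry per distinct value seen so far: this is the memory cost.
        self.counts: Counter = Counter()

    def update(self, value: float) -> None:
        """Consume one input row."""
        self.counts[value] += 1

    def merge(self, other: "ExactPercentileBuffer") -> None:
        """Combine a partial buffer built on another partition."""
        self.counts.update(other.counts)  # Counter.update adds the counts

    @staticmethod
    def _value_at(ordered: List[Tuple[float, int]], index: int) -> float:
        """Return the value at a 0-based position in the sorted data."""
        seen = 0
        for value, count in ordered:
            seen += count
            if index < seen:
                return value
        raise IndexError(index)

    def evaluate(self, percentage: float) -> Optional[float]:
        """Sort the whole buffer in memory, then pick or interpolate."""
        if not self.counts:
            return None
        ordered = sorted(self.counts.items())
        total = sum(count for _, count in ordered)
        position = (total - 1) * percentage
        lower = self._value_at(ordered, math.floor(position))
        upper = self._value_at(ordered, math.ceil(position))
        return lower + (upper - lower) * (position - math.floor(position))


# Two partial buffers, as if built on two partitions, merged before evaluation.
left, right = ExactPercentileBuffer(), ExactPercentileBuffer()
for v in (1, 3):
    left.update(v)
for v in (2, 4):
    right.update(v)
left.merge(right)
median = left.evaluate(0.5)
```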

So I think this is what Reynold was getting at: With the design of Catalyst
expressions as they are today, there is no way to save memory by making
multiple passes over the data. So an approximate median is your best bet if
you want to avoid high memory usage.

You can build an exact median by doing other things, like multiple passes
over the data, or by using window functions, but that can't be captured in
a Catalyst Expression.
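
As a sketch of the multiple-passes idea (not necessarily the algorithm in the
post linked from the original message, and not something a Catalyst
expression can express): bisect on the value range, and on each pass only
count how many values are at or below a pivot. The function names and the use
of plain Python lists as stand-ins for partitions are hypothetical, and the
sketch assumes integer data.

```python
from typing import List, Sequence


def count_le(partitions: Sequence[Sequence[int]], pivot: int) -> int:
    """One full pass over the data; each partition only keeps a counter."""
    return sum(sum(1 for v in part if v <= pivot) for part in partitions)


def kth_smallest(partitions: Sequence[Sequence[int]], k: int) -> int:
    """Find the k-th smallest value (1-based) by bisecting on the value range."""
    lo = min(min(part) for part in partitions if part)
    hi = max(max(part) for part in partitions if part)
    while lo < hi:
        mid = (lo + hi) // 2
        # The answer is the smallest value with at least k values at or below it.
        if count_le(partitions, mid) >= k:
            hi = mid
        else:
            lo = mid + 1
    return lo


def exact_median(partitions: Sequence[Sequence[int]]) -> float:
    """Exact median using O(1) extra memory per partition, at the cost of many passes."""
    n = sum(len(part) for part in partitions)
    if n == 0:
        raise ValueError("median of empty data")
    lower = kth_smallest(partitions, (n + 1) // 2)
    upper = kth_smallest(partitions, n // 2 + 1)
    return (lower + upper) / 2


data: List[List[int]] = [[5, 1, 9], [3, 7], [8, 2]]
result = exact_median(data)
```

The number of passes grows with the logarithm of the value range, which is
the trade-off: very little memory, but the data is scanned again and again.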

On Wed, Dec 15, 2021 at 11:00 AM Fitch, Simeon wrote:

> Nicholas,
>
> This may or may not be much help, but in RasterFrames we have an
> approximate quantiles Expression computed against Tiles (2d geospatial
> arrays) which makes use of
> `org.apache.spark.sql.catalyst.util.QuantileSummaries` to do the hard work.
> So perhaps a directionally correct example of what you're looking to do?
>
>
> https://github.com/locationtech/rasterframes/blob/develop/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala
>
> In that same package are a number of other Aggregates, including
> declarative ones, which are another way of computing aggregations through
> composition of other Expressions.
>
> Simeon

Re: Creating a memory-efficient AggregateFunction to calculate Median

2021-12-15 Thread Fitch, Simeon
Nicholas,

This may or may not be much help, but in RasterFrames we have an
approximate quantiles Expression computed against Tiles (2d geospatial
arrays) which makes use of
`org.apache.spark.sql.catalyst.util.QuantileSummaries` to do the hard work.
So perhaps a directionally correct example of what you're looking to do?

https://github.com/locationtech/rasterframes/blob/develop/core/src/main/scala/org/locationtech/rasterframes/expressions/aggregates/ApproxCellQuantilesAggregate.scala

In that same package are a number of other Aggregates, including
declarative ones, which are another way of computing aggregations through
composition of other Expressions.

Simeon





On Thu, Dec 9, 2021 at 9:26 PM Nicholas Chammas wrote:

> I'm trying to create a new aggregate function. It's my first time working
> with Catalyst, so it's exciting---but I'm also in a bit over my head.
>
> My goal is to create a function to calculate the median.
>
> As a very simple solution, I could just define median to be an alias of
> `Percentile(col, 0.5)`. However, the leading comment on the Percentile
> expression highlights that it's very memory-intensive and can easily lead
> to OutOfMemory errors.
>
> So instead of using Percentile, I'm trying to create an Expression that
> calculates the median without needing to hold everything in memory at once.
> I'm considering two different approaches:
>
> 1. Define Median as a combination of existing expressions: The median can
> perhaps be built out of the existing expressions for Count and NthValue.
>
> I don't see a template I can follow for building a new expression out of
> existing expressions (i.e. without having to implement a bunch of methods
> for DeclarativeAggregate or ImperativeAggregate). I also don't know how I
> would wrap NthValue to make it usable as a regular aggregate function. The
> wrapped NthValue would need an implicit window that provides the necessary
> ordering.
>
>
> Is there any potential to this idea? Any pointers on how to implement it?
>
>
> 2. Another memory-light approach to calculating the median requires
> multiple passes over the data to converge on the answer. The approach is
> described here. (I posted a sketch implementation of this approach using
> Spark's user-level API here.)
>
> I am also struggling to understand how I would build an aggregate function
> like this, since it requires multiple passes over the data. From what I can
> see, Catalyst's aggregate functions are designed to work with a single pass
> over the data.
>
> We don't seem to have an interface for AggregateFunction that supports
> multiple passes over the data. Is there some way to do this?
>
>
> Again, this is my first serious foray into Catalyst. Any specific
> implementation guidance is appreciated!
>
> Nick
>
>

-- 
Simeon Fitch
Co-founder & VP of R
Astraea, Inc.


Re: Creating a memory-efficient AggregateFunction to calculate Median

2021-12-15 Thread Sean Owen
Parquet and ORC already have the necessary stats to make this fast too, but
that only helps if you want the median of sorted data as stored on disk, rather
than the general case. I'm not sure you can do better than roughly what a sort
entails if you want the exact median.

On Wed, Dec 15, 2021, 8:56 AM Pol Santamaria wrote:

> Correct me if I am wrong, but if the dataset were indexed by the given
> column, you could get the median without reading the whole dataset,
> shuffling, and so on. (Disclaimer: I work at Qbeast.) So the issue is more
> about the data format and the possibility of pushing the operation down to
> the data source.
>
> On our side, we are working on an open data format that supports indexing
> and efficient sampling on data lakes (Qbeast Format), but I also know about
> other initiatives (Microsoft Hyperspace) to allow consuming indexed
> datasets with Apache Spark.
>
> If you are interested in experimenting with the median aggregate, I have
> some ideas on how to implement it for the Spark data source of Qbeast
> Format in an efficient way.
>
> [Qbeast-spark] https://github.com/Qbeast-io/qbeast-spark
> [Microsoft Hyperspace] https://github.com/microsoft/hyperspace
>
> Bests,
>
> Pol Santamaria
>
>
> On Tue, Dec 14, 2021 at 4:42 AM Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> Yeah, I think approximate percentile is good enough most of the time.
>>
>> I don't have a specific need for a precise median. I was interested in
>> implementing it more as a Catalyst learning exercise, but it turns out I
>> picked a bad learning exercise to solve. :)
>>
>> On Mon, Dec 13, 2021 at 9:46 PM Reynold Xin wrote:
>>
>>> tl;dr: there's no easy way to implement aggregate expressions that'd
>>> require multiple passes over the data. It is simply not something that's
>>> supported, and doing so would be very costly.
>>>
>>> Would you be OK using approximate percentile? That's relatively cheap.

Re: Creating a memory-efficient AggregateFunction to calculate Median

2021-12-15 Thread Pol Santamaria
Correct me if I am wrong, but if the dataset were indexed by the given
column, you could get the median without reading the whole dataset,
shuffling, and so on. (Disclaimer: I work at Qbeast.) So the issue is more
about the data format and the possibility of pushing the operation down to
the data source.

On our side, we are working on an open data format that supports indexing
and efficient sampling on data lakes (Qbeast Format), but I also know about
other initiatives (Microsoft Hyperspace) to allow consuming indexed
datasets with Apache Spark.

If you are interested in experimenting with the median aggregate, I have
some ideas on how to implement it for the Spark data source of Qbeast
Format in an efficient way.

[Qbeast-spark] https://github.com/Qbeast-io/qbeast-spark
[Microsoft Hyperspace] https://github.com/microsoft/hyperspace

Bests,

Pol Santamaria


On Tue, Dec 14, 2021 at 4:42 AM Nicholas Chammas wrote:

> Yeah, I think approximate percentile is good enough most of the time.
>
> I don't have a specific need for a precise median. I was interested in
> implementing it more as a Catalyst learning exercise, but it turns out I
> picked a bad learning exercise to solve. :)
>
> On Mon, Dec 13, 2021 at 9:46 PM Reynold Xin wrote:
>
>> tl;dr: there's no easy way to implement aggregate expressions that'd
>> require multiple passes over the data. It is simply not something that's
>> supported, and doing so would be very costly.
>>
>> Would you be OK using approximate percentile? That's relatively cheap.

Re: Creating a memory-efficient AggregateFunction to calculate Median

2021-12-13 Thread Nicholas Chammas
Yeah, I think approximate percentile is good enough most of the time.

I don't have a specific need for a precise median. I was interested in
implementing it more as a Catalyst learning exercise, but it turns out I
picked a bad learning exercise to solve. :)

On Mon, Dec 13, 2021 at 9:46 PM Reynold Xin wrote:

> tl;dr: there's no easy way to implement aggregate expressions that'd
> require multiple passes over the data. It is simply not something that's
> supported, and doing so would be very costly.
>
> Would you be OK using approximate percentile? That's relatively cheap.
>
>
>
> On Mon, Dec 13, 2021 at 6:43 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> No takers here? :)
>>
>> I can see now why a median function is not available in most data
>> processing systems. It's pretty annoying to implement!
>>
>> On Thu, Dec 9, 2021 at 9:25 PM Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> I'm trying to create a new aggregate function. It's my first time
>>> working with Catalyst, so it's exciting---but I'm also in a bit over my
>>> head.
>>>
>>> My goal is to create a function to calculate the median.
>>>
>>> As a very simple solution, I could just define median to be an alias of
>>> `Percentile(col, 0.5)`. However, the leading comment on the Percentile
>>> expression highlights that it's very memory-intensive and can easily lead
>>> to OutOfMemory errors.
>>>
>>> So instead of using Percentile, I'm trying to create an Expression that
>>> calculates the median without needing to hold everything in memory at once.
>>> I'm considering two different approaches:
>>>
>>> 1. Define Median as a combination of existing expressions: The median
>>> can perhaps be built out of the existing expressions for Count and
>>> NthValue.
>>>
>>> I don't see a template I can follow for building a new expression out of
>>> existing expressions (i.e. without having to implement a bunch of methods
>>> for DeclarativeAggregate or ImperativeAggregate). I also don't know how I
>>> would wrap NthValue to make it usable as a regular aggregate function. The
>>> wrapped NthValue would need an implicit window that provides the necessary
>>> ordering.
>>>
>>>
>>> Is there any potential to this idea? Any pointers on how to implement it?
>>>
>>>
>>> 2. Another memory-light approach to calculating the median requires
>>> multiple passes over the data to converge on the answer. The approach is
>>> described here. (I posted a sketch implementation of this approach using
>>> Spark's user-level API here.)
>>>
>>> I am also struggling to understand how I would build an aggregate
>>> function like this, since it requires multiple passes over the data. From
>>> what I can see, Catalyst's aggregate functions are designed to work with a
>>> single pass over the data.
>>>
>>> We don't seem to have an interface for AggregateFunction that supports
>>> multiple passes over the data. Is there some way to do this?
>>>
>>>
>>> Again, this is my first serious foray into Catalyst. Any specific
>>> implementation guidance is appreciated!
>>>
>>> Nick
>>>
>>
>


Re: Creating a memory-efficient AggregateFunction to calculate Median

2021-12-13 Thread Reynold Xin
tl;dr: there's no easy way to implement aggregate expressions that'd require
multiple passes over the data. It is simply not something that's supported,
and doing so would be very costly.

Would you be OK using approximate percentile? That's relatively cheap.

On Mon, Dec 13, 2021 at 6:43 PM, Nicholas Chammas <nicholas.cham...@gmail.com> wrote:

> No takers here? :)
>
> I can see now why a median function is not available in most data
> processing systems. It's pretty annoying to implement!
>
> On Thu, Dec 9, 2021 at 9:25 PM Nicholas Chammas <nicholas.cham...@gmail.com> wrote:
>
>> I'm trying to create a new aggregate function. It's my first time working
>> with Catalyst, so it's exciting---but I'm also in a bit over my head.
>>
>> My goal is to create a function to calculate the median (
>> https://issues.apache.org/jira/browse/SPARK-26589 ).
>>
>> As a very simple solution, I could just define median to be an alias of
>> `Percentile(col, 0.5)`. However, the leading comment on the Percentile
>> expression (
>> https://github.com/apache/spark/blob/08123a3795683238352e5bf55452de381349fdd9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala#L37-L39
>> ) highlights that it's very memory-intensive and can easily lead to
>> OutOfMemory errors.
>>
>> So instead of using Percentile, I'm trying to create an Expression that
>> calculates the median without needing to hold everything in memory at
>> once. I'm considering two different approaches:
>>
>> 1. Define Median as a combination of existing expressions: The median can
>> perhaps be built out of the existing expressions for Count (
>> https://github.com/apache/spark/blob/9af338cd685bce26abbc2dd4d077bde5068157b1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala#L48
>> ) and NthValue (
>> https://github.com/apache/spark/blob/568ad6aa4435ce76ca3b5d9966e64259ea1f9b38/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala#L675
>> ).
>>
>> I don't see a template I can follow for building a new expression out of
>> existing expressions (i.e. without having to implement a bunch of methods
>> for DeclarativeAggregate or ImperativeAggregate). I also don't know how I
>> would wrap NthValue to make it usable as a regular aggregate function. The
>> wrapped NthValue would need an implicit window that provides the necessary
>> ordering.
>>
>> Is there any potential to this idea? Any pointers on how to implement it?
>>
>> 2. Another memory-light approach to calculating the median requires
>> multiple passes over the data to converge on the answer. The approach is
>> described here (
>> https://www.quora.com/Distributed-Algorithms/What-is-the-distributed-algorithm-to-determine-the-median-of-arrays-of-integers-located-on-different-computers
>> ). (I posted a sketch implementation of this approach using Spark's
>> user-level API here (
>> https://issues.apache.org/jira/browse/SPARK-26589?focusedCommentId=17452081=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17452081
>> ).)
>>
>> I am also struggling to understand how I would build an aggregate function
>> like this, since it requires multiple passes over the data. From what I
>> can see, Catalyst's aggregate functions are designed to work with a single
>> pass over the data.
>>
>> We don't seem to have an interface for AggregateFunction that supports
>> multiple passes over the data. Is there some way to do this?
>>
>> Again, this is my first serious foray into Catalyst. Any specific
>> implementation guidance is appreciated!
>>
>> Nick
>>
>


Re: Creating a memory-efficient AggregateFunction to calculate Median

2021-12-13 Thread Nicholas Chammas
No takers here? :)

I can see now why a median function is not available in most data
processing systems. It's pretty annoying to implement!

On Thu, Dec 9, 2021 at 9:25 PM Nicholas Chammas wrote:

> I'm trying to create a new aggregate function. It's my first time working
> with Catalyst, so it's exciting---but I'm also in a bit over my head.
>
> My goal is to create a function to calculate the median.
>
> As a very simple solution, I could just define median to be an alias of
> `Percentile(col, 0.5)`. However, the leading comment on the Percentile
> expression highlights that it's very memory-intensive and can easily lead
> to OutOfMemory errors.
>
> So instead of using Percentile, I'm trying to create an Expression that
> calculates the median without needing to hold everything in memory at once.
> I'm considering two different approaches:
>
> 1. Define Median as a combination of existing expressions: The median can
> perhaps be built out of the existing expressions for Count and NthValue.
>
> I don't see a template I can follow for building a new expression out of
> existing expressions (i.e. without having to implement a bunch of methods
> for DeclarativeAggregate or ImperativeAggregate). I also don't know how I
> would wrap NthValue to make it usable as a regular aggregate function. The
> wrapped NthValue would need an implicit window that provides the necessary
> ordering.
>
>
> Is there any potential to this idea? Any pointers on how to implement it?
>
>
> 2. Another memory-light approach to calculating the median requires
> multiple passes over the data to converge on the answer. The approach is
> described here. (I posted a sketch implementation of this approach using
> Spark's user-level API here.)
>
> I am also struggling to understand how I would build an aggregate function
> like this, since it requires multiple passes over the data. From what I can
> see, Catalyst's aggregate functions are designed to work with a single pass
> over the data.
>
> We don't seem to have an interface for AggregateFunction that supports
> multiple passes over the data. Is there some way to do this?
>
>
> Again, this is my first serious foray into Catalyst. Any specific
> implementation guidance is appreciated!
>
> Nick
>
>