Re: Update Spark 3.3 release window?

2021-10-27 Thread Sean Owen
Seems fine to me - as good a placeholder as anything.
Would that be about the time to call 2.x end-of-life?

On Wed, Oct 27, 2021 at 9:36 PM Hyukjin Kwon  wrote:

> Hi all,
>
> Spark 3.2 is out. Shall we update the release window at
> https://spark.apache.org/versioning-policy.html?
> I am thinking of mid-March 2022 (5 months after the 3.2 release) for the
> code freeze and onward.
>
>


Update Spark 3.3 release window?

2021-10-27 Thread Hyukjin Kwon
Hi all,

Spark 3.2 is out. Shall we update the release window at
https://spark.apache.org/versioning-policy.html?
I am thinking of mid-March 2022 (5 months after the 3.2 release) for the
code freeze and onward.


Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source V2

2021-10-27 Thread Ryan Blue
The transform expressions in v2 are logical, not concrete implementations.
Even days may have different implementations -- the only expectation is
that the partitions are day-sized. For example, you could use a transform
that splits days at UTC 00:00, or uses some other day boundary.
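
To make that concrete, here is a tiny sketch (plain Scala, not Spark API) of
two equally valid day transforms that differ only in where the day boundary
falls:

    import java.time.{Instant, ZoneId, ZoneOffset}

    // Both produce day-sized partitions, but they disagree on where a "day"
    // starts, so their partition values are not interchangeable.
    def daysUtc(ts: Instant): Long =
      ts.atZone(ZoneOffset.UTC).toLocalDate.toEpochDay
    def daysInZone(ts: Instant, zone: ZoneId): Long =
      ts.atZone(zone).toLocalDate.toEpochDay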

Because the expressions are logical, we need to resolve them to
implementations at some point, like Chao outlines. We can do that using a
FunctionCatalog, although I think it's worth considering adding an
interface so that a transform from a Table can be converted into a
`BoundFunction` directly. That is easier than defining a way for Spark to
query the function catalog.
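
As a rough illustration of the shape such an interface could take (the trait
and method names below are hypothetical, not existing Spark API):

    import org.apache.spark.sql.connector.catalog.functions.BoundFunction
    import org.apache.spark.sql.connector.expressions.Transform

    // Hypothetical mix-in for a Table implementation: given one of the logical
    // transforms it reports from partitioning(), return the bound function the
    // source actually uses, without going through a FunctionCatalog lookup.
    trait SupportsTransformResolution {
      def resolveTransform(transform: Transform): BoundFunction
    }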

In any case, I'm sure it's easy to understand how this works once you get a
concrete implementation.

On Wed, Oct 27, 2021 at 9:35 AM Wenchen Fan  wrote:

> `BucketTransform` is a builtin partition transform in Spark, instead of a
> UDF from `FunctionCatalog`. Will Iceberg use UDF from `FunctionCatalog` to
> represent its bucket transform, or use the Spark builtin `BucketTransform`?
> I'm asking this because other v2 sources may also use the builtin
> `BucketTransform` but use a different bucket hash function. Or we can
> clearly define the bucket hash function of the builtin `BucketTransform` in
> the doc.
>
> On Thu, Oct 28, 2021 at 12:25 AM Ryan Blue  wrote:
>
>> Two v2 sources may return different bucket IDs for the same value, and
>> this breaks the phase 1 split-wise join.
>>
>> This is why the FunctionCatalog included a canonicalName method (docs
>> ).
>> That method returns an identifier that can be used to compare whether two
>> bucket function instances are the same.
>>
>>
>>1. Can we apply this idea to partitioned file source tables
>>(non-bucketed) as well?
>>
>> What do you mean here? The design doc discusses transforms like days(ts)
>> that can be supported in the future. Is that what you’re asking about? Or
>> are you referring to v1 file sources? I think the goal is to support v2,
>> since v1 doesn’t have reliable behavior.
>>
>> Note that the initial implementation goal is to support bucketing since
>> that’s an easier case because both sides have the same number of
>> partitions. More complex storage-partitioned joins can be implemented later.
>>
>>
>>1. What if the table has many partitions? Shall we apply certain join
>>algorithms in the phase 1 split-wise join as well? Or even launch a Spark
>>job to do so?
>>
>> I think that this proposal opens up a lot of possibilities, like what
>> you’re suggesting here. It is a bit like AQE. We’ll need to come up with
>> heuristics for choosing how and when to use storage partitioning in joins.
>> As I said above, bucketing is a great way to get started because it fills
>> an existing gap. More complex use cases can be supported over time.
>>
>> Ryan
>>
>> On Wed, Oct 27, 2021 at 9:08 AM Wenchen Fan  wrote:
>>
>>> IIUC, the general idea is to let each input split report its partition
>>> value, and Spark can perform the join in two phases:
>>> 1. join the input splits from left and right tables according to their
>>> partition values and join keys, at the driver side.
>>> 2. for each joined input split pair (or a group of splits), launch a
>>> Spark task to join the rows.
>>>
>>> My major concern is about how to define "compatible partitions". Things
>>> like `days(ts)` are straightforward: the same timestamp value always
>>> results in the same partition value, in whatever v2 sources. `bucket(col,
>>> num)` is tricky, as Spark doesn't define the bucket hash function. Two v2
>>> sources may return different bucket IDs for the same value, and this breaks
>>> the phase 1 split-wise join.
>>>
>>> And two questions for further improvements:
>>> 1. Can we apply this idea to partitioned file source tables
>>> (non-bucketed) as well?
>>> 2. What if the table has many partitions? Shall we apply certain join
>>> algorithms in the phase 1 split-wise join as well? Or even launch a Spark
>>> job to do so?
>>>
>>> Thanks,
>>> Wenchen
>>>
>>> On Wed, Oct 27, 2021 at 3:08 AM Chao Sun  wrote:
>>>
 Thanks Cheng for the comments.

 > Is migrating Hive table read path to data source v2, being a
 prerequisite of this SPIP

 Yes, this SPIP only aims at DataSourceV2, so obviously it will help if
 Hive eventually moves to use V2 API. With that said, I think some of the
 ideas could be useful for V1 Hive support as well. For instance, with the
 newly proposed logic to compare whether output partitionings from both
 sides of a join operator are compatible, we can have HiveTableScanExec
 report a partitioning other than HashPartitioning, and
 EnsureRequirements could potentially recognize that and therefore avoid
 shuffle if both sides report the same compatible partitioning. In addition,
 

Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source V2

2021-10-27 Thread Chao Sun
Thanks Wenchen, this is a good question. `BucketTransform` and others
currently have no semantic meaning, and I think we should bind them to v2
functions as part of the SPIP. My current proposal is:

During query analysis, Spark will try to resolve `XXXTransform`s (in
`V2ExpressionUtils`) into catalyst expressions, and load functions from a
v2 FunctionCatalog. For pre-defined transforms such as bucket/hour/day etc,
we can ask the FunctionCatalog to provide a default implementation
associated with the given name. With this, different v2 sources can provide
their own bucket hash function for `BucketTransform`, with distinct
canonical names. As for ApplyTransform, it should look up the corresponding
function name in the catalog.
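
A minimal sketch of what that lookup could look like, assuming the Spark 3.2
FunctionCatalog API (the empty namespace and the name "bucket" are just an
example convention, not something the SPIP fixes):

    import org.apache.spark.sql.connector.catalog.{FunctionCatalog, Identifier}
    import org.apache.spark.sql.connector.catalog.functions.BoundFunction
    import org.apache.spark.sql.types.StructType

    // Ask the source's FunctionCatalog for its "bucket" implementation and
    // bind it to the argument types carried by the transform.
    def resolveBucket(catalog: FunctionCatalog, argTypes: StructType): BoundFunction = {
      val ident = Identifier.of(Array.empty[String], "bucket")
      catalog.loadFunction(ident).bind(argTypes)
    }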

This will also help the write path (SPARK-33779), where a v2 data source can
specify a required distribution for Spark writes. Currently it only allows
IdentityTransform.
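
For reference, a minimal sketch of that write-side hook, assuming the
SPARK-33779 interfaces as they exist in Spark 3.2 (the class name here is
made up):

    import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
    import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortOrder}
    import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering

    // A v2 Write asking Spark to cluster incoming rows by bucket(16, id).
    // Without a concrete bucket function, Spark cannot honor this today,
    // which is the limitation described above.
    class BucketedWrite extends RequiresDistributionAndOrdering {
      override def requiredDistribution(): Distribution =
        Distributions.clustered(Array[Expression](Expressions.bucket(16, "id")))
      override def requiredOrdering(): Array[SortOrder] = Array.empty
    }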

One question is what the default behavior should be when a FunctionCatalog
is not provided by the v2 data source or the catalog doesn't define
functions for bucket/hour/day, etc. For the storage partitioned join this is
more straightforward, since we can just fall back to the existing shuffle
behavior. However, for the write path the functions need to be available so
they can be used in `RepartitionByExpression`. Therefore, I think we should
throw an error in this case, since at the moment Spark already throws an
exception in the write path when it sees any transform that's not
IdentityTransform (see here).
Optionally, we can also implement default behaviors for bucket/day/hour,
etc., in Spark itself, but this would cause problems for data sources that
are already using their own bucket/day/hour functions but don't provide a
FunctionCatalog to Spark.

Would love to hear your opinions on this.

Thanks,
Chao

On Wed, Oct 27, 2021 at 9:35 AM Wenchen Fan  wrote:

> `BucketTransform` is a builtin partition transform in Spark, instead of a
> UDF from `FunctionCatalog`. Will Iceberg use UDF from `FunctionCatalog` to
> represent its bucket transform, or use the Spark builtin `BucketTransform`?
> I'm asking this because other v2 sources may also use the builtin
> `BucketTransform` but use a different bucket hash function. Or we can
> clearly define the bucket hash function of the builtin `BucketTransform` in
> the doc.
>
> On Thu, Oct 28, 2021 at 12:25 AM Ryan Blue  wrote:
>
>> Two v2 sources may return different bucket IDs for the same value, and
>> this breaks the phase 1 split-wise join.
>>
>> This is why the FunctionCatalog included a canonicalName method (docs
>> ).
>> That method returns an identifier that can be used to compare whether two
>> bucket function instances are the same.
>>
>>
>>1. Can we apply this idea to partitioned file source tables
>>(non-bucketed) as well?
>>
>> What do you mean here? The design doc discusses transforms like days(ts)
>> that can be supported in the future. Is that what you’re asking about? Or
>> are you referring to v1 file sources? I think the goal is to support v2,
>> since v1 doesn’t have reliable behavior.
>>
>> Note that the initial implementation goal is to support bucketing since
>> that’s an easier case because both sides have the same number of
>> partitions. More complex storage-partitioned joins can be implemented later.
>>
>>
>>1. What if the table has many partitions? Shall we apply certain join
>>algorithms in the phase 1 split-wise join as well? Or even launch a Spark
>>job to do so?
>>
>> I think that this proposal opens up a lot of possibilities, like what
>> you’re suggesting here. It is a bit like AQE. We’ll need to come up with
>> heuristics for choosing how and when to use storage partitioning in joins.
>> As I said above, bucketing is a great way to get started because it fills
>> an existing gap. More complex use cases can be supported over time.
>>
>> Ryan
>>
>> On Wed, Oct 27, 2021 at 9:08 AM Wenchen Fan  wrote:
>>
>>> IIUC, the general idea is to let each input split report its partition
>>> value, and Spark can perform the join in two phases:
>>> 1. join the input splits from left and right tables according to their
>>> partition values and join keys, at the driver side.
>>> 2. for each joined input split pair (or a group of splits), launch a
>>> Spark task to join the rows.
>>>
>>> My major concern is about how to define "compatible partitions". Things
>>> like `days(ts)` are straightforward: the same timestamp value always
>>> results in the same partition value, in whatever v2 sources. `bucket(col,
>>> num)` is tricky, as Spark doesn't define the bucket hash function. Two v2
>>> sources may return different bucket IDs for the same value, and this breaks
>>> 

Re: [DISCUSS] SPIP: Row-level operations in Data Source V2

2021-10-27 Thread L . C . Hsieh


Thanks for the initial feedback.

I think the community was previously busy with work related to the Spark 3.2
release.
Now that the 3.2 release is done, I'd like to bring this up again and
seek more discussion and feedback.

Thanks.

On 2021/06/25 15:49:49, huaxin gao  wrote: 
> I took a quick look at the PR and it looks like a great feature to have. It
> provides unified APIs for data sources to perform the commonly used
> operations easily and efficiently, so users don't have to implement
> custom extensions on their own. Thanks Anton for the work!
> 
> On Thu, Jun 24, 2021 at 9:42 PM L. C. Hsieh  wrote:
> 
> > Thanks Anton. I volunteer to be the shepherd of this SPIP. This is also
> > my first time shepherding a SPIP, so please let me know if there is
> > anything I can improve.
> >
> > These look like great features, and the rationale in the proposal makes
> > sense. These operations are getting more common and more important in big
> > data workloads. Instead of individual data sources building custom
> > extensions, it makes more sense to support this API in Spark.
> >
> > Please provide your thoughts about the proposal and the design. Appreciate
> > your feedback. Thank you!
> >
> > On 2021/06/24 23:53:32, Anton Okolnychyi  wrote:
> > > Hey everyone,
> > >
> > > I'd like to start a discussion on adding support for executing row-level
> > > operations such as DELETE, UPDATE, MERGE for v2 tables (SPARK-35801). The
> > > execution should be the same across data sources and the best way to do
> > > that is to implement it in Spark.
> > >
> > > Right now, Spark can only parse and to some extent analyze DELETE,
> > UPDATE,
> > > MERGE commands. Data sources that support row-level changes have to build
> > > custom Spark extensions to execute such statements. The goal of this
> > effort
> > > is to come up with a flexible and easy-to-use API that will work across
> > > data sources.
> > >
> > > Design doc:
> > >
> > https://docs.google.com/document/d/12Ywmc47j3l2WF4anG5vL4qlrhT2OKigb7_EbIKhxg60/
> > >
> > > PR for handling DELETE statements:
> > > https://github.com/apache/spark/pull/33008
> > >
> > > Any feedback is more than welcome.
> > >
> > > Liang-Chi was kind enough to shepherd this effort. Thanks!
> > >
> > > - Anton
> > >
> >
> >
> >
> 




Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source V2

2021-10-27 Thread L . C . Hsieh
+1 for the SPIP. This is a great improvement and optimization!

On 2021/10/26 19:01:03, Erik Krogen  wrote: 
> It's great to see this SPIP going live. Once this is complete, it will
> really help Spark to play nicely with a broader data ecosystem (Hive,
> Iceberg, Trino, etc.), and it's great to see that besides just bringing the
> existing bucketed-join support to V2, we are also broadening the types of
> partitioning that can be accommodated and leaving open pathways
> for future optimizations like partially clustered distributions.
> 
> Big thanks to Ryan and Chao!
> 
> On Tue, Oct 26, 2021 at 10:35 AM Cheng Su  wrote:
> 
> > +1 for this. This is an exciting step toward efficiently reading bucketed
> > tables from other systems (Hive, Trino & Presto)!
> >
> >
> >
> > Still looking at the details but having some early questions:
> >
> >
> >
> >1. Is migrating Hive table read path to data source v2, being a
> >prerequisite of this SPIP?
> >
> >
> >
> > The Hive table read path is currently a mix of data source v1 (for Parquet &
> > ORC file formats only) and the legacy Hive code path (HiveTableScanExec). In
> > the SPIP, I see we only make changes for data source v2, so I'm wondering
> > how this would work with the existing Hive table read path. In addition, just
> > FYI, support for writing Hive bucketed tables was merged to master recently (
> > SPARK-19256  has
> > details).
> >
> >
> >
> >1. Would aggregate work automatically after the SPIP?
> >
> >
> >
> > Another major benefit of having bucketed tables is avoiding a shuffle
> > before aggregation. Just want to bring to our attention that it would be
> > great to consider aggregation as well in this proposal.
> >
> >
> >
> >1. Any major use cases in mind except Hive bucketed table?
> >
> >
> >
> > Just curious if there are any other use cases we are targeting as part of
> > the SPIP.
> >
> >
> >
> > Thanks,
> >
> > Cheng Su
> >
> >
> >
> >
> >
> >
> >
> > *From: *Ryan Blue 
> > *Date: *Tuesday, October 26, 2021 at 9:39 AM
> > *To: *John Zhuge 
> > *Cc: *Chao Sun , Wenchen Fan ,
> > Cheng Su , DB Tsai , Dongjoon Hyun <
> > dongjoon.h...@gmail.com>, Hyukjin Kwon , Wenchen Fan
> > , angers zhu , dev <
> > dev@spark.apache.org>, huaxin gao 
> > *Subject: *Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source V2
> >
> > Instead of commenting on the doc, could we keep discussion here on the dev
> > list please? That way more people can follow it and there is more room for
> > discussion. Comment threads have a very small area and easily become hard
> > to follow.
> >
> >
> >
> > Ryan
> >
> >
> >
> > On Tue, Oct 26, 2021 at 9:32 AM John Zhuge  wrote:
> >
> > +1  Nicely done!
> >
> >
> >
> > On Tue, Oct 26, 2021 at 8:08 AM Chao Sun  wrote:
> >
> > Oops, sorry. I just fixed the permission setting.
> >
> >
> >
> > Thanks everyone for the positive support!
> >
> >
> >
> > On Tue, Oct 26, 2021 at 7:30 AM Wenchen Fan  wrote:
> >
> > +1 to this SPIP and nice writeup of the design doc!
> >
> >
> >
> > Can we open comment permission in the doc so that we can discuss details
> > there?
> >
> >
> >
> > On Tue, Oct 26, 2021 at 8:29 PM Hyukjin Kwon  wrote:
> >
> > Seems to make sense to me.
> >
> > Would be great to have some feedback from people such as @Wenchen Fan
> >  @Cheng Su  @angers zhu
> > .
> >
> >
> >
> >
> >
> > On Tue, 26 Oct 2021 at 17:25, Dongjoon Hyun 
> > wrote:
> >
> > +1 for this SPIP.
> >
> >
> >
> > On Sun, Oct 24, 2021 at 9:59 AM huaxin gao  wrote:
> >
> > +1. Thanks for lifting the current restrictions on bucket join and making
> > this more generalized.
> >
> >
> >
> > On Sun, Oct 24, 2021 at 9:33 AM Ryan Blue  wrote:
> >
> > +1 from me as well. Thanks Chao for doing so much to get it to this point!
> >
> >
> >
> > On Sat, Oct 23, 2021 at 11:29 PM DB Tsai  wrote:
> >
> > +1 on this SPIP.
> >
> > This is a more generalized version of bucketed tables and bucketed
> > joins, which can eliminate very expensive data shuffles when joining, and
> > many users in the Apache Spark community have wanted this feature for
> > a long time!
> >
> > Thank you, Ryan and Chao, for working on this, and I look forward to
> > it as a new feature in Spark 3.3
> >
> > DB Tsai  |  https://www.dbtsai.com/  |  PGP 42E5B25A8F7A82C1
> >
> > On Fri, Oct 22, 2021 at 12:18 PM Chao Sun  wrote:
> > >
> > > Hi,
> > >
> > > Ryan and I drafted a design doc to support a new type of join: storage
> > partitioned join which covers bucket join support for DataSourceV2 but is
> > more general. The goal is to let Spark leverage distribution properties
> > reported by data sources and eliminate shuffle whenever possible.
> > >
> > > Design doc:
> > https://docs.google.com/document/d/1foTkDSM91VxKgkEcBMsuAvEjNybjja-uHk-r3vtXWFE
> > (includes a POC link at the end)
> > >
> > > We'd like to start a discussion on the doc and any feedback is welcome!
> > >
> > > Thanks,
> > > Chao
> >
> >
> >
> >
> > --
> >
> > Ryan Blue
> >
> >
> >
> >
> 

Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source V2

2021-10-27 Thread Wenchen Fan
`BucketTransform` is a builtin partition transform in Spark, instead of a
UDF from `FunctionCatalog`. Will Iceberg use UDF from `FunctionCatalog` to
represent its bucket transform, or use the Spark builtin `BucketTransform`?
I'm asking this because other v2 sources may also use the builtin
`BucketTransform` but use a different bucket hash function. Or we can
clearly define the bucket hash function of the builtin `BucketTransform` in
the doc.
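
For context, a minimal sketch of how a v2 source reports the builtin
transform today (via Table#partitioning(), built with the existing
Expressions helper); it is purely logical, and which hash function backs it
is exactly the open question above:

    import org.apache.spark.sql.connector.expressions.{Expressions, Transform}

    // The logical form a Table can report from Table#partitioning();
    // it carries no hash implementation by itself.
    val byBucket: Transform = Expressions.bucket(16, "id")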

On Thu, Oct 28, 2021 at 12:25 AM Ryan Blue  wrote:

> Two v2 sources may return different bucket IDs for the same value, and
> this breaks the phase 1 split-wise join.
>
> This is why the FunctionCatalog included a canonicalName method (docs
> ).
> That method returns an identifier that can be used to compare whether two
> bucket function instances are the same.
>
>
>1. Can we apply this idea to partitioned file source tables
>(non-bucketed) as well?
>
> What do you mean here? The design doc discusses transforms like days(ts)
> that can be supported in the future. Is that what you’re asking about? Or
> are you referring to v1 file sources? I think the goal is to support v2,
> since v1 doesn’t have reliable behavior.
>
> Note that the initial implementation goal is to support bucketing since
> that’s an easier case because both sides have the same number of
> partitions. More complex storage-partitioned joins can be implemented later.
>
>
>1. What if the table has many partitions? Shall we apply certain join
>algorithms in the phase 1 split-wise join as well? Or even launch a Spark
>job to do so?
>
> I think that this proposal opens up a lot of possibilities, like what
> you’re suggesting here. It is a bit like AQE. We’ll need to come up with
> heuristics for choosing how and when to use storage partitioning in joins.
> As I said above, bucketing is a great way to get started because it fills
> an existing gap. More complex use cases can be supported over time.
>
> Ryan
>
> On Wed, Oct 27, 2021 at 9:08 AM Wenchen Fan  wrote:
>
>> IIUC, the general idea is to let each input split report its partition
>> value, and Spark can perform the join in two phases:
>> 1. join the input splits from left and right tables according to their
>> partition values and join keys, at the driver side.
>> 2. for each joined input split pair (or a group of splits), launch a
>> Spark task to join the rows.
>>
>> My major concern is about how to define "compatible partitions". Things
>> like `days(ts)` are straightforward: the same timestamp value always
>> results in the same partition value, in whatever v2 sources. `bucket(col,
>> num)` is tricky, as Spark doesn't define the bucket hash function. Two v2
>> sources may return different bucket IDs for the same value, and this breaks
>> the phase 1 split-wise join.
>>
>> And two questions for further improvements:
>> 1. Can we apply this idea to partitioned file source tables
>> (non-bucketed) as well?
>> 2. What if the table has many partitions? Shall we apply certain join
>> algorithms in the phase 1 split-wise join as well? Or even launch a Spark
>> job to do so?
>>
>> Thanks,
>> Wenchen
>>
>> On Wed, Oct 27, 2021 at 3:08 AM Chao Sun  wrote:
>>
>>> Thanks Cheng for the comments.
>>>
>>> > Is migrating Hive table read path to data source v2, being a
>>> prerequisite of this SPIP
>>>
>>> Yes, this SPIP only aims at DataSourceV2, so obviously it will help if
>>> Hive eventually moves to use V2 API. With that said, I think some of the
>>> ideas could be useful for V1 Hive support as well. For instance, with the
>>> newly proposed logic to compare whether output partitionings from both
>>> sides of a join operator are compatible, we can have HiveTableScanExec
>>> report a partitioning other than HashPartitioning, and
>>> EnsureRequirements could potentially recognize that and therefore avoid
>>> shuffle if both sides report the same compatible partitioning. In addition,
>>> SPARK-35703, which is part of the SPIP, is also useful in that it relaxes
>>> the constraint for V1 bucket join so that the join keys do not need to
>>> be identical to the bucket keys.
>>>
>>> > Would aggregate work automatically after the SPIP?
>>>
>>> Yes it will work as before. This case is already supported by
>>> DataSourcePartitioning in V2 (see SPARK-22389).
>>>
>>> > Any major use cases in mind except Hive bucketed table?
>>>
>>> Our first use case is Apache Iceberg. In addition to that we also want
>>> to add the support for Spark's built-in file data sources.
>>>
>>> Thanks,
>>> Chao
>>>
>>> On Tue, Oct 26, 2021 at 10:34 AM Cheng Su  wrote:
>>>
 +1 for this. This is an exciting step toward efficiently reading bucketed
 tables from other systems (Hive, Trino & Presto)!



 Still looking at the details but having some early questions:



1. Is 

Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source V2

2021-10-27 Thread Ryan Blue
Two v2 sources may return different bucket IDs for the same value, and this
breaks the phase 1 split-wise join.

This is why the FunctionCatalog included a canonicalName method (docs
).
That method returns an identifier that can be used to compare whether two
bucket function instances are the same.
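
As a minimal sketch (not Iceberg's actual implementation) of a source
exposing a comparable bucket function through that API:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.connector.catalog.functions.{BoundFunction, ScalarFunction}
    import org.apache.spark.sql.types.{DataType, IntegerType, LongType}

    // A toy bucket function that advertises its hash via canonicalName, so
    // Spark can tell whether two sources bucket values the same way.
    class ExampleBucket(numBuckets: Int) extends ScalarFunction[Int] {
      override def inputTypes(): Array[DataType] = Array[DataType](LongType)
      override def resultType(): DataType = IntegerType
      override def name(): String = "bucket"
      override def canonicalName(): String = s"example.bucket($numBuckets)"
      override def produceResult(input: InternalRow): Int =
        ((input.getLong(0) % numBuckets + numBuckets) % numBuckets).toInt
    }

    // Splits can be co-partitioned only if their functions agree on the hash.
    def sameBucketing(a: BoundFunction, b: BoundFunction): Boolean =
      a.canonicalName() == b.canonicalName()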


   1. Can we apply this idea to partitioned file source tables
   (non-bucketed) as well?

What do you mean here? The design doc discusses transforms like days(ts)
that can be supported in the future. Is that what you’re asking about? Or
are you referring to v1 file sources? I think the goal is to support v2,
since v1 doesn’t have reliable behavior.

Note that the initial implementation goal is to support bucketing since
that’s an easier case because both sides have the same number of
partitions. More complex storage-partitioned joins can be implemented later.


   1. What if the table has many partitions? Shall we apply certain join
   algorithms in the phase 1 split-wise join as well? Or even launch a Spark
   job to do so?

I think that this proposal opens up a lot of possibilities, like what
you’re suggesting here. It is a bit like AQE. We’ll need to come up with
heuristics for choosing how and when to use storage partitioning in joins.
As I said above, bucketing is a great way to get started because it fills
an existing gap. More complex use cases can be supported over time.

Ryan

On Wed, Oct 27, 2021 at 9:08 AM Wenchen Fan  wrote:

> IIUC, the general idea is to let each input split report its partition
> value, and Spark can perform the join in two phases:
> 1. join the input splits from left and right tables according to their
> partition values and join keys, at the driver side.
> 2. for each joined input split pair (or a group of splits), launch a
> Spark task to join the rows.
>
> My major concern is about how to define "compatible partitions". Things
> like `days(ts)` are straightforward: the same timestamp value always
> results in the same partition value, in whatever v2 sources. `bucket(col,
> num)` is tricky, as Spark doesn't define the bucket hash function. Two v2
> sources may return different bucket IDs for the same value, and this breaks
> the phase 1 split-wise join.
>
> And two questions for further improvements:
> 1. Can we apply this idea to partitioned file source tables (non-bucketed)
> as well?
> 2. What if the table has many partitions? Shall we apply certain join
> algorithms in the phase 1 split-wise join as well? Or even launch a Spark
> job to do so?
>
> Thanks,
> Wenchen
>
> On Wed, Oct 27, 2021 at 3:08 AM Chao Sun  wrote:
>
>> Thanks Cheng for the comments.
>>
>> > Is migrating Hive table read path to data source v2, being a
>> prerequisite of this SPIP
>>
>> Yes, this SPIP only aims at DataSourceV2, so obviously it will help if
>> Hive eventually moves to use V2 API. With that said, I think some of the
>> ideas could be useful for V1 Hive support as well. For instance, with the
>> newly proposed logic to compare whether output partitionings from both
>> sides of a join operator are compatible, we can have HiveTableScanExec
>> report a partitioning other than HashPartitioning, and
>> EnsureRequirements could potentially recognize that and therefore avoid
>> shuffle if both sides report the same compatible partitioning. In addition,
>> SPARK-35703, which is part of the SPIP, is also useful in that it relaxes
>> the constraint for V1 bucket join so that the join keys do not need to
>> be identical to the bucket keys.
>>
>> > Would aggregate work automatically after the SPIP?
>>
>> Yes it will work as before. This case is already supported by
>> DataSourcePartitioning in V2 (see SPARK-22389).
>>
>> > Any major use cases in mind except Hive bucketed table?
>>
>> Our first use case is Apache Iceberg. In addition to that we also want to
>> add the support for Spark's built-in file data sources.
>>
>> Thanks,
>> Chao
>>
>> On Tue, Oct 26, 2021 at 10:34 AM Cheng Su  wrote:
>>
>>> +1 for this. This is an exciting step toward efficiently reading
>>> bucketed tables from other systems (Hive, Trino & Presto)!
>>>
>>>
>>>
>>> Still looking at the details but having some early questions:
>>>
>>>
>>>
>>>1. Is migrating Hive table read path to data source v2, being a
>>>prerequisite of this SPIP?
>>>
>>>
>>>
>>> The Hive table read path is currently a mix of data source v1 (for Parquet &
>>> ORC file formats only) and the legacy Hive code path (HiveTableScanExec). In
>>> the SPIP, I see we only make changes for data source v2, so I'm wondering
>>> how this would work with the existing Hive table read path. In addition, just
>>> FYI, support for writing Hive bucketed tables was merged to master recently (
>>> SPARK-19256  has
>>> details).
>>>
>>>
>>>
>>>1. Would aggregate work 

Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source V2

2021-10-27 Thread Wenchen Fan
IIUC, the general idea is to let each input split report its partition
value, and Spark can perform the join in two phases:
1. join the input splits from left and right tables according to their
partition values and join keys, at the driver side.
2. for each joined input split pair (or a group of splits), launch a Spark
task to join the rows.
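
To make the two phases concrete, a rough sketch of the phase-1 pairing on
the driver side (the Split type is just a placeholder, not a Spark class):

    // Placeholder for whatever split/partition-value representation a source
    // reports; not an actual Spark class.
    case class Split(partitionValue: Seq[Any], location: String)

    // Phase 1: pair up left and right splits that share a partition value.
    // Phase 2 would launch one task per pair (or per group) to join the rows.
    def pairSplits(left: Seq[Split], right: Seq[Split]): Seq[(Split, Split)] = {
      val rightByValue = right.groupBy(_.partitionValue)
      left.flatMap(l => rightByValue.getOrElse(l.partitionValue, Seq.empty).map(r => (l, r)))
    }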

My major concern is about how to define "compatible partitions". Things
like `days(ts)` are straightforward: the same timestamp value always
results in the same partition value, in whatever v2 sources. `bucket(col,
num)` is tricky, as Spark doesn't define the bucket hash function. Two v2
sources may return different bucket IDs for the same value, and this breaks
the phase 1 split-wise join.

And two questions for further improvements:
1. Can we apply this idea to partitioned file source tables (non-bucketed)
as well?
2. What if the table has many partitions? Shall we apply certain join
algorithms in the phase 1 split-wise join as well? Or even launch a Spark
job to do so?

Thanks,
Wenchen

On Wed, Oct 27, 2021 at 3:08 AM Chao Sun  wrote:

> Thanks Cheng for the comments.
>
> > Is migrating Hive table read path to data source v2, being a
> prerequisite of this SPIP
>
> Yes, this SPIP only aims at DataSourceV2, so obviously it will help if
> Hive eventually moves to use V2 API. With that said, I think some of the
> ideas could be useful for V1 Hive support as well. For instance, with the
> newly proposed logic to compare whether output partitionings from both
> sides of a join operator are compatible, we can have HiveTableScanExec
> report a partitioning other than HashPartitioning, and
> EnsureRequirements could potentially recognize that and therefore avoid
> shuffle if both sides report the same compatible partitioning. In addition,
> SPARK-35703, which is part of the SPIP, is also useful in that it relaxes
> the constraint for V1 bucket join so that the join keys do not need to
> be identical to the bucket keys.
>
> > Would aggregate work automatically after the SPIP?
>
> Yes it will work as before. This case is already supported by
> DataSourcePartitioning in V2 (see SPARK-22389).
>
> > Any major use cases in mind except Hive bucketed table?
>
> Our first use case is Apache Iceberg. In addition to that we also want to
> add the support for Spark's built-in file data sources.
>
> Thanks,
> Chao
>
> On Tue, Oct 26, 2021 at 10:34 AM Cheng Su  wrote:
>
>> +1 for this. This is an exciting step toward efficiently reading bucketed
>> tables from other systems (Hive, Trino & Presto)!
>>
>>
>>
>> Still looking at the details but having some early questions:
>>
>>
>>
>>1. Is migrating Hive table read path to data source v2, being a
>>prerequisite of this SPIP?
>>
>>
>>
>> The Hive table read path is currently a mix of data source v1 (for Parquet &
>> ORC file formats only) and the legacy Hive code path (HiveTableScanExec). In
>> the SPIP, I see we only make changes for data source v2, so I'm wondering
>> how this would work with the existing Hive table read path. In addition, just
>> FYI, support for writing Hive bucketed tables was merged to master recently (
>> SPARK-19256  has
>> details).
>>
>>
>>
>>1. Would aggregate work automatically after the SPIP?
>>
>>
>>
>> Another major benefit of having bucketed tables is avoiding a shuffle
>> before aggregation. Just want to bring to our attention that it would be
>> great to consider aggregation as well in this proposal.
>>
>>
>>
>>1. Any major use cases in mind except Hive bucketed table?
>>
>>
>>
>> Just curious if there are any other use cases we are targeting as part of
>> the SPIP.
>>
>>
>>
>> Thanks,
>>
>> Cheng Su
>>
>>
>>
>>
>>
>>
>>
>> *From: *Ryan Blue 
>> *Date: *Tuesday, October 26, 2021 at 9:39 AM
>> *To: *John Zhuge 
>> *Cc: *Chao Sun , Wenchen Fan ,
>> Cheng Su , DB Tsai , Dongjoon Hyun <
>> dongjoon.h...@gmail.com>, Hyukjin Kwon , Wenchen
>> Fan , angers zhu , dev <
>> dev@spark.apache.org>, huaxin gao 
>> *Subject: *Re: [DISCUSS] SPIP: Storage Partitioned Join for Data Source
>> V2
>>
>> Instead of commenting on the doc, could we keep discussion here on the
>> dev list please? That way more people can follow it and there is more room
>> for discussion. Comment threads have a very small area and easily become
>> hard to follow.
>>
>>
>>
>> Ryan
>>
>>
>>
>> On Tue, Oct 26, 2021 at 9:32 AM John Zhuge  wrote:
>>
>> +1  Nicely done!
>>
>>
>>
>> On Tue, Oct 26, 2021 at 8:08 AM Chao Sun  wrote:
>>
>> Oops, sorry. I just fixed the permission setting.
>>
>>
>>
>> Thanks everyone for the positive support!
>>
>>
>>
>> On Tue, Oct 26, 2021 at 7:30 AM Wenchen Fan  wrote:
>>
>> +1 to this SPIP and nice writeup of the design doc!
>>
>>
>>
>> Can we open comment permission in the doc so that we can discuss details
>> there?
>>
>>
>>
>> On Tue, Oct 26, 2021 at 8:29 PM Hyukjin Kwon  wrote:
>>
>> Seems to make sense to me.
>>
>> Would be great to have some feedback