Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)

2018-02-06 Thread Ryan Blue
Instead of exploring possible operations ourselves, I think we should
follow the SQL standard.

Most of these do. We should make conscious decisions with the standard in
mind for the SQL API. But we also have the Scala API (and versions of it in
other languages) and need to consider how these operations are invoked from
there.

There is something more we need to take care of, like ALTER TABLE.

Yes, I was excluding the simple commands like this to focus on the commands
that may need to make behavior guarantees. I think those commands are
related to the 5 concerns I listed.

Another way to think about it: ALTER TABLE isn’t something you could
reasonably combine with a write into a single atomic operation.

ReplaceTable, RTAS:
Most mainstream databases don’t support these two. I think a
drop-all-data operation is dangerous, and we should only allow users to do
it with DROP TABLE.

I don’t think it would be too confusing for users to have REPLACE TABLE
commands. The fact that the old table is dropped is clear.

There’s a good use case for this. We have analysts who produce a report
table every day. They can overwrite the entire table with new data each
day, but they prefer to drop the previous table and create a new one with
CTAS because they don’t want to worry about schema evolution. They make no
guarantees about the table schema, so they use an operation, CTAS, that
doesn’t constrain their work. No need to alter a table that is getting
replaced.

Given a reasonable use case for dropping and recreating a table with CTAS,
I think there’s a good argument for an atomic REPLACE TABLE AS SELECT
operation. My users don’t want to drop the previous report table until the
new one is ready, and they never want a period of time when report data is
unavailable.
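
To make that concrete, here is a rough sketch (the table names are made
up, and REPLACE TABLE ... AS SELECT is hypothetical syntax that Spark
doesn’t have today):

    // Today: two statements, so there is a window with no report table,
    // and a failed CTAS leaves no table at all.
    spark.sql("DROP TABLE IF EXISTS report")
    spark.sql("CREATE TABLE report AS SELECT * FROM report_source")

    // With an atomic RTAS, the old table would stay visible until the new
    // data is committed, and it would survive if the write failed.
    spark.sql("REPLACE TABLE report AS SELECT * FROM report_source")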

This is why I think it is a good idea to consider each of these. Just
because it didn’t make sense in another DB to support RTAS doesn’t mean
there isn’t a reason to do it now.

As for REPLACE TABLE that isn’t RTAS, I don’t think there’s a good use case
because it is unlikely that we need an atomic operation. Not much goes
wrong in a table create, and we don’t want to confuse users, who should
generally use ALTER TABLE for schema evolution.

DeleteFrom, ReplaceData:
These two are in the SQL standard, but in a more general form: DELETE,
UPDATE, and MERGE are the most common SQL statements for changing data.

My point is that we should care about what users are trying to do and
whether we should support a combined atomic operation for it.

Replacing data is an operation that I see our data engineers using all the
time. Spark already supports INSERT OVERWRITE ... PARTITION that replaces
all data in a partition. We use this to continuously update summary tables
as data arrives. Each hour of fact data gets added to the daily summary
rollup and replaces the last summary written. Clearly, this should be an
atomic operation, and it currently is.
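
As a concrete sketch of that pattern (the table and column names are made
up), each hourly job does something like this from the shell:

    // Rebuild the day's rollup from all fact data seen so far. Only the
    // day=2018-02-05 partition is replaced, and the replacement is atomic.
    spark.sql("""
      INSERT OVERWRITE TABLE summary PARTITION (day = '2018-02-05')
      SELECT user_id, count(*) AS events
      FROM facts
      WHERE day = '2018-02-05'
      GROUP BY user_id
    """)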

The question for v2 is: how do we perform this same operation with the v2
API?

A transaction made up of a delete and an insert would work. Is this what
we want to use? How do we add this to v2?
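
To make the question concrete, here is one possible shape for it. Nothing
like this exists in the v2 API today; the trait and method names below are
invented purely for illustration:

    import org.apache.spark.sql.sources.Filter

    // Hypothetical: stage a delete and an insert, then commit them together.
    trait ReplaceDataSupport {
      // Delete the rows matching `filters` and write the new data as a
      // single commit; on failure the table is left unchanged.
      def createReplaceWriter(filters: Array[Filter]): ReplaceDataWriter
    }

    trait ReplaceDataWriter {
      def commit(): Unit  // make the delete and the insert visible atomically
      def abort(): Unit   // discard staged changes
    }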

rb
-- 
Ryan Blue
Software Engineer
Netflix


Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)

2018-02-05 Thread Wenchen Fan
I think many advanced Spark users already have custom Catalyst rules to
deal with the query plan directly, so it makes a lot of sense to
standardize the logical plan. However, instead of exploring possible
operations ourselves, I think we should follow the SQL standard.

ReplaceTable, RTAS:
Most mainstream databases don't support these two. I think a
drop-all-data operation is dangerous, and we should only allow users to do
it with DROP TABLE.

DeleteFrom, ReplaceData:
These two are in the SQL standard, but in a more general form: DELETE,
UPDATE, and MERGE are the most common SQL statements for changing data.

There is something more we need to take care of, like ALTER TABLE. I'm
looking forward to a holistic SPIP about this. Thanks for your contribution!

Wenchen



On Tue, Feb 6, 2018 at 8:32 AM, Ryan Blue  wrote:

> Thanks for responding!
>
> I’ve been coming up with a list of the high-level operations that are
> needed. I think all of them come down to 5 questions about what’s happening:
>
>- Does the target table exist?
>- If it does exist, should it be dropped?
>- If not, should it get created?
>- Should data get written to the table?
>- Should data get deleted from the table?
>
> Using those, you can list out all the potential operations. Here’s a flow
> chart that makes it easier to think about:
>
> Table exists?                  No                         Yes
>                                 |                          |
> Drop table?                    N/A              Yes <------+------> No
>                                 |                |                  |
> Create table?           Yes <---+---> No   Yes <-+-> No          Exists
>                          |          Noop    |    DropTable          |
> Write data?        Yes <-+-> No       Yes <-+-> No        Yes <-----+-----> No
>                   CTAS  CreateTable  RTAS  ReplaceTable    |                |
> Delete data?                                       Yes <---+---> No  Yes <--+--> No
>                                                ReplaceData  InsertInto DeleteFrom  Noop
>
> Some of these can be broken down into other operations (replace table =
> drop & create), but I think it is valuable to consider each one and think
> about whether it should be atomic. CTAS is a create and an insert that
> guarantees the table exists only if the insert succeeded. Should we also
> support RTAS=ReplaceTableAsSelect (drop, create, insert) and make a similar
> guarantee that the original table will be dropped if and only if the write
> succeeds?
>
> As a sanity check, most of these operations correspond to SQL statements
> for tables:
>
>- CreateTable = CREATE TABLE t
>- DropTable = DROP TABLE t
>- ReplaceTable = DROP TABLE t; CREATE TABLE t (no transaction needed?)
>- CTAS = CREATE TABLE t AS SELECT ...
>- RTAS = ??? (we could add REPLACE TABLE t AS ...)
>
> Or for data:
>
>- DeleteFrom = DELETE FROM t WHERE ...
>- InsertInto = INSERT INTO t SELECT ...
>- ReplaceData = INSERT OVERWRITE t PARTITION (p) SELECT ... or BEGIN;
>DELETE FROM t; INSERT INTO t SELECT ...; COMMIT;
>
> The last one, ReplaceData, is interesting because only one specific case
> is currently supported and requires partitioning.
>
> I think we need to consider all of these operations while building
> DataSourceV2. We still need to define what v2 sources should do.
>
> Also, I would like to see a way to provide weak guarantees easily and
> another way for v2 sources to implement stronger guarantees. For example,
> CTAS can be implemented as a create, then an insert, with a drop if the
> insert fails. That covers most cases and is easy to implement. But some
> table formats can provide stronger guarantees. Iceberg supports atomic
> create-and-insert, so that a table never exists unless its write succeeds,
> rather than relying on a rollback that only happens if the driver is still
> alive after a failure.
> If we implement the basics (create, insert, drop-on-failure) in Spark, I
> think we will end up with more data sources that have reliable behavior.
>
> Would anyone be interested in an improvement proposal for this? It would
> be great to document this and build consensus around Spark’s expected
> behavior. I can write it up.
>
> rb
>
>
> On Fri, Feb 2, 2018 at 3:23 PM, Michael Armbrust 
> wrote:
>
>> So here are my recommendations for moving forward, with DataSourceV2 as a
>>> starting point:
>>>
>>>1. Use well-defined logical plan nodes for all high-level
>>>operations: insert, create, CTAS, overwrite table, etc.
>>>2. Use rules that match on these high-level plan nodes, so that it
>>>isn’t necessary to create rules to match each eventual code path
>>>individually
>>>3. Define Spark’s behavior for these 

Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)

2018-02-05 Thread Ryan Blue
Thanks for responding!

I’ve been coming up with a list of the high-level operations that are
needed. I think all of them come down to 5 questions about what’s happening:

   - Does the target table exist?
   - If it does exist, should it be dropped?
   - If not, should it get created?
   - Should data get written to the table?
   - Should data get deleted from the table?

Using those, you can list out all the potential operations. Here’s a flow
chart that makes it easier to think about:

Table exists?                  No                         Yes
                                |                          |
Drop table?                    N/A              Yes <------+------> No
                                |                |                  |
Create table?           Yes <---+---> No   Yes <-+-> No          Exists
                         |          Noop    |    DropTable          |
Write data?        Yes <-+-> No       Yes <-+-> No        Yes <-----+-----> No
                  CTAS  CreateTable  RTAS  ReplaceTable    |                |
Delete data?                                       Yes <---+---> No  Yes <--+--> No
                                               ReplaceData  InsertInto DeleteFrom  Noop
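
Restating the chart as code, just to sanity-check the enumeration (these
case objects only mirror the chart; they aren’t actual Spark classes):

    sealed trait Operation
    case object Noop extends Operation
    case object CreateTable extends Operation
    case object CTAS extends Operation
    case object DropTable extends Operation
    case object ReplaceTable extends Operation
    case object RTAS extends Operation
    case object InsertInto extends Operation
    case object DeleteFrom extends Operation
    case object ReplaceData extends Operation

    // The five questions, answered in order, pick exactly one operation.
    def operation(exists: Boolean, drop: Boolean, create: Boolean,
                  write: Boolean, delete: Boolean): Operation =
      (exists, drop, create, write, delete) match {
        case (false, _, false, _, _)             => Noop
        case (false, _, true,  false, _)         => CreateTable
        case (false, _, true,  true,  _)         => CTAS
        case (true,  true,  false, _, _)         => DropTable
        case (true,  true,  true,  false, _)     => ReplaceTable
        case (true,  true,  true,  true,  _)     => RTAS
        case (true,  false, _,     true,  true)  => ReplaceData
        case (true,  false, _,     true,  false) => InsertInto
        case (true,  false, _,     false, true)  => DeleteFrom
        case (true,  false, _,     false, false) => Noop
      }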

Some of these can be broken down into other operations (replace table =
drop & create), but I think it is valuable to consider each one and think
about whether it should be atomic. CTAS is a create and an insert that
guarantees the table exists only if the insert succeeded. Should we also
support RTAS=ReplaceTableAsSelect (drop, create, insert) and make a similar
guarantee that the original table will be dropped if and only if the write
succeeds?

As a sanity check, most of these operations correspond to SQL statements
for tables:

   - CreateTable = CREATE TABLE t
   - DropTable = DROP TABLE t
   - ReplaceTable = DROP TABLE t; CREATE TABLE t (no transaction needed?)
   - CTAS = CREATE TABLE t AS SELECT ...
   - RTAS = ??? (we could add REPLACE TABLE t AS ...)

Or for data:

   - DeleteFrom = DELETE FROM t WHERE ...
   - InsertInto = INSERT INTO t SELECT ...
   - ReplaceData = INSERT OVERWRITE t PARTITION (p) SELECT ... or BEGIN;
   DELETE FROM t; INSERT INTO t SELECT ...; COMMIT;

The last one, ReplaceData, is interesting because only one specific case is
currently supported and requires partitioning.

I think we need to consider all of these operations while building
DataSourceV2. We still need to define what v2 sources should do.

Also, I would like to see a way to provide weak guarantees easily and
another way for v2 sources to implement stronger guarantees. For example,
CTAS can be implemented as a create, then an insert, with a drop if the
insert fails. That covers most cases and is easy to implement. But some
table formats can provide stronger guarantees. Iceberg supports atomic
create-and-insert, so that a table never exists unless its write succeeds,
rather than relying on a rollback that only happens if the driver is still
alive after a failure.
If we implement the basics (create, insert, drop-on-failure) in Spark, I
think we will end up with more data sources that have reliable behavior.
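
As a rough sketch of that weak-guarantee path (the Catalog trait here is a
placeholder, not a real Spark interface; df.write.insertInto is the existing
DataFrameWriter call):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.types.StructType

    // Placeholder catalog interface for the sketch.
    trait Catalog {
      def createTable(name: String, schema: StructType): Unit
      def dropTable(name: String): Unit
    }

    def ctasWithWeakGuarantee(catalog: Catalog, table: String,
                              schema: StructType, df: DataFrame): Unit = {
      catalog.createTable(table, schema)
      try {
        df.write.insertInto(table)
      } catch {
        case e: Throwable =>
          // Best-effort rollback: this only runs if the driver survives.
          catalog.dropTable(table)
          throw e
      }
    }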

Would anyone be interested in an improvement proposal for this? It would be
great to document this and build consensus around Spark’s expected
behavior. I can write it up.

rb

On Fri, Feb 2, 2018 at 3:23 PM, Michael Armbrust 
wrote:

> So here are my recommendations for moving forward, with DataSourceV2 as a
>> starting point:
>>
>>1. Use well-defined logical plan nodes for all high-level operations:
>>insert, create, CTAS, overwrite table, etc.
>>2. Use rules that match on these high-level plan nodes, so that it
>>isn’t necessary to create rules to match each eventual code path
>>individually
>>3. Define Spark’s behavior for these logical plan nodes. Physical
>>nodes should implement that behavior, but all CREATE TABLE OVERWRITE should
>>(eventually) make the same guarantees.
>>4. Specialize implementation when creating a physical plan, not
>>logical plans.
>>
>> I realize this is really long, but I’d like to hear thoughts about this.
>> I’m sure I’ve left out some additional context, but I think the main idea
>> here is solid: let’s standardize logical plans for more consistent behavior
>> and easier maintenance.
>>
> Context aside, I really like these rules! I think having query planning be
> the boundary for specialization makes a lot of sense.
>
> (RunnableCommand might also be my fault though sorry! :P)
>



-- 
Ryan Blue
Software Engineer
Netflix


Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)

2018-02-02 Thread Michael Armbrust
>
> So here are my recommendations for moving forward, with DataSourceV2 as a
> starting point:
>
>1. Use well-defined logical plan nodes for all high-level operations:
>insert, create, CTAS, overwrite table, etc.
>2. Use rules that match on these high-level plan nodes, so that it
>isn’t necessary to create rules to match each eventual code path
>individually
>3. Define Spark’s behavior for these logical plan nodes. Physical
>nodes should implement that behavior, but all CREATE TABLE OVERWRITE should
>(eventually) make the same guarantees.
>4. Specialize implementation when creating a physical plan, not
>logical plans.
>
> I realize this is really long, but I’d like to hear thoughts about this.
> I’m sure I’ve left out some additional context, but I think the main idea
> here is solid: let’s standardize logical plans for more consistent behavior
> and easier maintenance.
>
Context aside, I really like these rules! I think having query planning be
the boundary for specialization makes a lot of sense.

(RunnableCommand might also be my fault though sorry! :P)