Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)
> Instead of exploring possible operations ourselves, I think we should follow the SQL standard.

Most of these do. We should make conscious decisions with the standard in mind for the SQL API. But we also have the Scala API (and versions of it in other languages) and need to consider how these operations are invoked from there.

> There is something more we need to take care of, like ALTER TABLE.

Yes, I was excluding simple commands like this to focus on the commands that may need to make behavior guarantees. I think those commands are related to the 5 concerns I listed. Another way to think about this: ALTER TABLE isn’t something you could reasonably combine with a write for a single atomic operation.

> ReplaceTable, RTAS: Most of the mainstream databases don’t support these 2. I think drop-all-data operation is dangerous and we should only allow users to do it with DROP TABLE.

I don’t think it would be too confusing for users to have REPLACE TABLE commands. The fact that the old table is dropped is clear.

There’s a good use case for this. We have analysts that produce a report table every day. They can overwrite the entire table with new data each day, but they prefer to drop the previous table and create a new one with CTAS because they don’t want to care about schema evolution. They make no guarantees about the table schema, so they use an operation, CTAS, that doesn’t constrain their work. No need to alter a table that is getting replaced.

Given a reasonable use case for dropping and recreating a table with CTAS, I think there’s a good argument for an atomic REPLACE TABLE AS SELECT operation. My users don’t want to drop the previous report table until the new one is ready, and they never want a period of time when report data is unavailable.

This is why I think it is a good idea to consider each of these. Just because it didn’t make sense in another DB to support RTAS doesn’t mean there isn’t a reason to do it now.

As for REPLACE TABLE that isn’t RTAS, I don’t think there’s a good use case because it is unlikely that we need an atomic operation. Not much goes wrong in a table create, and we don’t want to confuse users, who should generally use ALTER TABLE for schema evolution.

> DeleteFrom, ReplaceData: These 2 are SQL standard, but in a more general form. DELETE, UPDATE, MERGE are the most common SQL statements to change the data.

My point is that we should care what users are trying to do and whether we should support a combined atomic operation.

Replacing data is an operation that I see our data engineers using all the time. Spark already supports INSERT OVERWRITE ... PARTITION, which replaces all data in a partition. We use this to continuously update summary tables as data arrives. Each hour of fact data gets added to the daily summary rollup and replaces the last summary written. Clearly, this should be an atomic operation, and it currently is.

The question for v2 is: how do we perform this same operation with the v2 API? A transaction made from a delete and an insert would work. Is this what we want to use? How do we add this to v2?

rb

--
Ryan Blue
Software Engineer
Netflix
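A minimal sketch of the ReplaceData question raised above (a delete and an insert committed together), using a hypothetical in-memory table. `InMemoryTable`, `replace_data`, and the sample rows are illustrative assumptions, not part of Spark or the DataSourceV2 API; the point is only that the new snapshot is staged first and published in one step, so a failed write leaves the old data visible.

```python
from typing import Callable, Dict, Iterable, List

Row = Dict[str, object]


class InMemoryTable:
    """Hypothetical stand-in for a v2 table; not a Spark API."""

    def __init__(self, rows: Iterable[Row] = ()) -> None:
        self._rows: List[Row] = list(rows)

    def rows(self) -> List[Row]:
        # Return a copy so callers cannot mutate the committed snapshot.
        return list(self._rows)

    def replace_data(self, delete_where: Callable[[Row], bool],
                     new_rows: Iterable[Row]) -> None:
        """Delete matching rows and insert new ones as a single commit."""
        # Stage the new snapshot. If consuming new_rows raises, self._rows
        # has not been touched and readers still see the old data.
        staged = [r for r in self._rows if not delete_where(r)]
        staged.extend(new_rows)
        # Commit: one reference swap publishes the delete and insert together.
        self._rows = staged


summary = InMemoryTable([
    {"day": "2018-02-05", "total": 10},
    {"day": "2018-02-06", "total": 3},
])

# Replace one day's summary, similar in spirit to
# INSERT OVERWRITE ... PARTITION (day = '2018-02-06').
summary.replace_data(lambda r: r["day"] == "2018-02-06",
                     [{"day": "2018-02-06", "total": 7}])
```

A real source would need a durable commit protocol instead of a reference swap; the sketch only shows the behavior a combined operation would be expected to guarantee.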
Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)
I think many advanced Spark users already have custom Catalyst rules to deal with the query plan directly, so it makes a lot of sense to standardize the logical plan. However, instead of exploring possible operations ourselves, I think we should follow the SQL standard.

ReplaceTable, RTAS: Most of the mainstream databases don't support these 2. I think drop-all-data operation is dangerous and we should only allow users to do it with DROP TABLE.

DeleteFrom, ReplaceData: These 2 are SQL standard, but in a more general form. DELETE, UPDATE, MERGE are the most common SQL statements to change the data.

There is something more we need to take care of, like ALTER TABLE.

I'm looking forward to a holistic SPIP about this, thanks for your contribution!

Wenchen

On Tue, Feb 6, 2018 at 8:32 AM, Ryan Blue wrote:

> Thanks for responding!
>
> I’ve been coming up with a list of the high-level operations that are needed. I think all of them come down to 5 questions about what’s happening:
>
> - Does the target table exist?
> - If it does exist, should it be dropped?
> - If not, should it get created?
> - Should data get written to the table?
> - Should data get deleted from the table?
>
> Using those, you can list out all the potential operations. Here’s a flow chart that makes it easier to think about:
>
> Table exists?           No                               Yes
>                          |                                |
> Drop table?             N/A                 Yes <---------+----------> No
>                          |                   |                          |
> Create table?     Yes <--+--> No      Yes <--+--> No                 Exists
>                    |         Noop      |       DropTable                |
> Write data?  Yes <-+-> No        Yes <-+-> No                 Yes <-----+-----> No
>             CTAS   CreateTable  RTAS   ReplaceTable            |                 |
> Delete data?                                             Yes <-+-> No      Yes <-+-> No
>                                                     ReplaceData InsertInto DeleteFrom Noop
>
> Some of these can be broken down into other operations (replace table = drop & create), but I think it is valuable to consider each one and think about whether it should be atomic. CTAS is a create and an insert that guarantees the table exists only if the insert succeeded. Should we also support RTAS=ReplaceTableAsSelect (drop, create, insert) and make a similar guarantee that the original table will be dropped if and only if the write succeeds?
>
> As a sanity check, most of these operations correspond to SQL statements for tables:
>
> - CreateTable = CREATE TABLE t
> - DropTable = DROP TABLE t
> - ReplaceTable = DROP TABLE t; CREATE TABLE t (no transaction needed?)
> - CTAS = CREATE TABLE t AS SELECT ...
> - RTAS = ??? (we could add REPLACE TABLE t AS ...)
>
> Or for data:
>
> - DeleteFrom = DELETE FROM t WHERE ...
> - InsertInto = INSERT INTO t SELECT ...
> - ReplaceData = INSERT OVERWRITE t PARTITION (p) SELECT ... or BEGIN; DELETE FROM t; INSERT INTO t SELECT ...; COMMIT;
>
> The last one, ReplaceData, is interesting because only one specific case is currently supported and requires partitioning.
>
> I think we need to consider all of these operations while building DataSourceV2. We still need to define what v2 sources should do.
>
> Also, I would like to see a way to provide weak guarantees easily and another way for v2 sources to implement stronger guarantees. For example, CTAS can be implemented as a create, then an insert, with a drop if the insert fails. That covers most cases and is easy to implement. But some table formats can provide stronger guarantees. Iceberg supports atomic create-and-insert, so that a table never exists unless its write succeeds, and it’s not just rolled back if the driver is still alive after a failure. If we implement the basics (create, insert, drop-on-failure) in Spark, I think we will end up with more data sources that have reliable behavior.
>
> Would anyone be interested in an improvement proposal for this? It would be great to document this and build consensus around Spark’s expected behavior. I can write it up.
>
> rb
>
> On Fri, Feb 2, 2018 at 3:23 PM, Michael Armbrust wrote:
>
>>> So here are my recommendations for moving forward, with DataSourceV2 as a starting point:
>>>
>>> 1. Use well-defined logical plan nodes for all high-level operations: insert, create, CTAS, overwrite table, etc.
>>> 2. Use rules that match on these high-level plan nodes, so that it isn’t necessary to create rules to match each eventual code path individually
>>> 3. Define Spark’s behavior for these
Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)
Thanks for responding!

I’ve been coming up with a list of the high-level operations that are needed. I think all of them come down to 5 questions about what’s happening:

- Does the target table exist?
- If it does exist, should it be dropped?
- If not, should it get created?
- Should data get written to the table?
- Should data get deleted from the table?

Using those, you can list out all the potential operations. Here’s a flow chart that makes it easier to think about:

Table exists?           No                               Yes
                         |                                |
Drop table?             N/A                 Yes <---------+----------> No
                         |                   |                          |
Create table?     Yes <--+--> No      Yes <--+--> No                 Exists
                   |         Noop      |       DropTable                |
Write data?  Yes <-+-> No        Yes <-+-> No                 Yes <-----+-----> No
            CTAS   CreateTable  RTAS   ReplaceTable            |                 |
Delete data?                                             Yes <-+-> No      Yes <-+-> No
                                                    ReplaceData InsertInto DeleteFrom Noop

Some of these can be broken down into other operations (replace table = drop & create), but I think it is valuable to consider each one and think about whether it should be atomic. CTAS is a create and an insert that guarantees the table exists only if the insert succeeded. Should we also support RTAS=ReplaceTableAsSelect (drop, create, insert) and make a similar guarantee that the original table will be dropped if and only if the write succeeds?

As a sanity check, most of these operations correspond to SQL statements for tables:

- CreateTable = CREATE TABLE t
- DropTable = DROP TABLE t
- ReplaceTable = DROP TABLE t; CREATE TABLE t (no transaction needed?)
- CTAS = CREATE TABLE t AS SELECT ...
- RTAS = ??? (we could add REPLACE TABLE t AS ...)

Or for data:

- DeleteFrom = DELETE FROM t WHERE ...
- InsertInto = INSERT INTO t SELECT ...
- ReplaceData = INSERT OVERWRITE t PARTITION (p) SELECT ... or BEGIN; DELETE FROM t; INSERT INTO t SELECT ...; COMMIT;

The last one, ReplaceData, is interesting because only one specific case is currently supported and requires partitioning.

I think we need to consider all of these operations while building DataSourceV2. We still need to define what v2 sources should do.

Also, I would like to see a way to provide weak guarantees easily and another way for v2 sources to implement stronger guarantees. For example, CTAS can be implemented as a create, then an insert, with a drop if the insert fails. That covers most cases and is easy to implement. But some table formats can provide stronger guarantees. Iceberg supports atomic create-and-insert, so that a table never exists unless its write succeeds, and it’s not just rolled back if the driver is still alive after a failure. If we implement the basics (create, insert, drop-on-failure) in Spark, I think we will end up with more data sources that have reliable behavior.

Would anyone be interested in an improvement proposal for this? It would be great to document this and build consensus around Spark’s expected behavior. I can write it up.

rb

On Fri, Feb 2, 2018 at 3:23 PM, Michael Armbrust wrote:

>> So here are my recommendations for moving forward, with DataSourceV2 as a starting point:
>>
>> 1. Use well-defined logical plan nodes for all high-level operations: insert, create, CTAS, overwrite table, etc.
>> 2. Use rules that match on these high-level plan nodes, so that it isn’t necessary to create rules to match each eventual code path individually
>> 3. Define Spark’s behavior for these logical plan nodes. Physical nodes should implement that behavior, but all CREATE TABLE OVERWRITE should (eventually) make the same guarantees.
>> 4. Specialize implementation when creating a physical plan, not logical plans.
>>
>> I realize this is really long, but I’d like to hear thoughts about this. I’m sure I’ve left out some additional context, but I think the main idea here is solid: let’s standardize logical plans for more consistent behavior and easier maintenance.
>
> Context aside, I really like these rules! I think having query planning be the boundary for specialization makes a lot of sense.
>
> (RunnableCommand might also be my fault though sorry! :P)

--
Ryan Blue
Software Engineer
Netflix
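The five questions and the flow chart in the message above can be written down as a small decision function. This is only a sketch in Python: the function name `classify_operation` and its parameter names are assumptions made for illustration, while the returned operation names are the ones used in the flow chart.

```python
def classify_operation(exists: bool, drop: bool = False, create: bool = False,
                       write: bool = False, delete: bool = False) -> str:
    """Map the five yes/no answers to an operation name from the flow chart."""
    if not exists:
        # Drop is not applicable; only create and write matter.
        if not create:
            return "Noop"
        return "CTAS" if write else "CreateTable"
    if drop:
        # Existing table is dropped, then optionally recreated and written.
        if not create:
            return "DropTable"
        return "RTAS" if write else "ReplaceTable"
    # Existing table is kept; only the data questions remain.
    if write:
        return "ReplaceData" if delete else "InsertInto"
    return "DeleteFrom" if delete else "Noop"


# Example: drop, recreate, and write to an existing table.
operation = classify_operation(exists=True, drop=True, create=True, write=True)
```

Enumerating the branches this way makes it easy to check that every path through the chart ends in a named operation or a no-op.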
Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)
> So here are my recommendations for moving forward, with DataSourceV2 as a starting point:
>
> 1. Use well-defined logical plan nodes for all high-level operations: insert, create, CTAS, overwrite table, etc.
> 2. Use rules that match on these high-level plan nodes, so that it isn’t necessary to create rules to match each eventual code path individually
> 3. Define Spark’s behavior for these logical plan nodes. Physical nodes should implement that behavior, but all CREATE TABLE OVERWRITE should (eventually) make the same guarantees.
> 4. Specialize implementation when creating a physical plan, not logical plans.
>
> I realize this is really long, but I’d like to hear thoughts about this. I’m sure I’ve left out some additional context, but I think the main idea here is solid: let’s standardize logical plans for more consistent behavior and easier maintenance.

Context aside, I really like these rules! I think having query planning be the boundary for specialization makes a lot of sense.

(RunnableCommand might also be my fault though sorry! :P)
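One way to picture recommendations 1 and 4, together with the weak and strong CTAS guarantees discussed elsewhere in this thread, is a single logical node whose physical implementation is chosen at planning time. The sketch below is plain Python, not Catalyst: `CreateTableAsSelect`, `BasicCatalog`, `AtomicCatalog`, and `plan` are hypothetical names assumed for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

Row = Dict[str, object]


@dataclass(frozen=True)
class CreateTableAsSelect:
    """Logical node: states what should happen, not how."""
    table: str
    query: Callable[[], List[Row]]


class BasicCatalog:
    """Hypothetical source offering only create, insert, and drop."""

    def __init__(self) -> None:
        self.tables: Dict[str, List[Row]] = {}

    def create(self, name: str) -> None:
        if name in self.tables:
            raise ValueError(f"table {name} already exists")
        self.tables[name] = []

    def insert(self, name: str, rows: List[Row]) -> None:
        self.tables[name].extend(rows)

    def drop(self, name: str) -> None:
        self.tables.pop(name, None)


class AtomicCatalog(BasicCatalog):
    """Hypothetical source that can create and fill a table in one commit."""

    def create_with_rows(self, name: str, rows: List[Row]) -> None:
        if name in self.tables:
            raise ValueError(f"table {name} already exists")
        self.tables[name] = list(rows)


def plan(node: CreateTableAsSelect, catalog: BasicCatalog) -> Callable[[], None]:
    """Pick a physical implementation; the logical node stays the same."""
    if isinstance(catalog, AtomicCatalog):
        def run_atomic() -> None:
            # Stronger guarantee: the table never exists unless the write succeeds.
            catalog.create_with_rows(node.table, node.query())
        return run_atomic

    def run_fallback() -> None:
        # Weaker guarantee: create, insert, and drop the table if the insert fails.
        catalog.create(node.table)
        try:
            catalog.insert(node.table, node.query())
        except Exception:
            catalog.drop(node.table)
            raise
    return run_fallback


basic, atomic = BasicCatalog(), AtomicCatalog()
ctas = CreateTableAsSelect("report", lambda: [{"id": 1}])
plan(ctas, basic)()
plan(ctas, atomic)()
```

Both catalogs end up with the same table contents; they differ only in what an observer could see while the operation is running or after a failure, which is the behavior a shared definition of the logical node would need to spell out.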