Re: [discuss] Data Source V2 write path

2017-10-02 Thread Ryan Blue
As far as changes to the public API go, I’d prefer deprecating the API that
mixes data and metadata operations. But I don’t think that requires that we
go with your proposal #1, where the current write API can’t use data source
v2 writers. I think we can separate the metadata operations for Hadoop FS
tables from writes using the current public API. If this isn’t possible for
Hadoop FS sources, I’d like to understand *why* it isn’t possible (*how*
would it require a catalog API?). Although this requires special handling for
Hadoop FS sources, it could be that this plan would cause some existing v1
sources to lack certain operations before the catalog API is ready. We
should identify what those implementations are and then see whether this
trade-off would be worth it.

To sum up what I’m saying, here’s my proposal:

   - *4* — DataFrameWriter.save supports data source v2 writers, which
   assume that tables exist and have some configuration. That configuration is
   passed as a Relation or CatalogTable when the writer is created. For
   Hadoop FS tables, we would move metadata operations out of the v2 writer
   and would implement them in a command or using a non-public API called by a
   command. This doesn’t require waiting for a catalog API because it would
   have the same behavior as v1, just *implemented outside the new write
   API*.

This would not require any public API changes, but would prepare for the
catalog API by separating metadata operations from the writers. If writers
are expected to perform some metadata operations, then we won’t be able to
cleanly add the catalog API without changing the expectations of existing
writers.
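
To make that separation concrete, here is a minimal sketch of what I mean
(the names are invented and this is not Spark code; it only illustrates
keeping the metadata step in a command-like layer that runs before any v2
writer is created):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Sketch only: HadoopFsTableCommands is an invented name standing in for the
// non-public code that would own Hadoop FS table metadata.
object HadoopFsTableCommands {
  // Create the FS "table" metadata on first write, or validate the user's
  // options against what is already there; the v2 writer never does this.
  def createOrValidate(existing: Option[CatalogTable], requested: CatalogTable): CatalogTable =
    existing match {
      case None => requested  // first write defines the table configuration
      case Some(t) if t.partitionColumnNames == requested.partitionColumnNames => t
      case Some(t) => throw new IllegalArgumentException(
        s"Partitioning ${requested.partitionColumnNames} does not match " +
        s"existing partitioning ${t.partitionColumnNames}")
    }
}
```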

Eventually, we should deprecate the existing write API so expectations are
clear like they are in SQL. I’ve been thinking it should look like this
(but this is by no means well thought-through):

df.insert.into("db.table")            // inserts into db.table by column name
df.insert.byPosition.into("db.table") // inserts into db.table by position
df.save.as("db.table")                // creates db.table as select, fails if exists

I’d also like an API for table creation, which may have more options than
the df.save.as("..."). This would all be after the catalog API is
introduced, of course.
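
Purely as a sketch of the rough shape I have in mind (none of these methods
exist; every name below is a placeholder, like the snippet above):

df.createTable.as("db.table")       // creates db.table with df's schema, fails if exists
  .partitionedBy("utc_date")        // partition layout
  .bucketedBy(16, "primary_key")    // bucketing spec
  .using("parquet")                 // storage format
  .create()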

rb

On Sun, Oct 1, 2017 at 8:25 AM, Wenchen Fan  wrote:

> The main entry points to data sources inside Spark are the SQL API and
> `DataFrameReader/Writer`.
>
> For SQL API, I think the semantic is well defined, the data and metadata
> operations are separated. E.g., INSERT INTO means write data into an
> existing table, CREATE TABLE means only create the metadata. But the
> problem is, data source v1 can't provide metadata access, so we still mix
> data and metadata operations at data source level. Data source v2 can solve
> this problem, by introducing a `CatalogSupport` trait which can allow data
> source to provide metadata access.
>
> For DataFrameWriter API, e.g. `df.write.format(...).save()`, there is no
> separation of data and metadata. According to the document, it seems Spark
> doesn't care about metadata here, and assumes the underlying data sources
> take care of it.
>
> I have 3 proposals:
> 1. `DataFrameWriter.save` doesn't support data source v2, we create new
> APIs(or reusing `DataFrameWriter.insertInto/saveAsTable`?) for data
> source v2 that separate data and metadata clearly.
> 2. Update the semantic of `DataFrameWriter.save` and say it's only for
> writing data, not metadata. Then data source v2 can separate data and
> metadata clearly, because the upper API also separates them.
> 3. `DataFrameWriter.save` supports data source v2, and concrete data
> source implementations define their own behaviors. They can separate data
> and metadata operations, and fail `DataFrameWriter.save` if the table
> doesn't exist. They can mix the data and metadata operations in
> the write path, and use `options` to carry metadata information.
>
> Proposal 1 and 2 are similar, and proposal 1 is better because changing
> the semantic of an existing API is not good. Both proposal 1 and 2 need to
> wait for catalog federation before releasing data source v2, so that we can
> clearly define what catalog functionalities the data source should provide.
> Both proposal 1 and 2 force data sources to provide metadata
> access(catalog), which is unfriendly to simple data sources that don't have
> metastore.
>
> Personally I prefer proposal 3, because it's not blocked by catalog
> federation, so that we can develop it incrementally. And it makes the
> catalog support optional, so that simple data sources without metastore can
> also implement data source v2.
>
> More proposals are welcome!
>
>
> On Sat, Sep 30, 2017 at 3:26 AM, Ryan Blue  wrote:
>
>> > Spark doesn't know how to create a table in external systems like
>> Cassandra, and that's why it's currently done inside the data source writer.
>>
>> This isn't a valid argument for doing this task in the writer for v2. If
>> we want to fix the problems with 

Re: [discuss] Data Source V2 write path

2017-10-01 Thread Wenchen Fan
The main entry points to data sources inside Spark are the SQL API and
`DataFrameReader/Writer`.

For SQL API, I think the semantic is well defined, the data and metadata
operations are separated. E.g., INSERT INTO means write data into an
existing table, CREATE TABLE means only create the metadata. But the
problem is, data source v1 can't provide metadata access, so we still mix
data and metadata operations at data source level. Data source v2 can solve
this problem, by introducing a `CatalogSupport` trait which can allow data
source to provide metadata access.

For DataFrameWriter API, e.g. `df.write.format(...).save()`, there is no
separation of data and metadata. According to the document, it seems Spark
doesn't care about metadata here, and assumes the underlying data sources
take care of it.

I have 3 proposals:
1. `DataFrameWriter.save` doesn't support data source v2, we create new
APIs(or reusing `DataFrameWriter.insertInto/saveAsTable`?) for data source
v2 that separate data and metadata clearly.
2. Update the semantic of `DataFrameWriter.save` and say it's only for
writing data, not metadata. Then data source v2 can separate data and
metadata clearly, because the upper API also separates them.
3. `DataFrameWriter.save` supports data source v2, and concrete data source
implementations define their own behaviors. They can separate data and
metadata operations, and fail `DataFrameWriter.save` if the table
doesn't exist. They can mix the data and metadata operations in
the write path, and use `options` to carry metadata information.
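
For example, under proposal 3 a write against a v2 source could look like
this (the option keys and the format name below are made up, not a defined
contract; it assumes `df` and a source registered under that format):

```scala
// Hedged sketch of proposal 3: metadata rides along as plain options.
df.write
  .format("org.example.v2source")          // hypothetical data source v2 implementation
  .option("partitionColumns", "utc_date")  // invented option keys
  .option("bucketColumns", "primary_key")
  .option("numBuckets", "16")
  .mode("append")
  .save()
// A source that wants strict separation could instead ignore such options and
// simply fail the write when the target table doesn't already exist.
```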

Proposal 1 and 2 are similar, and proposal 1 is better because changing the
semantic of an existing API is not good. Both proposal 1 and 2 need to wait
for catalog federation before releasing data source v2, so that we can
clearly define what catalog functionalities the data source should provide.
Both proposal 1 and 2 force data sources to provide metadata
access(catalog), which is unfriendly to simple data sources that don't have
metastore.

Personally I prefer proposal 3, because it's not blocked by catalog
federation, so that we can develop it incrementally. And it makes the
catalog support optional, so that simple data sources without metastore can
also implement data source v2.

More proposals are welcome!


On Sat, Sep 30, 2017 at 3:26 AM, Ryan Blue  wrote:

> > Spark doesn't know how to create a table in external systems like
> Cassandra, and that's why it's currently done inside the data source writer.
>
> This isn't a valid argument for doing this task in the writer for v2. If
> we want to fix the problems with v1, we shouldn't continue to mix write
> operations with table metadata changes simply because it is more convenient
> and requires less refactoring.
>
> I'm proposing that in v2 we move creation of file system tables outside of
> the writer, but still in a non-public implementation. Cassandra and other
> external stores would behave as they should today and assume the table
> exists, or would wait to use the v2 API until there is catalog support.
>
> The important thing is that we don't set a standard that writers can
> create tables, which is going to lead to different behavior across
> implementations when we have conflicts between an existing table's config
> and the options passed into the writer.
>
> > For now, Spark just assumes data source writer takes care of it. For the
> internal file format data source, I propose to pass partition/bucket
> information to the writer via options, other data sources can define their
> own behavior, e.g. they can also use the options, or disallow users to
> write data to a non-existing table and ask users to create the table in the
> external systems first.
>
> The point is preventing data sources from defining their own behavior so
> we can introduce consistent behavior across sources for v2.
>
> rb
>
> On Thu, Sep 28, 2017 at 8:49 PM, Wenchen Fan  wrote:
>
>> > When this CTAS logical node is turned into a physical plan, the
>> relation gets turned into a `DataSourceV2` instance and then Spark gets a
>> writer and configures it with the proposed API. The main point of this is
>> to pass the logical relation (with all of the user's options) through to
>> the data source, not the writer. The data source creates the writer and can
>> tell the writer what to do.
>>
>> Here is the problem: Spark doesn't know how to create a table in external
>> systems like Cassandra, and that's why it's currently done inside the data
>> source writer.
>>
>> In the future, we can add a new trait `CatalogSupport` for
>> `DataSourceV2`, so that we can use your proposal and separate metadata
>> management from data source writer.
>>
>> For now, Spark just assumes data source writer takes care of it. For the
>> internal file format data source, I propose to pass partition/bucket
>> information to the writer via options, other data sources can define their
>> own behavior, e.g. they can also use the 

Re: [discuss] Data Source V2 write path

2017-09-29 Thread Ryan Blue
> Spark doesn't know how to create a table in external systems like
Cassandra, and that's why it's currently done inside the data source writer.

This isn't a valid argument for doing this task in the writer for v2. If we
want to fix the problems with v1, we shouldn't continue to mix write
operations with table metadata changes simply because it is more convenient
and requires less refactoring.

I'm proposing that in v2 we move creation of file system tables outside of
the writer, but still in a non-public implementation. Cassandra and other
external stores would behave as they should today and assume the table
exists, or would wait to use the v2 API until there is catalog support.

The important thing is that we don't set a standard that writers can create
tables, which is going to lead to different behavior across implementations
when we have conflicts between an existing table's config and the options
passed into the writer.

> For now, Spark just assumes data source writer takes care of it. For the
internal file format data source, I propose to pass partition/bucket
information to the writer via options, other data sources can define their
own behavior, e.g. they can also use the options, or disallow users to
write data to a non-existing table and ask users to create the table in the
external systems first.

The point is preventing data sources from defining their own behavior so we
can introduce consistent behavior across sources for v2.

rb

On Thu, Sep 28, 2017 at 8:49 PM, Wenchen Fan  wrote:

> > When this CTAS logical node is turned into a physical plan, the relation
> gets turned into a `DataSourceV2` instance and then Spark gets a writer and
> configures it with the proposed API. The main point of this is to pass the
> logical relation (with all of the user's options) through to the data
> source, not the writer. The data source creates the writer and can tell the
> writer what to do.
>
> Here is the problem: Spark doesn't know how to create a table in external
> systems like Cassandra, and that's why it's currently done inside the data
> source writer.
>
> In the future, we can add a new trait `CatalogSupport` for `DataSourceV2`,
> so that we can use your proposal and separate metadata management from data
> source writer.
>
> For now, Spark just assumes data source writer takes care of it. For the
> internal file format data source, I propose to pass partition/bucket
> information to the writer via options, other data sources can define their
> own behavior, e.g. they can also use the options, or disallow users to
> write data to a non-existing table and ask users to create the table in the
> external systems first.
>
>
>
> On Thu, Sep 28, 2017 at 5:45 AM, Russell Spitzer <
> russell.spit...@gmail.com> wrote:
>
>> On an unrelated note, is there any appetite for making the write path
>> also include an option to return elements that were not
>> able to be processed for some reason.
>>
>> Usage might be like
>>
>> saveAndIgnoreFailures() : Dataset
>>
>> So that if some records cannot be parsed by the datasource for writing,
>> or violate some contract with the datasource the records can be returned
>> for further processing or dealt with by an alternate system.
>>
>> On Wed, Sep 27, 2017 at 12:40 PM Ryan Blue 
>> wrote:
>>
>>> Comments inline. I've written up what I'm proposing with a bit more
>>> detail.
>>>
>>> On Tue, Sep 26, 2017 at 11:17 AM, Wenchen Fan 
>>> wrote:
>>>
 I'm trying to give a summary:

 Ideally data source API should only deal with data, not metadata. But
 one key problem is, Spark still need to support data sources without
 metastore, e.g. file format data sources.

 For this kind of data sources, users have to pass the metadata
 information like partitioning/bucketing to every write action of a
 "table"(or other identifiers like path of a file format data source), and
 it's user's responsibility to make sure these metadata information are
 consistent. If it's inconsistent, the behavior is undefined, different data
 sources may have different behaviors.

>>>
>>> Agreed so far. One minor point is that we currently throw an exception
>>> if you try to configure, for example, partitioning and also use
>>> `insertInto`.
>>>
>>>
 If we agree on this, then data source write API should have a way to
 pass these metadata information, and I think using data source options is
 a good choice because it's the most implicit way and doesn't require new
 APIs.

>>>
>>> What I don't understand is why we "can't avoid this problem" unless you
>>> mean the last point, that we have to support this. I don't think that using
>>> data source options is a good choice, but maybe I don't understand the
>>> alternatives. Here's a straw-man version of what I'm proposing so you can
>>> tell me what's wrong with it or why options are a better choice.
>>>
>>> I'm 

Re: [discuss] Data Source V2 write path

2017-09-28 Thread Wenchen Fan
> When this CTAS logical node is turned into a physical plan, the relation
gets turned into a `DataSourceV2` instance and then Spark gets a writer and
configures it with the proposed API. The main point of this is to pass the
logical relation (with all of the user's options) through to the data
source, not the writer. The data source creates the writer and can tell the
writer what to do.

Here is the problem: Spark doesn't know how to create a table in external
systems like Cassandra, and that's why it's currently done inside the data
source writer.

In the future, we can add a new trait `CatalogSupport` for `DataSourceV2`,
so that we can use your proposal and separate metadata management from data
source writer.
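
As a very rough sketch of what such a mix-in might look like (only the trait
name comes from this discussion; the methods and signatures are guesses):

```scala
import org.apache.spark.sql.types.StructType

// Hedged sketch, not a real Spark interface.
trait CatalogSupport {
  def tableExists(ident: String): Boolean
  def createTable(ident: String,
                  schema: StructType,
                  partitionColumns: Seq[String],
                  properties: Map[String, String]): Unit
  def dropTable(ident: String): Unit
}
```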

For now, Spark just assumes data source writer takes care of it. For the
internal file format data source, I propose to pass partition/bucket
information to the writer via options, other data sources can define their
own behavior, e.g. they can also use the options, or disallow users to
write data to a non-existing table and ask users to create the table in the
external systems first.



On Thu, Sep 28, 2017 at 5:45 AM, Russell Spitzer 
wrote:

> On an unrelated note, is there any appetite for making the write path also
> include an option to return elements that were not
> able to be processed for some reason.
>
> Usage might be like
>
> saveAndIgnoreFailures() : Dataset
>
> So that if some records cannot be parsed by the datasource for writing, or
> violate some contract with the datasource the records can be returned for
> further processing or dealt with by an alternate system.
>
> On Wed, Sep 27, 2017 at 12:40 PM Ryan Blue 
> wrote:
>
>> Comments inline. I've written up what I'm proposing with a bit more
>> detail.
>>
>> On Tue, Sep 26, 2017 at 11:17 AM, Wenchen Fan 
>> wrote:
>>
>>> I'm trying to give a summary:
>>>
>>> Ideally data source API should only deal with data, not metadata. But
>>> one key problem is, Spark still need to support data sources without
>>> metastore, e.g. file format data sources.
>>>
>>> For this kind of data sources, users have to pass the metadata
>>> information like partitioning/bucketing to every write action of a
>>> "table"(or other identifiers like path of a file format data source), and
>>> it's user's responsibility to make sure these metadata information are
>>> consistent. If it's inconsistent, the behavior is undefined, different data
>>> sources may have different behaviors.
>>>
>>
>> Agreed so far. One minor point is that we currently throw an exception
>> if you try to configure, for example, partitioning and also use
>> `insertInto`.
>>
>>
>>> If we agree on this, then data source write API should have a way to
>>> pass these metadata information, and I think using data source options is
>>> a good choice because it's the most implicit way and doesn't require new
>>> APIs.
>>>
>>
>> What I don't understand is why we "can't avoid this problem" unless you
>> mean the last point, that we have to support this. I don't think that using
>> data source options is a good choice, but maybe I don't understand the
>> alternatives. Here's a straw-man version of what I'm proposing so you can
>> tell me what's wrong with it or why options are a better choice.
>>
>> I'm assuming we start with a query like this:
>> ```
>> df.write.partitionBy("utc_date").bucketBy("primary_key")
>> .format("parquet").saveAsTable("s3://bucket/path/")
>> ```
>>
>> That creates a logical node, `CreateTableAsSelect`, with some options. It
>> would contain a `Relation` (or `CatalogTable` definition?) that corresponds
>> to the user's table name and `partitionBy`, `format`, etc. calls. It should
>> also have a write mode and the logical plan for `df`.
>>
>> When this CTAS logical node is turned into a physical plan, the relation
>> gets turned into a `DataSourceV2` instance and then Spark gets a writer and
>> configures it with the proposed API. The main point of this is to pass the
>> logical relation (with all of the user's options) through to the data
>> source, not the writer. The data source creates the writer and can tell the
>> writer what to do. Another benefit of this approach is that the relation
>> gets resolved during analysis, when it is easy to add sorts and other
>> requirements to the logical plan.
>>
>> If we were to implement what I'm suggesting, then we could handle
>> metadata conflicts outside of the `DataSourceV2Writer`, in the data source.
>> That eliminates problems about defining behavior when there are conflicts
>> (the next point) and prepares implementations for a catalog API that would
>> standardize how those conflicts are handled. In the short term, this
>> doesn't have to be in a public API yet. It can be special handling for
>> HadoopFS relations that we can eventually use underneath a public API.
>>
>> Please let me know if I've misunderstood something. Now that I've written
>> 

Re: [discuss] Data Source V2 write path

2017-09-27 Thread Russell Spitzer
On an unrelated note, is there any appetite for making the write path also
include an option to return elements that were not
able to be processed for some reason.

Usage might be like

saveAndIgnoreFailures() : Dataset

So that if some records cannot be parsed by the datasource for writing, or
violate some contract with the datasource the records can be returned for
further processing or dealt with by an alternate system.
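
A rough sketch of the shape this could take (nothing like this exists today;
the method name is just the one from the example above, and the return type
is invented):

```scala
import org.apache.spark.sql.Row

// Hypothetical wrapper for records the sink rejected.
case class RejectedRecord(record: Row, reason: String)

// Hypothetical addition to the write path:
//   def saveAndIgnoreFailures(): Dataset[RejectedRecord]
//
// which would allow something like:
//   val rejected = df.write.format("org.example.sink").saveAndIgnoreFailures()
//   rejected.write.json("s3://bucket/rejects/")   // dead-letter the bad records
```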

On Wed, Sep 27, 2017 at 12:40 PM Ryan Blue 
wrote:

> Comments inline. I've written up what I'm proposing with a bit more detail.
>
> On Tue, Sep 26, 2017 at 11:17 AM, Wenchen Fan  wrote:
>
>> I'm trying to give a summary:
>>
>> Ideally data source API should only deal with data, not metadata. But one
>> key problem is, Spark still need to support data sources without metastore,
>> e.g. file format data sources.
>>
>> For this kind of data sources, users have to pass the metadata
>> information like partitioning/bucketing to every write action of a
>> "table"(or other identifiers like path of a file format data source), and
>> it's user's responsibility to make sure these metadata information are
>> consistent. If it's inconsistent, the behavior is undefined, different data
>> sources may have different behaviors.
>>
>
> Agreed so far. One minor point is that we currently throw an exception if
> you try to configure, for example, partitioning and also use `insertInto`.
>
>
>> If we agree on this, then data source write API should have a way to pass
>> these metadata information, and I think using data source options is a good
>> choice because it's the most implicit way and doesn't require new APIs.
>>
>
> What I don't understand is why we "can't avoid this problem" unless you
> mean the last point, that we have to support this. I don't think that using
> data source options is a good choice, but maybe I don't understand the
> alternatives. Here's a straw-man version of what I'm proposing so you can
> tell me what's wrong with it or why options are a better choice.
>
> I'm assuming we start with a query like this:
> ```
>
> df.write.partitionBy("utc_date").bucketBy("primary_key").format("parquet").saveAsTable("s3://bucket/path/")
> ```
>
> That creates a logical node, `CreateTableAsSelect`, with some options. It
> would contain a `Relation` (or `CatalogTable` definition?) that corresponds
> to the user's table name and `partitionBy`, `format`, etc. calls. It should
> also have a write mode and the logical plan for `df`.
>
> When this CTAS logical node is turned into a physical plan, the relation
> gets turned into a `DataSourceV2` instance and then Spark gets a writer and
> configures it with the proposed API. The main point of this is to pass the
> logical relation (with all of the user's options) through to the data
> source, not the writer. The data source creates the writer and can tell the
> writer what to do. Another benefit of this approach is that the relation
> gets resolved during analysis, when it is easy to add sorts and other
> requirements to the logical plan.
>
> If we were to implement what I'm suggesting, then we could handle metadata
> conflicts outside of the `DataSourceV2Writer`, in the data source. That
> eliminates problems about defining behavior when there are conflicts (the
> next point) and prepares implementations for a catalog API that would
> standardize how those conflicts are handled. In the short term, this
> doesn't have to be in a public API yet. It can be special handling for
> HadoopFS relations that we can eventually use underneath a public API.
>
> Please let me know if I've misunderstood something. Now that I've written
> out how we could actually implement conflict handling outside of the
> writer, I can see that it isn't as obvious of a change as I thought. But, I
> think in the long term this would be a better way to go.
>
>
>> But then we have another problem: how to define the behavior for data
>> sources with metastore when the given options contain metadata information?
>> A typical case is `DataFrameWriter.saveAsTable`, when a user calls it with
>> partition columns, he doesn't know what will happen. The table may not
>> exist and he may create the table successfully with specified partition
>> columns, or the table already exist but has inconsistent partition columns
>> and Spark throws exception. Besides, save mode doesn't play well in this
>> case, as we may need different save modes for data and metadata.
>>
>> My proposal: data source API should only focus on data, but concrete data
>> sources can implement some dirty features via options. e.g. file format
>> data sources can take partitioning/bucketing from options, data source with
>> metastore can use a special flag in options to indicate a create table
>> command(without writing data).
>>
>
> I can see how this would make changes smaller, but I don't think it is a
> good thing to do. If we do this, then I think we will not really 

Re: [discuss] Data Source V2 write path

2017-09-27 Thread Ryan Blue
Comments inline. I've written up what I'm proposing with a bit more detail.

On Tue, Sep 26, 2017 at 11:17 AM, Wenchen Fan  wrote:

> I'm trying to give a summary:
>
> Ideally data source API should only deal with data, not metadata. But one
> key problem is, Spark still need to support data sources without metastore,
> e.g. file format data sources.
>
> For this kind of data sources, users have to pass the metadata information
> like partitioning/bucketing to every write action of a "table"(or other
> identifiers like path of a file format data source), and it's user's
> responsibility to make sure these metadata information are consistent. If
> it's inconsistent, the behavior is undefined, different data sources may
> have different behaviors.
>

Agreed so far. One minor point is that we currently throw an exception if
you try to configure, for example, partitioning and also use `insertInto`.


> If we agree on this, then data source write API should have a way to pass
> these metadata information, and I think using data source options is a good
> choice because it's the most implicit way and doesn't require new APIs.
>

What I don't understand is why we "can't avoid this problem" unless you
mean the last point, that we have to support this. I don't think that using
data source options is a good choice, but maybe I don't understand the
alternatives. Here's a straw-man version of what I'm proposing so you can
tell me what's wrong with it or why options are a better choice.

I'm assuming we start with a query like this:
```
df.write.partitionBy("utc_date").bucketBy("primary_key")
  .format("parquet").saveAsTable("s3://bucket/path/")
```

That creates a logical node, `CreateTableAsSelect`, with some options. It
would contain a `Relation` (or `CatalogTable` definition?) that corresponds
to the user's table name and `partitionBy`, `format`, etc. calls. It should
also have a write mode and the logical plan for `df`.
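
Roughly, the logical node would carry something like this (field names are
illustrative; Spark's real CTAS plans differ in detail):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Sketch of the information the CTAS node carries, not Spark's actual class.
case class CreateTableAsSelect(
    table: CatalogTable,   // user's table name, format, partitionBy/bucketBy calls, options
    mode: SaveMode,        // write mode from the DataFrameWriter
    query: LogicalPlan)    // the logical plan for df
```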

When this CTAS logical node is turned into a physical plan, the relation
gets turned into a `DataSourceV2` instance and then Spark gets a writer and
configures it with the proposed API. The main point of this is to pass the
logical relation (with all of the user's options) through to the data
source, not the writer. The data source creates the writer and can tell the
writer what to do. Another benefit of this approach is that the relation
gets resolved during analysis, when it is easy to add sorts and other
requirements to the logical plan.

If we were to implement what I'm suggesting, then we could handle metadata
conflicts outside of the `DataSourceV2Writer`, in the data source. That
eliminates problems about defining behavior when there are conflicts (the
next point) and prepares implementations for a catalog API that would
standardize how those conflicts are handled. In the short term, this
doesn't have to be in a public API yet. It can be special handling for
HadoopFS relations that we can eventually use underneath a public API.

Please let me know if I've misunderstood something. Now that I've written
out how we could actually implement conflict handling outside of the
writer, I can see that it isn't as obvious of a change as I thought. But, I
think in the long term this would be a better way to go.


> But then we have another problem: how to define the behavior for data
> sources with metastore when the given options contain metadata information?
> A typical case is `DataFrameWriter.saveAsTable`, when a user calls it with
> partition columns, he doesn't know what will happen. The table may not
> exist and he may create the table successfully with specified partition
> columns, or the table already exist but has inconsistent partition columns
> and Spark throws exception. Besides, save mode doesn't play well in this
> case, as we may need different save modes for data and metadata.
>
> My proposal: data source API should only focus on data, but concrete data
> sources can implement some dirty features via options. e.g. file format
> data sources can take partitioning/bucketing from options, data source with
> metastore can use a special flag in options to indicate a create table
> command(without writing data).
>

I can see how this would make changes smaller, but I don't think it is a
good thing to do. If we do this, then I think we will not really accomplish
what we want to with this (a clean write API).


> In other words, Spark connects users to data sources with a clean protocol
> that only focus on data, but this protocol has a backdoor: the data source
> options. Concrete data sources are free to define how to deal with
> metadata, e.g. Cassandra data source can ask users to create table at
> Cassandra side first, then write data at Spark side, or ask users to
> provide more details in options and do CTAS at Spark side. These can be
> done via options.
>
> After catalog federation, hopefully only file format data sources still
> use this backdoor.
>

Why would 

Re: [discuss] Data Source V2 write path

2017-09-26 Thread Wenchen Fan
I'm trying to give a summary:

Ideally data source API should only deal with data, not metadata. But one
key problem is, Spark still need to support data sources without metastore,
e.g. file format data sources.

For this kind of data sources, users have to pass the metadata information
like partitioning/bucketing to every write action of a "table"(or other
identifiers like path of a file format data source), and it's user's
responsibility to make sure these metadata information are consistent. If
it's inconsistent, the behavior is undefined, different data sources may
have different behaviors.

If we agree on this, then data source write API should have a way to pass
these metadata information, and I think using data source options is a good
choice because it's the most implicit way and doesn't require new APIs.

But then we have another problem: how to define the behavior for data
sources with metastore when the given options contain metadata information?
A typical case is `DataFrameWriter.saveAsTable`, when a user calls it with
partition columns, he doesn't know what will happen. The table may not
exist and he may create the table successfully with specified partition
columns, or the table already exist but has inconsistent partition columns
and Spark throws exception. Besides, save mode doesn't play well in this
case, as we may need different save modes for data and metadata.

My proposal: data source API should only focus on data, but concrete data
sources can implement some dirty features via options. e.g. file format
data sources can take partitioning/bucketing from options, data source with
metastore can use a special flag in options to indicate a create table
command(without writing data).
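
For instance, a metastore-backed source could interpret a flag like this as
"create the table from these options, write no data" (the option keys and
the format name are made up, only to illustrate the idea):

```scala
// Hedged sketch of the "special flag" backdoor; not a defined contract.
df.write
  .format("org.example.cassandra")      // hypothetical metastore-backed source
  .option("createTableOnly", "true")    // invented flag: create the table, write no data
  .option("partitionKeys", "utc_date")
  .save()
```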

In other words, Spark connects users to data sources with a clean protocol
that only focus on data, but this protocol has a backdoor: the data source
options. Concrete data sources are free to define how to deal with
metadata, e.g. Cassandra data source can ask users to create table at
Cassandra side first, then write data at Spark side, or ask users to
provide more details in options and do CTAS at Spark side. These can be
done via options.

After catalog federation, hopefully only file format data sources still use
this backdoor.


On Tue, Sep 26, 2017 at 8:52 AM, Wenchen Fan  wrote:

> > I think it is a bad idea to let this problem leak into the new storage
> API.
>
> Well, I think using data source options is a good compromise for this. We
> can't avoid this problem until catalog federation is done, and this may not
> happen within Spark 2.3, but we definitely need data source write API in
> Spark 2.3.
>
> > Why can't we use an in-memory catalog to store the configuration of
> HadoopFS tables?
>
> We still need to support existing Spark applications which have
> `df.write.partitionBy(...).parquet(...)`. And I think it's similar to
> `DataFrameWrier.path`, according to your theory, we should not leak `path`
> to the storage API too, but we don't have other solutions for Hadoop FS
> data sources.
>
>
> Eventually I think only Hadoop FS data sources need to take these special
> options, but for now data sources that want to support
> partitioning/bucketing need to take these special options too.
>
>
> On Tue, Sep 26, 2017 at 4:36 AM, Ryan Blue  wrote:
>
>> I think it is a bad idea to let this problem leak into the new storage
>> API. By not setting the expectation that metadata for a table will exist,
>> this will needlessly complicate writers just to support the existing
>> problematic design. Why can't we use an in-memory catalog to store the
>> configuration of HadoopFS tables? I see no compelling reason why this needs
>> to be passed into the V2 write API.
>>
>> If this is limited to an implementation hack for the Hadoop FS writers,
>> then I guess that's not terrible. I just don't understand why it is
>> necessary.
>>
>> On Mon, Sep 25, 2017 at 11:26 AM, Wenchen Fan 
>> wrote:
>>
>>> Catalog federation is to publish the Spark catalog API(kind of a data
>>> source API for metadata), so that Spark is able to read/write metadata from
>>> external systems. (SPARK-15777)
>>>
>>> Currently Spark can only read/write Hive metastore, which means for
>>> other systems like Cassandra, we can only implicitly create tables with
>>> data source API.
>>>
>>> Again this is not ideal but just a workaround before we finish catalog
>>> federation. That's why the save mode description mostly refer to how data
>>> will be handled instead of metadata.
>>>
>>> Because of this, I think we still need to pass metadata like
>>> partitioning/bucketing to the data source write API. And I propose to use
>>> data source options so that it's not at API level and we can easily ignore
>>> these options in the future if catalog federation is done.
>>>
>>> The same thing applies to Hadoop FS data sources, we need to pass
>>> metadata to the writer anyway.
>>>
>>>
>>>
>>> On Tue, 

Re: [discuss] Data Source V2 write path

2017-09-25 Thread Wenchen Fan
> I think it is a bad idea to let this problem leak into the new storage
API.

Well, I think using data source options is a good compromise for this. We
can't avoid this problem until catalog federation is done, and this may not
happen within Spark 2.3, but we definitely need data source write API in
Spark 2.3.

> Why can't we use an in-memory catalog to store the configuration of
HadoopFS tables?

We still need to support existing Spark applications which have
`df.write.partitionBy(...).parquet(...)`. And I think it's similar to
`DataFrameWrier.path`, according to your theory, we should not leak `path`
to the storage API too, but we don't have other solutions for Hadoop FS
data sources.


Eventually I think only Hadoop FS data sources need to take these special
options, but for now data sources that want to support
partitioning/bucketing need to take these special options too.


On Tue, Sep 26, 2017 at 4:36 AM, Ryan Blue  wrote:

> I think it is a bad idea to let this problem leak into the new storage
> API. By not setting the expectation that metadata for a table will exist,
> this will needlessly complicate writers just to support the existing
> problematic design. Why can't we use an in-memory catalog to store the
> configuration of HadoopFS tables? I see no compelling reason why this needs
> to be passed into the V2 write API.
>
> If this is limited to an implementation hack for the Hadoop FS writers,
> then I guess that's not terrible. I just don't understand why it is
> necessary.
>
> On Mon, Sep 25, 2017 at 11:26 AM, Wenchen Fan  wrote:
>
>> Catalog federation is to publish the Spark catalog API(kind of a data
>> source API for metadata), so that Spark is able to read/write metadata from
>> external systems. (SPARK-15777)
>>
>> Currently Spark can only read/write Hive metastore, which means for other
>> systems like Cassandra, we can only implicitly create tables with data
>> source API.
>>
>> Again this is not ideal but just a workaround before we finish catalog
>> federation. That's why the save mode description mostly refer to how data
>> will be handled instead of metadata.
>>
>> Because of this, I think we still need to pass metadata like
>> partitioning/bucketing to the data source write API. And I propose to use
>> data source options so that it's not at API level and we can easily ignore
>> these options in the future if catalog federation is done.
>>
>> The same thing applies to Hadoop FS data sources, we need to pass
>> metadata to the writer anyway.
>>
>>
>>
>> On Tue, Sep 26, 2017 at 1:08 AM, Ryan Blue  wrote:
>>
>>> However, without catalog federation, Spark doesn’t have an API to ask an
>>> external system(like Cassandra) to create a table. Currently it’s all done
>>> by data source write API. Data source implementations are responsible to
>>> create or insert a table according to the save mode.
>>>
>>> What’s catalog federation? Is there a SPIP for it? It sounds
>>> straight-forward based on your comments, but I’d rather make sure we’re
>>> talking about the same thing.
>>>
>>> What I’m proposing doesn’t require a change to either the public API,
>>> nor does it depend on being able to create tables. Why do writers
>>> necessarily need to create tables? I think other components (e.g. a
>>> federated catalog) should manage table creation outside of this
>>> abstraction. Just because data sources currently create tables doesn’t mean
>>> that we are tied to that implementation.
>>>
>>> I would also disagree that data source implementations are responsible
>>> for creating or inserting according to save mode. The modes are “append”,
>>> “overwrite”, “failIfExists” and “ignore”, and the descriptions indicate to
>>> me that the mode refers to how *data* will be handled, not table
>>> metadata. Overwrite’s docs
>>> state that “existing *data* is expected to be overwritten.”
>>>
>>> Save mode currently introduces confusion because it isn’t clear whether
>>> the mode applies to tables or to writes. In Hive, overwrite removes
>>> conflicting partitions, but I think the Hadoop FS relations will delete
>>> tables. We get around this some by using external tables and preserving
>>> data, but this is an area where we should have clear semantics for external
>>> systems like Cassandra. I’d like to see a cleaner public API that separates
>>> these concerns, but that’s a different discussion. For now, I don’t think
>>> requiring that a table exists is unreasonable. If a table has no metastore
>>> (Hadoop FS tables) then we can just pass the table metadata in when
>>> creating the writer since there is no existence in this case.
>>>
>>> rb
>>>
>>> On Sun, Sep 24, 2017 at 7:17 PM, Wenchen Fan 
>>> wrote:
>>>
 I agree it would be a clean approach if data source is only responsible
 to write into an 

Re: [discuss] Data Source V2 write path

2017-09-25 Thread Ryan Blue
I think it is a bad idea to let this problem leak into the new storage API.
By not setting the expectation that metadata for a table will exist, this
will needlessly complicate writers just to support the existing problematic
design. Why can't we use an in-memory catalog to store the configuration of
HadoopFS tables? I see no compelling reason why this needs to be passed
into the V2 write API.

If this is limited to an implementation hack for the Hadoop FS writers,
then I guess that's not terrible. I just don't understand why it is
necessary.
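
To illustrate what I mean by an in-memory catalog (a minimal sketch only,
assuming nothing about Spark internals; the object name is invented):

```scala
import scala.collection.concurrent.TrieMap
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Minimal in-memory registry keyed by path: the Hadoop FS "table" configuration
// lives here, so the v2 writer itself never has to manage metadata.
object InMemoryHadoopFsCatalog {
  private val tables = TrieMap.empty[String, CatalogTable]

  def register(path: String, table: CatalogTable): Unit = tables.update(path, table)

  def lookup(path: String): Option[CatalogTable] = tables.get(path)
}
```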

On Mon, Sep 25, 2017 at 11:26 AM, Wenchen Fan  wrote:

> Catalog federation is to publish the Spark catalog API(kind of a data
> source API for metadata), so that Spark is able to read/write metadata from
> external systems. (SPARK-15777)
>
> Currently Spark can only read/write Hive metastore, which means for other
> systems like Cassandra, we can only implicitly create tables with data
> source API.
>
> Again this is not ideal but just a workaround before we finish catalog
> federation. That's why the save mode description mostly refer to how data
> will be handled instead of metadata.
>
> Because of this, I think we still need to pass metadata like
> partitioning/bucketing to the data source write API. And I propose to use
> data source options so that it's not at API level and we can easily ignore
> these options in the future if catalog federation is done.
>
> The same thing applies to Hadoop FS data sources, we need to pass metadata
> to the writer anyway.
>
>
>
> On Tue, Sep 26, 2017 at 1:08 AM, Ryan Blue  wrote:
>
>> However, without catalog federation, Spark doesn’t have an API to ask an
>> external system(like Cassandra) to create a table. Currently it’s all done
>> by data source write API. Data source implementations are responsible to
>> create or insert a table according to the save mode.
>>
>> What’s catalog federation? Is there a SPIP for it? It sounds
>> straight-forward based on your comments, but I’d rather make sure we’re
>> talking about the same thing.
>>
>> What I’m proposing doesn’t require a change to either the public API, nor
>> does it depend on being able to create tables. Why do writers necessarily
>> need to create tables? I think other components (e.g. a federated catalog)
>> should manage table creation outside of this abstraction. Just because data
>> sources currently create tables doesn’t mean that we are tied to that
>> implementation.
>>
>> I would also disagree that data source implementations are responsible
>> for creating for inserting according to save mode. The modes are “append”,
>> “overwrite”, “failIfExists” and “ignore”, and the descriptions indicate to
>> me that the mode refers to how *data* will be handled, not table
>> metadata. Overwrite’s docs
>> state that “existing *data* is expected to be overwritten.”
>>
>> Save mode currently introduces confusion because it isn’t clear whether
>> the mode applies to tables or to writes. In Hive, overwrite removes
>> conflicting partitions, but I think the Hadoop FS relations will delete
>> tables. We get around this some by using external tables and preserving
>> data, but this is an area where we should have clear semantics for external
>> systems like Cassandra. I’d like to see a cleaner public API that separates
>> these concerns, but that’s a different discussion. For now, I don’t think
>> requiring that a table exists is unreasonable. If a table has no metastore
>> (Hadoop FS tables) then we can just pass the table metadata in when
>> creating the writer since there is no existence in this case.
>>
>> rb
>>
>> On Sun, Sep 24, 2017 at 7:17 PM, Wenchen Fan  wrote:
>>
>>> I agree it would be a clean approach if data source is only responsible
>>> to write into an already-configured table. However, without catalog
>>> federation, Spark doesn't have an API to ask an external system(like
>>> Cassandra) to create a table. Currently it's all done by data source write
>>> API. Data source implementations are responsible to create or insert a
>>> table according to the save mode.
>>>
>>> As a workaround, I think it's acceptable to pass partitioning/bucketing
>>> information via data source options, and data sources should decide to take
>>> this information and create the table, or throw an exception if it
>>> doesn't match the already-configured table.
>>>
>>>
>>> On Fri, Sep 22, 2017 at 9:35 AM, Ryan Blue  wrote:
>>>
 > input data requirement

 Clustering and sorting within partitions are a good start. We can
 always add more later when they are needed.

 The primary use case I'm thinking of for this is partitioning and
 bucketing. If I'm implementing a partitioned table format, I need to tell
 Spark to cluster by my partition columns. Should 

Re: [discuss] Data Source V2 write path

2017-09-25 Thread Wenchen Fan
Catalog federation is to publish the Spark catalog API(kind of a data
source API for metadata), so that Spark is able to read/write metadata from
external systems. (SPARK-15777)

Currently Spark can only read/write Hive metastore, which means for other
systems like Cassandra, we can only implicitly create tables with data
source API.

Again this is not ideal but just a workaround before we finish catalog
federation. That's why the save mode description mostly refer to how data
will be handled instead of metadata.

Because of this, I think we still need to pass metadata like
partitioning/bucketing to the data source write API. And I propose to use
data source options so that it's not at API level and we can easily ignore
these options in the future if catalog federation is done.

The same thing applies to Hadoop FS data sources, we need to pass metadata
to the writer anyway.



On Tue, Sep 26, 2017 at 1:08 AM, Ryan Blue  wrote:

> However, without catalog federation, Spark doesn’t have an API to ask an
> external system(like Cassandra) to create a table. Currently it’s all done
> by data source write API. Data source implementations are responsible to
> create or insert a table according to the save mode.
>
> What’s catalog federation? Is there a SPIP for it? It sounds
> straight-forward based on your comments, but I’d rather make sure we’re
> talking about the same thing.
>
> What I’m proposing doesn’t require a change to either the public API, nor
> does it depend on being able to create tables. Why do writers necessarily
> need to create tables? I think other components (e.g. a federated catalog)
> should manage table creation outside of this abstraction. Just because data
> sources currently create tables doesn’t mean that we are tied to that
> implementation.
>
> I would also disagree that data source implementations are responsible for
> creating or inserting according to save mode. The modes are “append”,
> “overwrite”, “failIfExists” and “ignore”, and the descriptions indicate to
> me that the mode refers to how *data* will be handled, not table
> metadata. Overwrite’s docs
> state that “existing *data* is expected to be overwritten.”
>
> Save mode currently introduces confusion because it isn’t clear whether
> the mode applies to tables or to writes. In Hive, overwrite removes
> conflicting partitions, but I think the Hadoop FS relations will delete
> tables. We get around this some by using external tables and preserving
> data, but this is an area where we should have clear semantics for external
> systems like Cassandra. I’d like to see a cleaner public API that separates
> these concerns, but that’s a different discussion. For now, I don’t think
> requiring that a table exists is unreasonable. If a table has no metastore
> (Hadoop FS tables) then we can just pass the table metadata in when
> creating the writer since there is no existence in this case.
>
> rb
>
> On Sun, Sep 24, 2017 at 7:17 PM, Wenchen Fan  wrote:
>
>> I agree it would be a clean approach if data source is only responsible
>> to write into an already-configured table. However, without catalog
>> federation, Spark doesn't have an API to ask an external system(like
>> Cassandra) to create a table. Currently it's all done by data source write
>> API. Data source implementations are responsible to create or insert a
>> table according to the save mode.
>>
>> As a workaround, I think it's acceptable to pass partitioning/bucketing
>> information via data source options, and data sources should decide to take
>> this information and create the table, or throw an exception if it
>> doesn't match the already-configured table.
>>
>>
>> On Fri, Sep 22, 2017 at 9:35 AM, Ryan Blue  wrote:
>>
>>> > input data requirement
>>>
>>> Clustering and sorting within partitions are a good start. We can always
>>> add more later when they are needed.
>>>
>>> The primary use case I'm thinking of for this is partitioning and
>>> bucketing. If I'm implementing a partitioned table format, I need to tell
>>> Spark to cluster by my partition columns. Should there also be a way to
>>> pass those columns separately, since they may not be stored in the same way
>>> like partitions are in the current format?
>>>
>>> On Wed, Sep 20, 2017 at 3:10 AM, Wenchen Fan 
>>> wrote:
>>>
 Hi all,

 I want to have some discussion about Data Source V2 write path before
 starting a voting.

 The Data Source V1 write path asks implementations to write a DataFrame
 directly, which is painful:
 1. Exposing upper-level API like DataFrame to Data Source API is not
 good for maintenance.
 2. Data sources may need to preprocess the input data before writing,
 like cluster/sort the input by some columns. It's better to do the
 

Re: [discuss] Data Source V2 write path

2017-09-25 Thread Ryan Blue
However, without catalog federation, Spark doesn’t have an API to ask an
external system(like Cassandra) to create a table. Currently it’s all done
by data source write API. Data source implementations are responsible to
create or insert a table according to the save mode.

What’s catalog federation? Is there a SPIP for it? It sounds
straight-forward based on your comments, but I’d rather make sure we’re
talking about the same thing.

What I’m proposing doesn’t require a change to either the public API, nor
does it depend on being able to create tables. Why do writers necessarily
need to create tables? I think other components (e.g. a federated catalog)
should manage table creation outside of this abstraction. Just because data
sources currently create tables doesn’t mean that we are tied to that
implementation.

I would also disagree that data source implementations are responsible for
creating or inserting according to save mode. The modes are “append”,
“overwrite”, “failIfExists” and “ignore”, and the descriptions indicate to
me that the mode refers to how *data* will be handled, not table
metadata. Overwrite’s
docs state that “existing *data* is expected to be overwritten.”

Save mode currently introduces confusion because it isn’t clear whether the
mode applies to tables or to writes. In Hive, overwrite removes conflicting
partitions, but I think the Hadoop FS relations will delete tables. We get
around this some by using external tables and preserving data, but this is
an area where we should have clear semantics for external systems like
Cassandra. I’d like to see a cleaner public API that separates these
concerns, but that’s a different discussion. For now, I don’t think
requiring that a table exists is unreasonable. If a table has no metastore
(Hadoop FS tables) then we can just pass the table metadata in when
creating the writer since there is no existence in this case.

rb

On Sun, Sep 24, 2017 at 7:17 PM, Wenchen Fan  wrote:

> I agree it would be a clean approach if data source is only responsible to
> write into an already-configured table. However, without catalog
> federation, Spark doesn't have an API to ask an external system(like
> Cassandra) to create a table. Currently it's all done by data source write
> API. Data source implementations are responsible to create or insert a
> table according to the save mode.
>
> As a workaround, I think it's acceptable to pass partitioning/bucketing
> information via data source options, and data sources should decide to take
> this information and create the table, or throw an exception if it
> doesn't match the already-configured table.
>
>
> On Fri, Sep 22, 2017 at 9:35 AM, Ryan Blue  wrote:
>
>> > input data requirement
>>
>> Clustering and sorting within partitions are a good start. We can always
>> add more later when they are needed.
>>
>> The primary use case I'm thinking of for this is partitioning and
>> bucketing. If I'm implementing a partitioned table format, I need to tell
>> Spark to cluster by my partition columns. Should there also be a way to
>> pass those columns separately, since they may not be stored in the same way
>> like partitions are in the current format?
>>
>> On Wed, Sep 20, 2017 at 3:10 AM, Wenchen Fan  wrote:
>>
>>> Hi all,
>>>
>>> I want to have some discussion about Data Source V2 write path before
>>> starting a voting.
>>>
>>> The Data Source V1 write path asks implementations to write a DataFrame
>>> directly, which is painful:
>>> 1. Exposing upper-level API like DataFrame to Data Source API is not
>>> good for maintenance.
>>> 2. Data sources may need to preprocess the input data before writing,
>>> like cluster/sort the input by some columns. It's better to do the
>>> preprocessing in Spark instead of in the data source.
>>> 3. Data sources need to take care of transaction themselves, which is
>>> hard. And different data sources may come up with a very similar approach
>>> for the transaction, which leads to many duplicated codes.
>>>
>>>
>>> To solve these pain points, I'm proposing a data source writing
>>> framework which is very similar to the reading framework, i.e.,
>>> WriteSupport -> DataSourceV2Writer -> WriteTask -> DataWriter. You can take
>>> a look at my prototype to see what it looks like:
>>> https://github.com/apache/spark/pull/19269
>>>
>>> There are some other details need further discussion:
>>> 1. *partitioning/bucketing*
>>> Currently only the built-in file-based data sources support them, but
>>> there is nothing stopping us from exposing them to all data sources. One
>>> question is, shall we make them as mix-in interfaces for data source v2
>>> reader/writer, or just encode them into data source options(a
>>> string-to-string map)? Ideally it's more like options, Spark just transfers
>>> these user-given 

Re: [discuss] Data Source V2 write path

2017-09-25 Thread Wenchen Fan
We still need to support low-level data sources like pure parquet files,
which do not have a metastore.

BTW I think we should leave the metadata management to the catalog API
after catalog federation. Data source API should only care about data.

On Mon, Sep 25, 2017 at 11:14 AM, Reynold Xin  wrote:

> Can there be an explicit create function?
>
>
> On Sun, Sep 24, 2017 at 7:17 PM, Wenchen Fan  wrote:
>
>> I agree it would be a clean approach if data source is only responsible
>> to write into an already-configured table. However, without catalog
>> federation, Spark doesn't have an API to ask an external system(like
>> Cassandra) to create a table. Currently it's all done by data source write
>> API. Data source implementations are responsible to create or insert a
>> table according to the save mode.
>>
>> As a workaround, I think it's acceptable to pass partitioning/bucketing
>> information via data source options, and data sources should decide to take
>> this information and create the table, or throw an exception if it
>> doesn't match the already-configured table.
>>
>>
>> On Fri, Sep 22, 2017 at 9:35 AM, Ryan Blue  wrote:
>>
>>> > input data requirement
>>>
>>> Clustering and sorting within partitions are a good start. We can always
>>> add more later when they are needed.
>>>
>>> The primary use case I'm thinking of for this is partitioning and
>>> bucketing. If I'm implementing a partitioned table format, I need to tell
>>> Spark to cluster by my partition columns. Should there also be a way to
>>> pass those columns separately, since they may not be stored in the same way
>>> like partitions are in the current format?
>>>
>>> On Wed, Sep 20, 2017 at 3:10 AM, Wenchen Fan 
>>> wrote:
>>>
 Hi all,

 I want to have some discussion about Data Source V2 write path before
 starting a voting.

 The Data Source V1 write path asks implementations to write a DataFrame
 directly, which is painful:
 1. Exposing upper-level API like DataFrame to Data Source API is not
 good for maintenance.
 2. Data sources may need to preprocess the input data before writing,
 like cluster/sort the input by some columns. It's better to do the
 preprocessing in Spark instead of in the data source.
 3. Data sources need to take care of transaction themselves, which is
 hard. And different data sources may come up with a very similar approach
 for the transaction, which leads to many duplicated codes.


 To solve these pain points, I'm proposing a data source writing
 framework which is very similar to the reading framework, i.e.,
 WriteSupport -> DataSourceV2Writer -> WriteTask -> DataWriter. You can take
 a look at my prototype to see what it looks like:
 https://github.com/apache/spark/pull/19269

 There are some other details need further discussion:
 1. *partitioning/bucketing*
 Currently only the built-in file-based data sources support them, but
 there is nothing stopping us from exposing them to all data sources. One
 question is, shall we make them as mix-in interfaces for data source v2
 reader/writer, or just encode them into data source options(a
 string-to-string map)? Ideally it's more like options, Spark just transfers
 these user-given informations to data sources, and doesn't do anything for
 it.

 2. *input data requirement*
 Data sources should be able to ask Spark to preprocess the input data,
 and this can be a mix-in interface for DataSourceV2Writer. I think we need
 to add clustering request and sorting within partitions request, any more?

 3. *transaction*
 I think we can just follow `FileCommitProtocol`, which is the internal
 framework Spark uses to guarantee transaction for built-in file-based data
 sources. Generally speaking, we need task level and job level commit/abort.
 Again you can see more details in my prototype about it:
 https://github.com/apache/spark/pull/19269

 4. *data source table*
 This is the trickiest one. In Spark you can create a table which points
 to a data source, so you can read/write this data source easily by
 referencing the table name. Ideally data source table is just a pointer
 which points to a data source with a list of predefined options, to save
 users from typing these options again and again for each query.
 If that's all, then everything is good, we don't need to add more
 interfaces to Data Source V2. However, data source tables provide special
 operators like ALTER TABLE SCHEMA, ADD PARTITION, etc., which requires data
 sources to have some extra ability.
 Currently these special operators only work for built-in file-based
 data sources, and I don't think we will extend it in the near future, I
 propose to mark them as out of the scope.

Re: [discuss] Data Source V2 write path

2017-09-24 Thread Reynold Xin
Can there be an explicit create function?


On Sun, Sep 24, 2017 at 7:17 PM, Wenchen Fan  wrote:

> I agree it would be a clean approach if data source is only responsible to
> write into an already-configured table. However, without catalog
> federation, Spark doesn't have an API to ask an external system(like
> Cassandra) to create a table. Currently it's all done by data source write
> API. Data source implementations are responsible to create or insert a
> table according to the save mode.
>
> As a workaround, I think it's acceptable to pass partitioning/bucketing
> information via data source options, and data sources should decide to take
> these informations and create the table, or throw exception if these
> informations don't match the already-configured table.
>
>
> On Fri, Sep 22, 2017 at 9:35 AM, Ryan Blue  wrote:
>
>> > input data requirement
>>
>> Clustering and sorting within partitions are a good start. We can always
>> add more later when they are needed.
>>
>> The primary use case I'm thinking of for this is partitioning and
>> bucketing. If I'm implementing a partitioned table format, I need to tell
>> Spark to cluster by my partition columns. Should there also be a way to
>> pass those columns separately, since they may not be stored in the same way
>> like partitions are in the current format?
>>
>> On Wed, Sep 20, 2017 at 3:10 AM, Wenchen Fan  wrote:
>>
>>> Hi all,
>>>
>>> I want to have some discussion about Data Source V2 write path before
>>> starting a voting.
>>>
>>> The Data Source V1 write path asks implementations to write a DataFrame
>>> directly, which is painful:
>>> 1. Exposing upper-level API like DataFrame to Data Source API is not
>>> good for maintenance.
>>> 2. Data sources may need to preprocess the input data before writing,
>>> like cluster/sort the input by some columns. It's better to do the
>>> preprocessing in Spark instead of in the data source.
>>> 3. Data sources need to take care of transaction themselves, which is
>>> hard. And different data sources may come up with a very similar approach
>>> for the transaction, which leads to many duplicated codes.
>>>
>>>
>>> To solve these pain points, I'm proposing a data source writing
>>> framework which is very similar to the reading framework, i.e.,
>>> WriteSupport -> DataSourceV2Writer -> WriteTask -> DataWriter. You can take
>>> a look at my prototype to see what it looks like:
>>> https://github.com/apache/spark/pull/19269
>>>
>>> There are some other details need further discussion:
>>> 1. *partitioning/bucketing*
>>> Currently only the built-in file-based data sources support them, but
>>> there is nothing stopping us from exposing them to all data sources. One
>>> question is, shall we make them as mix-in interfaces for data source v2
>>> reader/writer, or just encode them into data source options(a
>>> string-to-string map)? Ideally it's more like options, Spark just transfers
>>> these user-given informations to data sources, and doesn't do anything for
>>> it.
>>>
>>> 2. *input data requirement*
>>> Data sources should be able to ask Spark to preprocess the input data,
>>> and this can be a mix-in interface for DataSourceV2Writer. I think we need
>>> to add clustering request and sorting within partitions request, any more?
>>>
>>> 3. *transaction*
>>> I think we can just follow `FileCommitProtocol`, which is the internal
>>> framework Spark uses to guarantee transaction for built-in file-based data
>>> sources. Generally speaking, we need task level and job level commit/abort.
>>> Again you can see more details in my prototype about it:
>>> https://github.com/apache/spark/pull/19269
>>>
>>> 4. *data source table*
>>> This is the trickiest one. In Spark you can create a table which points
>>> to a data source, so you can read/write this data source easily by
>>> referencing the table name. Ideally data source table is just a pointer
>>> which points to a data source with a list of predefined options, to save
>>> users from typing these options again and again for each query.
>>> If that's all, then everything is good, we don't need to add more
>>> interfaces to Data Source V2. However, data source tables provide special
>>> operators like ALTER TABLE SCHEMA, ADD PARTITION, etc., which requires data
>>> sources to have some extra ability.
>>> Currently these special operators only work for built-in file-based data
>>> sources, and I don't think we will extend it in the near future, I propose
>>> to mark them as out of the scope.
>>>
>>>
>>> Any comments are welcome!
>>> Thanks,
>>> Wenchen
>>>
>>
>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>
>


Re: [discuss] Data Source V2 write path

2017-09-24 Thread Wenchen Fan
I agree it would be a clean approach if the data source is only responsible
for writing into an already-configured table. However, without catalog
federation, Spark doesn't have an API to ask an external system (like
Cassandra) to create a table. Currently it's all done by the data source
write API: data source implementations are responsible for creating a table
or inserting into it according to the save mode.

As a workaround, I think it's acceptable to pass partitioning/bucketing
information via data source options; data sources should decide whether to
take this information and create the table, or throw an exception if it
doesn't match the already-configured table.
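
A rough Scala sketch of what that workaround could look like on the source
side (the option key and helper names are invented for illustration, not a
proposed API):

def prepareTable(options: Map[String, String],
                 existingPartitioning: Option[Seq[String]]): Unit = {
  // Partition columns passed through the string-to-string options map.
  val requested =
    options.get("partitionColumns").toSeq.flatMap(_.split(",")).map(_.trim)
  existingPartitioning match {
    case None =>
      createTable(requested)  // table doesn't exist yet: create it
    case Some(cols) if cols == requested =>
      ()                      // matches the already-configured table
    case Some(cols) =>
      throw new IllegalArgumentException(
        s"Partitioning [${requested.mkString(", ")}] does not match the " +
        s"existing table [${cols.mkString(", ")}]")
  }
}

// Source-specific DDL call, e.g. a CQL CREATE TABLE for Cassandra.
def createTable(partitionColumns: Seq[String]): Unit = ???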


On Fri, Sep 22, 2017 at 9:35 AM, Ryan Blue  wrote:

> > input data requirement
>
> Clustering and sorting within partitions are a good start. We can always
> add more later when they are needed.
>
> The primary use case I'm thinking of for this is partitioning and
> bucketing. If I'm implementing a partitioned table format, I need to tell
> Spark to cluster by my partition columns. Should there also be a way to
> pass those columns separately, since they may not be stored in the same way
> like partitions are in the current format?
>
> On Wed, Sep 20, 2017 at 3:10 AM, Wenchen Fan  wrote:
>
>> Hi all,
>>
>> I want to have some discussion about Data Source V2 write path before
>> starting a voting.
>>
>> The Data Source V1 write path asks implementations to write a DataFrame
>> directly, which is painful:
>> 1. Exposing upper-level API like DataFrame to Data Source API is not good
>> for maintenance.
>> 2. Data sources may need to preprocess the input data before writing,
>> like cluster/sort the input by some columns. It's better to do the
>> preprocessing in Spark instead of in the data source.
>> 3. Data sources need to take care of transaction themselves, which is
>> hard. And different data sources may come up with a very similar approach
>> for the transaction, which leads to many duplicated codes.
>>
>>
>> To solve these pain points, I'm proposing a data source writing framework
>> which is very similar to the reading framework, i.e., WriteSupport ->
>> DataSourceV2Writer -> WriteTask -> DataWriter. You can take a look at my
>> prototype to see what it looks like: https://github.com/apache/spark/pull/19269
>>
>> There are some other details need further discussion:
>> 1. *partitioning/bucketing*
>> Currently only the built-in file-based data sources support them, but
>> there is nothing stopping us from exposing them to all data sources. One
>> question is, shall we make them as mix-in interfaces for data source v2
>> reader/writer, or just encode them into data source options(a
>> string-to-string map)? Ideally it's more like options, Spark just transfers
>> these user-given informations to data sources, and doesn't do anything for
>> it.
>>
>> 2. *input data requirement*
>> Data sources should be able to ask Spark to preprocess the input data,
>> and this can be a mix-in interface for DataSourceV2Writer. I think we need
>> to add clustering request and sorting within partitions request, any more?
>>
>> 3. *transaction*
>> I think we can just follow `FileCommitProtocol`, which is the internal
>> framework Spark uses to guarantee transaction for built-in file-based data
>> sources. Generally speaking, we need task level and job level commit/abort.
>> Again you can see more details in my prototype about it:
>> https://github.com/apache/spark/pull/19269
>>
>> 4. *data source table*
>> This is the trickiest one. In Spark you can create a table which points
>> to a data source, so you can read/write this data source easily by
>> referencing the table name. Ideally data source table is just a pointer
>> which points to a data source with a list of predefined options, to save
>> users from typing these options again and again for each query.
>> If that's all, then everything is good, we don't need to add more
>> interfaces to Data Source V2. However, data source tables provide special
>> operators like ALTER TABLE SCHEMA, ADD PARTITION, etc., which requires data
>> sources to have some extra ability.
>> Currently these special operators only work for built-in file-based data
>> sources, and I don't think we will extend it in the near future, I propose
>> to mark them as out of the scope.
>>
>>
>> Any comments are welcome!
>> Thanks,
>> Wenchen
>>
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: [discuss] Data Source V2 write path

2017-09-21 Thread Ryan Blue
> input data requirement

Clustering and sorting within partitions are a good start. We can always
add more later when they are needed.

The primary use case I'm thinking of for this is partitioning and
bucketing. If I'm implementing a partitioned table format, I need to tell
Spark to cluster by my partition columns. Should there also be a way to
pass those columns separately, since they may not be stored in the same way
as partitions are in the current format?

On Wed, Sep 20, 2017 at 3:10 AM, Wenchen Fan  wrote:

> Hi all,
>
> I want to have some discussion about Data Source V2 write path before
> starting a voting.
>
> The Data Source V1 write path asks implementations to write a DataFrame
> directly, which is painful:
> 1. Exposing upper-level API like DataFrame to Data Source API is not good
> for maintenance.
> 2. Data sources may need to preprocess the input data before writing, like
> cluster/sort the input by some columns. It's better to do the preprocessing
> in Spark instead of in the data source.
> 3. Data sources need to take care of transaction themselves, which is
> hard. And different data sources may come up with a very similar approach
> for the transaction, which leads to many duplicated codes.
>
>
> To solve these pain points, I'm proposing a data source writing framework
> which is very similar to the reading framework, i.e., WriteSupport ->
> DataSourceV2Writer -> WriteTask -> DataWriter. You can take a look at my
>> prototype to see what it looks like: https://github.com/apache/spark/pull/19269
>
> There are some other details need further discussion:
> 1. *partitioning/bucketing*
> Currently only the built-in file-based data sources support them, but
> there is nothing stopping us from exposing them to all data sources. One
> question is, shall we make them as mix-in interfaces for data source v2
> reader/writer, or just encode them into data source options(a
> string-to-string map)? Ideally it's more like options, Spark just transfers
> these user-given informations to data sources, and doesn't do anything for
> it.
>
> 2. *input data requirement*
> Data sources should be able to ask Spark to preprocess the input data, and
> this can be a mix-in interface for DataSourceV2Writer. I think we need to
> add clustering request and sorting within partitions request, any more?
>
> 3. *transaction*
> I think we can just follow `FileCommitProtocol`, which is the internal
> framework Spark uses to guarantee transaction for built-in file-based data
> sources. Generally speaking, we need task level and job level commit/abort.
> Again you can see more details in my prototype about it:
> https://github.com/apache/spark/pull/19269
>
> 4. *data source table*
> This is the trickiest one. In Spark you can create a table which points to
> a data source, so you can read/write this data source easily by referencing
> the table name. Ideally data source table is just a pointer which points to
> a data source with a list of predefined options, to save users from typing
> these options again and again for each query.
> If that's all, then everything is good, we don't need to add more
> interfaces to Data Source V2. However, data source tables provide special
> operators like ALTER TABLE SCHEMA, ADD PARTITION, etc., which requires data
> sources to have some extra ability.
> Currently these special operators only work for built-in file-based data
> sources, and I don't think we will extend it in the near future, I propose
> to mark them as out of the scope.
>
>
> Any comments are welcome!
> Thanks,
> Wenchen
>



-- 
Ryan Blue
Software Engineer
Netflix


Re: [discuss] Data Source V2 write path

2017-09-21 Thread Reynold Xin
Ah yes, I agree. I was just saying they should be options (rather than
specific constructs). Having them at creation time makes a lot of sense.
One tricky thing is what happens if they need to change, but we can
probably just special-case that.

On Thu, Sep 21, 2017 at 6:28 PM Ryan Blue  wrote:

> I’d just pass them [partitioning/bucketing] as options, until there are
> clear (and strong) use cases to do them otherwise.
>
> I don’t think it makes sense to pass partitioning and bucketing
> information *into* this API. The writer should already know the table
> structure and should pass relevant information back out to Spark so it can
> sort and group data for storage.
>
> I think the idea of passing the table structure into the writer comes from
> the current implementation, where the table may not exist before a data
> frame is written. But that isn’t something that should be carried forward.
> I think the writer should be responsible for writing into an
> already-configured table. That’s the normal case we should design for.
> Creating a table at the same time (CTAS) is a convenience, but should be
> implemented by creating an empty table and then running the same writer
> that would have been used for an insert into an existing table.
>
> Otherwise, there’s confusion about how to handle the options. What should
> the writer do when partitioning passed in doesn’t match the table’s
> partitioning? We already have this situation in the DataFrameWriter API,
> where calling partitionBy and then insertInto throws an exception. I’d
> like to keep that case out of this API by setting the expectation that
> tables this writes to already exist.
>
> rb
> ​
>
> On Wed, Sep 20, 2017 at 9:52 AM, Reynold Xin  wrote:
>
>>
>>
>> On Wed, Sep 20, 2017 at 3:10 AM, Wenchen Fan  wrote:
>>
>>> Hi all,
>>>
>>> I want to have some discussion about Data Source V2 write path before
>>> starting a voting.
>>>
>>> The Data Source V1 write path asks implementations to write a DataFrame
>>> directly, which is painful:
>>> 1. Exposing upper-level API like DataFrame to Data Source API is not
>>> good for maintenance.
>>> 2. Data sources may need to preprocess the input data before writing,
>>> like cluster/sort the input by some columns. It's better to do the
>>> preprocessing in Spark instead of in the data source.
>>> 3. Data sources need to take care of transaction themselves, which is
>>> hard. And different data sources may come up with a very similar approach
>>> for the transaction, which leads to many duplicated codes.
>>>
>>>
>>> To solve these pain points, I'm proposing a data source writing
>>> framework which is very similar to the reading framework, i.e.,
>>> WriteSupport -> DataSourceV2Writer -> WriteTask -> DataWriter. You can take
>>> a look at my prototype to see what it looks like:
>>> https://github.com/apache/spark/pull/19269
>>>
>>> There are some other details need further discussion:
>>> 1. *partitioning/bucketing*
>>> Currently only the built-in file-based data sources support them, but
>>> there is nothing stopping us from exposing them to all data sources. One
>>> question is, shall we make them as mix-in interfaces for data source v2
>>> reader/writer, or just encode them into data source options(a
>>> string-to-string map)? Ideally it's more like options, Spark just transfers
>>> these user-given informations to data sources, and doesn't do anything for
>>> it.
>>>
>>
>>
>> I'd just pass them as options, until there are clear (and strong) use
>> cases to do them otherwise.
>>
>>
>> +1 on the rest.
>>
>>
>>
>>>
>>> 2. *input data requirement*
>>> Data sources should be able to ask Spark to preprocess the input data,
>>> and this can be a mix-in interface for DataSourceV2Writer. I think we need
>>> to add clustering request and sorting within partitions request, any more?
>>>
>>> 3. *transaction*
>>> I think we can just follow `FileCommitProtocol`, which is the internal
>>> framework Spark uses to guarantee transaction for built-in file-based data
>>> sources. Generally speaking, we need task level and job level commit/abort.
>>> Again you can see more details in my prototype about it:
>>> https://github.com/apache/spark/pull/19269
>>>
>>> 4. *data source table*
>>> This is the trickiest one. In Spark you can create a table which points
>>> to a data source, so you can read/write this data source easily by
>>> referencing the table name. Ideally data source table is just a pointer
>>> which points to a data source with a list of predefined options, to save
>>> users from typing these options again and again for each query.
>>> If that's all, then everything is good, we don't need to add more
>>> interfaces to Data Source V2. However, data source tables provide special
>>> operators like ALTER TABLE SCHEMA, ADD PARTITION, etc., which requires data
>>> sources to have some extra ability.
>>> Currently these special operators only work for built-in 

Re: [discuss] Data Source V2 write path

2017-09-21 Thread Ryan Blue
I’d just pass them [partitioning/bucketing] as options, until there are
clear (and strong) use cases to do them otherwise.

I don’t think it makes sense to pass partitioning and bucketing information
*into* this API. The writer should already know the table structure and
should pass relevant information back out to Spark so it can sort and group
data for storage.

I think the idea of passing the table structure into the writer comes from
the current implementation, where the table may not exist before a data
frame is written. But that isn’t something that should be carried forward.
I think the writer should be responsible for writing into an
already-configured table. That’s the normal case we should design for.
Creating a table at the same time (CTAS) is a convenience, but should be
implemented by creating an empty table and then running the same writer
that would have been used for an insert into an existing table.

Otherwise, there’s confusion about how to handle the options. What should
the writer do when partitioning passed in doesn’t match the table’s
partitioning? We already have this situation in the DataFrameWriter API,
where calling partitionBy and then insertInto throws an exception. I’d like
to keep that case out of this API by setting the expectation that tables
this writes to already exist.
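
As a tiny sketch of that direction (all names here are illustrative, not a
proposed API), assuming the table configuration is already known to the
writer:

case class TableConfig(partitionColumns: Seq[String], sortColumns: Seq[String])

// Built from an already-configured table; it only reports layout requirements
// back out to Spark, nothing about table layout flows in from the write side.
class ExampleTableWriter(table: TableConfig) {
  // Spark clusters input rows by these columns before writing.
  def requiredClustering: Seq[String] = table.partitionColumns
  // Spark sorts rows within each task by these columns.
  def requiredOrdering: Seq[String] = table.partitionColumns ++ table.sortColumns
}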

rb
​

On Wed, Sep 20, 2017 at 9:52 AM, Reynold Xin  wrote:

>
>
> On Wed, Sep 20, 2017 at 3:10 AM, Wenchen Fan  wrote:
>
>> Hi all,
>>
>> I want to have some discussion about Data Source V2 write path before
>> starting a voting.
>>
>> The Data Source V1 write path asks implementations to write a DataFrame
>> directly, which is painful:
>> 1. Exposing upper-level API like DataFrame to Data Source API is not good
>> for maintenance.
>> 2. Data sources may need to preprocess the input data before writing,
>> like cluster/sort the input by some columns. It's better to do the
>> preprocessing in Spark instead of in the data source.
>> 3. Data sources need to take care of transaction themselves, which is
>> hard. And different data sources may come up with a very similar approach
>> for the transaction, which leads to many duplicated codes.
>>
>>
>> To solve these pain points, I'm proposing a data source writing framework
>> which is very similar to the reading framework, i.e., WriteSupport ->
>> DataSourceV2Writer -> WriteTask -> DataWriter. You can take a look at my
>> prototype to see what it looks like: https://github.com/apache/spark/pull/19269
>>
>> There are some other details need further discussion:
>> 1. *partitioning/bucketing*
>> Currently only the built-in file-based data sources support them, but
>> there is nothing stopping us from exposing them to all data sources. One
>> question is, shall we make them as mix-in interfaces for data source v2
>> reader/writer, or just encode them into data source options(a
>> string-to-string map)? Ideally it's more like options, Spark just transfers
>> these user-given informations to data sources, and doesn't do anything for
>> it.
>>
>
>
> I'd just pass them as options, until there are clear (and strong) use
> cases to do them otherwise.
>
>
> +1 on the rest.
>
>
>
>>
>> 2. *input data requirement*
>> Data sources should be able to ask Spark to preprocess the input data,
>> and this can be a mix-in interface for DataSourceV2Writer. I think we need
>> to add clustering request and sorting within partitions request, any more?
>>
>> 3. *transaction*
>> I think we can just follow `FileCommitProtocol`, which is the internal
>> framework Spark uses to guarantee transaction for built-in file-based data
>> sources. Generally speaking, we need task level and job level commit/abort.
>> Again you can see more details in my prototype about it:
>> https://github.com/apache/spark/pull/19269
>>
>> 4. *data source table*
>> This is the trickiest one. In Spark you can create a table which points
>> to a data source, so you can read/write this data source easily by
>> referencing the table name. Ideally data source table is just a pointer
>> which points to a data source with a list of predefined options, to save
>> users from typing these options again and again for each query.
>> If that's all, then everything is good, we don't need to add more
>> interfaces to Data Source V2. However, data source tables provide special
>> operators like ALTER TABLE SCHEMA, ADD PARTITION, etc., which requires data
>> sources to have some extra ability.
>> Currently these special operators only work for built-in file-based data
>> sources, and I don't think we will extend it in the near future, I propose
>> to mark them as out of the scope.
>>
>>
>> Any comments are welcome!
>> Thanks,
>> Wenchen
>>
>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: [discuss] Data Source V2 write path

2017-09-20 Thread Reynold Xin
On Wed, Sep 20, 2017 at 3:10 AM, Wenchen Fan  wrote:

> Hi all,
>
> I want to have some discussion about Data Source V2 write path before
> starting a voting.
>
> The Data Source V1 write path asks implementations to write a DataFrame
> directly, which is painful:
> 1. Exposing upper-level API like DataFrame to Data Source API is not good
> for maintenance.
> 2. Data sources may need to preprocess the input data before writing, like
> cluster/sort the input by some columns. It's better to do the preprocessing
> in Spark instead of in the data source.
> 3. Data sources need to take care of transaction themselves, which is
> hard. And different data sources may come up with a very similar approach
> for the transaction, which leads to many duplicated codes.
>
>
> To solve these pain points, I'm proposing a data source writing framework
> which is very similar to the reading framework, i.e., WriteSupport ->
> DataSourceV2Writer -> WriteTask -> DataWriter. You can take a look at my
> prototype to see what it looks like: https://github.com/apache/spark/pull/19269
>
> There are some other details need further discussion:
> 1. *partitioning/bucketing*
> Currently only the built-in file-based data sources support them, but
> there is nothing stopping us from exposing them to all data sources. One
> question is, shall we make them as mix-in interfaces for data source v2
> reader/writer, or just encode them into data source options(a
> string-to-string map)? Ideally it's more like options, Spark just transfers
> these user-given informations to data sources, and doesn't do anything for
> it.
>


I'd just pass them as options, until there are clear (and strong) use cases
for doing otherwise.


+1 on the rest.



>
> 2. *input data requirement*
> Data sources should be able to ask Spark to preprocess the input data, and
> this can be a mix-in interface for DataSourceV2Writer. I think we need to
> add clustering request and sorting within partitions request, any more?
>
> 3. *transaction*
> I think we can just follow `FileCommitProtocol`, which is the internal
> framework Spark uses to guarantee transaction for built-in file-based data
> sources. Generally speaking, we need task level and job level commit/abort.
> Again you can see more details in my prototype about it:
> https://github.com/apache/spark/pull/19269
>
> 4. *data source table*
> This is the trickiest one. In Spark you can create a table which points to
> a data source, so you can read/write this data source easily by referencing
> the table name. Ideally data source table is just a pointer which points to
> a data source with a list of predefined options, to save users from typing
> these options again and again for each query.
> If that's all, then everything is good, we don't need to add more
> interfaces to Data Source V2. However, data source tables provide special
> operators like ALTER TABLE SCHEMA, ADD PARTITION, etc., which requires data
> sources to have some extra ability.
> Currently these special operators only work for built-in file-based data
> sources, and I don't think we will extend it in the near future, I propose
> to mark them as out of the scope.
>
>
> Any comments are welcome!
> Thanks,
> Wenchen
>


[discuss] Data Source V2 write path

2017-09-20 Thread Wenchen Fan
Hi all,

I want to have some discussion about Data Source V2 write path before
starting a voting.

The Data Source V1 write path asks implementations to write a DataFrame
directly, which is painful:
1. Exposing an upper-level API like DataFrame to the Data Source API is not
good for maintenance.
2. Data sources may need to preprocess the input data before writing, e.g.
cluster/sort the input by some columns. It's better to do that preprocessing
in Spark instead of in the data source.
3. Data sources need to take care of transactions themselves, which is hard.
And different data sources may come up with very similar approaches to
transactions, which leads to a lot of duplicated code.


To solve these pain points, I'm proposing a data source writing framework
which is very similar to the reading framework, i.e., WriteSupport ->
DataSourceV2Writer -> WriteTask -> DataWriter. You can take a look at my
prototype to see what it looks like:
https://github.com/apache/spark/pull/19269
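
As a rough illustration of the shape, here is a minimal Scala sketch of those
four layers. The method signatures below are guesses for illustration only;
see the PR above for the actual prototype.

import org.apache.spark.sql.Row

trait WriteSupport {
  // Entry point: create a writer for a write job, given the user options.
  def createWriter(jobId: String, options: Map[String, String]): DataSourceV2Writer
}

trait DataSourceV2Writer {
  def createWriteTask(): WriteTask                      // serialized to executors
  def commit(messages: Seq[WriterCommitMessage]): Unit  // job-level commit (driver)
  def abort(messages: Seq[WriterCommitMessage]): Unit   // job-level rollback (driver)
}

trait WriteTask extends Serializable {
  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter
}

trait DataWriter {
  def write(row: Row): Unit
  def commit(): WriterCommitMessage  // task-level commit, reported to the driver
  def abort(): Unit                  // task-level rollback
}

trait WriterCommitMessage extends Serializable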

There are some other details that need further discussion:
1. *partitioning/bucketing*
Currently only the built-in file-based data sources support them, but there
is nothing stopping us from exposing them to all data sources. One question
is, shall we make them mix-in interfaces for the data source v2
reader/writer, or just encode them into data source options (a
string-to-string map)? Ideally they are more like options: Spark just passes
this user-given information to data sources and doesn't do anything with it.
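
Roughly, the two alternatives could look like this (the trait name and option
keys are made up for illustration, not a proposal):

// Alternative A: a dedicated mix-in interface on the v2 reader/writer.
trait SupportsPartitioning {
  def partitionColumns(): Seq[String]
  def bucketSpec(): Option[(Seq[String], Int)]  // (bucket columns, num buckets)
}

// Alternative B: opaque string-to-string options that Spark passes through.
val options = Map(
  "partitionColumns" -> "date,country",
  "bucketColumns"    -> "user_id",
  "numBuckets"       -> "16")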

2. *input data requirement*
Data sources should be able to ask Spark to preprocess the input data, and
this can be a mix-in interface for DataSourceV2Writer. I think we need to
add a clustering request and a sorting-within-partitions request; anything
else?
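
For example, the mix-in could be as small as this (names are illustrative
only):

// A writer mixes this in to request preprocessing from Spark.
trait SupportsInputDataRequirement {
  // Rows sharing these column values should end up in the same write task
  // (e.g. the table's partition/bucket columns).
  def requiredClustering(): Seq[String]
  // Rows within each task should be sorted by these columns before being
  // handed to the writer.
  def requiredOrdering(): Seq[String]
}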

3. *transaction*
I think we can just follow `FileCommitProtocol`, which is the internal
framework Spark uses to guarantee transactions for the built-in file-based
data sources. Generally speaking, we need task-level and job-level
commit/abort. Again, you can see more details about it in my prototype:
https://github.com/apache/spark/pull/19269
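
To spell out the two levels, this is the commit sequencing Spark would drive
(a single-threaded sketch for illustration, reusing the interfaces sketched
above; in reality the tasks run on executors):

def runWriteJob(writer: DataSourceV2Writer, numPartitions: Int): Unit = {
  val task = writer.createWriteTask()
  val messages =
    try {
      (0 until numPartitions).map { p =>
        val dataWriter = task.createDataWriter(p, 0)
        try {
          // ... dataWriter.write(row) for every row of input partition p ...
          dataWriter.commit()  // task-level commit
        } catch {
          case e: Throwable => dataWriter.abort(); throw e  // task-level rollback
        }
      }
    } catch {
      case e: Throwable => writer.abort(Nil); throw e  // job-level rollback
    }
  writer.commit(messages)  // job-level commit, e.g. publish the staged output
}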

4. *data source table*
This is the trickiest one. In Spark you can create a table which points to
a data source, so you can read/write that data source easily by referencing
the table name. Ideally a data source table is just a pointer to a data
source with a list of predefined options, to save users from typing these
options again and again for each query.
If that were all, everything would be fine and we wouldn't need to add more
interfaces to Data Source V2. However, data source tables provide special
operators like ALTER TABLE SCHEMA, ADD PARTITION, etc., which require data
sources to have some extra abilities.
Currently these special operators only work for the built-in file-based data
sources, and I don't think we will extend them in the near future, so I
propose to mark them as out of scope.
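
For reference, the "pointer plus predefined options" part already exists as
plain SQL today; the source class and options below are placeholders, and
`spark`/`df` are assumed to be in scope (e.g. in spark-shell):

// Create once, with the connection options baked into the table definition.
spark.sql("""
  CREATE TABLE events
  USING com.example.eventsource
  OPTIONS (host 'db.example.com', keyspace 'prod')
""")

// Later queries just reference the table name instead of repeating options.
spark.table("events").show()
df.write.insertInto("events")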


Any comments are welcome!
Thanks,
Wenchen