Re: DataSourceWriter V2 Api questions

2019-12-06 Thread Jungtaek Lim
many tables in a view, you may have to come up
>>> with a separate process to merge the underlying tables on a periodic basis.
>>>
>>> It gets messy and probably moves you towards a write-once only tables,
>>> etc.
>>>
>>>
>>>
>>> Finally using views in a generic mongoDB connector may not be good and
>>> flexible enough.
>>>
>>>
>>>
>>>
>>>
>>> *From: *Russell Spitzer 
>>> *Date: *Tuesday, September 11, 2018 at 9:58 AM
>>> *To: *"Thakrar, Jayesh" 
>>> *Cc: *Arun Mahadevan , Jungtaek Lim ,
>>> Wenchen Fan , Reynold Xin ,
>>> Ross Lawley , Ryan Blue , dev
>>> , "dbis...@us.ibm.com" 
>>>
>>>
>>> *Subject: *Re: DataSourceWriter V2 Api questions
>>>
>>>
>>>
>>> That only works assuming that Spark is the only client of the table. It
>>> will be impossible to force an outside user to respect the special metadata
>>> table when reading so they will still see all of the data in transit.
>>> Additionally this would force the incoming data to only be written into new
>>> partitions which is not simple to do from a C* perspective as balancing the
>>> distribution of new rows would be non trivial. If we had to do something
>>> like this we would basically be forced to write to some disk format first
>>> and then when we move the data into C* we still have the same problem that
>>> we started with.
>>>
>>>
>>>
>>> On Tue, Sep 11, 2018 at 9:41 AM Thakrar, Jayesh <
>>> jthak...@conversantmedia.com> wrote:
>>>
>>> So if Spark and the destination datastore are both non-transactional,
>>> you will have to resort to an external mechanism for “transactionality”.
>>>
>>>
>>>
>>> Here are some options for both RDBMS and non-transaction datastore
>>> destination.
>>>
>>> For now assuming that Spark is used in batch mode (and not streaming
>>> mode).
>>>
>>>
>>>
>>> *RDBMS Options*
>>>
>>> Use staging table as discussed in the thread.
>>>
>>>
>>>
>>> As an extension of the above, use partitioned destination tables and
>>> load data into a staging table and then use partition management to include
>>> the staging table into the partitioned table.
>>>
>>> This implies a partition per Spark batch run.
>>>
>>>
>>>
>>> *Non-transactional Datastore Options*
>>>
>>> Use another metadata table.
>>>
>>> Load the data into a staging table equivalent or even Cassandra
>>> partition(s).
>>>
>>> Start the transaction by making a “start of transaction” entry into the
>>> metadata table along with partition keys to be populated.
>>> As part of Spark batch commit, update the metadata entry with
>>> appropriate details – e.g. partition load time, etc.
>>> In the event of a failed / incomplete batch, the metadata table entry
>>> will be incomplete and the corresponding partition keys can be dropped.
>>>
>>> So essentially you use the metadata table to load/drop/skip the data to
>>> be moved/retained into the final destination.
>>>
>>>
>>>
>>> *Misc*
>>>
>>> Another option is to use Spark to stage data into a filesystem
>>> (distributed, HDFS) and then use RDBMS utilities to transactionally load
>>> data into the destination table.
>>>
>>>
>>>
>>>
>>>
>>> *From: *Russell Spitzer 
>>> *Date: *Tuesday, September 11, 2018 at 9:08 AM
>>> *To: *Arun Mahadevan 
>>> *Cc: *Jungtaek Lim , Wenchen Fan ,
>>> Reynold Xin , Ross Lawley ,
>>> Ryan Blue , dev , <
>>> dbis...@us.ibm.com>
>>>
>>>
>>> *Subject: *Re: DataSourceWriter V2 Api questions
>>>
>>>
>>>
>>> I'm still not sure how the staging table helps for databases which do
>>> not have such atomicity guarantees. For example in Cassandra if you wrote
>>> all of the data temporarily to a staging table, we would still have the
>>> same problem in moving the data from the staging table into the real table.
>>> We would likely have as similar a chance of failing and we still have no
>>> way of making the entire staging set simultaneously visible.
>>>
>>>
>>>
>>> On Tue, Sep 11, 2018 at 8:39 AM Arun Mahadevan  wrote

Re: DataSourceWriter V2 Api questions

2019-12-05 Thread Wenchen Fan
I also share the concern about "writing twice", which hurts performance a
lot. What's worse, the final write may not be scalable, e.g. copying the
staging table into the final table.

If the sink itself doesn't support global transactions, but only local
transactions (e.g. Kafka), using staging tables seems the only way to make
the write atomic. But if we accept "eventually consistent", we can use 2PC
to avoid "writing twice", i.e. wait for all the tasks to finish
writing data, then ask them to commit at the same time.

There are 2 open questions we need to answer:
1. How do we make sure all tasks are launched at the same time to implement
2PC? Barrier execution?
2. To reach "eventually consistent", we must retry the job until it succeeds.
How shall we guarantee the job retry?
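
To make question 1 concrete, here is a rough sketch of how barrier execution (Spark 2.4+) could line writer tasks up before a simultaneous commit. This is illustrative only; the staging and commit steps are placeholders, not a real sink API.

    import org.apache.spark.BarrierTaskContext
    import org.apache.spark.rdd.RDD

    // Sketch only: every task stages its partition, then all tasks wait at a global
    // barrier and "commit" together. The staging/commit steps below are placeholders.
    def writeWithBarrier(data: RDD[String]): Unit = {
      data.barrier().mapPartitions { rows =>
        val staged = rows.toArray          // placeholder: write rows somewhere, still hidden
        val ctx = BarrierTaskContext.get()
        ctx.barrier()                      // returns only once every task reaches this point
        // placeholder: commit this task's local transaction / publish its staged rows
        Iterator.single(staged.length)
      }.count()                            // trigger the barrier stage
    }

This still leaves question 2 (retrying the whole job until it succeeds) to be handled outside the write itself.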

On Fri, Oct 19, 2018 at 12:25 PM Jungtaek Lim  wrote:

> Sorry to resurrect this old and long thread: we have been struggling with
> Kafka end-to-end exactly-once support, and couldn't find any approach which
> can get both things, transactional and scalable.
>
> If we tolerate scalability, we can let writers to write to staging topic
> within individual transaction, and once all writers are succeed to write,
> let coordinator re-read topic and write to final topic in its transaction.
> (Coordinator should be able to get offsets to read and skip offsets which
> are being left due to failed trial of batch, so shouldn't be read-committed
> while reading.)
>
> This will result in twice of data writing as well as all rows being
> published within single thread in final step, which most of the cases we
> can't tolerate. Moreover, repartitioning to 1 and write with enabling
> transaction might do the same thing better: staging topic vs shuffle.
>
> If we tolerate transaction (I meant "global transaction") and allow
> "eventually consistent" - allow part(s) of output being seen to client side
> at specific point (2PC has same limitation), there could be some approaches
> which might be tricky but work.
>
> Even external storages support transaction, normally it doesn't mean they
> are supporting global transaction or grouped transactions. The boundary of
> transaction is mostly limited to the one client (connection), and once we
> want to write from multiple tasks, we will encounter same issue on these
> external storages, except the cases (like HDFS) which can "move" its data
> from staging to final destination within storage.
>
> So could we consider lessen the contract on DataSource V2 writer, or have
> a new representation of guarantee for such case so it is not "fully
> transactional" but another kind of "exactly-once" and not "at-least-once"?
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Fri, Sep 14, 2018 at 12:08 AM, Thakrar, Jayesh wrote:
>
>> Agree on the “constraints” when working with Cassandra.
>>
>> But remember, this is a weak attempt to make two non-transactional
>> systems appear to the outside world as a transactional system.
>>
>> Scaffolding/plumbing/abstractions will have to be created in the form of
>> say, a custom data access layer.
>>
>>
>>
>> Anyway, Ross is trying to get some practices used by other adopters of
>> the V2 API while trying to implement a driver/connector for MongoDB.
>>
>>
>>
>> Probably views can be used similar to partitions in mongoDB?
>>
>> Essentially each batch load goes into a separate mongoDB table and will
>> result in view redefinition after a successful load.
>>
>> And finally to avoid too many tables in a view, you may have to come up
>> with a separate process to merge the underlying tables on a periodic basis.
>>
>> It gets messy and probably moves you towards a write-once only tables,
>> etc.
>>
>>
>>
>> Finally using views in a generic mongoDB connector may not be good and
>> flexible enough.
>>
>>
>>
>>
>>
>> *From: *Russell Spitzer 
>> *Date: *Tuesday, September 11, 2018 at 9:58 AM
>> *To: *"Thakrar, Jayesh" 
>> *Cc: *Arun Mahadevan , Jungtaek Lim ,
>> Wenchen Fan , Reynold Xin ,
>> Ross Lawley , Ryan Blue , dev <
>> dev@spark.apache.org>, "dbis...@us.ibm.com" 
>>
>>
>> *Subject: *Re: DataSourceWriter V2 Api questions
>>
>>
>>
>> That only works assuming that Spark is the only client of the table. It
>> will be impossible to force an outside user to respect the special metadata
>> table when reading so they will still see all of the data in transit.
>> Additionally this would force the incoming data to only be written into new
>>

Re: DataSourceWriter V2 Api questions

2018-10-18 Thread Jungtaek Lim
Sorry to resurrect this old and long thread: we have been struggling with
Kafka end-to-end exactly-once support, and couldn't find any approach which
can get both things, transactional and scalable.

If we give up scalability, we can let writers write to a staging topic
within individual transactions, and once all writers have succeeded, let the
coordinator re-read the topic and write to the final topic in its own
transaction. (The coordinator should be able to get the offsets to read and
skip offsets left over from failed trials of the batch, so it shouldn't use
read-committed while reading.)

This results in writing the data twice, as well as all rows being
published by a single thread in the final step, which in most cases we
can't accept. Moreover, repartitioning to 1 and writing with transactions
enabled might do the same thing better: staging topic vs. shuffle.
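
For reference, a minimal sketch of the writer side of such a staging-topic approach, using Kafka's transactional producer; the broker address, topic name and per-task transactional.id scheme are invented for illustration.

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    // Each writer task publishes its partition to a staging topic inside its own
    // Kafka transaction; a separate coordinator would later re-read the staging
    // topic and write to the final topic in a single transaction.
    def writePartitionToStaging(partitionId: Int, rows: Iterator[String]): Unit = {
      val props = new Properties()
      props.put("bootstrap.servers", "broker-1:9092")               // assumed address
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("transactional.id", s"staging-writer-$partitionId") // one txn id per task

      val producer = new KafkaProducer[String, String](props)
      producer.initTransactions()
      try {
        producer.beginTransaction()
        rows.foreach(r => producer.send(new ProducerRecord[String, String]("staging-topic", r)))
        producer.commitTransaction()      // this task's rows become readable in staging
      } catch {
        case e: Exception =>
          producer.abortTransaction()
          throw e
      } finally {
        producer.close()
      }
    }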

If we give up transactions (I mean "global transactions") and allow
"eventually consistent" - allowing part(s) of the output to be visible to
clients at some point (2PC has the same limitation) - there are some
approaches which might be tricky but work.

Even when external storages support transactions, that normally doesn't mean
they support global or grouped transactions. The boundary of a transaction is
mostly limited to one client (connection), and once we want to write from
multiple tasks, we will encounter the same issue on these external storages,
except for the cases (like HDFS) which can "move" data from staging to the
final destination within the storage.

So could we consider loosening the contract on the DataSource V2 writer, or
adding a new representation of the guarantee for such cases, so it is not
"fully transactional" but another kind of "exactly-once" rather than
"at-least-once"?

Thanks,
Jungtaek Lim (HeartSaVioR)

On Fri, Sep 14, 2018 at 12:08 AM, Thakrar, Jayesh wrote:

> Agree on the “constraints” when working with Cassandra.
>
> But remember, this is a weak attempt to make two non-transactional systems
> appear to the outside world as a transactional system.
>
> Scaffolding/plumbing/abstractions will have to be created in the form of
> say, a custom data access layer.
>
>
>
> Anyway, Ross is trying to get some practices used by other adopters of the
> V2 API while trying to implement a driver/connector for MongoDB.
>
>
>
> Probably views can be used similar to partitions in mongoDB?
>
> Essentially each batch load goes into a separate mongoDB table and will
> result in view redefinition after a successful load.
>
> And finally to avoid too many tables in a view, you may have to come up
> with a separate process to merge the underlying tables on a periodic basis.
>
> It gets messy and probably moves you towards a write-once only tables, etc.
>
>
>
> Finally using views in a generic mongoDB connector may not be good and
> flexible enough.
>
>
>
>
>
> *From: *Russell Spitzer 
> *Date: *Tuesday, September 11, 2018 at 9:58 AM
> *To: *"Thakrar, Jayesh" 
> *Cc: *Arun Mahadevan , Jungtaek Lim ,
> Wenchen Fan , Reynold Xin ,
> Ross Lawley , Ryan Blue , dev <
> dev@spark.apache.org>, "dbis...@us.ibm.com" 
>
>
> *Subject: *Re: DataSourceWriter V2 Api questions
>
>
>
> That only works assuming that Spark is the only client of the table. It
> will be impossible to force an outside user to respect the special metadata
> table when reading so they will still see all of the data in transit.
> Additionally this would force the incoming data to only be written into new
> partitions which is not simple to do from a C* perspective as balancing the
> distribution of new rows would be non trivial. If we had to do something
> like this we would basically be forced to write to some disk format first
> and then when we move the data into C* we still have the same problem that
> we started with.
>
>
>
> On Tue, Sep 11, 2018 at 9:41 AM Thakrar, Jayesh <
> jthak...@conversantmedia.com> wrote:
>
> So if Spark and the destination datastore are both non-transactional, you
> will have to resort to an external mechanism for “transactionality”.
>
>
>
> Here are some options for both RDBMS and non-transaction datastore
> destination.
>
> For now assuming that Spark is used in batch mode (and not streaming mode).
>
>
>
> *RDBMS Options*
>
> Use staging table as discussed in the thread.
>
>
>
> As an extension of the above, use partitioned destination tables and load
> data into a staging table and then use partition management to include the
> staging table into the partitioned table.
>
> This implies a partition per Spark batch run.
>
>
>
> *Non-transactional Datastore Options*
>
> Use another metadata table.
>
> Loa

Re: DataSourceWriter V2 Api questions

2018-09-13 Thread Thakrar, Jayesh
Agree on the “constraints” when working with Cassandra.
But remember, this is a weak attempt to make two non-transactional systems 
appear to the outside world as a transactional system.
Scaffolding/plumbing/abstractions will have to be created in the form of, say, a
custom data access layer.

Anyway, Ross is trying to gather practices used by other adopters of the V2
API while implementing a driver/connector for MongoDB.

Probably views can be used similarly to partitions in mongoDB?
Essentially each batch load goes into a separate mongoDB table and will result
in a view redefinition after a successful load.
And finally, to avoid too many tables in a view, you may have to come up with a
separate process to merge the underlying tables on a periodic basis.
It gets messy and probably moves you towards write-once-only tables, etc.

Finally, using views in a generic mongoDB connector may not be good or flexible
enough.
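
A rough sketch of that view-redefinition idea with the MongoDB Java driver (usable from Scala). The URI, database, view and per-batch collection names are invented, and it assumes a server version whose view pipelines accept $unionWith; the "redefinition" here is done by dropping and recreating the view.

    import java.util.Arrays
    import com.mongodb.client.MongoClients
    import org.bson.Document

    // Redefine a view as the union of all per-batch collections loaded so far.
    def redefineView(batchCollections: Seq[String]): Unit = {
      val client = MongoClients.create("mongodb://localhost:27017")   // assumed URI
      try {
        val db = client.getDatabase("mydb")
        db.getCollection("final_view").drop()                         // drop the old view definition
        // union the first batch collection with every other batch collection
        val unionStages = batchCollections.tail.map(c => new Document("$unionWith", c))
        db.createView("final_view", batchCollections.head, Arrays.asList(unionStages: _*))
      } finally {
        client.close()
      }
    }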


From: Russell Spitzer 
Date: Tuesday, September 11, 2018 at 9:58 AM
To: "Thakrar, Jayesh" 
Cc: Arun Mahadevan , Jungtaek Lim , 
Wenchen Fan , Reynold Xin , Ross 
Lawley , Ryan Blue , dev 
, "dbis...@us.ibm.com" 
Subject: Re: DataSourceWriter V2 Api questions

That only works assuming that Spark is the only client of the table. It will be 
impossible to force an outside user to respect the special metadata table when 
reading so they will still see all of the data in transit. Additionally this 
would force the incoming data to only be written into new partitions which is 
not simple to do from a C* perspective as balancing the distribution of new 
rows would be non trivial. If we had to do something like this we would 
basically be forced to write to some disk format first and then when we move 
the data into C* we still have the same problem that we started with.

On Tue, Sep 11, 2018 at 9:41 AM Thakrar, Jayesh <jthak...@conversantmedia.com> wrote:
So if Spark and the destination datastore are both non-transactional, you will 
have to resort to an external mechanism for “transactionality”.

Here are some options for both RDBMS and non-transaction datastore destination.
For now assuming that Spark is used in batch mode (and not streaming mode).

RDBMS Options
Use staging table as discussed in the thread.

As an extension of the above, use partitioned destination tables and load data 
into a staging table and then use partition management to include the staging 
table into the partitioned table.
This implies a partition per Spark batch run.

Non-transactional Datastore Options
Use another metadata table.
Load the data into a staging table equivalent or even Cassandra partition(s).
Start the transaction by making a “start of transaction” entry into the 
metadata table along with partition keys to be populated.
As part of Spark batch commit, update the metadata entry with appropriate 
details – e.g. partition load time, etc.
In the event of a failed / incomplete batch, the metadata table entry will be 
incomplete and the corresponding partition keys can be dropped.
So essentially you use the metadata table to load/drop/skip the data to be 
moved/retained into the final destination.

Misc
Another option is to use Spark to stage data into a filesystem (distributed, 
HDFS) and then use RDBMS utilities to transactionally load data into the 
destination table.


From: Russell Spitzer <russell.spit...@gmail.com>
Date: Tuesday, September 11, 2018 at 9:08 AM
To: Arun Mahadevan <ar...@apache.org>
Cc: Jungtaek Lim <kabh...@gmail.com>, Wenchen Fan <cloud0...@gmail.com>, Reynold Xin <r...@databricks.com>, Ross Lawley <ross.law...@gmail.com>, Ryan Blue <rb...@netflix.com>, dev <dev@spark.apache.org>, <dbis...@us.ibm.com>

Subject: Re: DataSourceWriter V2 Api questions

I'm still not sure how the staging table helps for databases which do not have 
such atomicity guarantees. For example in Cassandra if you wrote all of the 
data temporarily to a staging table, we would still have the same problem in 
moving the data from the staging table into the real table. We would likely 
have as similar a chance of failing and we still have no way of making the 
entire staging set simultaneously visible.

On Tue, Sep 11, 2018 at 8:39 AM Arun Mahadevan <ar...@apache.org> wrote:
>Some being said it is exactly-once when the output is eventually exactly-once, 
>whereas others being said there should be no side effect, like consumer 
>shouldn't see partial write. I guess 2PC is former, since some partitions can 
>commit earlier while other partitions fail to commit for some time.
Yes its more about guaranteeing atomicity like all partitions eventually commit 
or none commits. The visibility of the data for the readers is orthogonal (e.g 
setting the isolation levels like serializable for XA) and in general its 
difficult to guar

Re: DataSourceWriter V2 Api questions

2018-09-11 Thread Russell Spitzer
That only works assuming that Spark is the only client of the table. It
will be impossible to force an outside user to respect the special metadata
table when reading, so they will still see all of the data in transit.
Additionally, this would force the incoming data to be written only into new
partitions, which is not simple to do from a C* perspective, as balancing the
distribution of new rows would be non-trivial. If we had to do something
like this, we would basically be forced to write to some disk format first,
and then when we move the data into C* we would still have the same problem
that we started with.

On Tue, Sep 11, 2018 at 9:41 AM Thakrar, Jayesh <
jthak...@conversantmedia.com> wrote:

> So if Spark and the destination datastore are both non-transactional, you
> will have to resort to an external mechanism for “transactionality”.
>
>
>
> Here are some options for both RDBMS and non-transaction datastore
> destination.
>
> For now assuming that Spark is used in batch mode (and not streaming mode).
>
>
>
> *RDBMS Options*
>
> Use staging table as discussed in the thread.
>
>
>
> As an extension of the above, use partitioned destination tables and load
> data into a staging table and then use partition management to include the
> staging table into the partitioned table.
>
> This implies a partition per Spark batch run.
>
>
>
> *Non-transactional Datastore Options*
>
> Use another metadata table.
>
> Load the data into a staging table equivalent or even Cassandra
> partition(s).
>
> Start the transaction by making a “start of transaction” entry into the
> metadata table along with partition keys to be populated.
> As part of Spark batch commit, update the metadata entry with appropriate
> details – e.g. partition load time, etc.
> In the event of a failed / incomplete batch, the metadata table entry will
> be incomplete and the corresponding partition keys can be dropped.
>
> So essentially you use the metadata table to load/drop/skip the data to be
> moved/retained into the final destination.
>
>
>
> *Misc*
>
> Another option is to use Spark to stage data into a filesystem
> (distributed, HDFS) and then use RDBMS utilities to transactionally load
> data into the destination table.
>
>
>
>
>
> *From: *Russell Spitzer 
> *Date: *Tuesday, September 11, 2018 at 9:08 AM
> *To: *Arun Mahadevan 
> *Cc: *Jungtaek Lim , Wenchen Fan ,
> Reynold Xin , Ross Lawley ,
> Ryan Blue , dev , <
> dbis...@us.ibm.com>
>
>
> *Subject: *Re: DataSourceWriter V2 Api questions
>
>
>
> I'm still not sure how the staging table helps for databases which do not
> have such atomicity guarantees. For example in Cassandra if you wrote all
> of the data temporarily to a staging table, we would still have the same
> problem in moving the data from the staging table into the real table. We
> would likely have as similar a chance of failing and we still have no way
> of making the entire staging set simultaneously visible.
>
>
>
> On Tue, Sep 11, 2018 at 8:39 AM Arun Mahadevan  wrote:
>
> >Some being said it is exactly-once when the output is eventually
> exactly-once, whereas others being said there should be no side effect,
> like consumer shouldn't see partial write. I guess 2PC is former, since
> some partitions can commit earlier while other partitions fail to commit
> for some time.
>
> Yes its more about guaranteeing atomicity like all partitions eventually
> commit or none commits. The visibility of the data for the readers is
> orthogonal (e.g setting the isolation levels like serializable for XA) and
> in general its difficult to guarantee that data across partitions are
> visible at once. The approach like staging table and global commit works in
> a centralized set up but can be difficult to do in a distributed manner
> across partitions (e.g each partition output goes to a different database)
>
>
>
> On Mon, 10 Sep 2018 at 21:23, Jungtaek Lim  wrote:
>
> IMHO that's up to how we would like to be strict about "exactly-once".
>
>
>
> Some being said it is exactly-once when the output is eventually
> exactly-once, whereas others being said there should be no side effect,
> like consumer shouldn't see partial write. I guess 2PC is former, since
> some partitions can commit earlier while other partitions fail to commit
> for some time.
>
>
>
> Being said, there may be couple of alternatives other than the contract
> Spark provides/requires, and I'd like to see how Spark community wants to
> deal with others. Would we want to disallow alternatives, like "replay +
> deduplicate write (per a batch/partition)" which ensures "eventually"
> ex

Re: DataSourceWriter V2 Api questions

2018-09-11 Thread Thakrar, Jayesh
So if Spark and the destination datastore are both non-transactional, you will 
have to resort to an external mechanism for “transactionality”.

Here are some options for both RDBMS and non-transactional datastore destinations.
For now assuming that Spark is used in batch mode (and not streaming mode).

RDBMS Options
Use staging table as discussed in the thread.

As an extension of the above, use partitioned destination tables: load data
into a staging table and then use partition management to incorporate the staging
table into the partitioned table.
This implies a partition per Spark batch run.
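
A small sketch of that flow, assuming hypothetical table names and a JDBC URL; a plain INSERT ... SELECT inside one database transaction stands in for the partition-management step, whose exact SQL differs per RDBMS.

    import java.sql.DriverManager
    import org.apache.spark.sql.{DataFrame, SaveMode}

    def loadViaStaging(df: DataFrame, jdbcUrl: String): Unit = {
      // 1. Spark writes (re-creates) the staging table; readers never query it directly.
      df.write.mode(SaveMode.Overwrite).jdbc(jdbcUrl, "events_staging", new java.util.Properties())

      // 2. One database transaction moves the batch into the final table, so readers
      //    see either the whole batch or none of it.
      val conn = DriverManager.getConnection(jdbcUrl)
      try {
        conn.setAutoCommit(false)
        val stmt = conn.createStatement()
        stmt.executeUpdate("INSERT INTO events SELECT * FROM events_staging")
        stmt.executeUpdate("DELETE FROM events_staging")
        conn.commit()
      } catch {
        case e: Exception => conn.rollback(); throw e
      } finally {
        conn.close()
      }
    }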

Non-transactional Datastore Options
Use another metadata table.
Load the data into a staging table equivalent or even Cassandra partition(s).
Start the transaction by making a “start of transaction” entry into the 
metadata table along with partition keys to be populated.
As part of Spark batch commit, update the metadata entry with appropriate 
details – e.g. partition load time, etc.
In the event of a failed / incomplete batch, the metadata table entry will be 
incomplete and the corresponding partition keys can be dropped.
So essentially you use the metadata table to load/drop/skip the data to be 
moved/retained into the final destination.
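
A sketch of those steps using the Spark Cassandra connector's DataFrame format; the keyspace, table names and metadata schema are assumptions, and the connector's upsert semantics make the final step simply overwrite the batch's metadata row.

    import java.util.UUID
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    import org.apache.spark.sql.functions.lit

    def writeWithMetadata(spark: SparkSession, df: DataFrame): Unit = {
      import spark.implicits._
      val batchId = UUID.randomUUID().toString
      def save(d: DataFrame, table: String): Unit =
        d.write.format("org.apache.spark.sql.cassandra")
          .options(Map("keyspace" -> "ks", "table" -> table))
          .mode(SaveMode.Append).save()

      // 1. "Start of transaction": record the batch id before any data is written.
      save(Seq((batchId, "STARTED")).toDF("batch_id", "status"), "batch_metadata")

      // 2. Write the data tagged with the batch id (batch_id is part of the partition key).
      save(df.withColumn("batch_id", lit(batchId)), "events")

      // 3. Batch "commit": flip the metadata entry; readers trust only COMMITTED batch ids,
      //    and data for incomplete batches can later be dropped by batch_id.
      save(Seq((batchId, "COMMITTED")).toDF("batch_id", "status"), "batch_metadata")
    }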

Misc
Another option is to use Spark to stage data into a filesystem (distributed, 
HDFS) and then use RDBMS utilities to transactionally load data into the 
destination table.


From: Russell Spitzer 
Date: Tuesday, September 11, 2018 at 9:08 AM
To: Arun Mahadevan 
Cc: Jungtaek Lim , Wenchen Fan , 
Reynold Xin , Ross Lawley , Ryan 
Blue , dev , 
Subject: Re: DataSourceWriter V2 Api questions

I'm still not sure how the staging table helps for databases which do not have 
such atomicity guarantees. For example in Cassandra if you wrote all of the 
data temporarily to a staging table, we would still have the same problem in 
moving the data from the staging table into the real table. We would likely 
have as similar a chance of failing and we still have no way of making the 
entire staging set simultaneously visible.

On Tue, Sep 11, 2018 at 8:39 AM Arun Mahadevan <ar...@apache.org> wrote:
>Some being said it is exactly-once when the output is eventually exactly-once, 
>whereas others being said there should be no side effect, like consumer 
>shouldn't see partial write. I guess 2PC is former, since some partitions can 
>commit earlier while other partitions fail to commit for some time.
Yes, it's more about guaranteeing atomicity: all partitions eventually commit
or none commits. The visibility of the data for the readers is orthogonal (e.g.
setting isolation levels like serializable for XA), and in general it's
difficult to guarantee that data across partitions becomes visible at once. An
approach like a staging table and a global commit works in a centralized setup but
can be difficult to do in a distributed manner across partitions (e.g. each
partition's output goes to a different database).

On Mon, 10 Sep 2018 at 21:23, Jungtaek Lim <kabh...@gmail.com> wrote:
IMHO that's up to how we would like to be strict about "exactly-once".

Some say it is exactly-once when the output is eventually exactly-once,
whereas others say there should be no side effects, like consumers
shouldn't see partial writes. I guess 2PC is the former, since some partitions can
commit earlier while other partitions fail to commit for some time.

That said, there may be a couple of alternatives other than the contract Spark
provides/requires, and I'd like to see how the Spark community wants to deal with
them. Would we want to disallow alternatives, like "replay + deduplicate
write (per batch/partition)", which ensures "eventually" exactly-once but
cannot ensure the contract?
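
As an aside, a sketch of what such a "replay + deduplicate write" could look like: each row gets a key derived deterministically from (batch, partition, position), and the write is an upsert by that key, so replaying a failed batch rewrites the same rows instead of duplicating them. The JDBC URL, table and PostgreSQL-style ON CONFLICT syntax are assumptions for illustration.

    import java.sql.DriverManager
    import org.apache.spark.rdd.RDD

    def replayableWrite(batchId: Long, data: RDD[String], jdbcUrl: String): Unit = {
      data.mapPartitionsWithIndex { (partitionId, rows) =>
        // key is stable across retries of the same batch
        rows.zipWithIndex.map { case (row, i) => (s"$batchId-$partitionId-$i", row) }
      }.foreachPartition { keyedRows =>
        val conn = DriverManager.getConnection(jdbcUrl)
        val stmt = conn.prepareStatement(
          "INSERT INTO events (row_key, payload) VALUES (?, ?) " +
          "ON CONFLICT (row_key) DO UPDATE SET payload = EXCLUDED.payload")
        try {
          keyedRows.foreach { case (key, row) =>
            stmt.setString(1, key)
            stmt.setString(2, row)
            stmt.executeUpdate()           // idempotent: a retried batch hits the same keys
          }
        } finally {
          stmt.close()
          conn.close()
        }
      }
    }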

Btw, unless achieving exactly-once is cheap enough for a given sink, I think the
sink should provide both at-least-once (also optimized for that semantic) and
exactly-once, and let end users pick one.

On Tue, Sep 11, 2018 at 12:57 PM, Russell Spitzer <russell.spit...@gmail.com> wrote:
Why are atomic operations a requirement? I feel like doubling the amount of
writes (with staging tables) is probably a tradeoff that the end user should
make.
On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan <cloud0...@gmail.com> wrote:
Regardless of the API, to use Spark to write data atomically, it requires:
1. Writing data distributedly, with a central coordinator at the Spark driver.
2. The distributed writers are not guaranteed to run together at the same time.
(This can be relaxed if we can extend the barrier scheduling feature.)
3. The new data is visible if and only if all distributed writers succeed.

According to these requirements, I think using a staging table is the most
common way and maybe the only way. I'm not sure how 2PC can help; we don't want
users to read partial data, so we need a final step to commit all the data
together.

For RDBMS d

Re: DataSourceWriter V2 Api questions

2018-09-11 Thread Russell Spitzer
>>>>>>>>> 1. It'd require massive changes to Spark.
>>>>>>>>>
>>>>>>>>> 2. Unless the underlying data source can provide an API to
>>>>>>>>> coordinate commits (which few data sources I know provide something 
>>>>>>>>> like
>>>>>>>>> that), 2PC wouldn't work in the presence of network partitioning. You 
>>>>>>>>> can't
>>>>>>>>> defy the law of physics.
>>>>>>>>>
>>>>>>>>> Really the most common and simple way I've seen this working is
>>>>>>>>> through staging tables and a final transaction to move data from 
>>>>>>>>> staging
>>>>>>>>> table to final table.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim 
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I guess we all are aware of limitation of contract on DSv2
>>>>>>>>>> writer. Actually it can be achieved only with HDFS sink (or other
>>>>>>>>>> filesystem based sinks) and other external storage are normally not
>>>>>>>>>> feasible to implement it because there's no way to couple a 
>>>>>>>>>> transaction
>>>>>>>>>> with multiple clients as well as coordinator can't take over 
>>>>>>>>>> transactions
>>>>>>>>>> from writers to do the final commit.
>>>>>>>>>>
>>>>>>>>>> XA is also not a trivial one to get it correctly with current
>>>>>>>>>> execution model: Spark doesn't require writer tasks to run at the 
>>>>>>>>>> same time
>>>>>>>>>> but to achieve 2PC they should run until end of transaction (closing 
>>>>>>>>>> client
>>>>>>>>>> before transaction ends normally means aborting transaction). Spark 
>>>>>>>>>> should
>>>>>>>>>> also integrate 2PC with its checkpointing mechanism to guarantee
>>>>>>>>>> completeness of batch. And it might require different integration for
>>>>>>>>>> continuous mode.
>>>>>>>>>>
>>>>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>>>>
>>>>>>>>>> On Tue, Sep 11, 2018 at 4:37 AM, Arun Mahadevan wrote:
>>>>>>>>>>
>>>>>>>>>>> In some cases the implementations may be ok with eventual
>>>>>>>>>>> consistency (and does not care if the output is written out 
>>>>>>>>>>> atomically)
>>>>>>>>>>>
>>>>>>>>>>> XA can be one option for datasources that supports it and
>>>>>>>>>>> requires atomicity but I am not sure how would one implement it 
>>>>>>>>>>> with the
>>>>>>>>>>> current API.
>>>>>>>>>>>
>>>>>>>>>>> May be we need to discuss improvements at the Datasource V2 API
>>>>>>>>>>> level (e.g. individual tasks would "prepare" for commit and once 
>>>>>>>>>>> the driver
>>>>>>>>>>> receives "prepared" from all the tasks, a "commit" would be invoked 
>>>>>>>>>>> at each
>>>>>>>>>>> of the individual tasks). Right now the responsibility of the final
>>>>>>>>>>> "commit" is with the driver and it may not always be possible for 
>>>>>>>>>>> the
>>>>>>>>>>> driver to take over the transactions started by the tasks.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal 
>>>>>>>>>>> wrote:
>>>>>>>>>>>

Re: DataSourceWriter V2 Api questions

2018-09-11 Thread Arun Mahadevan
>>>>>>>>> I guess we all are aware of limitation of contract on DSv2 writer.
>>>>>>>>> Actually it can be achieved only with HDFS sink (or other filesystem 
>>>>>>>>> based
>>>>>>>>> sinks) and other external storage are normally not feasible to 
>>>>>>>>> implement it
>>>>>>>>> because there's no way to couple a transaction with multiple clients 
>>>>>>>>> as
>>>>>>>>> well as coordinator can't take over transactions from writers to do 
>>>>>>>>> the
>>>>>>>>> final commit.
>>>>>>>>>
>>>>>>>>> XA is also not a trivial one to get it correctly with current
>>>>>>>>> execution model: Spark doesn't require writer tasks to run at the 
>>>>>>>>> same time
>>>>>>>>> but to achieve 2PC they should run until end of transaction (closing 
>>>>>>>>> client
>>>>>>>>> before transaction ends normally means aborting transaction). Spark 
>>>>>>>>> should
>>>>>>>>> also integrate 2PC with its checkpointing mechanism to guarantee
>>>>>>>>> completeness of batch. And it might require different integration for
>>>>>>>>> continuous mode.
>>>>>>>>>
>>>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>>>
>>>>>>>>> On Tue, Sep 11, 2018 at 4:37 AM, Arun Mahadevan wrote:
>>>>>>>>>
>>>>>>>>>> In some cases the implementations may be ok with eventual
>>>>>>>>>> consistency (and does not care if the output is written out 
>>>>>>>>>> atomically)
>>>>>>>>>>
>>>>>>>>>> XA can be one option for datasources that supports it and
>>>>>>>>>> requires atomicity but I am not sure how would one implement it with 
>>>>>>>>>> the
>>>>>>>>>> current API.
>>>>>>>>>>
>>>>>>>>>> May be we need to discuss improvements at the Datasource V2 API
>>>>>>>>>> level (e.g. individual tasks would "prepare" for commit and once the 
>>>>>>>>>> driver
>>>>>>>>>> receives "prepared" from all the tasks, a "commit" would be invoked 
>>>>>>>>>> at each
>>>>>>>>>> of the individual tasks). Right now the responsibility of the final
>>>>>>>>>> "commit" is with the driver and it may not always be possible for the
>>>>>>>>>> driver to take over the transactions started by the tasks.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal 
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> This is a pretty big challenge in general for data sources --
>>>>>>>>>>> for the vast majority of data stores, the boundary of a transaction 
>>>>>>>>>>> is per
>>>>>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>>>>>> coordinating a
>>>>>>>>>>> single transaction. That's certainly the case for almost all 
>>>>>>>>>>> relational
>>>>>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>>>>>> (consider
>>>>>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>>>>>
>>>>>>>>>>> DB>> Perhaps we can explore two-phase commit protocol (aka XA)
>>>>>>>>>>> for this ? Not sure how easy it is to implement this though :-)
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Dilip Biswal
>>>>>>>>>>> Tel: 408-463-4980
>>>>>>>>>>> dbis...@us.ibm.com
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> - Original message -
>>>>>>>>>>> From: Reynold Xin 
>>>>>>>>>>> To: Ryan Blue 
>>>>>>>>>>> Cc: ross.law...@gmail.com, dev 
>>>>>>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>>>>>>
>>>>>>>>>>> I don't think the problem is just whether we have a starting
>>>>>>>>>>> point for write. As a matter of fact there's always a starting 
>>>>>>>>>>> point for
>>>>>>>>>>> write, whether it is explicit or implicit.
>>>>>>>>>>>
>>>>>>>>>>> This is a pretty big challenge in general for data sources --
>>>>>>>>>>> for the vast majority of data stores, the boundary of a transaction 
>>>>>>>>>>> is per
>>>>>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>>>>>> coordinating a
>>>>>>>>>>> single transaction. That's certainly the case for almost all 
>>>>>>>>>>> relational
>>>>>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>>>>>> (consider
>>>>>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue 
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Ross, I think the intent is to create a single transaction on
>>>>>>>>>>> the driver, write as part of it in each task, and then commit the
>>>>>>>>>>> transaction once the tasks complete. Is that possible in your
>>>>>>>>>>> implementation?
>>>>>>>>>>>
>>>>>>>>>>> I think that part of this is made more difficult by not having a
>>>>>>>>>>> clear starting point for a write, which we are fixing in the 
>>>>>>>>>>> redesign of
>>>>>>>>>>> the v2 API. That will have a method that creates a Write to track 
>>>>>>>>>>> the
>>>>>>>>>>> operation. That can create your transaction when it is created and 
>>>>>>>>>>> commit
>>>>>>>>>>> the transaction when commit is called on it.
>>>>>>>>>>>
>>>>>>>>>>> rb
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin 
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley <
>>>>>>>>>>> ross.law...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I've been prototyping an implementation of the DataSource V2
>>>>>>>>>>> writer for the MongoDB Spark Connector and I have a couple of 
>>>>>>>>>>> questions
>>>>>>>>>>> about how its intended to be used with database systems. According 
>>>>>>>>>>> to the
>>>>>>>>>>> Javadoc for DataWriter.commit():
>>>>>>>>>>>
>>>>>>>>>>> *"this method should still "hide" the written data and ask the
>>>>>>>>>>> DataSourceWriter at driver side to do the final commit via
>>>>>>>>>>> WriterCommitMessage"*
>>>>>>>>>>>
>>>>>>>>>>> Although, MongoDB now has transactions, it doesn't have a way to
>>>>>>>>>>> "hide" the data once it has been written. So as soon as the 
>>>>>>>>>>> DataWriter has
>>>>>>>>>>> committed the data, it has been inserted/updated in the collection 
>>>>>>>>>>> and is
>>>>>>>>>>> discoverable - thereby breaking the documented contract.
>>>>>>>>>>>
>>>>>>>>>>> I was wondering how other databases systems plan to implement
>>>>>>>>>>> this API and meet the contract as per the Javadoc?
>>>>>>>>>>>
>>>>>>>>>>> Many thanks
>>>>>>>>>>>
>>>>>>>>>>> Ross
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Ryan Blue
>>>>>>>>>>> Software Engineer
>>>>>>>>>>> Netflix
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -
>>>>>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>>>>>>
>>>>>>>>>>
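
For readers following the contract being debated above, here is the shape of the split between task-side and driver-side commit in a hypothetical form. These are not the real DataSourceWriter/DataWriter interfaces, just an illustration of the "hide the written data, then do the final commit at the driver" idea from the quoted Javadoc.

    // Hypothetical sketch only -- names and signatures are illustrative.
    trait StagedWriteMessage                         // describes what one task staged (e.g. a location)

    trait TaskWriter[T] {
      def write(record: T): Unit                     // stage the record; it must stay invisible
      def commit(): StagedWriteMessage               // finish staging, report it to the driver
      def abort(): Unit                              // discard whatever this task staged
    }

    trait JobWriter[T] {
      def newTaskWriter(partitionId: Int): TaskWriter[T]
      def commit(messages: Seq[StagedWriteMessage]): Unit  // driver-side: publish everything at once
      def abort(messages: Seq[StagedWriteMessage]): Unit   // driver-side: drop all staged data
    }

The thread is essentially about the fact that, for stores like MongoDB or Cassandra, there is no natural way to keep data invisible until the driver-side commit without a staging area of some kind.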


Re: DataSourceWriter V2 Api questions

2018-09-11 Thread Ross Lawley
>>>>>>>> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I guess we all are aware of limitation of contract on DSv2 writer.
>>>>>>>>> Actually it can be achieved only with HDFS sink (or other filesystem 
>>>>>>>>> based
>>>>>>>>> sinks) and other external storage are normally not feasible to 
>>>>>>>>> implement it
>>>>>>>>> because there's no way to couple a transaction with multiple clients 
>>>>>>>>> as
>>>>>>>>> well as coordinator can't take over transactions from writers to do 
>>>>>>>>> the
>>>>>>>>> final commit.
>>>>>>>>>
>>>>>>>>> XA is also not a trivial one to get it correctly with current
>>>>>>>>> execution model: Spark doesn't require writer tasks to run at the 
>>>>>>>>> same time
>>>>>>>>> but to achieve 2PC they should run until end of transaction (closing 
>>>>>>>>> client
>>>>>>>>> before transaction ends normally means aborting transaction). Spark 
>>>>>>>>> should
>>>>>>>>> also integrate 2PC with its checkpointing mechanism to guarantee
>>>>>>>>> completeness of batch. And it might require different integration for
>>>>>>>>> continuous mode.
>>>>>>>>>
>>>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>>>
>>>>>>>>> On Tue, Sep 11, 2018 at 4:37 AM, Arun Mahadevan wrote:
>>>>>>>>>
>>>>>>>>>> In some cases the implementations may be ok with eventual
>>>>>>>>>> consistency (and does not care if the output is written out 
>>>>>>>>>> atomically)
>>>>>>>>>>
>>>>>>>>>> XA can be one option for datasources that supports it and
>>>>>>>>>> requires atomicity but I am not sure how would one implement it with 
>>>>>>>>>> the
>>>>>>>>>> current API.
>>>>>>>>>>
>>>>>>>>>> May be we need to discuss improvements at the Datasource V2 API
>>>>>>>>>> level (e.g. individual tasks would "prepare" for commit and once the 
>>>>>>>>>> driver
>>>>>>>>>> receives "prepared" from all the tasks, a "commit" would be invoked 
>>>>>>>>>> at each
>>>>>>>>>> of the individual tasks). Right now the responsibility of the final
>>>>>>>>>> "commit" is with the driver and it may not always be possible for the
>>>>>>>>>> driver to take over the transactions started by the tasks.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal 
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> This is a pretty big challenge in general for data sources --
>>>>>>>>>>> for the vast majority of data stores, the boundary of a transaction 
>>>>>>>>>>> is per
>>>>>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>>>>>> coordinating a
>>>>>>>>>>> single transaction. That's certainly the case for almost all 
>>>>>>>>>>> relational
>>>>>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>>>>>> (consider
>>>>>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>>>>>
>>>>>>>>>>> DB>> Perhaps we can explore two-phase commit protocol (aka XA)
>>>>>>>>>>> for this ? Not sure how easy it is to implement this though :-)
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Dilip Biswal
>>>>>>>>>>> Tel: 408-463-4980
>>>>>>>>>>> dbis...@us.ibm.com
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> - Original message -
>>>>>>>>>>> From: Reynold Xin 
>>>>>>>>>>> To: Ryan Blue 
>>>>>>>>>>> Cc: ross.law...@gmail.com, dev 
>>>>>>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>>>>>>
>>>>>>>>>>> I don't think the problem is just whether we have a starting
>>>>>>>>>>> point for write. As a matter of fact there's always a starting 
>>>>>>>>>>> point for
>>>>>>>>>>> write, whether it is explicit or implicit.
>>>>>>>>>>>
>>>>>>>>>>> This is a pretty big challenge in general for data sources --
>>>>>>>>>>> for the vast majority of data stores, the boundary of a transaction 
>>>>>>>>>>> is per
>>>>>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>>>>>> coordinating a
>>>>>>>>>>> single transaction. That's certainly the case for almost all 
>>>>>>>>>>> relational
>>>>>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>>>>>> (consider
>>>>>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue 
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Ross, I think the intent is to create a single transaction on
>>>>>>>>>>> the driver, write as part of it in each task, and then commit the
>>>>>>>>>>> transaction once the tasks complete. Is that possible in your
>>>>>>>>>>> implementation?
>>>>>>>>>>>
>>>>>>>>>>> I think that part of this is made more difficult by not having a
>>>>>>>>>>> clear starting point for a write, which we are fixing in the 
>>>>>>>>>>> redesign of
>>>>>>>>>>> the v2 API. That will have a method that creates a Write to track 
>>>>>>>>>>> the
>>>>>>>>>>> operation. That can create your transaction when it is created and 
>>>>>>>>>>> commit
>>>>>>>>>>> the transaction when commit is called on it.
>>>>>>>>>>>
>>>>>>>>>>> rb
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin 
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley <
>>>>>>>>>>> ross.law...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I've been prototyping an implementation of the DataSource V2
>>>>>>>>>>> writer for the MongoDB Spark Connector and I have a couple of 
>>>>>>>>>>> questions
>>>>>>>>>>> about how its intended to be used with database systems. According 
>>>>>>>>>>> to the
>>>>>>>>>>> Javadoc for DataWriter.commit():
>>>>>>>>>>>
>>>>>>>>>>> *"this method should still "hide" the written data and ask the
>>>>>>>>>>> DataSourceWriter at driver side to do the final commit via
>>>>>>>>>>> WriterCommitMessage"*
>>>>>>>>>>>
>>>>>>>>>>> Although, MongoDB now has transactions, it doesn't have a way to
>>>>>>>>>>> "hide" the data once it has been written. So as soon as the 
>>>>>>>>>>> DataWriter has
>>>>>>>>>>> committed the data, it has been inserted/updated in the collection 
>>>>>>>>>>> and is
>>>>>>>>>>> discoverable - thereby breaking the documented contract.
>>>>>>>>>>>
>>>>>>>>>>> I was wondering how other databases systems plan to implement
>>>>>>>>>>> this API and meet the contract as per the Javadoc?
>>>>>>>>>>>
>>>>>>>>>>> Many thanks
>>>>>>>>>>>
>>>>>>>>>>> Ross
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Ryan Blue
>>>>>>>>>>> Software Engineer
>>>>>>>>>>> Netflix
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -
>>>>>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>>>>>>
>>>>>>>>>>


Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Jungtaek Lim
ational databases you can move data in a
>>>>> transactional way. That’s what transactions are for.
>>>>>
>>>>> For just straight HDFS, the move is a pretty fast operation so while
>>>>> it is not completely transactional, the window of potential failure is
>>>>> pretty short for appends. For writers at the partition level it is fine
>>>>> because it is just renaming directory, which is atomic.
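
A tiny illustration of that directory-rename commit (paths are made up): data is written under a staging directory and a single rename publishes it; on HDFS the rename is atomic for one directory, though not a multi-directory transaction, which is exactly the gap discussed in the reply below.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Publish one staged partition directory with a single atomic rename.
    def publishPartition(): Boolean = {
      val fs = FileSystem.get(new Configuration())
      fs.rename(
        new Path("/warehouse/events/_staging/dt=2018-09-10"),
        new Path("/warehouse/events/dt=2018-09-10"))
    }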
>>>>>
>>>>> On Mon, Sep 10, 2018 at 1:40 PM Jungtaek Lim 
>>>>> wrote:
>>>>>
>>>>>> When network partitioning happens it is pretty OK for me to see 2PC
>>>>>> not working, cause we deal with global transaction. Recovery should be 
>>>>>> hard
>>>>>> thing to get it correctly though. I completely agree it would require
>>>>>> massive changes to Spark.
>>>>>>
>>>>>> What I couldn't find for underlying storages is moving data from
>>>>>> staging table to final table in transactional way. I'm not fully sure but
>>>>>> as I'm aware of, many storages would not support moving data, and even 
>>>>>> HDFS
>>>>>> sink it is not strictly done in transactional way since we move multiple
>>>>>> files with multiple operations. If coordinator just crashes it leaves
>>>>>> partial write, and among writers and coordinator need to deal with 
>>>>>> ensuring
>>>>>> it will not be going to be duplicated.
>>>>>>
>>>>>> Ryan replied me as Iceberg and HBase MVCC timestamps can enable us
>>>>>> to implement "commit" (his reply didn't hit dev. mailing list though) but
>>>>>> I'm not an expert of both twos and I couldn't still imagine it can deal
>>>>>> with various crash cases.
>>>>>>
>>>>>> On Tue, Sep 11, 2018 at 5:17 AM, Reynold Xin wrote:
>>>>>>
>>>>>>> I don't think two phase commit would work here at all.
>>>>>>>
>>>>>>> 1. It'd require massive changes to Spark.
>>>>>>>
>>>>>>> 2. Unless the underlying data source can provide an API to
>>>>>>> coordinate commits (which few data sources I know provide something like
>>>>>>> that), 2PC wouldn't work in the presence of network partitioning. You 
>>>>>>> can't
>>>>>>> defy the law of physics.
>>>>>>>
>>>>>>> Really the most common and simple way I've seen this working is
>>>>>>> through staging tables and a final transaction to move data from staging
>>>>>>> table to final table.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I guess we all are aware of limitation of contract on DSv2 writer.
>>>>>>>> Actually it can be achieved only with HDFS sink (or other filesystem 
>>>>>>>> based
>>>>>>>> sinks) and other external storage are normally not feasible to 
>>>>>>>> implement it
>>>>>>>> because there's no way to couple a transaction with multiple clients as
>>>>>>>> well as coordinator can't take over transactions from writers to do the
>>>>>>>> final commit.
>>>>>>>>
>>>>>>>> XA is also not a trivial one to get it correctly with current
>>>>>>>> execution model: Spark doesn't require writer tasks to run at the same 
>>>>>>>> time
>>>>>>>> but to achieve 2PC they should run until end of transaction (closing 
>>>>>>>> client
>>>>>>>> before transaction ends normally means aborting transaction). Spark 
>>>>>>>> should
>>>>>>>> also integrate 2PC with its checkpointing mechanism to guarantee
>>>>>>>> completeness of batch. And it might require different integration for
>>>>>>>> continuous mode.
>>>>>>>>
>>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>>
>>>>>>>> 2018년 9월 11일 (화) 오전 4:3

Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Russell Spitzer
 it is not strictly done in transactional way since we move multiple
>>>>> files with multiple operations. If coordinator just crashes it leaves
>>>>> partial write, and among writers and coordinator need to deal with 
>>>>> ensuring
>>>>> it will not be going to be duplicated.
>>>>>
>>>>> Ryan replied me as Iceberg and HBase MVCC timestamps can enable us to
>>>>> implement "commit" (his reply didn't hit dev. mailing list though) but I'm
>>>>> not an expert of both twos and I couldn't still imagine it can deal with
>>>>> various crash cases.
>>>>>
>>>>> On Tue, Sep 11, 2018 at 5:17 AM, Reynold Xin wrote:
>>>>>
>>>>>> I don't think two phase commit would work here at all.
>>>>>>
>>>>>> 1. It'd require massive changes to Spark.
>>>>>>
>>>>>> 2. Unless the underlying data source can provide an API to coordinate
>>>>>> commits (which few data sources I know provide something like that), 2PC
>>>>>> wouldn't work in the presence of network partitioning. You can't defy the
>>>>>> law of physics.
>>>>>>
>>>>>> Really the most common and simple way I've seen this working is
>>>>>> through staging tables and a final transaction to move data from staging
>>>>>> table to final table.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim 
>>>>>> wrote:
>>>>>>
>>>>>>> I guess we all are aware of limitation of contract on DSv2 writer.
>>>>>>> Actually it can be achieved only with HDFS sink (or other filesystem 
>>>>>>> based
>>>>>>> sinks) and other external storage are normally not feasible to 
>>>>>>> implement it
>>>>>>> because there's no way to couple a transaction with multiple clients as
>>>>>>> well as coordinator can't take over transactions from writers to do the
>>>>>>> final commit.
>>>>>>>
>>>>>>> XA is also not a trivial one to get it correctly with current
>>>>>>> execution model: Spark doesn't require writer tasks to run at the same 
>>>>>>> time
>>>>>>> but to achieve 2PC they should run until end of transaction (closing 
>>>>>>> client
>>>>>>> before transaction ends normally means aborting transaction). Spark 
>>>>>>> should
>>>>>>> also integrate 2PC with its checkpointing mechanism to guarantee
>>>>>>> completeness of batch. And it might require different integration for
>>>>>>> continuous mode.
>>>>>>>
>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>
>>>>>>> On Tue, Sep 11, 2018 at 4:37 AM, Arun Mahadevan wrote:
>>>>>>>
>>>>>>>> In some cases the implementations may be ok with eventual
>>>>>>>> consistency (and does not care if the output is written out atomically)
>>>>>>>>
>>>>>>>> XA can be one option for datasources that supports it and requires
>>>>>>>> atomicity but I am not sure how would one implement it with the current
>>>>>>>> API.
>>>>>>>>
>>>>>>>> May be we need to discuss improvements at the Datasource V2 API
>>>>>>>> level (e.g. individual tasks would "prepare" for commit and once the 
>>>>>>>> driver
>>>>>>>> receives "prepared" from all the tasks, a "commit" would be invoked at 
>>>>>>>> each
>>>>>>>> of the individual tasks). Right now the responsibility of the final
>>>>>>>> "commit" is with the driver and it may not always be possible for the
>>>>>>>> driver to take over the transactions started by the tasks.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal 
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> This is a pretty big challenge in general for data sources -- for

Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Wenchen Fan
eal with
>>>> various crash cases.
>>>>
>>>> On Tue, Sep 11, 2018 at 5:17 AM, Reynold Xin wrote:
>>>>
>>>>> I don't think two phase commit would work here at all.
>>>>>
>>>>> 1. It'd require massive changes to Spark.
>>>>>
>>>>> 2. Unless the underlying data source can provide an API to coordinate
>>>>> commits (which few data sources I know provide something like that), 2PC
>>>>> wouldn't work in the presence of network partitioning. You can't defy the
>>>>> law of physics.
>>>>>
>>>>> Really the most common and simple way I've seen this working is
>>>>> through staging tables and a final transaction to move data from staging
>>>>> table to final table.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim 
>>>>> wrote:
>>>>>
>>>>>> I guess we all are aware of limitation of contract on DSv2 writer.
>>>>>> Actually it can be achieved only with HDFS sink (or other filesystem 
>>>>>> based
>>>>>> sinks) and other external storage are normally not feasible to implement 
>>>>>> it
>>>>>> because there's no way to couple a transaction with multiple clients as
>>>>>> well as coordinator can't take over transactions from writers to do the
>>>>>> final commit.
>>>>>>
>>>>>> XA is also not a trivial one to get it correctly with current
>>>>>> execution model: Spark doesn't require writer tasks to run at the same 
>>>>>> time
>>>>>> but to achieve 2PC they should run until end of transaction (closing 
>>>>>> client
>>>>>> before transaction ends normally means aborting transaction). Spark 
>>>>>> should
>>>>>> also integrate 2PC with its checkpointing mechanism to guarantee
>>>>>> completeness of batch. And it might require different integration for
>>>>>> continuous mode.
>>>>>>
>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>
>>>>>> On Tue, Sep 11, 2018 at 4:37 AM, Arun Mahadevan wrote:
>>>>>>
>>>>>>> In some cases the implementations may be ok with eventual
>>>>>>> consistency (and does not care if the output is written out atomically)
>>>>>>>
>>>>>>> XA can be one option for datasources that supports it and requires
>>>>>>> atomicity but I am not sure how would one implement it with the current
>>>>>>> API.
>>>>>>>
>>>>>>> May be we need to discuss improvements at the Datasource V2 API
>>>>>>> level (e.g. individual tasks would "prepare" for commit and once the 
>>>>>>> driver
>>>>>>> receives "prepared" from all the tasks, a "commit" would be invoked at 
>>>>>>> each
>>>>>>> of the individual tasks). Right now the responsibility of the final
>>>>>>> "commit" is with the driver and it may not always be possible for the
>>>>>>> driver to take over the transactions started by the tasks.
>>>>>>>
>>>>>>>
>>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal 
>>>>>>> wrote:
>>>>>>>
>>>>>>>> This is a pretty big challenge in general for data sources -- for
>>>>>>>> the vast majority of data stores, the boundary of a transaction is per
>>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>>> coordinating a
>>>>>>>> single transaction. That's certainly the case for almost all relational
>>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>>> (consider
>>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>>
>>>>>>>> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for
>>>>>>>> this ? Not sure how easy it is to implement this though :-)
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Dilip Biswal
>>>

Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Jungtaek Lim
>>>>> sinks) and other external storage are normally not feasible to implement
>>>>> it
>>>>> because there's no way to couple a transaction with multiple clients as
>>>>> well as coordinator can't take over transactions from writers to do the
>>>>> final commit.
>>>>>
>>>>> XA is also not a trivial one to get it correctly with current
>>>>> execution model: Spark doesn't require writer tasks to run at the same 
>>>>> time
>>>>> but to achieve 2PC they should run until end of transaction (closing 
>>>>> client
>>>>> before transaction ends normally means aborting transaction). Spark should
>>>>> also integrate 2PC with its checkpointing mechanism to guarantee
>>>>> completeness of batch. And it might require different integration for
>>>>> continuous mode.
>>>>>
>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>
>>>>> On Tue, Sep 11, 2018 at 4:37 AM, Arun Mahadevan wrote:
>>>>>
>>>>>> In some cases the implementations may be ok with eventual consistency
>>>>>> (and does not care if the output is written out atomically)
>>>>>>
>>>>>> XA can be one option for datasources that supports it and requires
>>>>>> atomicity but I am not sure how would one implement it with the current
>>>>>> API.
>>>>>>
>>>>>> May be we need to discuss improvements at the Datasource V2 API level
>>>>>> (e.g. individual tasks would "prepare" for commit and once the driver
>>>>>> receives "prepared" from all the tasks, a "commit" would be invoked at 
>>>>>> each
>>>>>> of the individual tasks). Right now the responsibility of the final
>>>>>> "commit" is with the driver and it may not always be possible for the
>>>>>> driver to take over the transactions started by the tasks.
>>>>>>
>>>>>>
>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal 
>>>>>> wrote:
>>>>>>
>>>>>>> This is a pretty big challenge in general for data sources -- for
>>>>>>> the vast majority of data stores, the boundary of a transaction is per
>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>> coordinating a
>>>>>>> single transaction. That's certainly the case for almost all relational
>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>> (consider
>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>
>>>>>>> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for
>>>>>>> this ? Not sure how easy it is to implement this though :-)
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dilip Biswal
>>>>>>> Tel: 408-463-4980 <(408)%20463-4980>
>>>>>>> dbis...@us.ibm.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> - Original message -
>>>>>>> From: Reynold Xin 
>>>>>>> To: Ryan Blue 
>>>>>>> Cc: ross.law...@gmail.com, dev 
>>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>>
>>>>>>> I don't think the problem is just whether we have a starting point
>>>>>>> for write. As a matter of fact there's always a starting point for 
>>>>>>> write,
>>>>>>> whether it is explicit or implicit.
>>>>>>>
>>>>>>> This is a pretty big challenge in general for data sources -- for
>>>>>>> the vast majority of data stores, the boundary of a transaction is per
>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>> coordinating a
>>>>>>> single transaction. That's certainly the case for almost all relational
>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>> (consider
>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>
>>

Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Ryan Blue
>>>>> final commit.
>>>>>
>>>>> XA is also not a trivial one to get it correctly with current
>>>>> execution model: Spark doesn't require writer tasks to run at the same 
>>>>> time
>>>>> but to achieve 2PC they should run until end of transaction (closing 
>>>>> client
>>>>> before transaction ends normally means aborting transaction). Spark should
>>>>> also integrate 2PC with its checkpointing mechanism to guarantee
>>>>> completeness of batch. And it might require different integration for
>>>>> continuous mode.
>>>>>
>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>
>>>>> 2018년 9월 11일 (화) 오전 4:37, Arun Mahadevan 님이 작성:
>>>>>
>>>>>> In some cases the implementations may be ok with eventual consistency
>>>>>> (and does not care if the output is written out atomically)
>>>>>>
>>>>>> XA can be one option for datasources that supports it and requires
>>>>>> atomicity but I am not sure how would one implement it with the current
>>>>>> API.
>>>>>>
>>>>>> May be we need to discuss improvements at the Datasource V2 API level
>>>>>> (e.g. individual tasks would "prepare" for commit and once the driver
>>>>>> receives "prepared" from all the tasks, a "commit" would be invoked at 
>>>>>> each
>>>>>> of the individual tasks). Right now the responsibility of the final
>>>>>> "commit" is with the driver and it may not always be possible for the
>>>>>> driver to take over the transactions started by the tasks.
>>>>>>
>>>>>>
>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal 
>>>>>> wrote:
>>>>>>
>>>>>>> This is a pretty big challenge in general for data sources -- for
>>>>>>> the vast majority of data stores, the boundary of a transaction is per
>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>> coordinating a
>>>>>>> single transaction. That's certainly the case for almost all relational
>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>> (consider
>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>
>>>>>>> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for
>>>>>>> this ? Not sure how easy it is to implement this though :-)
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dilip Biswal
>>>>>>> Tel: 408-463-4980 <(408)%20463-4980>
>>>>>>> dbis...@us.ibm.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> - Original message -
>>>>>>> From: Reynold Xin 
>>>>>>> To: Ryan Blue 
>>>>>>> Cc: ross.law...@gmail.com, dev 
>>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>>
>>>>>>> I don't think the problem is just whether we have a starting point
>>>>>>> for write. As a matter of fact there's always a starting point for 
>>>>>>> write,
>>>>>>> whether it is explicit or implicit.
>>>>>>>
>>>>>>> This is a pretty big challenge in general for data sources -- for
>>>>>>> the vast majority of data stores, the boundary of a transaction is per
>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>> coordinating a
>>>>>>> single transaction. That's certainly the case for almost all relational
>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>> (consider
>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Ross, I think the intent is to create a single transaction on the
>>>>>>> driver, write as part of it in each task, and then commit the 
>>>>>>> transaction
>>>>>>> once the tasks complete. Is that possible in your implementation?
>>>>>>>
>>>>>>> I think that part of this is made more difficult by not having a
>>>>>>> clear starting point for a write, which we are fixing in the redesign of
>>>>>>> the v2 API. That will have a method that creates a Write to track the
>>>>>>> operation. That can create your transaction when it is created and 
>>>>>>> commit
>>>>>>> the transaction when commit is called on it.
>>>>>>>
>>>>>>> rb
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I've been prototyping an implementation of the DataSource V2 writer
>>>>>>> for the MongoDB Spark Connector and I have a couple of questions about 
>>>>>>> how
>>>>>>> its intended to be used with database systems. According to the Javadoc 
>>>>>>> for
>>>>>>> DataWriter.commit():
>>>>>>>
>>>>>>> *"this method should still "hide" the written data and ask the
>>>>>>> DataSourceWriter at driver side to do the final commit via
>>>>>>> WriterCommitMessage"*
>>>>>>>
>>>>>>> Although, MongoDB now has transactions, it doesn't have a way to
>>>>>>> "hide" the data once it has been written. So as soon as the DataWriter 
>>>>>>> has
>>>>>>> committed the data, it has been inserted/updated in the collection and 
>>>>>>> is
>>>>>>> discoverable - thereby breaking the documented contract.
>>>>>>>
>>>>>>> I was wondering how other databases systems plan to implement this
>>>>>>> API and meet the contract as per the Javadoc?
>>>>>>>
>>>>>>> Many thanks
>>>>>>>
>>>>>>> Ross
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -
>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>>
>>>>>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Ryan Blue
>>>>> mechanism to guarantee
>>>>> completeness of batch. And it might require different integration for
>>>>> continuous mode.
>>>>>
>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>
>>>>> 2018년 9월 11일 (화) 오전 4:37, Arun Mahadevan 님이 작성:
>>>>>
>>>>>> In some cases the implementations may be ok with eventual consistency
>>>>>> (and does not care if the output is written out atomically)
>>>>>>
>>>>>> XA can be one option for datasources that supports it and requires
>>>>>> atomicity but I am not sure how would one implement it with the current
>>>>>> API.
>>>>>>
>>>>>> May be we need to discuss improvements at the Datasource V2 API level
>>>>>> (e.g. individual tasks would "prepare" for commit and once the driver
>>>>>> receives "prepared" from all the tasks, a "commit" would be invoked at 
>>>>>> each
>>>>>> of the individual tasks). Right now the responsibility of the final
>>>>>> "commit" is with the driver and it may not always be possible for the
>>>>>> driver to take over the transactions started by the tasks.
>>>>>>
>>>>>>
>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal 
>>>>>> wrote:
>>>>>>
>>>>>>> This is a pretty big challenge in general for data sources -- for
>>>>>>> the vast majority of data stores, the boundary of a transaction is per
>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>> coordinating a
>>>>>>> single transaction. That's certainly the case for almost all relational
>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>> (consider
>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>
>>>>>>> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for
>>>>>>> this ? Not sure how easy it is to implement this though :-)
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dilip Biswal
>>>>>>> Tel: 408-463-4980 <(408)%20463-4980>
>>>>>>> dbis...@us.ibm.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> - Original message -
>>>>>>> From: Reynold Xin 
>>>>>>> To: Ryan Blue 
>>>>>>> Cc: ross.law...@gmail.com, dev 
>>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>>
>>>>>>> I don't think the problem is just whether we have a starting point
>>>>>>> for write. As a matter of fact there's always a starting point for 
>>>>>>> write,
>>>>>>> whether it is explicit or implicit.
>>>>>>>
>>>>>>> This is a pretty big challenge in general for data sources -- for
>>>>>>> the vast majority of data stores, the boundary of a transaction is per
>>>>>>> client. That is, you can't have two clients doing writes and 
>>>>>>> coordinating a
>>>>>>> single transaction. That's certainly the case for almost all relational
>>>>>>> databases. Spark, on the other hand, will have multiple clients 
>>>>>>> (consider
>>>>>>> each task a client) writing to the same underlying data store.
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Ross, I think the intent is to create a single transaction on the
>>>>>>> driver, write as part of it in each task, and then commit the 
>>>>>>> transaction
>>>>>>> once the tasks complete. Is that possible in your implementation?
>>>>>>>
>>>>>>> I think that part of this is made more difficult by not having a
>>>>>>> clear starting point for a write, which we are fixing in the redesign of
>>>>>>> the v2 API. That will have a method that creates a Write to track the
>>>>>>> operation. That can create your transaction when it is created and 
>>>>>>> commit
>>>>>>> the transaction when commit is called on it.
>>>>>>>
>>>>>>> rb
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley 
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I've been prototyping an implementation of the DataSource V2 writer
>>>>>>> for the MongoDB Spark Connector and I have a couple of questions about 
>>>>>>> how
>>>>>>> its intended to be used with database systems. According to the Javadoc 
>>>>>>> for
>>>>>>> DataWriter.commit():
>>>>>>>
>>>>>>> *"this method should still "hide" the written data and ask the
>>>>>>> DataSourceWriter at driver side to do the final commit via
>>>>>>> WriterCommitMessage"*
>>>>>>>
>>>>>>> Although, MongoDB now has transactions, it doesn't have a way to
>>>>>>> "hide" the data once it has been written. So as soon as the DataWriter 
>>>>>>> has
>>>>>>> committed the data, it has been inserted/updated in the collection and 
>>>>>>> is
>>>>>>> discoverable - thereby breaking the documented contract.
>>>>>>>
>>>>>>> I was wondering how other databases systems plan to implement this
>>>>>>> API and meet the contract as per the Javadoc?
>>>>>>>
>>>>>>> Many thanks
>>>>>>>
>>>>>>> Ross
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -
>>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>>
>>>>>>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Arun Mahadevan
>>>>> (and does not care if the output is written out atomically)
>>>>>
>>>>> XA can be one option for datasources that supports it and requires
>>>>> atomicity but I am not sure how would one implement it with the current
>>>>> API.
>>>>>
>>>>> May be we need to discuss improvements at the Datasource V2 API level
>>>>> (e.g. individual tasks would "prepare" for commit and once the driver
>>>>> receives "prepared" from all the tasks, a "commit" would be invoked at 
>>>>> each
>>>>> of the individual tasks). Right now the responsibility of the final
>>>>> "commit" is with the driver and it may not always be possible for the
>>>>> driver to take over the transactions started by the tasks.
>>>>>
>>>>>
>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal  wrote:
>>>>>
>>>>>> This is a pretty big challenge in general for data sources -- for the
>>>>>> vast majority of data stores, the boundary of a transaction is per 
>>>>>> client.
>>>>>> That is, you can't have two clients doing writes and coordinating a 
>>>>>> single
>>>>>> transaction. That's certainly the case for almost all relational 
>>>>>> databases.
>>>>>> Spark, on the other hand, will have multiple clients (consider each task 
>>>>>> a
>>>>>> client) writing to the same underlying data store.
>>>>>>
>>>>>> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for
>>>>>> this ? Not sure how easy it is to implement this though :-)
>>>>>>
>>>>>> Regards,
>>>>>> Dilip Biswal
>>>>>> Tel: 408-463-4980 <(408)%20463-4980>
>>>>>> dbis...@us.ibm.com
>>>>>>
>>>>>>
>>>>>>
>>>>>> - Original message -
>>>>>> From: Reynold Xin 
>>>>>> To: Ryan Blue 
>>>>>> Cc: ross.law...@gmail.com, dev 
>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>
>>>>>> I don't think the problem is just whether we have a starting point
>>>>>> for write. As a matter of fact there's always a starting point for write,
>>>>>> whether it is explicit or implicit.
>>>>>>
>>>>>> This is a pretty big challenge in general for data sources -- for the
>>>>>> vast majority of data stores, the boundary of a transaction is per 
>>>>>> client.
>>>>>> That is, you can't have two clients doing writes and coordinating a 
>>>>>> single
>>>>>> transaction. That's certainly the case for almost all relational 
>>>>>> databases.
>>>>>> Spark, on the other hand, will have multiple clients (consider each task 
>>>>>> a
>>>>>> client) writing to the same underlying data store.
>>>>>>
>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue  wrote:
>>>>>>
>>>>>> Ross, I think the intent is to create a single transaction on the
>>>>>> driver, write as part of it in each task, and then commit the transaction
>>>>>> once the tasks complete. Is that possible in your implementation?
>>>>>>
>>>>>> I think that part of this is made more difficult by not having a
>>>>>> clear starting point for a write, which we are fixing in the redesign of
>>>>>> the v2 API. That will have a method that creates a Write to track the
>>>>>> operation. That can create your transaction when it is created and commit
>>>>>> the transaction when commit is called on it.
>>>>>>
>>>>>> rb
>>>>>>
>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin 
>>>>>> wrote:
>>>>>>
>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley 
>>>>>> wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I've been prototyping an implementation of the DataSource V2 writer
>>>>>> for the MongoDB Spark Connector and I have a couple of questions about 
>>>>>> how
>>>>>> its intended to be used with database systems. According to the Javadoc 
>>>>>> for
>>>>>> DataWriter.commit():
>>>>>>
>>>>>> *"this method should still "hide" the written data and ask the
>>>>>> DataSourceWriter at driver side to do the final commit via
>>>>>> WriterCommitMessage"*
>>>>>>
>>>>>> Although, MongoDB now has transactions, it doesn't have a way to
>>>>>> "hide" the data once it has been written. So as soon as the DataWriter 
>>>>>> has
>>>>>> committed the data, it has been inserted/updated in the collection and is
>>>>>> discoverable - thereby breaking the documented contract.
>>>>>>
>>>>>> I was wondering how other databases systems plan to implement this
>>>>>> API and meet the contract as per the Javadoc?
>>>>>>
>>>>>> Many thanks
>>>>>>
>>>>>> Ross
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>>
>>>>>>
>>>>>> -
>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>
>>>>>


Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Jungtaek Lim
>>>>> "commit" is with the driver and it may not always be possible for the
>>>>> driver to take over the transactions started by the tasks.
>>>>>
>>>>>
>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal  wrote:
>>>>>
>>>>>> This is a pretty big challenge in general for data sources -- for the
>>>>>> vast majority of data stores, the boundary of a transaction is per 
>>>>>> client.
>>>>>> That is, you can't have two clients doing writes and coordinating a 
>>>>>> single
>>>>>> transaction. That's certainly the case for almost all relational 
>>>>>> databases.
>>>>>> Spark, on the other hand, will have multiple clients (consider each task 
>>>>>> a
>>>>>> client) writing to the same underlying data store.
>>>>>>
>>>>>> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for
>>>>>> this ? Not sure how easy it is to implement this though :-)
>>>>>>
>>>>>> Regards,
>>>>>> Dilip Biswal
>>>>>> Tel: 408-463-4980 <(408)%20463-4980>
>>>>>> dbis...@us.ibm.com
>>>>>>
>>>>>>
>>>>>>
>>>>>> - Original message -
>>>>>> From: Reynold Xin 
>>>>>> To: Ryan Blue 
>>>>>> Cc: ross.law...@gmail.com, dev 
>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>
>>>>>> I don't think the problem is just whether we have a starting point
>>>>>> for write. As a matter of fact there's always a starting point for write,
>>>>>> whether it is explicit or implicit.
>>>>>>
>>>>>> This is a pretty big challenge in general for data sources -- for the
>>>>>> vast majority of data stores, the boundary of a transaction is per 
>>>>>> client.
>>>>>> That is, you can't have two clients doing writes and coordinating a 
>>>>>> single
>>>>>> transaction. That's certainly the case for almost all relational 
>>>>>> databases.
>>>>>> Spark, on the other hand, will have multiple clients (consider each task 
>>>>>> a
>>>>>> client) writing to the same underlying data store.
>>>>>>
>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue  wrote:
>>>>>>
>>>>>> Ross, I think the intent is to create a single transaction on the
>>>>>> driver, write as part of it in each task, and then commit the transaction
>>>>>> once the tasks complete. Is that possible in your implementation?
>>>>>>
>>>>>> I think that part of this is made more difficult by not having a
>>>>>> clear starting point for a write, which we are fixing in the redesign of
>>>>>> the v2 API. That will have a method that creates a Write to track the
>>>>>> operation. That can create your transaction when it is created and commit
>>>>>> the transaction when commit is called on it.
>>>>>>
>>>>>> rb
>>>>>>
>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin 
>>>>>> wrote:
>>>>>>
>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>
>>>>>>
>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley 
>>>>>> wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I've been prototyping an implementation of the DataSource V2 writer
>>>>>> for the MongoDB Spark Connector and I have a couple of questions about 
>>>>>> how
>>>>>> its intended to be used with database systems. According to the Javadoc 
>>>>>> for
>>>>>> DataWriter.commit():
>>>>>>
>>>>>> *"this method should still "hide" the written data and ask the
>>>>>> DataSourceWriter at driver side to do the final commit via
>>>>>> WriterCommitMessage"*
>>>>>>
>>>>>> Although, MongoDB now has transactions, it doesn't have a way to
>>>>>> "hide" the data once it has been written. So as soon as the DataWriter 
>>>>>> has
>>>>>> committed the data, it has been inserted/updated in the collection and is
>>>>>> discoverable - thereby breaking the documented contract.
>>>>>>
>>>>>> I was wondering how other databases systems plan to implement this
>>>>>> API and meet the contract as per the Javadoc?
>>>>>>
>>>>>> Many thanks
>>>>>>
>>>>>> Ross
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>>
>>>>>>
>>>>>> -
>>>>>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>>>>
>>>>>


Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Reynold Xin
Well, in almost all relational databases you can move data in a transactional
way. That's what transactions are for.

For just straight HDFS, the move is a pretty fast operation, so while it is
not completely transactional, the window of potential failure is pretty
short for appends. For writes at the partition level it is fine because it
is just renaming a directory, which is atomic.
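
A minimal sketch of that single-rename commit on HDFS; the paths below are
illustrative, and it assumes the tasks have already finished writing into the
staging directory:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object HdfsStagingCommit {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())

    // Tasks have already written their files under a staging directory that
    // readers never scan.
    val staging = new Path("/warehouse/events/_staging/batch-42")
    val target  = new Path("/warehouse/events/date=2018-09-10")

    // The "commit" is a single directory rename, which the NameNode performs
    // atomically: either the whole partition appears or none of it does.
    if (!fs.rename(staging, target)) {
      fs.delete(staging, true) // commit failed (e.g. target exists); drop the staged data
      throw new RuntimeException(s"Commit failed: could not rename $staging to $target")
    }
  }
}

As noted later in the thread, the atomicity only holds for one rename;
committing many files with many separate operations is where this breaks down.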

On Mon, Sep 10, 2018 at 1:40 PM Jungtaek Lim  wrote:

> When network partitioning happens it is pretty OK for me to see 2PC not
> working, cause we deal with global transaction. Recovery should be hard
> thing to get it correctly though. I completely agree it would require
> massive changes to Spark.
>
> What I couldn't find for underlying storages is moving data from staging
> table to final table in transactional way. I'm not fully sure but as I'm
> aware of, many storages would not support moving data, and even HDFS sink
> it is not strictly done in transactional way since we move multiple files
> with multiple operations. If coordinator just crashes it leaves partial
> write, and among writers and coordinator need to deal with ensuring it will
> not be going to be duplicated.
>
> Ryan replied me as Iceberg and HBase MVCC timestamps can enable us to
> implement "commit" (his reply didn't hit dev. mailing list though) but I'm
> not an expert of both twos and I couldn't still imagine it can deal with
> various crash cases.
>
> 2018년 9월 11일 (화) 오전 5:17, Reynold Xin 님이 작성:
>
>> I don't think two phase commit would work here at all.
>>
>> 1. It'd require massive changes to Spark.
>>
>> 2. Unless the underlying data source can provide an API to coordinate
>> commits (which few data sources I know provide something like that), 2PC
>> wouldn't work in the presence of network partitioning. You can't defy the
>> law of physics.
>>
>> Really the most common and simple way I've seen this working is through
>> staging tables and a final transaction to move data from staging table to
>> final table.
>>
>>
>>
>>
>>
>> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim  wrote:
>>
>>> I guess we all are aware of limitation of contract on DSv2 writer.
>>> Actually it can be achieved only with HDFS sink (or other filesystem based
>>> sinks) and other external storage are normally not feasible to implement it
>>> because there's no way to couple a transaction with multiple clients as
>>> well as coordinator can't take over transactions from writers to do the
>>> final commit.
>>>
>>> XA is also not a trivial one to get it correctly with current execution
>>> model: Spark doesn't require writer tasks to run at the same time but to
>>> achieve 2PC they should run until end of transaction (closing client before
>>> transaction ends normally means aborting transaction). Spark should also
>>> integrate 2PC with its checkpointing mechanism to guarantee completeness of
>>> batch. And it might require different integration for continuous mode.
>>>
>>> Jungtaek Lim (HeartSaVioR)
>>>
>>> 2018년 9월 11일 (화) 오전 4:37, Arun Mahadevan 님이 작성:
>>>
>>>> In some cases the implementations may be ok with eventual consistency
>>>> (and does not care if the output is written out atomically)
>>>>
>>>> XA can be one option for datasources that supports it and requires
>>>> atomicity but I am not sure how would one implement it with the current
>>>> API.
>>>>
>>>> May be we need to discuss improvements at the Datasource V2 API level
>>>> (e.g. individual tasks would "prepare" for commit and once the driver
>>>> receives "prepared" from all the tasks, a "commit" would be invoked at each
>>>> of the individual tasks). Right now the responsibility of the final
>>>> "commit" is with the driver and it may not always be possible for the
>>>> driver to take over the transactions started by the tasks.
>>>>
>>>>
>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal  wrote:
>>>>
>>>>> This is a pretty big challenge in general for data sources -- for the
>>>>> vast majority of data stores, the boundary of a transaction is per client.
>>>>> That is, you can't have two clients doing writes and coordinating a single
>>>>> transaction. That's certainly the case for almost all relational 
>>>>> databases.
>>>>> Spark, on the other hand, will have multiple clients (consider each task a
> client) writing to the same underlying data store.

Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Jungtaek Lim
When network partitioning happens it is pretty OK for me to see 2PC not
working, since we are dealing with a global transaction. Recovery would be
the hard thing to get right, though. I completely agree it would require
massive changes to Spark.

What I couldn't find for the underlying storages is a way to move data from
a staging table to the final table transactionally. I'm not fully sure, but
as far as I'm aware many storages do not support moving data, and even for
the HDFS sink it is not strictly transactional since we move multiple files
with multiple operations. If the coordinator crashes it leaves a partial
write, and the writers and coordinator then need to ensure the data will
not end up duplicated.

Ryan replied to me that Iceberg and HBase MVCC timestamps could enable us
to implement "commit" (his reply didn't hit the dev mailing list though),
but I'm not an expert in either of them and I still can't see how they
would deal with the various crash cases.
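
As a rough illustration of why a single metadata pointer can give an atomic
commit where multi-file moves cannot, here is a toy sketch; the MetadataStore
and Manifest types are invented for this example (Iceberg's real mechanism is
a conditional swap of a metadata file, HBase's is MVCC timestamps), so treat
it only as the shape of the idea:

import java.util.concurrent.atomic.AtomicReference

// Toy stand-in for a store offering one atomic compare-and-swap, e.g. a
// metadata file replaced via a conditional rename, or an HBase checkAndPut.
final case class Manifest(dataFiles: Seq[String])

class MetadataStore {
  private val current = new AtomicReference(Manifest(Nil))

  def read(): Manifest = current.get()

  // The entire commit is one CAS: either the new snapshot becomes visible in
  // full, or the commit fails and can be retried or aborted.
  def commit(expected: Manifest, updated: Manifest): Boolean =
    current.compareAndSet(expected, updated)
}

object PointerSwapCommit {
  def main(args: Array[String]): Unit = {
    val store = new MetadataStore
    // Tasks have already written these files; they stay invisible until the swap.
    val newFiles = Seq("part-00000.parquet", "part-00001.parquet")

    val before = store.read()
    val after  = Manifest(before.dataFiles ++ newFiles)
    if (!store.commit(before, after)) {
      // A concurrent writer won the race: re-read and retry, or abort and
      // delete the orphaned files.
      println("Commit lost the race; retry or clean up.")
    }
  }
}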

On Tue, Sep 11, 2018 at 5:17 AM, Reynold Xin wrote:

> I don't think two phase commit would work here at all.
>
> 1. It'd require massive changes to Spark.
>
> 2. Unless the underlying data source can provide an API to coordinate
> commits (which few data sources I know provide something like that), 2PC
> wouldn't work in the presence of network partitioning. You can't defy the
> law of physics.
>
> Really the most common and simple way I've seen this working is through
> staging tables and a final transaction to move data from staging table to
> final table.
>
>
>
>
>
> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim  wrote:
>
>> I guess we all are aware of limitation of contract on DSv2 writer.
>> Actually it can be achieved only with HDFS sink (or other filesystem based
>> sinks) and other external storage are normally not feasible to implement it
>> because there's no way to couple a transaction with multiple clients as
>> well as coordinator can't take over transactions from writers to do the
>> final commit.
>>
>> XA is also not a trivial one to get it correctly with current execution
>> model: Spark doesn't require writer tasks to run at the same time but to
>> achieve 2PC they should run until end of transaction (closing client before
>> transaction ends normally means aborting transaction). Spark should also
>> integrate 2PC with its checkpointing mechanism to guarantee completeness of
>> batch. And it might require different integration for continuous mode.
>>
>> Jungtaek Lim (HeartSaVioR)
>>
>> 2018년 9월 11일 (화) 오전 4:37, Arun Mahadevan 님이 작성:
>>
>>> In some cases the implementations may be ok with eventual consistency
>>> (and does not care if the output is written out atomically)
>>>
>>> XA can be one option for datasources that supports it and requires
>>> atomicity but I am not sure how would one implement it with the current
>>> API.
>>>
>>> May be we need to discuss improvements at the Datasource V2 API level
>>> (e.g. individual tasks would "prepare" for commit and once the driver
>>> receives "prepared" from all the tasks, a "commit" would be invoked at each
>>> of the individual tasks). Right now the responsibility of the final
>>> "commit" is with the driver and it may not always be possible for the
>>> driver to take over the transactions started by the tasks.
>>>
>>>
>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal  wrote:
>>>
>>>> This is a pretty big challenge in general for data sources -- for the
>>>> vast majority of data stores, the boundary of a transaction is per client.
>>>> That is, you can't have two clients doing writes and coordinating a single
>>>> transaction. That's certainly the case for almost all relational databases.
>>>> Spark, on the other hand, will have multiple clients (consider each task a
>>>> client) writing to the same underlying data store.
>>>>
>>>> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for this
>>>> ? Not sure how easy it is to implement this though :-)
>>>>
>>>> Regards,
>>>> Dilip Biswal
>>>> Tel: 408-463-4980 <(408)%20463-4980>
>>>> dbis...@us.ibm.com
>>>>
>>>>
>>>>
>>>> - Original message -
>>>> From: Reynold Xin 
>>>> To: Ryan Blue 
>>>> Cc: ross.law...@gmail.com, dev 
>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>
>>>> I don't think the problem is just whether we have a starting point for write.

Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Dilip Biswal
Thanks a lot Reynold and Jungtaek Lim. It definitely helped me understand this better.
 
Regards,
Dilip Biswal
Tel: 408-463-4980
dbis...@us.ibm.com
 
 
- Original message -
From: Reynold Xin
To: kabh...@gmail.com
Cc: ar...@apache.org, dbis...@us.ibm.com, dev, Ryan Blue, Ross Lawley
Subject: Re: DataSourceWriter V2 Api questions
Date: Mon, Sep 10, 2018 1:18 PM
I don't think two phase commit would work here at all.
 
1. It'd require massive changes to Spark.
 
2. Unless the underlying data source can provide an API to coordinate commits (which few data sources I know provide something like that), 2PC wouldn't work in the presence of network partitioning. You can't defy the law of physics.
 
Really the most common and simple way I've seen this working is through staging tables and a final transaction to move data from staging table to final table.
 
 
 
  

On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim  wrote:
I guess we all are aware of limitation of contract on DSv2 writer. Actually it can be achieved only with HDFS sink (or other filesystem based sinks) and other external storage are normally not feasible to implement it because there's no way to couple a transaction with multiple clients as well as coordinator can't take over transactions from writers to do the final commit.
 
XA is also not a trivial one to get it correctly with current execution model: Spark doesn't require writer tasks to run at the same time but to achieve 2PC they should run until end of transaction (closing client before transaction ends normally means aborting transaction). Spark should also integrate 2PC with its checkpointing mechanism to guarantee completeness of batch. And it might require different integration for continuous mode.
 
Jungtaek Lim (HeartSaVioR) 

On Tue, Sep 11, 2018 at 4:37 AM, Arun Mahadevan wrote:
In some cases the implementations may be ok with eventual consistency (and does not care if the output is written out atomically)
 XA can be one option for datasources that supports it and requires atomicity but I am not sure how would one implement it with the current API. 

 
May be we need to discuss improvements at the Datasource V2 API level (e.g. individual tasks would "prepare" for commit and once the driver receives "prepared" from all the tasks, a "commit" would be invoked at each of the individual tasks). Right now the responsibility of the final "commit" is with the driver and it may not always be possible for the driver to take over the transactions started by the tasks. 
  

On Mon, 10 Sep 2018 at 11:48, Dilip Biswal  wrote:
This is a pretty big challenge in general for data sources -- for the vast majority of data stores, the boundary of a transaction is per client. That is, you can't have two clients doing writes and coordinating a single transaction. That's certainly the case for almost all relational databases. Spark, on the other hand, will have multiple clients (consider each task a client) writing to the same underlying data store.
 
DB>> Perhaps we can explore two-phase commit protocol (aka XA) for this ? Not sure how easy it is to implement this though :-)
 
Regards,
Dilip Biswal
Tel: 408-463-4980
dbis...@us.ibm.com
 
 
- Original message -
From: Reynold Xin
To: Ryan Blue
Cc: ross.law...@gmail.com, dev
Subject: Re: DataSourceWriter V2 Api questions
Date: Mon, Sep 10, 2018 10:26 AM
I don't think the problem is just whether we have a starting point for write. As a matter of fact there's always a starting point for write, whether it is explicit or implicit.
 
This is a pretty big challenge in general for data sources -- for the vast majority of data stores, the boundary of a transaction is per client. That is, you can't have two clients doing writes and coordinating a single transaction. That's certainly the case for almost all relational databases. Spark, on the other hand, will have multiple clients (consider each task a client) writing to the same underlying data store. 

On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue  wrote:
Ross, I think the intent is to create a single transaction on the driver, write as part of it in each task, and then commit the transaction once the tasks complete. Is that possible in your implementation?
 
I think that part of this is made more difficult by not having a clear starting point for a write, which we are fixing in the redesign of the v2 API. That will have a method that creates a Write to track the operation. That can create your transaction when it is created and commit the transaction when commit is called on it.
 
rb 

On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin  wrote:
Typically people do it via transactions, or staging tables.
  

On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley  wrote:
Hi all,
 
I've been prototyping an implementation of the DataSource V2 writer for the MongoDB Spark Connector and I have a couple of questions about how it's intended to be used with database systems.

Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Reynold Xin
I don't think two phase commit would work here at all.

1. It'd require massive changes to Spark.

2. Unless the underlying data source can provide an API to coordinate
commits (and few data sources I know of provide something like that), 2PC
wouldn't work in the presence of network partitioning. You can't defy the
laws of physics.

Really the most common and simple way I've seen this working is through
staging tables and a final transaction to move data from staging table to
final table.
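
A minimal sketch of that staging-table pattern over plain JDBC; the URL,
credentials, and table names are illustrative, and it assumes the tasks have
already loaded their rows into events_staging outside of any transaction:

import java.sql.DriverManager

object StagingTableCommit {
  def main(args: Array[String]): Unit = {
    val conn = DriverManager.getConnection("jdbc:postgresql://db-host/warehouse", "user", "secret")
    try {
      conn.setAutoCommit(false)
      val stmt = conn.createStatement()
      // The only step readers can observe happens inside one transaction:
      stmt.executeUpdate("INSERT INTO events SELECT * FROM events_staging")
      stmt.executeUpdate("DELETE FROM events_staging")
      conn.commit() // both statements become visible atomically, or not at all
    } catch {
      case e: Exception =>
        conn.rollback()
        throw e
    } finally {
      conn.close()
    }
  }
}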





On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim  wrote:

> I guess we all are aware of limitation of contract on DSv2 writer.
> Actually it can be achieved only with HDFS sink (or other filesystem based
> sinks) and other external storage are normally not feasible to implement it
> because there's no way to couple a transaction with multiple clients as
> well as coordinator can't take over transactions from writers to do the
> final commit.
>
> XA is also not a trivial one to get it correctly with current execution
> model: Spark doesn't require writer tasks to run at the same time but to
> achieve 2PC they should run until end of transaction (closing client before
> transaction ends normally means aborting transaction). Spark should also
> integrate 2PC with its checkpointing mechanism to guarantee completeness of
> batch. And it might require different integration for continuous mode.
>
> Jungtaek Lim (HeartSaVioR)
>
> 2018년 9월 11일 (화) 오전 4:37, Arun Mahadevan 님이 작성:
>
>> In some cases the implementations may be ok with eventual consistency
>> (and does not care if the output is written out atomically)
>>
>> XA can be one option for datasources that supports it and requires
>> atomicity but I am not sure how would one implement it with the current
>> API.
>>
>> May be we need to discuss improvements at the Datasource V2 API level
>> (e.g. individual tasks would "prepare" for commit and once the driver
>> receives "prepared" from all the tasks, a "commit" would be invoked at each
>> of the individual tasks). Right now the responsibility of the final
>> "commit" is with the driver and it may not always be possible for the
>> driver to take over the transactions started by the tasks.
>>
>>
>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal  wrote:
>>
>>> This is a pretty big challenge in general for data sources -- for the
>>> vast majority of data stores, the boundary of a transaction is per client.
>>> That is, you can't have two clients doing writes and coordinating a single
>>> transaction. That's certainly the case for almost all relational databases.
>>> Spark, on the other hand, will have multiple clients (consider each task a
>>> client) writing to the same underlying data store.
>>>
>>> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for this
>>> ? Not sure how easy it is to implement this though :-)
>>>
>>> Regards,
>>> Dilip Biswal
>>> Tel: 408-463-4980 <(408)%20463-4980>
>>> dbis...@us.ibm.com
>>>
>>>
>>>
>>> - Original message -
>>> From: Reynold Xin 
>>> To: Ryan Blue 
>>> Cc: ross.law...@gmail.com, dev 
>>> Subject: Re: DataSourceWriter V2 Api questions
>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>
>>> I don't think the problem is just whether we have a starting point for
>>> write. As a matter of fact there's always a starting point for write,
>>> whether it is explicit or implicit.
>>>
>>> This is a pretty big challenge in general for data sources -- for the
>>> vast majority of data stores, the boundary of a transaction is per client.
>>> That is, you can't have two clients doing writes and coordinating a single
>>> transaction. That's certainly the case for almost all relational databases.
>>> Spark, on the other hand, will have multiple clients (consider each task a
>>> client) writing to the same underlying data store.
>>>
>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue  wrote:
>>>
>>> Ross, I think the intent is to create a single transaction on the
>>> driver, write as part of it in each task, and then commit the transaction
>>> once the tasks complete. Is that possible in your implementation?
>>>
>>> I think that part of this is made more difficult by not having a clear
>>> starting point for a write, which we are fixing in the redesign of the v2
>>> API. That will have a method that creates a Write to track the operation.
>>> That can create your transaction when it is created and commit the
>>> transaction when commit is called on it.

Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Jungtaek Lim
I guess we are all aware of the limitation of the contract on the DSv2
writer. In practice it can be honored only by the HDFS sink (or other
filesystem-based sinks); for other external storage it is normally not
feasible to implement, because there's no way to couple a transaction
across multiple clients, and the coordinator can't take over transactions
from the writers to do the final commit.

XA is also not trivial to get right with the current execution model: Spark
doesn't require writer tasks to run at the same time, but to achieve 2PC
they would have to stay alive until the end of the transaction (closing the
client before the transaction ends normally means aborting the
transaction). Spark would also have to integrate 2PC with its checkpointing
mechanism to guarantee completeness of a batch. And it might require a
different integration for continuous mode.

Jungtaek Lim (HeartSaVioR)

On Tue, Sep 11, 2018 at 4:37 AM, Arun Mahadevan wrote:

> In some cases the implementations may be ok with eventual consistency (and
> does not care if the output is written out atomically)
>
> XA can be one option for datasources that supports it and requires
> atomicity but I am not sure how would one implement it with the current
> API.
>
> May be we need to discuss improvements at the Datasource V2 API level
> (e.g. individual tasks would "prepare" for commit and once the driver
> receives "prepared" from all the tasks, a "commit" would be invoked at each
> of the individual tasks). Right now the responsibility of the final
> "commit" is with the driver and it may not always be possible for the
> driver to take over the transactions started by the tasks.
>
>
> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal  wrote:
>
>> This is a pretty big challenge in general for data sources -- for the
>> vast majority of data stores, the boundary of a transaction is per client.
>> That is, you can't have two clients doing writes and coordinating a single
>> transaction. That's certainly the case for almost all relational databases.
>> Spark, on the other hand, will have multiple clients (consider each task a
>> client) writing to the same underlying data store.
>>
>> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for this ?
>> Not sure how easy it is to implement this though :-)
>>
>> Regards,
>> Dilip Biswal
>> Tel: 408-463-4980 <(408)%20463-4980>
>> dbis...@us.ibm.com
>>
>>
>>
>> - Original message -
>> From: Reynold Xin 
>> To: Ryan Blue 
>> Cc: ross.law...@gmail.com, dev 
>> Subject: Re: DataSourceWriter V2 Api questions
>> Date: Mon, Sep 10, 2018 10:26 AM
>>
>> I don't think the problem is just whether we have a starting point for
>> write. As a matter of fact there's always a starting point for write,
>> whether it is explicit or implicit.
>>
>> This is a pretty big challenge in general for data sources -- for the
>> vast majority of data stores, the boundary of a transaction is per client.
>> That is, you can't have two clients doing writes and coordinating a single
>> transaction. That's certainly the case for almost all relational databases.
>> Spark, on the other hand, will have multiple clients (consider each task a
>> client) writing to the same underlying data store.
>>
>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue  wrote:
>>
>> Ross, I think the intent is to create a single transaction on the driver,
>> write as part of it in each task, and then commit the transaction once the
>> tasks complete. Is that possible in your implementation?
>>
>> I think that part of this is made more difficult by not having a clear
>> starting point for a write, which we are fixing in the redesign of the v2
>> API. That will have a method that creates a Write to track the operation.
>> That can create your transaction when it is created and commit the
>> transaction when commit is called on it.
>>
>> rb
>>
>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin  wrote:
>>
>> Typically people do it via transactions, or staging tables.
>>
>>
>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley 
>> wrote:
>>
>> Hi all,
>>
>> I've been prototyping an implementation of the DataSource V2 writer for
>> the MongoDB Spark Connector and I have a couple of questions about how its
>> intended to be used with database systems. According to the Javadoc for
>> DataWriter.commit():
>>
>> *"this method should still "hide" the written data and ask the
>> DataSourceWriter at driver side to do the final commit via
>> WriterCommitMessage"*

Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Arun Mahadevan
In some cases the implementations may be OK with eventual consistency (and
do not care whether the output is written out atomically).

XA can be one option for data sources that support it and require
atomicity, but I am not sure how one would implement it with the current
API.

Maybe we need to discuss improvements at the Datasource V2 API level (e.g.
individual tasks would "prepare" for commit, and once the driver receives
"prepared" from all the tasks, a "commit" would be invoked at each of the
individual tasks). Right now the responsibility for the final "commit" is
with the driver, and it may not always be possible for the driver to take
over the transactions started by the tasks.
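
To make the "prepare then commit" idea concrete, here is one hypothetical
shape such an API could take; these traits are an illustration of the
proposal, not the actual DataSource V2 interfaces:

// Hypothetical sketch of a two-phase task commit; not the real DSv2 API.
final case class PreparedCommit(taskId: Long, handle: String)

trait TwoPhaseDataWriter[T] {
  def write(record: T): Unit
  // Phase 1: make the writes durable but not yet visible, and return a handle
  // (e.g. an XA branch id or a staging location) the driver can act on later.
  def prepare(): PreparedCommit
  def abort(): Unit
}

trait TwoPhaseWriteCoordinator {
  // Phase 2: invoked only after every task has successfully prepared.
  def commitAll(prepared: Seq[PreparedCommit]): Unit
  def abortAll(prepared: Seq[PreparedCommit]): Unit
}

// Driver-side flow (schematic): gather the per-task results, then make the
// global commit-or-abort decision in one place.
object DriverFlow {
  def finish(results: Seq[Either[Throwable, PreparedCommit]],
             coordinator: TwoPhaseWriteCoordinator): Unit = {
    val prepared = results.collect { case Right(p) => p }
    if (results.exists(_.isLeft)) coordinator.abortAll(prepared)
    else coordinator.commitAll(prepared)
  }
}

As the rest of the thread points out, the hard part is that whatever sits
behind those handles (open transactions, sessions) has to outlive the task
that created it, which is exactly what most stores and the current execution
model don't provide.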


On Mon, 10 Sep 2018 at 11:48, Dilip Biswal  wrote:

> This is a pretty big challenge in general for data sources -- for the vast
> majority of data stores, the boundary of a transaction is per client. That
> is, you can't have two clients doing writes and coordinating a single
> transaction. That's certainly the case for almost all relational databases.
> Spark, on the other hand, will have multiple clients (consider each task a
> client) writing to the same underlying data store.
>
> DB>> Perhaps we can explore two-phase commit protocol (aka XA) for this ?
> Not sure how easy it is to implement this though :-)
>
> Regards,
> Dilip Biswal
> Tel: 408-463-4980
> dbis...@us.ibm.com
>
>
>
> - Original message -
> From: Reynold Xin 
> To: Ryan Blue 
> Cc: ross.law...@gmail.com, dev 
> Subject: Re: DataSourceWriter V2 Api questions
> Date: Mon, Sep 10, 2018 10:26 AM
>
> I don't think the problem is just whether we have a starting point for
> write. As a matter of fact there's always a starting point for write,
> whether it is explicit or implicit.
>
> This is a pretty big challenge in general for data sources -- for the vast
> majority of data stores, the boundary of a transaction is per client. That
> is, you can't have two clients doing writes and coordinating a single
> transaction. That's certainly the case for almost all relational databases.
> Spark, on the other hand, will have multiple clients (consider each task a
> client) writing to the same underlying data store.
>
> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue  wrote:
>
> Ross, I think the intent is to create a single transaction on the driver,
> write as part of it in each task, and then commit the transaction once the
> tasks complete. Is that possible in your implementation?
>
> I think that part of this is made more difficult by not having a clear
> starting point for a write, which we are fixing in the redesign of the v2
> API. That will have a method that creates a Write to track the operation.
> That can create your transaction when it is created and commit the
> transaction when commit is called on it.
>
> rb
>
> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin  wrote:
>
> Typically people do it via transactions, or staging tables.
>
>
> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley  wrote:
>
> Hi all,
>
> I've been prototyping an implementation of the DataSource V2 writer for
> the MongoDB Spark Connector and I have a couple of questions about how its
> intended to be used with database systems. According to the Javadoc for
> DataWriter.commit():
>
> *"this method should still "hide" the written data and ask the
> DataSourceWriter at driver side to do the final commit via
> WriterCommitMessage"*
>
> Although, MongoDB now has transactions, it doesn't have a way to "hide"
> the data once it has been written. So as soon as the DataWriter has
> committed the data, it has been inserted/updated in the collection and is
> discoverable - thereby breaking the documented contract.
>
> I was wondering how other databases systems plan to implement this API and
> meet the contract as per the Javadoc?
>
> Many thanks
>
> Ross
>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
>
>
> - To
> unsubscribe e-mail: dev-unsubscr...@spark.apache.org


Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Dilip Biswal
This is a pretty big challenge in general for data sources -- for the vast majority of data stores, the boundary of a transaction is per client. That is, you can't have two clients doing writes and coordinating a single transaction. That's certainly the case for almost all relational databases. Spark, on the other hand, will have multiple clients (consider each task a client) writing to the same underlying data store.
 
DB>> Perhaps we can explore the two-phase commit protocol (aka XA) for this? Not sure how easy it is to implement, though :-)
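
For reference, this is roughly what a single XA branch looks like against a
store that exposes javax.sql.XADataSource; it only sketches the protocol
steps (the SimpleXid class and the table name are illustrative, and obtaining
the XADataSource is driver-specific and omitted), and it sidesteps the
coordination and recovery problems discussed in this thread:

import javax.sql.XADataSource
import javax.transaction.xa.{XAResource, Xid}

// Minimal Xid implementation for illustration; real transaction managers
// generate and persist these.
final class SimpleXid(gtrid: Array[Byte], bqual: Array[Byte]) extends Xid {
  override def getFormatId: Int = 1
  override def getGlobalTransactionId: Array[Byte] = gtrid
  override def getBranchQualifier: Array[Byte] = bqual
}

object XaBranchSketch {
  def writeOneBranch(ds: XADataSource, rows: Seq[String]): Unit = {
    val xaConn = ds.getXAConnection()
    val xa: XAResource = xaConn.getXAResource
    val conn = xaConn.getConnection
    val xid  = new SimpleXid("spark-query-42".getBytes, "task-7".getBytes)

    xa.start(xid, XAResource.TMNOFLAGS)
    val stmt = conn.prepareStatement("INSERT INTO events(payload) VALUES (?)")
    rows.foreach { r => stmt.setString(1, r); stmt.executeUpdate() }
    xa.end(xid, XAResource.TMSUCCESS)

    // Phase 1: the branch is now durable but not visible to other sessions.
    if (xa.prepare(xid) == XAResource.XA_OK) {
      // Phase 2 would normally be driven by a coordinator after *all* branches prepared.
      xa.commit(xid, false)
    } else {
      xa.rollback(xid)
    }
  }
}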
 
Regards,
Dilip Biswal
Tel: 408-463-4980
dbis...@us.ibm.com
 
 
- Original message -
From: Reynold Xin
To: Ryan Blue
Cc: ross.law...@gmail.com, dev
Subject: Re: DataSourceWriter V2 Api questions
Date: Mon, Sep 10, 2018 10:26 AM
I don't think the problem is just whether we have a starting point for write. As a matter of fact there's always a starting point for write, whether it is explicit or implicit.
 
This is a pretty big challenge in general for data sources -- for the vast majority of data stores, the boundary of a transaction is per client. That is, you can't have two clients doing writes and coordinating a single transaction. That's certainly the case for almost all relational databases. Spark, on the other hand, will have multiple clients (consider each task a client) writing to the same underlying data store. 

On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue  wrote:
Ross, I think the intent is to create a single transaction on the driver, write as part of it in each task, and then commit the transaction once the tasks complete. Is that possible in your implementation?
 
I think that part of this is made more difficult by not having a clear starting point for a write, which we are fixing in the redesign of the v2 API. That will have a method that creates a Write to track the operation. That can create your transaction when it is created and commit the transaction when commit is called on it.
 
rb 

On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin  wrote:
Typically people do it via transactions, or staging tables.
  

On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley  wrote:
Hi all,
 
I've been prototyping an implementation of the DataSource V2 writer for the MongoDB Spark Connector and I have a couple of questions about how its intended to be used with database systems. According to the Javadoc for DataWriter.commit():
 
"this method should still "hide" the written data and ask the DataSourceWriter at driver side to do the final commit via WriterCommitMessage"
 
Although, MongoDB now has transactions, it doesn't have a way to "hide" the data once it has been written. So as soon as the DataWriter has committed the data, it has been inserted/updated in the collection and is discoverable - thereby breaking the documented contract.
 
I was wondering how other databases systems plan to implement this API and meet the contract as per the Javadoc?
 
Many thanks
 
Ross 

 --

Ryan Blue
Software Engineer
Netflix
 


-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Russell Spitzer
I think I mentioned on the Design Doc that with the Cassandra connector we
have similar issues. There is no "transaction" or "staging table" capable
of really doing what the API requires.

On Mon, Sep 10, 2018 at 12:26 PM Reynold Xin  wrote:

> I don't think the problem is just whether we have a starting point for
> write. As a matter of fact there's always a starting point for write,
> whether it is explicit or implicit.
>
> This is a pretty big challenge in general for data sources -- for the vast
> majority of data stores, the boundary of a transaction is per client. That
> is, you can't have two clients doing writes and coordinating a single
> transaction. That's certainly the case for almost all relational databases.
> Spark, on the other hand, will have multiple clients (consider each task a
> client) writing to the same underlying data store.
>
> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue  wrote:
>
>> Ross, I think the intent is to create a single transaction on the driver,
>> write as part of it in each task, and then commit the transaction once the
>> tasks complete. Is that possible in your implementation?
>>
>> I think that part of this is made more difficult by not having a clear
>> starting point for a write, which we are fixing in the redesign of the v2
>> API. That will have a method that creates a Write to track the operation.
>> That can create your transaction when it is created and commit the
>> transaction when commit is called on it.
>>
>> rb
>>
>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin  wrote:
>>
>>> Typically people do it via transactions, or staging tables.
>>>
>>>
>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley 
>>> wrote:
>>>
 Hi all,

 I've been prototyping an implementation of the DataSource V2 writer for
 the MongoDB Spark Connector and I have a couple of questions about how its
 intended to be used with database systems. According to the Javadoc for
 DataWriter.commit():


 *"this method should still "hide" the written data and ask the
 DataSourceWriter at driver side to do the final commit via
 WriterCommitMessage"*

 Although, MongoDB now has transactions, it doesn't have a way to "hide"
 the data once it has been written. So as soon as the DataWriter has
 committed the data, it has been inserted/updated in the collection and is
 discoverable - thereby breaking the documented contract.

 I was wondering how other databases systems plan to implement this API
 and meet the contract as per the Javadoc?

 Many thanks

 Ross

>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>


Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Reynold Xin
I don't think the problem is just whether we have a starting point for
write. As a matter of fact there's always a starting point for write,
whether it is explicit or implicit.

This is a pretty big challenge in general for data sources -- for the vast
majority of data stores, the boundary of a transaction is per client. That
is, you can't have two clients doing writes and coordinating a single
transaction. That's certainly the case for almost all relational databases.
Spark, on the other hand, will have multiple clients (consider each task a
client) writing to the same underlying data store.

On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue  wrote:

> Ross, I think the intent is to create a single transaction on the driver,
> write as part of it in each task, and then commit the transaction once the
> tasks complete. Is that possible in your implementation?
>
> I think that part of this is made more difficult by not having a clear
> starting point for a write, which we are fixing in the redesign of the v2
> API. That will have a method that creates a Write to track the operation.
> That can create your transaction when it is created and commit the
> transaction when commit is called on it.
>
> rb
>
> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin  wrote:
>
>> Typically people do it via transactions, or staging tables.
>>
>>
>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley 
>> wrote:
>>
>>> Hi all,
>>>
>>> I've been prototyping an implementation of the DataSource V2 writer for
>>> the MongoDB Spark Connector and I have a couple of questions about how its
>>> intended to be used with database systems. According to the Javadoc for
>>> DataWriter.commit():
>>>
>>>
>>> *"this method should still "hide" the written data and ask the
>>> DataSourceWriter at driver side to do the final commit via
>>> WriterCommitMessage"*
>>>
>>> Although, MongoDB now has transactions, it doesn't have a way to "hide"
>>> the data once it has been written. So as soon as the DataWriter has
>>> committed the data, it has been inserted/updated in the collection and is
>>> discoverable - thereby breaking the documented contract.
>>>
>>> I was wondering how other databases systems plan to implement this API
>>> and meet the contract as per the Javadoc?
>>>
>>> Many thanks
>>>
>>> Ross
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Ryan Blue
Ross, I think the intent is to create a single transaction on the driver,
write as part of it in each task, and then commit the transaction once the
tasks complete. Is that possible in your implementation?

I think that part of this is made more difficult by not having a clear
starting point for a write, which we are fixing in the redesign of the v2
API. That will have a method that creates a Write to track the operation.
That can create your transaction when it is created and commit the
transaction when commit is called on it.

rb
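
A schematic of that lifecycle with hypothetical class names (not the
redesigned API itself): the transaction is opened when the Write is created
and finalized when commit is called on it, while each task still has to open
its own connection, which is the gap the rest of the thread is wrestling
with.

import java.sql.{Connection, DriverManager}

// Hypothetical "Write" handle: created once per logical write on the driver.
class JdbcWrite(url: String, user: String, password: String) {
  private val driverConn: Connection = DriverManager.getConnection(url, user, password)
  driverConn.setAutoCommit(false) // transaction starts when the Write is created

  // Called on the driver after all tasks report success.
  def commit(): Unit = { driverConn.commit(); driverConn.close() }

  // Called on the driver if any task fails.
  def abort(): Unit = { driverConn.rollback(); driverConn.close() }
}

// Task side: each task runs in a different JVM, so it cannot share driverConn;
// it has to open its own connection, outside the driver's transaction. That gap
// between "one transaction on the driver" and "many clients in tasks" is the
// crux of this discussion.
class JdbcTaskWriter(url: String, user: String, password: String) {
  private val taskConn: Connection = DriverManager.getConnection(url, user, password)
  def write(row: String): Unit = {
    val stmt = taskConn.prepareStatement("INSERT INTO events_staging(payload) VALUES (?)")
    stmt.setString(1, row)
    stmt.executeUpdate()
  }
  def close(): Unit = taskConn.close()
}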

On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin  wrote:

> Typically people do it via transactions, or staging tables.
>
>
> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley  wrote:
>
>> Hi all,
>>
>> I've been prototyping an implementation of the DataSource V2 writer for
>> the MongoDB Spark Connector and I have a couple of questions about how its
>> intended to be used with database systems. According to the Javadoc for
>> DataWriter.commit():
>>
>>
>> *"this method should still "hide" the written data and ask the
>> DataSourceWriter at driver side to do the final commit via
>> WriterCommitMessage"*
>>
>> Although, MongoDB now has transactions, it doesn't have a way to "hide"
>> the data once it has been written. So as soon as the DataWriter has
>> committed the data, it has been inserted/updated in the collection and is
>> discoverable - thereby breaking the documented contract.
>>
>> I was wondering how other databases systems plan to implement this API
>> and meet the contract as per the Javadoc?
>>
>> Many thanks
>>
>> Ross
>>
>

-- 
Ryan Blue
Software Engineer
Netflix


Re: DataSourceWriter V2 Api questions

2018-09-10 Thread Reynold Xin
Typically people do it via transactions, or staging tables.


On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley  wrote:

> Hi all,
>
> I've been prototyping an implementation of the DataSource V2 writer for
> the MongoDB Spark Connector and I have a couple of questions about how its
> intended to be used with database systems. According to the Javadoc for
> DataWriter.commit():
>
>
> *"this method should still "hide" the written data and ask the
> DataSourceWriter at driver side to do the final commit via
> WriterCommitMessage"*
>
> Although, MongoDB now has transactions, it doesn't have a way to "hide"
> the data once it has been written. So as soon as the DataWriter has
> committed the data, it has been inserted/updated in the collection and is
> discoverable - thereby breaking the documented contract.
>
> I was wondering how other databases systems plan to implement this API and
> meet the contract as per the Javadoc?
>
> Many thanks
>
> Ross
>


DataSourceWriter V2 Api questions

2018-09-10 Thread Ross Lawley
Hi all,

I've been prototyping an implementation of the DataSource V2 writer for the
MongoDB Spark Connector and I have a couple of questions about how it's
intended to be used with database systems. According to the Javadoc for
DataWriter.commit():


*"this method should still "hide" the written data and ask the
DataSourceWriter at driver side to do the final commit via
WriterCommitMessage"*

Although MongoDB now has transactions, it doesn't have a way to "hide" the
data once it has been written. So as soon as the DataWriter has committed
the data, it has been inserted/updated in the collection and is
discoverable - thereby breaking the documented contract.

I was wondering how other database systems plan to implement this API and
meet the contract as per the Javadoc?

Many thanks

Ross
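
One way a connector could approximate "hide then commit" without storage
support, sketched here with the synchronous MongoDB Java driver; the
collection names and per-batch suffix are illustrative, and the final copy
step is itself not atomic, which is exactly the gap Ross is pointing at:

import com.mongodb.client.{MongoClients, MongoCollection}
import org.bson.Document
import scala.jdk.CollectionConverters._

object MongoStagingSketch {
  def main(args: Array[String]): Unit = {
    val client = MongoClients.create("mongodb://localhost:27017")
    val db     = client.getDatabase("warehouse")

    // Each task writes its rows into a per-batch staging collection that
    // downstream readers never query, which "hides" the data in practice.
    val staging: MongoCollection[Document] = db.getCollection("events_staging_batch42")
    staging.insertOne(new Document("payload", "example-row"))

    // Driver-side "final commit": copy staging documents into the real
    // collection. Note this copy is not atomic either, so a crash here still
    // leaves partial data visible -- the documented contract is only
    // approximated, not met.
    val target = db.getCollection("events")
    val docs = staging.find().iterator().asScala.toSeq
    if (docs.nonEmpty) target.insertMany(docs.asJava)
    staging.drop()

    client.close()
  }
}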