Re: Append In-Place to S3

2018-06-03 Thread Tayler Lawrence Jones
Sorry, my last message is actually not true for anti join; I was thinking of
semi join.
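
To make the correction concrete, here is a minimal sketch of the four join shapes. It uses Python's built-in sqlite3 as a stand-in for Spark (an assumption made only so the example runs anywhere). SQLite has no LEFT ANTI JOIN or LEFT SEMI JOIN syntax, so NOT EXISTS and EXISTS play those roles. The single-column tables and the key `k` are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE new_data (k INTEGER);
    CREATE TABLE existing_data (k INTEGER);
    INSERT INTO new_data VALUES (1), (2);
    -- key 1 appears twice in the existing data (no uniqueness guarantee)
    INSERT INTO existing_data VALUES (1), (1);
""")


def rows(sql):
    """Run a query and return its first column as a sorted list."""
    return sorted(r[0] for r in conn.execute(sql))


# Anti join, written as NOT EXISTS.
anti = rows("""
    SELECT n.k FROM new_data n
    WHERE NOT EXISTS (SELECT 1 FROM existing_data e WHERE e.k = n.k)
""")

# Left outer join, keeping only the rows with no match on the right.
left_null = rows("""
    SELECT n.k FROM new_data n
    LEFT JOIN existing_data e ON e.k = n.k
    WHERE e.k IS NULL
""")

# Semi join, written as EXISTS.
semi = rows("""
    SELECT n.k FROM new_data n
    WHERE EXISTS (SELECT 1 FROM existing_data e WHERE e.k = n.k)
""")

# Inner join: every match on the right produces one output row.
inner = rows("""
    SELECT n.k FROM new_data n
    JOIN existing_data e ON e.k = n.k
""")

print(anti, left_null)  # the two forms agree even with the duplicate key
print(semi, inner)      # the inner join repeats key 1, the semi join does not
```

Unmatched left rows come out exactly once from the left join, so duplicate keys on the right cannot multiply them. The uniqueness caveat only bites when an inner join is used in place of a semi join.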

-TJ

On Sun, Jun 3, 2018 at 14:57 Tayler Lawrence Jones 
wrote:

> A left join with null filter is only the same as a left anti join if the
> join keys can be guaranteed unique in the existing data. Since hive tables
> on s3 offer no unique guarantees outside of your processing code, I
> recommend using left anti join over left join + null filter.
>
> -TJ
>
> On Sun, Jun 3, 2018 at 14:47 ayan guha  wrote:
>
>> I do not use anti join semantics, but you can use a left outer join and
>> then keep only the rows where the right side is null. Your data may have
>> dups on the individual columns, but it should not have dups on the
>> composite key, i.e. all columns put together.
>>
>> On Mon, 4 Jun 2018 at 6:42 am, Tayler Lawrence Jones <
>> t.jonesd...@gmail.com> wrote:
>>
>>> The issue is not the append vs overwrite - perhaps those responders do
>>> not know Anti join semantics. Further, Overwrite on s3 is a bad pattern due
>>> to s3 eventual consistency issues.
>>>
>>> First, your sql query is wrong as you don’t close the parenthesis of the
>>> CTE (“with” part). In fact, it looks like you don’t need that with at all,
>>> and the query should fail to parse. If that does parse, I would open a bug
>>> on the spark jira.
>>>
>>> Can you provide the query that you are using to detect duplication so I
>>> can see if your deduplication logic matches the detection query?
>>>
>>> -TJ
>>>
>>> On Sat, Jun 2, 2018 at 10:22 Aakash Basu 
>>> wrote:
>>>
>>>> As Jay correctly suggested, if you're joining then overwrite, as that
>>>> removes dups; otherwise just append.
>>>>
>>>> I think, in this scenario, just change it to write.mode('overwrite')
>>>> because you're already reading the old data and your job would be done.
>>>>
>>>>
>>>> On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim,  wrote:
>>>>
>>>>> Hi Jay,
>>>>>
>>>>> Thanks for your response. Are you saying to append the new data, then
>>>>> remove the duplicates from the whole data set, and afterwards overwrite
>>>>> the existing data set with the new data set that includes the appended
>>>>> values? I will give that a try.
>>>>>
>>>>> Cheers,
>>>>> Ben
>>>>>
>>>>> On Fri, Jun 1, 2018 at 11:49 PM Jay 
>>>>> wrote:
>>>>>
>>>>>> Benjamin,
>>>>>>
>>>>>> The append will add the "new" data to the existing data without
>>>>>> removing the duplicates. You would need to overwrite the file every time
>>>>>> if you need unique values.
>>>>>>
>>>>>> Thanks,
>>>>>> Jayadeep
>>>>>>
>>>>>> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim 
>>>>>> wrote:
>>>>>>
>>>>>>> I have a situation where I am trying to add only new rows to an
>>>>>>> existing data set that lives in S3 as gzipped parquet files, looping and
>>>>>>> appending for each hour of the day. First, I create a DF from the
>>>>>>> existing data, then I use a query to create another DF with the data
>>>>>>> that is new. Here is the code snippet.
>>>>>>>
>>>>>>> df = spark.read.parquet(existing_data_path)
>>>>>>> df.createOrReplaceTempView('existing_data')
>>>>>>> new_df = spark.read.parquet(new_data_path)
>>>>>>> new_df.createOrReplaceTempView('new_data')
>>>>>>> append_df = spark.sql(
>>>>>>> """
>>>>>>> WITH ids AS (
>>>>>>> SELECT DISTINCT
>>>>>>> source,
>>>>>>> source_id,
>>>>>>> target,
>>>>>>> target_id
>>>>>>> FROM new_data i
>>>>>>> LEFT ANTI JOIN existing_data im
>>>>>>> ON i.source = im.source
>>>>>>> AND i.source_id = im.source_id
>>>>>>> AND i.target = im.target
>>>>>>> AND i.target = im.target_id
>>>>>>> """
>>>>>>> )
>>>>>>> append_df.coalesce(1).write.parquet(existing_data_path,
>>>>>>> mode='append', compression='gzip')
>>>>>>>
>>>>>>>
>>>>>>> I thought this would append new rows and keep the data unique, but I
>>>>>>> am seeing many duplicates. Can someone help me with this and tell me
>>>>>>> what I am doing wrong?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Ben
>>>>>>>
>>>>>> --
>> Best Regards,
>> Ayan Guha
>>
>


Re: Append In-Place to S3

2018-06-03 Thread Tayler Lawrence Jones
A left join with null filter is only the same as a left anti join if the
join keys can be guaranteed unique in the existing data. Since hive tables
on s3 offer no unique guarantees outside of your processing code, I
recommend using left anti join over left join + null filter.

-TJ



Re: Append In-Place to S3

2018-06-03 Thread Tayler Lawrence Jones
The issue is not append vs. overwrite; perhaps those responders do not
know anti join semantics. Further, overwrite on S3 is a bad pattern due to
S3 eventual consistency issues.

First, your SQL query is wrong, as you don’t close the parenthesis of the
CTE (the “with” part). In fact, it looks like you don’t need that WITH at
all. The query should fail to parse; if it does parse, I would open a bug
on the Spark JIRA.
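
For reference, a sketch of how the query might look with the WITH wrapper dropped. It also changes the last join condition, which in the original post compares `i.target` with `im.target_id`. That looks like a typo and, if so, could by itself explain the duplicates, since almost no rows would match and the anti join would treat them all as new. This is an assumption about the poster's intent, not something confirmed in the thread. The names `NEW_ROWS_SQL` and `append_new_rows` are made up for illustration, and `spark` is assumed to be an existing SparkSession.

```python
# Sketch of a corrected query (untested against the poster's data).
# Changes from the original post: no WITH wrapper, and the last condition
# compares target_id with target_id.
NEW_ROWS_SQL = """
SELECT DISTINCT
    i.source,
    i.source_id,
    i.target,
    i.target_id
FROM new_data i
LEFT ANTI JOIN existing_data im
    ON  i.source = im.source
    AND i.source_id = im.source_id
    AND i.target = im.target
    AND i.target_id = im.target_id
"""


def append_new_rows(spark, existing_data_path, new_data_path):
    """Append rows of new_data whose composite key is absent from existing_data."""
    spark.read.parquet(existing_data_path).createOrReplaceTempView("existing_data")
    spark.read.parquet(new_data_path).createOrReplaceTempView("new_data")
    append_df = spark.sql(NEW_ROWS_SQL)
    append_df.coalesce(1).write.parquet(
        existing_data_path, mode="append", compression="gzip"
    )
    return append_df
```

Note that `=` never matches NULL, so rows with a NULL in any key column would still be treated as new on every run.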

Can you provide the query that you are using to detect duplication so I can
see if your deduplication logic matches the detection query?
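
One common shape for such a detection query is a GROUP BY over the composite key with HAVING COUNT(*) > 1. The sketch below runs it with Python's built-in sqlite3 on three made-up rows, purely so it is runnable without a cluster. The same SQL text should also be accepted by `spark.sql`, though that is not exercised here. `DUPLICATE_KEYS_SQL` is a hypothetical name.

```python
import sqlite3

# Composite keys that occur more than once, with their counts.
DUPLICATE_KEYS_SQL = """
SELECT source, source_id, target, target_id, COUNT(*) AS n
FROM existing_data
GROUP BY source, source_id, target, target_id
HAVING COUNT(*) > 1
"""

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE existing_data "
    "(source TEXT, source_id TEXT, target TEXT, target_id TEXT)"
)
conn.executemany(
    "INSERT INTO existing_data VALUES (?, ?, ?, ?)",
    [
        ("a", "1", "b", "2"),
        ("a", "1", "b", "2"),  # exact repeat of the composite key
        ("a", "1", "b", "3"),  # differs in target_id, so not a duplicate
    ],
)

duplicates = conn.execute(DUPLICATE_KEYS_SQL).fetchall()
print(duplicates)
```

If the detection query groups on a different set of columns than the anti join compares, the two will disagree about what counts as a duplicate.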

-TJ



Re: Writing files to s3 with out temporary directory

2017-11-20 Thread Tayler Lawrence Jones
It is an open issue with the Hadoop file committer, not Spark. The simple
workaround is to write to HDFS, then copy to S3. Netflix did a talk about
their custom output committer at the last Spark Summit, which is a clever,
efficient way of doing that - I’d check it out on YouTube. They have open
sourced their implementation, but it does not work (out of the box) with
parquet.
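
A rough sketch of the write-to-HDFS-then-copy workaround, assuming the `hadoop` CLI is on the PATH and the cluster has S3 credentials configured for the `s3a://` scheme. The function names and paths are hypothetical, and `df` is assumed to be an existing Spark DataFrame.

```python
import subprocess


def build_distcp_command(hdfs_path, s3_path):
    """Return the argument list for copying a directory from HDFS to S3."""
    return ["hadoop", "distcp", hdfs_path, s3_path]


def write_then_copy(df, hdfs_path, s3_path):
    """Write parquet to HDFS first, then copy the finished files to S3."""
    # The committer's temporary-directory rename happens on HDFS, where it is cheap.
    df.write.parquet(hdfs_path, mode="overwrite")
    # Only the final files are sent to S3.
    subprocess.run(build_distcp_command(hdfs_path, s3_path), check=True)
```

For example, `write_then_copy(df, "hdfs:///tmp/out", "s3a://bucket/out")` would stage the output under `/tmp/out` before copying it.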

-TJ

On Mon, Nov 20, 2017 at 11:48 Jim Carroll  wrote:

> I have this exact issue. I was going to intercept the call in the
> filesystem if I had to (since we're using the S3 filesystem from Presto
> anyway) but if there's simply a way to do this correctly I'd much prefer
> it. This basically doubles the time to write parquet files to s3.