Re: Append In-Place to S3
Sorry actually my last message is not true for anti join, I was thinking of semi join. -TJ On Sun, Jun 3, 2018 at 14:57 Tayler Lawrence Jones wrote: > A left join with null filter is only the same as a left anti join if the > join keys can be guaranteed unique in the existing data. Since hive tables > on s3 offer no unique guarantees outside of your processing code, I > recommend using left anti join over left join + null filter. > > -TJ > > On Sun, Jun 3, 2018 at 14:47 ayan guha wrote: > >> I do not use anti join semantics, but you can use left outer join and >> then filter out nulls from right side. Your data may have dups on the >> columns separately but it should not have dups on the composite key ie all >> columns put together. >> >> On Mon, 4 Jun 2018 at 6:42 am, Tayler Lawrence Jones < >> t.jonesd...@gmail.com> wrote: >> >>> The issue is not the append vs overwrite - perhaps those responders do >>> not know Anti join semantics. Further, Overwrite on s3 is a bad pattern due >>> to s3 eventual consistency issues. >>> >>> First, your sql query is wrong as you don’t close the parenthesis of the >>> CTE (“with” part). In fact, it looks like you don’t need that with at all, >>> and the query should fail to parse. If that does parse, I would open a bug >>> on the spark jira. >>> >>> Can you provide the query that you are using to detect duplication so I >>> can see if your deduplication logic matches the detection query? >>> >>> -TJ >>> >>> On Sat, Jun 2, 2018 at 10:22 Aakash Basu >>> wrote: >>> >>>> As Jay suggested correctly, if you're joining then overwrite otherwise >>>> only append as it removes dups. >>>> >>>> I think, in this scenario, just change it to write.mode('overwrite') >>>> because you're already reading the old data and your job would be done. >>>> >>>> >>>> On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim, wrote: >>>> >>>>> Hi Jay, >>>>> >>>>> Thanks for your response. Are you saying to append the new data and >>>>> then remove the duplicates to the whole data set afterwards overwriting >>>>> the >>>>> existing data set with new data set with appended values? I will give that >>>>> a try. >>>>> >>>>> Cheers, >>>>> Ben >>>>> >>>>> On Fri, Jun 1, 2018 at 11:49 PM Jay >>>>> wrote: >>>>> >>>>>> Benjamin, >>>>>> >>>>>> The append will append the "new" data to the existing data with >>>>>> removing the duplicates. You would need to overwrite the file everytime >>>>>> if >>>>>> you need unique values. >>>>>> >>>>>> Thanks, >>>>>> Jayadeep >>>>>> >>>>>> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim >>>>>> wrote: >>>>>> >>>>>>> I have a situation where I trying to add only new rows to an >>>>>>> existing data set that lives in S3 as gzipped parquet files, looping and >>>>>>> appending for each hour of the day. First, I create a DF from the >>>>>>> existing >>>>>>> data, then I use a query to create another DF with the data that is new. >>>>>>> Here is the code snippet. >>>>>>> >>>>>>> df = spark.read.parquet(existing_data_path) >>>>>>> df.createOrReplaceTempView(‘existing_data’) >>>>>>> new_df = spark.read.parquet(new_data_path) >>>>>>> new_df.createOrReplaceTempView(’new_data’) >>>>>>> append_df = spark.sql( >>>>>>> """ >>>>>>> WITH ids AS ( >>>>>>> SELECT DISTINCT >>>>>>> source, >>>>>>> source_id, >>>>>>> target, >>>>>>> target_id >>>>>>> FROM new_data i >>>>>>> LEFT ANTI JOIN existing_data im >>>>>>> ON i.source = im.source >>>>>>> AND i.source_id = im.source_id >>>>>>> AND i.target = im.target >>>>>>> AND i.target = im.target_id >>>>>>> """ >>>>>>> ) >>>>>>> append_df.coalesce(1).write.parquet(existing_data_path, >>>>>>> mode='append', compression='gzip’) >>>>>>> >>>>>>> >>>>>>> I thought this would append new rows and keep the data unique, but I >>>>>>> am see many duplicates. Can someone help me with this and tell me what >>>>>>> I am >>>>>>> doing wrong? >>>>>>> >>>>>>> Thanks, >>>>>>> Ben >>>>>>> >>>>>> -- >> Best Regards, >> Ayan Guha >> >
Re: Append In-Place to S3
A left join with null filter is only the same as a left anti join if the join keys can be guaranteed unique in the existing data. Since hive tables on s3 offer no unique guarantees outside of your processing code, I recommend using left anti join over left join + null filter. -TJ On Sun, Jun 3, 2018 at 14:47 ayan guha wrote: > I do not use anti join semantics, but you can use left outer join and then > filter out nulls from right side. Your data may have dups on the columns > separately but it should not have dups on the composite key ie all columns > put together. > > On Mon, 4 Jun 2018 at 6:42 am, Tayler Lawrence Jones < > t.jonesd...@gmail.com> wrote: > >> The issue is not the append vs overwrite - perhaps those responders do >> not know Anti join semantics. Further, Overwrite on s3 is a bad pattern due >> to s3 eventual consistency issues. >> >> First, your sql query is wrong as you don’t close the parenthesis of the >> CTE (“with” part). In fact, it looks like you don’t need that with at all, >> and the query should fail to parse. If that does parse, I would open a bug >> on the spark jira. >> >> Can you provide the query that you are using to detect duplication so I >> can see if your deduplication logic matches the detection query? >> >> -TJ >> >> On Sat, Jun 2, 2018 at 10:22 Aakash Basu >> wrote: >> >>> As Jay suggested correctly, if you're joining then overwrite otherwise >>> only append as it removes dups. >>> >>> I think, in this scenario, just change it to write.mode('overwrite') >>> because you're already reading the old data and your job would be done. >>> >>> >>> On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim, wrote: >>> >>>> Hi Jay, >>>> >>>> Thanks for your response. Are you saying to append the new data and >>>> then remove the duplicates to the whole data set afterwards overwriting the >>>> existing data set with new data set with appended values? I will give that >>>> a try. >>>> >>>> Cheers, >>>> Ben >>>> >>>> On Fri, Jun 1, 2018 at 11:49 PM Jay >>>> wrote: >>>> >>>>> Benjamin, >>>>> >>>>> The append will append the "new" data to the existing data with >>>>> removing the duplicates. You would need to overwrite the file everytime if >>>>> you need unique values. >>>>> >>>>> Thanks, >>>>> Jayadeep >>>>> >>>>> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim >>>>> wrote: >>>>> >>>>>> I have a situation where I trying to add only new rows to an existing >>>>>> data set that lives in S3 as gzipped parquet files, looping and appending >>>>>> for each hour of the day. First, I create a DF from the existing data, >>>>>> then >>>>>> I use a query to create another DF with the data that is new. Here is the >>>>>> code snippet. >>>>>> >>>>>> df = spark.read.parquet(existing_data_path) >>>>>> df.createOrReplaceTempView(‘existing_data’) >>>>>> new_df = spark.read.parquet(new_data_path) >>>>>> new_df.createOrReplaceTempView(’new_data’) >>>>>> append_df = spark.sql( >>>>>> """ >>>>>> WITH ids AS ( >>>>>> SELECT DISTINCT >>>>>> source, >>>>>> source_id, >>>>>> target, >>>>>> target_id >>>>>> FROM new_data i >>>>>> LEFT ANTI JOIN existing_data im >>>>>> ON i.source = im.source >>>>>> AND i.source_id = im.source_id >>>>>> AND i.target = im.target >>>>>> AND i.target = im.target_id >>>>>> """ >>>>>> ) >>>>>> append_df.coalesce(1).write.parquet(existing_data_path, >>>>>> mode='append', compression='gzip’) >>>>>> >>>>>> >>>>>> I thought this would append new rows and keep the data unique, but I >>>>>> am see many duplicates. Can someone help me with this and tell me what I >>>>>> am >>>>>> doing wrong? >>>>>> >>>>>> Thanks, >>>>>> Ben >>>>>> >>>>> -- > Best Regards, > Ayan Guha >
Re: Append In-Place to S3
The issue is not the append vs overwrite - perhaps those responders do not know Anti join semantics. Further, Overwrite on s3 is a bad pattern due to s3 eventual consistency issues. First, your sql query is wrong as you don’t close the parenthesis of the CTE (“with” part). In fact, it looks like you don’t need that with at all, and the query should fail to parse. If that does parse, I would open a bug on the spark jira. Can you provide the query that you are using to detect duplication so I can see if your deduplication logic matches the detection query? -TJ On Sat, Jun 2, 2018 at 10:22 Aakash Basu wrote: > As Jay suggested correctly, if you're joining then overwrite otherwise > only append as it removes dups. > > I think, in this scenario, just change it to write.mode('overwrite') > because you're already reading the old data and your job would be done. > > > On Sat 2 Jun, 2018, 10:27 PM Benjamin Kim, wrote: > >> Hi Jay, >> >> Thanks for your response. Are you saying to append the new data and then >> remove the duplicates to the whole data set afterwards overwriting the >> existing data set with new data set with appended values? I will give that >> a try. >> >> Cheers, >> Ben >> >> On Fri, Jun 1, 2018 at 11:49 PM Jay wrote: >> >>> Benjamin, >>> >>> The append will append the "new" data to the existing data with removing >>> the duplicates. You would need to overwrite the file everytime if you need >>> unique values. >>> >>> Thanks, >>> Jayadeep >>> >>> On Fri, Jun 1, 2018 at 9:31 PM Benjamin Kim wrote: >>> I have a situation where I trying to add only new rows to an existing data set that lives in S3 as gzipped parquet files, looping and appending for each hour of the day. First, I create a DF from the existing data, then I use a query to create another DF with the data that is new. Here is the code snippet. df = spark.read.parquet(existing_data_path) df.createOrReplaceTempView(‘existing_data’) new_df = spark.read.parquet(new_data_path) new_df.createOrReplaceTempView(’new_data’) append_df = spark.sql( """ WITH ids AS ( SELECT DISTINCT source, source_id, target, target_id FROM new_data i LEFT ANTI JOIN existing_data im ON i.source = im.source AND i.source_id = im.source_id AND i.target = im.target AND i.target = im.target_id """ ) append_df.coalesce(1).write.parquet(existing_data_path, mode='append', compression='gzip’) I thought this would append new rows and keep the data unique, but I am see many duplicates. Can someone help me with this and tell me what I am doing wrong? Thanks, Ben >>>
Re: Writing files to s3 with out temporary directory
It is an open issue with Hadoop file committer, not spark. The simple workaround is to write to hdfs then copy to s3. Netflix did a talk about their custom output committer at the last spark summit which is a clever efficient way of doing that - I’d check it out on YouTube. They have open sourced their implementation, but it does not work (out the box) with parquet. -TJ On Mon, Nov 20, 2017 at 11:48 Jim Carrollwrote: > I have this exact issue. I was going to intercept the call in the > filesystem > if I had to (since we're using the S3 filesystem from Presto anyway) but if > there's simply a way to do this correctly I'd much prefer it. This > basically > doubles the time to write parquet files to s3. > > > > -- > Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/ > > - > To unsubscribe e-mail: user-unsubscr...@spark.apache.org > >