Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-02 Thread Nirav Patel
I tried the following to explicitly specify the partition columns in the SQL
statement, and also tried different cases (upper and lower) for the partition columns.

insert overwrite table $tableName PARTITION(P1, P2) select A, B, C, P1, P2
from updateTable.

Still getting:

Caused by:
org.apache.hadoop.hive.ql.metadata.Table$ValidationFailureSemanticException:
Partition spec {p1=, p2=, P1=1085, P2=164590861} contains non-partition
columns
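
One possible explanation (an assumption on my part, not confirmed in this thread):
the spec above contains both lowercase (p1=, p2=) and uppercase (P1=..., P2=...)
entries, and Hive keeps partition column names in lowercase, so a case mismatch
between the table definition and the columns coming out of the SELECT could
produce exactly this error. A minimal, untested sketch, assuming the table
declares its partition columns as lowercase p1/p2 and dynamic partition inserts
are allowed:

// Untested sketch -- assumes the Hive table's partition columns are lowercase
// p1/p2 and that dynamic partition inserts are permitted.
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

// Rename so the view's column names match the table's partition columns,
// and keep the partition columns last in the SELECT.
dataFrame
  .withColumnRenamed("P1", "p1")
  .withColumnRenamed("P2", "p2")
  .createOrReplaceTempView("updateTable")

spark.sql(
  s"""INSERT OVERWRITE TABLE $tableName PARTITION (p1, p2)
     |SELECT A, B, C, p1, p2 FROM updateTable""".stripMargin)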



On Thu, Aug 2, 2018 at 11:37 AM, Nirav Patel  wrote:

> Thanks Koert. I'll check that out when we can update to 2.3
>
> Meanwhile, I am trying a Hive SQL (INSERT OVERWRITE) statement to insert
> overwrite multiple partitions (without losing the existing ones).
>
> It's giving me issues around partition columns.
>
> dataFrame.createOrReplaceTempView("updateTable") //here dataframe
> contains values from multiple partitions.
>
> dataFrame also has the partition columns, but I can't get any of the following
> to execute:
>
> insert overwrite table $tableName PARTITION(P1, P2) select * from
> updateTable.
>
> org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.
> metadata.Table.ValidationFailureSemanticException: Partition spec {p1=,
> p2=, P1=__HIVE_DEFAULT_PARTITION__, P2=1} contains non-partition columns;
>
>
> Is the above the right approach for updating multiple partitions? Or should I be
> more specific and update each partition with a separate command, like the following:
>
> //Pseudo code; yet to try
>
> df.createOrReplaceTempView("updateTable")
>
> // spark.sql can only be called on the driver, not from inside an RDD map on
> // the executors, so collect the distinct partition keys and loop over them.
> df.select("P1", "P2").distinct().collect().foreach { row =>
>   val (p1, p2) = (row.get(0), row.get(1))
>   spark.sql(
>     s"""INSERT OVERWRITE TABLE stats PARTITION (P1 = $p1, P2 = $p2)
>        |SELECT A, B, C FROM updateTable WHERE P1 = $p1 AND P2 = $p2""".stripMargin)
>   // quote the values if the partition columns are strings; with a static
>   // partition spec the partition columns themselves are not selected
> }
>
> Regards,
> Nirav
>
>
> On Wed, Aug 1, 2018 at 4:18 PM, Koert Kuipers  wrote:
>
>> this works for dataframes with spark 2.3 by changing a global setting,
>> and will be configurable per write in 2.4
>> see:
>> https://issues.apache.org/jira/browse/SPARK-20236
>> https://issues.apache.org/jira/browse/SPARK-24860
>>
>> On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel 
>> wrote:
>>
>>> Hi Peay,
>>>
>>> Have you found a better solution yet? I am having the same issue.
>>>
>>> The following says it works with Spark 2.1 onward, but only when you use
>>> sqlContext and not the DataFrame writer:
>>> https://medium.com/@anuvrat/writing-into-dynamic-partitions-
>>> using-spark-2e2b818a007a
>>>
>>> Thanks,
>>> Nirav
>>>
>>> On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh 
>>> wrote:
>>>
 If your processing task inherently processes input data by month, you
 may want to "manually" partition the output data by month as well as
 by day, that is, to save it under a directory name that includes the given
 month, e.g. "dataset.parquet/month=01". Then you will be able to use the
 overwrite mode with each month partition. Hope this could be of some
 help.

 --
 Pavel Knoblokh

 On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
 > Hello,
 >
 > I am trying to use
 > data_frame.write.partitionBy("day").save("dataset.parquet") to write
 a
 > dataset while splitting by day.
 >
 > I would like to run a Spark job  to process, e.g., a month:
 > dataset.parquet/day=2017-01-01/...
 > ...
 >
 > and then run another Spark job to add another month using the same
 folder
 > structure, getting me
 > dataset.parquet/day=2017-01-01/
 > ...
 > dataset.parquet/day=2017-02-01/
 > ...
 >
 > However:
 > - with save mode "overwrite", when I process the second month, all of
 > dataset.parquet/ gets removed and I lose whatever was already
 computed for
 > the previous month.
 > - with save mode "append", then I can't get idempotence: if I run the
 job to
 > process a given month twice, I'll get duplicate data in all the
 subfolders
 > for that month.
 >
 > Is there a way to do "append" in terms of the subfolders from partitionBy,
 > but overwrite within each such partition? Any help would be appreciated.
 >
 > Thanks!



 --
 Pavel Knoblokh



>>>
>>>
>>>
>>
>>
>>
>




Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-02 Thread Nirav Patel
Thanks Koert. I'll check that out when we can update to 2.3

Meanwhile, I am trying a Hive SQL (INSERT OVERWRITE) statement to insert
overwrite multiple partitions (without losing the existing ones).

It's giving me issues around partition columns.

dataFrame.createOrReplaceTempView("updateTable") //here dataframe
contains values from multiple partitions.

dataFrame also has the partition columns, but I can't get any of the following
to execute:

insert overwrite table $tableName PARTITION(P1, P2) select * from
updateTable.

org.apache.spark.sql.AnalysisException:
org.apache.hadoop.hive.ql.metadata.Table.ValidationFailureSemanticException:
Partition spec {p1=, p2=, P1=__HIVE_DEFAULT_PARTITION__, P2=1} contains
non-partition columns;


Is the above the right approach for updating multiple partitions? Or should I be
more specific and update each partition with a separate command, like the following:

//Pseudo code; yet to try

df.createOrReplaceTempView("updateTable")

// spark.sql can only be called on the driver, not from inside an RDD map on
// the executors, so collect the distinct partition keys and loop over them.
df.select("P1", "P2").distinct().collect().foreach { row =>
  val (p1, p2) = (row.get(0), row.get(1))
  spark.sql(
    s"""INSERT OVERWRITE TABLE stats PARTITION (P1 = $p1, P2 = $p2)
       |SELECT A, B, C FROM updateTable WHERE P1 = $p1 AND P2 = $p2""".stripMargin)
  // quote the values if the partition columns are strings; with a static
  // partition spec the partition columns themselves are not selected
}

Regards,
Nirav


On Wed, Aug 1, 2018 at 4:18 PM, Koert Kuipers  wrote:

> this works for dataframes with spark 2.3 by changing a global setting, and
> will be configurable per write in 2.4
> see:
> https://issues.apache.org/jira/browse/SPARK-20236
> https://issues.apache.org/jira/browse/SPARK-24860
>
> On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel  wrote:
>
>> Hi Peay,
>>
>> Have you found a better solution yet? I am having the same issue.
>>
>> The following says it works with Spark 2.1 onward, but only when you use
>> sqlContext and not the DataFrame writer:
>> https://medium.com/@anuvrat/writing-into-dynamic-partitions-
>> using-spark-2e2b818a007a
>>
>> Thanks,
>> Nirav
>>
>> On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh 
>> wrote:
>>
>>> If your processing task inherently processes input data by month, you
>>> may want to "manually" partition the output data by month as well as
>>> by day, that is, to save it under a directory name that includes the given
>>> month, e.g. "dataset.parquet/month=01". Then you will be able to use the
>>> overwrite mode with each month partition. Hope this could be of some
>>> help.
>>>
>>> --
>>> Pavel Knoblokh
>>>
>>> On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
>>> > Hello,
>>> >
>>> > I am trying to use
>>> > data_frame.write.partitionBy("day").save("dataset.parquet") to write a
>>> > dataset while splitting by day.
>>> >
>>> > I would like to run a Spark job  to process, e.g., a month:
>>> > dataset.parquet/day=2017-01-01/...
>>> > ...
>>> >
>>> > and then run another Spark job to add another month using the same
>>> folder
>>> > structure, getting me
>>> > dataset.parquet/day=2017-01-01/
>>> > ...
>>> > dataset.parquet/day=2017-02-01/
>>> > ...
>>> >
>>> > However:
>>> > - with save mode "overwrite", when I process the second month, all of
>>> > dataset.parquet/ gets removed and I lose whatever was already computed
>>> for
>>> > the previous month.
>>> > - with save mode "append", then I can't get idempotence: if I run the
>>> job to
>>> > process a given month twice, I'll get duplicate data in all the
>>> subfolders
>>> > for that month.
>>> >
>>> > Is there a way to do "append" in terms of the subfolders from partitionBy,
>>> > but overwrite within each such partition? Any help would be appreciated.
>>> >
>>> > Thanks!
>>>
>>>
>>>
>>> --
>>> Pavel Knoblokh
>>>
>>>
>>>
>>
>>
>>
>
>
>




Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-01 Thread Koert Kuipers
this works for dataframes with spark 2.3 by changing a global setting, and
will be configurable per write in 2.4
see:
https://issues.apache.org/jira/browse/SPARK-20236
https://issues.apache.org/jira/browse/SPARK-24860
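
A minimal sketch of what those two tickets describe (the path and the "day"
column are taken from the example earlier in the thread; df stands for the
frame being written):

// Spark 2.3+: global setting added by SPARK-20236
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df.write
  .mode("overwrite")
  .partitionBy("day")
  .parquet("dataset.parquet")   // only the day= partitions present in df are replaced

// Spark 2.4+: the same behaviour per write (SPARK-24860)
df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .partitionBy("day")
  .parquet("dataset.parquet")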

On Wed, Aug 1, 2018 at 3:11 PM, Nirav Patel  wrote:

> Hi Peay,
>
> Have you found a better solution yet? I am having the same issue.
>
> The following says it works with Spark 2.1 onward, but only when you use
> sqlContext and not the DataFrame writer:
> https://medium.com/@anuvrat/writing-into-dynamic-partitions-using-spark-
> 2e2b818a007a
>
> Thanks,
> Nirav
>
> On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh  wrote:
>
>> If your processing task inherently processes input data by month, you
>> may want to "manually" partition the output data by month as well as
>> by day, that is, to save it under a directory name that includes the given
>> month, e.g. "dataset.parquet/month=01". Then you will be able to use the
>> overwrite mode with each month partition. Hope this could be of some
>> help.
>>
>> --
>> Pavel Knoblokh
>>
>> On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
>> > Hello,
>> >
>> > I am trying to use
>> > data_frame.write.partitionBy("day").save("dataset.parquet") to write a
>> > dataset while splitting by day.
>> >
>> > I would like to run a Spark job  to process, e.g., a month:
>> > dataset.parquet/day=2017-01-01/...
>> > ...
>> >
>> > and then run another Spark job to add another month using the same
>> folder
>> > structure, getting me
>> > dataset.parquet/day=2017-01-01/
>> > ...
>> > dataset.parquet/day=2017-02-01/
>> > ...
>> >
>> > However:
>> > - with save mode "overwrite", when I process the second month, all of
>> > dataset.parquet/ gets removed and I lose whatever was already computed
>> for
>> > the previous month.
>> > - with save mode "append", then I can't get idempotence: if I run the
>> job to
>> > process a given month twice, I'll get duplicate data in all the
>> subfolders
>> > for that month.
>> >
>> > Is there a way to do "append" in terms of the subfolders from partitionBy,
>> > but overwrite within each such partition? Any help would be appreciated.
>> >
>> > Thanks!
>>
>>
>>
>> --
>> Pavel Knoblokh
>>
>>
>>
>
>
>


Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2018-08-01 Thread Nirav Patel
Hi Peay,

Have you found a better solution yet? I am having the same issue.

The following says it works with Spark 2.1 onward, but only when you use
sqlContext and not the DataFrame writer:
https://medium.com/@anuvrat/writing-into-dynamic-partitions-using-spark-2e2b818a007a

Thanks,
Nirav

On Mon, Oct 2, 2017 at 4:37 AM, Pavel Knoblokh  wrote:

> If your processing task inherently processes input data by month, you
> may want to "manually" partition the output data by month as well as
> by day, that is, to save it under a directory name that includes the given
> month, e.g. "dataset.parquet/month=01". Then you will be able to use the
> overwrite mode with each month partition. Hope this could be of some
> help.
>
> --
> Pavel Knoblokh
>
> On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
> > Hello,
> >
> > I am trying to use
> > data_frame.write.partitionBy("day").save("dataset.parquet") to write a
> > dataset while splitting by day.
> >
> > I would like to run a Spark job  to process, e.g., a month:
> > dataset.parquet/day=2017-01-01/...
> > ...
> >
> > and then run another Spark job to add another month using the same folder
> > structure, getting me
> > dataset.parquet/day=2017-01-01/
> > ...
> > dataset.parquet/day=2017-02-01/
> > ...
> >
> > However:
> > - with save mode "overwrite", when I process the second month, all of
> > dataset.parquet/ gets removed and I lose whatever was already computed
> for
> > the previous month.
> > - with save mode "append", then I can't get idempotence: if I run the
> job to
> > process a given month twice, I'll get duplicate data in all the
> subfolders
> > for that month.
> >
> > Is there a way to do "append" in terms of the subfolders from partitionBy,
> > but overwrite within each such partition? Any help would be appreciated.
> >
> > Thanks!
>
>
>
> --
> Pavel Knoblokh
>
>
>




Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2017-10-02 Thread Pavel Knoblokh
If your processing task inherently processes input data by month, you
may want to "manually" partition the output data by month as well as
by day, that is, to save it under a directory name that includes the given
month, e.g. "dataset.parquet/month=01". Then you will be able to use the
overwrite mode with each month partition. Hope this could be of some
help.
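
A minimal, untested sketch of that layout (the month value, the monthDf
variable, and the paths are illustrative):

// Each job run owns exactly one month directory, so "overwrite" can only
// ever replace that month's data.
val month = "2017-01"                          // the month this run is processing
monthDf.write
  .mode("overwrite")                           // replaces only dataset.parquet/month=2017-01
  .partitionBy("day")
  .parquet(s"dataset.parquet/month=$month")

// Reading the root still sees every month via partition discovery.
val all = spark.read.parquet("dataset.parquet")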

-- 
Pavel Knoblokh

On Fri, Sep 29, 2017 at 5:31 PM, peay  wrote:
> Hello,
>
> I am trying to use
> data_frame.write.partitionBy("day").save("dataset.parquet") to write a
> dataset while splitting by day.
>
> I would like to run a Spark job  to process, e.g., a month:
> dataset.parquet/day=2017-01-01/...
> ...
>
> and then run another Spark job to add another month using the same folder
> structure, getting me
> dataset.parquet/day=2017-01-01/
> ...
> dataset.parquet/day=2017-02-01/
> ...
>
> However:
> - with save mode "overwrite", when I process the second month, all of
> dataset.parquet/ gets removed and I lose whatever was already computed for
> the previous month.
> - with save mode "append", then I can't get idempotence: if I run the job to
> process a given month twice, I'll get duplicate data in all the subfolders
> for that month.
>
> Is there a way to do "append" in terms of the subfolders from partitionBy,
> but overwrite within each such partition? Any help would be appreciated.
>
> Thanks!



-- 
Pavel Knoblokh




Re: Saving dataframes with partitionBy: append partitions, overwrite within each

2017-09-29 Thread Vadim Semenov
As an alternative: checkpoint the dataframe, collect the days it covers, delete the
corresponding directories using Hadoop FileUtils, and then write the dataframe.
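
A minimal, untested sketch of that approach (the monthDf variable and the paths
are illustrative; FileSystem.delete is used below as an equivalent of the
FileUtils call mentioned above):

import org.apache.hadoop.fs.{FileSystem, Path}

// Materialize the frame so the collected day list and the final write see the
// same data, delete the matching day= directories, then append.
val df = monthDf.persist()                    // monthDf: the month being (re)processed
val days = df.select("day").distinct().collect().map(_.get(0).toString)

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
days.foreach(d => fs.delete(new Path(s"dataset.parquet/day=$d"), true))  // recursive delete

df.write.mode("append").partitionBy("day").parquet("dataset.parquet")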

On Fri, Sep 29, 2017 at 10:31 AM, peay  wrote:

> Hello,
>
> I am trying to use data_frame.write.partitionBy("day").save("dataset.parquet")
> to write a dataset while splitting by day.
>
> I would like to run a Spark job  to process, e.g., a month:
> dataset.parquet/day=2017-01-01/...
> ...
>
> and then run another Spark job to add another month using the same folder
> structure, getting me
> dataset.parquet/day=2017-01-01/
> ...
> dataset.parquet/day=2017-02-01/ 
> ...
>
> However:
> - with save mode "overwrite", when I process the second month, all of
> dataset.parquet/ gets removed and I lose whatever was already computed for
> the previous month.
> - with save mode "append", then I can't get idempotence: if I run the job
> to process a given month twice, I'll get duplicate data in all the
> subfolders for that month.
>
> Is there a way to do "append" in terms of the subfolders from partitionBy,
> but overwrite within each such partition? Any help would be appreciated.
>
> Thanks!
>