Re: Writing partitioned Avro data to HDFS

2015-12-22 Thread Jan Holmberg
Works like a charm. Thanks a lot!

-jan

On 22 Dec 2015, at 23:08, Michael Armbrust wrote:

You need to say .mode("append") if you want to append to existing data.

On Tue, Dec 22, 2015 at 6:48 AM, Yash Sharma wrote:

Well, you are right. Having a quick glance at the source[1], I see that the path 
creation does not consider the partitions.

It tries to create the path before looking for the partition columns.

Not sure what would be the best way to incorporate it. Probably you can file a 
JIRA and experienced contributors can share their thoughts.

1. 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala

Line- 131

- Thanks, via mobile,  excuse brevity.

On Dec 22, 2015 7:48 PM, "Jan Holmberg" wrote:
In my example the directories were distinct.

So if I would like to have two distinct directories, e.g.

/tmp/data/year=2012
/tmp/data/year=2013

It does not work with

val df = Seq((2012, "Batman")).toDF("year","title")

df.write.partitionBy("year").avro("/tmp/data")

val df2 = Seq((2013, "Batman")).toDF("year","title")

df2.write.partitionBy("year").avro("/tmp/data")


As you can see, it complains about the target directory (/tmp/data) and not 
about the partitioning keys.


org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data 
already exists.;
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)



On 22 Dec 2015, at 15:44, Yash Sharma wrote:


Well, this will indeed hit the error if the next run has the same years and months, 
and writing would not be possible.

You can try working around by introducing a runCount in partition or in the 
output path.

Something like-

/tmp/data/year/month/01
/tmp/data/year/month/02

Or,
/tmp/data/01/year/month
/tmp/data/02/year/month

This is a workaround.

I am sure other, better approaches will follow.

- Thanks, via mobile,  excuse brevity.

On Dec 22, 2015 7:01 PM, "Jan Holmberg" wrote:
Hi Yash,

the error is caused by the fact that the first run creates the base directory, i.e. 
"/tmp/data", and the second batch stumbles on the existing base directory. I 
understand that the existing base directory is a challenge, but I do not 
understand how to make this work in a streaming example where each batch would 
create a new distinct directory.

Granularity has no impact. No matter how the data is partitioned, the second 'batch' 
always fails on the existing base dir.

scala> df2.write.partitionBy("year").avro("/tmp/data")
org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data 
already exists.;
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
at 
com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
at 
com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)


On 22 Dec 2015, at 14:06, Yash Sharma wrote:


Hi Jan,
Is the error because a past run of the job has already written to the location?

In that case you can add more granularity with 'time' along with year and 
month. That should give you a distinct path for every run.

Re: Writing partitioned Avro data to HDFS

2015-12-22 Thread Michael Armbrust
You need to say .mode("append") if you want to append to existing data.
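
For concreteness, a minimal sketch of what that looks like with the spark-avro writer 
used elsewhere in this thread (the data and paths are just the thread's example values; 
the avro() extension comes from importing com.databricks.spark.avro._):

import com.databricks.spark.avro._
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// First batch: creates /tmp/data and /tmp/data/year=2012
val df = Seq((2012, "Batman")).toDF("year", "title")
df.write.partitionBy("year").avro("/tmp/data")

// Second batch: without .mode("append") this fails with "path ... already exists",
// because the default save mode is ErrorIfExists
val df2 = Seq((2013, "Batman")).toDF("year", "title")
df2.write.mode("append").partitionBy("year").avro("/tmp/data")

Each subsequent batch of a streaming job should then be able to keep appending new files 
under the matching partition directories of the same base path.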

On Tue, Dec 22, 2015 at 6:48 AM, Yash Sharma  wrote:

> Well you are right.  Having a quick glance at the source[1] I see that the
> path creation does not consider the partitions.
>
> It tries to create the path before looking for partitions columns.
>
> Not sure what would be the best way to incorporate it. Probably you can
> file a jira and experienced contributors can share their thoughts.
>
> 1.
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
>
> Line- 131
>
> - Thanks, via mobile,  excuse brevity.
> On Dec 22, 2015 7:48 PM, "Jan Holmberg"  wrote:
>
>> In my example directories were distinct.
>>
>> So If I would like to have to distinct directories ex.
>>
>> /tmp/data/year=2012
>> /tmp/data/year=2013
>>
>> It does not work with
>> val df = Seq((2012, "Batman")).toDF("year","title")
>>
>> df.write.partitionBy("year").avro("/tmp/data")
>>
>> val df2 = Seq((2013, "Batman")).toDF("year","title")
>>
>> df2.write.partitionBy("year").avro("/tmp/data")
>>
>>
>> As you can see, it complains about the target directory (/tmp/data) and
>> not about the partitioning keys.
>>
>>
>> org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data
>> already exists.;
>> at
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
>> at
>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>> at
>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>> at
>> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>>
>>
>>
>> On 22 Dec 2015, at 15:44, Yash Sharma  wrote:
>>
>> Well this will indeed hit the error if the next run has similar year and
>> months and writing would not be possible.
>>
>> You can try working around by introducing a runCount in partition or in
>> the output path.
>>
>> Something like-
>>
>> /tmp/data/year/month/01
>> /tmp/data/year/month/02
>>
>> Or,
>> /tmp/data/01/year/month
>> /tmp/data/02/year/month
>>
>> This is a work around.
>>
>> Am sure other better approaches would follow.
>>
>> - Thanks, via mobile,  excuse brevity.
>> On Dec 22, 2015 7:01 PM, "Jan Holmberg"  wrote:
>>
>>> Hi Yash,
>>>
>>> the error is caused by the fact that first run creates the base
>>> directory ie. "/tmp/data" and the second batch stumbles to the existing
>>> base directory. I understand that the existing base directory is a
>>> challenge but I do not understand how to make this work with streaming
>>> example where each batch would create a new distinct directory.
>>>
>>> Granularity has no impact. No matter how data is partitioned, second
>>> 'batch' always fails with existing base dir.
>>>
>>> scala> df2.write.partitionBy("year").avro("/tmp/data")
>>> org.apache.spark.sql.AnalysisException: path
>>> hdfs://nameservice1/tmp/data already exists.;
>>> at
>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
>>> at
>>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>>> at
>>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>>> at
>>> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
>>> at
>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
>>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
>>> at
>>> com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
>>> at
>>> com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
>>> at
>>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:33)
>>> at
>>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:38)
>>>
>>>
>>> On 22 Dec 2015, at 14:06, Yash Sharma  wrote:
>>>
>>> Hi Jan,
>>> Is the error because a past run of the job has already written to the
>>> location?
>>>
>>> In that case you can add more granularity with 'time' along with year and
>>> month. That should give you a distinct path for every run.

Writing partitioned Avro data to HDFS

2015-12-22 Thread Jan Holmberg
Hi,
I'm stuck with writing partitioned data to HDFS. The example below ends up with an 
'already exists' error.

I'm wondering how to handle the streaming use case.

What is the intended way to write streaming data to HDFS? What am I missing?

cheers,
-jan


import com.databricks.spark.avro._

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val df = Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")

df.write.partitionBy("year", "month").avro("/tmp/data")

val df2 = Seq(
(2012, 10, "Batman", 9.8),
(2012, 10, "Hero", 8.7),
(2012, 9, "Robot", 5.5),
(2011, 9, "Git", 2.0)).toDF("year", "month", "title", "rating")

df2.write.partitionBy("year", "month").avro("/tmp/data")
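
For reference, the first write above (df.write...) should produce a directory layout 
roughly like the following (the Avro part-file names are illustrative; the exact names 
depend on the writer):

/tmp/data/year=2011/month=7/part-r-00000-xxxx.avro
/tmp/data/year=2012/month=7/part-r-00000-xxxx.avro
/tmp/data/year=2012/month=8/part-r-00000-xxxx.avro

The second write then fails not because any of its partitions clash, but because the 
base path /tmp/data itself already exists and the default save mode refuses to write 
into an existing path.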



Re: Writing partitioned Avro data to HDFS

2015-12-22 Thread Yash Sharma
Hi Jan,
Is the error because a past run of the job has already written to the
location?

In that case you can add more granularity with 'time' along with year and
month. That should give you a distinct path for every run.

Let us know if it helps or if I missed anything.

Goodluck

- Thanks, via mobile,  excuse brevity.
On Dec 22, 2015 2:31 PM, "Jan Holmberg"  wrote:

> Hi,
> I'm stuck with writing partitioned data to hdfs. Example below ends up
> with 'already exists' -error.
>
> I'm wondering how to handle streaming use case.
>
> What is the intended way to write streaming data to hdfs? What am I
> missing?
>
> cheers,
> -jan
>
>
> import com.databricks.spark.avro._
>
> import org.apache.spark.sql.SQLContext
>
> val sqlContext = new SQLContext(sc)
>
> import sqlContext.implicits._
>
> val df = Seq(
> (2012, 8, "Batman", 9.8),
> (2012, 8, "Hero", 8.7),
> (2012, 7, "Robot", 5.5),
> (2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")
>
> df.write.partitionBy("year", "month").avro("/tmp/data")
>
> val df2 = Seq(
> (2012, 10, "Batman", 9.8),
> (2012, 10, "Hero", 8.7),
> (2012, 9, "Robot", 5.5),
> (2011, 9, "Git", 2.0)).toDF("year", "month", "title", "rating")
>
> df2.write.partitionBy("year", "month").avro("/tmp/data")
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Writing partitioned Avro data to HDFS

2015-12-22 Thread Jan Holmberg
Hi Yash,

the error is caused by the fact that the first run creates the base directory, i.e. 
"/tmp/data", and the second batch stumbles on the existing base directory. I 
understand that the existing base directory is a challenge, but I do not 
understand how to make this work in a streaming example where each batch would 
create a new distinct directory.

Granularity has no impact. No matter how the data is partitioned, the second 'batch' 
always fails on the existing base dir.

scala> df2.write.partitionBy("year").avro("/tmp/data")
org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data 
already exists.;
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
at 
com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
at 
com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)


On 22 Dec 2015, at 14:06, Yash Sharma wrote:


Hi Jan,
Is the error because a past run of the job has already written to the location?

In that case you can add more granularity with 'time' along with year and 
month. That should give you a distinct path for every run.

Let us know if it helps or if i missed anything.

Goodluck

- Thanks, via mobile,  excuse brevity.

On Dec 22, 2015 2:31 PM, "Jan Holmberg" wrote:
Hi,
I'm stuck with writing partitioned data to HDFS. The example below ends up with an 
'already exists' error.

I'm wondering how to handle the streaming use case.

What is the intended way to write streaming data to HDFS? What am I missing?

cheers,
-jan


import com.databricks.spark.avro._

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val df = Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")

df.write.partitionBy("year", "month").avro("/tmp/data")

val df2 = Seq(
(2012, 10, "Batman", 9.8),
(2012, 10, "Hero", 8.7),
(2012, 9, "Robot", 5.5),
(2011, 9, "Git", 2.0)).toDF("year", "month", "title", "rating")

df2.write.partitionBy("year", "month").avro("/tmp/data")




Re: Writing partitioned Avro data to HDFS

2015-12-22 Thread Yash Sharma
Well, this will indeed hit the error if the next run has the same years and
months, and writing would not be possible.

You can try working around by introducing a runCount in partition or in the
output path.

Something like-

/tmp/data/year/month/01
/tmp/data/year/month/02

Or,
/tmp/data/01/year/month
/tmp/data/02/year/month

This is a workaround.

I am sure other, better approaches will follow.

- Thanks, via mobile,  excuse brevity.
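
A rough sketch of that workaround, assuming a hypothetical runCount that the caller 
maintains (for example a batch counter in the streaming job); the partition columns stay 
the same, only the base path changes per run:

// runCount is a hypothetical per-run identifier tracked by the caller
val runCount = 1
val runId = "%02d".format(runCount)      // "01", "02", ...
val outputPath = s"/tmp/data/$runId"     // a fresh base path for every run

// Because the base path is new on each run, the "path already exists" check never fires
df.write.partitionBy("year", "month").avro(outputPath)

The trade-off is that readers now have to know about the extra run level in the path, 
or glob over it when loading the data back.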
On Dec 22, 2015 7:01 PM, "Jan Holmberg"  wrote:

> Hi Yash,
>
> the error is caused by the fact that first run creates the base directory
> ie. "/tmp/data" and the second batch stumbles to the existing base
> directory. I understand that the existing base directory is a challenge but
> I do not understand how to make this work with streaming example where each
> batch would create a new distinct directory.
>
> Granularity has no impact. No matter how data is partitioned, second
> 'batch' always fails with existing base dir.
>
> scala> df2.write.partitionBy("year").avro("/tmp/data")
> org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data
> already exists.;
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
> at
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
> at
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
> at
> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
> at
> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
> at
> com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
> at
> com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:33)
> at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:38)
>
>
> On 22 Dec 2015, at 14:06, Yash Sharma  wrote:
>
> Hi Jan,
> Is the error because a past run of the job has already written to the
> location?
>
> In that case you can add more granularity with 'time' along with year and
> month. That should give you a distinct path for every run.
>
> Let us know if it helps or if i missed anything.
>
> Goodluck
>
> - Thanks, via mobile,  excuse brevity.
> On Dec 22, 2015 2:31 PM, "Jan Holmberg"  wrote:
>
>> Hi,
>> I'm stuck with writing partitioned data to hdfs. Example below ends up
>> with 'already exists' -error.
>>
>> I'm wondering how to handle streaming use case.
>>
>> What is the intended way to write streaming data to hdfs? What am I
>> missing?
>>
>> cheers,
>> -jan
>>
>>
>> import com.databricks.spark.avro._
>>
>> import org.apache.spark.sql.SQLContext
>>
>> val sqlContext = new SQLContext(sc)
>>
>> import sqlContext.implicits._
>>
>> val df = Seq(
>> (2012, 8, "Batman", 9.8),
>> (2012, 8, "Hero", 8.7),
>> (2012, 7, "Robot", 5.5),
>> (2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")
>>
>> df.write.partitionBy("year", "month").avro("/tmp/data")
>>
>> val df2 = Seq(
>> (2012, 10, "Batman", 9.8),
>> (2012, 10, "Hero", 8.7),
>> (2012, 9, "Robot", 5.5),
>> (2011, 9, "Git", 2.0)).toDF("year", "month", "title", "rating")
>>
>> df2.write.partitionBy("year", "month").avro("/tmp/data")
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Writing partitioned Avro data to HDFS

2015-12-22 Thread Yash Sharma
Well, you are right. Having a quick glance at the source[1], I see that the
path creation does not consider the partitions.

It tries to create the path before looking for the partition columns.

Not sure what would be the best way to incorporate it. Probably you can
file a JIRA and experienced contributors can share their thoughts.

1.
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala

Line- 131

- Thanks, via mobile,  excuse brevity.
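
To make the behaviour concrete, a rough sketch of that kind of pre-check (this is an 
illustration only, not the actual ResolvedDataSource code): with the default 
error-if-exists save mode, the existence test is applied to the whole output path before 
the partition directories come into play, roughly like:

import org.apache.hadoop.fs.{FileSystem, Path}

// basePath is the full path handed to save()/avro(), e.g. /tmp/data;
// the partition columns have not been applied to it at this point
val basePath = new Path("/tmp/data")
val fs = basePath.getFileSystem(sc.hadoopConfiguration)

if (fs.exists(basePath)) {
  // this is the condition that surfaces as
  // "org.apache.spark.sql.AnalysisException: path hdfs://.../tmp/data already exists.;"
  sys.error(s"path $basePath already exists")
}

which is why the error names the base directory rather than any of the year=/month= 
partition paths.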
On Dec 22, 2015 7:48 PM, "Jan Holmberg"  wrote:

> In my example directories were distinct.
>
> So If I would like to have to distinct directories ex.
>
> /tmp/data/year=2012
> /tmp/data/year=2013
>
> It does not work with
> val df = Seq((2012, "Batman")).toDF("year","title")
>
> df.write.partitionBy("year").avro("/tmp/data")
>
> val df2 = Seq((2013, "Batman")).toDF("year","title")
>
> df2.write.partitionBy("year").avro("/tmp/data")
>
>
> As you can see, it complains about the target directory (/tmp/data) and
> not about the partitioning keys.
>
>
> org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data
> already exists.;
> at
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
> at
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
> at
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
> at
> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>
>
>
> On 22 Dec 2015, at 15:44, Yash Sharma  wrote:
>
> Well this will indeed hit the error if the next run has similar year and
> months and writing would not be possible.
>
> You can try working around by introducing a runCount in partition or in
> the output path.
>
> Something like-
>
> /tmp/data/year/month/01
> /tmp/data/year/month/02
>
> Or,
> /tmp/data/01/year/month
> /tmp/data/02/year/month
>
> This is a work around.
>
> Am sure other better approaches would follow.
>
> - Thanks, via mobile,  excuse brevity.
> On Dec 22, 2015 7:01 PM, "Jan Holmberg"  wrote:
>
>> Hi Yash,
>>
>> the error is caused by the fact that first run creates the base directory
>> ie. "/tmp/data" and the second batch stumbles to the existing base
>> directory. I understand that the existing base directory is a challenge but
>> I do not understand how to make this work with streaming example where each
>> batch would create a new distinct directory.
>>
>> Granularity has no impact. No matter how data is partitioned, second
>> 'batch' always fails with existing base dir.
>>
>> scala> df2.write.partitionBy("year").avro("/tmp/data")
>> org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data
>> already exists.;
>> at
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
>> at
>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
>> at
>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
>> at
>> org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
>> at
>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
>> at
>> com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
>> at
>> com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
>> at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:33)
>> at
>> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:38)
>>
>>
>> On 22 Dec 2015, at 14:06, Yash Sharma  wrote:
>>
>> Hi Jan,
>> Is the error because a past run of the job has already written to the
>> location?
>>
>> In that case you can add more granularity with 'time' along with year and
>> month. That should give you a distinct path for every run.
>>
>> Let us know if it helps or if i missed anything.
>>
>> Goodluck
>>
>> - Thanks, via mobile,  excuse brevity.
>> On Dec 22, 2015 2:31 PM, "Jan Holmberg" wrote:

Re: Writing partitioned Avro data to HDFS

2015-12-22 Thread Jan Holmberg
In my example the directories were distinct.

So if I would like to have two distinct directories, e.g.

/tmp/data/year=2012
/tmp/data/year=2013

It does not work with

val df = Seq((2012, "Batman")).toDF("year","title")

df.write.partitionBy("year").avro("/tmp/data")

val df2 = Seq((2013, "Batman")).toDF("year","title")

df2.write.partitionBy("year").avro("/tmp/data")


As you can see, it complains about the target directory (/tmp/data) and not 
about the partitioning keys.


org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data 
already exists.;
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)



On 22 Dec 2015, at 15:44, Yash Sharma wrote:


Well, this will indeed hit the error if the next run has the same years and months, 
and writing would not be possible.

You can try working around by introducing a runCount in partition or in the 
output path.

Something like-

/tmp/data/year/month/01
/tmp/data/year/month/02

Or,
/tmp/data/01/year/month
/tmp/data/02/year/month

This is a workaround.

I am sure other, better approaches will follow.

- Thanks, via mobile,  excuse brevity.

On Dec 22, 2015 7:01 PM, "Jan Holmberg" wrote:
Hi Yash,

the error is caused by the fact that the first run creates the base directory, i.e. 
"/tmp/data", and the second batch stumbles on the existing base directory. I 
understand that the existing base directory is a challenge, but I do not 
understand how to make this work in a streaming example where each batch would 
create a new distinct directory.

Granularity has no impact. No matter how the data is partitioned, the second 'batch' 
always fails on the existing base dir.

scala> df2.write.partitionBy("year").avro("/tmp/data")
org.apache.spark.sql.AnalysisException: path hdfs://nameservice1/tmp/data 
already exists.;
at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:76)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at 
org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at 
org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:137)
at 
com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
at 
com.databricks.spark.avro.package$AvroDataFrameWriter$$anonfun$avro$1.apply(package.scala:37)
at 
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)


On 22 Dec 2015, at 14:06, Yash Sharma wrote:


Hi Jan,
Is the error because a past run of the job has already written to the location?

In that case you can add more granularity with 'time' along with year and 
month. That should give you a distinct path for every run.

Let us know if it helps or if i missed anything.

Goodluck

- Thanks, via mobile,  excuse brevity.

On Dec 22, 2015 2:31 PM, "Jan Holmberg" wrote:
Hi,
I'm stuck with writing partitioned data to HDFS. The example below ends up with an 
'already exists' error.

I'm wondering how to handle the streaming use case.

What is the intended way to write streaming data to HDFS? What am I missing?

cheers,
-jan


import com.databricks.spark.avro._

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val df = Seq(
(2012, 8, "Batman", 9.8),
(2012, 8, "Hero", 8.7),
(2012, 7, "Robot", 5.5),
(2011, 7, "Git", 2.0)).toDF("year", "month", "title", "rating")

df.write.partitionBy("year", "month").avro("/tmp/data")

val df2 = Seq(
(2012, 10, "Batman", 9.8),
(2012, 10, "Hero", 8.7),
(2012, 9, "Robot", 5.5),
(2011, 9, "Git",