Re: saveAsTable with path not working as expected (pyspark + Scala)

2015-03-27 Thread Tom Walwyn
We can set a path; see the unit tests. For example:
df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path=tmpPath)
https://github.com/apache/spark/blob/master/python/pyspark/sql/tests.py

Investigating some more, I found that the data is being written at the
specified location, but the error is still thrown and the table is not
registered in the metastore. This is the code that I ran:

>>> from pyspark.sql import Row
>>> a = [Row(key=k, value=str(k)) for k in range(100)]
>>> df = sc.parallelize(a).toDF()
>>> df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path="/tmp/test10")
15/03/27 10:45:13 ERROR RetryingHMSHandler:
MetaException(message:file:/user/hive/warehouse/savedjsontable is not a
directory or unable to create one)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
...
>>> sqlCtx.tables()
DataFrame[tableName: string, isTemporary: boolean]
>>> exit()
~ cat /tmp/test10/part-0
{"key":0,"value":"0"}
{"key":1,"value":"1"}
{"key":2,"value":"2"}
{"key":3,"value":"3"}
{"key":4,"value":"4"}
{"key":5,"value":"5"}
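
As a sanity check, the written directory can also be read back as a plain
JSON data source, independent of the metastore error (a sketch using the
Spark 1.3 load API; not part of the session above):

>>> loaded = sqlCtx.load("/tmp/test10", "org.apache.spark.sql.json")
>>> loaded.count()  # expect 100 if all rows landed on disk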

Kind Regards,
Tom

On 27 March 2015 at 10:33, Yanbo Liang yblia...@gmail.com wrote:

 saveAsTable will use the default data source configured by
 spark.sql.sources.default.

 def saveAsTable(tableName: String): Unit = {
   saveAsTable(tableName, SaveMode.ErrorIfExists)
 }

 It cannot set a path, if I understand correctly.
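
 If only the default format needs changing (rather than the table location),
 one option is to set the default data source first, e.g. (a sketch, not
 tested):

 sqlCtx.setConf("spark.sql.sources.default", "org.apache.spark.sql.json")
 df.saveAsTable("savedJsonTable")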

 2015-03-27 15:45 GMT+08:00 Tom Walwyn twal...@gmail.com:

 Hi,

 The behaviour is the same for me in Scala and Python, so posting here in
 Python. When I use DataFrame.saveAsTable with the path option, I expect an
 external Hive table to be created at the specified path. Specifically, when
 I call:

   df.saveAsTable(..., path="/tmp/test")

 I expect an external Hive table to be created pointing to /tmp/test which
 would contain the data in df.

 However, running locally on my Mac, I get an error indicating that Spark
 tried to create a managed table in the location of the Hive warehouse:

 ERROR RetryingHMSHandler:
 MetaException(message:file:/user/hive/warehouse/savetable is not a
 directory or unable to create one)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)

 Am I wrong to expect that Spark create an external table in this case?
 What is the expected behaviour of saveAsTable with the path option?

 Setup: running Spark 1.3.0 locally.

 Kind Regards,
 Tom


saveAsTable with path not working as expected (pyspark + Scala)

2015-03-27 Thread Tom Walwyn
Hi,

The behaviour is the same for me in Scala and Python, so posting here in
Python. When I use DataFrame.saveAsTable with the path option, I expect an
external Hive table to be created at the specified path. Specifically, when
I call:

  df.saveAsTable(..., path="/tmp/test")

I expect an external Hive table to be created pointing to /tmp/test which
would contain the data in df.
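
In other words, roughly the equivalent of this Hive DDL (a sketch: it assumes
a HiveContext behind sqlCtx and glosses over how the JSON data source stores
the rows):

  sqlCtx.sql("CREATE EXTERNAL TABLE test (key INT, value STRING) "
             "LOCATION '/tmp/test'")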

However, running locally on my Mac, I get an error indicating that Spark
tried to create a managed table in the location of the Hive warehouse:

ERROR RetryingHMSHandler:
MetaException(message:file:/user/hive/warehouse/savetable is not a
directory or unable to create one)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)

Am I wrong to expect that Spark create an external table in this case? What
is the expected behaviour of saveAsTable with the path option?

Setup: running Spark 1.3.0 locally.

Kind Regards,
Tom


Re: saveAsTable with path not working as expected (pyspark + Scala)

2015-03-27 Thread Tom Walwyn
Another follow-up: saveAsTable works as expected when running on a Hadoop
cluster with Hive installed. It's just locally that I'm getting this
strange behaviour. Any ideas why this is happening?

Kind Regards,
Tom

On 27 March 2015 at 11:29, Tom Walwyn twal...@gmail.com wrote:

 We can set a path; see the unit tests. For example:
 df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path=tmpPath)
 https://github.com/apache/spark/blob/master/python/pyspark/sql/tests.py

 Investigating some more, I found that the data is being written at the
 specified location, but the error is still thrown and the table is not
 registered in the metastore. This is the code that I ran:

 >>> from pyspark.sql import Row
 >>> a = [Row(key=k, value=str(k)) for k in range(100)]
 >>> df = sc.parallelize(a).toDF()
 >>> df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path="/tmp/test10")
 15/03/27 10:45:13 ERROR RetryingHMSHandler:
 MetaException(message:file:/user/hive/warehouse/savedjsontable is not a
 directory or unable to create one)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
 ...
 >>> sqlCtx.tables()
 DataFrame[tableName: string, isTemporary: boolean]
 >>> exit()
 ~ cat /tmp/test10/part-0
 {"key":0,"value":"0"}
 {"key":1,"value":"1"}
 {"key":2,"value":"2"}
 {"key":3,"value":"3"}
 {"key":4,"value":"4"}
 {"key":5,"value":"5"}

 Kind Regards,
 Tom

 On 27 March 2015 at 10:33, Yanbo Liang yblia...@gmail.com wrote:

 saveAsTable will use the default data source configured by
 spark.sql.sources.default.

 def saveAsTable(tableName: String): Unit = {
   saveAsTable(tableName, SaveMode.ErrorIfExists)
 }

 It cannot set a path, if I understand correctly.

 2015-03-27 15:45 GMT+08:00 Tom Walwyn twal...@gmail.com:

 Hi,

 The behaviour is the same for me in Scala and Python, so posting here in
 Python. When I use DataFrame.saveAsTable with the path option, I expect an
 external Hive table to be created at the specified path. Specifically, when
 I call:

   df.saveAsTable(..., path="/tmp/test")

 I expect an external Hive table to be created pointing to /tmp/test
 which would contain the data in df.

 However, running locally on my Mac, I get an error indicating that Spark
 tried to create a managed table in the location of the Hive warehouse:

 ERROR RetryingHMSHandler:
 MetaException(message:file:/user/hive/warehouse/savetable is not a
 directory or unable to create one)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
 at
 org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)

 Am I wrong to expect that Spark create an external table in this case?
 What is the expected behaviour of saveAsTable with the path option?

 Setup: running Spark 1.3.0 locally.

 Kind Regards,
 Tom


Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-18 Thread Tom Walwyn
Thanks Imran, I'll try your suggestions.

I eventually got this to run by 'checkpointing' the joined RDD (following
Akhil's suggestion) before performing the reduceByKey, and then
checkpointing it again afterward, i.e.:

 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER
 val rdd2 = rdd.join(rdd, numPartitions = 1000)
   .map(fp => ((fp._2._1, fp._2._2), 1))
   .persist(MEMORY_AND_DISK_SER)
 val rdd3 = rdd2.reduceByKey((x, y) => x + y).persist(MEMORY_AND_DISK_SER)
 rdd3.count()

It takes a while, but at least it runs. So, I'll be sure to try your
suggestions for further speed-up.

Thanks again for your help.


On 18 February 2015 at 18:47, Imran Rashid iras...@cloudera.com wrote:

 Hi Tom,

 There are a couple of things you can do here to make this more efficient.
 First, I think you can replace your self-join with a groupByKey. On your
 example data set, this would give you:

 (1, Iterable(2,3))
 (4, Iterable(3))

 This reduces the amount of data that needs to be shuffled, and that way
 you can produce all of your pairs just from the Iterable(2,3).
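
 In PySpark terms, that might look something like this (an untested sketch;
 the nested comprehension keeps duplicates and reflections, as the self-join
 would):

 pairs = (rdd.groupByKey()
             .flatMap(lambda kv: [((a, b), 1) for a in kv[1] for b in kv[1]])
             .reduceByKey(lambda x, y: x + y))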

 Second, if you expect the same pairs to appear many times in your dataset,
 you might first want to replace them with a count. E.g., if you start with:

 (1,2)
 (1,2)
 (1,2)
 ...
 (1,2)
 (1,3)
 (1,3)
 (4,3)
 ...

 you might want to first convert that to get a count of each pair:

 val pairCounts = rdd.map{ x => (x, 1) }.reduceByKey{ _ + _ }

 to give you something like:

 ((1,2), 145)
 ((1,3), 2)
 ((4,3), 982)
 ...

 and then with a little more massaging you can group by key and also keep
 the counts of each item:

 val groupedCounts: RDD[(Int, Iterable[(Int,Int)])] =
   pairCounts.map{ case ((key, value), counts) =>
     key -> (value, counts)
   }.groupByKey

 which would give you something like

 (1, Iterable((2,145), (3,2)))
 (4, Iterable((3,982)))
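
 To finish, the grouped counts can be expanded back into pair counts, with
 the counts multiplied because every occurrence of one value co-occurs with
 every occurrence of the other (a PySpark sketch, assuming a groupedCounts
 RDD shaped as above):

 pairCounts2 = (groupedCounts
     .flatMap(lambda kv: [((a, b), ca * cb)
                          for (a, ca) in kv[1] for (b, cb) in kv[1]])
     .reduceByKey(lambda x, y: x + y))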


 hope this helps
 Imran

 On Wed, Feb 18, 2015 at 1:43 AM, Tom Walwyn twal...@gmail.com wrote:

 Thanks for the reply, I'll try your suggestions.

 Apologies, in my previous post I was mistaken: rdd is actually a PairRDD
 of (Int, Int). I'm doing the self-join so I can count two things. First, I
 can count the number of times a value appears in the data set. Second, I can
 count the number of times values occur with the same key. For example, if I
 have the following:

 (1,2)
 (1,3)
 (4,3)

 Then joining with itself I get:

 (1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
 (1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
 (1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
 (1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
 (4,(3,3)) -> map -> ((3,3),1) _|

 Note that I want to keep the duplicates (2,2) and reflections.

 Rgds

 On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 Why are you joining the rdd with itself?

 You can try these things:

 - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
 MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.

 - Set your default Serializer to Kryo (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))

 - Enable rdd compression (.set("spark.rdd.compress", "true"))


 Thanks
 Best Regards

 On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

 Hi All,

 I'm a new Spark (and Hadoop) user and I want to find out if the cluster
 resources I am using are adequate for my use-case. The following is a
 snippet of code that is causing an OOM exception in the executor after about
 125/1000 tasks during the map stage.

  val rdd2 = rdd.join(rdd, numPartitions = 1000)
    .map(fp => ((fp._2._1, fp._2._2), 1))
    .reduceByKey((x, y) => x + y)
  rdd2.count()

 Which errors with a stack trace like:

  15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
 stage 2.0 (TID 498)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

 rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
 co-occurring values by key in the dataset, i.e. 'These two numbers occurred
 with the same key n times'. I intentionally don't want to filter out
 duplicates and reflections. rdd is about 3.6 million records, which has a
 size in memory of about 120MB, and results in a 'joined' RDD (before the
 reduceByKey stage) of around 460 million records, with a size in memory of
 about 35GB.

 My cluster setup is as follows. I have 3 nodes, where each node has 2
 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
 executors are allowed 1280m each and the job has 5 executors and 1 driver.
 Additionally, I have set spark.storage.memoryFraction to 0.06, and
 spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate

Re: OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-17 Thread Tom Walwyn
Thanks for the reply, I'll try your suggestions.

Apologies, in my previous post I was mistaken: rdd is actually a PairRDD
of (Int, Int). I'm doing the self-join so I can count two things. First, I
can count the number of times a value appears in the data set. Second, I can
count the number of times values occur with the same key. For example, if I
have the following:

(1,2)
(1,3)
(4,3)

Then joining with itself I get:

(1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
(1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
(1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
(1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
(4,(3,3)) -> map -> ((3,3),1) _|

Note that I want to keep the duplicates (2,2) and reflections.

Rgds

On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com wrote:

 Why are you joining the rdd with itself?

 You can try these things:

 - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
 MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.

 - Set your default Serializer to Kryo (.set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer"))

 - Enable rdd compression (.set("spark.rdd.compress", "true"))
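
 Put together, a rough PySpark version of those settings (a sketch, untested):

 from pyspark import SparkConf, SparkContext, StorageLevel
 conf = (SparkConf()
         .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         .set("spark.rdd.compress", "true"))
 sc = SparkContext(conf=conf)
 # persist the rdd with StorageLevel.MEMORY_AND_DISK_SER before the self-join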


 Thanks
 Best Regards

 On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

 Hi All,

 I'm a new Spark (and Hadoop) user and I want to find out if the cluster
 resources I am using are adequate for my use-case. The following is a
 snippet of code that is causing an OOM exception in the executor after about
 125/1000 tasks during the map stage.

  val rdd2 = rdd.join(rdd, numPartitions = 1000)
    .map(fp => ((fp._2._1, fp._2._2), 1))
    .reduceByKey((x, y) => x + y)
  rdd2.count()

 Which errors with a stack trace like:

  15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
 stage 2.0 (TID 498)
  java.lang.OutOfMemoryError: GC overhead limit exceeded
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
  at
 scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at
 scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
  at scala.collection.immutable.List.foreach(List.scala:318)

 rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
 co-occurring values by key in the dataset, i.e. 'These two numbers occurred
 with the same key n times'. I intentionally don't want to filter out
 duplicates and reflections. rdd is about 3.6 million records, which has a
 size in memory of about 120MB, and results in a 'joined' RDD (before the
 reduceByKey stage) of around 460 million records, with a size in memory of
 about 35GB.

 My cluster setup is as follows. I have 3 nodes, where each node has 2
 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
 executors are allowed 1280m each and the job has 5 executors and 1 driver.
 Additionally, I have set spark.storage.memoryFraction to 0.06, and
 spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
 the issue. I've also tried increasing the number of partitions after the
 join dramatically (up to 15000). Nothing has been effective. Thus, I'm
 beginning to suspect I don't have enough resources for the job.

 Does anyone have a feeling about what the resource requirements would be
 for a use-case like this? I could scale the cluster up if necessary, but
 would like to avoid it. I'm willing to accept longer computation times if
 that is an option.

 Warm Regards,
 Thomas


OutOfMemory and GC limits (TODO) Error in map after self-join

2015-02-17 Thread Tom Walwyn
Hi All,

I'm a new Spark (and Hadoop) user and I want to find out if the cluster
resources I am using are adequate for my use-case. The following is a
snippet of code that is causing an OOM exception in the executor after about
125/1000 tasks during the map stage.

 val rdd2 = rdd.join(rdd, numPartitions = 1000)
   .map(fp => ((fp._2._1, fp._2._2), 1))
   .reduceByKey((x, y) => x + y)
 rdd2.count()

Which errors with a stack trace like:

 15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
stage 2.0 (TID 498)
 java.lang.OutOfMemoryError: GC overhead limit exceeded
 at
scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
 at
scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
 at
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
 at
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
 at scala.collection.immutable.List.foreach(List.scala:318)

rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
co-occurring values by key in the dataset, i.e. 'These two numbers occurred
with the same key n times'. I intentionally don't want to filter out
duplicates and reflections. rdd is about 3.6 million records, which has a
size in memory of about 120MB, and results in a 'joined' RDD (before the
reduceByKey stage) of around 460 million records, with a size in memory of
about 35GB.

My cluster setup is as follows. I have 3 nodes, where each node has 2 cores
and about 7.5GB of memory. I'm running Spark on YARN. The driver and
executors are allowed 1280m each and the job has 5 executors and 1 driver.
Additionally, I have set spark.storage.memoryFraction to 0.06, and
spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
the issue. I've also tried increasing the number of partitions after the
join dramatically (up to 15000). Nothing has been effective. Thus, I'm
beginning to suspect I don't have enough resources for the job.
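
A back-of-envelope calculation (plain Python arithmetic) makes that suspicion
concrete: the joined RDD alone is several times the combined executor heap.

 joined_mb = 35.0 * 1024   # estimated in-memory size of the joined RDD
 executors = 5
 heap_mb = 1280            # memory allowed per executor
 print(joined_mb / executors / heap_mb)  # ~5.6x the heap per executor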

Does anyone have a feeling about what the resource requirements would be
for a use-case like this? I could scale the cluster up if necessary, but
would like to avoid it. I'm willing to accept longer computation times if
that is an option.

Warm Regards,
Thomas