Re: saveAsTable with path not working as expected (pyspark + Scala)
We can set a path; refer to the unit tests. For example:

    df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path=tmpPath)

https://github.com/apache/spark/blob/master/python/pyspark/sql/tests.py

Investigating some more, I found that the table data is being written to the specified location, but the error is still being thrown, and the table is not registered in the metastore. This is the code that I ran:

    a = [Row(key=k, value=str(k)) for k in range(100)]
    df = sc.parallelize(a).toDF()
    df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path="/tmp/test10")

    15/03/27 10:45:13 ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse/savedjsontable is not a directory or unable to create one)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
        ...

    >>> sqlCtx.tables()
    DataFrame[tableName: string, isTemporary: boolean]
    >>> exit()

    ~ cat /tmp/test10/part-0
    {"key":0,"value":"0"}
    {"key":1,"value":"1"}
    {"key":2,"value":"2"}
    {"key":3,"value":"3"}
    {"key":4,"value":"4"}
    {"key":5,"value":"5"}

Kind Regards,
Tom

On 27 March 2015 at 10:33, Yanbo Liang yblia...@gmail.com wrote:

saveAsTable will use the default data source configured by spark.sql.sources.default.

    def saveAsTable(tableName: String): Unit = {
      saveAsTable(tableName, SaveMode.ErrorIfExists)
    }

It cannot set a path, if I understand correctly.

2015-03-27 15:45 GMT+08:00 Tom Walwyn twal...@gmail.com:

Hi,

The behaviour is the same for me in Scala and Python, so I'm posting here in Python. When I use DataFrame.saveAsTable with the path option, I expect an external Hive table to be created at the specified path. Specifically, when I call:

    df.saveAsTable(..., path="/tmp/test")

I expect an external Hive table to be created pointing to /tmp/test, which would contain the data in df. However, running locally on my Mac, I get an error indicating that Spark tried to create a managed table in the location of the Hive warehouse:

    ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse/savetable is not a directory or unable to create one)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)

Am I wrong to expect that Spark creates an external table in this case? What is the expected behaviour of saveAsTable with the path option?

Setup: running Spark locally with Spark 1.3.0.

Kind Regards,
Tom
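Incidentally, the part file shown above is plain JSON-lines text, which is what the org.apache.spark.sql.json data source writes (one JSON object per line, no extra whitespace). A minimal plain-Python sketch of that on-disk format, assuming rows shaped like the Row(key=k, value=str(k)) test data above (no Spark required):

```python
import json

# Mirror the test data above: Row(key=k, value=str(k)).
rows = [{"key": k, "value": str(k)} for k in range(6)]

# The json data source writes one compact JSON object per line.
lines = [json.dumps(r, separators=(",", ":")) for r in rows]
print("\n".join(lines))

# Reading the part file back is one json.loads per line.
parsed = [json.loads(line) for line in lines]
assert parsed[3] == {"key": 3, "value": "3"}
```

Note that because value=str(k), the values round-trip as JSON strings, not numbers.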
saveAsTable with path not working as expected (pyspark + Scala)
Hi,

The behaviour is the same for me in Scala and Python, so I'm posting here in Python. When I use DataFrame.saveAsTable with the path option, I expect an external Hive table to be created at the specified path. Specifically, when I call:

    df.saveAsTable(..., path="/tmp/test")

I expect an external Hive table to be created pointing to /tmp/test, which would contain the data in df. However, running locally on my Mac, I get an error indicating that Spark tried to create a managed table in the location of the Hive warehouse:

    ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse/savetable is not a directory or unable to create one)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)

Am I wrong to expect that Spark creates an external table in this case? What is the expected behaviour of saveAsTable with the path option?

Setup: running Spark locally with Spark 1.3.0.

Kind Regards,
Tom
Re: saveAsTable with path not working as expected (pyspark + Scala)
Another follow-up: saveAsTable works as expected when running on a Hadoop cluster with Hive installed. It's just locally that I'm getting this strange behaviour. Any ideas why this is happening?

Kind Regards,
Tom

On 27 March 2015 at 11:29, Tom Walwyn twal...@gmail.com wrote:

We can set a path; refer to the unit tests. For example:

    df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path=tmpPath)

https://github.com/apache/spark/blob/master/python/pyspark/sql/tests.py

Investigating some more, I found that the table data is being written to the specified location, but the error is still being thrown, and the table is not registered in the metastore. This is the code that I ran:

    a = [Row(key=k, value=str(k)) for k in range(100)]
    df = sc.parallelize(a).toDF()
    df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path="/tmp/test10")

    15/03/27 10:45:13 ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse/savedjsontable is not a directory or unable to create one)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)
        ...

    >>> sqlCtx.tables()
    DataFrame[tableName: string, isTemporary: boolean]
    >>> exit()

    ~ cat /tmp/test10/part-0
    {"key":0,"value":"0"}
    {"key":1,"value":"1"}
    {"key":2,"value":"2"}
    {"key":3,"value":"3"}
    {"key":4,"value":"4"}
    {"key":5,"value":"5"}

Kind Regards,
Tom

On 27 March 2015 at 10:33, Yanbo Liang yblia...@gmail.com wrote:

saveAsTable will use the default data source configured by spark.sql.sources.default.

    def saveAsTable(tableName: String): Unit = {
      saveAsTable(tableName, SaveMode.ErrorIfExists)
    }

It cannot set a path, if I understand correctly.

2015-03-27 15:45 GMT+08:00 Tom Walwyn twal...@gmail.com:

Hi,

The behaviour is the same for me in Scala and Python, so I'm posting here in Python. When I use DataFrame.saveAsTable with the path option, I expect an external Hive table to be created at the specified path. Specifically, when I call:

    df.saveAsTable(..., path="/tmp/test")

I expect an external Hive table to be created pointing to /tmp/test, which would contain the data in df. However, running locally on my Mac, I get an error indicating that Spark tried to create a managed table in the location of the Hive warehouse:

    ERROR RetryingHMSHandler: MetaException(message:file:/user/hive/warehouse/savetable is not a directory or unable to create one)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1239)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1294)

Am I wrong to expect that Spark creates an external table in this case? What is the expected behaviour of saveAsTable with the path option?

Setup: running Spark locally with Spark 1.3.0.

Kind Regards,
Tom
Re: OutOfMemory and GC limits (TODO) Error in map after self-join
Thanks Imran, I'll try your suggestions. I eventually got this to run by 'checkpointing' the joined RDD (following Akhil's suggestion) before performing the reduceByKey, and then persisting it again afterward, i.e.:

    val rdd2 = rdd.join(rdd, numPartitions = 1000)
      .map(fp => ((fp._2._1, fp._2._2), 1))
      .persist(StorageLevel.MEMORY_AND_DISK_SER)
    val rdd3 = rdd2.reduceByKey((x, y) => x + y).persist(StorageLevel.MEMORY_AND_DISK_SER)
    rdd3.count()

It takes a while, but at least it runs. So, I'll be sure to try your suggestions for further speed-up. Thanks again for your help.

On 18 February 2015 at 18:47, Imran Rashid iras...@cloudera.com wrote:

Hi Tom,

There are a couple of things you can do here to make this more efficient. First, I think you can replace your self-join with a groupByKey. On your example data set, this would give you:

    (1, Iterable(2, 3))
    (4, Iterable(3))

This reduces the amount of data that needs to be shuffled, and that way you can produce all of your pairs just from the Iterable(2, 3).

Second, if you expect the same pairs to appear many times in your dataset, you might first want to replace them with a count. E.g., if you start with:

    (1,2)
    (1,2)
    (1,2)
    ...
    (1,2)
    (1,3)
    (1,3)
    (4,3)
    ...

you might want to first convert that to get a count of each pair:

    val pairCounts = rdd.map{ x => (x, 1) }.reduceByKey{ _ + _ }

to give you something like:

    ((1,2), 145)
    ((1,3), 2)
    ((4,3), 982)
    ...

and then with a little more massaging you can group by key and also keep the counts of each item:

    val groupedCounts: RDD[(Int, Iterable[(Int, Int)])] = pairCounts.map{ case ((key, value), counts) =>
      key -> (value, counts)
    }.groupByKey

which would give you something like:

    (1, Iterable((2, 145), (3, 2)))
    (4, Iterable((3, 982)))

Hope this helps,
Imran

On Wed, Feb 18, 2015 at 1:43 AM, Tom Walwyn twal...@gmail.com wrote:

Thanks for the reply, I'll try your suggestions. Apologies, in my previous post I was mistaken: rdd is actually a PairRDD of (Int, Int). I'm doing the self-join so I can count two things. First, I can count the number of times a value appears in the data set. Second, I can count the number of times values occur with the same key. For example, if I have the following:

    (1,2)
    (1,3)
    (4,3)

then joining with itself I get:

    (1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
    (1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
    (1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
    (1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
    (4,(3,3)) -> map -> ((3,3),1) ___________________|

Note that I want to keep the duplicates (2,2) and reflections.

Rgds

On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com wrote:

Why are you joining the rdd with itself? You can try these things:

- Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.
- Set your default serializer to Kryo: .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- Enable rdd compression: .set("spark.rdd.compress", "true")

Thanks
Best Regards

On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

Hi All,

I'm a new Spark (and Hadoop) user and I want to find out if the cluster resources I am using are feasible for my use-case. The following is a snippet of code that is causing an OOM exception in the executor after about 125/1000 tasks during the map stage:

    val rdd2 = rdd.join(rdd, numPartitions = 1000)
      .map(fp => ((fp._2._1, fp._2._2), 1))
      .reduceByKey((x, y) => x + y)
    rdd2.count()

which errors with a stack trace like:

    15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498)
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.immutable.List.foreach(List.scala:318)

rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of co-occurring values by key in the dataset, i.e. 'these two numbers occurred with the same key n times'. I intentionally don't want to filter out duplicates and reflections. rdd is about 3.6 million records, which has a size in memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey stage) of around 460 million records, with a size in memory of about 35GB.

My cluster setup is as follows. I have 3 nodes, where each node has 2 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and executors are allowed 1280m each and the job has 5 executors and 1 driver. Additionally, I have set spark.storage.memoryFraction to 0.06 and spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate the issue.
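For what it's worth, Imran's two-step suggestion (collapse repeated pairs into counts, then regroup by key) can be checked in plain Python on his toy numbers. This models only the semantics, not the distributed shuffle, and the names pair_counts and grouped_counts are illustrative only:

```python
from collections import defaultdict

# Toy (key, value) pairs with repeats, as in Imran's example.
pairs = [(1, 2)] * 3 + [(1, 3)] * 2 + [(4, 3)]

# Step 1: map to (pair, 1) and reduceByKey(_ + _) -- collapse repeats into counts.
pair_counts = defaultdict(int)
for kv in pairs:
    pair_counts[kv] += 1
assert dict(pair_counts) == {(1, 2): 3, (1, 3): 2, (4, 3): 1}

# Step 2: groupByKey -- regroup as key -> [(value, count), ...].
grouped_counts = defaultdict(list)
for (key, value), count in pair_counts.items():
    grouped_counts[key].append((value, count))
assert dict(grouped_counts) == {1: [(2, 3), (3, 2)], 4: [(3, 1)]}
```

The point of the rewrite is that only one (pair, count) record per distinct pair crosses the shuffle, rather than one record per occurrence.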
Re: OutOfMemory and GC limits (TODO) Error in map after self-join
Thanks for the reply, I'll try your suggestions. Apologies, in my previous post I was mistaken: rdd is actually a PairRDD of (Int, Int). I'm doing the self-join so I can count two things. First, I can count the number of times a value appears in the data set. Second, I can count the number of times values occur with the same key. For example, if I have the following:

    (1,2)
    (1,3)
    (4,3)

then joining with itself I get:

    (1,(2,2)) -> map -> ((2,2),1) -> reduceByKey -> ((2,2),1)
    (1,(2,3)) -> map -> ((2,3),1) -> reduceByKey -> ((2,3),1)
    (1,(3,2)) -> map -> ((3,2),1) -> reduceByKey -> ((3,2),1)
    (1,(3,3)) -> map -> ((3,3),1) -> reduceByKey -> ((3,3),2)
    (4,(3,3)) -> map -> ((3,3),1) ___________________|

Note that I want to keep the duplicates (2,2) and reflections.

Rgds

On 18 February 2015 at 09:00, Akhil Das ak...@sigmoidanalytics.com wrote:

Why are you joining the rdd with itself? You can try these things:

- Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.
- Set your default serializer to Kryo: .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- Enable rdd compression: .set("spark.rdd.compress", "true")

Thanks
Best Regards

On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn twal...@gmail.com wrote:

Hi All,

I'm a new Spark (and Hadoop) user and I want to find out if the cluster resources I am using are feasible for my use-case. The following is a snippet of code that is causing an OOM exception in the executor after about 125/1000 tasks during the map stage:

    val rdd2 = rdd.join(rdd, numPartitions = 1000)
      .map(fp => ((fp._2._1, fp._2._2), 1))
      .reduceByKey((x, y) => x + y)
    rdd2.count()

which errors with a stack trace like:

    15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498)
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.immutable.List.foreach(List.scala:318)

rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of co-occurring values by key in the dataset, i.e. 'these two numbers occurred with the same key n times'. I intentionally don't want to filter out duplicates and reflections. rdd is about 3.6 million records, which has a size in memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey stage) of around 460 million records, with a size in memory of about 35GB.

My cluster setup is as follows. I have 3 nodes, where each node has 2 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and executors are allowed 1280m each and the job has 5 executors and 1 driver. Additionally, I have set spark.storage.memoryFraction to 0.06 and spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate the issue. I've also tried increasing the number of partitions after the join dramatically (up to 15000). Nothing has been effective. Thus, I'm beginning to suspect I don't have enough resources for the job.

Does anyone have a feeling about what the resource requirements would be for a use-case like this? I could scale the cluster up if necessary, but would like to avoid it. I'm willing to accept longer computation times if that is an option.

Warm Regards,
Thomas
OutOfMemory and GC limits (TODO) Error in map after self-join
Hi All,

I'm a new Spark (and Hadoop) user and I want to find out if the cluster resources I am using are feasible for my use-case. The following is a snippet of code that is causing an OOM exception in the executor after about 125/1000 tasks during the map stage:

    val rdd2 = rdd.join(rdd, numPartitions = 1000)
      .map(fp => ((fp._2._1, fp._2._2), 1))
      .reduceByKey((x, y) => x + y)
    rdd2.count()

which errors with a stack trace like:

    15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498)
    java.lang.OutOfMemoryError: GC overhead limit exceeded
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
        at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
        at scala.collection.immutable.List.foreach(List.scala:318)

rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of co-occurring values by key in the dataset, i.e. 'these two numbers occurred with the same key n times'. I intentionally don't want to filter out duplicates and reflections. rdd is about 3.6 million records, which has a size in memory of about 120MB, and results in a 'joined' RDD (before the reduceByKey stage) of around 460 million records, with a size in memory of about 35GB.

My cluster setup is as follows. I have 3 nodes, where each node has 2 cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and executors are allowed 1280m each and the job has 5 executors and 1 driver. Additionally, I have set spark.storage.memoryFraction to 0.06 and spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate the issue. I've also tried increasing the number of partitions after the join dramatically (up to 15000). Nothing has been effective. Thus, I'm beginning to suspect I don't have enough resources for the job.

Does anyone have a feeling about what the resource requirements would be for a use-case like this? I could scale the cluster up if necessary, but would like to avoid it. I'm willing to accept longer computation times if that is an option.

Warm Regards,
Thomas
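As a sanity check, the join / map / reduceByKey pipeline above can be reproduced in plain Python on a three-record toy input (a sketch of the semantics only, not the distributed implementation; duplicates and reflections are kept, as intended):

```python
from collections import Counter, defaultdict

# Toy (key, value) data.
rdd = [(1, 2), (1, 3), (4, 3)]

# rdd.join(rdd): for each key, every ordered pair of its values.
by_key = defaultdict(list)
for k, v in rdd:
    by_key[k].append(v)
joined = [(k, (v1, v2)) for k, vs in by_key.items() for v1 in vs for v2 in vs]

# .map(fp => ((fp._2._1, fp._2._2), 1)).reduceByKey(_ + _)
cooccurrence = Counter(pair for _, pair in joined)

# (3,3) appears under key 1 and key 4, so its count is 2.
assert cooccurrence == {(2, 2): 1, (2, 3): 1, (3, 2): 1, (3, 3): 2}
```

This also makes the blow-up visible: a key with n values produces n^2 joined records, which is why the 3.6-million-record input grows to roughly 460 million records after the self-join.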