[jira] [Comment Edited] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417301#comment-15417301 ] Roi Reshef edited comment on SPARK-17020 at 8/11/16 2:09 PM:
-
Nevertheless, any attempt to repartition the resulting RDD also ends with (almost) all of its partitions staying on the same node. I transformed it into a ShuffledRDD via PairRDDFunctions, setting a HashPartitioner with 140 partitions using *.partitionBy*, and yet I got the same data distribution as in the screenshot I attached. So I guess there is something very wrong with referring to a *DataFrame.rdd* without materializing it beforehand. What and why is currently beyond my understanding.

was (Author: roireshef): Nevertheless, any attempt to repartition the resulting RDD also end with having (almost) all of its partitions stay on the same node. I made it transform into a ShuffledRDD via PairRDDFunctions, set a HashPartitioner with 140 partitions *.partitionBy*, and yet, I got the same data-distribution as in the screenshot I attached. So I guess there's something very wrong with referring to a *DataFrame.rdd* without materializing it beforehand. What and why is beyond my understanding, currently.

> Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
> --
>
> Key: SPARK-17020
> URL: https://issues.apache.org/jira/browse/SPARK-17020
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, SQL
> Affects Versions: 1.6.1, 1.6.2, 2.0.0
> Reporter: Roi Reshef
> Attachments: dataframe_cache.PNG, rdd_cache.PNG
>
> Calling DataFrame's lazy val .rdd results in a new RDD with a poor distribution of partitions across the cluster. Moreover, any attempt to repartition this RDD further will fail.
> Attached are screenshots of the original DataFrame on cache and the resulting RDD on cache.
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417301#comment-15417301 ] Roi Reshef edited comment on SPARK-17020 at 8/11/16 2:09 PM:
-
Nevertheless, any attempt to repartition the resulting RDD also ends with (almost) all of its partitions staying on the same node. I transformed it into a ShuffledRDD via PairRDDFunctions, set a HashPartitioner with 140 partitions using *.partitionBy*, and yet I got the same data distribution as in the screenshot I attached. So I guess there is something very wrong with referring to a *DataFrame.rdd* without materializing it beforehand. What and why is currently beyond my understanding.

was (Author: roireshef): Nevertheless, any attempt to repartition the resulting RDD also end with having (almost) all of its partitions stay on the same node. I made it transform into a ShuffledRDD via PairRDDFunctions, set a HashPartitioner with 140 partitions, and yet, I got the same data-distribution as in the screenshot I attached. So I guess there's something very wrong with referring to a *DataFrame.rdd* without materializing it beforehand. What and why is beyond my understanding, currently.
[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417301#comment-15417301 ] Roi Reshef commented on SPARK-17020:
Nevertheless, any attempt to repartition the resulting RDD also ends with (almost) all of its partitions staying on the same node. I transformed it into a ShuffledRDD via PairRDDFunctions, set a HashPartitioner with 140 partitions, and yet I got the same data distribution as in the screenshot I attached. So I guess there is something very wrong with referring to a *DataFrame.rdd* without materializing it beforehand. What and why is currently beyond my understanding.
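For reference, the repartition attempt described in this comment can be sketched roughly as follows. This is a minimal sketch, not the reporter's actual code: the DataFrame name (data), the key column ("id"), and its type are illustrative assumptions.

```scala
import org.apache.spark.HashPartitioner

// Extract the underlying RDD from the DataFrame (the lazy val in question).
val rdd = data.rdd

// Key the rows so that PairRDDFunctions become available via implicit conversion.
// The "id" column here is a hypothetical key, not taken from the original report.
val keyed = rdd.map(row => (row.getAs[Long]("id"), row))

// Force a ShuffledRDD with an explicit HashPartitioner of 140 partitions.
val shuffled = keyed.partitionBy(new HashPartitioner(140))

// Materialize, then inspect the partition distribution in the Storage tab of the UI.
shuffled.setName("shuffled").cache()
shuffled.count()
```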
[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417288#comment-15417288 ] Roi Reshef commented on SPARK-17020:
The problem occurs only when calling *.rdd* on a *not-previously-cached* DataFrame. *data* is a DataFrame, so in the last code snippet it is cached, whereas in the one before it wasn't; only the RDD that was extracted from it was.
[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417254#comment-15417254 ] Roi Reshef commented on SPARK-17020:
Also note that I have just called:

data.cache().count()
val rdd = data.rdd.setName("rdd").cache()
rdd.count

and the RDD was distributed far better (similar to the "data" DataFrame). I'm not sure it solves the issue with the RDD that ignores repartitioning methods further down the road; I'll have to check that.
[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417250#comment-15417250 ] Roi Reshef commented on SPARK-17020:

val ab = SomeReader.read(...). //some reader function that uses spark-csv with inferSchema=true
  filter(!isnull($"name")).
  alias("revab")

val meta = SomeReader.read(...) //same but different schema and data

val udaf = ... //some UserDefinedAggregateFunction

val features = ab.groupBy(...).agg(udaf(...))

val data = features.
  join(meta, $"meta.id" === $"features.id").
  select(...) //only relevant fields

val rdd = data.rdd.setName("rdd").cache()
rdd.count
[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417218#comment-15417218 ] Roi Reshef commented on SPARK-17020:
[~srowen] Should there be any effect on this if I cached and materialized the DF before I call .rdd?
[jira] [Comment Edited] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417204#comment-15417204 ] Roi Reshef edited comment on SPARK-17020 at 8/11/16 1:13 PM:
-
[~srowen] I have 2 DataFrames that are generated from the spark-csv reader. Then I pass them through several transformations and join them together. After that I call either .rdd or .flatMap to get an RDD out of the joint DataFrame. Throughout the whole process I've monitored the distribution of the DataFrames. It is good until the point where ".rdd" is called.

was (Author: roireshef): [~srowen] I have 2 DataFrames that are generated from spark-csv reader. Then I pass them through several transformations, and join them together. After that I call either .rdd or .flatMap to get an RDD out of the joint DataFrame. Throughout all the process I've monitored the distribution of the DataFrames. It is good until the point where ".rdd" is called
[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417204#comment-15417204 ] Roi Reshef commented on SPARK-17020:
[~srowen] I have 2 DataFrames that are generated from the spark-csv reader. Then I pass them through several transformations and join them together. After that I call either .rdd or .flatMap to get an RDD out of the joint DataFrame. Throughout the whole process I've monitored the distribution of the DataFrames. It is good until the point where ".rdd" is called.
[jira] [Updated] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roi Reshef updated SPARK-17020:
---
Affects Version/s: 2.0.0
[jira] [Updated] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roi Reshef updated SPARK-17020:
---
Attachment: rdd_cache.PNG
dataframe_cache.PNG
[jira] [Created] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
Roi Reshef created SPARK-17020:
--
Summary: Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
Key: SPARK-17020
URL: https://issues.apache.org/jira/browse/SPARK-17020
Project: Spark
Issue Type: Bug
Components: Spark Core, SQL
Affects Versions: 1.6.2, 1.6.1
Reporter: Roi Reshef
Priority: Critical

Calling DataFrame's lazy val .rdd results in a new RDD with a poor distribution of partitions across the cluster. Moreover, any attempt to repartition this RDD further will fail.

Attached are screenshots of the original DataFrame on cache and the resulting RDD on cache.
[jira] [Comment Edited] (SPARK-10789) Cluster mode SparkSubmit classpath only includes Spark assembly
[ https://issues.apache.org/jira/browse/SPARK-10789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15073832#comment-15073832 ] Roi Reshef edited comment on SPARK-10789 at 12/29/15 11:56 AM:
---
Thanks [~jonathak]. That requires rebuilding Spark and redistributing it across my cluster, right?
I finally figured out a solution to import external jars without rebuilding Spark. One can set two configurations inside spark-env.sh (at least for the Netlib package, which includes *.jar and *.so files):
spark.{driver,executor}.extraClassPath - for *.jar
spark.{driver,executor}.extraLibraryPath - for *.so
And Spark (I'm using v1.5.2) will pick them up automatically.

was (Author: roireshef): Thanks [~jonathak]. That requires rebuilding spark and redistributing it across my cluster, right? I finally figured out a solution to import external jars without rebuilding spark. One can modify two configurations inside spark-env.sh (at least for Netlib package, which include *.jar and *.so): spark. { driver,executor } .extraClassPath - for *.jar spark. { driver,executor } .extraLibraryPath - for *.so And spark (I'm using v1.5.2) will pick them up automatically

> Cluster mode SparkSubmit classpath only includes Spark assembly
> ---
>
> Key: SPARK-10789
> URL: https://issues.apache.org/jira/browse/SPARK-10789
> Project: Spark
> Issue Type: Bug
> Components: Spark Submit
> Affects Versions: 1.5.0, 1.6.0
> Reporter: Jonathan Kelly
> Attachments: SPARK-10789.diff, SPARK-10789.v1.6.0.diff
>
> When using cluster deploy mode, the classpath of the SparkSubmit process that gets launched only includes the Spark assembly and not spark.driver.extraClassPath. This is of course by design, since the driver actually runs on the cluster and not inside the SparkSubmit process. However, if the SparkSubmit process, minimal as it may be, needs any extra libraries that are not part of the Spark assembly, there is no good way to include them. (I say "no good way" because including them in the SPARK_CLASSPATH environment variable does cause the SparkSubmit process to include them, but this is not acceptable because this environment variable has long been deprecated, and it prevents the use of spark.driver.extraClassPath.)
> An example of when this matters is on Amazon EMR when using an S3 path for the application JAR and running in yarn-cluster mode. The SparkSubmit process needs the EmrFileSystem implementation and its dependencies in the classpath in order to download the application JAR from S3, so it fails with a ClassNotFoundException. (EMR currently gets around this by setting SPARK_CLASSPATH, but as mentioned above this is less than ideal.)
> I have tried modifying SparkSubmitCommandBuilder to include the driver extra classpath whether it's client mode or cluster mode, and this seems to work, but I don't know if there is any downside to this.
> Example that fails on emr-4.0.0 (if you switch to setting spark.(driver,executor).extraClassPath instead of SPARK_CLASSPATH):
> spark-submit --deploy-mode cluster --class org.apache.spark.examples.JavaWordCount s3://my-bucket/spark-examples.jar s3://my-bucket/word-count-input.txt
> Resulting Exception:
> Exception in thread "main" java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found
> at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
> at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2626)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2639)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90)
> at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2678)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2660)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:374)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:233)
> at org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:327)
> at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$5.apply(Client.scala:366)
> at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$5.apply(Client.scala:364)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:364)
> at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:629)
> at
[jira] [Commented] (SPARK-10789) Cluster mode SparkSubmit classpath only includes Spark assembly
[ https://issues.apache.org/jira/browse/SPARK-10789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15073832#comment-15073832 ] Roi Reshef commented on SPARK-10789:
Thanks [~jonathak]. That requires rebuilding Spark and redistributing it across my cluster, right?
I finally figured out a solution to import external jars without rebuilding Spark. One can set two configurations inside spark-env.sh (at least for the Netlib package, which includes *.jar and *.so files):
spark.{driver,executor}.extraClassPath - for *.jar
spark.{driver,executor}.extraLibraryPath - for *.so
And Spark (I'm using v1.5.2) will pick them up automatically.
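The workaround described in the comment above can be sketched as configuration entries. This is a hedged sketch: the jar and .so locations below are illustrative assumptions (here, the netlib-java native artifacts), and these spark.* properties are normally read from spark-defaults.conf or passed via --conf, with spark-env.sh being where the comment's author placed them.

```
# Sketch of the extraClassPath/extraLibraryPath workaround; paths are illustrative.
spark.driver.extraClassPath      /opt/netlib/netlib-native_system-linux-x86_64.jar
spark.executor.extraClassPath    /opt/netlib/netlib-native_system-linux-x86_64.jar
spark.driver.extraLibraryPath    /opt/netlib/native
spark.executor.extraLibraryPath  /opt/netlib/native
```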
[jira] [Comment Edited] (SPARK-10789) Cluster mode SparkSubmit classpath only includes Spark assembly
[ https://issues.apache.org/jira/browse/SPARK-10789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15073832#comment-15073832 ] Roi Reshef edited comment on SPARK-10789 at 12/29/15 11:55 AM:
---
Thanks [~jonathak]. That requires rebuilding spark and redistributing it across my cluster, right?
I finally figured out a solution to import external jars without rebuilding spark. One can modify two configurations inside spark-env.sh (at least for Netlib package, which include *.jar and *.so):
spark.{driver,executor}.extraClassPath - for *.jar
spark.{driver,executor}.extraLibraryPath - for *.so
And spark (I'm using v1.5.2) will pick them up automatically

was (Author: roireshef): Thanks [~jonathak]. That requires rebuilding spark and redistributing it across my cluster, right? I finally figured out a solution to import external jars without rebuilding spark. One can modify two configurations inside spark-env.sh (at least for Netlib package, which include *.jar and *.so): spark.{driver,executor}.extraClassPath - for *.jar spark.{driver,executor}.extraLibraryPath - for *.so And spark (I'm using v1.5.2) will pick them up automatically
[jira] [Commented] (SPARK-10789) Cluster mode SparkSubmit classpath only includes Spark assembly
[ https://issues.apache.org/jira/browse/SPARK-10789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15066304#comment-15066304 ] Roi Reshef commented on SPARK-10789: Any resolution on this? Can you elaborate on how you were able to bypass the limitations you described? I've been trying to add Netlib, without success. I'm also restricted to running the driver in yarn-local mode rather than yarn-cluster - will your temporary solution work with client (local) mode? > Cluster mode SparkSubmit classpath only includes Spark assembly > --- > > Key: SPARK-10789 > URL: https://issues.apache.org/jira/browse/SPARK-10789 > Project: Spark > Issue Type: Bug > Components: Spark Submit >Affects Versions: 1.5.0 >Reporter: Jonathan Kelly > > When using cluster deploy mode, the classpath of the SparkSubmit process that > gets launched only includes the Spark assembly and not > spark.driver.extraClassPath. This is of course by design, since the driver > actually runs on the cluster and not inside the SparkSubmit process. > However, if the SparkSubmit process, minimal as it may be, needs any extra > libraries that are not part of the Spark assembly, there is no good way to > include them. (I say "no good way" because including them in the > SPARK_CLASSPATH environment variable does cause the SparkSubmit process to > include them, but this is not acceptable because this environment variable > has long been deprecated, and it prevents the use of > spark.driver.extraClassPath.) > An example of when this matters is on Amazon EMR when using an S3 path for > the application JAR and running in yarn-cluster mode. The SparkSubmit process > needs the EmrFileSystem implementation and its dependencies in the classpath > in order to download the application JAR from S3, so it fails with a > ClassNotFoundException. (EMR currently gets around this by setting > SPARK_CLASSPATH, but as mentioned above this is less than ideal.) 
> I have tried modifying SparkSubmitCommandBuilder to include the driver extra > classpath whether it's client mode or cluster mode, and this seems to work, > but I don't know if there is any downside to this. > Example that fails on emr-4.0.0 (if you switch to setting > spark.(driver,executor).extraClassPath instead of SPARK_CLASSPATH): > spark-submit --deploy-mode cluster --class > org.apache.spark.examples.JavaWordCount s3://my-bucket/spark-examples.jar > s3://my-bucket/word-count-input.txt > Resulting Exception: > Exception in thread "main" java.lang.RuntimeException: > java.lang.ClassNotFoundException: Class > com.amazon.ws.emr.hadoop.fs.EmrFileSystem not found > at > org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074) > at > org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2626) > at > org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2639) > at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90) > at > org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2678) > at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2660) > at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:374) > at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) > at > org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:233) > at > org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:327) > at > org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$5.apply(Client.scala:366) > at > org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$5.apply(Client.scala:364) > at scala.collection.immutable.List.foreach(List.scala:318) > at > org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:364) > at > org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:629) > at > org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:119) > at 
org.apache.spark.deploy.yarn.Client.run(Client.scala:907) > at org.apache.spark.deploy.yarn.Client$.main(Client.scala:966) > at org.apache.spark.deploy.yarn.Client.main(Client.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) > at > org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) > at
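The behavior reported above can be sketched in a few lines of shell. This is a hypothetical simplification, not Spark's actual code (the real logic lives in SparkSubmitCommandBuilder), and the jar paths below are placeholders: in cluster mode only the Spark assembly reaches the launcher classpath, so anything in spark.driver.extraClassPath (such as the EMRFS jars) is invisible to the SparkSubmit JVM.

```shell
#!/bin/sh
# Hypothetical sketch of how the SparkSubmit launcher classpath is assembled.
# Both jar paths are made-up placeholders, not real install locations.
SPARK_ASSEMBLY="/usr/lib/spark/lib/spark-assembly.jar"
DRIVER_EXTRA_CP="/usr/share/aws/emr/emrfs/lib/emrfs.jar"
DEPLOY_MODE="cluster"

if [ "$DEPLOY_MODE" = "client" ]; then
    # Client mode: the driver runs inside SparkSubmit, so the extra
    # classpath is included and classes like EmrFileSystem are found.
    LAUNCH_CP="$DRIVER_EXTRA_CP:$SPARK_ASSEMBLY"
else
    # Cluster mode: only the assembly is used, which is what triggers the
    # ClassNotFoundException when downloading the application jar from S3.
    LAUNCH_CP="$SPARK_ASSEMBLY"
fi
echo "$LAUNCH_CP"
```

The fix proposed in the description amounts to taking the client-mode branch unconditionally when building the launcher classpath.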
[jira] [Issue Comment Deleted] (SPARK-5081) Shuffle write increases
[ https://issues.apache.org/jira/browse/SPARK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Roi Reshef updated SPARK-5081: -- Comment: was deleted (was: Hi guys, has this issue been solved by any chance? I'm using Spark 1.3.1 for training an algorithm in an iterative fashion. Since implementing a ranking measure (which ultimately uses sortBy) I'm experiencing similar problems. It seems that my cache explodes after ~100 iterations and crashes the server with a "There is insufficient memory for the Java Runtime Environment to continue" message. Note that it isn't supposed to persist the sorted vectors nor to use them in the following iterations, so I wonder why memory consumption keeps growing with each iteration.) Shuffle write increases --- Key: SPARK-5081 URL: https://issues.apache.org/jira/browse/SPARK-5081 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 1.2.0 Reporter: Kevin Jung Priority: Critical Attachments: Spark_Debug.pdf, diff.txt The size of shuffle write shown in the Spark web UI is very different when I execute the same Spark job with the same input data in Spark 1.1 and Spark 1.2. At the sortBy stage, the size of shuffle write is 98.1MB in Spark 1.1 but 146.9MB in Spark 1.2. I set the spark.shuffle.manager option to hash because its default value changed, but Spark 1.2 still writes more shuffle output than Spark 1.1. This can increase disk I/O overhead dramatically as the input file gets bigger, causing jobs to take more time to complete. In the case of about 100GB of input, for example, the size of shuffle write is 39.7GB in Spark 1.1 but 91.0GB in Spark 1.2. 
spark 1.1
||Stage Id||Description||Input||Shuffle Read||Shuffle Write||
|9|saveAsTextFile| |1169.4KB| |
|12|combineByKey| |1265.4KB|1275.0KB|
|6|sortByKey| |1276.5KB| |
|8|mapPartitions| |91.0MB|1383.1KB|
|4|apply| |89.4MB| |
|5|sortBy|155.6MB| |98.1MB|
|3|sortBy|155.6MB| | |
|1|collect| |2.1MB| |
|2|mapValues|155.6MB| |2.2MB|
|0|first|184.4KB| | |

spark 1.2
||Stage Id||Description||Input||Shuffle Read||Shuffle Write||
|12|saveAsTextFile| |1170.2KB| |
|11|combineByKey| |1264.5KB|1275.0KB|
|8|sortByKey| |1273.6KB| |
|7|mapPartitions| |134.5MB|1383.1KB|
|5|zipWithIndex| |132.5MB| |
|4|sortBy|155.6MB| |146.9MB|
|3|sortBy|155.6MB| | |
|2|collect| |2.0MB| |
|1|mapValues|155.6MB| |2.2MB|
|0|first|184.4KB| | |
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-5081) Shuffle write increases
[ https://issues.apache.org/jira/browse/SPARK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14585641#comment-14585641 ] Roi Reshef commented on SPARK-5081: --- Hi guys, has this issue been solved by any chance? I'm using Spark 1.3.1 for training in an iterative fashion. Since implementing a ranking measure (which ultimately uses sortBy) I'm experiencing similar problems. It seems that my cache explodes after ~100 iterations and crashes the server with a "There is insufficient memory for the Java Runtime Environment to continue" message. Note that it isn't supposed to persist the sorted vectors nor to use them in the following iterations, so I wonder why memory consumption keeps growing with each iteration. Shuffle write increases --- Key: SPARK-5081 URL: https://issues.apache.org/jira/browse/SPARK-5081 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 1.2.0 Reporter: Kevin Jung Priority: Critical Attachments: Spark_Debug.pdf, diff.txt The size of shuffle write shown in the Spark web UI is very different when I execute the same Spark job with the same input data in Spark 1.1 and Spark 1.2. At the sortBy stage, the size of shuffle write is 98.1MB in Spark 1.1 but 146.9MB in Spark 1.2. I set the spark.shuffle.manager option to hash because its default value changed, but Spark 1.2 still writes more shuffle output than Spark 1.1. This can increase disk I/O overhead dramatically as the input file gets bigger, causing jobs to take more time to complete. In the case of about 100GB of input, for example, the size of shuffle write is 39.7GB in Spark 1.1 but 91.0GB in Spark 1.2. 
spark 1.1
||Stage Id||Description||Input||Shuffle Read||Shuffle Write||
|9|saveAsTextFile| |1169.4KB| |
|12|combineByKey| |1265.4KB|1275.0KB|
|6|sortByKey| |1276.5KB| |
|8|mapPartitions| |91.0MB|1383.1KB|
|4|apply| |89.4MB| |
|5|sortBy|155.6MB| |98.1MB|
|3|sortBy|155.6MB| | |
|1|collect| |2.1MB| |
|2|mapValues|155.6MB| |2.2MB|
|0|first|184.4KB| | |

spark 1.2
||Stage Id||Description||Input||Shuffle Read||Shuffle Write||
|12|saveAsTextFile| |1170.2KB| |
|11|combineByKey| |1264.5KB|1275.0KB|
|8|sortByKey| |1273.6KB| |
|7|mapPartitions| |134.5MB|1383.1KB|
|5|zipWithIndex| |132.5MB| |
|4|sortBy|155.6MB| |146.9MB|
|3|sortBy|155.6MB| | |
|2|collect| |2.0MB| |
|1|mapValues|155.6MB| |2.2MB|
|0|first|184.4KB| | |
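For anyone reproducing the comparison above: Spark 1.2 changed the default shuffle manager from hash to sort, and the reporter pins it back explicitly. A hedged sketch of such an invocation (the class name, jar, and input path are placeholders for your own job, not anything from this issue):

```shell
# Pin the pre-1.2 hash-based shuffle while measuring shuffle write sizes
# in the web UI. All paths and the main class below are placeholders.
spark-submit \
  --class com.example.SortJob \
  --conf spark.shuffle.manager=hash \
  /path/to/my-app.jar \
  /path/to/input
```

Per the report, even with this setting Spark 1.2 still writes noticeably more shuffle output than 1.1, so the regression is not explained by the default change alone.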
[jira] [Comment Edited] (SPARK-5081) Shuffle write increases
[ https://issues.apache.org/jira/browse/SPARK-5081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14585641#comment-14585641 ] Roi Reshef edited comment on SPARK-5081 at 6/15/15 8:41 AM: Hi guys, has this issue been solved by any chance? I'm using Spark 1.3.1 for training an algorithm in an iterative fashion. Since implementing a ranking measure (which ultimately uses sortBy) I'm experiencing similar problems. It seems that my cache explodes after ~100 iterations and crashes the server with a "There is insufficient memory for the Java Runtime Environment to continue" message. Note that it isn't supposed to persist the sorted vectors nor to use them in the following iterations, so I wonder why memory consumption keeps growing with each iteration. was (Author: roireshef): Hi guys, has this issue been solved by any chance? I'm using Spark 1.3.1 for training in an iterative fashion. Since implementing a ranking measure (which ultimately uses sortBy) I'm experiencing similar problems. It seems that my cache explodes after ~100 iterations and crashes the server with a "There is insufficient memory for the Java Runtime Environment to continue" message. Note that it isn't supposed to persist the sorted vectors nor to use them in the following iterations, so I wonder why memory consumption keeps growing with each iteration. Shuffle write increases --- Key: SPARK-5081 URL: https://issues.apache.org/jira/browse/SPARK-5081 Project: Spark Issue Type: Bug Components: Shuffle Affects Versions: 1.2.0 Reporter: Kevin Jung Priority: Critical Attachments: Spark_Debug.pdf, diff.txt The size of shuffle write shown in the Spark web UI is very different when I execute the same Spark job with the same input data in Spark 1.1 and Spark 1.2. At the sortBy stage, the size of shuffle write is 98.1MB in Spark 1.1 but 146.9MB in Spark 1.2. 
I set the spark.shuffle.manager option to hash because its default value changed, but Spark 1.2 still writes more shuffle output than Spark 1.1. This can increase disk I/O overhead dramatically as the input file gets bigger, causing jobs to take more time to complete. In the case of about 100GB of input, for example, the size of shuffle write is 39.7GB in Spark 1.1 but 91.0GB in Spark 1.2.

spark 1.1
||Stage Id||Description||Input||Shuffle Read||Shuffle Write||
|9|saveAsTextFile| |1169.4KB| |
|12|combineByKey| |1265.4KB|1275.0KB|
|6|sortByKey| |1276.5KB| |
|8|mapPartitions| |91.0MB|1383.1KB|
|4|apply| |89.4MB| |
|5|sortBy|155.6MB| |98.1MB|
|3|sortBy|155.6MB| | |
|1|collect| |2.1MB| |
|2|mapValues|155.6MB| |2.2MB|
|0|first|184.4KB| | |

spark 1.2
||Stage Id||Description||Input||Shuffle Read||Shuffle Write||
|12|saveAsTextFile| |1170.2KB| |
|11|combineByKey| |1264.5KB|1275.0KB|
|8|sortByKey| |1273.6KB| |
|7|mapPartitions| |134.5MB|1383.1KB|
|5|zipWithIndex| |132.5MB| |
|4|sortBy|155.6MB| |146.9MB|
|3|sortBy|155.6MB| | |
|2|collect| |2.0MB| |
|1|mapValues|155.6MB| |2.2MB|
|0|first|184.4KB| | |