[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417301#comment-15417301 ] Roi Reshef commented on SPARK-17020: Nevertheless, any attempt to repartition the resulting RDD also ends with (almost) all of its partitions staying on the same node. I transformed it into a ShuffledRDD via PairRDDFunctions and set a HashPartitioner with 140 partitions, and yet I got the same data distribution as in the screenshot I attached. So I guess there's something very wrong with referring to *DataFrame.rdd* without materializing the DataFrame beforehand. What and why is beyond my understanding, currently.

> Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
> --
>
> Key: SPARK-17020
> URL: https://issues.apache.org/jira/browse/SPARK-17020
> Project: Spark
> Issue Type: Bug
> Components: Spark Core, SQL
> Affects Versions: 1.6.1, 1.6.2, 2.0.0
> Reporter: Roi Reshef
> Attachments: dataframe_cache.PNG, rdd_cache.PNG
>
> Calling DataFrame's lazy val .rdd results in a new RDD with a poor
> distribution of partitions across the cluster. Moreover, any attempt to
> repartition this RDD further will fail.
> Attached are screenshots of the original DataFrame on cache and the
> resulting RDD on cache.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
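The repartition attempt described in this comment can be sketched roughly as follows. This is an illustrative reconstruction, not the reporter's actual code: the input path, the choice of key, and the object name are placeholders.

```scala
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("spark-17020-sketch").getOrCreate()
    val df = spark.read.parquet("/some/path") // placeholder for the joined DataFrame

    // Key each row (here arbitrarily by its hash) to obtain a PairRDD via
    // PairRDDFunctions, then force a shuffle through an explicit
    // HashPartitioner with 140 partitions, and drop the keys again.
    val shuffled = df.rdd
      .keyBy(_.hashCode)
      .partitionBy(new HashPartitioner(140))
      .values

    shuffled.setName("shuffled").cache().count()
  }
}
```

Under the behavior reported here, even this explicit shuffle left nearly all partitions on one node, which is what makes the symptom surprising: a HashPartitioner should spread keys across all 140 partitions, and those partitions across the executors.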
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417291#comment-15417291 ] Sean Owen commented on SPARK-17020: --- I see; I was asking because you show the results of caching a DataFrame above. My guess is that in one case the DataFrame is computed using the expected number of partitions, and somehow, when you go straight through to the RDD, it ends up executing one task for one partition, thus putting the result in one big block. As to why, I don't know. You could confirm or deny this by looking at the partition count for the DataFrame and the RDD in these cases.
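The partition-count check suggested here could look something like the sketch below. It assumes `data` is the joined DataFrame from the reporter's pipeline; the per-partition size dump is an added diagnostic, not something from the thread.

```scala
// Count the partitions of the RDD extracted from the DataFrame, and the
// number of rows in each partition. If the computation collapsed to a
// single task, one partition would hold (almost) all of the rows.
val rdd = data.rdd
println(s"partitions: ${rdd.getNumPartitions}")

val sizes = rdd.mapPartitions(it => Iterator(it.size)).collect()
println(sizes.mkString(", "))
```

Comparing this output before and after `data.cache().count()` would show directly whether caching the DataFrame first is what restores the expected partitioning.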
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417288#comment-15417288 ] Roi Reshef commented on SPARK-17020: The problem occurs only when calling *.rdd* on a *not-previously-cached* DataFrame. *data* is a DataFrame, so in the last snippet it is cached, whereas in the earlier one it wasn't; there, only the RDD extracted from it was cached.
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417268#comment-15417268 ] Sean Owen commented on SPARK-17020: --- Yeah, after it's cached and the partitions are established, I'd certainly expect it to do the sensible thing and use that locality, and that you'd find the locality of the RDD's partitions is the same and well-distributed. What's the code path where you cache the DataFrame? I only see the RDD cached here.
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417254#comment-15417254 ] Roi Reshef commented on SPARK-17020: Also note that I have just called:

data.cache().count()
val rdd = data.rdd.setName("rdd").cache()
rdd.count

and the RDD was distributed far better (similar to the "data" DataFrame). I'm not sure it solves the issue with the RDD that ignores repartitioning methods further down the road. I'll have to check that.
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417250#comment-15417250 ] Roi Reshef commented on SPARK-17020:

val ab = SomeReader.read(...). //some reader function that uses spark-csv with inferSchema=true
  filter(!isnull($"name")).
  alias("revab")
val meta = SomeReader.read(...) //same but different schema and data
val udaf = ... //some UserDefinedAggregateFunction
val features = ab.groupBy(...).agg(udaf(...))
val data = features.
  join(meta, $"meta.id" === $"features.id").
  select(...) //only relevant fields
val rdd = data.rdd.setName("rdd").cache()
rdd.count
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417231#comment-15417231 ] Sean Owen commented on SPARK-17020: --- I think that's probably material, yes, as are the operations that created the DataFrame. Do you have any minimal reproduction?
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417218#comment-15417218 ] Roi Reshef commented on SPARK-17020: [~srowen] Should it have any effect on this if I cached and materialized the DataFrame before calling .rdd?
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417204#comment-15417204 ] Roi Reshef commented on SPARK-17020: [~srowen] I have 2 DataFrames that are generated from the spark-csv reader. I pass them through several transformations and join them together. After that I call either .rdd or .flatMap to get an RDD out of the joined DataFrame. Throughout the process I've monitored the distribution of the DataFrames. It is good up until the point where .rdd is called.
[ https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417194#comment-15417194 ] Sean Owen commented on SPARK-17020: --- Can you provide more detail on how you created the DataFrame and the RDD, where you cached them, etc., just to rule out some other stuff? I can't reproduce this locally, but I may have a different situation from yours.