[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data

2016-08-11 Thread Roi Reshef (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417301#comment-15417301
 ] 

Roi Reshef commented on SPARK-17020:


Nevertheless, any attempt to repartition the resulting RDD also ends with 
(almost) all of its partitions remaining on the same node. I transformed it 
into a ShuffledRDD via PairRDDFunctions, set a HashPartitioner with 140 
partitions, and still got the same data distribution as in the screenshot I 
attached.

So I suspect something is very wrong with referring to *DataFrame.rdd* 
without materializing the DataFrame beforehand. What exactly, and why, is 
currently beyond my understanding.
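For reference, the explicit shuffle described above looks roughly like this. 
This is a sketch, not the original code: `data` stands for the joined 
DataFrame from the reproduction elsewhere in this thread, and the key choice 
(`row.get(0)`) is illustrative, since the actual key used in the original run 
is not shown.

```scala
import org.apache.spark.HashPartitioner

// Key the rows so PairRDDFunctions become available, then force a shuffle
// with a HashPartitioner of 140 partitions, as described above.
val rdd = data.rdd
val shuffled = rdd
  .map(row => (row.get(0), row))          // illustrative key choice
  .partitionBy(new HashPartitioner(140))  // explicit shuffle
  .values

shuffled.setName("shuffledRdd").cache()
shuffled.count()  // materialize; inspect the block distribution in the UI
```

Per the comment above, even this explicit shuffle reportedly left the blocks 
concentrated on one node.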

> Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data
> --
>
> Key: SPARK-17020
> URL: https://issues.apache.org/jira/browse/SPARK-17020
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core, SQL
>Affects Versions: 1.6.1, 1.6.2, 2.0.0
>Reporter: Roi Reshef
> Attachments: dataframe_cache.PNG, rdd_cache.PNG
>
>
> Calling DataFrame's lazy val .rdd results in a new RDD with a poor 
> distribution of partitions across the cluster. Moreover, any attempt to 
> repartition this RDD further will fail.
> Attached are a screenshot of the original DataFrame on cache and the 
> resulting RDD on cache.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data

2016-08-11 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417291#comment-15417291
 ] 

Sean Owen commented on SPARK-17020:
---

I see, I was asking because you show the results of caching a DataFrame above.
My guess is that in one case, the DataFrame is computed using the expected 
number of partitions, and somehow when you go straight through to the RDD, it 
ends up executing one task for one partition, thus putting the result in one 
big block. As to why, I don't know. You could confirm/deny by looking at the 
partition count for the DataFrame and RDD in these cases.
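The partition counts Sean suggests comparing can be read off directly. A 
minimal sketch, assuming `data` is the joined DataFrame from the reproduction 
in this thread:

```scala
// Compare the partition count of the DataFrame's underlying RDD with the
// count after materializing: a drop to 1 (or very few) partitions on the
// RDD side would match the single-big-block symptom described here.
println(s"Partitions before materializing: ${data.rdd.partitions.length}")

data.cache()
data.count()  // materialize the DataFrame

println(s"Partitions after materializing:  ${data.rdd.partitions.length}")
```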



[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data

2016-08-11 Thread Roi Reshef (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417288#comment-15417288
 ] 

Roi Reshef commented on SPARK-17020:


The problem occurs only when calling *.rdd* on a *not-previously-cached* 
DataFrame.
*data* is a DataFrame, so in the last code snippet it is cached, whereas in 
the earlier one it wasn't; only the RDD extracted from it was.



[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data

2016-08-11 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417268#comment-15417268
 ] 

Sean Owen commented on SPARK-17020:
---

Yeah, after it's cached and the partitions are established, I'd certainly 
expect it to do the sensible thing and use that locality, and that you'd find 
the locality of the RDD's partitions is the same and well-distributed.

What's the code path where you cache the DataFrame? I only see the RDD cached 
here.



[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data

2016-08-11 Thread Roi Reshef (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417254#comment-15417254
 ] 

Roi Reshef commented on SPARK-17020:


Also note that I have just called:

data.cache().count()
val rdd = data.rdd.setName("rdd").cache()
rdd.count

and the RDD was distributed far better (similarly to the "data" DataFrame).

I'm not sure this solves the issue of the RDD ignoring repartitioning methods 
further down the road; I'll have to check that.
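The working variant above, spelled out as a sketch (assuming `data` is the 
joined DataFrame from the reproduction in this thread):

```scala
// Cache and materialize the DataFrame first...
data.cache()
data.count()

// ...and only then derive and cache the RDD. With the DataFrame's
// partitions already pinned in the block manager, the derived RDD
// reportedly inherits their locality and ends up well distributed.
val rdd = data.rdd.setName("rdd").cache()
rdd.count()
```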



[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data

2016-08-11 Thread Roi Reshef (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417250#comment-15417250
 ] 

Roi Reshef commented on SPARK-17020:


val ab = SomeReader.read(...).  // some reader function that uses spark-csv with inferSchema=true
  filter(!isnull($"name")).
  alias("revab")

val meta = SomeReader.read(...)  // same, but with a different schema and data

val udaf = ...  // some UserDefinedAggregateFunction
val features = ab.groupBy(...).agg(udaf(...))

val data = features.
  join(meta, $"meta.id" === $"features.id").
  select(...)  // only the relevant fields

val rdd = data.rdd.setName("rdd").cache()
rdd.count




[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data

2016-08-11 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417231#comment-15417231
 ] 

Sean Owen commented on SPARK-17020:
---

I think that's probably material, yes, as are the operations that created the 
DataFrame. Do you have a minimal reproduction?



[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data

2016-08-11 Thread Roi Reshef (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417218#comment-15417218
 ] 

Roi Reshef commented on SPARK-17020:


[~srowen] Should it have any effect on this if I cache and materialize the 
DataFrame before I call .rdd?



[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data

2016-08-11 Thread Roi Reshef (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417204#comment-15417204
 ] 

Roi Reshef commented on SPARK-17020:


[~srowen] I have 2 DataFrames that are generated by the spark-csv reader.
I then pass them through several transformations and join them together.
After that I call either .rdd or .flatMap to get an RDD out of the joined 
DataFrame.

Throughout the whole process I've monitored the distribution of the 
DataFrames. It is good up until the point where ".rdd" is called.



[jira] [Commented] (SPARK-17020) Materialization of RDD via DataFrame.rdd forces a poor re-distribution of data

2016-08-11 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15417194#comment-15417194
 ] 

Sean Owen commented on SPARK-17020:
---

Can you provide more detail on how you created the DataFrame and the RDD, 
where you cached them, etc.? Just to rule out other factors. I can't 
reproduce this locally, but I may have a different setup than you.
