Re: Spark stages very slow to complete

2015-06-02 Thread Karlson
Hi, the code is some hundreds of lines of Python. I can try to compose a
minimal example as soon as I find the time, though. Any ideas until then?



Would you mind posting the code?
On 2 Jun 2015 00:53, "Karlson"  wrote:


Hi,

In all (pyspark) Spark jobs that become somewhat more involved, I am
experiencing the issue that some stages take a very long time to complete,
and sometimes don't complete at all. This clearly correlates with the size
of my input data. Looking at the stage details for one such stage, I am
wondering where Spark spends all this time. Take this table of the stage's
task metrics for example:

Metric                      Min          25th percentile  Median       75th percentile  Max
Duration                    1.4 min      1.5 min          1.7 min      1.9 min          2.3 min
Scheduler Delay             1 ms         3 ms             4 ms         5 ms             23 ms
Task Deserialization Time   1 ms         2 ms             3 ms         8 ms             22 ms
GC Time                     0 ms         0 ms             0 ms         0 ms             0 ms
Result Serialization Time   0 ms         0 ms             0 ms         0 ms             1 ms
Getting Result Time         0 ms         0 ms             0 ms         0 ms             0 ms
Input Size / Records        23.9 KB / 1  24.0 KB / 1      24.1 KB / 1  24.1 KB / 1      24.3 KB / 1

Why is the overall duration almost 2 min? Where is all this time spent,
when no progress of the stages is visible? The progress bar simply displays
0 succeeded tasks for a very long time before sometimes slowly progressing.

Also, the name of the stage displayed above is `javaToPython at null:-1`,
which I find very uninformative. I don't even know which action exactly is
responsible for this stage. Does anyone experience similar issues or have
any advice for me?

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org







Spark stages very slow to complete

2015-06-01 Thread Karlson

Hi,

In all (pyspark) Spark jobs that become somewhat more involved, I am
experiencing the issue that some stages take a very long time to complete,
and sometimes don't complete at all. This clearly correlates with the size
of my input data. Looking at the stage details for one such stage, I am
wondering where Spark spends all this time. Take this table of the stage's
task metrics for example:


Metric                      Min          25th percentile  Median       75th percentile  Max
Duration                    1.4 min      1.5 min          1.7 min      1.9 min          2.3 min
Scheduler Delay             1 ms         3 ms             4 ms         5 ms             23 ms
Task Deserialization Time   1 ms         2 ms             3 ms         8 ms             22 ms
GC Time                     0 ms         0 ms             0 ms         0 ms             0 ms
Result Serialization Time   0 ms         0 ms             0 ms         0 ms             1 ms
Getting Result Time         0 ms         0 ms             0 ms         0 ms             0 ms
Input Size / Records        23.9 KB / 1  24.0 KB / 1      24.1 KB / 1  24.1 KB / 1      24.3 KB / 1


Why is the overall duration almost 2 min? Where is all this time spent,
when no progress of the stages is visible? The progress bar simply
displays 0 succeeded tasks for a very long time before sometimes slowly
progressing.


Also, the name of the stage displayed above is `javaToPython at
null:-1`, which I find very uninformative. I don't even know which
action exactly is responsible for this stage. Does anyone experience
similar issues or have any advice for me?


Thanks!
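
The stage name itself comes from where the RDD was defined, but PySpark does
let you label the work so the UI becomes easier to read. A minimal sketch,
assuming a plain SparkContext; the app name, group id, description and RDD
names below are illustrative, not taken from the original job:

from pyspark import SparkContext

sc = SparkContext(appName="stage-naming-demo")   # illustrative app name

# Everything triggered after this call carries the group id and description
# in the UI, which helps attribute stages like "javaToPython at null:-1"
# to a concrete part of the driver program.
sc.setJobGroup("nightly-join", "join and aggregate the input data")

rdd = sc.parallelize(range(1000)).map(lambda x: (x % 10, x))
rdd.setName("keyed input")               # name shown for this RDD in the UI
counts = rdd.groupByKey().mapValues(len)
counts.setName("counts per key")

print(counts.count())

This does not rename the stage, but the job description and the RDD names
show up in the UI alongside it, which makes it easier to tell which action
produced which stage.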

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Partitioning of Dataframes

2015-05-22 Thread Karlson

Alright, that doesn't seem to have made it into the Python API yet.

On 2015-05-22 15:12, Silvio Fiorito wrote:

This is added to 1.4.0

https://github.com/apache/spark/pull/5762







On 5/22/15, 8:48 AM, "Karlson"  wrote:


Hi,

wouldn't df.rdd.partitionBy() return a new RDD that I would then need to
make into a Dataframe again? Maybe like this:
df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird
to me, though, and I'm not sure if the DF will be aware of its
partitioning.

On 2015-05-22 12:55, ayan guha wrote:

DataFrame is an abstraction of RDD, so you should be able to do
df.rdd.partitionBy. However, as far as I know, equijoins are already
optimized for partitioning. You may want to look at the explain plans
more carefully and materialise interim joins.
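
A small sketch of how to follow that advice from PySpark, assuming two toy
DataFrames built in-line (app, table and column names are illustrative);
explain() prints the plan, and the Exchange operators in it mark where data
gets repartitioned:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(appName="join-explain")   # illustrative app name
sql_context = SQLContext(sc)

left = sql_context.createDataFrame([Row(col1=i, col2=i * 2) for i in range(10)])
right = sql_context.createDataFrame([Row(col1=i, col3=i * 3) for i in range(10)])

joined = left.join(right, left['col1'] == right['col1'], 'inner')

# Prints the (extended) plan; Exchange operators are the shuffle steps.
joined.explain(True)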
 On 22 May 2015 19:03, "Karlson"  wrote:


Hi,

is there any way to control how Dataframes are partitioned? I'm doing
lots of joins and am seeing very large shuffle reads and writes in the
Spark UI. With PairRDDs you can control how the data is partitioned
across nodes with partitionBy. There is no such method on Dataframes
however. Can I somehow partition the underlying RDD manually? I am
currently using the Python API.

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org







Re: Partitioning of Dataframes

2015-05-22 Thread Karlson

Hi,

wouldn't df.rdd.partitionBy() return a new RDD that I would then need to 
make into a Dataframe again? Maybe like this: 
df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird 
to me, though, and I'm not sure if the DF will be aware of its 
partitioning.
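
A sketch of that round trip, assuming a SQLContext named sql_context and a
toy DataFrame with a col1 key column (app name and data are illustrative).
Note that partitionBy wants a pair RDD, so the rows have to be keyed first;
whether the rebuilt DataFrame's optimizer knows anything about this
partitioning is exactly the doubt raised above:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(appName="df-partition-roundtrip")   # illustrative app name
sql_context = SQLContext(sc)

df = sql_context.createDataFrame(
    [Row(col1=i % 10, col2=i) for i in range(100)])

# partitionBy needs (key, value) pairs, so key each Row by the join column,
# partition, then drop the keys again before rebuilding the DataFrame.
pair_rdd = df.rdd.map(lambda row: (row.col1, row))
partitioned_rows = pair_rdd.partitionBy(8).values()

df_repart = sql_context.createDataFrame(partitioned_rows, schema=df.schema)
print(df_repart.rdd.getNumPartitions())   # 8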


On 2015-05-22 12:55, ayan guha wrote:

DataFrame is an abstraction of RDD, so you should be able to do
df.rdd.partitionBy. However, as far as I know, equijoins are already
optimized for partitioning. You may want to look at the explain plans
more carefully and materialise interim joins.
 On 22 May 2015 19:03, "Karlson"  wrote:


Hi,

is there any way to control how Dataframes are partitioned? I'm doing
lots of joins and am seeing very large shuffle reads and writes in the
Spark UI. With PairRDDs you can control how the data is partitioned
across nodes with partitionBy. There is no such method on Dataframes
however. Can I somehow partition the underlying RDD manually? I am
currently using the Python API.

Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org







Partitioning of Dataframes

2015-05-22 Thread Karlson

Hi,

is there any way to control how Dataframes are partitioned? I'm doing
lots of joins and am seeing very large shuffle reads and writes in the
Spark UI. With PairRDDs you can control how the data is partitioned
across nodes with partitionBy. There is no such method on Dataframes
however. Can I somehow partition the underlying RDD manually? I am
currently using the Python API.


Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [pyspark] Starting workers in a virtualenv

2015-05-21 Thread Karlson

That works, thank you!

On 2015-05-22 03:15, Davies Liu wrote:

Could you try specifying PYSPARK_PYTHON as the path to the Python in
your virtualenv? For example:

PYSPARK_PYTHON=/path/to/env/bin/python bin/spark-submit xx.py
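
An equivalent way to pass the same setting from inside the driver script, as
a rough sketch (the path and app name are illustrative); the variable has to
be set before the SparkContext is created, and for local mode, which is the
setup described below, that is enough:

import os

# Must be set before the SparkContext is created, so the Python workers
# are launched with the virtualenv's interpreter.
os.environ["PYSPARK_PYTHON"] = "/path/to/env/bin/python"

from pyspark import SparkContext

sc = SparkContext(appName="virtualenv-workers")   # illustrative app name
print(sc.parallelize(range(10)).map(lambda x: x * x).collect())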

On Mon, Apr 20, 2015 at 12:51 AM, Karlson  wrote:

Hi all,

I am running the Python process that communicates with Spark in a
virtualenv. Is there any way I can make sure that the Python processes of
the workers are also started in a virtualenv? Currently I am getting
ImportErrors when the worker tries to unpickle stuff that is not installed
system-wide. For now both the worker and the driver run on the same machine
in local mode.

Thanks in advance!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






Re: Join on DataFrames from the same source (Pyspark)

2015-04-22 Thread Karlson

DataFrames do not have the attributes 'alias' or 'as' in the Python API.

On 2015-04-21 20:41, Michael Armbrust wrote:

This is https://issues.apache.org/jira/browse/SPARK-6231

Unfortunately this is pretty hard to fix as its hard for us to
differentiate these without aliases.  However you can add an alias as
follows:

from pyspark.sql.functions import *
df.alias("a").join(df.alias("b"), col("a.col1") == col("b.col1"))

On Tue, Apr 21, 2015 at 8:10 AM, Karlson  wrote:


Sorry, my code actually was

df_one = df.select('col1', 'col2')
df_two = df.select('col1', 'col3')

But in Spark 1.4.0 this does not seem to make any difference anyway and
the problem is the same with both versions.



On 2015-04-21 17:04, ayan guha wrote:


your code should be

 df_one = df.select('col1', 'col2')
 df_two = df.select('col1', 'col3')

Your current code is generating a tuple, and of course df_1 and df_2 are
different, so the join is yielding a cartesian product.

Best
Ayan

On Wed, Apr 22, 2015 at 12:42 AM, Karlson  
wrote:


 Hi,


can anyone confirm (and if so elaborate on) the following problem?

When I join two DataFrames that originate from the same source DataFrame,
the resulting DF will explode to a huge number of rows. A quick example:

I load a DataFrame with n rows from disk:

df = sql_context.parquetFile('data.parquet')

Then I create two DataFrames from that source.

df_one = df.select(['col1', 'col2'])
df_two = df.select(['col1', 'col3'])

Finally I want to (inner) join them back together:

df_joined = df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner')

The key in col1 is unique. The resulting DataFrame should have n rows,
however it does have n*n rows.

That does not happen when I load df_one and df_two from disk directly. I
am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org







Re: Join on DataFrames from the same source (Pyspark)

2015-04-21 Thread Karlson

Sorry, my code actually was

df_one = df.select('col1', 'col2')
df_two = df.select('col1', 'col3')

But in Spark 1.4.0 this does not seem to make any difference anyway and 
the problem is the same with both versions.



On 2015-04-21 17:04, ayan guha wrote:

your code should be

 df_one = df.select('col1', 'col2')
 df_two = df.select('col1', 'col3')

Your current code is generating a tuple, and of course df_1 and df_2 are
different, so the join is yielding a cartesian product.

Best
Ayan

On Wed, Apr 22, 2015 at 12:42 AM, Karlson  wrote:


Hi,

can anyone confirm (and if so elaborate on) the following problem?

When I join two DataFrames that originate from the same source DataFrame,
the resulting DF will explode to a huge number of rows. A quick example:

I load a DataFrame with n rows from disk:

df = sql_context.parquetFile('data.parquet')

Then I create two DataFrames from that source.

df_one = df.select(['col1', 'col2'])
df_two = df.select(['col1', 'col3'])

Finally I want to (inner) join them back together:

df_joined = df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner')

The key in col1 is unique. The resulting DataFrame should have n rows,
however it does have n*n rows.

That does not happen when I load df_one and df_two from disk directly. I
am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org







Join on DataFrames from the same source (Pyspark)

2015-04-21 Thread Karlson

Hi,

can anyone confirm (and if so elaborate on) the following problem?

When I join two DataFrames that originate from the same source 
DataFrame, the resulting DF will explode to a huge number of rows. A 
quick example:


I load a DataFrame with n rows from disk:

df = sql_context.parquetFile('data.parquet')

Then I create two DataFrames from that source.

df_one = df.select(['col1', 'col2'])
df_two = df.select(['col1', 'col3'])

Finally I want to (inner) join them back together:

df_joined = df_one.join(df_two, df_one['col1'] == df_two['col2'], 
'inner')


The key in col1 is unique. The resulting DataFrame should have n rows, 
however it does have n*n rows.


That does not happen when I load df_one and df_two from disk directly.
I am on Spark 1.3.0, but this also happens on the current 1.4.0
snapshot.
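
Putting the workaround from the reply above (SPARK-6231) into one
self-contained sketch, assuming a Spark version in which DataFrame.alias is
available from Python (it was still reported missing at the time of this
thread); the app name, data and aliases are toy values:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import col

sc = SparkContext(appName="self-join-alias")   # illustrative app name
sql_context = SQLContext(sc)

df = sql_context.createDataFrame(
    [Row(col1=i, col2=i * 2, col3=i * 3) for i in range(5)])

df_one = df.select('col1', 'col2').alias('a')
df_two = df.select('col1', 'col3').alias('b')

# Without the aliases both sides of the condition resolve against the same
# source columns; with them the join condition is unambiguous.
df_joined = df_one.join(df_two, col('a.col1') == col('b.col1'), 'inner')
print(df_joined.count())   # expected: 5, one row per unique key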


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[pyspark] Starting workers in a virtualenv

2015-04-20 Thread Karlson

Hi all,

I am running the Python process that communicates with Spark in a 
virtualenv. Is there any way I can make sure that the Python processes 
of the workers are also started in a virtualenv? Currently I am getting 
ImportErrors when the worker tries to unpickle stuff that is not 
installed system-wide. For now both the worker and the driver run on the 
same machine in local mode.


Thanks in advance!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Save and read parquet from the same path

2015-03-04 Thread Karlson

Hi all,

what would happen if I save an RDD via saveAsParquetFile to the same path
that the RDD was originally read from? Is that a safe thing to do in Pyspark?


Thanks!
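
A cautious sketch rather than a definitive answer: because the read is lazy,
the input may still be needed while the write is running, so one way to stay
safe is to write to a fresh path first and swap afterwards. The paths, the
app name and the column used in the transformation are illustrative:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="parquet-roundtrip")   # illustrative app name
sql_context = SQLContext(sc)

df = sql_context.parquetFile("data.parquet")     # original input
result = df.filter(df.col1 > 0)                  # illustrative transformation

# Materialise the result next to the input instead of on top of it,
# then replace the old directory once the write has finished.
result.saveAsParquetFile("data.parquet.tmp")

After the save completes, the old directory can be replaced with the new one
using the filesystem (or HDFS) tooling, outside of Spark.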


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Shuffle on joining two RDDs

2015-02-13 Thread Karlson
In 
https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38, 
wouldn't it help to change the lines


vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))

to

vs = rdd.mapValues(lambda v: (1, v))
ws = other.mapValues(lambda v: (2, v))

?
As I understand, this would preserve the original partitioning.
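
A quick way to see the difference between the two versions, as a sketch with
a local SparkContext (app name and toy data are illustrative); as the message
above suggests, mapValues keeps the existing partitioner while the plain map
does not:

from pyspark import SparkContext

sc = SparkContext(appName="preserve-partitioning")   # illustrative app name

rdd = sc.parallelize([(i % 8, i) for i in range(100)]).partitionBy(8)

tagged_map = rdd.map(lambda kv: (kv[0], (1, kv[1])))   # partitioner is dropped
tagged_vals = rdd.mapValues(lambda v: (1, v))          # partitioner is kept

print(rdd.partitioner is not None)           # True
print(tagged_map.partitioner is not None)    # False
print(tagged_vals.partitioner is not None)   # True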


On 2015-02-13 12:43, Karlson wrote:

Does that mean partitioning does not work in Python? Or does this only
affect joining?

On 2015-02-12 19:27, Davies Liu wrote:
The feature works as expected in Scala/Java, but not implemented in 
Python.


On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid  
wrote:

I wonder if the issue is that these lines just need to add
preservesPartitioning = true
?

https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ pyspark


On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid  
wrote:


ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. It
could be that pyspark doesn't properly support narrow dependencies, or
maybe you need to be more explicit about the partitioner. I am looking
into the pyspark api but you might have some better guesses here than I
thought.


My suggestion to do

joinedRdd.getPartitions.foreach{println}

was just to see if the partition was a NarrowCoGroupSplitDep or a
ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
fields are hidden deeper inside and are not user-visible. But I think a
better way (in scala, anyway) is to look at rdd.dependencies. It's a
little tricky, though; you need to look deep into the lineage (example at
the end).

Sean -- yes it does require both RDDs have the same partitioner, but that
should happen naturally if you just specify the same number of partitions,
you'll get equal HashPartitioners. There is a little difference in the
scala & python api that I missed here. For partitionBy in scala, you
actually need to specify the partitioner, but not in python. However I
thought it would work like groupByKey, which does just take an int.


Here's a code example in scala -- not sure what is available from python.
Hopefully somebody knows a simpler way to confirm narrow dependencies??


val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)

scala> d.partitioner == d2.partitioner
res2: Boolean = true
val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)

val badJoined = d.join(d3)

d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")


// unfortunately, just looking at the immediate dependencies of joined &
// badJoined is misleading, b/c join actually creates
// one more step after the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] =
List(org.apache.spark.OneToOneDependency@74751ac8)
//even with the join that does require a shuffle, we still see a
OneToOneDependency, but thats just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] =
List(org.apache.spark.OneToOneDependency@1cf356cc)






 //so lets make a helper function to get all the dependencies 
recursively


def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}


//full dependencies of the good join

scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at
:16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at
:16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at
:16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at
:16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at
:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at
:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d2 ShuffledRDD[6] at groupByKey at
:12,org.apache.spark.ShuffleDependency@5960236d)
(MappedRDD[5] at map at
:12,org.apache.spark.OneToOneDependency@36b5f6f2)



//full dependencies of the bad join -- notice the 
ShuffleDependency!


scala> flattenDeps(badJoined).foreach{println}
(badJoined FlatMappedValuesRDD[15] at join at
:16,org.apache.spark.OneToOneDependency@1cf356cc)
(MappedValuesRDD[14] at join at
:16,org.apache.spark.OneToOneDependency@5dea4db)
(CoGroupedRDD[13] at join at
:16,org.apache.spark.ShuffleDependency@5c1928df)
(CoGroupedRDD[13] at join at
:16,org.apache.spark.OneToOneDependency@77ca77b5)
(d ShuffledRDD[3] at groupByKey at
:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at
:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d3 ShuffledRDD[12] at groupByKey at
:12,org.apache.spark.

Re: Shuffle on joining two RDDs

2015-02-13 Thread Karlson
Does that mean partitioning does not work in Python? Or does this only
affect joining?


On 2015-02-12 19:27, Davies Liu wrote:
The feature works as expected in Scala/Java, but not implemented in 
Python.


On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid  
wrote:

I wonder if the issue is that these lines just need to add
preservesPartitioning = true
?

https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38

I am getting the feeling this is an issue w/ pyspark


On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid  
wrote:


ah, sorry I am not too familiar w/ pyspark, sorry I missed that part. It
could be that pyspark doesn't properly support narrow dependencies, or
maybe you need to be more explicit about the partitioner. I am looking
into the pyspark api but you might have some better guesses here than I
thought.


My suggestion to do

joinedRdd.getPartitions.foreach{println}

was just to see if the partition was a NarrowCoGroupSplitDep or a
ShuffleCoGroupSplitDep -- but it was actually a bad suggestion, those
fields are hidden deeper inside and are not user-visible. But I think a
better way (in scala, anyway) is to look at rdd.dependencies. It's a
little tricky, though; you need to look deep into the lineage (example at
the end).

Sean -- yes it does require both RDDs have the same partitioner, but that
should happen naturally if you just specify the same number of partitions,
you'll get equal HashPartitioners. There is a little difference in the
scala & python api that I missed here. For partitionBy in scala, you
actually need to specify the partitioner, but not in python. However I
thought it would work like groupByKey, which does just take an int.


Here's a code example in scala -- not sure what is available from python.
Hopefully somebody knows a simpler way to confirm narrow dependencies??


val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)

scala> d.partitioner == d2.partitioner
res2: Boolean = true
val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)

val badJoined = d.join(d3)

d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")


// unfortunately, just looking at the immediate dependencies of joined &
// badJoined is misleading, b/c join actually creates
// one more step after the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] =
List(org.apache.spark.OneToOneDependency@74751ac8)
//even with the join that does require a shuffle, we still see a
OneToOneDependency, but thats just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] =
List(org.apache.spark.OneToOneDependency@1cf356cc)






 //so lets make a helper function to get all the dependencies 
recursively


def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}


//full dependencies of the good join

scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at
:16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at
:16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at
:16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at
:16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at
:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at
:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d2 ShuffledRDD[6] at groupByKey at
:12,org.apache.spark.ShuffleDependency@5960236d)
(MappedRDD[5] at map at
:12,org.apache.spark.OneToOneDependency@36b5f6f2)



//full dependencies of the bad join -- notice the ShuffleDependency!

scala> flattenDeps(badJoined).foreach{println}
(badJoined FlatMappedValuesRDD[15] at join at
:16,org.apache.spark.OneToOneDependency@1cf356cc)
(MappedValuesRDD[14] at join at
:16,org.apache.spark.OneToOneDependency@5dea4db)
(CoGroupedRDD[13] at join at
:16,org.apache.spark.ShuffleDependency@5c1928df)
(CoGroupedRDD[13] at join at
:16,org.apache.spark.OneToOneDependency@77ca77b5)
(d ShuffledRDD[3] at groupByKey at
:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at
:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d3 ShuffledRDD[12] at groupByKey at
:12,org.apache.spark.ShuffleDependency@d794984)
(MappedRDD[11] at map at
:12,org.apache.spark.OneToOneDependency@15c98005)




On Thu, Feb 12, 2015 at 10:05 AM, Karlson  
wrote:


Hi Imran,

thanks for your quick reply.

Actually I am doing this:

rddA = rddA.partitionBy(n).cache()
rddB = rddB.partitionBy(n).cache()

followed by

rddA.count()
rddB.count()

then joinedRDD = rddA.join(rddB)

I thought that the 

Re: Shuffle on joining two RDDs

2015-02-12 Thread Karlson

Hi,

I believe that partitionBy will use the same (default) partitioner on 
both RDDs.


On 2015-02-12 17:12, Sean Owen wrote:

Doesn't this require that both RDDs have the same partitioner?

On Thu, Feb 12, 2015 at 3:48 PM, Imran Rashid  
wrote:

Hi Karlson,

I think your assumptions are correct -- that join alone shouldn't 
require

any shuffling.  But its possible you are getting tripped up by lazy
evaluation of RDDs.  After you do your partitionBy, are you sure those 
RDDs
are actually materialized & cached somewhere?  eg., if you just did 
this:


val rddA = someData.partitionBy(N)
val rddB = someOtherData.partitionBy(N)
val joinedRdd = rddA.join(rddB)
joinedRdd.count() //or any other action

then the partitioning isn't actually getting run until you do the 
join.  So
though the join itself can happen without partitioning, 
joinedRdd.count()
will trigger the evaluation of rddA & rddB which will require 
shuffles.

Note that even if you have some intervening action on rddA & rddB that
shuffles them, unless you persist the result, you will need to 
reshuffle

them for the join.

If this doesn't help explain things, for debugging

joinedRdd.getPartitions.foreach{println}

this is getting into the weeds, but at least this will tell us whether 
or
not you are getting narrow dependencies, which would avoid the 
shuffle.

(Does anyone know of a simpler way to check this?)

hope this helps,
Imran




On Thu, Feb 12, 2015 at 9:25 AM, Karlson  wrote:


Hi All,

using Pyspark, I create two RDDs (one with about 2M records (~200MB), 
the

other with about 8M records (~2GB)) of the format (key, value).

I've done a partitionBy(num_partitions) on both RDDs and verified 
that
both RDDs have the same number of partitions and that equal keys 
reside on

the same partition (via mapPartitionsWithIndex).

Now I'd expect that for a join on the two RDDs no shuffling is 
necessary.
Looking at the Web UI under http://driver:4040 however reveals that 
that

assumption is false.

In fact I am seeing shuffle writes of about 200MB and reads of about 
50MB.


What's the explanation for that behaviour? Where am I wrong with my
assumption?

Thanks in advance,

Karlson

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org








Re: Shuffle on joining two RDDs

2015-02-12 Thread Karlson

Hi Imran,

thanks for your quick reply.

Actually I am doing this:

rddA = rddA.partitionBy(n).cache()
rddB = rddB.partitionBy(n).cache()

followed by

rddA.count()
rddB.count()

then joinedRDD = rddA.join(rddB)

I thought that the count() would force the evaluation, so any subsequent
joins would be shuffle-less. I was wrong about the shuffle amounts,
however. The shuffle write is actually 2GB (i.e. the size of the bigger
RDD) while there is no shuffle read. A joinedRdd.count() does a shuffle
read of about 1GB in size, though.
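
For reference, the pattern described above as one runnable sketch (sizes
shrunk down, app name and data illustrative); with the Python API of this
Spark version the join shuffles regardless, which would match the numbers
reported here:

from pyspark import SparkContext

sc = SparkContext(appName="partitioned-join")   # illustrative app name
n = 8

rddA = sc.parallelize([(i % 1000, 'a') for i in range(20000)])
rddB = sc.parallelize([(i % 1000, 'b') for i in range(80000)])

# Partition both sides identically and materialise them before joining.
rddA = rddA.partitionBy(n).cache()
rddB = rddB.partitionBy(n).cache()
rddA.count()
rddB.count()

joinedRdd = rddA.join(rddB)
print(joinedRdd.count())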


The getPartitions-method does not exist on the resulting RDD (I am using 
the Python API). There is however foreachPartition(). What is the line


joinedRdd.getPartitions.foreach{println}

supposed to do?

Thank you,

Karlson

PS: Sorry for sending this twice, I accidentally did not reply to the 
mailing list first.



On 2015-02-12 16:48, Imran Rashid wrote:

Hi Karlson,

I think your assumptions are correct -- that join alone shouldn't 
require

any shuffling.  But its possible you are getting tripped up by lazy
evaluation of RDDs.  After you do your partitionBy, are you sure those 
RDDs
are actually materialized & cached somewhere?  eg., if you just did 
this:


val rddA = someData.partitionBy(N)
val rddB = someOtherData.partitionBy(N)
val joinedRdd = rddA.join(rddB)
joinedRdd.count() //or any other action

then the partitioning isn't actually getting run until you do the join. 
 So
though the join itself can happen without partitioning, 
joinedRdd.count()

will trigger the evaluation of rddA & rddB which will require shuffles.
Note that even if you have some intervening action on rddA & rddB that
shuffles them, unless you persist the result, you will need to 
reshuffle

them for the join.

If this doesn't help explain things, for debugging

joinedRdd.getPartitions.foreach{println}

this is getting into the weeds, but at least this will tell us whether 
or

not you are getting narrow dependencies, which would avoid the shuffle.
 (Does anyone know of a simpler way to check this?)

hope this helps,
Imran




On Thu, Feb 12, 2015 at 9:25 AM, Karlson  wrote:


Hi All,

using Pyspark, I create two RDDs (one with about 2M records (~200MB), 
the

other with about 8M records (~2GB)) of the format (key, value).

I've done a partitionBy(num_partitions) on both RDDs and verified that
both RDDs have the same number of partitions and that equal keys 
reside on

the same partition (via mapPartitionsWithIndex).

Now I'd expect that for a join on the two RDDs no shuffling is 
necessary.
Looking at the Web UI under http://driver:4040 however reveals that 
that

assumption is false.

In fact I am seeing shuffle writes of about 200MB and reads of about 
50MB.


What's the explanation for that behaviour? Where am I wrong with my
assumption?

Thanks in advance,

Karlson

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org







Shuffle on joining two RDDs

2015-02-12 Thread Karlson

Hi All,

using Pyspark, I create two RDDs (one with about 2M records (~200MB), 
the other with about 8M records (~2GB)) of the format (key, value).


I've done a partitionBy(num_partitions) on both RDDs and verified that 
both RDDs have the same number of partitions and that equal keys reside 
on the same partition (via mapPartitionsWithIndex).


Now I'd expect that for a join on the two RDDs no shuffling is 
necessary. Looking at the Web UI under http://driver:4040 however 
reveals that that assumption is false.


In fact I am seeing shuffle writes of about 200MB and reads of about 
50MB.


What's the explanation for that behaviour? Where am I wrong with my 
assumption?


Thanks in advance,

Karlson
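
A sketch of the per-partition check mentioned above, i.e. verifying that
equal keys end up in the same partition of both RDDs; the app name, toy data
and helper name are made up for illustration:

from pyspark import SparkContext

sc = SparkContext(appName="partition-check")   # illustrative app name
num_partitions = 4

rdd1 = sc.parallelize([(i % 50, 'x') for i in range(1000)]).partitionBy(num_partitions)
rdd2 = sc.parallelize([(i % 50, 'y') for i in range(4000)]).partitionBy(num_partitions)

def keys_per_partition(index, iterator):
    # Emit (partition index, set of keys in that partition).
    yield index, {k for k, _ in iterator}

keys1 = dict(rdd1.mapPartitionsWithIndex(keys_per_partition).collect())
keys2 = dict(rdd2.mapPartitionsWithIndex(keys_per_partition).collect())

# With the same partitioner on both sides, the per-partition key sets match.
print(keys1 == keys2)   # True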

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org