Re: Spark stages very slow to complete
Hi, the code is some hundreds of lines of Python. I can try to compose a minimal example as soon as I find the time, though. Any ideas until then?

> Would you mind posting the code?

On 2 Jun 2015 00:53, "Karlson" wrote:

Hi,

In all PySpark jobs that become somewhat more involved, I am experiencing the issue that some stages take a very long time to complete, and sometimes don't complete at all. This clearly correlates with the size of my input data. Looking at the stage details for one such stage, I am wondering where Spark spends all this time. Take this table of the stage's task metrics, for example:

    Metric                     Min          25th percentile  Median       75th percentile  Max
    Duration                   1.4 min      1.5 min          1.7 min      1.9 min          2.3 min
    Scheduler Delay            1 ms         3 ms             4 ms         5 ms             23 ms
    Task Deserialization Time  1 ms         2 ms             3 ms         8 ms             22 ms
    GC Time                    0 ms         0 ms             0 ms         0 ms             0 ms
    Result Serialization Time  0 ms         0 ms             0 ms         0 ms             1 ms
    Getting Result Time        0 ms         0 ms             0 ms         0 ms             0 ms
    Input Size / Records       23.9 KB / 1  24.0 KB / 1      24.1 KB / 1  24.1 KB / 1      24.3 KB / 1

Why is the overall duration almost 2 min? Where is all this time spent when no progress of the stage is visible? The progress bar simply displays 0 succeeded tasks for a very long time before sometimes slowly progressing. Also, the name of the stage displayed above is `javaToPython at null:-1`, which I find very uninformative. I don't even know which action exactly is responsible for this stage.

Does anyone experience similar issues or have any advice for me?

Thanks!
Spark stages very slow to complete
Hi,

In all PySpark jobs that become somewhat more involved, I am experiencing the issue that some stages take a very long time to complete, and sometimes don't complete at all. This clearly correlates with the size of my input data. Looking at the stage details for one such stage, I am wondering where Spark spends all this time. Take this table of the stage's task metrics, for example:

    Metric                     Min          25th percentile  Median       75th percentile  Max
    Duration                   1.4 min      1.5 min          1.7 min      1.9 min          2.3 min
    Scheduler Delay            1 ms         3 ms             4 ms         5 ms             23 ms
    Task Deserialization Time  1 ms         2 ms             3 ms         8 ms             22 ms
    GC Time                    0 ms         0 ms             0 ms         0 ms             0 ms
    Result Serialization Time  0 ms         0 ms             0 ms         0 ms             1 ms
    Getting Result Time        0 ms         0 ms             0 ms         0 ms             0 ms
    Input Size / Records       23.9 KB / 1  24.0 KB / 1      24.1 KB / 1  24.1 KB / 1      24.3 KB / 1

Why is the overall duration almost 2 min? Where is all this time spent when no progress of the stage is visible? The progress bar simply displays 0 succeeded tasks for a very long time before sometimes slowly progressing. Also, the name of the stage displayed above is `javaToPython at null:-1`, which I find very uninformative. I don't even know which action exactly is responsible for this stage.

Does anyone experience similar issues or have any advice for me?

Thanks!
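One crude way to find out which action is responsible for an anonymous stage like `javaToPython at null:-1` is to force evaluation of the lineage one step at a time and measure each step separately. A minimal sketch, where the intermediate RDDs are placeholders and not from the original job:

    import time

    def timed_count(rdd, label):
        # setName makes the RDD identifiable in the web UI
        rdd.setName(label)
        t0 = time.time()
        n = rdd.count()  # forces evaluation up to this point in the lineage
        print('%s: %d records in %.1fs' % (label, n, time.time() - t0))

    timed_count(parsed_rdd, 'parsed')   # 'parsed_rdd' and 'joined_rdd' are
    timed_count(joined_rdd, 'joined')   # hypothetical intermediate RDDs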
Re: Partitioning of Dataframes
Alright, that doesn't seem to have made it into the Python API yet.

On 2015-05-22 15:12, Silvio Fiorito wrote:

This was added in 1.4.0: https://github.com/apache/spark/pull/5762

On 5/22/15, 8:48 AM, "Karlson" wrote:

Hi, wouldn't df.rdd.partitionBy() return a new RDD that I would then need to make into a DataFrame again? Maybe like this: df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird to me, though, and I'm not sure if the DF will be aware of its partitioning.

On 2015-05-22 12:55, ayan guha wrote:

A DataFrame is an abstraction over an RDD, so you should be able to do df.rdd.partitionBy. However, as far as I know, equijoins already optimize partitioning. You may want to look at the explain plans more carefully and materialise interim joins.

On 22 May 2015 19:03, "Karlson" wrote:

Hi, is there any way to control how DataFrames are partitioned? I'm doing lots of joins and am seeing very large shuffle reads and writes in the Spark UI. With PairRDDs you can control how the data is partitioned across nodes with partitionBy. There is no such method on DataFrames, however. Can I somehow partition the underlying RDD manually? I am currently using the Python API. Thanks!
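For reference, once that PR's functionality is exposed in Python (later releases do ship a DataFrame.repartition in PySpark), usage would presumably be as simple as the following sketch, which assumes such a method exists in your version:

    # a sketch assuming a PySpark DataFrame.repartition(numPartitions) method
    df = df.repartition(200)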
Re: Partitioning of Dataframes
Hi, wouldn't df.rdd.partitionBy() return a new RDD that I would then need to make into a DataFrame again? Maybe like this: df.rdd.partitionBy(...).toDF(schema=df.schema). That looks a bit weird to me, though, and I'm not sure if the DF will be aware of its partitioning.

On 2015-05-22 12:55, ayan guha wrote:

A DataFrame is an abstraction over an RDD, so you should be able to do df.rdd.partitionBy. However, as far as I know, equijoins already optimize partitioning. You may want to look at the explain plans more carefully and materialise interim joins.

On 22 May 2015 19:03, "Karlson" wrote:

Hi, is there any way to control how DataFrames are partitioned? I'm doing lots of joins and am seeing very large shuffle reads and writes in the Spark UI. With PairRDDs you can control how the data is partitioned across nodes with partitionBy. There is no such method on DataFrames, however. Can I somehow partition the underlying RDD manually? I am currently using the Python API. Thanks!
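One detail worth noting about the workaround above: df.rdd in PySpark is an RDD of Row objects, while partitionBy expects (key, value) pairs, so the rows need to be keyed first. A sketch of the full round trip, assuming the join key is 'col1' and numPartitions is defined:

    # key the rows, partition by the key, then drop the keys again and
    # rebuild the DataFrame with the original schema
    pairs = df.rdd.map(lambda row: (row.col1, row))
    partitioned = pairs.partitionBy(numPartitions).values()
    df_partitioned = sqlContext.createDataFrame(partitioned, schema=df.schema)

As the thread suspects, the rebuilt DataFrame probably does not expose this partitioning to the SQL optimizer.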
Partitioning of Dataframes
Hi, is there any way to control how DataFrames are partitioned? I'm doing lots of joins and am seeing very large shuffle reads and writes in the Spark UI. With PairRDDs you can control how the data is partitioned across nodes with partitionBy. There is no such method on DataFrames, however. Can I somehow partition the underlying RDD manually? I am currently using the Python API. Thanks!
Re: [pyspark] Starting workers in a virtualenv
That works, thank you!

On 2015-05-22 03:15, Davies Liu wrote:

Could you try specifying PYSPARK_PYTHON as the path to the python binary in your virtualenv? For example:

    PYSPARK_PYTHON=/path/to/env/bin/python bin/spark-submit xx.py

On Mon, Apr 20, 2015 at 12:51 AM, Karlson wrote:

Hi all, I am running the Python process that communicates with Spark in a virtualenv. Is there any way I can make sure that the Python processes of the workers are also started in a virtualenv? Currently I am getting ImportErrors when the worker tries to unpickle objects whose modules are not installed system-wide. For now, both the worker and the driver run on the same machine in local mode. Thanks in advance!
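An alternative sketch, setting the interpreter from the driver script itself before the SparkContext is created (this at least works in local mode, where driver and workers share one machine; the path is a placeholder for your virtualenv's interpreter):

    import os
    # must be set before the context, and thus the worker processes, starts
    os.environ['PYSPARK_PYTHON'] = '/path/to/env/bin/python'

    from pyspark import SparkContext
    sc = SparkContext('local[*]', 'my_app')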
Re: Join on DataFrames from the same source (Pyspark)
DataFrames do not have the attributes 'alias' or 'as' in the Python API.

On 2015-04-21 20:41, Michael Armbrust wrote:

This is https://issues.apache.org/jira/browse/SPARK-6231

Unfortunately this is pretty hard to fix, as it is hard for us to differentiate these without aliases. However, you can add an alias as follows:

    from pyspark.sql.functions import *
    df.alias("a").join(df.alias("b"), col("a.col1") == col("b.col1"))

On Tue, Apr 21, 2015 at 8:10 AM, Karlson wrote:

Sorry, my code actually was

    df_one = df.select('col1', 'col2')
    df_two = df.select('col1', 'col3')

But in Spark 1.4.0 this does not seem to make any difference anyway, and the problem is the same with both versions.

On 2015-04-21 17:04, ayan guha wrote:

Your code should be

    df_one = df.select('col1', 'col2')
    df_two = df.select('col1', 'col3')

Your current code is generating a tuple, and of course df_1 and df_2 are different, so the join is yielding a cartesian product.

Best, Ayan

On Wed, Apr 22, 2015 at 12:42 AM, Karlson wrote:

Hi, can anyone confirm (and if so elaborate on) the following problem? When I join two DataFrames that originate from the same source DataFrame, the resulting DF will explode to a huge number of rows. A quick example: I load a DataFrame with n rows from disk:

    df = sql_context.parquetFile('data.parquet')

Then I create two DataFrames from that source:

    df_one = df.select(['col1', 'col2'])
    df_two = df.select(['col1', 'col3'])

Finally I want to (inner) join them back together:

    df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')

The key in col1 is unique. The resulting DataFrame should have n rows, but it has n*n rows instead. That does not happen when I load df_one and df_two from disk directly. I am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot.
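Since 'alias' is not available in the Python API at this point, a possible workaround is to rename the join column on one side so the two column references are distinguishable. A sketch reusing the column names from the example above (whether this sidesteps SPARK-6231 in all cases is an assumption):

    # rename one side's key so the join condition references distinct columns
    df_two_renamed = df_two.withColumnRenamed('col1', 'col1_b')
    df_joined = df_one.join(df_two_renamed,
                            df_one['col1'] == df_two_renamed['col1_b'],
                            'inner')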
Re: Join on DataFrames from the same source (Pyspark)
Sorry, my code actually was

    df_one = df.select('col1', 'col2')
    df_two = df.select('col1', 'col3')

But in Spark 1.4.0 this does not seem to make any difference anyway, and the problem is the same with both versions.

On 2015-04-21 17:04, ayan guha wrote:

Your code should be

    df_one = df.select('col1', 'col2')
    df_two = df.select('col1', 'col3')

Your current code is generating a tuple, and of course df_1 and df_2 are different, so the join is yielding a cartesian product.

Best, Ayan

On Wed, Apr 22, 2015 at 12:42 AM, Karlson wrote:

Hi, can anyone confirm (and if so elaborate on) the following problem? When I join two DataFrames that originate from the same source DataFrame, the resulting DF will explode to a huge number of rows. A quick example: I load a DataFrame with n rows from disk:

    df = sql_context.parquetFile('data.parquet')

Then I create two DataFrames from that source:

    df_one = df.select(['col1', 'col2'])
    df_two = df.select(['col1', 'col3'])

Finally I want to (inner) join them back together:

    df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')

The key in col1 is unique. The resulting DataFrame should have n rows, but it has n*n rows instead. That does not happen when I load df_one and df_two from disk directly. I am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot.
Join on DataFrames from the same source (Pyspark)
Hi, can anyone confirm (and if so elaborate on) the following problem? When I join two DataFrames that originate from the same source DataFrame, the resulting DF will explode to a huge number of rows. A quick example: I load a DataFrame with n rows from disk:

    df = sql_context.parquetFile('data.parquet')

Then I create two DataFrames from that source:

    df_one = df.select(['col1', 'col2'])
    df_two = df.select(['col1', 'col3'])

Finally I want to (inner) join them back together:

    df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')

The key in col1 is unique. The resulting DataFrame should have n rows, but it has n*n rows instead. That does not happen when I load df_one and df_two from disk directly. I am on Spark 1.3.0, but this also happens on the current 1.4.0 snapshot.
[pyspark] Starting workers in a virtualenv
Hi all, I am running the Python process that communicates with Spark in a virtualenv. Is there any way I can make sure that the Python processes of the workers are also started in a virtualenv? Currently I am getting ImportErrors when the worker tries to unpickle stuff that is not installed system-wide. For now both the worker and the driver run on the same machine in local mode. Thanks in advance!
Save and read parquet from the same path
Hi all, what would happen if I save an RDD via saveAsParquetFile to the same path that the RDD is originally read from? Is that a safe thing to do in PySpark? Thanks!
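Because Spark evaluates lazily, writing directly over the path you are still reading from risks deleting the input before it has been fully consumed. A cautious sketch is to write to a temporary path first and swap afterwards (paths and the transformation are placeholders):

    df = sqlContext.parquetFile('data.parquet')
    result = do_transform(df)                     # hypothetical transformation
    result.saveAsParquetFile('data.parquet.tmp')  # write somewhere else first
    # once the job has succeeded, replace the original outside of Spark,
    # e.g. with hadoop fs -mv (HDFS) or a directory rename (local FS)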
Re: Shuffle on joining two RDDs
In https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38, wouldn't it help to change the lines

    vs = rdd.map(lambda (k, v): (k, (1, v)))
    ws = other.map(lambda (k, v): (k, (2, v)))

to

    vs = rdd.mapValues(lambda v: (1, v))
    ws = other.mapValues(lambda v: (2, v))

? As I understand it, this would preserve the original partitioning.

On 2015-02-13 12:43, Karlson wrote:

Does that mean partitioning does not work in Python? Or does this only affect joining?

On 2015-02-12 19:27, Davies Liu wrote:

The feature works as expected in Scala/Java, but is not implemented in Python.

On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid wrote:

I wonder if the issue is that these lines just need to add preservesPartitioning = true? https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38 I am getting the feeling this is an issue w/ pyspark.

On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid wrote:

Ah, sorry, I am not too familiar w/ pyspark, so I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark API, but you might have some better guesses here than I do.

My suggestion to do joinedRdd.getPartitions.foreach{println} was just to see whether the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion; those fields are hidden deeper inside and are not user-visible. A better way (in Scala, anyway) is to look at rdd.dependencies. It's a little tricky, though: you need to look deep into the lineage (example at the end).

Sean -- yes, it does require that both RDDs have the same partitioner, but that should happen naturally: if you just specify the same number of partitions, you'll get equal HashPartitioners. There is a little difference between the Scala and Python APIs that I missed here. For partitionBy in Scala you actually need to specify the partitioner, but not in Python. However, I thought it would work like groupByKey, which does just take an int.

Here's a code example in Scala -- I'm not sure what is available from Python. Hopefully somebody knows a simpler way to confirm narrow dependencies?
    val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
    val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)

    scala> d.partitioner == d2.partitioner
    res2: Boolean = true

    val joined = d.join(d2)
    val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
    val badJoined = d.join(d3)

    d.setName("d")
    d2.setName("d2")
    d3.setName("d3")
    joined.setName("joined")
    badJoined.setName("badJoined")

    // unfortunately, just looking at the immediate dependencies of joined &
    // badJoined is misleading, b/c join actually creates one more step after
    // the shuffle
    scala> joined.dependencies
    res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)

    // even with the join that does require a shuffle, we still see a
    // OneToOneDependency, but that's just a simple flatMap step
    scala> badJoined.dependencies
    res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)

    // so let's make a helper function to get all the dependencies recursively
    def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
      val deps = rdd.dependencies
      deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
    }

    // full dependencies of the good join
    scala> flattenDeps(joined).foreach{println}
    (joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
    (MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
    (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
    (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
    (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
    (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
    (d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
    (MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)

    // full dependencies of the bad join -- notice the ShuffleDependency!
    scala> flattenDeps(badJoined).foreach{println}
    (badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
    (MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
    (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
    (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
    (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
    (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
    (d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@d794984)
    (MappedRDD[11] at map at <console>:12,org.apache.spark.OneToOneDependency@15c98005)
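The same dependency check is not readily available from Python, but the partitioner itself is. A PySpark sketch of what the mapValues change at the top of this mail is meant to achieve (assuming rdd.partitioner is exposed, as it is in at least newer releases):

    rdd = sc.parallelize([(i, i) for i in range(1000)]).partitionBy(8)
    print(rdd.partitioner is not None)                              # True
    # mapValues declares preservesPartitioning, so the partitioner survives:
    print(rdd.mapValues(lambda v: (1, v)).partitioner is not None)  # True
    # a plain map might change the keys, so the partitioner is dropped:
    print(rdd.map(lambda kv: (kv[0], (1, kv[1]))).partitioner)      # None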
Re: Shuffle on joining two RDDs
Does that mean partitioning does not work in Python? Or does this only affect joining?

On 2015-02-12 19:27, Davies Liu wrote:

The feature works as expected in Scala/Java, but is not implemented in Python.

On Thu, Feb 12, 2015 at 9:24 AM, Imran Rashid wrote:

I wonder if the issue is that these lines just need to add preservesPartitioning = true? https://github.com/apache/spark/blob/master/python/pyspark/join.py#L38 I am getting the feeling this is an issue w/ pyspark.

On Thu, Feb 12, 2015 at 10:43 AM, Imran Rashid wrote:

Ah, sorry, I am not too familiar w/ pyspark, so I missed that part. It could be that pyspark doesn't properly support narrow dependencies, or maybe you need to be more explicit about the partitioner. I am looking into the pyspark API, but you might have some better guesses here than I do.

My suggestion to do joinedRdd.getPartitions.foreach{println} was just to see whether the partition was a NarrowCoGroupSplitDep or a ShuffleCoGroupSplitDep -- but it was actually a bad suggestion; those fields are hidden deeper inside and are not user-visible. A better way (in Scala, anyway) is to look at rdd.dependencies. It's a little tricky, though: you need to look deep into the lineage (example at the end).

Sean -- yes, it does require that both RDDs have the same partitioner, but that should happen naturally: if you just specify the same number of partitions, you'll get equal HashPartitioners. There is a little difference between the Scala and Python APIs that I missed here. For partitionBy in Scala you actually need to specify the partitioner, but not in Python. However, I thought it would work like groupByKey, which does just take an int.

Here's a code example in Scala -- I'm not sure what is available from Python. Hopefully somebody knows a simpler way to confirm narrow dependencies?
    val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
    val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)

    scala> d.partitioner == d2.partitioner
    res2: Boolean = true

    val joined = d.join(d2)
    val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
    val badJoined = d.join(d3)

    d.setName("d")
    d2.setName("d2")
    d3.setName("d3")
    joined.setName("joined")
    badJoined.setName("badJoined")

    // unfortunately, just looking at the immediate dependencies of joined &
    // badJoined is misleading, b/c join actually creates one more step after
    // the shuffle
    scala> joined.dependencies
    res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)

    // even with the join that does require a shuffle, we still see a
    // OneToOneDependency, but that's just a simple flatMap step
    scala> badJoined.dependencies
    res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)

    // so let's make a helper function to get all the dependencies recursively
    def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
      val deps = rdd.dependencies
      deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
    }

    // full dependencies of the good join
    scala> flattenDeps(joined).foreach{println}
    (joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
    (MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
    (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
    (CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
    (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
    (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
    (d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
    (MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)

    // full dependencies of the bad join -- notice the ShuffleDependency!
    scala> flattenDeps(badJoined).foreach{println}
    (badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
    (MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
    (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
    (CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
    (d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
    (MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
    (d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@d794984)
    (MappedRDD[11] at map at <console>:12,org.apache.spark.OneToOneDependency@15c98005)

On Thu, Feb 12, 2015 at 10:05 AM, Karlson wrote:

Hi Imran, thanks for your quick reply. Actually I am doing this:

    rddA = rddA.partitionBy(n).cache()
    rddB = rddB.partitionBy(n).cache()

followed by

    rddA.count()
    rddB.count()

then

    joinedRDD = rddA.join(rddB)

I thought that the count() would force the evaluation, so any subsequent joins would be shuffleless.
Re: Shuffle on joining two RDDs
Hi, I believe that partitionBy will use the same (default) partitioner on both RDDs.

On 2015-02-12 17:12, Sean Owen wrote:

Doesn't this require that both RDDs have the same partitioner?

On Thu, Feb 12, 2015 at 3:48 PM, Imran Rashid wrote:

Hi Karlson, I think your assumptions are correct -- that join alone shouldn't require any shuffling. But it's possible you are getting tripped up by lazy evaluation of RDDs. After you do your partitionBy, are you sure those RDDs are actually materialized & cached somewhere? E.g., if you just did this:

    val rddA = someData.partitionBy(N)
    val rddB = someOtherData.partitionBy(N)
    val joinedRdd = rddA.join(rddB)
    joinedRdd.count() // or any other action

then the partitioning isn't actually getting run until you do the join. So though the join itself can happen without partitioning, joinedRdd.count() will trigger the evaluation of rddA & rddB, which will require shuffles. Note that even if you have some intervening action on rddA & rddB that shuffles them, unless you persist the result, you will need to reshuffle them for the join.

If this doesn't help explain things, for debugging try

    joinedRdd.getPartitions.foreach{println}

This is getting into the weeds, but at least this will tell us whether or not you are getting narrow dependencies, which would avoid the shuffle. (Does anyone know of a simpler way to check this?)

Hope this helps,
Imran

On Thu, Feb 12, 2015 at 9:25 AM, Karlson wrote:

Hi all, using PySpark, I create two RDDs (one with about 2M records (~200MB), the other with about 8M records (~2GB)) of the format (key, value). I've done a partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside on the same partition (via mapPartitionsWithIndex). Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the web UI under http://driver:4040 however reveals that this assumption is false: in fact I am seeing shuffle writes of about 200MB and reads of about 50MB. What's the explanation for that behaviour? Where am I wrong with my assumption? Thanks in advance, Karlson
Re: Shuffle on joining two RDDs
Hi Imran, thanks for your quick reply. Actually I am doing this:

    rddA = rddA.partitionBy(n).cache()
    rddB = rddB.partitionBy(n).cache()

followed by

    rddA.count()
    rddB.count()

then

    joinedRDD = rddA.join(rddB)

I thought that the count() would force the evaluation, so any subsequent joins would be shuffleless. I was wrong about the shuffle amounts, however. The shuffle write is actually 2GB (i.e. the size of the bigger RDD), while there is no shuffle read. A joinedRdd.count() does a shuffle read of about 1GB in size, though. The getPartitions method does not exist on the resulting RDD (I am using the Python API); there is however foreachPartition(). What is the line

    joinedRdd.getPartitions.foreach{println}

supposed to do?

Thank you, Karlson

PS: Sorry for sending this twice, I accidentally did not reply to the mailing list first.

On 2015-02-12 16:48, Imran Rashid wrote:

Hi Karlson, I think your assumptions are correct -- that join alone shouldn't require any shuffling. But it's possible you are getting tripped up by lazy evaluation of RDDs. After you do your partitionBy, are you sure those RDDs are actually materialized & cached somewhere? E.g., if you just did this:

    val rddA = someData.partitionBy(N)
    val rddB = someOtherData.partitionBy(N)
    val joinedRdd = rddA.join(rddB)
    joinedRdd.count() // or any other action

then the partitioning isn't actually getting run until you do the join. So though the join itself can happen without partitioning, joinedRdd.count() will trigger the evaluation of rddA & rddB, which will require shuffles. Note that even if you have some intervening action on rddA & rddB that shuffles them, unless you persist the result, you will need to reshuffle them for the join.

If this doesn't help explain things, for debugging try

    joinedRdd.getPartitions.foreach{println}

This is getting into the weeds, but at least this will tell us whether or not you are getting narrow dependencies, which would avoid the shuffle. (Does anyone know of a simpler way to check this?)

Hope this helps,
Imran

On Thu, Feb 12, 2015 at 9:25 AM, Karlson wrote:

Hi all, using PySpark, I create two RDDs (one with about 2M records (~200MB), the other with about 8M records (~2GB)) of the format (key, value). I've done a partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside on the same partition (via mapPartitionsWithIndex). Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the web UI under http://driver:4040 however reveals that this assumption is false: in fact I am seeing shuffle writes of about 200MB and reads of about 50MB. What's the explanation for that behaviour? Where am I wrong with my assumption? Thanks in advance, Karlson
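A sketch of the same pattern with the one check that is available from the Python API, comparing the two partitioners before the join (this assumes rdd.partitioner is exposed, which is the case in at least newer PySpark releases):

    rddA = rddA.partitionBy(n).cache()
    rddB = rddB.partitionBy(n).cache()
    rddA.count()
    rddB.count()
    # the same number of partitions and the same partition function on both
    # sides is what lets the join avoid a shuffle:
    print(rddA.partitioner == rddB.partitioner)
    joinedRDD = rddA.join(rddB)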
Shuffle on joining two RDDs
Hi All, using PySpark, I create two RDDs (one with about 2M records (~200MB), the other with about 8M records (~2GB)) of the format (key, value). I've done a partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside on the same partition (via mapPartitionsWithIndex). Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the web UI under http://driver:4040 however reveals that this assumption is false: in fact I am seeing shuffle writes of about 200MB and reads of about 50MB. What's the explanation for that behaviour? Where am I wrong with my assumption? Thanks in advance, Karlson
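A sketch of the kind of per-partition verification mentioned above, checking that equal keys land in the same partition index on both sides (function and variable names are placeholders):

    def tag_keys(index, iterator):
        # emit (partition index, key) for every record in this partition
        return ((index, k) for k, _ in iterator)

    pa = dict((k, i) for i, k in rddA.mapPartitionsWithIndex(tag_keys).collect())
    pb = dict((k, i) for i, k in rddB.mapPartitionsWithIndex(tag_keys).collect())
    # for every key present on both sides, the partition index should agree
    print(all(pa[k] == pb[k] for k in set(pa) & set(pb)))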