Re: Joined RDD

2015-04-17 Thread Archit Thakur
map phase of join*

On Fri, Apr 17, 2015 at 5:28 PM, Archit Thakur wrote:

> [quoted message trimmed; the full message follows below]


Re: Joined RDD

2015-04-17 Thread Archit Thakur
Ajay,

This is true. When we call join again on two RDDs, rather than recomputing
the whole pipeline, Spark reads the map output of the map phase of an RDD
(which it usually gets from the shuffle manager).

If you look at the code:

 override def compute(s: Partition, context: TaskContext)
     : Iterator[(K, Array[Iterable[_]])] = {
   val sparkConf = SparkEnv.get.conf
   val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
   val split = s.asInstanceOf[CoGroupPartition]

   // One iterator per co-grouped dependency, tagged with its position
   val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]

   for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
     case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
       // Narrow dependency: read the partition directly from the parent
       val it = rdd.iterator(itsSplit, context)
         .asInstanceOf[Iterator[Product2[K, Any]]]
       rddIterators += ((it, depNum))

     case ShuffleCoGroupSplitDep(handle) =>
       // Shuffle dependency: read the map outputs of the shuffle
       val it = SparkEnv.get.shuffleManager
         .getReader(handle, split.index, split.index + 1, context)
         .read()
       rddIterators += ((it, depNum))
   }
   // ... (the rest of compute merges these iterators per key)
 }

This is from CoGroupedRDD.scala in the Spark 1.3 code.
If you look at the UI, it shows these map stages as skipped. (And this
answers your question as well, Hoai-Thu Vong, in the different thread about
skipped stages.)

Thanks and Regards,

Archit Thakur.




On Thu, Nov 13, 2014 at 3:10 PM, ajay garg  wrote:

> [quoted message trimmed; the full message follows below]


Re: Joined RDD

2014-11-13 Thread ajay garg
Yes, that is my understanding of how it should work.
But in my case, the first time I call collect, it reads the data from the
files on disk.
Subsequent collect calls do not read the data files (verified from the logs).
On the Spark UI I see only shuffle read and no shuffle write.






Re: Joined RDD

2014-11-13 Thread Mayur Rustagi
First of all, no computation is performed until you trigger an action such
as collect. When you trigger collect, at that point Spark retrieves the data
from disk, joins the datasets together, and delivers the result to you.
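
A quick way to see this laziness for yourself (a sketch; the paths are made
up) is to print the lineage before triggering any action:

    val a = sc.textFile("hdfs:///data/a").map(l => (l, 1))  // lazy
    val b = sc.textFile("hdfs:///data/b").map(l => (l, 1))  // lazy
    val c = a.join(b)                                       // still lazy

    println(c.toDebugString)  // prints the lineage; nothing read yet
    c.collect()               // only now are the files read and the join run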

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 


On Thu, Nov 13, 2014 at 12:26 PM, ajay garg  wrote:

> Hi,
> I have two RDDs, A and B, which are created by reading files from HDFS.
> I have a third RDD, C, which is created by taking the join of A and B. All
> three RDDs (A, B and C) are not cached.
> Now if I perform any action on C (say, collect), the action is served
> without reading any data from the disk.
> Since no data is cached in Spark, how is the action on C served without
> reading data from disk?
>
> Thanks
> --Ajay


Re: Joined RDD

2014-11-12 Thread qinwei

I think it is because A.join(B) is a shuffle map stage, whose result is
stored temporarily (I am not sure whether in memory or on disk). I saw the
words "map output" in the log of my Spark application; I think that is the
intermediate result of my application, and according to the log, it is
stored.
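
For what it is worth, my understanding is that the map output files end up
on the executors' local disks rather than in HDFS. A sketch of how to check
where they go, assuming spark-shell (spark.local.dir defaults to /tmp):

    // Shuffle map output is written under the executor's local
    // directories, configured by spark.local.dir (default: /tmp).
    println(sc.getConf.get("spark.local.dir", "/tmp"))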


qinwei
From: ajay garg
Date: 2014-11-13 14:56
To: user
Subject: Joined RDD

Hi,
I have two RDDs, A and B, which are created by reading files from HDFS.
I have a third RDD, C, which is created by taking the join of A and B. All
three RDDs (A, B and C) are not cached.
Now if I perform any action on C (say, collect), the action is served without
reading any data from the disk.
Since no data is cached in Spark, how is the action on C served without
reading data from disk?

Thanks
--Ajay
 
 
 