Re: Understanding shuffle file name conflicts

2015-03-25 Thread Saisai Shao
Yes, as Josh said, when an application is started, Spark creates a unique
application-wide folder for its temporary files. Each job in the application
gets a unique shuffle id, and therefore unique file names, so shuffle stages
within the application will not run into name conflicts.

Also, shuffle files from different applications are separated by application
folder, so name conflicts cannot happen across applications.
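
For example, the resulting layout looks roughly like this (illustrative paths
only; the exact folder names depend on the Spark version and configuration):

  <local root>/spark-<app-1-uuid>/3a/shuffle_0_0_0.data
  <local root>/spark-<app-2-uuid>/3a/shuffle_0_0_0.data

The file names are identical, but the application-specific parent folders keep
the full paths distinct.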

Maybe you changed some parts of this code while working on the patch.

Thanks
Jerry


2015-03-25 14:22 GMT+08:00 Josh Rosen rosenvi...@gmail.com:

 Which version of Spark are you using?  What do you mean when you say that
 you used a hardcoded location for shuffle files?

 If you look at the current DiskBlockManager code, it looks like it will
 create a per-application subdirectory in each of the local root directories.

 Here's the call to create a subdirectory in each root dir:
 https://github.com/apache/spark/blob/c5cc41468e8709d09c09289bb55bc8edc99404b1/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala#L126

 This call to Utils.createDirectory() should result in a fresh subdirectory
 being created for just this application (note the use of random UUIDs, plus
 the check to ensure that the directory doesn't already exist):

 https://github.com/apache/spark/blob/c5cc41468e8709d09c09289bb55bc8edc99404b1/core/src/main/scala/org/apache/spark/util/Utils.scala#L273
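
 For illustration, here is a rough sketch of what UUID-based directory creation
 with a collision check looks like (simplified; this isn't the actual Spark code,
 and the names and retry limit here are made up):

   import java.io.File
   import java.util.UUID

   // Create a fresh, uniquely named directory under `root`, retrying with a new UUID
   // if (improbably) the chosen name already exists.
   def createUniqueDir(root: File, namePrefix: String = "spark"): File = {
     val maxAttempts = 10
     var attempts = 0
     var created: Option[File] = None
     while (created.isEmpty) {
       attempts += 1
       if (attempts > maxAttempts) {
         throw new java.io.IOException(s"Failed to create a unique directory under $root")
       }
       val candidate = new File(root, s"$namePrefix-${UUID.randomUUID()}")
       // mkdirs() returns false if the directory already exists, so a collision simply
       // triggers another attempt with a fresh UUID.
       if (candidate.mkdirs()) {
         created = Some(candidate)
       }
     }
     created.get
   }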

 So, although the filenames for shuffle files are not globally unique,
 their full paths should be unique due to these unique per-application
 subdirectories.  Have you observed an instance where this isn't the case?

 - Josh

 On Tue, Mar 24, 2015 at 11:04 PM, Kannan Rajah kra...@maprtech.com
 wrote:

 Saisai,
 This is not the case when I use spark-submit to run 2 jobs, one after
 another. The shuffle id remains the same.


 --
 Kannan

 On Tue, Mar 24, 2015 at 7:35 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

  Hi Kannan,
 
  As far as I know, the shuffle id in ShuffleDependency is incremented, so even
  if you run the same job twice, the shuffle dependency, and with it the shuffle
  id, is different. The shuffle file name, which is built from
  (shuffleId + mapId + reduceId), will therefore change, so there is no name
  conflict even in the same directory, as far as I know.
 
  Thanks
  Jerry
 
 
  2015-03-25 1:56 GMT+08:00 Kannan Rajah kra...@maprtech.com:
 
  I am working on SPARK-1529. I ran into an issue with my change, where the
  same shuffle file was being reused across 2 jobs. Please note this only
  happens when I use a hard-coded location for the shuffle files, say /tmp.
  It does not happen with the normal code path, which uses DiskBlockManager
  to pick different directories for each run. So I want to understand how
  DiskBlockManager guarantees that such a conflict will never happen.

  Let's say the shuffle block id has a value of shuffle_0_0_0. So the data
  file name is shuffle_0_0_0.data and the index file name is
  shuffle_0_0_0.index.
  If I run a Spark job twice, one after another, these files get created
  under different directories because of the hashing logic in
  DiskBlockManager. But the hash is based off the file name, so how can we
  be sure that there won't ever be a conflict?
 
  --
  Kannan
 
 
 





Re: Understanding shuffle file name conflicts

2015-03-25 Thread Saisai Shao
DiskBlockManager doesn't need to know the app id; all it needs to do is
create a folder with a unique name (UUID based) and then put all the
shuffle files into it.

You can see the code in DiskBlockManager below; it creates a set of unique
folders when it is initialized, and these folders are app specific:

private[spark] val localDirs: Array[File] = createLocalDirs(conf)

The UUID is used to create an app-specific folder, and a shuffle file is
hashed to a location by its shuffle block id, which is deterministic through
getFile as you mentioned.
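
For illustration, here is a rough sketch of that deterministic mapping
(simplified; this is not the actual DiskBlockManager code, and the sub-dir
count and hash handling are just assumptions):

  import java.io.File

  // Rough sketch: map a block file name to a deterministic subdirectory under one of
  // the UUID-named, per-application local dirs created at startup (illustrative only).
  def getFileSketch(localDirs: Array[File], subDirsPerLocalDir: Int, filename: String): File = {
    val hash = filename.hashCode & Integer.MAX_VALUE                // non-negative hash of the name
    val dirId = hash % localDirs.length                             // pick a local dir
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir   // pick a sub dir within it
    val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
    subDir.mkdirs()                                                 // create lazily if needed
    new File(subDir, filename)          // e.g. .../spark-<uuid>/0c/shuffle_0_0_0.data
  }

Since the per-application parent dirs differ, two applications can hash the
same file name without their full paths colliding.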



2015-03-25 15:03 GMT+08:00 Kannan Rajah kra...@maprtech.com:

 Josh & Saisai,
 When I say I am using a hardcoded location for shuffle files, I mean that
 I am not using the DiskBlockManager.getFile API, because that uses the
 directories created locally on the node. But for my use case, I need to
 look at creating those shuffle files on HDFS.

 I will take a closer look at this, but I have a couple of questions. From
 what I understand, the DiskBlockManager code does not know about any
 application ID. It seems to pick up the top-level root temp dir location
 from SparkConf and then creates a bunch of sub dirs under it. When a shuffle
 file needs to be created using the getFile API, the file name is hashed to
 one of the existing dirs. At this point, I don't see any app-specific
 directory. Can you point out what I am missing here? The getFile API does
 not involve the random UUIDs. The random UUID generation happens inside
 createTempShuffleBlock, and that is invoked only from ExternalSorter. On the
 other hand, DiskBlockManager.getFile is used to create the shuffle index and
 data files.
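
 To make the contrast concrete, here is a rough sketch of the two naming
 schemes as I understand them (illustrative only, not the actual Spark code):

   import java.io.File
   import java.util.UUID

   // Deterministic: the final shuffle data/index file names come straight from the
   // shuffle block id, so the same (shuffleId, mapId, reduceId) always gives the same name.
   def shuffleDataFileName(shuffleId: Int, mapId: Int, reduceId: Int): String =
     s"shuffle_${shuffleId}_${mapId}_${reduceId}.data"

   // Random: temporary shuffle blocks (e.g. from ExternalSorter spills) get a fresh
   // UUID-based name, retrying in the unlikely case the name already exists.
   def tempShuffleFileName(dir: File): String = {
     var name = s"temp_shuffle_${UUID.randomUUID()}"
     while (new File(dir, name).exists()) {
       name = s"temp_shuffle_${UUID.randomUUID()}"
     }
     name
   }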


 --
 Kannan

 On Tue, Mar 24, 2015 at 11:56 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

 Yes, as Josh said, when an application is started, Spark creates a unique
 application-wide folder for its temporary files. Each job in the application
 gets a unique shuffle id, and therefore unique file names, so shuffle stages
 within the application will not run into name conflicts.

 Also, shuffle files from different applications are separated by application
 folder, so name conflicts cannot happen across applications.

 Maybe you changed some parts of this code while working on the patch.

 Thanks
 Jerry


 2015-03-25 14:22 GMT+08:00 Josh Rosen rosenvi...@gmail.com:

 Which version of Spark are you using?  What do you mean when you say
 that you used a hardcoded location for shuffle files?

 If you look at the current DiskBlockManager code, it looks like it will
 create a per-application subdirectory in each of the local root directories.

 Here's the call to create a subdirectory in each root dir:
 https://github.com/apache/spark/blob/c5cc41468e8709d09c09289bb55bc8edc99404b1/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala#L126

 This call to Utils.createDirectory() should result in a fresh
 subdirectory being created for just this application (note the use of
 random UUIDs, plus the check to ensure that the directory doesn't already
 exist):

 https://github.com/apache/spark/blob/c5cc41468e8709d09c09289bb55bc8edc99404b1/core/src/main/scala/org/apache/spark/util/Utils.scala#L273

 So, although the filenames for shuffle files are not globally unique,
 their full paths should be unique due to these unique per-application
 subdirectories.  Have you observed an instance where this isn't the case?

 - Josh

 On Tue, Mar 24, 2015 at 11:04 PM, Kannan Rajah kra...@maprtech.com
 wrote:

 Saisai,
 This is not the case when I use spark-submit to run 2 jobs, one after
 another. The shuffle id remains the same.


 --
 Kannan

 On Tue, Mar 24, 2015 at 7:35 PM, Saisai Shao sai.sai.s...@gmail.com
 wrote:

  Hi Kannan,
 
  As far as I know, the shuffle id in ShuffleDependency is incremented, so even
  if you run the same job twice, the shuffle dependency, and with it the shuffle
  id, is different. The shuffle file name, which is built from
  (shuffleId + mapId + reduceId), will therefore change, so there is no name
  conflict even in the same directory, as far as I know.
 
  Thanks
  Jerry
 
 
  2015-03-25 1:56 GMT+08:00 Kannan Rajah kra...@maprtech.com:
 
  I am working on SPARK-1529. I ran into an issue with my change, where the
  same shuffle file was being reused across 2 jobs. Please note this only
  happens when I use a hard-coded location for the shuffle files, say /tmp.
  It does not happen with the normal code path, which uses DiskBlockManager
  to pick different directories for each run. So I want to understand how
  DiskBlockManager guarantees that such a conflict will never happen.

  Let's say the shuffle block id has a value of shuffle_0_0_0. So the data
  file name is shuffle_0_0_0.data and the index file name is
  shuffle_0_0_0.index.
  If I run a Spark job twice, one after another, these files get created
  under different directories because of the hashing logic in
  DiskBlockManager. But the hash is based off the file name, so how can we
  be sure that there won't ever be a conflict?
 
  --
  Kannan
 
 
 







RE: Understanding shuffle file name conflicts

2015-03-25 Thread Shao, Saisai
Hi Cheng,

I think your scenario is handled by Spark's shuffle mechanism and will not
run into shuffle file name conflicts.

From my understanding, the code snippet you mentioned is the same RDD graph
just run twice. These two jobs will generate 3 stages: a map stage and a
collect stage for the first job, and only a collect stage for the second job
(its map stage is the same as in the previous job). So the two jobs generate
only one copy of the shuffle files, in the first job, and the shuffle data is
fetched twice, once by each job. Name conflicts will therefore not occur,
since both jobs rely on the same ShuffledRDD.

I think only a shuffle write, which generates shuffle files, has a chance of
hitting name conflicts; reading the same shuffle data multiple times is fine,
as the code snippet shows.
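
To make that concrete, here is a rough sketch of the difference (assuming an
existing SparkContext sc, as in your snippet; the shuffle ids shown are just
illustrative):

  // Reusing the same lineage: one ShuffleDependency, so one shuffle id and one set of
  // shuffle files, written by the first job and only read again by the second.
  val dag = sc.parallelize(Array(1, 2, 3)).map(i => i -> i).reduceByKey(_ + _)
  dag.collect()   // map stage runs and writes the shuffle files (say shuffle 0), then reads them
  dag.collect()   // map stage is skipped; shuffle 0's files are only read again

  // Rebuilding the same lineage from scratch: a new ShuffleDependency is created, so the
  // shuffle write uses a new shuffle id and therefore new file names.
  val dag2 = sc.parallelize(Array(1, 2, 3)).map(i => i -> i).reduceByKey(_ + _)
  dag2.collect()  // writes a fresh set of shuffle files (say shuffle 1)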

Thanks
Jerry



-Original Message-
From: Cheng Lian [mailto:lian.cs@gmail.com] 
Sent: Wednesday, March 25, 2015 7:40 PM
To: Saisai Shao; Kannan Rajah
Cc: dev@spark.apache.org
Subject: Re: Understanding shuffle file name conflicts

Hi Jerry & Josh

It has been a while since the last time I looked into Spark core shuffle code, 
maybe I’m wrong here. But the shuffle ID is created along with 
ShuffleDependency, which is part of the RDD DAG. So if we submit multiple jobs 
over the same RDD DAG, I think the shuffle IDs in these jobs should duplicate. 
For example:

val dag = sc.parallelize(Array(1, 2, 3)).map(i => i -> i).reduceByKey(_ + _)
dag.collect()
dag.collect()

 From the debug log output, I did see duplicated shuffle IDs in both jobs. 
Something like this:

# Job 1
15/03/25 19:26:34 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 0, reduce 2

# Job 2
15/03/25 19:26:36 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 0, reduce 5

So it’s also possible that some shuffle output files get reused in different 
jobs. But Kannan, did you submit separate jobs over the same RDD DAG as I did 
above? If not, I’d agree with Jerry and Josh.

(Did I miss something here?)

Cheng

On 3/25/15 10:35 AM, Saisai Shao wrote:

 Hi Kannan,

 As far as I know, the shuffle id in ShuffleDependency is incremented, so
 even if you run the same job twice, the shuffle dependency, and with it the
 shuffle id, is different. The shuffle file name, which is built from
 (shuffleId + mapId + reduceId), will therefore change, so there is no name
 conflict even in the same directory, as far as I know.

 Thanks
 Jerry


 2015-03-25 1:56 GMT+08:00 Kannan Rajah kra...@maprtech.com:

 I am working on SPARK-1529. I ran into an issue with my change, where the
 same shuffle file was being reused across 2 jobs. Please note this only
 happens when I use a hard-coded location for the shuffle files, say /tmp.
 It does not happen with the normal code path, which uses DiskBlockManager
 to pick different directories for each run. So I want to understand how
 DiskBlockManager guarantees that such a conflict will never happen.

 Let's say the shuffle block id has a value of shuffle_0_0_0. So the data
 file name is shuffle_0_0_0.data and the index file name is
 shuffle_0_0_0.index.
 If I run a Spark job twice, one after another, these files get created
 under different directories because of the hashing logic in
 DiskBlockManager. But the hash is based off the file name, so how can we
 be sure that there won't ever be a conflict?

 --
 Kannan




Re: Understanding shuffle file name conflicts

2015-03-25 Thread Cheng Lian
Ah, I see where I'm wrong here. What gets reused here are the shuffle map
output files themselves, rather than the file paths. No new shuffle map
output files are generated for the 2nd job. Thanks! I really need to walk
through the Spark core code again :)


Cheng

On 3/25/15 9:31 PM, Shao, Saisai wrote:

Hi Cheng,

I think your scenario is handled by Spark's shuffle mechanism and will not
run into shuffle file name conflicts.

From my understanding, the code snippet you mentioned is the same RDD graph
just run twice. These two jobs will generate 3 stages: a map stage and a
collect stage for the first job, and only a collect stage for the second job
(its map stage is the same as in the previous job). So the two jobs generate
only one copy of the shuffle files, in the first job, and the shuffle data is
fetched twice, once by each job. Name conflicts will therefore not occur,
since both jobs rely on the same ShuffledRDD.

I think only a shuffle write, which generates shuffle files, has a chance of
hitting name conflicts; reading the same shuffle data multiple times is fine,
as the code snippet shows.

Thanks
Jerry



-Original Message-
From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: Wednesday, March 25, 2015 7:40 PM
To: Saisai Shao; Kannan Rajah
Cc: dev@spark.apache.org
Subject: Re: Understanding shuffle file name conflicts

Hi Jerry & Josh

It has been a while since the last time I looked into Spark core shuffle code, 
maybe I’m wrong here. But the shuffle ID is created along with 
ShuffleDependency, which is part of the RDD DAG. So if we submit multiple jobs 
over the same RDD DAG, I think the shuffle IDs in these jobs should duplicate. 
For example:

val dag = sc.parallelize(Array(1, 2, 3)).map(i => i -> i).reduceByKey(_ + _)
dag.collect()
dag.collect()

  From the debug log output, I did see duplicated shuffle IDs in both jobs. 
Something like this:

# Job 1
15/03/25 19:26:34 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 0, reduce 2

# Job 2
15/03/25 19:26:36 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 0, reduce 5

So it’s also possible that some shuffle output files get reused in different 
jobs. But Kannan, did you submit separate jobs over the same RDD DAG as I did 
above? If not, I’d agree with Jerry and Josh.

(Did I miss something here?)

Cheng

On 3/25/15 10:35 AM, Saisai Shao wrote:


Hi Kannan,

As far as I know, the shuffle id in ShuffleDependency is incremented, so
even if you run the same job twice, the shuffle dependency, and with it the
shuffle id, is different. The shuffle file name, which is built from
(shuffleId + mapId + reduceId), will therefore change, so there is no name
conflict even in the same directory, as far as I know.

Thanks
Jerry


2015-03-25 1:56 GMT+08:00 Kannan Rajah kra...@maprtech.com:


I am working on SPARK-1529. I ran into an issue with my change, where the
same shuffle file was being reused across 2 jobs. Please note this only
happens when I use a hard-coded location for the shuffle files, say /tmp.
It does not happen with the normal code path, which uses DiskBlockManager
to pick different directories for each run. So I want to understand how
DiskBlockManager guarantees that such a conflict will never happen.

Let's say the shuffle block id has a value of shuffle_0_0_0. So the data
file name is shuffle_0_0_0.data and the index file name is
shuffle_0_0_0.index.
If I run a Spark job twice, one after another, these files get created
under different directories because of the hashing logic in
DiskBlockManager. But the hash is based off the file name, so how can we
be sure that there won't ever be a conflict?

--
Kannan






Re: Understanding shuffle file name conflicts

2015-03-24 Thread Saisai Shao
Hi Kannan,

As far as I know, the shuffle id in ShuffleDependency is incremented, so
even if you run the same job twice, the shuffle dependency, and with it the
shuffle id, is different. The shuffle file name, which is built from
(shuffleId + mapId + reduceId), will therefore change, so there is no name
conflict even in the same directory, as far as I know.
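
For illustration, a rough sketch of how such a block id maps to a file name
(the case class below is illustrative only; it just mirrors the
shuffle_<shuffleId>_<mapId>_<reduceId> convention described above):

  // Illustrative only: mirrors the shuffle_<shuffleId>_<mapId>_<reduceId> naming convention.
  case class ShuffleBlockIdSketch(shuffleId: Int, mapId: Int, reduceId: Int) {
    def name: String = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
  }

  // A new ShuffleDependency gets the next shuffle id, so building the lineage again
  // yields different names even in the same directory:
  ShuffleBlockIdSketch(0, 0, 0).name   // "shuffle_0_0_0" -> shuffle_0_0_0.data / .index
  ShuffleBlockIdSketch(1, 0, 0).name   // "shuffle_1_0_0" -> shuffle_1_0_0.data / .index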

Thanks
Jerry


2015-03-25 1:56 GMT+08:00 Kannan Rajah kra...@maprtech.com:

 I am working on SPARK-1529. I ran into an issue with my change, where the
 same shuffle file was being reused across 2 jobs. Please note this only
 happens when I use a hard-coded location for the shuffle files, say /tmp.
 It does not happen with the normal code path, which uses DiskBlockManager
 to pick different directories for each run. So I want to understand how
 DiskBlockManager guarantees that such a conflict will never happen.

 Let's say the shuffle block id has a value of shuffle_0_0_0. So the data
 file name is shuffle_0_0_0.data and the index file name is
 shuffle_0_0_0.index.
 If I run a Spark job twice, one after another, these files get created
 under different directories because of the hashing logic in
 DiskBlockManager. But the hash is based off the file name, so how can we
 be sure that there won't ever be a conflict?

 --
 Kannan