Re: Understanding shuffle file name conflicts
Yes as Josh said, when application is started, Spark will create a unique application-wide folder for related temporary files. And jobs in this application will have a unique shuffle id with unique file names, so shuffle stages within app will not meet name conflicts. Also shuffle files between applications are separated by application folder, so the name conflicts cannot be happened. Maybe you changed some parts of the code while do the patch. Thanks Jerry 2015-03-25 14:22 GMT+08:00 Josh Rosen rosenvi...@gmail.com: Which version of Spark are you using? What do you mean when you say that you used a hardcoded location for shuffle files? If you look at the current DiskBlockManager code, it looks like it will create a per-application subdirectory in each of the local root directories. Here's the call to create a subdirectory in each root dir: https://github.com/apache/spark/blob/c5cc41468e8709d09c09289bb55bc8edc99404b1/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala#L126 This call to Utils.createDirectory() should result in a fresh subdirectory being created for just this application (note the use of random UUIDs, plus the check to ensure that the directory doesn't already exist): https://github.com/apache/spark/blob/c5cc41468e8709d09c09289bb55bc8edc99404b1/core/src/main/scala/org/apache/spark/util/Utils.scala#L273 So, although the filenames for shuffle files are not globally unique, their full paths should be unique due to these unique per-application subdirectories. Have you observed an instance where this isn't the case? - Josh On Tue, Mar 24, 2015 at 11:04 PM, Kannan Rajah kra...@maprtech.com wrote: Saisai, This is the not the case when I use spark-submit to run 2 jobs, one after another. The shuffle id remains the same. -- Kannan On Tue, Mar 24, 2015 at 7:35 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi Kannan, As I know the shuffle Id in ShuffleDependency will be increased, so even if you run the same job twice, the shuffle dependency as well as shuffle id is different, so the shuffle file name which is combined by (shuffleId+mapId+reduceId) will be changed, so there's no name conflict even in the same directory as I know. Thanks Jerry 2015-03-25 1:56 GMT+08:00 Kannan Rajah kra...@maprtech.com: I am working on SPARK-1529. I ran into an issue with my change, where the same shuffle file was being reused across 2 jobs. Please note this only happens when I use a hard coded location to use for shuffle files, say /tmp. It does not happen with normal code path that uses DiskBlockManager to pick different directories for each run. So I want to understand how DiskBlockManager guarantees that such a conflict will never happen. Let's say the shuffle block id has a value of shuffle_0_0_0. So the data file name is shuffle_0_0_0.data and index file name is shuffle_0_0_0.index. If I run a spark job twice, one after another, these files get created under different directories because of the hashing logic in DiskBlockManager. But the hash is based off the file name, so how are we sure that there won't be a conflict ever? -- Kannan
Re: Understanding shuffle file name conflicts
DIskBlockManager doesn't need to know the app id, all it need to do is to create a folder with a unique name (UUID based) and then put all the shuffle files into it. you can see the code in DiskBlockManager as below, it will create a bunch unique folders when initialized, these folders are app specific private[spark] val localDirs: Array[File] = createLocalDirs(conf) UUID is for creating an app specific folder. and shuffle file hashed by shuffle block id, which is deterministic by using getFile as you mentioned. 2015-03-25 15:03 GMT+08:00 Kannan Rajah kra...@maprtech.com: Josh Saisai, When I say I am using a hardcoded location for shuffle files, I mean that I am not using DiskBlockManager.getFile API because that uses the directories created locally on the node. But for my use case, I need to look at creating those shuffle files on HDFS. I will take a closer look at this. But I have a couple of questions. From what I understand, DiskBlockManager code does not know about any application ID. It seems to pick up the top root temp dir location from SparkConf and then creates a bunch of sub dir under it. When a shuffle file needs to be created using getFile API, it hashes it to one of the existing dir. At this point, I don't see any app specific directory. Can you point out what I am missing here? The getFile API does not involve the random UUIDs. The random UUID generation happens inside createTempShuffleBlock and that is invoke only from ExternalSorter. On the other hand, DiskBlockManager.getFile is used to create the shuffle index and data file. -- Kannan On Tue, Mar 24, 2015 at 11:56 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Yes as Josh said, when application is started, Spark will create a unique application-wide folder for related temporary files. And jobs in this application will have a unique shuffle id with unique file names, so shuffle stages within app will not meet name conflicts. Also shuffle files between applications are separated by application folder, so the name conflicts cannot be happened. Maybe you changed some parts of the code while do the patch. Thanks Jerry 2015-03-25 14:22 GMT+08:00 Josh Rosen rosenvi...@gmail.com: Which version of Spark are you using? What do you mean when you say that you used a hardcoded location for shuffle files? If you look at the current DiskBlockManager code, it looks like it will create a per-application subdirectory in each of the local root directories. Here's the call to create a subdirectory in each root dir: https://github.com/apache/spark/blob/c5cc41468e8709d09c09289bb55bc8edc99404b1/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala#L126 This call to Utils.createDirectory() should result in a fresh subdirectory being created for just this application (note the use of random UUIDs, plus the check to ensure that the directory doesn't already exist): https://github.com/apache/spark/blob/c5cc41468e8709d09c09289bb55bc8edc99404b1/core/src/main/scala/org/apache/spark/util/Utils.scala#L273 So, although the filenames for shuffle files are not globally unique, their full paths should be unique due to these unique per-application subdirectories. Have you observed an instance where this isn't the case? - Josh On Tue, Mar 24, 2015 at 11:04 PM, Kannan Rajah kra...@maprtech.com wrote: Saisai, This is the not the case when I use spark-submit to run 2 jobs, one after another. The shuffle id remains the same. -- Kannan On Tue, Mar 24, 2015 at 7:35 PM, Saisai Shao sai.sai.s...@gmail.com wrote: Hi Kannan, As I know the shuffle Id in ShuffleDependency will be increased, so even if you run the same job twice, the shuffle dependency as well as shuffle id is different, so the shuffle file name which is combined by (shuffleId+mapId+reduceId) will be changed, so there's no name conflict even in the same directory as I know. Thanks Jerry 2015-03-25 1:56 GMT+08:00 Kannan Rajah kra...@maprtech.com: I am working on SPARK-1529. I ran into an issue with my change, where the same shuffle file was being reused across 2 jobs. Please note this only happens when I use a hard coded location to use for shuffle files, say /tmp. It does not happen with normal code path that uses DiskBlockManager to pick different directories for each run. So I want to understand how DiskBlockManager guarantees that such a conflict will never happen. Let's say the shuffle block id has a value of shuffle_0_0_0. So the data file name is shuffle_0_0_0.data and index file name is shuffle_0_0_0.index. If I run a spark job twice, one after another, these files get created under different directories because of the hashing logic in DiskBlockManager. But the hash is based off the file name, so how are we sure that there won't be a conflict ever? -- Kannan
RE: Understanding shuffle file name conflicts
Hi Cheng, I think your scenario is acceptable for Spark's shuffle mechanism and will not occur shuffle file name conflicts. From my understanding I think the code snippet you mentioned is the same RDD graph, just running twice, these two jobs will generate 3 stages, map stage and collect stage for the first job, only collect stage for the second job (map stage is the same as previous job). So these two jobs will only generate one copy of shuffle files in the first job, and fetch the shuffle data twice for each job. So name conflicts will not be occurred, since these two jobs rely on the same ShuffledRDD. I think only shuffle write which generates shuffle files will have chance to meet name conflicts, multiple times of shuffle read is acceptable as the code snippet shows. Thanks Jerry -Original Message- From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Wednesday, March 25, 2015 7:40 PM To: Saisai Shao; Kannan Rajah Cc: dev@spark.apache.org Subject: Re: Understanding shuffle file name conflicts Hi Jerry Josh It has been a while since the last time I looked into Spark core shuffle code, maybe I’m wrong here. But the shuffle ID is created along with ShuffleDependency, which is part of the RDD DAG. So if we submit multiple jobs over the same RDD DAG, I think the shuffle IDs in these jobs should duplicate. For example: |val dag = sc.parallelize(Array(1,2,3)).map(i = i - |i).reduceByKey(_ + _) dag.collect() dag.collect() | From the debug log output, I did see duplicated shuffle IDs in both jobs. Something like this: |# Job 1 15/03/25 19:26:34 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 0, reduce 2 # Job 2 15/03/25 19:26:36 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 0, reduce 5 | So it’s also possible that some shuffle output files get reused in different jobs. But Kannan, did you submit separate jobs over the same RDD DAG as I did above? If not, I’d agree with Jerry and Josh. (Did I miss something here?) Cheng On 3/25/15 10:35 AM, Saisai Shao wrote: Hi Kannan, As I know the shuffle Id in ShuffleDependency will be increased, so even if you run the same job twice, the shuffle dependency as well as shuffle id is different, so the shuffle file name which is combined by (shuffleId+mapId+reduceId) will be changed, so there's no name conflict even in the same directory as I know. Thanks Jerry 2015-03-25 1:56 GMT+08:00 Kannan Rajah kra...@maprtech.com: I am working on SPARK-1529. I ran into an issue with my change, where the same shuffle file was being reused across 2 jobs. Please note this only happens when I use a hard coded location to use for shuffle files, say /tmp. It does not happen with normal code path that uses DiskBlockManager to pick different directories for each run. So I want to understand how DiskBlockManager guarantees that such a conflict will never happen. Let's say the shuffle block id has a value of shuffle_0_0_0. So the data file name is shuffle_0_0_0.data and index file name is shuffle_0_0_0.index. If I run a spark job twice, one after another, these files get created under different directories because of the hashing logic in DiskBlockManager. But the hash is based off the file name, so how are we sure that there won't be a conflict ever? -- Kannan - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Understanding shuffle file name conflicts
Ah, I see where I'm wrong here. What are reused here are the shuffle map output files themselves, rather than the file paths. No new shuffle map output files are generated for the 2nd job. Thanks! Really need to walk through Spark core code again :) Cheng On 3/25/15 9:31 PM, Shao, Saisai wrote: Hi Cheng, I think your scenario is acceptable for Spark's shuffle mechanism and will not occur shuffle file name conflicts. From my understanding I think the code snippet you mentioned is the same RDD graph, just running twice, these two jobs will generate 3 stages, map stage and collect stage for the first job, only collect stage for the second job (map stage is the same as previous job). So these two jobs will only generate one copy of shuffle files in the first job, and fetch the shuffle data twice for each job. So name conflicts will not be occurred, since these two jobs rely on the same ShuffledRDD. I think only shuffle write which generates shuffle files will have chance to meet name conflicts, multiple times of shuffle read is acceptable as the code snippet shows. Thanks Jerry -Original Message- From: Cheng Lian [mailto:lian.cs@gmail.com] Sent: Wednesday, March 25, 2015 7:40 PM To: Saisai Shao; Kannan Rajah Cc: dev@spark.apache.org Subject: Re: Understanding shuffle file name conflicts Hi Jerry Josh It has been a while since the last time I looked into Spark core shuffle code, maybe I’m wrong here. But the shuffle ID is created along with ShuffleDependency, which is part of the RDD DAG. So if we submit multiple jobs over the same RDD DAG, I think the shuffle IDs in these jobs should duplicate. For example: |val dag = sc.parallelize(Array(1,2,3)).map(i = i - |i).reduceByKey(_ + _) dag.collect() dag.collect() | From the debug log output, I did see duplicated shuffle IDs in both jobs. Something like this: |# Job 1 15/03/25 19:26:34 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 0, reduce 2 # Job 2 15/03/25 19:26:36 DEBUG BlockStoreShuffleFetcher: Fetching outputs for shuffle 0, reduce 5 | So it’s also possible that some shuffle output files get reused in different jobs. But Kannan, did you submit separate jobs over the same RDD DAG as I did above? If not, I’d agree with Jerry and Josh. (Did I miss something here?) Cheng On 3/25/15 10:35 AM, Saisai Shao wrote: Hi Kannan, As I know the shuffle Id in ShuffleDependency will be increased, so even if you run the same job twice, the shuffle dependency as well as shuffle id is different, so the shuffle file name which is combined by (shuffleId+mapId+reduceId) will be changed, so there's no name conflict even in the same directory as I know. Thanks Jerry 2015-03-25 1:56 GMT+08:00 Kannan Rajah kra...@maprtech.com: I am working on SPARK-1529. I ran into an issue with my change, where the same shuffle file was being reused across 2 jobs. Please note this only happens when I use a hard coded location to use for shuffle files, say /tmp. It does not happen with normal code path that uses DiskBlockManager to pick different directories for each run. So I want to understand how DiskBlockManager guarantees that such a conflict will never happen. Let's say the shuffle block id has a value of shuffle_0_0_0. So the data file name is shuffle_0_0_0.data and index file name is shuffle_0_0_0.index. If I run a spark job twice, one after another, these files get created under different directories because of the hashing logic in DiskBlockManager. But the hash is based off the file name, so how are we sure that there won't be a conflict ever? -- Kannan - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
Re: Understanding shuffle file name conflicts
Hi Kannan, As I know the shuffle Id in ShuffleDependency will be increased, so even if you run the same job twice, the shuffle dependency as well as shuffle id is different, so the shuffle file name which is combined by (shuffleId+mapId+reduceId) will be changed, so there's no name conflict even in the same directory as I know. Thanks Jerry 2015-03-25 1:56 GMT+08:00 Kannan Rajah kra...@maprtech.com: I am working on SPARK-1529. I ran into an issue with my change, where the same shuffle file was being reused across 2 jobs. Please note this only happens when I use a hard coded location to use for shuffle files, say /tmp. It does not happen with normal code path that uses DiskBlockManager to pick different directories for each run. So I want to understand how DiskBlockManager guarantees that such a conflict will never happen. Let's say the shuffle block id has a value of shuffle_0_0_0. So the data file name is shuffle_0_0_0.data and index file name is shuffle_0_0_0.index. If I run a spark job twice, one after another, these files get created under different directories because of the hashing logic in DiskBlockManager. But the hash is based off the file name, so how are we sure that there won't be a conflict ever? -- Kannan