[ https://issues.apache.org/jira/browse/MAPREDUCE-7378?focusedWorklogId=770794&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-770794 ]

ASF GitHub Bot logged work on MAPREDUCE-7378:
---------------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/May/22 12:05
            Start Date: 16/May/22 12:05
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on code in PR #4303:
URL: https://github.com/apache/hadoop/pull/4303#discussion_r873643599


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java:
##########
@@ -163,7 +168,11 @@ public FileOutputCommitter(Path outputPath,
       this.outputPath = fs.makeQualified(outputPath);
     }
   }
-  
+
+  private static void setJobPendingDirName(JobContext context) {
+    JOB_PENDING_DIR_NAME = "_temporary_" + context.getJobID();

Review Comment:
   there's an assumption here that the job ID is always unique; this doesn't 
hold for all Spark versions. See SPARK-33402.
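
   To illustrate the risk, here is a minimal, self-contained sketch (the ID 
value and class name are invented; under SPARK-33402 two jobs launched in the 
same second can report the same job ID):

{code:java}
// Hypothetical sketch: if two jobs in one process report the same job ID,
// the name derived from it no longer isolates them, and both jobs end up
// sharing one pending directory again.
public class DuplicateJobIdSketch {
  public static void main(String[] args) {
    String job1Id = "job_202111041901_0001";  // invented ID for job 1
    String job2Id = "job_202111041901_0001";  // same ID for a different job
    String dir1 = "_temporary_" + job1Id;
    String dir2 = "_temporary_" + job2Id;
    // Both jobs resolve to the same pending dir, so one job's cleanup
    // still deletes the other job's in-flight task attempts.
    System.out.println(dir1.equals(dir2));    // prints true
  }
}
{code}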



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java:
##########
@@ -57,6 +57,8 @@ public class FileOutputCommitter extends PathOutputCommitter {
    * committed yet.
    */
   public static final String PENDING_DIR_NAME = "_temporary";
+
+  public static String JOB_PENDING_DIR_NAME = "_temporary";

Review Comment:
   this doesn't actually work if you have multiple jobs running in the same 
process, which is exactly what Spark drivers do. if job 2 starts while job 1 
is active, job 1 will be pointed at job 2's temp dir; as a result, it will 
only commit those tasks which generate work after that switch, and won't 
even notice that there is a problem.
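
   A minimal sketch of that failure mode (self-contained toy code modelling 
the mutable static from the patch, not the Hadoop class itself):

{code:java}
// Toy model: one static field shared by every committer in the JVM.
public class StaticFieldRaceSketch {
  static String JOB_PENDING_DIR_NAME = "_temporary";

  static void setupJob(String jobId) {
    JOB_PENDING_DIR_NAME = "_temporary_" + jobId;
  }

  static String taskCommitDir(String outputPath) {
    return outputPath + "/" + JOB_PENDING_DIR_NAME;
  }

  public static void main(String[] args) {
    setupJob("job_1");                        // job 1 starts
    setupJob("job_2");                        // job 2 starts in the same JVM
    // Job 1 commits next, but the static now points at job 2's dir:
    System.out.println(taskCommitDir("out")); // prints out/_temporary_job_2
  }
}
{code}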





Issue Time Tracking
-------------------

    Worklog Id:     (was: 770794)
    Time Spent: 1.5h  (was: 1h 20m)

> An error occurred while concurrently writing to a path
> ------------------------------------------------------
>
>                 Key: MAPREDUCE-7378
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7378
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>            Reporter: jingpan xiong
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> When we use FileOutputCommitter as the base class of a job committer for 
> other components, we may hit a concurrent-write problem.
> For example, with HadoopMapReduceCommitProtocol in Spark, when multiple 
> applications write data to the same path, they all commit their jobs and 
> tasks under the "_temporary" dir. Once one job finishes, it deletes the 
> "_temporary" dir, making the other jobs fail.
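> 
> A minimal sketch of the failure (hypothetical paths, plain java.nio rather 
> than Hadoop's FileSystem API, which the real jobs go through):
> {code:java}
> import java.io.IOException;
> import java.io.UncheckedIOException;
> import java.nio.file.Files;
> import java.nio.file.Path;
> import java.util.Comparator;
> 
> public class SharedTemporaryDirSketch {
>   public static void main(String[] args) throws IOException {
>     Path out = Files.createTempDirectory("out");
>     Path pending = out.resolve("_temporary");
> 
>     // Two jobs write to the same output path, so both stage their task
>     // attempts under the one shared pending directory.
>     Files.createDirectories(pending.resolve("0/attempt_job1_m_000000_0"));
>     Path job2Attempt =
>         Files.createDirectories(pending.resolve("0/attempt_job2_m_000000_0"));
> 
>     // Job 1 finishes first and, like FileOutputCommitter's cleanup,
>     // deletes the whole "_temporary" tree.
>     try (var walk = Files.walk(pending)) {
>       walk.sorted(Comparator.reverseOrder()).forEach(p -> {
>         try {
>           Files.delete(p);
>         } catch (IOException e) {
>           throw new UncheckedIOException(e);
>         }
>       });
>     }
> 
>     // Job 2's pending work is gone; its next write or commit fails with
>     // "No such file or directory", as in the trace below.
>     System.out.println("job 2 attempt dir exists: " + Files.exists(job2Attempt));
>   }
> }
> {code}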
>  
> error message:
> {code:java}
> 21/11/04 19:01:21 ERROR Utils: Aborting task
> ExitCodeException exitCode=1: chmod: cannot access '/data/spark-examples/spark-warehouse/test/temporary/0/_temporary/attempt_202111041901182933014038999149736_0001_m_000001_4/dt=2021-11-03/hour=10/.part-00001-95895b03-45d2-4ac6-806b-b76fd1dfa3dc.c000.snappy.parquet.crc': No such file or directory
>     at org.apache.hadoop.util.Shell.runCommand(Shell.java:1008)
>     at org.apache.hadoop.util.Shell.run(Shell.java:901)
>     at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1213)
>     at org.apache.hadoop.util.Shell.execCommand(Shell.java:1307)
>     at org.apache.hadoop.util.Shell.execCommand(Shell.java:1289)
>     at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
>     at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:324)
>     at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:294)
>     at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439)
>     at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)
>     at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
>     at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:437)
>     at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:521)
>     at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
>     at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
>     at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
>     at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:329)
>     at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:482)
>     at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:420)
>     at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:409)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:36)
>     at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:150)
>     at org.apache.spark.sql.execution.datasources.BaseDynamicPartitionDataWriter.renewCurrentWriter(FileFormatDataWriter.scala:290)
>     at org.apache.spark.sql.execution.datasources.DynamicPartitionDataSingleWriter.write(FileFormatDataWriter.scala:357)
>     at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithMetrics(FileFormatDataWriter.scala:85)
>     at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:92)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:304)
>     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
>     at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:131)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 21/11/04 19:01:21 WARN FileOutputCommitter: Could not delete file:/data/spark-examples/spark-warehouse/test/temporary/0/_temporary/attempt_202111041901182933014038999149736_0001_m_000001_4
> {code}
>  
> The corresponding Spark issue is 
> [SPARK-37210](https://issues.apache.org/jira/browse/SPARK-37210)


