sharkd tu created SPARK-19576:
---------------------------------

             Summary: Task attempt paths exist in output path after saveAsNewAPIHadoopFile completes with speculation enabled
                 Key: SPARK-19576
                 URL: https://issues.apache.org/jira/browse/SPARK-19576
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.1.0
            Reporter: sharkd tu
{{writeShard}} in {{saveAsNewAPIHadoopDataset}} always commits its tasks without question. The problem is that, when speculation is enabled, multiple task attempts can sometimes commit their output to the same path, which may leave task attempt paths behind in the output path after {{saveAsNewAPIHadoopFile}} completes:

{code}
-rw-r--r--   3 user group    0 2017-02-11 19:36 hdfs://.../out/_SUCCESS
drwxr-xr-x   - user group    0 2017-02-11 19:36 hdfs://.../out/attempt_201702111936_32487_r_000044_0
-rw-r--r--   3 user group 8952 2017-02-11 19:36 hdfs://.../out/part-r-00000
-rw-r--r--   3 user group 7878 2017-02-11 19:36 hdfs://.../out/part-r-00001
...
{code}

Suppose two task attempts commit at the same time: both may rename their task attempt paths to the task committed path concurrently. Once one attempt's {{rename}} completes, the other attempt's {{rename}} moves its task attempt path under the task committed path, because the destination now exists as a directory.

{{commitTask}} in {{FileOutputCommitter}}:

{code}
public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
    throws IOException {
  TaskAttemptID attemptId = context.getTaskAttemptID();
  if (hasOutputPath()) {
    context.progress();
    if (taskAttemptPath == null) {
      taskAttemptPath = getTaskAttemptPath(context);
    }
    Path committedTaskPath = getCommittedTaskPath(context);
    FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
    if (fs.exists(taskAttemptPath)) {
      if (fs.exists(committedTaskPath)) {
        if (!fs.delete(committedTaskPath, true)) {
          throw new IOException("Could not delete " + committedTaskPath);
        }
      }
      if (!fs.rename(taskAttemptPath, committedTaskPath)) {
        throw new IOException("Could not rename " + taskAttemptPath
            + " to " + committedTaskPath);
      }
      LOG.info("Saved output of task '" + attemptId + "' to " + committedTaskPath);
    } else {
      LOG.warn("No Output found for " + attemptId);
    }
  } else {
    LOG.warn("Output Path is null in commitTask()");
  }
}
{code}
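The usual remedy for this kind of race is first-committer-wins coordination: before renaming, each attempt asks a driver-side arbiter whether it may commit, and losing attempts abort and delete their attempt paths instead of renaming them into the output directory. Below is a minimal sketch of that guard in the spirit of the fix for SPARK-4879; the class and method names ({{CommitGuard}}, {{canCommit}}) are illustrative only, not Spark's actual API.

{code}
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical first-committer-wins guard. One instance would live on the
// driver; each task attempt asks it for permission before committing.
public class CommitGuard {
    // partition id -> attempt id that won the right to commit
    private final ConcurrentHashMap<Integer, Integer> winners =
        new ConcurrentHashMap<>();

    /** Grants commit permission only to the first attempt per partition. */
    public boolean canCommit(int partition, int attempt) {
        Integer winner = winners.putIfAbsent(partition, attempt);
        return winner == null || winner == attempt;
    }

    public static void main(String[] args) {
        CommitGuard guard = new CommitGuard();
        // Two speculative attempts race to commit partition 44.
        boolean first  = guard.canCommit(44, 0);  // original attempt wins
        boolean second = guard.canCommit(44, 1);  // speculative attempt loses
        System.out.println(first + " " + second); // prints: true false
        // The losing attempt must abort: it deletes its task attempt path
        // rather than renaming it, so no attempt_..._r_000044_* directory
        // is left under the output path.
    }
}
{code}

With such a guard in place, only one attempt ever reaches the {{rename}} in {{commitTask}}, so the destination directory cannot already exist and the nested-attempt-path outcome shown above cannot occur.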
{{writeShard}} in {{saveAsNewAPIHadoopDataset}} commits its tasks unconditionally. A similar problem triggered by calling {{saveAsHadoopFile}} was already solved in SPARK-4879, so we should also solve this case, triggered by calling {{saveAsNewAPIHadoopFile}}.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org