Author: tgraves
Date: Wed Aug  1 14:02:22 2012
New Revision: 1368002

URL: http://svn.apache.org/viewvc?rev=1368002&view=rev
Log:
HADOOP-8633. Interrupted FsShell copies may leave tmp files (Daryn Sharp via 
tgraves)

Added:
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopy.java
Modified:
    hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
    
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java

Modified: hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1368002&r1=1368001&r2=1368002&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt 
(original)
+++ hadoop/common/trunk/hadoop-common-project/hadoop-common/CHANGES.txt Wed Aug 
 1 14:02:22 2012
@@ -873,6 +873,9 @@ Release 0.23.3 - UNRELEASED
     org.apache.hadoop.classification.InterfaceAudience not found  (Trevor
     Robinson via tgraves)
 
+    HADOOP-8633. Interrupted FsShell copies may leave tmp files (Daryn Sharp
+    via tgraves)
+
 Release 0.23.2 - UNRELEASED 
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java?rev=1368002&r1=1368001&r2=1368002&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
 (original)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandWithDestination.java
 Wed Aug  1 14:02:22 2012
@@ -24,6 +24,8 @@ import java.io.InputStream;
 import java.util.LinkedList;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.shell.PathExceptions.PathExistsException;
 import org.apache.hadoop.fs.shell.PathExceptions.PathIOException;
@@ -232,31 +234,65 @@ abstract class CommandWithDestination ex
     if (target.exists && (target.stat.isDirectory() || !overwrite)) {
       throw new PathExistsException(target.toString());
     }
-    target.fs.setWriteChecksum(writeChecksum);
-    PathData tempFile = null;
+    TargetFileSystem targetFs = new TargetFileSystem(target.fs);
     try {
-      tempFile = target.createTempFile(target+"._COPYING_");
-      FSDataOutputStream out = target.fs.create(tempFile.path, true);
-      IOUtils.copyBytes(in, out, getConf(), true);
+      PathData tempTarget = target.suffix("._COPYING_");
+      targetFs.setWriteChecksum(writeChecksum);
+      targetFs.writeStreamToFile(in, tempTarget);
+      targetFs.rename(tempTarget, target);
+    } finally {
+      targetFs.close(); // last ditch effort to ensure temp file is removed
+    }
+  }
+
+  // Helper filter filesystem that registers created files as temp files to
+  // be deleted on exit unless successfully renamed
+  private static class TargetFileSystem extends FilterFileSystem {
+    TargetFileSystem(FileSystem fs) {
+      super(fs);
+    }
+
+    void writeStreamToFile(InputStream in, PathData target) throws IOException 
{
+      FSDataOutputStream out = null;
+      try {
+        out = create(target);
+        IOUtils.copyBytes(in, out, getConf(), true);
+      } finally {
+        IOUtils.closeStream(out); // just in case copyBytes didn't
+      }
+    }
+    
+    // tag created files as temp files
+    FSDataOutputStream create(PathData item) throws IOException {
+      try {
+        return create(item.path, true);
+      } finally { // might have been created but stream was interrupted
+        deleteOnExit(item.path);
+      }
+    }
+
+    void rename(PathData src, PathData target) throws IOException {
       // the rename method with an option to delete the target is deprecated
-      if (target.exists && !target.fs.delete(target.path, false)) {
+      if (target.exists && !delete(target.path, false)) {
         // too bad we don't know why it failed
         PathIOException e = new PathIOException(target.toString());
         e.setOperation("delete");
         throw e;
       }
-      if (!tempFile.fs.rename(tempFile.path, target.path)) {
+      if (!rename(src.path, target.path)) {
         // too bad we don't know why it failed
-        PathIOException e = new PathIOException(tempFile.toString());
+        PathIOException e = new PathIOException(src.toString());
         e.setOperation("rename");
         e.setTargetPath(target.toString());
         throw e;
       }
-      tempFile = null;
-    } finally {
-      if (tempFile != null) {
-        tempFile.fs.delete(tempFile.path, false);
-      }
+      // cancel delete on exit if rename is successful
+      cancelDeleteOnExit(src.path);
+    }
+    @Override
+    public void close() {
+      // purge any remaining temp files, but don't close underlying fs
+      processDeleteOnExit();
     }
   }
 }
\ No newline at end of file

Modified: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java?rev=1368002&r1=1368001&r2=1368002&view=diff
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
 (original)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/PathData.java
 Wed Aug  1 14:02:22 2012
@@ -60,7 +60,7 @@ public class PathData implements Compara
    * @throws IOException if anything goes wrong...
    */
   public PathData(String pathString, Configuration conf) throws IOException {
-    this(FileSystem.get(URI.create(pathString), conf), pathString);
+    this(FileSystem.get(stringToUri(pathString), conf), pathString);
   }
   
   /**
@@ -170,16 +170,13 @@ public class PathData implements Compara
   }
   
   /**
-   * Returns a temporary file for this PathData with the given extension.
-   * The file will be deleted on exit.
-   * @param extension for the temporary file
+   * Returns a new PathData with the given extension.
+   * @param extension for the suffix
    * @return PathData
    * @throws IOException shouldn't happen
    */
-  public PathData createTempFile(String extension) throws IOException {
-    PathData tmpFile = new PathData(fs, uri+"._COPYING_");
-    fs.deleteOnExit(tmpFile.path);
-    return tmpFile;
+  public PathData suffix(String extension) throws IOException {
+    return new PathData(fs, this+extension);
   }
 
   /**

Added: 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopy.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopy.java?rev=1368002&view=auto
==============================================================================
--- 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopy.java
 (added)
+++ 
hadoop/common/trunk/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCopy.java
 Wed Aug  1 14:02:22 2012
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.shell;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.shell.CopyCommands.Put;
+import org.apache.hadoop.util.Progressable;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.stubbing.OngoingStubbing;
+
+public class TestCopy {
+  static Configuration conf;
+  static Path path = new Path("mockfs:/file");
+  static Path tmpPath = new Path("mockfs:/file._COPYING_");
+  static Put cmd;
+  static FileSystem mockFs;
+  static PathData target;
+  static FileStatus fileStat;
+  
+  @BeforeClass
+  public static void setup() throws IOException {
+    conf = new Configuration();
+    conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
+    mockFs = mock(FileSystem.class);
+    fileStat = mock(FileStatus.class);
+    when(fileStat.isDirectory()).thenReturn(false);
+  }
+  
+  @Before
+  public void resetMock() throws IOException {
+    reset(mockFs);
+    target = new PathData(path.toString(), conf);
+    cmd = new CopyCommands.Put();
+    cmd.setConf(conf);
+  }
+
+  @Test
+  public void testCopyStreamTarget() throws Exception {
+    FSDataOutputStream out = mock(FSDataOutputStream.class);
+    whenFsCreate().thenReturn(out);
+    when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
+    when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
+    FSInputStream in = mock(FSInputStream.class);
+    when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
+    
+    tryCopyStream(in, true);
+    verify(mockFs, never()).delete(eq(path), anyBoolean());
+    verify(mockFs).rename(eq(tmpPath), eq(path));
+    verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
+    verify(mockFs, never()).close();
+  }
+
+  @Test
+  public void testCopyStreamTargetExists() throws Exception {
+    FSDataOutputStream out = mock(FSDataOutputStream.class);
+    whenFsCreate().thenReturn(out);
+    when(mockFs.getFileStatus(eq(path))).thenReturn(fileStat);
+    target.refreshStatus(); // so it's updated as existing
+    cmd.setOverwrite(true);
+    when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
+    when(mockFs.delete(eq(path), eq(false))).thenReturn(true);
+    when(mockFs.rename(eq(tmpPath), eq(path))).thenReturn(true);
+    FSInputStream in = mock(FSInputStream.class);
+    when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
+    
+    tryCopyStream(in, true);
+    verify(mockFs).delete(eq(path), anyBoolean());
+    verify(mockFs).rename(eq(tmpPath), eq(path));
+    verify(mockFs, never()).delete(eq(tmpPath), anyBoolean());
+    verify(mockFs, never()).close();
+  }
+
+  @Test
+  public void testInterruptedCreate() throws Exception {
+    whenFsCreate().thenThrow(new InterruptedIOException());
+    when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
+    FSDataInputStream in = mock(FSDataInputStream.class);
+
+    tryCopyStream(in, false);
+    verify(mockFs).delete(eq(tmpPath), anyBoolean());
+    verify(mockFs, never()).rename(any(Path.class), any(Path.class));
+    verify(mockFs, never()).delete(eq(path), anyBoolean());
+    verify(mockFs, never()).close();
+  }
+
+  @Test
+  public void testInterruptedCopyBytes() throws Exception {
+    FSDataOutputStream out = mock(FSDataOutputStream.class);
+    whenFsCreate().thenReturn(out);
+    when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
+    FSInputStream in = mock(FSInputStream.class);
+    // make IOUtils.copyBytes fail
+    when(in.read(any(byte[].class), anyInt(), anyInt())).thenThrow(
+        new InterruptedIOException());
+    
+    tryCopyStream(in, false);
+    verify(mockFs).delete(eq(tmpPath), anyBoolean());
+    verify(mockFs, never()).rename(any(Path.class), any(Path.class));
+    verify(mockFs, never()).delete(eq(path), anyBoolean());
+    verify(mockFs, never()).close();
+  }
+
+  @Test
+  public void testInterruptedRename() throws Exception {
+    FSDataOutputStream out = mock(FSDataOutputStream.class);
+    whenFsCreate().thenReturn(out);
+    when(mockFs.getFileStatus(eq(tmpPath))).thenReturn(fileStat);
+    when(mockFs.rename(eq(tmpPath), eq(path))).thenThrow(
+        new InterruptedIOException());
+    FSInputStream in = mock(FSInputStream.class);
+    when(in.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
+    
+    tryCopyStream(in, false);
+    verify(mockFs).delete(eq(tmpPath), anyBoolean());
+    verify(mockFs).rename(eq(tmpPath), eq(path));
+    verify(mockFs, never()).delete(eq(path), anyBoolean());
+    verify(mockFs, never()).close();
+  }
+
+  private OngoingStubbing<FSDataOutputStream> whenFsCreate() throws 
IOException {
+    return when(mockFs.create(eq(tmpPath), any(FsPermission.class),
+        anyBoolean(), anyInt(), anyShort(), anyLong(),
+        any(Progressable.class)));
+  }
+  
+  private void tryCopyStream(InputStream in, boolean shouldPass) {
+    try {
+      cmd.copyStreamToTarget(new FSDataInputStream(in), target);
+    } catch (InterruptedIOException e) {
+      assertFalse("copy failed", shouldPass);
+    } catch (Throwable e) {
+      assertFalse(e.getMessage(), shouldPass);
+    }    
+  }
+  
+  static class MockFileSystem extends FilterFileSystem {
+    Configuration conf;
+    MockFileSystem() {
+      super(mockFs);
+    }
+    @Override
+    public void initialize(URI uri, Configuration conf) {
+      this.conf = conf;
+    }
+    @Override
+    public Path makeQualified(Path path) {
+      return path;
+    }
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+  }
+}
\ No newline at end of file


Reply via email to