Author: kihwal
Date: Fri May 10 16:25:03 2013
New Revision: 1481077

URL: http://svn.apache.org/r1481077
Log:
svn merge -c 1481075 Merging from trunk to branch-2 to fix HADOOP-9549.

Modified:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1481077&r1=1481076&r2=1481077&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Fri May 10 16:25:03 2013
@@ -150,6 +150,8 @@ Release 2.0.5-beta - UNRELEASED
 
     HADOOP-9550. Remove aspectj dependency. (kkambatl via tucu)
 
+    HADOOP-9549. WebHdfsFileSystem hangs on close(). (daryn via kihwal)
+
 Release 2.0.4-alpha - 2013-04-25 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java?rev=1481077&r1=1481076&r2=1481077&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java Fri May 10 16:25:03 2013
@@ -61,10 +61,12 @@ public class DelegationTokenRenewer
     private long renewalTime;
     /** a weak reference to the file system so that it can be garbage collected */
     private final WeakReference<T> weakFs;
+    private Token<?> token; 
 
     private RenewAction(final T fs) {
       this.weakFs = new WeakReference<T>(fs);
-      updateRenewalTime();
+      this.token = fs.getRenewToken();
+      updateRenewalTime(renewCycle);
     }
  
     /** Get the delay until this event should happen. */
@@ -83,28 +85,32 @@ public class DelegationTokenRenewer
 
     @Override
     public int hashCode() {
-      return (int)renewalTime ^ (int)(renewalTime >>> 32);
+      return token.hashCode();
     }
 
     @Override
     public boolean equals(final Object that) {
-      if (that == null || !(that instanceof RenewAction)) {
+      if (this == that) {
+        return true;
+      } else if (that == null || !(that instanceof RenewAction)) {
         return false;
       }
-      return compareTo((Delayed)that) == 0;
+      return token.equals(((RenewAction<?>)that).token);
     }
 
     /**
      * Set a new time for the renewal.
-     * It can only be called when the action is not in the queue.
+     * It can only be called when the action is not in the queue or any
+     * collection because the hashCode may change
      * @param newTime the new time
      */
-    private void updateRenewalTime() {
-      renewalTime = renewCycle + Time.now();
+    private void updateRenewalTime(long delay) {
+      renewalTime = Time.now() + delay - delay/10;
     }
 
     /**
      * Renew or replace the delegation token for this file system.
+     * It can only be called when the action is not in the queue.
      * @return
      * @throws IOException
      */
@@ -114,14 +120,17 @@ public class DelegationTokenRenewer
       if (b) {
         synchronized(fs) {
           try {
-            fs.getRenewToken().renew(fs.getConf());
+            long expires = token.renew(fs.getConf());
+            updateRenewalTime(expires - Time.now());
           } catch (IOException ie) {
             try {
               Token<?>[] tokens = fs.addDelegationTokens(null, null);
               if (tokens.length == 0) {
                 throw new IOException("addDelegationTokens returned no 
tokens");
               }
-              fs.setDelegationToken(tokens[0]);
+              token = tokens[0];
+              updateRenewalTime(renewCycle);
+              fs.setDelegationToken(token);
             } catch (IOException ie2) {
               throw new IOException("Can't renew or get new delegation token 
", ie);
             }
@@ -131,20 +140,27 @@ public class DelegationTokenRenewer
       return b;
     }
 
+    private void cancel() throws IOException, InterruptedException {
+      final T fs = weakFs.get();
+      if (fs != null) {
+        token.cancel(fs.getConf());
+      }
+    }
+
     @Override
     public String toString() {
       Renewable fs = weakFs.get();
       return fs == null? "evaporated token renew"
           : "The token will be renewed in " + getDelay(TimeUnit.SECONDS)
-            + " secs, renewToken=" + fs.getRenewToken();
+            + " secs, renewToken=" + token;
     }
   }
 
-  /** Wait for 95% of a day between renewals */
-  private static final int RENEW_CYCLE = 24 * 60 * 60 * 950; 
+  /** assumes renew cycle for a token is 24 hours... */
+  private static final long RENEW_CYCLE = 24 * 60 * 60 * 1000; 
 
   @InterfaceAudience.Private
-  protected static int renewCycle = RENEW_CYCLE;
+  protected static long renewCycle = RENEW_CYCLE;
 
   /** Queue to maintain the RenewActions to be processed by the {@link #run()} */
   private volatile DelayQueue<RenewAction<?>> queue = new 
DelayQueue<RenewAction<?>>();
@@ -173,11 +189,34 @@ public class DelegationTokenRenewer
     return INSTANCE;
   }
 
+  @VisibleForTesting
+  static synchronized void reset() {
+    if (INSTANCE != null) {
+      INSTANCE.queue.clear();
+      INSTANCE.interrupt();
+      try {
+        INSTANCE.join();
+      } catch (InterruptedException e) {
+        LOG.warn("Failed to reset renewer");
+      } finally {
+        INSTANCE = null;
+      }
+    }
+  }
+  
   /** Add a renew action to the queue. */
-  public synchronized <T extends FileSystem & Renewable> void 
addRenewAction(final T fs) {
-    queue.add(new RenewAction<T>(fs));
-    if (!isAlive()) {
-      start();
+  @SuppressWarnings("static-access")
+  public <T extends FileSystem & Renewable> void addRenewAction(final T fs) {
+    synchronized (this) {
+      if (!isAlive()) {
+        start();
+      }
+    }
+    RenewAction<T> action = new RenewAction<T>(fs);
+    if (action.token != null) {
+      queue.add(action);
+    } else {
+      fs.LOG.error("does not have a token for renewal");
     }
   }
 
@@ -186,21 +225,18 @@ public class DelegationTokenRenewer
    * 
    * @throws IOException
    */
-  public synchronized <T extends FileSystem & Renewable> void 
removeRenewAction(
+  public <T extends FileSystem & Renewable> void removeRenewAction(
       final T fs) throws IOException {
-    for (RenewAction<?> action : queue) {
-      if (action.weakFs.get() == fs) {
-        try {
-          fs.getRenewToken().cancel(fs.getConf());
-        } catch (InterruptedException ie) {
-          LOG.error("Interrupted while canceling token for " + fs.getUri()
-              + "filesystem");
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(ie.getStackTrace());
-          }
+    RenewAction<T> action = new RenewAction<T>(fs);
+    if (queue.remove(action)) {
+      try {
+        action.cancel();
+      } catch (InterruptedException ie) {
+        LOG.error("Interrupted while canceling token for " + fs.getUri()
+            + "filesystem");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(ie.getStackTrace());
         }
-        queue.remove(action);
-        return;
       }
     }
   }
@@ -211,12 +247,9 @@ public class DelegationTokenRenewer
     for(;;) {
       RenewAction<?> action = null;
       try {
-        synchronized (this) {
-          action = queue.take();
-          if (action.renew()) {
-            action.updateRenewalTime();
-            queue.add(action);
-          }
+        action = queue.take();
+        if (action.renew()) {
+          queue.add(action);
         }
       } catch (InterruptedException ie) {
         return;

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java?rev=1481077&r1=1481076&r2=1481077&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java Fri May 10 16:25:03 2013
@@ -17,155 +17,203 @@
 */
 package org.apache.hadoop.fs;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.URI;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.Time;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestDelegationTokenRenewer {
-  private static final int RENEW_CYCLE = 1000;
-  private static final int MAX_RENEWALS = 100;
-
-  @SuppressWarnings("rawtypes")
-  static class TestToken extends Token {
-    public volatile int renewCount = 0;
-    public volatile boolean cancelled = false;
-
-    @Override
-    public long renew(Configuration conf) {
-      if (renewCount == MAX_RENEWALS) {
-        Thread.currentThread().interrupt();
-      } else {
-        renewCount++;
-      }
-      return renewCount;
-    }
-
-    @Override
-    public void cancel(Configuration conf) {
-      cancelled = true;
-    }
-  }
+  public abstract class RenewableFileSystem extends FileSystem
+  implements Renewable { }
+  
+  private static final long RENEW_CYCLE = 1000;
   
-  static class TestFileSystem extends FileSystem implements
-      DelegationTokenRenewer.Renewable {
-    private Configuration mockConf = mock(Configuration.class);;
-    private TestToken testToken = new TestToken();
-
-    @Override
-    public Configuration getConf() {
-      return mockConf;
-    }
-
-    @Override
-    public Token<?> getRenewToken() {
-      return testToken;
-    }
-
-    @Override
-    public URI getUri() {
-      return null;
-    }
-
-    @Override
-    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-      return null;
-    }
-
-    @Override
-    public FSDataOutputStream create(Path f, FsPermission permission,
-        boolean overwrite, int bufferSize, short replication, long blockSize,
-        Progressable progress) throws IOException {
-      return null;
-    }
-
-    @Override
-    public FSDataOutputStream append(Path f, int bufferSize,
-        Progressable progress) throws IOException {
-      return null;
-    }
-
-    @Override
-    public boolean rename(Path src, Path dst) throws IOException {
-      return false;
-    }
-
-    @Override
-    public boolean delete(Path f, boolean recursive) throws IOException {
-      return false;
-    }
-
-    @Override
-    public FileStatus[] listStatus(Path f) throws FileNotFoundException,
-        IOException {
-      return null;
-    }
-
-    @Override
-    public void setWorkingDirectory(Path new_dir) {
-    }
-
-    @Override
-    public Path getWorkingDirectory() {
-      return null;
-    }
-
-    @Override
-    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-      return false;
-    }
-
-    @Override
-    public FileStatus getFileStatus(Path f) throws IOException {
-      return null;
-    }
-
-    @Override
-    public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) 
{
-      return;
-    }
-  }
-
   private DelegationTokenRenewer renewer;
-
+  Configuration conf;
+  FileSystem fs;
+  
   @Before
   public void setup() {
     DelegationTokenRenewer.renewCycle = RENEW_CYCLE;
+    DelegationTokenRenewer.reset();
     renewer = DelegationTokenRenewer.getInstance();
   }
-
+  
+  @SuppressWarnings("unchecked")
   @Test
   public void testAddRemoveRenewAction() throws IOException,
       InterruptedException {
-    TestFileSystem tfs = new TestFileSystem();
-    renewer.addRenewAction(tfs);
-    assertEquals("FileSystem not added to DelegationTokenRenewer", 1,
-        renewer.getRenewQueueLength());
+    Text service = new Text("myservice");
+    Configuration conf = mock(Configuration.class);
 
-    for (int i = 0; i < 60; i++) {
-      Thread.sleep(RENEW_CYCLE);
-      if (tfs.testToken.renewCount > 0) {
-        renewer.removeRenewAction(tfs);
-        break;
+    Token<?> token = mock(Token.class);
+    doReturn(service).when(token).getService();
+    doAnswer(new Answer<Long>() {
+      public Long answer(InvocationOnMock invocation) {
+        return Time.now() + RENEW_CYCLE;
       }
-    }
+    }).when(token).renew(any(Configuration.class));
+
+    RenewableFileSystem fs = mock(RenewableFileSystem.class);
+    doReturn(conf).when(fs).getConf();
+    doReturn(token).when(fs).getRenewToken();
 
-    assertTrue("Token not renewed even after 1 minute",
-        (tfs.testToken.renewCount > 0));
+    renewer.addRenewAction(fs);
+    
+    assertEquals("FileSystem not added to DelegationTokenRenewer", 1,
+        renewer.getRenewQueueLength());
+    
+    Thread.sleep(RENEW_CYCLE*2);
+    verify(token, atLeast(2)).renew(eq(conf));
+    verify(token, atMost(3)).renew(eq(conf));
+    verify(token, never()).cancel(any(Configuration.class));
+    renewer.removeRenewAction(fs);
+    verify(token).cancel(eq(conf));
+
+    verify(fs, never()).getDelegationToken(null);
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    
     assertEquals("FileSystem not removed from DelegationTokenRenewer", 0,
         renewer.getRenewQueueLength());
-    assertTrue("Token not cancelled", tfs.testToken.cancelled);
+  }
+
+  @Test
+  public void testAddRenewActionWithNoToken() throws IOException,
+      InterruptedException {
+    Configuration conf = mock(Configuration.class);
+
+    RenewableFileSystem fs = mock(RenewableFileSystem.class);
+    doReturn(conf).when(fs).getConf();
+    doReturn(null).when(fs).getRenewToken();
+
+    renewer.addRenewAction(fs);
+    
+    verify(fs).getRenewToken();
+    assertEquals(0, renewer.getRenewQueueLength());
+  }
+
+  @Test
+  public void testGetNewTokenOnRenewFailure() throws IOException,
+      InterruptedException {
+    Text service = new Text("myservice");
+    Configuration conf = mock(Configuration.class);
+    
+    final Token<?> token1 = mock(Token.class);
+    doReturn(service).when(token1).getService();
+    doThrow(new IOException("boom")).when(token1).renew(eq(conf));
+    
+    final Token<?> token2 = mock(Token.class);
+    doReturn(service).when(token2).getService();
+    doAnswer(new Answer<Long>() {
+      public Long answer(InvocationOnMock invocation) {
+        return Time.now() + RENEW_CYCLE;
+      }
+    }).when(token2).renew(eq(conf));    
+
+    RenewableFileSystem fs = mock(RenewableFileSystem.class);
+    doReturn(conf).when(fs).getConf();
+    doReturn(token1).doReturn(token2).when(fs).getRenewToken();
+    doReturn(token2).when(fs).getDelegationToken(null);
+    
+    doAnswer(new Answer<Token<?>[]>() {
+      public Token<?>[] answer(InvocationOnMock invocation) {
+        return new Token<?>[]{token2};
+      }
+    }).when(fs).addDelegationTokens(null, null);
+        
+    renewer.addRenewAction(fs);
+    assertEquals(1, renewer.getRenewQueueLength());
+    
+    Thread.sleep(RENEW_CYCLE);
+    verify(fs).getRenewToken();
+    verify(token1, atLeast(1)).renew(eq(conf));
+    verify(token1, atMost(2)).renew(eq(conf));
+    verify(fs).addDelegationTokens(null, null);
+    verify(fs).setDelegationToken(eq(token2));
+    assertEquals(1, renewer.getRenewQueueLength());
+    
+    renewer.removeRenewAction(fs);
+    verify(token2).cancel(eq(conf));
+    assertEquals(0, renewer.getRenewQueueLength());
+  }
+
+  @Test
+  public void testStopRenewalWhenFsGone() throws IOException,
+      InterruptedException {
+    Configuration conf = mock(Configuration.class);
+    
+    Token<?> token = mock(Token.class);
+    doReturn(new Text("myservice")).when(token).getService();
+    doAnswer(new Answer<Long>() {
+      public Long answer(InvocationOnMock invocation) {
+        return Time.now() + RENEW_CYCLE;
+      }
+    }).when(token).renew(any(Configuration.class));
+
+    RenewableFileSystem fs = mock(RenewableFileSystem.class);
+    doReturn(conf).when(fs).getConf();
+    doReturn(token).when(fs).getRenewToken();
+
+    renewer.addRenewAction(fs);
+    assertEquals(1, renewer.getRenewQueueLength());
+
+    Thread.sleep(RENEW_CYCLE);
+    verify(token, atLeast(1)).renew(eq(conf));
+    verify(token, atMost(2)).renew(eq(conf));
+    // drop weak ref
+    fs = null;
+    System.gc(); System.gc(); System.gc();
+    // next renew should detect the fs as gone
+    Thread.sleep(RENEW_CYCLE);
+    verify(token, atLeast(1)).renew(eq(conf));
+    verify(token, atMost(2)).renew(eq(conf));
+    assertEquals(0, renewer.getRenewQueueLength());
+  }
+  
+  @Test(timeout=4000)
+  public void testMultipleTokensDoNotDeadlock() throws IOException,
+      InterruptedException {
+    Configuration conf = mock(Configuration.class);
+    FileSystem fs = mock(FileSystem.class);
+    doReturn(conf).when(fs).getConf();
+    
+    long distantFuture = Time.now() + 3600 * 1000; // 1h
+    Token<?> token1 = mock(Token.class);
+    doReturn(new Text("myservice1")).when(token1).getService();
+    doReturn(distantFuture).when(token1).renew(eq(conf));
+    
+    Token<?> token2 = mock(Token.class);
+    doReturn(new Text("myservice2")).when(token2).getService();
+    doReturn(distantFuture).when(token2).renew(eq(conf));
+
+    RenewableFileSystem fs1 = mock(RenewableFileSystem.class);
+    doReturn(conf).when(fs1).getConf();
+    doReturn(token1).when(fs1).getRenewToken();
+
+    RenewableFileSystem fs2 = mock(RenewableFileSystem.class);
+    doReturn(conf).when(fs2).getConf();
+    doReturn(token2).when(fs2).getRenewToken();
+
+    renewer.addRenewAction(fs1);
+    renewer.addRenewAction(fs2);
+    assertEquals(2, renewer.getRenewQueueLength());
+    
+    renewer.removeRenewAction(fs1);
+    assertEquals(1, renewer.getRenewQueueLength());
+    renewer.removeRenewAction(fs2);
+    assertEquals(0, renewer.getRenewQueueLength());
+    
+    verify(token1).cancel(eq(conf));
+    verify(token2).cancel(eq(conf));
   }
 }


Reply via email to