Author: kihwal Date: Fri May 10 16:25:03 2013 New Revision: 1481077 URL: http://svn.apache.org/r1481077 Log: svn merge -c 1481075 Merging from trunk to branch-2 to fix HADOOP-9549.
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1481077&r1=1481076&r2=1481077&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Fri May 10 16:25:03 2013 @@ -150,6 +150,8 @@ Release 2.0.5-beta - UNRELEASED HADOOP-9550. Remove aspectj dependency. (kkambatl via tucu) + HADOOP-9549. WebHdfsFileSystem hangs on close(). (daryn via kihwal) + Release 2.0.4-alpha - 2013-04-25 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java?rev=1481077&r1=1481076&r2=1481077&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegationTokenRenewer.java Fri May 10 16:25:03 2013 @@ -61,10 +61,12 @@ public class DelegationTokenRenewer private long renewalTime; /** a weak reference to the file system so that it can be garbage collected */ private final WeakReference<T> weakFs; + private Token<?> token; private RenewAction(final T fs) { this.weakFs = new WeakReference<T>(fs); - updateRenewalTime(); + this.token = fs.getRenewToken(); + updateRenewalTime(renewCycle); } /** Get the delay until this event should happen. */ @@ -83,28 +85,32 @@ public class DelegationTokenRenewer @Override public int hashCode() { - return (int)renewalTime ^ (int)(renewalTime >>> 32); + return token.hashCode(); } @Override public boolean equals(final Object that) { - if (that == null || !(that instanceof RenewAction)) { + if (this == that) { + return true; + } else if (that == null || !(that instanceof RenewAction)) { return false; } - return compareTo((Delayed)that) == 0; + return token.equals(((RenewAction<?>)that).token); } /** * Set a new time for the renewal. - * It can only be called when the action is not in the queue. + * It can only be called when the action is not in the queue or any + * collection because the hashCode may change * @param newTime the new time */ - private void updateRenewalTime() { - renewalTime = renewCycle + Time.now(); + private void updateRenewalTime(long delay) { + renewalTime = Time.now() + delay - delay/10; } /** * Renew or replace the delegation token for this file system. + * It can only be called when the action is not in the queue. * @return * @throws IOException */ @@ -114,14 +120,17 @@ public class DelegationTokenRenewer if (b) { synchronized(fs) { try { - fs.getRenewToken().renew(fs.getConf()); + long expires = token.renew(fs.getConf()); + updateRenewalTime(expires - Time.now()); } catch (IOException ie) { try { Token<?>[] tokens = fs.addDelegationTokens(null, null); if (tokens.length == 0) { throw new IOException("addDelegationTokens returned no tokens"); } - fs.setDelegationToken(tokens[0]); + token = tokens[0]; + updateRenewalTime(renewCycle); + fs.setDelegationToken(token); } catch (IOException ie2) { throw new IOException("Can't renew or get new delegation token ", ie); } @@ -131,20 +140,27 @@ public class DelegationTokenRenewer return b; } + private void cancel() throws IOException, InterruptedException { + final T fs = weakFs.get(); + if (fs != null) { + token.cancel(fs.getConf()); + } + } + @Override public String toString() { Renewable fs = weakFs.get(); return fs == null? "evaporated token renew" : "The token will be renewed in " + getDelay(TimeUnit.SECONDS) - + " secs, renewToken=" + fs.getRenewToken(); + + " secs, renewToken=" + token; } } - /** Wait for 95% of a day between renewals */ - private static final int RENEW_CYCLE = 24 * 60 * 60 * 950; + /** assumes renew cycle for a token is 24 hours... */ + private static final long RENEW_CYCLE = 24 * 60 * 60 * 1000; @InterfaceAudience.Private - protected static int renewCycle = RENEW_CYCLE; + protected static long renewCycle = RENEW_CYCLE; /** Queue to maintain the RenewActions to be processed by the {@link #run()} */ private volatile DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>(); @@ -173,11 +189,34 @@ public class DelegationTokenRenewer return INSTANCE; } + @VisibleForTesting + static synchronized void reset() { + if (INSTANCE != null) { + INSTANCE.queue.clear(); + INSTANCE.interrupt(); + try { + INSTANCE.join(); + } catch (InterruptedException e) { + LOG.warn("Failed to reset renewer"); + } finally { + INSTANCE = null; + } + } + } + /** Add a renew action to the queue. */ - public synchronized <T extends FileSystem & Renewable> void addRenewAction(final T fs) { - queue.add(new RenewAction<T>(fs)); - if (!isAlive()) { - start(); + @SuppressWarnings("static-access") + public <T extends FileSystem & Renewable> void addRenewAction(final T fs) { + synchronized (this) { + if (!isAlive()) { + start(); + } + } + RenewAction<T> action = new RenewAction<T>(fs); + if (action.token != null) { + queue.add(action); + } else { + fs.LOG.error("does not have a token for renewal"); } } @@ -186,21 +225,18 @@ public class DelegationTokenRenewer * * @throws IOException */ - public synchronized <T extends FileSystem & Renewable> void removeRenewAction( + public <T extends FileSystem & Renewable> void removeRenewAction( final T fs) throws IOException { - for (RenewAction<?> action : queue) { - if (action.weakFs.get() == fs) { - try { - fs.getRenewToken().cancel(fs.getConf()); - } catch (InterruptedException ie) { - LOG.error("Interrupted while canceling token for " + fs.getUri() - + "filesystem"); - if (LOG.isDebugEnabled()) { - LOG.debug(ie.getStackTrace()); - } + RenewAction<T> action = new RenewAction<T>(fs); + if (queue.remove(action)) { + try { + action.cancel(); + } catch (InterruptedException ie) { + LOG.error("Interrupted while canceling token for " + fs.getUri() + + "filesystem"); + if (LOG.isDebugEnabled()) { + LOG.debug(ie.getStackTrace()); } - queue.remove(action); - return; } } } @@ -211,12 +247,9 @@ public class DelegationTokenRenewer for(;;) { RenewAction<?> action = null; try { - synchronized (this) { - action = queue.take(); - if (action.renew()) { - action.updateRenewalTime(); - queue.add(action); - } + action = queue.take(); + if (action.renew()) { + queue.add(action); } } catch (InterruptedException ie) { return; Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java?rev=1481077&r1=1481076&r2=1481077&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java (original) +++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestDelegationTokenRenewer.java Fri May 10 16:25:03 2013 @@ -17,155 +17,203 @@ */ package org.apache.hadoop.fs; -import java.io.FileNotFoundException; import java.io.IOException; -import java.net.URI; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.Progressable; +import org.apache.hadoop.util.Time; import org.junit.Before; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestDelegationTokenRenewer { - private static final int RENEW_CYCLE = 1000; - private static final int MAX_RENEWALS = 100; - - @SuppressWarnings("rawtypes") - static class TestToken extends Token { - public volatile int renewCount = 0; - public volatile boolean cancelled = false; - - @Override - public long renew(Configuration conf) { - if (renewCount == MAX_RENEWALS) { - Thread.currentThread().interrupt(); - } else { - renewCount++; - } - return renewCount; - } - - @Override - public void cancel(Configuration conf) { - cancelled = true; - } - } + public abstract class RenewableFileSystem extends FileSystem + implements Renewable { } + + private static final long RENEW_CYCLE = 1000; - static class TestFileSystem extends FileSystem implements - DelegationTokenRenewer.Renewable { - private Configuration mockConf = mock(Configuration.class);; - private TestToken testToken = new TestToken(); - - @Override - public Configuration getConf() { - return mockConf; - } - - @Override - public Token<?> getRenewToken() { - return testToken; - } - - @Override - public URI getUri() { - return null; - } - - @Override - public FSDataInputStream open(Path f, int bufferSize) throws IOException { - return null; - } - - @Override - public FSDataOutputStream create(Path f, FsPermission permission, - boolean overwrite, int bufferSize, short replication, long blockSize, - Progressable progress) throws IOException { - return null; - } - - @Override - public FSDataOutputStream append(Path f, int bufferSize, - Progressable progress) throws IOException { - return null; - } - - @Override - public boolean rename(Path src, Path dst) throws IOException { - return false; - } - - @Override - public boolean delete(Path f, boolean recursive) throws IOException { - return false; - } - - @Override - public FileStatus[] listStatus(Path f) throws FileNotFoundException, - IOException { - return null; - } - - @Override - public void setWorkingDirectory(Path new_dir) { - } - - @Override - public Path getWorkingDirectory() { - return null; - } - - @Override - public boolean mkdirs(Path f, FsPermission permission) throws IOException { - return false; - } - - @Override - public FileStatus getFileStatus(Path f) throws IOException { - return null; - } - - @Override - public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) { - return; - } - } - private DelegationTokenRenewer renewer; - + Configuration conf; + FileSystem fs; + @Before public void setup() { DelegationTokenRenewer.renewCycle = RENEW_CYCLE; + DelegationTokenRenewer.reset(); renewer = DelegationTokenRenewer.getInstance(); } - + + @SuppressWarnings("unchecked") @Test public void testAddRemoveRenewAction() throws IOException, InterruptedException { - TestFileSystem tfs = new TestFileSystem(); - renewer.addRenewAction(tfs); - assertEquals("FileSystem not added to DelegationTokenRenewer", 1, - renewer.getRenewQueueLength()); + Text service = new Text("myservice"); + Configuration conf = mock(Configuration.class); - for (int i = 0; i < 60; i++) { - Thread.sleep(RENEW_CYCLE); - if (tfs.testToken.renewCount > 0) { - renewer.removeRenewAction(tfs); - break; + Token<?> token = mock(Token.class); + doReturn(service).when(token).getService(); + doAnswer(new Answer<Long>() { + public Long answer(InvocationOnMock invocation) { + return Time.now() + RENEW_CYCLE; } - } + }).when(token).renew(any(Configuration.class)); + + RenewableFileSystem fs = mock(RenewableFileSystem.class); + doReturn(conf).when(fs).getConf(); + doReturn(token).when(fs).getRenewToken(); - assertTrue("Token not renewed even after 1 minute", - (tfs.testToken.renewCount > 0)); + renewer.addRenewAction(fs); + + assertEquals("FileSystem not added to DelegationTokenRenewer", 1, + renewer.getRenewQueueLength()); + + Thread.sleep(RENEW_CYCLE*2); + verify(token, atLeast(2)).renew(eq(conf)); + verify(token, atMost(3)).renew(eq(conf)); + verify(token, never()).cancel(any(Configuration.class)); + renewer.removeRenewAction(fs); + verify(token).cancel(eq(conf)); + + verify(fs, never()).getDelegationToken(null); + verify(fs, never()).setDelegationToken(any(Token.class)); + assertEquals("FileSystem not removed from DelegationTokenRenewer", 0, renewer.getRenewQueueLength()); - assertTrue("Token not cancelled", tfs.testToken.cancelled); + } + + @Test + public void testAddRenewActionWithNoToken() throws IOException, + InterruptedException { + Configuration conf = mock(Configuration.class); + + RenewableFileSystem fs = mock(RenewableFileSystem.class); + doReturn(conf).when(fs).getConf(); + doReturn(null).when(fs).getRenewToken(); + + renewer.addRenewAction(fs); + + verify(fs).getRenewToken(); + assertEquals(0, renewer.getRenewQueueLength()); + } + + @Test + public void testGetNewTokenOnRenewFailure() throws IOException, + InterruptedException { + Text service = new Text("myservice"); + Configuration conf = mock(Configuration.class); + + final Token<?> token1 = mock(Token.class); + doReturn(service).when(token1).getService(); + doThrow(new IOException("boom")).when(token1).renew(eq(conf)); + + final Token<?> token2 = mock(Token.class); + doReturn(service).when(token2).getService(); + doAnswer(new Answer<Long>() { + public Long answer(InvocationOnMock invocation) { + return Time.now() + RENEW_CYCLE; + } + }).when(token2).renew(eq(conf)); + + RenewableFileSystem fs = mock(RenewableFileSystem.class); + doReturn(conf).when(fs).getConf(); + doReturn(token1).doReturn(token2).when(fs).getRenewToken(); + doReturn(token2).when(fs).getDelegationToken(null); + + doAnswer(new Answer<Token<?>[]>() { + public Token<?>[] answer(InvocationOnMock invocation) { + return new Token<?>[]{token2}; + } + }).when(fs).addDelegationTokens(null, null); + + renewer.addRenewAction(fs); + assertEquals(1, renewer.getRenewQueueLength()); + + Thread.sleep(RENEW_CYCLE); + verify(fs).getRenewToken(); + verify(token1, atLeast(1)).renew(eq(conf)); + verify(token1, atMost(2)).renew(eq(conf)); + verify(fs).addDelegationTokens(null, null); + verify(fs).setDelegationToken(eq(token2)); + assertEquals(1, renewer.getRenewQueueLength()); + + renewer.removeRenewAction(fs); + verify(token2).cancel(eq(conf)); + assertEquals(0, renewer.getRenewQueueLength()); + } + + @Test + public void testStopRenewalWhenFsGone() throws IOException, + InterruptedException { + Configuration conf = mock(Configuration.class); + + Token<?> token = mock(Token.class); + doReturn(new Text("myservice")).when(token).getService(); + doAnswer(new Answer<Long>() { + public Long answer(InvocationOnMock invocation) { + return Time.now() + RENEW_CYCLE; + } + }).when(token).renew(any(Configuration.class)); + + RenewableFileSystem fs = mock(RenewableFileSystem.class); + doReturn(conf).when(fs).getConf(); + doReturn(token).when(fs).getRenewToken(); + + renewer.addRenewAction(fs); + assertEquals(1, renewer.getRenewQueueLength()); + + Thread.sleep(RENEW_CYCLE); + verify(token, atLeast(1)).renew(eq(conf)); + verify(token, atMost(2)).renew(eq(conf)); + // drop weak ref + fs = null; + System.gc(); System.gc(); System.gc(); + // next renew should detect the fs as gone + Thread.sleep(RENEW_CYCLE); + verify(token, atLeast(1)).renew(eq(conf)); + verify(token, atMost(2)).renew(eq(conf)); + assertEquals(0, renewer.getRenewQueueLength()); + } + + @Test(timeout=4000) + public void testMultipleTokensDoNotDeadlock() throws IOException, + InterruptedException { + Configuration conf = mock(Configuration.class); + FileSystem fs = mock(FileSystem.class); + doReturn(conf).when(fs).getConf(); + + long distantFuture = Time.now() + 3600 * 1000; // 1h + Token<?> token1 = mock(Token.class); + doReturn(new Text("myservice1")).when(token1).getService(); + doReturn(distantFuture).when(token1).renew(eq(conf)); + + Token<?> token2 = mock(Token.class); + doReturn(new Text("myservice2")).when(token2).getService(); + doReturn(distantFuture).when(token2).renew(eq(conf)); + + RenewableFileSystem fs1 = mock(RenewableFileSystem.class); + doReturn(conf).when(fs1).getConf(); + doReturn(token1).when(fs1).getRenewToken(); + + RenewableFileSystem fs2 = mock(RenewableFileSystem.class); + doReturn(conf).when(fs2).getConf(); + doReturn(token2).when(fs2).getRenewToken(); + + renewer.addRenewAction(fs1); + renewer.addRenewAction(fs2); + assertEquals(2, renewer.getRenewQueueLength()); + + renewer.removeRenewAction(fs1); + assertEquals(1, renewer.getRenewQueueLength()); + renewer.removeRenewAction(fs2); + assertEquals(0, renewer.getRenewQueueLength()); + + verify(token1).cancel(eq(conf)); + verify(token2).cancel(eq(conf)); } }