[27/50] [abbrv] hadoop git commit: HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao
HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/be34e85e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/be34e85e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/be34e85e Branch: refs/heads/HDFS-7240 Commit: be34e85e682880f46eee0310bf00ecc7d39cd5bd Parents: c14c1b2 Author: Jing Zhao Authored: Tue Jun 7 10:48:21 2016 -0700 Committer: Jing Zhao Committed: Tue Jun 7 10:48:21 2016 -0700 -- .../org/apache/hadoop/hdfs/DFSInputStream.java | 36 ++-- .../java/org/apache/hadoop/hdfs/TestRead.java | 87 .../server/datanode/SimulatedFSDataset.java | 4 +- 3 files changed, 119 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/be34e85e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 2ed0abd..7f32a56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; @@ -304,7 +306,7 @@ public class DFSInputStream extends FSInputStream try { Thread.sleep(waitTime); } catch (InterruptedException e) { - throw new IOException( + throw new InterruptedIOException( "Interrupted while getting the last block length."); } } @@ -379,6 +381,7 @@ public 
class DFSInputStream extends FSInputStream return n; } } catch (IOException ioe) { +checkInterrupted(ioe); if (ioe instanceof RemoteException) { if (((RemoteException) ioe).unwrapRemoteException() instanceof ReplicaNotFoundException) { @@ -414,7 +417,8 @@ public class DFSInputStream extends FSInputStream try { Thread.sleep(500); // delay between retries. } catch (InterruptedException e) { - throw new IOException("Interrupted while getting the length."); + throw new InterruptedIOException( + "Interrupted while getting the length."); } } @@ -660,6 +664,7 @@ public class DFSInputStream extends FSInputStream } return chosenNode; } catch (IOException ex) { +checkInterrupted(ex); if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { DFSClient.LOG.info("Will fetch a new encryption key and retry, " + "encryption key was invalid when connecting to " + targetAddr @@ -681,6 +686,15 @@ public class DFSInputStream extends FSInputStream } } + private void checkInterrupted(IOException e) throws IOException { +if (Thread.currentThread().isInterrupted() && +(e instanceof ClosedByInterruptException || +e instanceof InterruptedIOException)) { + DFSClient.LOG.debug("The reading thread has been interrupted.", e); + throw e; +} + } + protected BlockReader getBlockReader(LocatedBlock targetBlock, long offsetInBlock, long length, InetSocketAddress targetAddr, StorageType storageType, DatanodeInfo datanode) throws IOException { @@ -948,6 +962,7 @@ public class DFSInputStream extends FSInputStream } catch (ChecksumException ce) { throw ce; } catch (IOException e) { + checkInterrupted(e); if (retries == 1) { DFSClient.LOG.warn("DFS Read", e); } @@ -1044,9 +1059,12 @@ public class DFSInputStream extends FSInputStream // expanding time window for each failure timeWindow * (failures + 1) * ThreadLocalRandom.current().nextDouble(); - DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec."); + 
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + + " IOException, will wait for " + waitTime + " msec."); Thread.sleep((long)waitTime); -} catch (InterruptedException ignored) { +} catch
[01/24] hadoop git commit: HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao
Repository: hadoop Updated Branches: refs/heads/HDFS-1312 4f6fe511c -> f56ab2e77 HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/be34e85e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/be34e85e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/be34e85e Branch: refs/heads/HDFS-1312 Commit: be34e85e682880f46eee0310bf00ecc7d39cd5bd Parents: c14c1b2 Author: Jing Zhao Authored: Tue Jun 7 10:48:21 2016 -0700 Committer: Jing Zhao Committed: Tue Jun 7 10:48:21 2016 -0700 -- .../org/apache/hadoop/hdfs/DFSInputStream.java | 36 ++-- .../java/org/apache/hadoop/hdfs/TestRead.java | 87 .../server/datanode/SimulatedFSDataset.java | 4 +- 3 files changed, 119 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/be34e85e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 2ed0abd..7f32a56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; @@ -304,7 +306,7 @@ public class DFSInputStream extends FSInputStream try { Thread.sleep(waitTime); } catch (InterruptedException e) { - throw new IOException( + throw new InterruptedIOException( 
"Interrupted while getting the last block length."); } } @@ -379,6 +381,7 @@ public class DFSInputStream extends FSInputStream return n; } } catch (IOException ioe) { +checkInterrupted(ioe); if (ioe instanceof RemoteException) { if (((RemoteException) ioe).unwrapRemoteException() instanceof ReplicaNotFoundException) { @@ -414,7 +417,8 @@ public class DFSInputStream extends FSInputStream try { Thread.sleep(500); // delay between retries. } catch (InterruptedException e) { - throw new IOException("Interrupted while getting the length."); + throw new InterruptedIOException( + "Interrupted while getting the length."); } } @@ -660,6 +664,7 @@ public class DFSInputStream extends FSInputStream } return chosenNode; } catch (IOException ex) { +checkInterrupted(ex); if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { DFSClient.LOG.info("Will fetch a new encryption key and retry, " + "encryption key was invalid when connecting to " + targetAddr @@ -681,6 +686,15 @@ public class DFSInputStream extends FSInputStream } } + private void checkInterrupted(IOException e) throws IOException { +if (Thread.currentThread().isInterrupted() && +(e instanceof ClosedByInterruptException || +e instanceof InterruptedIOException)) { + DFSClient.LOG.debug("The reading thread has been interrupted.", e); + throw e; +} + } + protected BlockReader getBlockReader(LocatedBlock targetBlock, long offsetInBlock, long length, InetSocketAddress targetAddr, StorageType storageType, DatanodeInfo datanode) throws IOException { @@ -948,6 +962,7 @@ public class DFSInputStream extends FSInputStream } catch (ChecksumException ce) { throw ce; } catch (IOException e) { + checkInterrupted(e); if (retries == 1) { DFSClient.LOG.warn("DFS Read", e); } @@ -1044,9 +1059,12 @@ public class DFSInputStream extends FSInputStream // expanding time window for each failure timeWindow * (failures + 1) * ThreadLocalRandom.current().nextDouble(); - DFSClient.LOG.warn("DFS chooseDataNode: got # " + 
(failures + 1) + " IOException, will wait for " + waitTime + " msec."); + DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + + " IOException, will wait for " + waitTime + " msec.");
[50/50] [abbrv] hadoop git commit: HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao
HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/be34e85e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/be34e85e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/be34e85e Branch: refs/heads/YARN-4757 Commit: be34e85e682880f46eee0310bf00ecc7d39cd5bd Parents: c14c1b2 Author: Jing Zhao Authored: Tue Jun 7 10:48:21 2016 -0700 Committer: Jing Zhao Committed: Tue Jun 7 10:48:21 2016 -0700 -- .../org/apache/hadoop/hdfs/DFSInputStream.java | 36 ++-- .../java/org/apache/hadoop/hdfs/TestRead.java | 87 .../server/datanode/SimulatedFSDataset.java | 4 +- 3 files changed, 119 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/be34e85e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 2ed0abd..7f32a56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; @@ -304,7 +306,7 @@ public class DFSInputStream extends FSInputStream try { Thread.sleep(waitTime); } catch (InterruptedException e) { - throw new IOException( + throw new InterruptedIOException( "Interrupted while getting the last block length."); } } @@ -379,6 +381,7 @@ public 
class DFSInputStream extends FSInputStream return n; } } catch (IOException ioe) { +checkInterrupted(ioe); if (ioe instanceof RemoteException) { if (((RemoteException) ioe).unwrapRemoteException() instanceof ReplicaNotFoundException) { @@ -414,7 +417,8 @@ public class DFSInputStream extends FSInputStream try { Thread.sleep(500); // delay between retries. } catch (InterruptedException e) { - throw new IOException("Interrupted while getting the length."); + throw new InterruptedIOException( + "Interrupted while getting the length."); } } @@ -660,6 +664,7 @@ public class DFSInputStream extends FSInputStream } return chosenNode; } catch (IOException ex) { +checkInterrupted(ex); if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { DFSClient.LOG.info("Will fetch a new encryption key and retry, " + "encryption key was invalid when connecting to " + targetAddr @@ -681,6 +686,15 @@ public class DFSInputStream extends FSInputStream } } + private void checkInterrupted(IOException e) throws IOException { +if (Thread.currentThread().isInterrupted() && +(e instanceof ClosedByInterruptException || +e instanceof InterruptedIOException)) { + DFSClient.LOG.debug("The reading thread has been interrupted.", e); + throw e; +} + } + protected BlockReader getBlockReader(LocatedBlock targetBlock, long offsetInBlock, long length, InetSocketAddress targetAddr, StorageType storageType, DatanodeInfo datanode) throws IOException { @@ -948,6 +962,7 @@ public class DFSInputStream extends FSInputStream } catch (ChecksumException ce) { throw ce; } catch (IOException e) { + checkInterrupted(e); if (retries == 1) { DFSClient.LOG.warn("DFS Read", e); } @@ -1044,9 +1059,12 @@ public class DFSInputStream extends FSInputStream // expanding time window for each failure timeWindow * (failures + 1) * ThreadLocalRandom.current().nextDouble(); - DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec."); + 
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + + " IOException, will wait for " + waitTime + " msec."); Thread.sleep((long)waitTime); -} catch (InterruptedException ignored) { +} catch
hadoop git commit: HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao
Repository: hadoop Updated Branches: refs/heads/branch-2 19eb997f6 -> 8b34040cb HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao (cherry picked from commit be34e85e682880f46eee0310bf00ecc7d39cd5bd) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8b34040c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8b34040c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8b34040c Branch: refs/heads/branch-2 Commit: 8b34040cb905a3244e8d688e3c5713da557139f1 Parents: 19eb997 Author: Jing Zhao Authored: Tue Jun 7 10:48:21 2016 -0700 Committer: Jing Zhao Committed: Tue Jun 7 10:52:33 2016 -0700 -- .../org/apache/hadoop/hdfs/DFSInputStream.java | 36 ++-- .../java/org/apache/hadoop/hdfs/TestRead.java | 87 .../server/datanode/SimulatedFSDataset.java | 4 +- 3 files changed, 119 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b34040c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 4a83a53..fb8d207 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; @@ -307,7 +309,7 @@ public class DFSInputStream extends FSInputStream try { Thread.sleep(waitTime); } catch (InterruptedException e) 
{ - throw new IOException( + throw new InterruptedIOException( "Interrupted while getting the last block length."); } } @@ -382,6 +384,7 @@ public class DFSInputStream extends FSInputStream return n; } } catch (IOException ioe) { +checkInterrupted(ioe); if (ioe instanceof RemoteException) { if (((RemoteException) ioe).unwrapRemoteException() instanceof ReplicaNotFoundException) { @@ -417,7 +420,8 @@ public class DFSInputStream extends FSInputStream try { Thread.sleep(500); // delay between retries. } catch (InterruptedException e) { - throw new IOException("Interrupted while getting the length."); + throw new InterruptedIOException( + "Interrupted while getting the length."); } } @@ -663,6 +667,7 @@ public class DFSInputStream extends FSInputStream } return chosenNode; } catch (IOException ex) { +checkInterrupted(ex); if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { DFSClient.LOG.info("Will fetch a new encryption key and retry, " + "encryption key was invalid when connecting to " + targetAddr @@ -684,6 +689,15 @@ public class DFSInputStream extends FSInputStream } } + private void checkInterrupted(IOException e) throws IOException { +if (Thread.currentThread().isInterrupted() && +(e instanceof ClosedByInterruptException || +e instanceof InterruptedIOException)) { + DFSClient.LOG.debug("The reading thread has been interrupted.", e); + throw e; +} + } + protected BlockReader getBlockReader(LocatedBlock targetBlock, long offsetInBlock, long length, InetSocketAddress targetAddr, StorageType storageType, DatanodeInfo datanode) throws IOException { @@ -950,6 +964,7 @@ public class DFSInputStream extends FSInputStream } catch (ChecksumException ce) { throw ce; } catch (IOException e) { + checkInterrupted(e); if (retries == 1) { DFSClient.LOG.warn("DFS Read", e); } @@ -1064,9 +1079,12 @@ public class DFSInputStream extends FSInputStream // expanding time window for each failure timeWindow * (failures + 1) * 
ThreadLocalRandom.current().nextDouble(); - DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec."); + DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + + " IOException,
hadoop git commit: HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao
Repository: hadoop Updated Branches: refs/heads/trunk c14c1b298 -> be34e85e6 HDFS-10468. HDFS read ends up ignoring an interrupt. Contributed by Jing Zhao Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/be34e85e Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/be34e85e Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/be34e85e Branch: refs/heads/trunk Commit: be34e85e682880f46eee0310bf00ecc7d39cd5bd Parents: c14c1b2 Author: Jing Zhao Authored: Tue Jun 7 10:48:21 2016 -0700 Committer: Jing Zhao Committed: Tue Jun 7 10:48:21 2016 -0700 -- .../org/apache/hadoop/hdfs/DFSInputStream.java | 36 ++-- .../java/org/apache/hadoop/hdfs/TestRead.java | 87 .../server/datanode/SimulatedFSDataset.java | 4 +- 3 files changed, 119 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/be34e85e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 2ed0abd..7f32a56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs; import java.io.EOFException; import java.io.IOException; +import java.io.InterruptedIOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; import java.util.AbstractMap; import java.util.ArrayList; import java.util.Arrays; @@ -304,7 +306,7 @@ public class DFSInputStream extends FSInputStream try { Thread.sleep(waitTime); } catch (InterruptedException e) { - throw new IOException( + throw new InterruptedIOException( "Interrupted 
while getting the last block length."); } } @@ -379,6 +381,7 @@ public class DFSInputStream extends FSInputStream return n; } } catch (IOException ioe) { +checkInterrupted(ioe); if (ioe instanceof RemoteException) { if (((RemoteException) ioe).unwrapRemoteException() instanceof ReplicaNotFoundException) { @@ -414,7 +417,8 @@ public class DFSInputStream extends FSInputStream try { Thread.sleep(500); // delay between retries. } catch (InterruptedException e) { - throw new IOException("Interrupted while getting the length."); + throw new InterruptedIOException( + "Interrupted while getting the length."); } } @@ -660,6 +664,7 @@ public class DFSInputStream extends FSInputStream } return chosenNode; } catch (IOException ex) { +checkInterrupted(ex); if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { DFSClient.LOG.info("Will fetch a new encryption key and retry, " + "encryption key was invalid when connecting to " + targetAddr @@ -681,6 +686,15 @@ public class DFSInputStream extends FSInputStream } } + private void checkInterrupted(IOException e) throws IOException { +if (Thread.currentThread().isInterrupted() && +(e instanceof ClosedByInterruptException || +e instanceof InterruptedIOException)) { + DFSClient.LOG.debug("The reading thread has been interrupted.", e); + throw e; +} + } + protected BlockReader getBlockReader(LocatedBlock targetBlock, long offsetInBlock, long length, InetSocketAddress targetAddr, StorageType storageType, DatanodeInfo datanode) throws IOException { @@ -948,6 +962,7 @@ public class DFSInputStream extends FSInputStream } catch (ChecksumException ce) { throw ce; } catch (IOException e) { + checkInterrupted(e); if (retries == 1) { DFSClient.LOG.warn("DFS Read", e); } @@ -1044,9 +1059,12 @@ public class DFSInputStream extends FSInputStream // expanding time window for each failure timeWindow * (failures + 1) * ThreadLocalRandom.current().nextDouble(); - DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " 
IOException, will wait for " + waitTime + " msec."); + DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + + " IOException, will wait for " + waitTime + " msec.");