This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 87f69cb HDDS-2504. Handle InterruptedException properly (#386)
87f69cb is described below
commit 87f69cbc1b0a089fbe2f06b56f161766a7c2f6ab
Author: Vivek Ratnavel Subramanian <[email protected]>
AuthorDate: Thu Jan 2 16:24:48 2020 +0530
HDDS-2504. Handle InterruptedException properly (#386)
---
.../hadoop/hdds/scm/storage/CommitWatcher.java | 23 ++++++++++------
.../apache/hadoop/hdds/scm/XceiverClientSpi.java | 23 +++++++++++-----
.../hadoop/hdds/utils/BackgroundService.java | 10 ++++++-
.../org/apache/hadoop/hdds/utils/Scheduler.java | 31 ++++++++++++----------
4 files changed, 58 insertions(+), 29 deletions(-)
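
The pattern applied across all four files is to give InterruptedException its own catch branch, restore the thread's interrupt status via Thread.currentThread().interrupt(), and only then log or translate the failure. A minimal, self-contained sketch of that pattern follows; the class and method names are illustrative, not taken from this commit.

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative only: waits on a future without swallowing interrupts. */
public final class InterruptHandlingSketch {

  private InterruptHandlingSketch() {
  }

  public static String awaitReply(CompletableFuture<String> future)
      throws IOException {
    try {
      return future.get(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      // Restore the interrupt status before translating the exception.
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for reply", e);
    } catch (TimeoutException | ExecutionException e) {
      throw new IOException("Failed to get reply", e);
    }
  }
}

Restoring the interrupt flag matters because wrapping the exception alone would hide the interruption from callers and thread pools further up the stack.
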
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
index 766065f..ebcc6dc 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -164,7 +164,6 @@ public class CommitWatcher {
}
}
-
private void adjustBuffers(long commitIndex) {
List<Long> keyList = commitIndex2flushedDataMap.keySet().stream()
.filter(p -> p <= commitIndex).collect(Collectors.toList());
@@ -180,7 +179,6 @@ public class CommitWatcher {
adjustBuffers(xceiverClient.getReplicatedMinCommitIndex());
}
-
/**
* calls watchForCommit API of the Ratis Client. For Standalone client,
* it is a no op.
@@ -201,15 +199,24 @@ public class CommitWatcher {
}
adjustBuffers(index);
return reply;
- } catch (TimeoutException | InterruptedException | ExecutionException e) {
- LOG.warn("watchForCommit failed for index " + commitIndex, e);
- IOException ioException = new IOException(
- "Unexpected Storage Container Exception: " + e.toString(), e);
- releaseBuffersOnException();
- throw ioException;
+ } catch (InterruptedException e) {
+ // Re-interrupt the thread while catching InterruptedException
+ Thread.currentThread().interrupt();
+ throw getIOExceptionForWatchForCommit(commitIndex, e);
+ } catch (TimeoutException | ExecutionException e) {
+ throw getIOExceptionForWatchForCommit(commitIndex, e);
}
}
+ private IOException getIOExceptionForWatchForCommit(long commitIndex,
+ Exception e) {
+ LOG.warn("watchForCommit failed for index {}", commitIndex, e);
+ IOException ioException = new IOException(
+ "Unexpected Storage Container Exception: " + e.toString(), e);
+ releaseBuffersOnException();
+ return ioException;
+ }
+
@VisibleForTesting
public Map<Long, List<ChunkBuffer>> getCommitIndex2flushedDataMap() {
return commitIndex2flushedDataMap;
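
The CommitWatcher change above also extracts the logging, buffer release, and IOException construction into a shared helper so both catch branches stay small. A rough standalone sketch of that shape, assuming a future completed with a commit index (names are illustrative, not from the commit):

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Illustrative only: mirrors the shared exception-translation helper. */
public class CommitWaiterSketch {

  private static final Logger LOG =
      LoggerFactory.getLogger(CommitWaiterSketch.class);

  public long waitOnCommit(CompletableFuture<Long> reply, long commitIndex)
      throws IOException {
    try {
      return reply.get(30, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      // Re-interrupt first, then reuse the same translation helper.
      Thread.currentThread().interrupt();
      throw toIOException(commitIndex, e);
    } catch (TimeoutException | ExecutionException e) {
      throw toIOException(commitIndex, e);
    }
  }

  private IOException toIOException(long commitIndex, Exception e) {
    LOG.warn("watchForCommit failed for index {}", commitIndex, e);
    releaseBuffers();
    return new IOException("Unexpected Storage Container Exception: " + e, e);
  }

  private void releaseBuffers() {
    // Stand-in for releaseBuffersOnException() in the real class.
  }
}
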
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index d9e5a1f..f938448 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -107,10 +107,13 @@ public abstract class XceiverClientSpi implements Closeable {
try {
XceiverClientReply reply;
reply = sendCommandAsync(request);
- ContainerCommandResponseProto responseProto = reply.getResponse().get();
- return responseProto;
- } catch (ExecutionException | InterruptedException e) {
- throw new IOException("Failed to command " + request, e);
+ return reply.getResponse().get();
+ } catch (InterruptedException e) {
+ // Re-interrupt the thread while catching InterruptedException
+ Thread.currentThread().interrupt();
+ throw getIOExceptionForSendCommand(request, e);
+ } catch (ExecutionException e) {
+ throw getIOExceptionForSendCommand(request, e);
}
}
@@ -133,11 +136,19 @@ public abstract class XceiverClientSpi implements Closeable {
function.apply(request, responseProto);
}
return responseProto;
- } catch (ExecutionException | InterruptedException e) {
- throw new IOException("Failed to command " + request, e);
+ } catch (InterruptedException e) {
+ // Re-interrupt the thread while catching InterruptedException
+ Thread.currentThread().interrupt();
+ throw getIOExceptionForSendCommand(request, e);
+ } catch (ExecutionException e) {
+ throw getIOExceptionForSendCommand(request, e);
}
}
+ private IOException getIOExceptionForSendCommand(
+ ContainerCommandRequestProto request, Exception e) {
+ return new IOException("Failed to execute command " + request, e);
+ }
/**
* Sends a given command to server gets a waitable future back.
*
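
In XceiverClientSpi, both synchronous sendCommand variants now delegate exception translation to a single getIOExceptionForSendCommand helper, which also fixes the message to read "Failed to execute command". A rough sketch of the same sync-over-async shape, with String standing in for the request and response protos (illustrative names only):

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Illustrative only: a synchronous wrapper over an async send. */
public class CommandSenderSketch {

  public String sendCommand(String request) throws IOException {
    try {
      CompletableFuture<String> reply = sendCommandAsync(request);
      return reply.get();
    } catch (InterruptedException e) {
      // Restore the interrupt flag before converting to IOException.
      Thread.currentThread().interrupt();
      throw failure(request, e);
    } catch (ExecutionException e) {
      throw failure(request, e);
    }
  }

  private IOException failure(String request, Exception e) {
    return new IOException("Failed to execute command " + request, e);
  }

  private CompletableFuture<String> sendCommandAsync(String request) {
    // Stand-in for the real async transport call.
    return CompletableFuture.completedFuture("ok: " + request);
  }
}
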
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
index ca8d870..727e903 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/BackgroundService.java
@@ -134,7 +134,13 @@ public abstract class BackgroundService {
if (LOG.isDebugEnabled()) {
LOG.debug("task execution result size {}", result.getSize());
}
- } catch (InterruptedException | ExecutionException e) {
+ } catch (InterruptedException e) {
+ LOG.warn(
+ "Background task failed due to interruption, retrying in " +
+ "next interval", e);
+ // Re-interrupt the thread while catching InterruptedException
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
LOG.warn(
"Background task fails to execute, "
+ "retrying in next interval", e);
@@ -155,6 +161,8 @@ public abstract class BackgroundService {
exec.shutdownNow();
}
} catch (InterruptedException e) {
+ // Re-interrupt the thread while catching InterruptedException
+ Thread.currentThread().interrupt();
exec.shutdownNow();
}
if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) {
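
For the background service, the periodic task now logs and re-interrupts instead of treating an interruption like an ordinary execution failure, so a later scheduler run or shutdown can still observe the interrupt. A small sketch of such a periodic task, assuming a single-thread scheduled executor (illustrative names only):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Illustrative only: a periodic task that keeps the interrupt flag intact. */
public class BackgroundTaskSketch implements Runnable {

  private static final Logger LOG =
      LoggerFactory.getLogger(BackgroundTaskSketch.class);

  private final ScheduledExecutorService exec =
      Executors.newSingleThreadScheduledExecutor();

  public void start() {
    exec.scheduleWithFixedDelay(this, 0, 1, TimeUnit.SECONDS);
  }

  @Override
  public void run() {
    try {
      CompletableFuture<Integer> result = CompletableFuture.supplyAsync(() -> 42);
      LOG.debug("task execution result {}", result.get(5, TimeUnit.SECONDS));
    } catch (InterruptedException e) {
      LOG.warn("Background task interrupted, retrying in next interval", e);
      // Re-interrupt so the executor thread still sees the interruption.
      Thread.currentThread().interrupt();
    } catch (ExecutionException | TimeoutException e) {
      LOG.warn("Background task failed, retrying in next interval", e);
    }
  }
}
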
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
index f5e55c1..8a1c5fb 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
@@ -34,7 +34,7 @@ public class Scheduler {
private static final Logger LOG =
LoggerFactory.getLogger(Scheduler.class);
- private ScheduledExecutorService scheduler;
+ private ScheduledExecutorService scheduledExecutorService;
private volatile boolean isClosed;
@@ -48,23 +48,24 @@ public class Scheduler {
* @param numCoreThreads - number of core threads to maintain in the
scheduler
*/
public Scheduler(String threadName, boolean isDaemon, int numCoreThreads) {
- scheduler = Executors.newScheduledThreadPool(numCoreThreads, r -> {
- Thread t = new Thread(r);
- t.setName(threadName);
- t.setDaemon(isDaemon);
- return t;
- });
+ scheduledExecutorService = Executors.newScheduledThreadPool(numCoreThreads,
+ r -> {
+ Thread t = new Thread(r);
+ t.setName(threadName);
+ t.setDaemon(isDaemon);
+ return t;
+ });
this.threadName = threadName;
isClosed = false;
}
public void schedule(Runnable runnable, long delay, TimeUnit timeUnit) {
- scheduler.schedule(runnable, delay, timeUnit);
+ scheduledExecutorService.schedule(runnable, delay, timeUnit);
}
public void schedule(CheckedRunnable runnable, long delay,
TimeUnit timeUnit, Logger logger, String errMsg) {
- scheduler.schedule(() -> {
+ scheduledExecutorService.schedule(() -> {
try {
runnable.run();
} catch (Throwable throwable) {
@@ -75,7 +76,7 @@ public class Scheduler {
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable,
long initialDelay, long fixedDelay, TimeUnit timeUnit) {
- return scheduler
+ return scheduledExecutorService
.scheduleWithFixedDelay(runnable, initialDelay, fixedDelay, timeUnit);
}
@@ -90,16 +91,18 @@ public class Scheduler {
*/
public synchronized void close() {
isClosed = true;
- if (scheduler != null) {
- scheduler.shutdownNow();
+ if (scheduledExecutorService != null) {
+ scheduledExecutorService.shutdownNow();
try {
- scheduler.awaitTermination(60, TimeUnit.SECONDS);
+ scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.info(
threadName + " interrupted while waiting for task completion {}",
e);
+ // Re-interrupt the thread while catching InterruptedException
+ Thread.currentThread().interrupt();
}
}
- scheduler = null;
+ scheduledExecutorService = null;
}
}
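
The Scheduler change applies the usual shutdown idiom: shutdownNow, a bounded awaitTermination, and re-interrupting if that wait is itself interrupted. A compact sketch under those assumptions (illustrative names only):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Illustrative only: shuts down an executor without swallowing interrupts. */
public class SchedulerCloseSketch {

  private static final Logger LOG =
      LoggerFactory.getLogger(SchedulerCloseSketch.class);

  private ScheduledExecutorService scheduledExecutorService =
      Executors.newScheduledThreadPool(1);

  public synchronized void close() {
    if (scheduledExecutorService != null) {
      scheduledExecutorService.shutdownNow();
      try {
        scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        LOG.info("Interrupted while waiting for task completion", e);
        // Re-interrupt so callers further up the stack see the interruption.
        Thread.currentThread().interrupt();
      }
      scheduledExecutorService = null;
    }
  }
}

Clearing the field after termination mirrors the original close() and guards against reuse of an executor that has already been shut down.
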
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]