Michael Blow has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1716
Change subject: Misc Cleanup / Fixes
......................................................................
Misc Cleanup / Fixes
- Improve InterruptedException handling (fixes CPU spin in
ReplicationManager)
- Fix commit profiler setting its thread name on the wrong thread
- Utilize ScheduledExecutorService to simplify repeating task logic
Change-Id: Iea9d89b6e557061209205fbe975cd20393b548ef
---
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
M
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
6 files changed, 91 insertions(+), 62 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/16/1716/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index ec1a386..5a544d1 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -18,7 +18,7 @@
*/
package org.apache.asterix.app.nc;
-import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.apache.asterix.common.config.ReplicationProperties;
@@ -58,7 +58,6 @@
//for profiling purpose
private long profilerEntityCommitLogCount = 0;
- private EntityCommitProfiler ecp;
public TransactionSubsystem(INCServiceContext serviceCtx, String id,
IAppRuntimeContextProvider asterixAppRuntimeContextProvider,
TransactionProperties txnProperties)
@@ -89,9 +88,10 @@
}
this.recoveryManager = new RecoveryManager(this, serviceCtx);
- if (this.txnProperties.isCommitProfilerEnabled()) {
- ecp = new EntityCommitProfiler(this,
this.txnProperties.getCommitProfilerReportInterval());
-
getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(ecp);
+ if (txnProperties.isCommitProfilerEnabled()) {
+ final int intervalSecs =
txnProperties.getCommitProfilerReportInterval();
+
getAsterixAppRuntimeContextProvider().getThreadExecutor().scheduleWithFixedDelay(
+ new EntityCommitProfiler(this, intervalSecs),
intervalSecs, intervalSecs, TimeUnit.SECONDS);
}
}
@@ -147,41 +147,35 @@
* only if IS_PROFILE_MODE is set to true.
* However, the thread doesn't start reporting the count until the
entityCommitCount > 0.
*/
- static class EntityCommitProfiler implements Callable<Boolean> {
+ static class EntityCommitProfiler implements Runnable {
private static final Logger LOGGER =
Logger.getLogger(EntityCommitProfiler.class.getName());
- private final long reportIntervalInMillisec;
+ private static final long UNDEFINED_TIMESTAMP = 0L;
+
private long lastEntityCommitCount;
private int reportIntervalInSeconds;
private TransactionSubsystem txnSubsystem;
- private boolean firstReport = true;
- private long startTimeStamp = 0;
+ private long startTimeStamp = UNDEFINED_TIMESTAMP;
private long reportRound = 1;
- public EntityCommitProfiler(TransactionSubsystem txnSubsystem, int
reportIntervalInSeconds) {
- Thread.currentThread().setName("EntityCommitProfiler-Thread");
+ EntityCommitProfiler(TransactionSubsystem txnSubsystem, int
reportIntervalInSeconds) {
this.txnSubsystem = txnSubsystem;
this.reportIntervalInSeconds = reportIntervalInSeconds;
- this.reportIntervalInMillisec = reportIntervalInSeconds * 1000L;
lastEntityCommitCount = txnSubsystem.profilerEntityCommitLogCount;
}
@Override
- public Boolean call() throws Exception {
- while (true) {
- Thread.sleep(reportIntervalInMillisec);
- if (txnSubsystem.profilerEntityCommitLogCount > 0) {
- if (firstReport) {
- startTimeStamp = System.currentTimeMillis();
- firstReport = false;
- }
- outputCount();
- }
+ public void run() {
+ if (txnSubsystem.profilerEntityCommitLogCount > 0) {
+ outputCount();
}
}
private void outputCount() {
long currentTimeStamp = System.currentTimeMillis();
long currentEntityCommitCount =
txnSubsystem.profilerEntityCommitLogCount;
+ if (startTimeStamp == UNDEFINED_TIMESTAMP) {
+ startTimeStamp = currentTimeStamp;
+ }
LOGGER.severe("EntityCommitProfiler ReportRound[" + reportRound +
"], AbsoluteTimeStamp["
+ currentTimeStamp + "], ActualRelativeTimeStamp[" +
(currentTimeStamp - startTimeStamp)
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
index 03cead0..888e293 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
@@ -20,16 +20,19 @@
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+@SuppressWarnings("squid:S1452") // Generic wildcard types should not be used
in return parameters
public class ThreadExecutor implements Executor {
- private final ExecutorService executorService;
+ private final ScheduledExecutorService executorService;
public ThreadExecutor(ThreadFactory threadFactory) {
- executorService = Executors.newCachedThreadPool(threadFactory);
+ executorService = Executors.newScheduledThreadPool(0, threadFactory);
}
@Override
@@ -40,4 +43,24 @@
public <T> Future<T> submit(Callable<T> command) {
return executorService.submit(command);
}
+
+ public Future<?> submit(Runnable task) {
+ return executorService.submit(task);
+ }
+
+ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit
unit) {
+ return executorService.schedule(command, delay, unit);
+ }
+
+ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay,
TimeUnit unit) {
+ return executorService.schedule(callable, delay, unit);
+ }
+
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long
initialDelay, long period, TimeUnit unit) {
+ return executorService.scheduleAtFixedRate(command, initialDelay,
period, unit);
+ }
+
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long
initialDelay, long delay, TimeUnit unit) {
+ return executorService.scheduleWithFixedDelay(command, initialDelay,
delay, unit);
+ }
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index b969bef..bab01c1 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -89,7 +89,7 @@
/**
* Checks and sets each remote replica state.
*/
- public void initializeReplicasState();
+ public void initializeReplicasState() throws InterruptedException;
/**
* Updates remote replica (in-memory) information.
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
index be7cdc5..5efd482 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
@@ -23,16 +23,15 @@
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
-import java.util.concurrent.Callable;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.replication.Replica;
import org.apache.asterix.common.replication.Replica.ReplicaState;
import org.apache.asterix.replication.functions.ReplicationProtocol;
-public class ReplicaStateChecker implements Callable<Void> {
+public class ReplicaStateChecker implements Runnable {
- private final int WAIT_TIME = 2000;
+ private static final int WAIT_TIME = 2000;
private final Replica replica;
private final int replicationTimeOut;
private final ReplicationManager replicationManager;
@@ -49,30 +48,43 @@
}
@Override
- public Void call() throws Exception {
- Thread.currentThread().setName("ReplicaConnector Thread");
+ public void run() {
+ final String origName = Thread.currentThread().getName();
+ Thread.currentThread().setName(origName + ":Replica Connector Thread");
+ try {
+ long startTime = System.currentTimeMillis();
+ InetSocketAddress replicaAddress =
replica.getAddress(asterixReplicationProperties);
- long startTime = System.currentTimeMillis();
- InetSocketAddress replicaAddress =
replica.getAddress(asterixReplicationProperties);
-
- while (true) {
- try (SocketChannel connection = SocketChannel.open()) {
- connection.configureBlocking(true);
- connection.connect(new
InetSocketAddress(replicaAddress.getHostString(), replicaAddress.getPort()));
- ByteBuffer buffer = ReplicationProtocol.getGoodbyeBuffer();
- connection.write(buffer);
- replicationManager.updateReplicaState(replica.getId(),
ReplicaState.ACTIVE, suspendReplication);
- return null;
- } catch (IOException | UnresolvedAddressException e) {
- Thread.sleep(WAIT_TIME);
-
- //check if connection to replica timed out
- if (((System.currentTimeMillis() - startTime) / 1000) >=
replicationTimeOut) {
- replicationManager.updateReplicaState(replica.getId(),
ReplicaState.DEAD, suspendReplication);
- return null;
+ while (true) {
+ if (tryConnect(startTime, replicaAddress)) {
+ return;
}
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ Thread.currentThread().setName(origName);
}
}
+ private boolean tryConnect(long startTime, InetSocketAddress
replicaAddress) throws InterruptedException {
+ try (SocketChannel connection = SocketChannel.open()) {
+ connection.configureBlocking(true);
+ connection.connect(new
InetSocketAddress(replicaAddress.getHostString(), replicaAddress.getPort()));
+ ByteBuffer buffer = ReplicationProtocol.getGoodbyeBuffer();
+ connection.write(buffer);
+ replicationManager.updateReplicaState(replica.getId(),
ReplicaState.ACTIVE, suspendReplication);
+ return true;
+ } catch (IOException | UnresolvedAddressException e) {
+ Thread.sleep(WAIT_TIME);
+
+ //check if connection to replica timed out
+ if (((System.currentTimeMillis() - startTime) / 1000) >=
replicationTimeOut) {
+ replicationManager.updateReplicaState(replica.getId(),
ReplicaState.DEAD, suspendReplication);
+ return true;
+ }
+ }
+ return false;
+ }
+
}
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index da45e42..3945470 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -687,7 +687,7 @@
}
@Override
- public void initializeReplicasState() {
+ public void initializeReplicasState() throws InterruptedException {
for (Replica replica : replicas.values()) {
checkReplicaState(replica.getNode().getId(), false, false);
}
@@ -703,21 +703,20 @@
* @param suspendReplication
* a flag indicating whether to suspend replication on replica
state change or not.
*/
- private void checkReplicaState(String replicaId, boolean async, boolean
suspendReplication) {
+ private void checkReplicaState(String replicaId, boolean async, boolean
suspendReplication)
+ throws InterruptedException {
Replica replica = replicas.get(replicaId);
ReplicaStateChecker connector = new ReplicaStateChecker(replica,
replicationProperties.getReplicationTimeOut(),
this, replicationProperties, suspendReplication);
- Future<? extends Object> ft =
asterixAppRuntimeContextProvider.getThreadExecutor().submit(connector);
+ Future<?> ft =
asterixAppRuntimeContextProvider.getThreadExecutor().submit(connector);
if (!async) {
- //wait until task is done
- while (!ft.isDone()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
+ // wait until task is done
+ try {
+ ft.get();
+ } catch (ExecutionException e) {
+ LOGGER.log(Level.WARNING, "Unexpected exception thrown by
ReplicaStateChecker", e);
}
}
}
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 7c7a050..317a808 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -212,10 +212,11 @@
}
break;
} catch (IOException e) {
- if (LOGGER.isLoggable(Level.WARNING)) {
- LOGGER.log(Level.WARNING, "Failed during remote recovery.
Attempting again...", e);
- }
+ LOGGER.log(Level.WARNING, "Failed during remote recovery.
Attempting again...", e);
maxRecoveryAttempts--;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("interrupted", e);
}
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1716
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Iea9d89b6e557061209205fbe975cd20393b548ef
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>