Michael Blow has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1716

Change subject: Misc Cleanup / Fixes
......................................................................

Misc Cleanup / Fixes

- Improved InterruptedException handling (CPU spin in
  ReplicationManager)
- Fix commit profiler setting its thread name on the wrong thread
  (the constructor renamed the calling thread)
- Utilize ScheduledExecutorService to simplify repeating task logic

Change-Id: Iea9d89b6e557061209205fbe975cd20393b548ef
---
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
M asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
6 files changed, 91 insertions(+), 62 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/16/1716/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index ec1a386..5a544d1 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -18,7 +18,7 @@
  */
 package org.apache.asterix.app.nc;
 
-import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.config.ReplicationProperties;
@@ -58,7 +58,6 @@
 
     //for profiling purpose
     private long profilerEntityCommitLogCount = 0;
-    private EntityCommitProfiler ecp;
 
     public TransactionSubsystem(INCServiceContext serviceCtx, String id,
             IAppRuntimeContextProvider asterixAppRuntimeContextProvider, 
TransactionProperties txnProperties)
@@ -89,9 +88,10 @@
         }
         this.recoveryManager = new RecoveryManager(this, serviceCtx);
 
-        if (this.txnProperties.isCommitProfilerEnabled()) {
-            ecp = new EntityCommitProfiler(this, 
this.txnProperties.getCommitProfilerReportInterval());
-            
getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(ecp);
+        if (txnProperties.isCommitProfilerEnabled()) {
+            final int intervalSecs = 
txnProperties.getCommitProfilerReportInterval();
+            
getAsterixAppRuntimeContextProvider().getThreadExecutor().scheduleWithFixedDelay(
+                    new EntityCommitProfiler(this, intervalSecs), 
intervalSecs, intervalSecs, TimeUnit.SECONDS);
         }
     }
 
@@ -147,41 +147,35 @@
      * only if IS_PROFILE_MODE is set to true.
      * However, the thread doesn't start reporting the count until the 
entityCommitCount > 0.
      */
-    static class EntityCommitProfiler implements Callable<Boolean> {
+    static class EntityCommitProfiler implements Runnable {
         private static final Logger LOGGER = 
Logger.getLogger(EntityCommitProfiler.class.getName());
-        private final long reportIntervalInMillisec;
+        private static final long UNDEFINED_TIMESTAMP = 0L;
+
         private long lastEntityCommitCount;
         private int reportIntervalInSeconds;
         private TransactionSubsystem txnSubsystem;
-        private boolean firstReport = true;
-        private long startTimeStamp = 0;
+        private long startTimeStamp = UNDEFINED_TIMESTAMP;
         private long reportRound = 1;
 
-        public EntityCommitProfiler(TransactionSubsystem txnSubsystem, int 
reportIntervalInSeconds) {
-            Thread.currentThread().setName("EntityCommitProfiler-Thread");
+        EntityCommitProfiler(TransactionSubsystem txnSubsystem, int 
reportIntervalInSeconds) {
             this.txnSubsystem = txnSubsystem;
             this.reportIntervalInSeconds = reportIntervalInSeconds;
-            this.reportIntervalInMillisec = reportIntervalInSeconds * 1000L;
             lastEntityCommitCount = txnSubsystem.profilerEntityCommitLogCount;
         }
 
         @Override
-        public Boolean call() throws Exception {
-            while (true) {
-                Thread.sleep(reportIntervalInMillisec);
-                if (txnSubsystem.profilerEntityCommitLogCount > 0) {
-                    if (firstReport) {
-                        startTimeStamp = System.currentTimeMillis();
-                        firstReport = false;
-                    }
-                    outputCount();
-                }
+        public void run() {
+            if (txnSubsystem.profilerEntityCommitLogCount > 0) {
+                outputCount();
             }
         }
 
         private void outputCount() {
             long currentTimeStamp = System.currentTimeMillis();
             long currentEntityCommitCount = 
txnSubsystem.profilerEntityCommitLogCount;
+            if (startTimeStamp == UNDEFINED_TIMESTAMP) {
+                startTimeStamp = currentTimeStamp;
+            }
 
             LOGGER.severe("EntityCommitProfiler ReportRound[" + reportRound + 
"], AbsoluteTimeStamp["
                     + currentTimeStamp + "], ActualRelativeTimeStamp[" + 
(currentTimeStamp - startTimeStamp)
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
index 03cead0..888e293 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/ThreadExecutor.java
@@ -20,16 +20,19 @@
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 
+@SuppressWarnings("squid:S1452") // Generic wildcard types should not be used 
in return parameters
 public class ThreadExecutor implements Executor {
-    private final ExecutorService executorService;
+    private final ScheduledExecutorService executorService;
 
     public ThreadExecutor(ThreadFactory threadFactory) {
-        executorService = Executors.newCachedThreadPool(threadFactory);
+        executorService = Executors.newScheduledThreadPool(0, threadFactory);
     }
 
     @Override
@@ -40,4 +43,24 @@
     public <T> Future<T> submit(Callable<T> command) {
         return executorService.submit(command);
     }
+
+    public Future<?> submit(Runnable task) {
+        return executorService.submit(task);
+    }
+
+    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit 
unit) {
+        return executorService.schedule(command, delay, unit);
+    }
+
+    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, 
TimeUnit unit) {
+        return executorService.schedule(callable, delay, unit);
+    }
+
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long 
initialDelay, long period, TimeUnit unit) {
+        return executorService.scheduleAtFixedRate(command, initialDelay, 
period, unit);
+    }
+
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long 
initialDelay, long delay, TimeUnit unit) {
+        return executorService.scheduleWithFixedDelay(command, initialDelay, 
delay, unit);
+    }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
index b969bef..bab01c1 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IReplicationManager.java
@@ -89,7 +89,7 @@
     /**
      * Checks and sets each remote replica state.
      */
-    public void initializeReplicasState();
+    public void initializeReplicasState() throws InterruptedException;
 
     /**
      * Updates remote replica (in-memory) information.
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
index be7cdc5..5efd482 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicaStateChecker.java
@@ -23,16 +23,15 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.UnresolvedAddressException;
-import java.util.concurrent.Callable;
 
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.replication.Replica.ReplicaState;
 import org.apache.asterix.replication.functions.ReplicationProtocol;
 
-public class ReplicaStateChecker implements Callable<Void> {
+public class ReplicaStateChecker implements Runnable {
 
-    private final int WAIT_TIME = 2000;
+    private static final int WAIT_TIME = 2000;
     private final Replica replica;
     private final int replicationTimeOut;
     private final ReplicationManager replicationManager;
@@ -49,30 +48,43 @@
     }
 
     @Override
-    public Void call() throws Exception {
-        Thread.currentThread().setName("ReplicaConnector Thread");
+    public void run() {
+        final String origName = Thread.currentThread().getName();
+        Thread.currentThread().setName(origName + ":Replica Connector Thread");
+        try {
+            long startTime = System.currentTimeMillis();
+            InetSocketAddress replicaAddress = 
replica.getAddress(asterixReplicationProperties);
 
-        long startTime = System.currentTimeMillis();
-        InetSocketAddress replicaAddress = 
replica.getAddress(asterixReplicationProperties);
-
-        while (true) {
-            try (SocketChannel connection = SocketChannel.open()) {
-                connection.configureBlocking(true);
-                connection.connect(new 
InetSocketAddress(replicaAddress.getHostString(), replicaAddress.getPort()));
-                ByteBuffer buffer = ReplicationProtocol.getGoodbyeBuffer();
-                connection.write(buffer);
-                replicationManager.updateReplicaState(replica.getId(), 
ReplicaState.ACTIVE, suspendReplication);
-                return null;
-            } catch (IOException | UnresolvedAddressException e) {
-                Thread.sleep(WAIT_TIME);
-
-                //check if connection to replica timed out
-                if (((System.currentTimeMillis() - startTime) / 1000) >= 
replicationTimeOut) {
-                    replicationManager.updateReplicaState(replica.getId(), 
ReplicaState.DEAD, suspendReplication);
-                    return null;
+            while (true) {
+                if (tryConnect(startTime, replicaAddress)) {
+                    return;
                 }
             }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } finally {
+            Thread.currentThread().setName(origName);
         }
     }
 
+    private boolean tryConnect(long startTime, InetSocketAddress 
replicaAddress) throws InterruptedException {
+        try (SocketChannel connection = SocketChannel.open()) {
+            connection.configureBlocking(true);
+            connection.connect(new 
InetSocketAddress(replicaAddress.getHostString(), replicaAddress.getPort()));
+            ByteBuffer buffer = ReplicationProtocol.getGoodbyeBuffer();
+            connection.write(buffer);
+            replicationManager.updateReplicaState(replica.getId(), 
ReplicaState.ACTIVE, suspendReplication);
+            return true;
+        } catch (IOException | UnresolvedAddressException e) {
+            Thread.sleep(WAIT_TIME);
+
+            //check if connection to replica timed out
+            if (((System.currentTimeMillis() - startTime) / 1000) >= 
replicationTimeOut) {
+                replicationManager.updateReplicaState(replica.getId(), 
ReplicaState.DEAD, suspendReplication);
+                return true;
+            }
+        }
+        return false;
+    }
+
 }
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index da45e42..3945470 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -687,7 +687,7 @@
     }
 
     @Override
-    public void initializeReplicasState() {
+    public void initializeReplicasState() throws InterruptedException {
         for (Replica replica : replicas.values()) {
             checkReplicaState(replica.getNode().getId(), false, false);
         }
@@ -703,21 +703,20 @@
      * @param suspendReplication
      *            a flag indicating whether to suspend replication on replica 
state change or not.
      */
-    private void checkReplicaState(String replicaId, boolean async, boolean 
suspendReplication) {
+    private void checkReplicaState(String replicaId, boolean async, boolean 
suspendReplication)
+            throws InterruptedException {
         Replica replica = replicas.get(replicaId);
 
         ReplicaStateChecker connector = new ReplicaStateChecker(replica, 
replicationProperties.getReplicationTimeOut(),
                 this, replicationProperties, suspendReplication);
-        Future<? extends Object> ft = 
asterixAppRuntimeContextProvider.getThreadExecutor().submit(connector);
+        Future<?> ft = 
asterixAppRuntimeContextProvider.getThreadExecutor().submit(connector);
 
         if (!async) {
-            //wait until task is done
-            while (!ft.isDone()) {
-                try {
-                    Thread.sleep(1000);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
+            // wait until task is done
+            try {
+                ft.get();
+            } catch (ExecutionException e) {
+                LOGGER.log(Level.WARNING, "Unexpected exception thrown by 
ReplicaStateChecker", e);
             }
         }
     }
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
index 7c7a050..317a808 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/recovery/RemoteRecoveryManager.java
@@ -212,10 +212,11 @@
                 }
                 break;
             } catch (IOException e) {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.log(Level.WARNING, "Failed during remote recovery. 
Attempting again...", e);
-                }
+                LOGGER.log(Level.WARNING, "Failed during remote recovery. 
Attempting again...", e);
                 maxRecoveryAttempts--;
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IllegalStateException("interrupted", e);
             }
         }
     }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1716
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iea9d89b6e557061209205fbe975cd20393b548ef
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Michael Blow <[email protected]>

Reply via email to