Murtadha Hubail has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1372

Change subject: Fix indefinite wait time for replication Job ACK
......................................................................

Fix indefinite wait time for replication Job ACK

Change-Id: I88d2d61270522c766441e16fd996ac975935594b
---
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
1 file changed, 11 insertions(+), 2 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/72/1372/1

diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
index 87be768..cd0179d 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationManager.java
@@ -97,6 +97,7 @@
 
     private static final Logger LOGGER = 
Logger.getLogger(ReplicationManager.class.getName());
     private static final int INITIAL_REPLICATION_FACTOR = 1;
+    private static final int MAX_JOB_COMMIT_ACK_WAIT = 10000;
     private final String nodeId;
     private ExecutorService replicationListenerThreads;
     private final Map<Integer, Set<String>> jobCommitAcks;
@@ -575,8 +576,16 @@
         if (logsRepSockets != null) {
             synchronized (jobCommitAcks) {
                 try {
-                    while (jobCommitAcks.size() != 0) {
-                        jobCommitAcks.wait();
+                    long waitStartTime = System.currentTimeMillis();
+                    while (!jobCommitAcks.isEmpty()) {
+                        jobCommitAcks.wait(1000);
+                        long waitDuration = System.currentTimeMillis() - 
waitStartTime;
+                        if (waitDuration > MAX_JOB_COMMIT_ACK_WAIT) {
+                            LOGGER.log(Level.SEVERE,
+                                    "Timeout before receving all job ACKs from 
replicas. Pending jobs ("
+                                            + 
jobCommitAcks.keySet().toString() + ")");
+                            break;
+                        }
                     }
                 } catch (InterruptedException e) {
                     if (LOGGER.isLoggable(Level.SEVERE)) {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1372
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I88d2d61270522c766441e16fd996ac975935594b
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <hubail...@gmail.com>

Reply via email to