Author: reschke
Date: Wed Sep 30 10:38:07 2015
New Revision: 1705998

URL: http://svn.apache.org/viewvc?rev=1705998&view=rev
Log:
OAK-3449: switch DocumentNodeStore support for predefined clusterIds to use 
ClusterNodeInfos

Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1705998&r1=1705997&r2=1705998&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
 Wed Sep 30 10:38:07 2015
@@ -330,10 +330,11 @@ public class ClusterNodeInfo {
      * Create a cluster node info instance for the store, with the
      *
      * @param store the document store (for the lease)
+     * @param configuredClusterId the configured cluster id (or 0 for dynamic 
assignment)
      * @return the cluster node info
      */
-    public static ClusterNodeInfo getInstance(DocumentStore store) {
-        return getInstance(store, MACHINE_ID, WORKING_DIR, false);
+    public static ClusterNodeInfo getInstance(DocumentStore store, int 
configuredClusterId) {
+        return getInstance(store, MACHINE_ID, WORKING_DIR, 
configuredClusterId, false);
     }
 
     /**
@@ -346,7 +347,7 @@ public class ClusterNodeInfo {
      */
     public static ClusterNodeInfo getInstance(DocumentStore store, String 
machineId,
             String instanceId) {
-        return getInstance(store, machineId, instanceId, true);
+        return getInstance(store, machineId, instanceId, 0, true);
     }
 
     /**
@@ -355,21 +356,27 @@ public class ClusterNodeInfo {
      * @param store the document store (for the lease)
      * @param machineId the machine id (null for MAC address)
      * @param instanceId the instance id (null for current working directory)
+     * @param configuredClusterId the configured cluster id (or 0 for dynamic 
assignment)
      * @param updateLease whether to update the lease
      * @return the cluster node info
      */
     public static ClusterNodeInfo getInstance(DocumentStore store, String 
machineId,
-            String instanceId, boolean updateLease) {
+            String instanceId, int configuredClusterId, boolean updateLease) {
+
+        // defaults for machineId and instanceID
         if (machineId == null) {
             machineId = MACHINE_ID;
         }
         if (instanceId == null) {
             instanceId = WORKING_DIR;
         }
-        for (int i = 0; i < 10; i++) {
-            ClusterNodeInfo clusterNode = createInstance(store, machineId, 
instanceId);
-            UpdateOp update = new UpdateOp("" + clusterNode.id, true);
-            update.set(ID, String.valueOf(clusterNode.id));
+
+        int retries = 10;
+        for (int i = 0; i < retries; i++) {
+            ClusterNodeInfo clusterNode = createInstance(store, machineId, 
instanceId, configuredClusterId);
+            String key = String.valueOf(clusterNode.id);
+            UpdateOp update = new UpdateOp(key, true);
+            update.set(ID, key);
             update.set(MACHINE_ID_KEY, clusterNode.machineId);
             update.set(INSTANCE_ID_KEY, clusterNode.instanceId);
             if (updateLease) {
@@ -384,8 +391,8 @@ public class ClusterNodeInfo {
 
             final boolean success;
             if (clusterNode.newEntry) {
-                //For new entry do a create. This ensures that if two nodes 
create
-                //entry with same id then only one would succeed
+                // For a new entry do a create. This ensures that if two nodes
+                // create an entry with the same id, only one of them succeeds
                 success = store.create(Collection.CLUSTER_NODES, 
Collections.singletonList(update));
             } else {
                 // No expiration of earlier cluster info, so update
@@ -397,34 +404,56 @@ public class ClusterNodeInfo {
                 return clusterNode;
             }
         }
-        throw new DocumentStoreException("Could not get cluster node info");
+        throw new DocumentStoreException("Could not get cluster node info 
(retried " + retries + " times)");
     }
 
     private static ClusterNodeInfo createInstance(DocumentStore store, String 
machineId,
-            String instanceId) {
+            String instanceId, int configuredClusterId) {
+
         long now = getCurrentTime();
-        List<ClusterNodeInfoDocument> list = 
ClusterNodeInfoDocument.all(store);
         int clusterNodeId = 0;
         int maxId = 0;
         ClusterNodeState state = ClusterNodeState.NONE;
         Long prevLeaseEnd = null;
         boolean newEntry = false;
-        for (Document doc : list) {
+
+        ClusterNodeInfoDocument alreadyExistingConfigured = null;
+        List<ClusterNodeInfoDocument> list = 
ClusterNodeInfoDocument.all(store);
+
+        for (ClusterNodeInfoDocument doc : list) {
+
             String key = doc.getId();
+
             int id;
             try {
-                id = Integer.parseInt(key);
+                id = doc.getClusterId();
             } catch (Exception e) {
-                // not an integer - ignore
+                LOG.debug("Skipping cluster node info document {} because ID 
is invalid", key);
                 continue;
             }
+
             maxId = Math.max(maxId, id);
+
+            // if a cluster id was configured: skip entries whose id does not
+            // match, and remember the entry that does
+            if (configuredClusterId != 0) {
+                if (configuredClusterId != id) {
+                    continue;
+                } else {
+                    alreadyExistingConfigured = doc;
+                }
+            }
+
             Long leaseEnd = (Long) doc.get(LEASE_END_KEY);
+
             if (leaseEnd != null && leaseEnd > now) {
+                // TODO wait for lease end, see OAK-3449
                 continue;
             }
+
             String mId = "" + doc.get(MACHINE_ID_KEY);
             String iId = "" + doc.get(INSTANCE_ID_KEY);
+
             if (mId.startsWith(RANDOM_PREFIX)) {
                 // remove expired entries with random keys
                 store.remove(Collection.CLUSTER_NODES, key);
@@ -432,14 +461,14 @@ public class ClusterNodeInfo {
                         leaseEnd == null ? "n/a" : 
Utils.timestampToString(leaseEnd));
                 continue;
             }
-            if (!mId.equals(machineId) ||
-                    !iId.equals(instanceId)) {
+
+            if (!mId.equals(machineId) || !iId.equals(instanceId)) {
                 // a different machine or instance
                 continue;
             }
 
-            //and cluster node which matches current machine identity but
-            //not being used
+            // a cluster node which matches the current machine identity but
+            // is not being used
             if (clusterNodeId == 0 || id < clusterNodeId) {
                 // if there are multiple, use the smallest value
                 clusterNodeId = id;
@@ -448,11 +477,19 @@ public class ClusterNodeInfo {
             }
         }
 
-        //No existing entry with matching signature found so
-        //create a new entry
+        // No usable existing entry with matching signature found so
+        // create a new entry
         if (clusterNodeId == 0) {
-            clusterNodeId = maxId + 1;
             newEntry = true;
+            if (configuredClusterId != 0) {
+                if (alreadyExistingConfigured != null) {
+                    throw new DocumentStoreException(
+                            "Configured cluster node id " + 
configuredClusterId + " already in use: " + alreadyExistingConfigured);
+                }
+                clusterNodeId = configuredClusterId;
+            } else {
+                clusterNodeId = maxId + 1;
+            }
         }
 
         // Do not expire entries and stick on the earlier state, and leaseEnd 
so,
@@ -803,5 +840,4 @@ public class ClusterNodeInfo {
     private static long getCurrentTime() {
         return clock.getTime();
     }
-
 }

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1705998&r1=1705997&r2=1705998&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
 Wed Sep 30 10:38:07 2015
@@ -204,6 +204,7 @@ public final class DocumentNodeStore
     /**
      * The cluster instance info.
      */
+    @Nonnull
     private final ClusterNodeInfo clusterNodeInfo;
 
     /**
@@ -287,8 +288,8 @@ public final class DocumentNodeStore
 
     /**
      * Background thread performing the clusterId lease renew.
-     * Will be {@code null} if {@link #clusterNodeInfo} is {@code null}.
      */
+    @Nonnull
     private Thread leaseUpdateThread;
 
     /**
@@ -427,22 +428,18 @@ public final class DocumentNodeStore
         this.changes = Collection.JOURNAL.newDocument(s);
         this.executor = builder.getExecutor();
         this.clock = builder.getClock();
+
         int cid = builder.getClusterId();
         cid = Integer.getInteger("oak.documentMK.clusterId", cid);
-        if (cid == 0) {
-            clusterNodeInfo = ClusterNodeInfo.getInstance(s);
-            // TODO we should ensure revisions generated from now on
-            // are never "older" than revisions already in the repository for
-            // this cluster id
-            cid = clusterNodeInfo.getId();
-        } else {
-            clusterNodeInfo = null;
-        }
+        clusterNodeInfo = ClusterNodeInfo.getInstance(s, cid);
+        // TODO we should ensure revisions generated from now on
+        // are never "older" than revisions already in the repository for
+        // this cluster id
+        cid = clusterNodeInfo.getId();
+
         if (builder.getLeaseCheck()) {
             s = new LeaseCheckDocumentStoreWrapper(s, clusterNodeInfo);
-            if (clusterNodeInfo!=null) {
-                
clusterNodeInfo.setLeaseFailureHandler(builder.getLeaseFailureHandler());
-            }
+            
clusterNodeInfo.setLeaseFailureHandler(builder.getLeaseFailureHandler());
         }
         this.store = s;
         this.clusterId = cid;
@@ -530,17 +527,14 @@ public final class DocumentNodeStore
         backgroundReadThread.start();
         backgroundUpdateThread.start();
 
-        if (clusterNodeInfo != null) {
-            leaseUpdateThread = new Thread(
-                    new BackgroundLeaseUpdate(this, isDisposed),
-                    "DocumentNodeStore lease update thread " + 
threadNamePostfix);
-            leaseUpdateThread.setDaemon(true);
-            // OAK-3398 : make lease updating more robust by ensuring it
-            // has higher likelihood of succeeding than other threads
-            // on a very busy machine - so as to prevent lease timeout.
-            leaseUpdateThread.setPriority(Thread.MAX_PRIORITY);
-            leaseUpdateThread.start();
-        }
+        leaseUpdateThread = new Thread(new BackgroundLeaseUpdate(this, 
isDisposed),
+                "DocumentNodeStore lease update thread " + threadNamePostfix);
+        leaseUpdateThread.setDaemon(true);
+        // OAK-3398 : make lease updating more robust by ensuring it
+        // has higher likelihood of succeeding than other threads
+        // on a very busy machine - so as to prevent lease timeout.
+        leaseUpdateThread.setPriority(Thread.MAX_PRIORITY);
+        leaseUpdateThread.start();
 
         this.mbean = createMBean();
         LOG.info("Initialized DocumentNodeStore with clusterNodeId: {} ({})", 
clusterId,
@@ -601,9 +595,7 @@ public final class DocumentNodeStore
 
         // now mark this cluster node as inactive by
         // disposing the clusterNodeInfo
-        if (clusterNodeInfo != null) {
-            clusterNodeInfo.dispose();
-        }
+        clusterNodeInfo.dispose();
         store.dispose();
 
         if (blobStore instanceof Closeable) {
@@ -620,7 +612,7 @@ public final class DocumentNodeStore
     }
 
     private String getClusterNodeInfoDisplayString() {
-        return clusterNodeInfo == null ? "no cluster node info" : 
clusterNodeInfo.toString().replaceAll("[\r\n\t]", " ").trim();
+        return clusterNodeInfo.toString().replaceAll("[\r\n\t]", " ").trim();
     }
 
     Revision setHeadRevision(@Nonnull Revision newHead) {
@@ -756,7 +748,7 @@ public final class DocumentNodeStore
         return enableConcurrentAddRemove;
     }
 
-    @CheckForNull
+    @Nonnull
     public ClusterNodeInfo getClusterInfo() {
         return clusterNodeInfo;
     }
@@ -1776,7 +1768,7 @@ public final class DocumentNodeStore
      * @return {@code true} if the lease was renewed; {@code false} otherwise.
      */
     boolean renewClusterIdLease() {
-        return clusterNodeInfo != null && clusterNodeInfo.renewLease();
+        return clusterNodeInfo.renewLease();
     }
 
     /**


Reply via email to