Author: reschke
Date: Wed Sep 30 10:38:07 2015
New Revision: 1705998
URL: http://svn.apache.org/viewvc?rev=1705998&view=rev
Log:
OAK-3449: switch DocumentNodeStore support for predefined clusterIds to use
ClusterNodeInfos
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java?rev=1705998&r1=1705997&r2=1705998&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/ClusterNodeInfo.java
Wed Sep 30 10:38:07 2015
@@ -330,10 +330,11 @@ public class ClusterNodeInfo {
* Create a cluster node info instance for the store, with the
*
* @param store the document store (for the lease)
+ * @param configuredClusterId the configured cluster id (or 0 for dynamic
assignment)
* @return the cluster node info
*/
- public static ClusterNodeInfo getInstance(DocumentStore store) {
- return getInstance(store, MACHINE_ID, WORKING_DIR, false);
+ public static ClusterNodeInfo getInstance(DocumentStore store, int
configuredClusterId) {
+ return getInstance(store, MACHINE_ID, WORKING_DIR,
configuredClusterId, false);
}
/**
@@ -346,7 +347,7 @@ public class ClusterNodeInfo {
*/
public static ClusterNodeInfo getInstance(DocumentStore store, String
machineId,
String instanceId) {
- return getInstance(store, machineId, instanceId, true);
+ return getInstance(store, machineId, instanceId, 0, true);
}
/**
@@ -355,21 +356,27 @@ public class ClusterNodeInfo {
* @param store the document store (for the lease)
* @param machineId the machine id (null for MAC address)
* @param instanceId the instance id (null for current working directory)
+ * @param configuredClusterId the configured cluster id (or 0 for dynamic
assignment)
* @param updateLease whether to update the lease
* @return the cluster node info
*/
public static ClusterNodeInfo getInstance(DocumentStore store, String
machineId,
- String instanceId, boolean updateLease) {
+ String instanceId, int configuredClusterId, boolean updateLease) {
+
+ // defaults for machineId and instanceID
if (machineId == null) {
machineId = MACHINE_ID;
}
if (instanceId == null) {
instanceId = WORKING_DIR;
}
- for (int i = 0; i < 10; i++) {
- ClusterNodeInfo clusterNode = createInstance(store, machineId,
instanceId);
- UpdateOp update = new UpdateOp("" + clusterNode.id, true);
- update.set(ID, String.valueOf(clusterNode.id));
+
+ int retries = 10;
+ for (int i = 0; i < retries; i++) {
+ ClusterNodeInfo clusterNode = createInstance(store, machineId,
instanceId, configuredClusterId);
+ String key = String.valueOf(clusterNode.id);
+ UpdateOp update = new UpdateOp(key, true);
+ update.set(ID, key);
update.set(MACHINE_ID_KEY, clusterNode.machineId);
update.set(INSTANCE_ID_KEY, clusterNode.instanceId);
if (updateLease) {
@@ -384,8 +391,8 @@ public class ClusterNodeInfo {
final boolean success;
if (clusterNode.newEntry) {
- //For new entry do a create. This ensures that if two nodes
create
- //entry with same id then only one would succeed
+ // For new entry do a create. This ensures that if two nodes
+ // create entry with same id then only one would succeed
success = store.create(Collection.CLUSTER_NODES,
Collections.singletonList(update));
} else {
// No expiration of earlier cluster info, so update
@@ -397,34 +404,56 @@ public class ClusterNodeInfo {
return clusterNode;
}
}
- throw new DocumentStoreException("Could not get cluster node info");
+ throw new DocumentStoreException("Could not get cluster node info
(retried " + retries + " times)");
}
private static ClusterNodeInfo createInstance(DocumentStore store, String
machineId,
- String instanceId) {
+ String instanceId, int configuredClusterId) {
+
long now = getCurrentTime();
- List<ClusterNodeInfoDocument> list =
ClusterNodeInfoDocument.all(store);
int clusterNodeId = 0;
int maxId = 0;
ClusterNodeState state = ClusterNodeState.NONE;
Long prevLeaseEnd = null;
boolean newEntry = false;
- for (Document doc : list) {
+
+ ClusterNodeInfoDocument alreadyExistingConfigured = null;
+ List<ClusterNodeInfoDocument> list =
ClusterNodeInfoDocument.all(store);
+
+ for (ClusterNodeInfoDocument doc : list) {
+
String key = doc.getId();
+
int id;
try {
- id = Integer.parseInt(key);
+ id = doc.getClusterId();
} catch (Exception e) {
- // not an integer - ignore
+ LOG.debug("Skipping cluster node info document {} because ID
is invalid", key);
continue;
}
+
maxId = Math.max(maxId, id);
+
+ // if a cluster id was configured: check that and abort if it does
+ // not match
+ if (configuredClusterId != 0) {
+ if (configuredClusterId != id) {
+ continue;
+ } else {
+ alreadyExistingConfigured = doc;
+ }
+ }
+
Long leaseEnd = (Long) doc.get(LEASE_END_KEY);
+
if (leaseEnd != null && leaseEnd > now) {
+ // TODO wait for lease end, see OAK-3449
continue;
}
+
String mId = "" + doc.get(MACHINE_ID_KEY);
String iId = "" + doc.get(INSTANCE_ID_KEY);
+
if (mId.startsWith(RANDOM_PREFIX)) {
// remove expired entries with random keys
store.remove(Collection.CLUSTER_NODES, key);
@@ -432,14 +461,14 @@ public class ClusterNodeInfo {
leaseEnd == null ? "n/a" :
Utils.timestampToString(leaseEnd));
continue;
}
- if (!mId.equals(machineId) ||
- !iId.equals(instanceId)) {
+
+ if (!mId.equals(machineId) || !iId.equals(instanceId)) {
// a different machine or instance
continue;
}
- //and cluster node which matches current machine identity but
- //not being used
+ // a cluster node which matches current machine identity but
+ // not being used
if (clusterNodeId == 0 || id < clusterNodeId) {
// if there are multiple, use the smallest value
clusterNodeId = id;
@@ -448,11 +477,19 @@ public class ClusterNodeInfo {
}
}
- //No existing entry with matching signature found so
- //create a new entry
+ // No usable existing entry with matching signature found so
+ // create a new entry
if (clusterNodeId == 0) {
- clusterNodeId = maxId + 1;
newEntry = true;
+ if (configuredClusterId != 0) {
+ if (alreadyExistingConfigured != null) {
+ throw new DocumentStoreException(
+ "Configured cluster node id " +
configuredClusterId + " already in use: " + alreadyExistingConfigured);
+ }
+ clusterNodeId = configuredClusterId;
+ } else {
+ clusterNodeId = maxId + 1;
+ }
}
// Do not expire entries and stick on the earlier state, and leaseEnd
so,
@@ -803,5 +840,4 @@ public class ClusterNodeInfo {
private static long getCurrentTime() {
return clock.getTime();
}
-
}
Modified:
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java?rev=1705998&r1=1705997&r2=1705998&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
(original)
+++
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/DocumentNodeStore.java
Wed Sep 30 10:38:07 2015
@@ -204,6 +204,7 @@ public final class DocumentNodeStore
/**
* The cluster instance info.
*/
+ @Nonnull
private final ClusterNodeInfo clusterNodeInfo;
/**
@@ -287,8 +288,8 @@ public final class DocumentNodeStore
/**
* Background thread performing the clusterId lease renew.
- * Will be {@code null} if {@link #clusterNodeInfo} is {@code null}.
*/
+ @Nonnull
private Thread leaseUpdateThread;
/**
@@ -427,22 +428,18 @@ public final class DocumentNodeStore
this.changes = Collection.JOURNAL.newDocument(s);
this.executor = builder.getExecutor();
this.clock = builder.getClock();
+
int cid = builder.getClusterId();
cid = Integer.getInteger("oak.documentMK.clusterId", cid);
- if (cid == 0) {
- clusterNodeInfo = ClusterNodeInfo.getInstance(s);
- // TODO we should ensure revisions generated from now on
- // are never "older" than revisions already in the repository for
- // this cluster id
- cid = clusterNodeInfo.getId();
- } else {
- clusterNodeInfo = null;
- }
+ clusterNodeInfo = ClusterNodeInfo.getInstance(s, cid);
+ // TODO we should ensure revisions generated from now on
+ // are never "older" than revisions already in the repository for
+ // this cluster id
+ cid = clusterNodeInfo.getId();
+
if (builder.getLeaseCheck()) {
s = new LeaseCheckDocumentStoreWrapper(s, clusterNodeInfo);
- if (clusterNodeInfo!=null) {
-
clusterNodeInfo.setLeaseFailureHandler(builder.getLeaseFailureHandler());
- }
+
clusterNodeInfo.setLeaseFailureHandler(builder.getLeaseFailureHandler());
}
this.store = s;
this.clusterId = cid;
@@ -530,17 +527,14 @@ public final class DocumentNodeStore
backgroundReadThread.start();
backgroundUpdateThread.start();
- if (clusterNodeInfo != null) {
- leaseUpdateThread = new Thread(
- new BackgroundLeaseUpdate(this, isDisposed),
- "DocumentNodeStore lease update thread " +
threadNamePostfix);
- leaseUpdateThread.setDaemon(true);
- // OAK-3398 : make lease updating more robust by ensuring it
- // has higher likelihood of succeeding than other threads
- // on a very busy machine - so as to prevent lease timeout.
- leaseUpdateThread.setPriority(Thread.MAX_PRIORITY);
- leaseUpdateThread.start();
- }
+ leaseUpdateThread = new Thread(new BackgroundLeaseUpdate(this,
isDisposed),
+ "DocumentNodeStore lease update thread " + threadNamePostfix);
+ leaseUpdateThread.setDaemon(true);
+ // OAK-3398 : make lease updating more robust by ensuring it
+ // has higher likelihood of succeeding than other threads
+ // on a very busy machine - so as to prevent lease timeout.
+ leaseUpdateThread.setPriority(Thread.MAX_PRIORITY);
+ leaseUpdateThread.start();
this.mbean = createMBean();
LOG.info("Initialized DocumentNodeStore with clusterNodeId: {} ({})",
clusterId,
@@ -601,9 +595,7 @@ public final class DocumentNodeStore
// now mark this cluster node as inactive by
// disposing the clusterNodeInfo
- if (clusterNodeInfo != null) {
- clusterNodeInfo.dispose();
- }
+ clusterNodeInfo.dispose();
store.dispose();
if (blobStore instanceof Closeable) {
@@ -620,7 +612,7 @@ public final class DocumentNodeStore
}
private String getClusterNodeInfoDisplayString() {
- return clusterNodeInfo == null ? "no cluster node info" :
clusterNodeInfo.toString().replaceAll("[\r\n\t]", " ").trim();
+ return clusterNodeInfo.toString().replaceAll("[\r\n\t]", " ").trim();
}
Revision setHeadRevision(@Nonnull Revision newHead) {
@@ -756,7 +748,7 @@ public final class DocumentNodeStore
return enableConcurrentAddRemove;
}
- @CheckForNull
+ @Nonnull
public ClusterNodeInfo getClusterInfo() {
return clusterNodeInfo;
}
@@ -1776,7 +1768,7 @@ public final class DocumentNodeStore
* @return {@code true} if the lease was renewed; {@code false} otherwise.
*/
boolean renewClusterIdLease() {
- return clusterNodeInfo != null && clusterNodeInfo.renewLease();
+ return clusterNodeInfo.renewLease();
}
/**