Author: chetanm
Date: Mon Mar 31 13:21:27 2014
New Revision: 1583323

URL: http://svn.apache.org/r1583323
Log:
OAK-1295 - Recovery for missing _lastRev updates (WIP)

Added:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java
   (with props)

Added: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java?rev=1583323&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java
 Mon Mar 31 13:21:27 2014
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.plugins.document;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.annotation.CheckForNull;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.collect.ImmutableList.of;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.mergeSorted;
+
+/**
+ * Recovery for missing <tt>_lastRev</tt> updates (OAK-1295, work in progress).
+ *
+ * Given a set of suspect documents and a clusterId, this class determines
+ * which documents (and their ancestors) carry a lastRev entry that is older
+ * than the latest committed change made by that cluster node, and persists
+ * the corrected lastRev values via {@link UnsavedModifications}.
+ */
+public class LastRevRecovery {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private final DocumentNodeStore nodeStore;
+
+    /**
+     * @param nodeStore node store used to look up documents and to persist
+     *                  the recovered lastRev values
+     */
+    public LastRevRecovery(DocumentNodeStore nodeStore) {
+        this.nodeStore = nodeStore;
+    }
+
+    /**
+     * Checks the given suspect documents for missed lastRev updates of the
+     * given cluster node and persists the required updates for the suspects
+     * and for all of their ancestors.
+     *
+     * @param suspects  documents whose lastRev entry may be out of date
+     * @param clusterId clusterId for which the lastRev entries are recovered
+     */
+    public void recover(Iterator<NodeDocument> suspects, int clusterId) {
+        UnsavedModifications unsaved = new UnsavedModifications();
+
+        //Set of parent paths whose lastRev has been updated based on
+        //lastRev information obtained from suspects. It is possible
+        //that the lastRev for such parents present in the DocumentStore (DS)
+        //has a higher value. So before persisting the changes for these
+        //paths we need to ensure that their actual lastRev is lower
+        //than the one being set via unsaved
+        Set<String> unverifiedParentPaths = Sets.newHashSet();
+
+        //Map of known lastRev of checked paths
+        Map<String, Revision> knownLastRevs = Maps.newHashMap();
+
+        while (suspects.hasNext()) {
+            NodeDocument doc = suspects.next();
+
+            //A document may have no lastRev entry for this cluster node yet
+            Revision currentLastRev = doc.getLastRev().get(clusterId);
+            if (currentLastRev != null) {
+                knownLastRevs.put(doc.getPath(), currentLastRev);
+            }
+
+            Revision lostLastRev = determineMissedLastRev(doc, clusterId);
+
+            //lastRev is consistent
+            if (lostLastRev == null) {
+                continue;
+            }
+
+            //1. Update lastRev for this doc
+            unsaved.put(doc.getPath(), lostLastRev);
+
+            //2. Update lastRev for all ancestors up to and including the root
+            String path = doc.getPath();
+            while (!PathUtils.denotesRoot(path)) {
+                path = PathUtils.getParentPath(path);
+                unsaved.put(path, lostLastRev);
+                unverifiedParentPaths.add(path);
+            }
+        }
+
+        //By now we have iterated over all suspects so remove entries for paths
+        //whose lastRev have been determined on the basis of state obtained
+        //from DS
+        Iterator<String> unverifiedParentPathsItr = unverifiedParentPaths.iterator();
+        while (unverifiedParentPathsItr.hasNext()) {
+            String unverifiedParentPath = unverifiedParentPathsItr.next();
+            Revision knownRevision = knownLastRevs.get(unverifiedParentPath);
+            if (knownRevision != null) {
+                unverifiedParentPathsItr.remove();
+                unsaved.put(unverifiedParentPath, knownRevision);
+            }
+        }
+
+        //Now for the left over unverifiedParentPaths determine the lastRev
+        //from DS and add them to unsaved. This ensures that we do not set
+        //lastRev to a lower value
+        //NOTE(review): this relies on UnsavedModifications.put keeping the
+        //more recent of the two revisions - confirm against that class
+
+        //TODO For Mongo case we can fetch such documents more efficiently
+        //via batch fetch
+        for (String path : unverifiedParentPaths) {
+            NodeDocument doc = getDocument(path);
+            if (doc == null) {
+                continue;
+            }
+            //The parent may not have a lastRev entry for this cluster node
+            //at all (same situation as handled for the suspects above), so
+            //do not hand a null revision to unsaved
+            Revision lastRev = doc.getLastRev().get(clusterId);
+            if (lastRev != null) {
+                unsaved.put(path, lastRev);
+            }
+        }
+
+        //Capture the number of paths before persisting so it can be
+        //reported afterwards
+        int size = unsaved.getPaths().size();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Last revision for following documents would be updated {}", unsaved.getPaths());
+        }
+
+        //UnsavedModifications is designed to be used in concurrent
+        //access mode. For the recovery case there is no concurrent access
+        //involved so just pass a new lock instance
+        unsaved.persist(nodeStore, new ReentrantLock());
+
+        log.info("Updated lastRev of [{}] documents while performing lastRev recovery for " +
+                "cluster node [{}]", size, clusterId);
+    }
+
+    /**
+     * Determines the last revision value which needs to be set for the given
+     * clusterId on the passed document. If the lastRev entries are already
+     * consistent, <tt>null</tt> is returned.
+     *
+     * @param doc       NodeDocument where lastRev entries needs to be fixed
+     * @param clusterId clusterId for which lastRev has to be checked
+     * @return lastRev which needs to be updated. <tt>null</tt> if no
+     * update is required i.e. lastRev entries are valid
+     */
+    @CheckForNull
+    private Revision determineMissedLastRev(NodeDocument doc, int clusterId) {
+        Revision currentLastRev = doc.getLastRev().get(clusterId);
+        if (currentLastRev == null) {
+            //No lastRev recorded yet: compare against a zero revision
+            currentLastRev = new Revision(0, 0, clusterId);
+        }
+
+        ClusterPredicate cp = new ClusterPredicate(clusterId);
+
+        //Merge sort the revs for which changes have been made
+        //to this doc
+
+        //TODO Would looking into the Local map be sufficient
+        //Probably yes as entries for a particular cluster node
+        //are split by that cluster only
+        Iterable<Revision> revs = mergeSorted(of(
+                        filter(doc.getLocalCommitRoot().keySet(), cp),
+                        filter(doc.getLocalRevisions().keySet(), cp)),
+                StableRevisionComparator.REVERSE
+        );
+
+        //Look for latest valid revision > currentLastRev
+        //if found then lastRev needs to be fixed
+        //(presumably REVERSE yields the most recent revision first - the
+        //early break below depends on that ordering)
+        for (Revision rev : revs) {
+            if (rev.compareRevisionTime(currentLastRev) > 0) {
+                if (doc.isCommitted(rev)) {
+                    return rev;
+                }
+            } else {
+                //No valid revision found > currentLastRev
+                //indicates that lastRev is valid for given clusterId
+                //and no further checks are required
+                break;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Looks up the node document for the given path in the document store.
+     *
+     * @param path path of the node
+     * @return the document as returned by the store; callers check for
+     * <tt>null</tt>
+     */
+    private NodeDocument getDocument(String path) {
+        return nodeStore.getDocumentStore().find(Collection.NODES,
+                Utils.getIdFromPath(path));
+    }
+
+    /**
+     * Predicate which accepts only revisions created by the given clusterId.
+     */
+    private static class ClusterPredicate implements Predicate<Revision> {
+        private final int clusterId;
+
+        private ClusterPredicate(int clusterId) {
+            this.clusterId = clusterId;
+        }
+
+        @Override
+        public boolean apply(Revision input) {
+            return clusterId == input.getClusterId();
+        }
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/LastRevRecovery.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to