Author: mreutegg
Date: Tue Jun 30 14:10:57 2015
New Revision: 1688453

URL: http://svn.apache.org/r1688453
Log:
OAK-3023: Long running MongoDB query may block other threads

Added:
    
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreTest.java
   (with props)
Modified:
    
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java

Modified: 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java?rev=1688453&r1=1688452&r2=1688453&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStore.java
 Tue Jun 30 14:10:57 2015
@@ -44,6 +44,7 @@ import com.google.common.collect.Immutab
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 import com.mongodb.MongoClientURI;
+import com.mongodb.MongoExecutionTimeoutException;
 import com.mongodb.QueryOperators;
 import com.mongodb.ReadPreference;
 
@@ -176,6 +177,17 @@ public class MongoDocumentStore implemen
     private final long maxQueryTimeMS =
             Long.getLong("oak.mongo.maxQueryTimeMS", 
TimeUnit.MINUTES.toMillis(1));
 
+    /**
+     * Duration in milliseconds after a mongo query with an additional
+     * constraint (e.g. _modified) on the NODES collection times out and is
+     * executed again without holding a {@link TreeLock} and without updating
+     * the cache with data retrieved from MongoDB.
+     * <p>
+     * Default is 3000 (three seconds).
+     */
+    private long maxLockedQueryTimeMS =
+            Long.getLong("oak.mongo.maxLockedQueryTimeMS", 
TimeUnit.SECONDS.toMillis(3));
+
     private String lastReadWriteMode;
 
     private final Map<String, String> metadata;
@@ -547,6 +559,36 @@ public class MongoDocumentStore implemen
                                               String indexedProperty,
                                               long startValue,
                                               int limit) {
+        boolean withLock = true;
+        if (collection == Collection.NODES && indexedProperty != null) {
+            long maxQueryTime;
+            if (maxQueryTimeMS > 0) {
+                maxQueryTime = Math.min(maxQueryTimeMS, maxLockedQueryTimeMS);
+            } else {
+                maxQueryTime = maxLockedQueryTimeMS;
+            }
+            try {
+                return queryInternal(collection, fromKey, toKey, 
indexedProperty,
+                        startValue, limit, maxQueryTime, true);
+            } catch (MongoExecutionTimeoutException e) {
+                LOG.info("query timed out after {} milliseconds and will be 
retried without lock {}",
+                        maxQueryTime, Lists.newArrayList(fromKey, toKey, 
indexedProperty, startValue, limit));
+                withLock = false;
+            }
+        }
+        return queryInternal(collection, fromKey, toKey, indexedProperty,
+                startValue, limit, maxQueryTimeMS, withLock);
+    }
+
+    @Nonnull
+    <T extends Document> List<T> queryInternal(Collection<T> collection,
+                                                       String fromKey,
+                                                       String toKey,
+                                                       String indexedProperty,
+                                                       long startValue,
+                                                       int limit,
+                                                       long maxQueryTime,
+                                                       boolean withLock) {
         log("query", fromKey, toKey, indexedProperty, startValue, limit);
         DBCollection dbCollection = getDBCollection(collection);
         QueryBuilder queryBuilder = QueryBuilder.start(Document.ID);
@@ -578,18 +620,18 @@ public class MongoDocumentStore implemen
         String parentId = Utils.getParentIdFromLowerLimit(fromKey);
         long lockTime = -1;
         final long start = PERFLOG.start();
-        TreeLock lock = acquireExclusive(parentId != null ? parentId : "");
-        if (start != -1) {
-            lockTime = System.currentTimeMillis() - start;
-        }
+        TreeLock lock = withLock ? acquireExclusive(parentId != null ? 
parentId : "") : null;
         try {
+            if (start != -1) {
+                lockTime = System.currentTimeMillis() - start;
+            }
             DBCursor cursor = dbCollection.find(query).sort(BY_ID_ASC);
             if (!disableIndexHint) {
                 cursor.hint(hint);
             }
-            if (maxQueryTimeMS > 0) {
+            if (maxQueryTime > 0) {
                 // OAK-2614: set maxTime if maxQueryTimeMS > 0
-                cursor.maxTime(maxQueryTimeMS, TimeUnit.MILLISECONDS);
+                cursor.maxTime(maxQueryTime, TimeUnit.MILLISECONDS);
             }
             ReadPreference readPreference =
                     getMongoReadPreference(collection, parentId, 
getDefaultReadPreference(collection));
@@ -606,7 +648,9 @@ public class MongoDocumentStore implemen
                 for (int i = 0; i < limit && cursor.hasNext(); i++) {
                     DBObject o = cursor.next();
                     T doc = convertFromDBObject(collection, o);
-                    if (collection == Collection.NODES && doc != null) {
+                    if (collection == Collection.NODES
+                            && doc != null
+                            && lock != null) {
                         doc.seal();
                         String id = doc.getId();
                         CacheValue cacheKey = new StringValue(id);
@@ -635,7 +679,9 @@ public class MongoDocumentStore implemen
             }
             return list;
         } finally {
-            lock.unlock();
+            if (lock != null) {
+                lock.unlock();
+            }
             PERFLOG.end(start, 1, "query for children from [{}] to [{}], 
lock:{}", fromKey, toKey, lockTime);
         }
     }
@@ -1372,6 +1418,10 @@ public class MongoDocumentStore implemen
         this.clock = clock;
     }
 
+    void setMaxLockedQueryTimeMS(long maxLockedQueryTimeMS) {
+        this.maxLockedQueryTimeMS = maxLockedQueryTimeMS;
+    }
+
     private final static class TreeLock {
 
         private final Lock parentLock;

Added: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreTest.java?rev=1688453&view=auto
==============================================================================
--- 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreTest.java
 (added)
+++ 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreTest.java
 Tue Jun 30 14:10:57 2015
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.document.mongo;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.annotation.Nonnull;
+
+import com.mongodb.DB;
+
+import org.apache.jackrabbit.oak.plugins.document.AbstractMongoConnectionTest;
+import org.apache.jackrabbit.oak.plugins.document.Collection;
+import org.apache.jackrabbit.oak.plugins.document.Document;
+import org.apache.jackrabbit.oak.plugins.document.DocumentMK;
+import org.apache.jackrabbit.oak.plugins.document.MongoUtils;
+import org.apache.jackrabbit.oak.plugins.document.NodeDocument;
+import org.apache.jackrabbit.oak.plugins.document.util.Utils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * <code>MongoDocumentStoreTest</code>...
+ */
+public class MongoDocumentStoreTest extends AbstractMongoConnectionTest {
+
+    private TestStore store;
+
+    @Override
+    public void setUpConnection() throws Exception {
+        mongoConnection = MongoUtils.getConnection();
+        MongoUtils.dropCollections(mongoConnection.getDB());
+        DocumentMK.Builder builder = new DocumentMK.Builder();
+        store = new TestStore(mongoConnection.getDB(), builder);
+        builder.setDocumentStore(store);
+        mk = builder.setMongoDB(mongoConnection.getDB()).open();
+    }
+
+    @Test
+    public void timeoutQuery() {
+        String fromId = Utils.getKeyLowerLimit("/");
+        String toId = Utils.getKeyUpperLimit("/");
+        store.setMaxLockedQueryTimeMS(1);
+        long index = 0;
+        for (int i = 0; i < 100; i++) {
+            // keep adding nodes until the query runs into the timeout
+            StringBuilder sb = new StringBuilder();
+            for (int j = 0; j < 1000; j++) {
+                sb.append("+\"node-").append(index++).append("\":{}");
+            }
+            mk.commit("/", sb.toString(), null, null);
+            store.queriesWithoutLock.set(0);
+            List<NodeDocument> docs = store.query(Collection.NODES, fromId, 
toId,
+                    "foo", System.currentTimeMillis(), Integer.MAX_VALUE);
+            assertTrue(docs.isEmpty());
+            if (store.queriesWithoutLock.get() > 0) {
+                return;
+            }
+        }
+        fail("No query timeout triggered even after adding " + index + " 
nodes");
+    }
+
+    static final class TestStore extends MongoDocumentStore {
+
+        AtomicInteger queriesWithoutLock = new AtomicInteger();
+
+        TestStore(DB db, DocumentMK.Builder builder) {
+            super(db, builder);
+        }
+
+        @Nonnull
+        @Override
+        <T extends Document> List<T> queryInternal(Collection<T> collection,
+                                                   String fromKey,
+                                                   String toKey,
+                                                   String indexedProperty,
+                                                   long startValue,
+                                                   int limit,
+                                                   long maxQueryTime,
+                                                   boolean withLock) {
+            if (collection == Collection.NODES && !withLock) {
+                queriesWithoutLock.incrementAndGet();
+            }
+            return super.queryInternal(collection, fromKey, toKey,
+                    indexedProperty, startValue, limit, maxQueryTime, 
withLock);
+        }
+    }
+}

Propchange: 
jackrabbit/oak/trunk/oak-core/src/test/java/org/apache/jackrabbit/oak/plugins/document/mongo/MongoDocumentStoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to