Author: chetanm
Date: Thu Sep 15 07:17:31 2016
New Revision: 1760850

URL: http://svn.apache.org/viewvc?rev=1760850&view=rev
Log:
OAK-4412 - Lucene hybrid index

Perform write operations in batches, grouping documents belonging to the same 
index. This halves the time taken and reduces dropping of docs due to the 
queue being full.

Modified:
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java?rev=1760850&r1=1760849&r2=1760850&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
 Thu Sep 15 07:17:31 2016
@@ -22,13 +22,18 @@ package org.apache.jackrabbit.oak.plugin
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
 import org.apache.jackrabbit.oak.plugins.index.lucene.IndexNode;
@@ -50,6 +55,13 @@ public class DocumentQueue implements Cl
     private final BlockingQueue<LuceneDoc> docsQueue;
     private final Executor executor;
     private final CounterStats queueSizeStats;
+
+    /**
+     * Time in millis for which add call to queue
+     * would wait before dropping off
+     */
+    private final int offerTimeMillis;
+
     private volatile boolean stopped;
 
     /**
@@ -76,12 +88,32 @@ public class DocumentQueue implements Cl
             @Override
             public Void call() throws Exception {
                 try {
-                    LuceneDoc doc = docsQueue.poll();
-                    if (doc != null && doc != STOP) {
-                        processDoc(doc);
-                        queueSizeStats.dec();
-                        currentTask.onComplete(completionHandler);
+                    int maxSize = docsQueue.size();
+                    List<LuceneDoc> docs = 
Lists.newArrayListWithCapacity(maxSize);
+                    ListMultimap<String, LuceneDoc> docsPerIndex = 
ArrayListMultimap.create();
+
+                    //Do the processing in batches
+                    int count = docsQueue.drainTo(docs, maxSize);
+                    if (count == 0) {
+                        return null;
+                    }
+
+                    queueSizeStats.dec(count);
+
+                    for (int i = 0; i < count; i++) {
+                        LuceneDoc doc = docs.get(i);
+                        if (doc == STOP) {
+                            return null;
+                        }
+                        docsPerIndex.get(doc.indexPath).add(doc);
                     }
+
+                    //If required it can optimized by indexing diff indexes in 
parallel
+                    //Something to consider if it becomes a bottleneck
+                    for (Map.Entry<String, Collection<LuceneDoc>> e : 
docsPerIndex.asMap().entrySet()) {
+                        processDoc(e.getKey(), e.getValue());
+                    }
+                    currentTask.onComplete(completionHandler);
                 } catch (Throwable t) {
                     exceptionHandler.uncaughtException(Thread.currentThread(), 
t);
                 }
@@ -104,12 +136,18 @@ public class DocumentQueue implements Cl
         this.docsQueue = new LinkedBlockingDeque<>(maxQueueSize);
         this.tracker = tracker;
         this.executor = executor;
+        this.offerTimeMillis = 100; //Wait for at most 100 mills while adding 
stuff to queue
         this.queueSizeStats = sp.getCounterStats("HYBRID_QUEUE_SIZE", 
StatsOptions.DEFAULT);
     }
 
     public boolean add(LuceneDoc doc){
         checkState(!stopped);
-        boolean added = docsQueue.offer(doc);
+        boolean added = false;
+        try {
+            added = docsQueue.offer(doc, offerTimeMillis, 
TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
         // Set the completion handler on the currently running task. Multiple 
calls
         // to onComplete are not a problem here since we always pass the same 
value.
         // Thus there is no question as to which of the handlers will 
effectively run.
@@ -117,7 +155,6 @@ public class DocumentQueue implements Cl
         if (added) {
             queueSizeStats.inc();
         }
-        //TODO log warning when queue is full
         return added;
     }
 
@@ -127,34 +164,35 @@ public class DocumentQueue implements Cl
         return docs;
     }
 
-    private void processDoc(LuceneDoc doc){
-        IndexNode indexNode = tracker.acquireIndexNode(doc.indexPath);
+    private void processDoc(String indexPath, Iterable<LuceneDoc> docs){
+        IndexNode indexNode = tracker.acquireIndexNode(indexPath);
         if (indexNode == null) {
-            log.debug("No IndexNode found for index [{}]. Skipping index entry 
for [{}]", doc.indexPath, doc.docPath);
+            log.debug("No IndexNode found for index [{}].", indexPath);
             return;
         }
 
         try{
             LuceneIndexWriter writer = indexNode.getLocalWriter();
-
-            if (writer == null){
-                //IndexDefinition per IndexNode might have changed and local
-                //indexing is disabled. Ignore
-                log.debug("No local IndexWriter found for index [{}]. Skipping 
index " +
-                                "entry for [{}]", doc.indexPath, doc.docPath);
-                return;
-            }
-            if (doc.delete) {
-                writer.deleteDocuments(doc.docPath);
-            } else {
-                writer.updateDocument(doc.docPath, doc.doc);
+            for (LuceneDoc doc : docs) {
+                if (writer == null) {
+                    //IndexDefinition per IndexNode might have changed and 
local
+                    //indexing is disabled. Ignore
+                    log.debug("No local IndexWriter found for index [{}]. 
Skipping index " +
+                            "entry for [{}]", indexPath, doc.docPath);
+                    return;
+                }
+                if (doc.delete) {
+                    writer.deleteDocuments(doc.docPath);
+                } else {
+                    writer.updateDocument(doc.docPath, doc.doc);
+                }
+                log.trace("Updated index with doc {}", doc);
             }
-            log.trace("Updated index with doc {}", doc);
             indexNode.refreshReadersIfRequired();
         } catch (Exception e) {
             //For now we just log it. Later we need to see if frequent error 
then to
             //temporarily disable indexing for this index
-            log.warn("Error occurred while indexing node [{}] for index 
[{}]",doc.docPath, doc.indexPath, e);
+            log.warn("Error occurred while indexing index [{}]",indexPath, e);
         } finally {
             indexNode.release();
         }


Reply via email to