Author: chetanm Date: Thu Sep 15 07:17:31 2016 New Revision: 1760850 URL: http://svn.apache.org/viewvc?rev=1760850&view=rev Log: OAK-4412 - Lucene hybrid index
Perform write operations in batches, grouping documents that belong to the same index. This halves the time taken and reduces the number of docs dropped because the queue is full. Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java Modified: jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java?rev=1760850&r1=1760849&r2=1760850&view=diff ============================================================================== --- jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java (original) +++ jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java Thu Sep 15 07:17:31 2016 @@ -22,13 +22,18 @@ package org.apache.jackrabbit.oak.plugin import java.io.Closeable; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask; import org.apache.jackrabbit.oak.plugins.index.lucene.IndexNode; @@ -50,6 +55,13 @@ public class DocumentQueue implements Cl private final BlockingQueue<LuceneDoc> docsQueue; private final Executor executor; private final CounterStats queueSizeStats; + + /** + * Time in millis for which add call to queue + * would 
wait before dropping off + */ + private final int offerTimeMillis; + private volatile boolean stopped; /** @@ -76,12 +88,32 @@ public class DocumentQueue implements Cl @Override public Void call() throws Exception { try { - LuceneDoc doc = docsQueue.poll(); - if (doc != null && doc != STOP) { - processDoc(doc); - queueSizeStats.dec(); - currentTask.onComplete(completionHandler); + int maxSize = docsQueue.size(); + List<LuceneDoc> docs = Lists.newArrayListWithCapacity(maxSize); + ListMultimap<String, LuceneDoc> docsPerIndex = ArrayListMultimap.create(); + + //Do the processing in batches + int count = docsQueue.drainTo(docs, maxSize); + if (count == 0) { + return null; + } + + queueSizeStats.dec(count); + + for (int i = 0; i < count; i++) { + LuceneDoc doc = docs.get(i); + if (doc == STOP) { + return null; + } + docsPerIndex.get(doc.indexPath).add(doc); } + + //If required it can optimized by indexing diff indexes in parallel + //Something to consider if it becomes a bottleneck + for (Map.Entry<String, Collection<LuceneDoc>> e : docsPerIndex.asMap().entrySet()) { + processDoc(e.getKey(), e.getValue()); + } + currentTask.onComplete(completionHandler); } catch (Throwable t) { exceptionHandler.uncaughtException(Thread.currentThread(), t); } @@ -104,12 +136,18 @@ public class DocumentQueue implements Cl this.docsQueue = new LinkedBlockingDeque<>(maxQueueSize); this.tracker = tracker; this.executor = executor; + this.offerTimeMillis = 100; //Wait for at most 100 mills while adding stuff to queue this.queueSizeStats = sp.getCounterStats("HYBRID_QUEUE_SIZE", StatsOptions.DEFAULT); } public boolean add(LuceneDoc doc){ checkState(!stopped); - boolean added = docsQueue.offer(doc); + boolean added = false; + try { + added = docsQueue.offer(doc, offerTimeMillis, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } // Set the completion handler on the currently running task. 
Multiple calls // to onComplete are not a problem here since we always pass the same value. // Thus there is no question as to which of the handlers will effectively run. @@ -117,7 +155,6 @@ public class DocumentQueue implements Cl if (added) { queueSizeStats.inc(); } - //TODO log warning when queue is full return added; } @@ -127,34 +164,35 @@ public class DocumentQueue implements Cl return docs; } - private void processDoc(LuceneDoc doc){ - IndexNode indexNode = tracker.acquireIndexNode(doc.indexPath); + private void processDoc(String indexPath, Iterable<LuceneDoc> docs){ + IndexNode indexNode = tracker.acquireIndexNode(indexPath); if (indexNode == null) { - log.debug("No IndexNode found for index [{}]. Skipping index entry for [{}]", doc.indexPath, doc.docPath); + log.debug("No IndexNode found for index [{}].", indexPath); return; } try{ LuceneIndexWriter writer = indexNode.getLocalWriter(); - - if (writer == null){ - //IndexDefinition per IndexNode might have changed and local - //indexing is disabled. Ignore - log.debug("No local IndexWriter found for index [{}]. Skipping index " + - "entry for [{}]", doc.indexPath, doc.docPath); - return; - } - if (doc.delete) { - writer.deleteDocuments(doc.docPath); - } else { - writer.updateDocument(doc.docPath, doc.doc); + for (LuceneDoc doc : docs) { + if (writer == null) { + //IndexDefinition per IndexNode might have changed and local + //indexing is disabled. Ignore + log.debug("No local IndexWriter found for index [{}]. Skipping index " + + "entry for [{}]", indexPath, doc.docPath); + return; + } + if (doc.delete) { + writer.deleteDocuments(doc.docPath); + } else { + writer.updateDocument(doc.docPath, doc.doc); + } + log.trace("Updated index with doc {}", doc); } - log.trace("Updated index with doc {}", doc); indexNode.refreshReadersIfRequired(); } catch (Exception e) { //For now we just log it. 
Later we need to see if frequent error then to //temporarily disable indexing for this index - log.warn("Error occurred while indexing node [{}] for index [{}]",doc.docPath, doc.indexPath, e); + log.warn("Error occurred while indexing index [{}]",indexPath, e); } finally { indexNode.release(); }