Author: chetanm
Date: Mon Jan 9 05:35:28 2017
New Revision: 1777930
URL: http://svn.apache.org/viewvc?rev=1777930&view=rev
Log:
OAK-5421 - Add LuceneDoc directly to queue from LuceneIndexEditor
Add a lock to ensure that writes to the sync index are not done concurrently.
This can happen when some docs are added to the queue and others to the
in-memory queue in the holder because the queue became full.
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
Modified:
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java?rev=1777930&r1=1777929&r2=1777930&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
(original)
+++
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
Mon Jan 9 05:35:28 2017
@@ -31,10 +31,12 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Striped;
import org.apache.jackrabbit.oak.commons.concurrent.NotifyingFutureTask;
import org.apache.jackrabbit.oak.plugins.index.lucene.IndexNode;
import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
@@ -58,6 +60,7 @@ public class DocumentQueue implements Cl
private final CounterStats queueSizeStats;
private final MeterStats added;
private final MeterStats dropped;
+ private final Striped<Lock> locks = Striped.lock(64);
/**
* Time in millis for which add call to queue
@@ -184,7 +187,19 @@ public class DocumentQueue implements Cl
//If required it can optimized by indexing diff indexes in parallel
//Something to consider if it becomes a bottleneck
for (Map.Entry<String, Collection<LuceneDoc>> e :
docsPerIndex.entrySet()) {
- processDocs(e.getKey(), e.getValue());
+ //In NRT case the indexing would be single threaded as it always happens via queue
+ //For sync case it can happen that indexing is requested by LocalIndexObserver and also
+ //via elements in queue. So we need to lock the indexing path
+ //Lock contention should not happen much as in most cases elements added
+ //to queue would get processed before observer is invoked
+ String indexPath = e.getKey();
+ Lock indexingLock = locks.get(indexPath);
+ indexingLock.lock();
+ try {
+ processDocs(indexPath, e.getValue());
+ } finally {
+ indexingLock.unlock();
+ }
added.mark(e.getValue().size());
}
}