Author: chetanm
Date: Thu Sep 15 07:19:50 2016
New Revision: 1760862

URL: http://svn.apache.org/viewvc?rev=1760862&view=rev
Log:
OAK-4412 - Lucene hybrid index

Synchronous indexing support continue - Simplified the refresh policy to take 
in a callback which is responsible for refreshing the readers if the index is 
found to be updated.

For sync indexes the refresh is done by the writer in DocumentQueue i.e. its 
done as part of commit thread itself.

Removed:
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/RefreshIfDirtyPolicy.java
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/RefreshPolicyTest.java
Modified:
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ReaderRefreshPolicy.java
    
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/TimedRefreshPolicy.java
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java
    
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/TimedRefreshPolicyTest.java

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java?rev=1760862&r1=1760861&r2=1760862&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexNode.java
 Thu Sep 15 07:19:50 2016
@@ -76,6 +76,13 @@ public class IndexNode {
 
     private final ReaderRefreshPolicy refreshPolicy;
 
+    private final Runnable refreshCallback = new Runnable() {
+        @Override
+        public void run() {
+            refreshReaders();
+        }
+    };
+
     private boolean closed = false;
 
     IndexNode(String name, IndexDefinition definition, List<LuceneIndexReader> 
readers, @Nullable NRTIndex nrtIndex)
@@ -115,7 +122,7 @@ public class IndexNode {
             lock.readLock().unlock();
             return false;
         } else {
-            refreshReadersIfRequired();
+            refreshPolicy.refreshOnReadIfRequired(refreshCallback);
             return true;
         }
     }
@@ -146,15 +153,11 @@ public class IndexNode {
         return nrtIndex != null ? nrtIndex.getWriter() : null;
     }
 
-    public void refreshReadersIfRequired() {
-        if (refreshPolicy.shouldRefresh()){
-            refreshReaders();
-        }
+    public void refreshReadersOnWriteIfRequired() {
+        refreshPolicy.refreshOnWriteIfRequired(refreshCallback);
     }
 
-    public void refreshReaders(){
-        //TODO Instead of refresh on write switch on refresh on read
-        //for sync indexes
+    private void refreshReaders(){
         indexSearcher = new IndexSearcher(createReader());
         log.debug("Refreshed reader for index [{}]", definition);
     }

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java?rev=1760862&r1=1760861&r2=1760862&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueue.java
 Thu Sep 15 07:19:50 2016
@@ -111,7 +111,7 @@ public class DocumentQueue implements Cl
                         docsPerIndex.get(doc.indexPath).add(doc);
                     }
 
-                    addAllSynchronously(docsPerIndex.asMap(), false);
+                    addAllSynchronously(docsPerIndex.asMap());
 
                     currentTask.onComplete(completionHandler);
                 } catch (Throwable t) {
@@ -162,11 +162,11 @@ public class DocumentQueue implements Cl
         return added;
     }
 
-    public void addAllSynchronously(Map<String, Collection<LuceneDoc>> 
docsPerIndex, boolean alwaysRefreshReaders) {
+    public void addAllSynchronously(Map<String, Collection<LuceneDoc>> 
docsPerIndex) {
         //If required it can optimized by indexing diff indexes in parallel
         //Something to consider if it becomes a bottleneck
         for (Map.Entry<String, Collection<LuceneDoc>> e : 
docsPerIndex.entrySet()) {
-            processDocs(e.getKey(), e.getValue(), alwaysRefreshReaders);
+            processDocs(e.getKey(), e.getValue());
             added.mark(e.getValue().size());
         }
     }
@@ -177,7 +177,7 @@ public class DocumentQueue implements Cl
         return docs;
     }
 
-    private void processDocs(String indexPath, Iterable<LuceneDoc> docs, 
boolean alwaysRefreshReaders){
+    private void processDocs(String indexPath, Iterable<LuceneDoc> docs){
 
         //Drop the write call if stopped
         if (stopped) {
@@ -207,11 +207,7 @@ public class DocumentQueue implements Cl
                 }
                 log.trace("Updated index with doc {}", doc);
             }
-            if (alwaysRefreshReaders) {
-                indexNode.refreshReaders();
-            } else {
-                indexNode.refreshReadersIfRequired();
-            }
+            indexNode.refreshReadersOnWriteIfRequired();
         } catch (Exception e) {
             //For now we just log it. Later we need to see if frequent error 
then to
             //temporarily disable indexing for this index

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java?rev=1760862&r1=1760861&r2=1760862&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/LocalIndexObserver.java
 Thu Sep 15 07:19:50 2016
@@ -67,7 +67,7 @@ public class LocalIndexObserver implemen
         //After nrt docs add all sync indexed docs
         //Doing it *after* ensures thar nrt index might catch
         //up by the time sync one are finished
-        docQueue.addAllSynchronously(holder.getSyncIndexedDocs(), true);
+        docQueue.addAllSynchronously(holder.getSyncIndexedDocs());
 
         if (droppedCount > 0){
             //TODO Ensure that log do not flood

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java?rev=1760862&r1=1760861&r2=1760862&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndex.java
 Thu Sep 15 07:19:50 2016
@@ -62,7 +62,7 @@ public class NRTIndex implements Closeab
     private final IndexDefinition definition;
     private final IndexCopier indexCopier;
     private final LuceneIndexReader previousReader;
-    private final ReaderRefreshPolicy refreshPolicy;
+    private final TimedRefreshPolicy refreshPolicy;
 
     private IndexWriter indexWriter;
     private NRTIndexWriter nrtIndexWriter;
@@ -71,7 +71,7 @@ public class NRTIndex implements Closeab
     private boolean closed;
 
     public NRTIndex(IndexDefinition definition, IndexCopier indexCopier,
-                    ReaderRefreshPolicy refreshPolicy, @Nullable NRTIndex 
previous) {
+                    TimedRefreshPolicy refreshPolicy, @Nullable NRTIndex 
previous) {
         this.definition = definition;
         this.indexCopier = indexCopier;
         this.refreshPolicy = refreshPolicy;

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java?rev=1760862&r1=1760861&r2=1760862&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/NRTIndexFactory.java
 Thu Sep 15 07:19:50 2016
@@ -102,7 +102,7 @@ public class NRTIndexFactory implements
         return existing.get(existing.size() - 1);
     }
 
-    private ReaderRefreshPolicy getRefreshPolicy(IndexDefinition definition) {
-        return new TimedRefreshPolicy(clock, TimeUnit.SECONDS, 
refreshDeltaInSecs);
+    private TimedRefreshPolicy getRefreshPolicy(IndexDefinition definition) {
+        return new TimedRefreshPolicy(definition.isSyncIndexingEnabled(), 
clock, TimeUnit.SECONDS, refreshDeltaInSecs);
     }
 }

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ReaderRefreshPolicy.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ReaderRefreshPolicy.java?rev=1760862&r1=1760861&r2=1760862&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ReaderRefreshPolicy.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/ReaderRefreshPolicy.java
 Thu Sep 15 07:19:50 2016
@@ -22,32 +22,17 @@ package org.apache.jackrabbit.oak.plugin
 public interface ReaderRefreshPolicy {
     ReaderRefreshPolicy NEVER = new ReaderRefreshPolicy() {
         @Override
-        public boolean shouldRefresh() {
-            return false;
+        public void refreshOnReadIfRequired(Runnable refreshCallback) {
+            //Never refresh
         }
 
         @Override
-        public void updated() {
-
+        public void refreshOnWriteIfRequired(Runnable refreshCallback) {
+            //Never refresh
         }
     };
 
-    /**
-     * Returns  true if refresh is to be done and
-     * resets the internal state. The caller which
-     * gets the true answer would be responsible for
-     * refreshing the readers.
-     *
-     * <p>For e.g. once updated the first call to
-     * this method would return true and subsequent
-     * calls return false
-     *
-     * @return true if refresh is to be done
-     */
-    boolean shouldRefresh();
+    void refreshOnReadIfRequired(Runnable refreshCallback);
 
-    /**
-     * Invoked when index gets updated
-     */
-    void updated();
+    void refreshOnWriteIfRequired(Runnable refreshCallback);
 }

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/TimedRefreshPolicy.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/TimedRefreshPolicy.java?rev=1760862&r1=1760861&r2=1760862&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/TimedRefreshPolicy.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/TimedRefreshPolicy.java
 Thu Sep 15 07:19:50 2016
@@ -26,30 +26,60 @@ import org.apache.jackrabbit.oak.stats.C
 
 public class TimedRefreshPolicy implements ReaderRefreshPolicy {
     private final AtomicBoolean dirty = new AtomicBoolean();
+    private final boolean syncIndexingMode;
     private final Clock clock;
     private final long refreshDelta;
     private volatile long lastRefreshTime;
 
-    public TimedRefreshPolicy(Clock clock, TimeUnit unit, long refreshDelta) {
+    public TimedRefreshPolicy(boolean syncIndexingMode, Clock clock, TimeUnit 
unit, long refreshDelta) {
+        this.syncIndexingMode = syncIndexingMode;
         this.clock = clock;
         this.refreshDelta = unit.toMillis(refreshDelta);
     }
 
     @Override
-    public boolean shouldRefresh() {
+    public void refreshOnReadIfRequired(Runnable refreshCallback) {
+        if (syncIndexingMode) {
+            //As writer itself refreshes the index. No refresh done
+            //on read
+            return;
+        }
+        refreshIfRequired(refreshCallback);
+    }
+
+    @Override
+    public void refreshOnWriteIfRequired(Runnable refreshCallback) {
+        if (syncIndexingMode) {
+            //For sync indexing mode we refresh the reader immediately
+            //on the writer thread. So that any read call later sees upto date 
index
+
+            //Another possibility is to refresh the readers upon first query 
post index update
+            //but that would mean that if multiple queries get invoked 
simultaneously then
+            //others would get blocked. So here we take hit on write side. If 
that proves to
+            //be problematic query side refresh can be looked into
+            if (dirty.get()) {
+                refreshCallback.run();
+                dirty.set(false);
+            }
+        } else {
+            refreshIfRequired(refreshCallback);
+        }
+    }
+
+    public void updated() {
+        dirty.set(true);
+    }
+
+    private void refreshIfRequired(Runnable refreshCallback) {
         if (dirty.get()){
             long currentTime = clock.getTime();
             if (currentTime - lastRefreshTime > refreshDelta
                     && dirty.compareAndSet(true, false)){
                 lastRefreshTime = currentTime;
-                return true;
+                refreshCallback.run();
             }
         }
-        return false;
     }
 
-    @Override
-    public void updated() {
-        dirty.set(true);
-    }
+
 }

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java?rev=1760862&r1=1760861&r2=1760862&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/DocumentQueueTest.java
 Thu Sep 15 07:19:50 2016
@@ -38,6 +38,7 @@ import org.apache.jackrabbit.oak.plugins
 import org.apache.jackrabbit.oak.plugins.index.lucene.IndexCopier;
 import org.apache.jackrabbit.oak.plugins.index.lucene.IndexNode;
 import org.apache.jackrabbit.oak.plugins.index.lucene.IndexTracker;
+import 
org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexConstants.IndexingMode;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexEditorProvider;
 import org.apache.jackrabbit.oak.plugins.index.lucene.TestUtil;
 import 
org.apache.jackrabbit.oak.plugins.index.lucene.reader.DefaultIndexReaderFactory;
@@ -130,7 +131,7 @@ public class DocumentQueueTest {
 
     @Test
     public void noIssueIfNoWriter() throws Exception{
-        NodeState indexed = createAndPopulateAsyncIndex();
+        NodeState indexed = createAndPopulateAsyncIndex(IndexingMode.NRT);
         DocumentQueue queue = new DocumentQueue(2, tracker, 
sameThreadExecutor());
 
         tracker.update(indexed);
@@ -140,7 +141,7 @@ public class DocumentQueueTest {
     @Test
     public void updateDocument() throws Exception{
         IndexTracker tracker = createTracker();
-        NodeState indexed = createAndPopulateAsyncIndex();
+        NodeState indexed = createAndPopulateAsyncIndex(IndexingMode.NRT);
         tracker.update(indexed);
         DocumentQueue queue = new DocumentQueue(2, tracker, 
sameThreadExecutor());
 
@@ -157,7 +158,7 @@ public class DocumentQueueTest {
     @Test
     public void indexRefresh() throws Exception{
         tracker = createTracker();
-        NodeState indexed = createAndPopulateAsyncIndex();
+        NodeState indexed = createAndPopulateAsyncIndex(IndexingMode.NRT);
         tracker.update(indexed);
 
         clock.waitUntil(refreshDelta);
@@ -220,7 +221,7 @@ public class DocumentQueueTest {
     public void addAllSync() throws Exception{
         ListMultimap<String, LuceneDoc> docs = ArrayListMultimap.create();
         tracker = createTracker();
-        NodeState indexed = createAndPopulateAsyncIndex();
+        NodeState indexed = createAndPopulateAsyncIndex(IndexingMode.SYNC);
         tracker.update(indexed);
 
         DocumentQueue queue = new DocumentQueue(2, tracker, 
sameThreadExecutor());
@@ -229,7 +230,7 @@ public class DocumentQueueTest {
         assertEquals(1, td.totalHits);
 
         docs.get("/oak:index/fooIndex").add(createDoc("/a/c", "bar"));
-        queue.addAllSynchronously(docs.asMap(), true);
+        queue.addAllSynchronously(docs.asMap());
 
         td = doSearch("bar");
         assertEquals(2, td.totalHits);
@@ -237,7 +238,7 @@ public class DocumentQueueTest {
         docs.clear();
 
         docs.get("/oak:index/fooIndex").add(createDoc("/a/d", "bar"));
-        queue.addAllSynchronously(docs.asMap(), true);
+        queue.addAllSynchronously(docs.asMap());
 
         td = doSearch("bar");
         assertEquals(3, td.totalHits);
@@ -252,7 +253,7 @@ public class DocumentQueueTest {
                 new DefaultIndexReaderFactory(defaultMountInfoProvider(), 
indexCopier),
                 indexFactory
         );
-        NodeState indexed = createAndPopulateAsyncIndex();
+        NodeState indexed = createAndPopulateAsyncIndex(IndexingMode.NRT);
         tracker.update(indexed);
 
         DocumentQueue queue = new DocumentQueue(1000, tracker, executor);
@@ -281,7 +282,7 @@ public class DocumentQueueTest {
         for (int i = 0; i < numDocs; i++) {
             ListMultimap<String, LuceneDoc> docs = ArrayListMultimap.create();
             docs.get("/oak:index/fooIndex").add(doc);
-            queue.addAllSynchronously(docs.asMap(), true);
+            queue.addAllSynchronously(docs.asMap());
         }
         System.out.printf("%n[sync] Time taken for %d is %s%n", numDocs, w);
 
@@ -325,8 +326,8 @@ public class DocumentQueueTest {
         );
     }
 
-    private NodeState createAndPopulateAsyncIndex() throws 
CommitFailedException {
-        createIndexDefinition("fooIndex");
+    private NodeState createAndPopulateAsyncIndex(IndexingMode indexingMode) 
throws CommitFailedException {
+        createIndexDefinition("fooIndex", indexingMode);
 
         //Have some stuff to be indexed
         builder.child("a").setProperty("foo", "bar");
@@ -340,12 +341,12 @@ public class DocumentQueueTest {
         return info;
     }
 
-    private void createIndexDefinition(String idxName) {
+    private void createIndexDefinition(String idxName, IndexingMode 
indexingMode) {
         NodeBuilder idx = 
newLucenePropertyIndexDefinition(builder.child("oak:index"),
                 idxName, ImmutableSet.of("foo"), "async");
         //Disable compression
         //idx.setProperty("codec", "oakCodec");
-        TestUtil.enableNRTIndexing(idx);
+        TestUtil.enableIndexingMode(idx, indexingMode);
     }
 
 }
\ No newline at end of file

Modified: 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/TimedRefreshPolicyTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/TimedRefreshPolicyTest.java?rev=1760862&r1=1760861&r2=1760862&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/TimedRefreshPolicyTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/hybrid/TimedRefreshPolicyTest.java
 Thu Sep 15 07:19:50 2016
@@ -24,52 +24,133 @@ import java.util.concurrent.TimeUnit;
 import org.apache.jackrabbit.oak.stats.Clock;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class TimedRefreshPolicyTest {
     private Clock clock = new Clock.Virtual();
+    private RecordingRunnable refreshCallback = new RecordingRunnable();
 
     @Test
     public void dirtyAndFirstCheck() throws Exception{
         clock.waitUntil(System.currentTimeMillis());
-        ReaderRefreshPolicy policy = new TimedRefreshPolicy(clock, 
TimeUnit.SECONDS, 1);
-        assertFalse(policy.shouldRefresh());
+        TimedRefreshPolicy policy = new TimedRefreshPolicy(false, clock, 
TimeUnit.SECONDS, 1);
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertNotInvokedAndRest();
 
         policy.updated();
-        assertTrue(policy.shouldRefresh());
-        assertFalse(policy.shouldRefresh());
+
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertInvokedAndReset();
+
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertNotInvokedAndRest();
     }
 
     @Test
     public void dirtyAndNotElapsedTimed() throws Exception{
         clock.waitUntil(System.currentTimeMillis());
-        ReaderRefreshPolicy policy = new TimedRefreshPolicy(clock, 
TimeUnit.SECONDS, 1);
+        TimedRefreshPolicy policy = new TimedRefreshPolicy(false, clock, 
TimeUnit.SECONDS, 1);
 
         policy.updated();
-        assertTrue(policy.shouldRefresh());
-        assertFalse(policy.shouldRefresh());
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertInvokedAndReset();
+
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertNotInvokedAndRest();
 
         policy.updated();
         //Given time has not elapsed it should still be false
-        assertFalse(policy.shouldRefresh());
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertNotInvokedAndRest();
     }
 
     @Test
     public void dirtyAndElapsedTime() throws Exception{
         clock.waitUntil(System.currentTimeMillis());
-        ReaderRefreshPolicy policy = new TimedRefreshPolicy(clock, 
TimeUnit.SECONDS, 1);
+        TimedRefreshPolicy policy = new TimedRefreshPolicy(false, clock, 
TimeUnit.SECONDS, 1);
 
         policy.updated();
-        assertTrue(policy.shouldRefresh());
-        assertFalse(policy.shouldRefresh());
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertInvokedAndReset();
+
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertNotInvokedAndRest();
 
         policy.updated();
         //Given time has not elapsed it should still be false
-        assertFalse(policy.shouldRefresh());
+        //in both reader and writer mode
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertNotInvokedAndRest();
+
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertNotInvokedAndRest();
+
+        //Let the refresh delta time elapse
+        long refreshDelta = TimeUnit.SECONDS.toMillis(1) + 1;
+        clock.waitUntil(System.currentTimeMillis() + refreshDelta);
+
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertInvokedAndReset();
+
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertNotInvokedAndRest();
+
+        policy.updated();
+        //Do similar check for read
+        clock.waitUntil(clock.getTime() + refreshDelta);
+
+        policy.refreshOnReadIfRequired(refreshCallback);
+        refreshCallback.assertInvokedAndReset();
+
+        policy.refreshOnReadIfRequired(refreshCallback);
+        refreshCallback.assertNotInvokedAndRest();
+    }
+
+    @Test
+    public void syncIndex() throws Exception{
+        clock.waitUntil(System.currentTimeMillis());
+        TimedRefreshPolicy policy = new TimedRefreshPolicy(true, clock, 
TimeUnit.SECONDS, 1);
+
+        policy.updated();
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertInvokedAndReset();
+
+        policy.updated();
+
+        //Write would not lead to reader refresh as time has yet not expired
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertInvokedAndReset();
+
+        //Already refreshed so reader should not be refreshed again
+        policy.refreshOnReadIfRequired(refreshCallback);
+        refreshCallback.assertNotInvokedAndRest();
+
+        //Checking on write again should not lead to refresh
+        policy.refreshOnWriteIfRequired(refreshCallback);
+        refreshCallback.assertNotInvokedAndRest();
+    }
 
-        clock.waitUntil(System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(1) + 1);
-        assertTrue(policy.shouldRefresh());
-        assertFalse(policy.shouldRefresh());
+    private static class RecordingRunnable implements Runnable {
+        private boolean invoked;
+        @Override
+        public void run() {
+            invoked = true;
+        }
+
+        public void assertInvokedAndReset(){
+            assertTrue(invoked);
+            reset();
+        }
+
+        public void assertNotInvokedAndRest(){
+            assertFalse(invoked);
+            reset();
+        }
+
+        void reset(){
+            invoked = false;
+        }
     }
 
 }
\ No newline at end of file


Reply via email to