Author: adulceanu
Date: Thu Apr 27 09:55:46 2017
New Revision: 1792857

URL: http://svn.apache.org/viewvc?rev=1792857&view=rev
Log:
OAK-6074 - Simplify merge logic in LockBasedScheduler

Modified:
    
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
    
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java?rev=1792857&r1=1792856&r2=1792857&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/scheduler/LockBasedScheduler.java
 Thu Apr 27 09:55:46 2017
@@ -18,17 +18,12 @@ package org.apache.jackrabbit.oak.segmen
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
-import static java.lang.System.currentTimeMillis;
 import static java.lang.Thread.currentThread;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.jackrabbit.oak.api.Type.LONG;
 
 import java.io.Closeable;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Semaphore;
@@ -73,19 +68,18 @@ public class LockBasedScheduler implemen
 
         @Nonnull
         private StatisticsProvider statsProvider = StatisticsProvider.NOOP;
-        
+
         private boolean dispatchChanges = true;
 
-        private long maximumBackoff = MILLISECONDS.convert(10, SECONDS);
-        
-        private LockBasedSchedulerBuilder(@Nonnull Revisions revisions,
-                @Nonnull SegmentReader reader) {
+        private LockBasedSchedulerBuilder(@Nonnull Revisions revisions, 
@Nonnull SegmentReader reader) {
             this.revisions = revisions;
             this.reader = reader;
         }
-        
+
         /**
-         * {@link StatisticsProvider} for collecting statistics related to 
SegmentStore
+         * {@link StatisticsProvider} for collecting statistics related to
+         * SegmentStore
+         * 
          * @param statisticsProvider
          * @return this instance
          */
@@ -94,26 +88,20 @@ public class LockBasedScheduler implemen
             this.statsProvider = checkNotNull(statisticsProvider);
             return this;
         }
-        
+
         @Nonnull
         public LockBasedSchedulerBuilder dispatchChanges(boolean 
dispatchChanges) {
             this.dispatchChanges = dispatchChanges;
             return this;
         }
-        
-        @Nonnull
-        public LockBasedSchedulerBuilder withMaximumBackoff(long 
maximumBackoff) {
-            this.maximumBackoff = maximumBackoff;
-            return this;
-        }
-        
+
         @Nonnull
         public LockBasedScheduler build() {
             return new LockBasedScheduler(this);
         }
-        
+
     }
-    
+
     public static LockBasedSchedulerBuilder builder(@Nonnull Revisions 
revisions, @Nonnull SegmentReader reader) {
         return new LockBasedSchedulerBuilder(checkNotNull(revisions), 
checkNotNull(reader));
     }
@@ -150,12 +138,8 @@ public class LockBasedScheduler implemen
     private final AtomicReference<SegmentNodeState> head;
 
     private final ChangeDispatcher changeDispatcher;
-    
-    private final Random random = new Random();
-    
+
     private final SegmentNodeStoreStats stats;
-    
-    private final long maximumBackoff;
 
     public LockBasedScheduler(LockBasedSchedulerBuilder builder) {
         if (COMMIT_FAIR_LOCK) {
@@ -170,9 +154,8 @@ public class LockBasedScheduler implemen
         } else {
             this.changeDispatcher = null;
         }
-        
+
         this.stats = new SegmentNodeStoreStats(builder.statsProvider);
-        this.maximumBackoff = builder.maximumBackoff;
     }
 
     @Override
@@ -182,7 +165,7 @@ public class LockBasedScheduler implemen
         }
         return NOOP;
     }
-    
+
     @Override
     public NodeState getHeadNodeState() {
         if (commitSemaphore.tryAcquire()) {
@@ -194,7 +177,7 @@ public class LockBasedScheduler implemen
         }
         return head.get();
     }
-    
+
     /**
      * Refreshes the head state. Should only be called while holding a permit
      * from the {@link #commitSemaphore}.
@@ -217,9 +200,10 @@ public class LockBasedScheduler implemen
             changeDispatcher.contentChanged(root, info);
         }
     }
-    
+
     @Override
-    public NodeState schedule(@Nonnull Commit commit, SchedulerOption... 
schedulingOptions) throws CommitFailedException {
+    public NodeState schedule(@Nonnull Commit commit, SchedulerOption... 
schedulingOptions)
+            throws CommitFailedException {
         boolean queued = false;
 
         try {
@@ -260,95 +244,22 @@ public class LockBasedScheduler implemen
         }
     }
 
-    private NodeState execute(Commit commit)
-            throws CommitFailedException, InterruptedException {
+    private NodeState execute(Commit commit) throws CommitFailedException, 
InterruptedException {
         // only do the merge if there are some changes to commit
         if (commit.hasChanges()) {
-            long timeout = optimisticMerge(commit);
-            if (timeout >= 0) {
-                pessimisticMerge(commit, timeout);
-            }
-        }
-        return head.get().getChildNode(ROOT);
-    }
-    
-    private long optimisticMerge(Commit commit)
-            throws CommitFailedException, InterruptedException {
-        long timeout = 1;
-
-        // use exponential backoff in case of concurrent commits
-        for (long backoff = 1; backoff < maximumBackoff; backoff *= 2) {
-            long start = System.nanoTime();
-
             refreshHead(true);
-            SegmentNodeState state = head.get();
-            if (state.hasProperty("token")
-                    && state.getLong("timeout") >= currentTimeMillis()) {
-                // someone else has a pessimistic lock on the journal,
-                // so we should not try to commit anything yet
-            } else {
-                // use optimistic locking to update the journal
-                if (setHead(state, commit.apply(state), commit.info())) {
-                    return -1;
-                }
-            }
-
-            // someone else was faster, so wait a while and retry later
-            Thread.sleep(backoff, random.nextInt(1000000));
-
-            long stop = System.nanoTime();
-            if (stop - start > timeout) {
-                timeout = stop - start;
+            SegmentNodeState before = head.get();
+            SegmentNodeState after = commit.apply(before);
+            if (revisions.setHead(before.getRecordId(), after.getRecordId())) {
+                head.set(after);
+                contentChanged(after.getChildNode(ROOT), commit.info());
+                refreshHead(true);
             }
         }
 
-        return MILLISECONDS.convert(timeout, NANOSECONDS);
+        return head.get().getChildNode(ROOT);
     }
 
-    private void pessimisticMerge(Commit commit, long timeout)
-            throws CommitFailedException, InterruptedException {
-        while (true) {
-            long now = currentTimeMillis();
-            SegmentNodeState state = head.get();
-            if (state.hasProperty("token")
-                    && state.getLong("timeout") >= now) {
-                // locked by someone else, wait until unlocked or expired
-                Thread.sleep(
-                        Math.min(state.getLong("timeout") - now, 1000),
-                        random.nextInt(1000000));
-            } else {
-                // attempt to acquire the lock
-                SegmentNodeBuilder builder = state.builder();
-                builder.setProperty("token", UUID.randomUUID().toString());
-                builder.setProperty("timeout", now + timeout);
-
-                if (setHead(state, builder.getNodeState(), commit.info())) {
-                     // lock acquired; rebase, apply commit hooks, and unlock
-                    builder = commit.apply(state).builder();
-                    builder.removeProperty("token");
-                    builder.removeProperty("timeout");
-
-                    // complete the commit
-                    if (setHead(state, builder.getNodeState(), commit.info())) 
{
-                        return;
-                    }
-                }
-            }
-        }
-    }
-    
-    private boolean setHead(SegmentNodeState before, SegmentNodeState after, 
CommitInfo info) {
-        refreshHead(true);
-        if (revisions.setHead(before.getRecordId(), after.getRecordId())) {
-            head.set(after);
-            contentChanged(after.getChildNode(ROOT), info);
-            refreshHead(true);
-            return true;
-        } else {
-            return false;
-        }
-    }
-    
     @Override
     public String checkpoint(long lifetime, @Nonnull Map<String, String> 
properties) {
         checkArgument(lifetime > 0);
@@ -391,8 +302,7 @@ public class LockBasedScheduler implemen
                     SegmentNodeState state = head.get();
                     SegmentNodeBuilder builder = state.builder();
 
-                    NodeBuilder cp = builder.child("checkpoints").child(
-                            name);
+                    NodeBuilder cp = builder.child("checkpoints").child(name);
                     if (cp.exists()) {
                         cp.remove();
                         SegmentNodeState newState = builder.getNodeState();

Modified: 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java?rev=1792857&r1=1792856&r2=1792857&view=diff
==============================================================================
--- 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java
 (original)
+++ 
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/MergeTest.java
 Thu Apr 27 09:55:46 2017
@@ -20,23 +20,15 @@ package org.apache.jackrabbit.oak.segmen
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.IOException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.annotation.Nonnull;
 
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
-import org.apache.jackrabbit.oak.spi.commit.CommitHook;
 import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
 import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
-import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.jackrabbit.oak.spi.state.NodeStore;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class MergeTest {
@@ -86,59 +78,4 @@ public class MergeTest {
         assertTrue(store.getRoot().hasProperty("foo"));
         assertTrue(store.getRoot().hasProperty("bar"));
     }
-
-    @Test
-    @Ignore("OAK-4122")
-    public void testPessimisticMerge() throws Exception {
-        final SegmentNodeStore store = SegmentNodeStoreBuilders.builder(new 
MemoryStore()).build();
-        final Semaphore semaphore = new Semaphore(0);
-        final AtomicBoolean running = new AtomicBoolean(true);
-
-        Thread background = new Thread() {
-            @Override
-            public void run() {
-                for (int i = 0; running.get(); i++) {
-                    try {
-                        NodeBuilder a = store.getRoot().builder();
-                        a.setProperty("foo", "abc" + i);
-                        store.merge(a, EmptyHook.INSTANCE, CommitInfo.EMPTY);
-                        semaphore.release();
-                    } catch (CommitFailedException e) {
-                        fail();
-                    }
-                }
-            }
-        };
-        background.start();
-
-        // wait for the first commit
-        semaphore.acquire();
-
-        assertTrue(store.getRoot().hasProperty("foo"));
-        assertFalse(store.getRoot().hasProperty("bar"));
-
-        NodeBuilder b = store.getRoot().builder();
-        b.setProperty("bar", "xyz");
-        
-        // FIXME OAK-4122
-        //  store.setMaximumBackoff(100);
-        store.merge(b, new CommitHook() {
-            @Override @Nonnull
-            public NodeState processCommit(
-                    NodeState before, NodeState after, CommitInfo info) {
-                try {
-                    Thread.sleep(100);
-                } catch (InterruptedException e) {
-                    fail();
-                }
-                return after;
-            }
-        }, CommitInfo.EMPTY);
-
-        assertTrue(store.getRoot().hasProperty("foo"));
-        assertTrue(store.getRoot().hasProperty("bar"));
-
-        running.set(false);
-        background.join();
-    }
 }


Reply via email to