Author: yurize
Date: Tue Nov  1 19:04:58 2011
New Revision: 1196213

URL: http://svn.apache.org/viewvc?rev=1196213&view=rev
Log:
Improves persistence logic and addresses some old TODOs. In collaboration with 
[email protected]

Modified:
    
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
    
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java
    
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java

Modified: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java?rev=1196213&r1=1196212&r2=1196213&view=diff
==============================================================================
--- 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
 (original)
+++ 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
 Tue Nov  1 19:04:58 2011
@@ -53,17 +53,11 @@ import java.util.concurrent.atomic.Atomi
 
 /**
  * Simplistic {@link DeltaStore}-backed wavelet state implementation
- * which keeps the entire delta history in memory.
- *
- * TODO(soren): only keep in memory what's not persisted
+ * which goes to persistent storage for every history request.
  *
  * TODO(soren): rewire this class to be backed by {@link WaveletStore} and
  * read the snapshot from there instead of computing it in the
- * DeltaStoreBasedWaveletState constructor
- *
- * TODO(soren): refine the persist() logic to make it batch successive
- * writes to storage, when write latency exceeds the intervals between
- * calls to persist()
+ * DeltaStoreBasedWaveletState constructor.
  *
  * @author [email protected] (Soren Lassen)
  */
@@ -99,11 +93,6 @@ class DeltaStoreBasedWaveletState implem
    */
   public static DeltaStoreBasedWaveletState create(DeltaStore.DeltasAccess 
deltasAccess,
       Executor persistExecutor) throws PersistenceException {
-    // Note that the logic in persist() depends on persistExecutor being single-threaded.
-    // TODO(soren): finesse the logic in persist() so it
-    // doesn't require it to be single-threaded, because it would be useful to be
-    // able to use a shared executor with a thread-count set to the appropriate level
-    // of write parallelism for the storage subsystem.
     if (deltasAccess.isEmpty()) {
       return new DeltaStoreBasedWaveletState(deltasAccess, 
ImmutableList.<WaveletDeltaRecord>of(),
           null, persistExecutor);
@@ -122,7 +111,7 @@ class DeltaStoreBasedWaveletState implem
   }
 
   /**
-   * Reads all deltas from
+   * Reads all deltas from persistent storage.
    */
   private static ImmutableList<WaveletDeltaRecord> 
readAll(WaveletDeltaRecordReader reader)
       throws IOException{
@@ -153,6 +142,64 @@ class DeltaStoreBasedWaveletState implem
   private final Executor persistExecutor;
   private final HashedVersion versionZero;
   private final DeltaStore.DeltasAccess deltasAccess;
+  
+  /** The lock that guards access to persistence related state. */
+  private Object persistLock = new Object();
+  
+  /**
+   * Indicates the version of the latest appended delta that was already requested to be
+   * persisted.
+   */
+  private HashedVersion latestVersionToPersist = null;
+  
+  /** The persist task that will be executed next. */
+  private ListenableFutureTask<Void> nextPersistTask = null;
+  
+  /**
+   * Processes the persist task and, when that task is done, checks whether
+   * another task is waiting. In that case, all deltas waiting to be persisted
+   * are written to persistent storage in one operation.
+   */
+  private final Callable<Void> persisterTask = new Callable<Void>() {
+    @Override
+    public Void call() throws PersistenceException {
+      HashedVersion last;
+      HashedVersion version;
+      synchronized (persistLock) {
+        last = lastPersistedVersion.get();
+        version = latestVersionToPersist;
+      }
+      if (last != null && version.getVersion() <= last.getVersion()) {
+        LOG.info("Attempt to persist version " + version + " smaller than last 
persisted version "
+            + last);
+        // Done, version is already persisted.
+        version = last;
+      } else {
+        ImmutableList.Builder<WaveletDeltaRecord> deltas = 
ImmutableList.builder();
+        HashedVersion v = (last == null) ? versionZero : last;
+        do {
+          WaveletDeltaRecord d =
+              new WaveletDeltaRecord(v, appliedDeltas.get(v), 
transformedDeltas.get(v));
+          deltas.add(d);
+          v = d.getResultingVersion();
+        } while (v.getVersion() < version.getVersion());
+        Preconditions.checkState(v.equals(version));
+        deltasAccess.append(deltas.build());
+      }
+      synchronized (persistLock) {
+        Preconditions.checkState(last == lastPersistedVersion.get(),
+            "lastPersistedVersion changed while we were writing to storage");
+        lastPersistedVersion.set(version);
+        if (nextPersistTask != null) {
+          persistExecutor.execute(nextPersistTask);
+          nextPersistTask = null;
+        } else {
+          latestVersionToPersist = null;
+        }
+      }
+      return null;
+    }
+  };
 
   /** Keyed by appliedAtVersion. */
   private final NavigableMap<HashedVersion, 
ByteStringMessage<ProtocolAppliedWaveletDelta>>
@@ -314,42 +361,30 @@ class DeltaStoreBasedWaveletState implem
 
   @Override
   public ListenableFuture<Void> persist(final HashedVersion version) {
-    Preconditions.checkArgument(version.getVersion() > 0,
-        "Cannot persist non-positive version %s", version);
-    Preconditions.checkArgument(isDeltaBoundary(version),
-        "Version to persist %s matches no delta", version);
-
-    // The following logic relies on persistExecutor being single-threaded,
-    // so no two tasks execute in parallel.
-    ListenableFutureTask<Void> resultTask = new ListenableFutureTask<Void>(
-        new Callable<Void>() {
-          @Override
-          public Void call() throws PersistenceException {
-            HashedVersion last = lastPersistedVersion.get();
-            if (last != null && version.getVersion() <= last.getVersion()) {
-              LOG.info("Attempt to persist version " + version
-                  + " smaller than last persisted version " + last);
-              // done, version is already persisted
-            } else {
-              ImmutableList.Builder<WaveletDeltaRecord> deltas = 
ImmutableList.builder();
-              HashedVersion v = (last == null) ? versionZero : last;
-              do {
-                WaveletDeltaRecord d =
-                    new WaveletDeltaRecord(v, appliedDeltas.get(v), 
transformedDeltas.get(v));
-                deltas.add(d);
-                v = d.getResultingVersion();
-              } while (v.getVersion() < version.getVersion());
-              Preconditions.checkState(v.equals(version));
-              deltasAccess.append(deltas.build());
-              Preconditions.checkState(last == lastPersistedVersion.get(),
-                  "lastPersistedVersion changed while we were writing to 
storage");
-              lastPersistedVersion.set(version);
-            }
-            return null;
-          }
-        });
-    persistExecutor.execute(resultTask);
-    return resultTask;
+    Preconditions.checkArgument(version.getVersion() > 0, "Cannot persist 
non-positive version %s",
+        version);
+    Preconditions.checkArgument(isDeltaBoundary(version), "Version to persist 
%s matches no delta",
+        version);
+    synchronized (persistLock) {
+      if (latestVersionToPersist != null) {
+        // There's a persist task in flight.
+        if (version.getVersion() <= latestVersionToPersist.getVersion()) {
+          LOG.info("Attempt to persist version " + version
+              + " smaller than last version requested " + 
latestVersionToPersist);
+        } else {
+          latestVersionToPersist = version;
+        }
+        if (nextPersistTask == null) {
+          nextPersistTask = new ListenableFutureTask<Void>(persisterTask);
+        }
+        return nextPersistTask;
+      } else {
+        latestVersionToPersist = version;
+        ListenableFutureTask<Void> resultTask = new 
ListenableFutureTask<Void>(persisterTask);
+        persistExecutor.execute(resultTask);
+        return resultTask;
+      }
+    }
   }
 
   @Override

Modified: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java?rev=1196213&r1=1196212&r2=1196213&view=diff
==============================================================================
--- 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java
 (original)
+++ 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java
 Tue Nov  1 19:04:58 2011
@@ -57,12 +57,15 @@ public class WaveServerModule extends Ab
   // TODO(soren): move to global config file
   private static final int LISTENER_EXECUTOR_THREAD_COUNT = 2;
   private static final int WAVELET_LOAD_EXECUTOR_THREAD_COUNT = 2;
+  private static final int DELTA_PERSIST_EXECUTOR_THREAD_COUNT = 2;
   private static final IdURIEncoderDecoder URI_CODEC =
       new IdURIEncoderDecoder(new JavaUrlCodec());
   private static final HashedVersionFactory HASH_FACTORY = new 
HashedVersionFactoryImpl(URI_CODEC);
 
   private final Executor waveletLoadExecutor =
       Executors.newFixedThreadPool(WAVELET_LOAD_EXECUTOR_THREAD_COUNT);
+  private final Executor persistExecutor =
+      Executors.newFixedThreadPool(DELTA_PERSIST_EXECUTOR_THREAD_COUNT);
   private final boolean enableFederation;
 
   public WaveServerModule(boolean enableFederation) {
@@ -114,7 +117,7 @@ public class WaveServerModule extends Ab
       public LocalWaveletContainer create(WaveletNotificationSubscriber 
notifiee,
           WaveletName waveletName, String waveDomain) {
         return new LocalWaveletContainerImpl(waveletName, notifiee, 
loadWaveletState(
-            waveletLoadExecutor, deltaStore, waveletName), waveDomain);
+            waveletLoadExecutor, deltaStore, waveletName, persistExecutor), 
waveDomain);
       }
     };
   }
@@ -128,7 +131,7 @@ public class WaveServerModule extends Ab
       public RemoteWaveletContainer create(WaveletNotificationSubscriber 
notifiee,
           WaveletName waveletName, String waveDomain) {
         return new RemoteWaveletContainerImpl(waveletName, notifiee,
-            loadWaveletState(waveletLoadExecutor, deltaStore, waveletName));
+            loadWaveletState(waveletLoadExecutor, deltaStore, waveletName, 
persistExecutor));
       }
     };
   }
@@ -147,21 +150,18 @@ public class WaveServerModule extends Ab
   }
 
   /**
-   * Returns a future whose result is the state of the wavelet after it has
-   * been loaded from storage.
-   * Any failure is reported as a {@link PersistenceException}.
+   * Returns a future whose result is the state of the wavelet after it has been
+   * loaded from storage. Any failure is reported as a
+   * {@link PersistenceException}.
    */
   @VisibleForTesting
-  static ListenableFuture<DeltaStoreBasedWaveletState> loadWaveletState(
-      Executor executor, final DeltaStore deltaStore, final WaveletName 
waveletName) {
+  static ListenableFuture<DeltaStoreBasedWaveletState> 
loadWaveletState(Executor executor,
+      final DeltaStore deltaStore, final WaveletName waveletName, final 
Executor persistExecutor) {
     ListenableFutureTask<DeltaStoreBasedWaveletState> task =
         new ListenableFutureTask<DeltaStoreBasedWaveletState>(
             new Callable<DeltaStoreBasedWaveletState>() {
               @Override
               public DeltaStoreBasedWaveletState call() throws 
PersistenceException {
-                // One executor per wave is inefficient; see comment in
-                // DeltaStoreBasedWaveletState.
-                Executor persistExecutor = Executors.newSingleThreadExecutor();
                 return 
DeltaStoreBasedWaveletState.create(deltaStore.open(waveletName),
                     persistExecutor);
               }

Modified: 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java?rev=1196213&r1=1196212&r2=1196213&view=diff
==============================================================================
--- 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java
 (original)
+++ 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java
 Tue Nov  1 19:04:58 2011
@@ -106,7 +106,8 @@ public class WaveServerTest extends Test
       public LocalWaveletContainer create(WaveletNotificationSubscriber 
notifiee,
           WaveletName waveletName, String waveDomain) {
         return new LocalWaveletContainerImpl(waveletName, notifiee,
-            WaveServerModule.loadWaveletState(executor, deltaStore, 
waveletName), waveDomain);
+            WaveServerModule.loadWaveletState(executor, deltaStore, 
waveletName, executor),
+            waveDomain);
       }
     };
 


Reply via email to