Author: yurize
Date: Fri Nov 18 10:46:53 2011
New Revision: 1203579

URL: http://svn.apache.org/viewvc?rev=1203579&view=rev
Log:
Evicts old deltas. part 2. https://reviews.apache.org/r/2662/

Added:
    
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RuntimeIOException.java
   (with props)
Modified:
    
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java
    
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
    
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
    
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletState.java

Modified: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java?rev=1203579&r1=1203578&r2=1203579&view=diff
==============================================================================
--- 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java
 (original)
+++ 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java
 Fri Nov 18 10:46:53 2011
@@ -45,22 +45,6 @@ import java.util.Iterator;
 class DeltaStoreBasedSnapshotStore implements DeltaAndSnapshotStore {
 
   /**
-   * Wraps an {@link IOException} in a {@link RuntimeException}.
-   */
-  private static class RuntimeIOException extends RuntimeException {
-    private final IOException cause;
-
-    public RuntimeIOException(IOException cause) {
-      super(cause);
-      this.cause = cause;
-    }
-
-    public IOException getIOException() {
-      return cause;
-    }
-  }
-
-  /**
    * Reads the transformed deltas from a {@link WaveletDeltaRecordReader}.
    */
   private static class TransformedWaveletDeltaIterator

Modified: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java?rev=1203579&r1=1203578&r2=1203579&view=diff
==============================================================================
--- 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
 (original)
+++ 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
 Fri Nov 18 10:46:53 2011
@@ -22,9 +22,12 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.gxp.compiler.io.RuntimeIOException;
 
 import org.waveprotocol.box.common.DeltaSequence;
 import org.waveprotocol.box.server.persistence.PersistenceException;
@@ -46,8 +49,12 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.NavigableMap;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -80,6 +87,18 @@ class DeltaStoreBasedWaveletState implem
       };
 
   /**
+   * @return An entry keyed by a hashed version with the given version number,
+   *         if any, otherwise null.
+   */
+  private static <T> Map.Entry<HashedVersion, T> 
lookupCached(NavigableMap<HashedVersion, T> map,
+      long version) {
+    // Smallest key with version number >= version.
+    HashedVersion key = HashedVersion.unsigned(version);
+    Map.Entry<HashedVersion, T> entry = map.ceilingEntry(key);
+    return (entry != null && entry.getKey().getVersion() == version) ? entry : 
null;
+  }
+
+  /**
    * Creates a new delta store based state.
    *
    * The executor must ensure that only one thread executes at any time for 
each
@@ -114,35 +133,30 @@ class DeltaStoreBasedWaveletState implem
    * Reads all deltas from persistent storage.
    */
   private static ImmutableList<WaveletDeltaRecord> 
readAll(WaveletDeltaRecordReader reader)
-      throws IOException{
+      throws IOException {
+    long startVersion = 0;
+    long endVersion = reader.getEndVersion().getVersion();
+    return readDeltasInRange(reader, startVersion, endVersion);
+  }
+
+  private static ImmutableList<WaveletDeltaRecord> readDeltasInRange(
+      final WaveletDeltaRecordReader reader, final long startVersion, final 
long endVersion) throws IOException {
     Preconditions.checkArgument(!reader.isEmpty());
     ImmutableList.Builder<WaveletDeltaRecord> result = ImmutableList.builder();
-    HashedVersion endVersion = reader.getEndVersion();
-    long version = 0;
-    while (version < endVersion.getVersion()) {
-      WaveletDeltaRecord delta = reader.getDelta(version);
+    long i = startVersion;
+    while (i < endVersion) {
+      WaveletDeltaRecord delta = reader.getDelta(i);
+      if (delta == null) { break; }  // No delta stored at i: return what was read.
       result.add(delta);
-      version = delta.getResultingVersion().getVersion();
+      i = delta.getResultingVersion().getVersion();
     }
     return result.build();
   }
 
-  /**
-   * @return An entry keyed by a hashed version with the given version number,
-   *         if any, otherwise null.
-   */
-  private static <T> Map.Entry<HashedVersion, T> lookup(
-      NavigableMap<HashedVersion, T> map, long version) {
-    // Smallest key with version number >= version.
-    HashedVersion key = HashedVersion.unsigned(version);
-    Map.Entry<HashedVersion, T> entry = map.ceilingEntry(key);
-    return (entry != null && entry.getKey().getVersion() == version) ? entry : 
null;
-  }
-
   private final Executor persistExecutor;
   private final HashedVersion versionZero;
   private final DeltaStore.DeltasAccess deltasAccess;
-  
+
   /** The lock that guards access to persistence related state. */
   private Object persistLock = new Object();
   
@@ -170,8 +184,8 @@ class DeltaStoreBasedWaveletState implem
         version = latestVersionToPersist;
       }
       if (last != null && version.getVersion() <= last.getVersion()) {
-        LOG.info("Attempt to persist version " + version + " smaller than last 
persisted version "
-            + last);
+        LOG.info("Attempt to persist version " + version
+            + " smaller than last persisted version " + last);
         // Done, version is already persisted.
         version = last;
       } else {
@@ -202,12 +216,12 @@ class DeltaStoreBasedWaveletState implem
   };
 
   /** Keyed by appliedAtVersion. */
-  private final NavigableMap<HashedVersion, 
ByteStringMessage<ProtocolAppliedWaveletDelta>>
-      appliedDeltas = Maps.newTreeMap();
+  private final ConcurrentNavigableMap<HashedVersion, 
ByteStringMessage<ProtocolAppliedWaveletDelta>> appliedDeltas =
+      new ConcurrentSkipListMap<HashedVersion, 
ByteStringMessage<ProtocolAppliedWaveletDelta>>();
 
   /** Keyed by appliedAtVersion. */
-  private final NavigableMap<HashedVersion, TransformedWaveletDelta> 
transformedDeltas =
-      Maps.newTreeMap();
+  private final ConcurrentNavigableMap<HashedVersion, TransformedWaveletDelta> 
transformedDeltas =
+      new ConcurrentSkipListMap<HashedVersion, TransformedWaveletDelta>();
 
   /** Is null if the wavelet state is empty. */
   private WaveletData snapshot;
@@ -236,11 +250,6 @@ class DeltaStoreBasedWaveletState implem
     this.persistExecutor = persistExecutor;
     this.versionZero = 
HASH_FACTORY.createVersionZero(deltasAccess.getWaveletName());
     this.deltasAccess = deltasAccess;
-    for (WaveletDeltaRecord delta : deltas) {
-      HashedVersion hashedVersion = delta.getAppliedAtVersion();
-      appliedDeltas.put(hashedVersion, delta.getAppliedDelta());
-      transformedDeltas.put(hashedVersion, delta.getTransformedDelta());
-    }
     this.snapshot = snapshot;
     this.lastPersistedVersion = new 
AtomicReference<HashedVersion>(deltasAccess.getEndVersion());
   }
@@ -268,6 +277,8 @@ class DeltaStoreBasedWaveletState implem
 
   @Override
   public HashedVersion getHashedVersion(long version) {
+    final Entry<HashedVersion, TransformedWaveletDelta> cachedEntry =
+        lookupCached(transformedDeltas, version);
     if (version == 0) {
       return versionZero;
     } else if (snapshot == null) {
@@ -275,58 +286,123 @@ class DeltaStoreBasedWaveletState implem
     } else if (version == snapshot.getVersion()) {
       return snapshot.getHashedVersion();
     } else {
-      Map.Entry<HashedVersion, TransformedWaveletDelta> entry = 
lookup(transformedDeltas, version);
-      return (entry == null) ? null : entry.getKey();
+      WaveletDeltaRecord delta;
+      try {
+        delta = lookup(version);
+      } catch (IOException e) {
+        throw new RuntimeIOException(e);
+      }
+      if (delta == null && cachedEntry != null) {
+        return cachedEntry.getKey();
+      } else {
+       return delta != null ? delta.getAppliedAtVersion() : null;
+      }
     }
   }
 
   @Override
-  public TransformedWaveletDelta getTransformedDelta(HashedVersion 
beginVersion) {
-    return transformedDeltas.get(beginVersion);
+  public TransformedWaveletDelta getTransformedDelta(
+      final HashedVersion beginVersion) {
+    TransformedWaveletDelta delta = transformedDeltas.get(beginVersion);
+    if (delta != null) {
+      return delta;
+    } else {
+      WaveletDeltaRecord nowDelta;
+      try {
+        nowDelta = lookup(beginVersion.getVersion());
+      } catch (IOException e) {
+        throw new RuntimeIOException(e);
+      }
+      return nowDelta != null ? nowDelta.transformed : null;
+    }
   }
 
   @Override
-  public TransformedWaveletDelta getTransformedDeltaByEndVersion(HashedVersion 
endVersion) {
-    Preconditions.checkArgument(endVersion.getVersion() > 0,
-        "end version %s is not positive", endVersion);
+  public TransformedWaveletDelta getTransformedDeltaByEndVersion(final 
HashedVersion endVersion) {
+    Preconditions.checkArgument(endVersion.getVersion() > 0, "end version %s 
is not positive",
+        endVersion);
+    final Entry<HashedVersion, TransformedWaveletDelta> cachedEntry = 
transformedDeltas.lowerEntry(endVersion);
     if (snapshot == null) {
       return null;
-    } else if (endVersion.equals(snapshot.getHashedVersion())) {
-      return transformedDeltas.lastEntry().getValue();
     } else {
-      TransformedWaveletDelta delta = 
transformedDeltas.lowerEntry(endVersion).getValue();
-      return delta.getResultingVersion().equals(endVersion) ? delta : null;
+      WaveletDeltaRecord deltaRecord = getDeltaRecordByEndVersion(endVersion);
+      if (deltaRecord != null) {
+        return deltaRecord.getTransformedDelta();
+      }
+      // Not in the store: use the cached delta only if it ends at endVersion.
+      // cachedEntry is null when the cache holds no key below endVersion.
+      TransformedWaveletDelta cached = (cachedEntry == null) ? null : cachedEntry.getValue();
+      return (cached != null && cached.getResultingVersion().equals(endVersion)) ? cached : null;
     }
   }
 
   @Override
-  public DeltaSequence getTransformedDeltaHistory(HashedVersion startVersion,
-      HashedVersion endVersion) {
+  public DeltaSequence getTransformedDeltaHistory(final HashedVersion 
startVersion,
+      final HashedVersion endVersion) {
     Preconditions.checkArgument(startVersion.getVersion() < 
endVersion.getVersion(),
         "Start version %s should be smaller than end version %s", 
startVersion, endVersion);
-    NavigableMap<HashedVersion, TransformedWaveletDelta> deltas =
-        transformedDeltas.subMap(startVersion, true, endVersion, false);
-    return
-        (!deltas.isEmpty() &&
-         deltas.firstKey().equals(startVersion) &&
-         
deltas.lastEntry().getValue().getResultingVersion().equals(endVersion))
-        ? DeltaSequence.of(deltas.values())
-        : null;
+    // The history deltas can be either in the memory - waiting to be 
persisted,
+    // or already persisted. We take both and merge into one list.
+    final NavigableMap<HashedVersion, TransformedWaveletDelta> cachedDeltas = 
Maps.newTreeMap();
+    cachedDeltas.putAll(transformedDeltas.subMap(startVersion, true, 
endVersion, false));
+    ImmutableList<WaveletDeltaRecord> persistedDeltas;
+    try {
+      persistedDeltas =
+          readDeltasInRange(deltasAccess, startVersion.getVersion(), 
endVersion.getVersion());
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+    NavigableMap<HashedVersion, TransformedWaveletDelta> 
allTransformedDeltasMap =
+        Maps.newTreeMap();
+    allTransformedDeltasMap.putAll(cachedDeltas);
+    for (WaveletDeltaRecord d : persistedDeltas) {
+      allTransformedDeltasMap.put(d.getAppliedAtVersion(), 
d.getTransformedDelta());
+    }
+    DeltaSequence nowDeltaSequence;
+    if (!allTransformedDeltasMap.isEmpty()
+        && allTransformedDeltasMap.firstKey().equals(startVersion)
+        && 
allTransformedDeltasMap.lastEntry().getValue().getResultingVersion().equals(endVersion))
 {
+      List<TransformedWaveletDelta> cachedAndPersitentDeltasList =
+          Lists.newArrayList(allTransformedDeltasMap.values());
+      nowDeltaSequence = DeltaSequence.of(cachedAndPersitentDeltasList);
+    } else {
+      nowDeltaSequence = null;
+    }
+    return nowDeltaSequence;
   }
 
   @Override
   public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta(
       HashedVersion beginVersion) {
-    return appliedDeltas.get(beginVersion);
+    ByteStringMessage<ProtocolAppliedWaveletDelta> delta = 
appliedDeltas.get(beginVersion);
+    if (delta != null) {
+      return delta;
+    } else {  // Cache miss (e.g. evicted by flush()): read from the delta store.
+      WaveletDeltaRecord record;
+      try {
+        record = lookup(beginVersion.getVersion());
+      } catch (IOException e) {
+        throw new RuntimeIOException(e);
+      }
+      return record != null ? record.getAppliedDelta() : null;
+    }
   }
 
   @Override
   public ByteStringMessage<ProtocolAppliedWaveletDelta> 
getAppliedDeltaByEndVersion(
-      HashedVersion endVersion) {
+      final HashedVersion endVersion) {
     Preconditions.checkArgument(endVersion.getVersion() > 0,
         "end version %s is not positive", endVersion);
-    return isDeltaBoundary(endVersion) ? 
appliedDeltas.lowerEntry(endVersion).getValue() : null;
-
+    // lowerEntry() is null when the cache holds no key below endVersion.
+    final Entry<HashedVersion, ByteStringMessage<ProtocolAppliedWaveletDelta>> cachedEntry =
+        appliedDeltas.lowerEntry(endVersion);
+    WaveletDeltaRecord deltaRecord = getDeltaRecordByEndVersion(endVersion);
+    if (deltaRecord != null) {
+      return deltaRecord.getAppliedDelta();
+    }
+    // Not in the store: serve the cached delta, but only at a delta boundary.
+    boolean usable = cachedEntry != null && isDeltaBoundary(endVersion);
+    return usable ? cachedEntry.getValue() : null;
   }
 
   @Override
@@ -338,6 +414,31 @@ class DeltaStoreBasedWaveletState implem
         : null;
   }
 
+  public Collection<ByteStringMessage<ProtocolAppliedWaveletDelta>> 
getAppliedDeltaHistory1(
+      final HashedVersion startVersion, final HashedVersion endVersion) {
+    Preconditions.checkArgument(startVersion.getVersion() < 
endVersion.getVersion());
+    // Merge cached and persisted deltas by applied-at version to keep them ordered.
+    final NavigableMap<HashedVersion, ByteStringMessage<ProtocolAppliedWaveletDelta>> allDeltas =
+        Maps.newTreeMap();
+    allDeltas.putAll(appliedDeltas.subMap(startVersion, endVersion));
+    ImmutableList<WaveletDeltaRecord> persistedDeltas;
+    try {
+      persistedDeltas =
+          readDeltasInRange(deltasAccess, startVersion.getVersion(), 
endVersion.getVersion());
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+    for (WaveletDeltaRecord d : persistedDeltas) {
+      allDeltas.put(d.getAppliedAtVersion(), d.getAppliedDelta());
+    }
+    Collection<ByteStringMessage<ProtocolAppliedWaveletDelta>> deltaCollection 
=
+        Lists.newArrayList();
+    if (isDeltaBoundary(startVersion) && isDeltaBoundary(endVersion)) {
+      deltaCollection.addAll(allDeltas.values());
+    }
+    return deltaCollection;
+  }
+
   @Override
   public void appendDelta(HashedVersion appliedAtVersion,
       TransformedWaveletDelta transformedDelta,
@@ -361,10 +462,10 @@ class DeltaStoreBasedWaveletState implem
 
   @Override
   public ListenableFuture<Void> persist(final HashedVersion version) {
-    Preconditions.checkArgument(version.getVersion() > 0, "Cannot persist 
non-positive version %s",
-        version);
-    Preconditions.checkArgument(isDeltaBoundary(version), "Version to persist 
%s matches no delta",
-        version);
+    Preconditions.checkArgument(version.getVersion() > 0,
+        "Cannot persist non-positive version %s", version);
+    Preconditions.checkArgument(isDeltaBoundary(version),
+        "Version to persist %s matches no delta", version);
     synchronized (persistLock) {
       if (latestVersionToPersist != null) {
         // There's a persist task in flight.
@@ -388,9 +489,35 @@ class DeltaStoreBasedWaveletState implem
   }
 
   @Override
+  public void flush(HashedVersion version) {
+    transformedDeltas.remove(version);
+    appliedDeltas.remove(version);
+    if (LOG.isFineLoggable()) {
+      LOG.fine("Flushed delta " + version);
+    }
+  }
+
+  @Override
   public void close() {
   }
 
+  /**
+   * @return the record that {@code deltasAccess.getDelta(version)} yields for
+   *         the given version number; callers treat null as "no such delta".
+   */
+  private WaveletDeltaRecord lookup(long version) throws IOException {
+    return deltasAccess.getDelta(version);
+  }
+
+  private WaveletDeltaRecord getDeltaRecordByEndVersion(HashedVersion 
endVersion) {
+    long version = endVersion.getVersion();
+    try {
+      return deltasAccess.getDeltaByEndVersion(version);
+    } catch (IOException e) {
+      throw new RuntimeIOException(e);
+    }
+  }
+
   private boolean isDeltaBoundary(HashedVersion version) {
     Preconditions.checkNotNull(version, "version is null");
     return version.equals(getCurrentVersion()) || 
transformedDeltas.containsKey(version);

Added: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RuntimeIOException.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RuntimeIOException.java?rev=1203579&view=auto
==============================================================================
--- 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RuntimeIOException.java
 (added)
+++ 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RuntimeIOException.java
 Fri Nov 18 10:46:53 2011
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2011 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.waveprotocol.box.server.waveserver;
+
+import java.io.IOException;
+
+/**
+ * Wraps an {@link IOException} in a {@link RuntimeException}.
+ *
+ * @author [email protected] (Yuri Zelikov)
+ */
+@SuppressWarnings("serial")
+public class RuntimeIOException extends RuntimeException {
+  private final IOException cause;
+
+  public RuntimeIOException(IOException cause) {
+    super(cause);
+    this.cause = cause;
+  }
+
+  public IOException getIOException() {
+    return cause;
+  }
+}

Propchange: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RuntimeIOException.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java?rev=1203579&r1=1203578&r2=1203579&view=diff
==============================================================================
--- 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
 (original)
+++ 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
 Fri Nov 18 10:46:53 2011
@@ -237,7 +237,7 @@ abstract class WaveletContainerImpl impl
           public void run() {
             acquireWriteLock();
             try {
-              // waveletState.flush(version); // TODO(soren): implement this
+              waveletState.flush(version);
               notifyOfCommit(version, domainsToNotify);
             } finally {
               releaseWriteLock();

Modified: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletState.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletState.java?rev=1203579&r1=1203578&r2=1203579&view=diff
==============================================================================
--- 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletState.java
 (original)
+++ 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletState.java
 Fri Nov 18 10:46:53 2011
@@ -135,6 +135,9 @@ interface WaveletState {
    */
   ListenableFuture<Void> persist(HashedVersion version);
 
+  /** Flushes persisted delta from memory. */
+  void flush(HashedVersion version);
+
   /**
    * Closes the object. No other methods on the object should be invoked after
    * this class.


Reply via email to