Wavelet snapshot storage with MongoDB

Project: http://git-wip-us.apache.org/repos/asf/incubator-wave/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-wave/commit/94fa1878
Tree: http://git-wip-us.apache.org/repos/asf/incubator-wave/tree/94fa1878
Diff: http://git-wip-us.apache.org/repos/asf/incubator-wave/diff/94fa1878

Branch: refs/heads/swellrt
Commit: 94fa187895df34e4c9319db74634be8d537ceede
Parents: 95f178b
Author: Pablo Ojanguren <pablo...@gmail.com>
Authored: Mon Oct 17 19:06:46 2016 +0200
Committer: Pablo Ojanguren <pablo...@gmail.com>
Committed: Mon Oct 17 19:06:46 2016 +0200

----------------------------------------------------------------------
 wave/config/reference.conf                      |   3 +
 .../persistence/file/FileDeltaCollection.java   |  30 ++-
 .../memory/MemoryDeltaCollection.java           |  19 ++
 .../mongodb/MongoDBSnapshotStore.java           | 195 +++++++++++++++++
 .../mongodb/MongoDbDeltaCollection.java         |  97 +++++---
 .../persistence/mongodb/MongoDbDeltaStore.java  |  74 +++++--
 .../persistence/mongodb/MongoDbProvider.java    |   2 +-
 .../box/server/waveserver/DeltaStore.java       |  35 +++
 .../DeltaStoreBasedSnapshotStore.java           |   5 +
 .../waveserver/DeltaStoreBasedWaveletState.java | 219 ++++++++++++++++---
 .../box/server/waveserver/WaveServerModule.java |  34 ++-
 .../waveserver/WaveletDeltaRecordReader.java    |  15 +-
 .../box/server/waveserver/WaveServerTest.java   |   2 +-
 13 files changed, 629 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/config/reference.conf
----------------------------------------------------------------------
diff --git a/wave/config/reference.conf b/wave/config/reference.conf
index f751ca8..005872d 100644
--- a/wave/config/reference.conf
+++ b/wave/config/reference.conf
@@ -137,6 +137,9 @@ core {
 
   # Duration to keep the waves in cache.
   wave_cache_expire = 60m
+  
+  # Persist wavelet snapshot after a number of deltas received
+  persist_snapshots_on_deltas_count = 100
 }
 
 network {

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java
 
b/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java
index 11dbb31..6677d33 100644
--- 
a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java
@@ -35,11 +35,13 @@ import 
org.waveprotocol.box.server.waveserver.AppliedDeltaUtil;
 import org.waveprotocol.box.server.waveserver.ByteStringMessage;
 import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord;
 import org.waveprotocol.box.server.waveserver.DeltaStore.DeltasAccess;
+import org.waveprotocol.box.server.waveserver.DeltaStore.Snapshot;
 import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta;
 import org.waveprotocol.wave.model.id.WaveletName;
 import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta;
 import org.waveprotocol.wave.model.util.Pair;
 import org.waveprotocol.wave.model.version.HashedVersion;
+import org.waveprotocol.wave.model.wave.data.WaveletData;
 import org.waveprotocol.wave.util.logging.Log;
 
 import java.io.File;
@@ -641,9 +643,27 @@ public class FileDeltaCollection implements DeltasAccess {
     file.setLength(file.getFilePointer());
   }
 
-@Override
-public long getAllDeltas(Receiver<WaveletDeltaRecord> receiver) throws 
IOException {
-       // TODO Auto-generated method stub
-       return 0;
-}
+  @Override
+  public long getAllDeltas(Receiver<WaveletDeltaRecord> receiver) throws 
IOException {
+     return getDeltasInRange(0, endVersion.getVersion(), receiver);
+  }
+
+  @Override
+  public Snapshot loadSnapshot() throws PersistenceException {    
+    // Not supported!
+    return null;
+  }
+
+  @Override
+  public void storeSnapshot(WaveletData waveletData)
+      throws PersistenceException {
+    // No-op    
+  }
+
+  @Override
+  public WaveletDeltaRecord getLastDelta() throws IOException {
+    return getDeltaByEndVersion(endVersion.getVersion());
+  }
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java
 
b/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java
index 135b99a..471ff6b 100644
--- 
a/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java
@@ -24,13 +24,16 @@ import com.google.common.collect.Maps;
 import com.google.protobuf.InvalidProtocolBufferException;
 
 import org.waveprotocol.box.common.Receiver;
+import org.waveprotocol.box.server.persistence.PersistenceException;
 import org.waveprotocol.box.server.waveserver.ByteStringMessage;
 import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord;
 import org.waveprotocol.box.server.waveserver.DeltaStore.DeltasAccess;
+import org.waveprotocol.box.server.waveserver.DeltaStore.Snapshot;
 import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta;
 import org.waveprotocol.wave.model.id.WaveletName;
 import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta;
 import org.waveprotocol.wave.model.version.HashedVersion;
+import org.waveprotocol.wave.model.wave.data.WaveletData;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -150,4 +153,20 @@ public class MemoryDeltaCollection implements DeltasAccess 
{
 
     return count;
   }
+
+  @Override
+  public Snapshot loadSnapshot() throws PersistenceException {
+    return null;
+  }
+
+  @Override
+  public void storeSnapshot(WaveletData waveletData)
+      throws PersistenceException {
+    // No op
+  }
+
+  @Override
+  public WaveletDeltaRecord getLastDelta() throws IOException {
+    return endDeltas.get(endVersion.getVersion());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDBSnapshotStore.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDBSnapshotStore.java
 
b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDBSnapshotStore.java
new file mode 100644
index 0000000..5b6aa8d
--- /dev/null
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDBSnapshotStore.java
@@ -0,0 +1,195 @@
+package org.waveprotocol.box.server.persistence.mongodb;
+
+import org.waveprotocol.box.common.comms.WaveClientRpc.WaveletSnapshot;
+import org.waveprotocol.box.server.common.SnapshotSerializer;
+import org.waveprotocol.box.server.persistence.PersistenceException;
+import org.waveprotocol.box.server.waveserver.DeltaStore;
+import org.waveprotocol.wave.model.id.InvalidIdException;
+import org.waveprotocol.wave.model.id.ModernIdSerialiser;
+import org.waveprotocol.wave.model.id.WaveletName;
+import org.waveprotocol.wave.model.operation.OperationException;
+import org.waveprotocol.wave.model.util.Preconditions;
+import org.waveprotocol.wave.model.wave.InvalidParticipantAddress;
+import org.waveprotocol.wave.model.wave.data.ReadableWaveletData;
+import org.waveprotocol.wave.model.wave.data.WaveletData;
+import org.waveprotocol.wave.util.logging.Log;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DB;
+import com.mongodb.DBCollection;
+import com.mongodb.DBObject;
+import com.mongodb.MongoException;
+import com.mongodb.WriteConcern;
+
+/**
+ * A MongoDB-backed store of Wavelet snapshots.
+ * The aim is to avoid the whole processing of deltas when a
+ * wavelet is loaded into server's memory for the first time.
+ * 
+ * This class is not thread-safe.
+ * 
+ * @author pablo...@gmail.com (Pablo Ojanguren)
+ *
+ */
+public class MongoDBSnapshotStore {
+  
+  private static final Log LOG = Log.get(MongoDBSnapshotStore.class);
+  
+  protected static final String WAVE_ID_FIELD  = "waveid";
+  protected static final String WAVELET_ID_FIELD = "waveletid";
+  protected static final String VERSION_FIELD = "version";
+  protected static final String VERSION_HASH_FIELD = "versionhash";
+  protected static final String LASTMOD_FIELD = "lastmod";
+  
+  protected static final String SNAPSHOT_DATA = "data";
+
+  
+  /** Name of the MongoDB collection to store Deltas */
+  protected static final String SNAPSHOT_COLLECTION = "snapshots";
+
+
+  private final DBCollection collection;
+
+  /**
+   * Get a reference to the snapshots store.
+   * 
+   * @param database
+   * @return
+   */
+  public static MongoDBSnapshotStore create(DB database) {
+      Preconditions.checkArgument(database != null, "Unable to get reference 
to mongoDB snapshots collection");
+      DBCollection collection = database.getCollection(SNAPSHOT_COLLECTION);
+      return new MongoDBSnapshotStore(collection);
+  }
+  
+  /**
+   * Construct a new snapshots store.
+   *
+   * @param database the database connection object
+   */
+  protected MongoDBSnapshotStore(DBCollection collection) {
+    this.collection = collection;
+  }  
+  
+  protected void deleteSnapshot(WaveletName waveletName) throws 
PersistenceException {
+    
+    DBObject criteria = new BasicDBObject();
+    criteria.put(WAVE_ID_FIELD, 
ModernIdSerialiser.INSTANCE.serialiseWaveId(waveletName.waveId));
+    criteria.put(WAVELET_ID_FIELD, 
ModernIdSerialiser.INSTANCE.serialiseWaveletId(waveletName.waveletId));
+
+    try {
+      // Using Journaled Write Concern
+      // (http://docs.mongodb.org/manual/core/write-concern/#journaled)
+      collection.remove(criteria, WriteConcern.JOURNALED);
+    } catch (MongoException e) {
+      throw new PersistenceException(e);
+    }
+  }
+  
+  /**
+   * Store a snapshot
+   * 
+   * @param waveletData
+   * @param hashedVersion
+   * @throws PersistenceException 
+   */
+  public void store(ReadableWaveletData waveletData) throws 
PersistenceException {
+    
+    String waveId = 
ModernIdSerialiser.INSTANCE.serialiseWaveId(waveletData.getWaveId());
+    String waveletId = 
ModernIdSerialiser.INSTANCE.serialiseWaveletId(waveletData.getWaveletId());
+    
+  
+    // store new snapshot    
+    BasicDBObject dbo = new BasicDBObject();
+    dbo.put(WAVE_ID_FIELD, 
ModernIdSerialiser.INSTANCE.serialiseWaveId(waveletData.getWaveId()));
+    dbo.put(WAVELET_ID_FIELD, 
ModernIdSerialiser.INSTANCE.serialiseWaveletId(waveletData.getWaveletId()));
+    
+    dbo.put(VERSION_FIELD, waveletData.getHashedVersion().getVersion());
+    dbo.put(VERSION_HASH_FIELD, 
waveletData.getHashedVersion().getHistoryHash());
+    dbo.put(LASTMOD_FIELD,waveletData.getLastModifiedTime());
+    
+
+    WaveletSnapshot snapshot = 
SnapshotSerializer.serializeWavelet(waveletData, 
waveletData.getHashedVersion());    
+    dbo.put(SNAPSHOT_DATA,snapshot.toByteArray()); 
+    
+    try {
+      collection.insert(dbo);
+    } catch (MongoException e) {
+      LOG.warning("Error storing wavelet snapshot for "+waveId+"/"+waveletId, 
e);
+      throw new PersistenceException(e);
+    }    
+    
+    
+    // Delete previous snapshots of this wavelet
+    DBObject query = new BasicDBObject();
+    query.put(WAVE_ID_FIELD, waveId);
+    query.put(WAVELET_ID_FIELD, waveletId);
+    query.put(VERSION_FIELD, new BasicDBObject("$lt", 
waveletData.getHashedVersion().getVersion()));
+    
+    try {
+        collection.remove(query);
+    } catch (MongoException e) {
+      LOG.warning("Error deleting outdated wavelet snapshots for 
"+waveId+"/"+waveletId, e);
+      throw new PersistenceException(e);
+    }
+    
+    
+    
+    LOG.info("Stored snaphost for "+waveId+"/"+waveletId+" version "+ 
waveletData.getHashedVersion().getVersion());
+  }
+  
+  
+  public DeltaStore.Snapshot load(WaveletName waveletName) throws 
PersistenceException {
+        
+    String waveId = 
ModernIdSerialiser.INSTANCE.serialiseWaveId(waveletName.waveId);
+    String waveletId = 
ModernIdSerialiser.INSTANCE.serialiseWaveletId(waveletName.waveletId);
+    
+    // find last snapshot stored
+    DBObject query = new BasicDBObject();
+    query.put(WAVE_ID_FIELD, waveId);
+    query.put(WAVELET_ID_FIELD, waveletId);
+    
+    DBObject snapshotDBObject = null;
+
+    try {
+      snapshotDBObject = collection.findOne(query);
+    } catch (MongoException e) {
+      LOG.warning("Error querying wavelet snapshots for 
"+waveId+"/"+waveletId, e);
+      throw new PersistenceException(e);
+    }
+    
+    if (snapshotDBObject == null)
+      return null;
+    
+    
+    WaveletSnapshot snapshot = null;
+    try {
+      snapshot = WaveletSnapshot.parseFrom((byte[]) 
snapshotDBObject.get(SNAPSHOT_DATA));
+    } catch (InvalidProtocolBufferException e) {
+      throw new PersistenceException(e);
+    }
+    
+    try {
+    
+      final WaveletData waveletData = 
SnapshotSerializer.deserializeWavelet(snapshot, waveletName.waveId);
+      
+      return new DeltaStore.Snapshot() {           
+  
+        @Override
+        public WaveletData getWaveletData() {
+         
+          return waveletData;
+        }
+
+      };
+    
+    } catch (OperationException e) {
+      throw new PersistenceException(e);
+    } catch (InvalidParticipantAddress e) {
+      throw new PersistenceException(e);
+    } catch (InvalidIdException e) {
+      throw new PersistenceException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java
 
b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java
index 0ed4a48..aacb793 100644
--- 
a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java
@@ -19,11 +19,8 @@
 
 package org.waveprotocol.box.server.persistence.mongodb;
 
-import com.mongodb.BasicDBObject;
-import com.mongodb.DBCollection;
-import com.mongodb.DBCursor;
-import com.mongodb.DBObject;
-import com.mongodb.WriteConcern;
+import java.io.IOException;
+import java.util.Collection;
 
 import org.waveprotocol.box.common.Receiver;
 import org.waveprotocol.box.server.persistence.PersistenceException;
@@ -34,10 +31,15 @@ import 
org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta;
 import org.waveprotocol.wave.model.id.WaveletName;
 import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta;
 import org.waveprotocol.wave.model.version.HashedVersion;
+import org.waveprotocol.wave.model.wave.data.WaveletData;
 import org.waveprotocol.wave.util.logging.Log;
 
-import java.io.IOException;
-import java.util.Collection;
+import com.google.gwt.thirdparty.guava.common.base.Preconditions;
+import com.mongodb.BasicDBObject;
+import com.mongodb.DBCollection;
+import com.mongodb.DBCursor;
+import com.mongodb.DBObject;
+import com.mongodb.WriteConcern;
 
 /**
  * A MongoDB based Delta Access implementation using a simple <b>deltas</b>
@@ -51,21 +53,31 @@ public class MongoDbDeltaCollection implements 
DeltaStore.DeltasAccess {
   private static final Log LOG = Log.get(MongoDbDeltaCollection.class);
 
 
+  
   /** Wavelet name to work with. */
   private final WaveletName waveletName;
 
   /** MongoDB Collection object for delta storage */
-  private final DBCollection deltaDbCollection;
-
+  private final DBCollection deltasCollection;
+  
+  /** MongoDB based wavelet snapshot store */
+  private final MongoDBSnapshotStore snapshotStore;
+
+  
+  public static MongoDbDeltaCollection create(WaveletName waveletName, 
DBCollection deltasCollection, MongoDBSnapshotStore snapshotStore) {
+    return new MongoDbDeltaCollection(waveletName, deltasCollection, 
snapshotStore);
+  }
+  
   /**
    * Construct a new Delta Access object for the wavelet
    *
    * @param waveletName The wavelet name.
    * @param deltaDbCollection The MongoDB deltas collection
    */
-  public MongoDbDeltaCollection(WaveletName waveletName, DBCollection 
deltaDbCollection) {
+  public MongoDbDeltaCollection(WaveletName waveletName, DBCollection 
deltasCollection, MongoDBSnapshotStore snapshotStore) {
     this.waveletName = waveletName;
-    this.deltaDbCollection = deltaDbCollection;
+    this.deltasCollection = deltasCollection;
+    this.snapshotStore = snapshotStore;
   }
 
   @Override
@@ -90,7 +102,7 @@ public class MongoDbDeltaCollection implements 
DeltaStore.DeltasAccess {
   @Override
   public boolean isEmpty() {
 
-    return deltaDbCollection.count(createWaveletDBQuery()) == 0;
+    return deltasCollection.count(createWaveletDBQuery()) == 0;
   }
 
   @Override
@@ -106,7 +118,7 @@ public class MongoDbDeltaCollection implements 
DeltaStore.DeltasAccess {
     DBObject field = new BasicDBObject();
     field.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION, 1);
 
-    DBObject result = deltaDbCollection.findOne(query, field, sort);
+    DBObject result = deltasCollection.findOne(query, field, sort);
 
     return result != null ? MongoDbDeltaStoreUtil
         .deserializeHashedVersion((DBObject) ((DBObject) result
@@ -120,7 +132,7 @@ public class MongoDbDeltaCollection implements 
DeltaStore.DeltasAccess {
     DBObject query = createWaveletDBQuery();
     query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, 
version);
 
-    DBObject result = deltaDbCollection.findOne(query);
+    DBObject result = deltasCollection.findOne(query);
 
     WaveletDeltaRecord waveletDelta = null;
 
@@ -137,7 +149,7 @@ public class MongoDbDeltaCollection implements 
DeltaStore.DeltasAccess {
     DBObject query = createWaveletDBQuery();
     
query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION, 
version);
 
-    DBObject result = deltaDbCollection.findOne(query);
+    DBObject result = deltasCollection.findOne(query);
 
     WaveletDeltaRecord waveletDelta = null;
 
@@ -156,7 +168,7 @@ public class MongoDbDeltaCollection implements 
DeltaStore.DeltasAccess {
     DBObject query = createWaveletDBQuery();
     query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, 
version);
 
-    DBObject result = deltaDbCollection.findOne(query);
+    DBObject result = deltasCollection.findOne(query);
 
     if (result != null)
       return MongoDbDeltaStoreUtil.deserializeHashedVersion((DBObject) result
@@ -169,7 +181,7 @@ public class MongoDbDeltaCollection implements 
DeltaStore.DeltasAccess {
     DBObject query = createWaveletDBQuery();
     query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, 
version);
 
-    DBObject result = deltaDbCollection.findOne(query);
+    DBObject result = deltasCollection.findOne(query);
 
     if (result != null)
       return MongoDbDeltaStoreUtil.deserializeHashedVersion((DBObject) result
@@ -203,7 +215,7 @@ public class MongoDbDeltaCollection implements 
DeltaStore.DeltasAccess {
     for (WaveletDeltaRecord delta : newDeltas) {
       // Using Journaled Write Concern
       // (http://docs.mongodb.org/manual/core/write-concern/#journaled)
-      deltaDbCollection.insert(MongoDbDeltaStoreUtil.serialize(delta,
+      deltasCollection.insert(MongoDbDeltaStoreUtil.serialize(delta,
           waveletName.waveId.serialise(), waveletName.waveletId.serialise()),
           WriteConcern.JOURNALED);
     }
@@ -219,7 +231,7 @@ public class MongoDbDeltaCollection implements 
DeltaStore.DeltasAccess {
     sort.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED + "."
         + MongoDbDeltaStoreUtil.FIELD_APPLICATIONTIMESTAMP, 1);
 
-    DBCursor result = deltaDbCollection.find(query).sort(sort);
+    DBCursor result = deltasCollection.find(query).sort(sort);
 
     long count = 0;
     HashedVersion lastResultingVersion = null;
@@ -258,15 +270,13 @@ public class MongoDbDeltaCollection implements 
DeltaStore.DeltasAccess {
       Receiver<WaveletDeltaRecord> receiver) throws IOException {
 
     BasicDBObject sort = new BasicDBObject();
-    sort.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, 1);
-    sort.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED + "."
-        + MongoDbDeltaStoreUtil.FIELD_APPLICATIONTIMESTAMP, 1);
+    sort.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION, 
1);
 
     DBObject query = createWaveletDBQuery();
-    query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION,
-        new BasicDBObject("$gte", startVersion).append("$lte", endVersion));
+    query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION,
+        new BasicDBObject("$gt", startVersion).append("$lte", endVersion));
 
-    DBCursor result = deltaDbCollection.find(query).sort(sort);
+    DBCursor result = deltasCollection.find(query).sort(sort);
 
     long count = 0;
     HashedVersion lastResultingVersion = null;
@@ -298,4 +308,41 @@ public class MongoDbDeltaCollection implements 
DeltaStore.DeltasAccess {
 
     return count;
   }
+
+  @Override
+  public WaveletDeltaRecord getLastDelta() throws IOException {
+    
+    // Search the max of delta.getTransformedDelta().getResultingVersion()
+
+    DBObject query = createWaveletDBQuery();
+
+    DBObject sort = new BasicDBObject();
+    sort.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION, 
-1); // Descending
+
+
+    DBObject result = deltasCollection.findOne(query, null, sort);
+
+    try {
+      return result != null ? MongoDbDeltaStoreUtil
+          .deserializeWaveletDeltaRecord(result) : null;
+    } catch (PersistenceException e) {
+      throw new IOException(e);
+    }
+    
+  }
+  
+  @Override
+  public DeltaStore.Snapshot loadSnapshot() throws PersistenceException {      
     
+    return snapshotStore.load(waveletName);
+  }
+
+  @Override
+  public void storeSnapshot(WaveletData waveletData)
+      throws PersistenceException {
+    
+    
Preconditions.checkArgument(waveletName.equals(WaveletName.of(waveletData.getWaveId(),
 waveletData.getWaveletId())), 
+        "Can't store snapshots for different wavelet");
+    
+    snapshotStore.store(waveletData);  
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java
 
b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java
index 05aacbd..313b928 100644
--- 
a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java
@@ -36,37 +36,71 @@ import org.waveprotocol.box.server.waveserver.DeltaStore;
 import org.waveprotocol.wave.model.id.WaveId;
 import org.waveprotocol.wave.model.id.WaveletId;
 import org.waveprotocol.wave.model.id.WaveletName;
+import org.waveprotocol.wave.util.logging.Log;
 
 import java.util.List;
 
 /**
- * A MongoDB based Delta Store implementation using a simple <b>deltas</b>
- * collection, storing a delta record per each MongoDb document.
+ * A MongoDB based Delta Store implementation using a <b>deltas</b>
+ * collection and a snapshots collection.
+ * 
+ * Each deltas is stored as a MongoDb document.
+ * 
+ * For snapshots store details check out {@link MongoDBSnapshotStore}
  *
  * @author pablo...@gmail.com (Pablo Ojanguren)
  *
  */
 public class MongoDbDeltaStore implements DeltaStore {
-
+  
+  private static final Log LOG = Log.get(MongoDbDeltaStore.class);
+  
   /** Name of the MongoDB collection to store Deltas */
   private static final String DELTAS_COLLECTION = "deltas";
 
-  /** Database connection object */
-  private final DB database;
+  /** MongoDB Collection for deltas */
+  private final DBCollection deltasCollection;
+  
+  /** A specific class handling snapshots */
+  private final MongoDBSnapshotStore snapshotStore;
 
   /**
-   * Construct a new store
+   * Creates a mongoDB based delta/snapshot store.
+   * 
+   * @param database
+   * @return
+   */
+  public static MongoDbDeltaStore create(DB database) {
+    
+    DBCollection deltasCollection = database.getCollection(DELTAS_COLLECTION); 
   
+    checkDeltasCollectionIndexes(deltasCollection);
+    MongoDBSnapshotStore snapshotStore = MongoDBSnapshotStore.create(database);
+    
+    return new MongoDbDeltaStore(deltasCollection, snapshotStore);
+  }
+  
+  private static void checkDeltasCollectionIndexes(DBCollection 
deltasCollection) {
+    
+    LOG.info("For production environments, set MongoDB index in 'deltas' 
collection with fields 'waveid', 'waveletid' and 
'transformed.resultingversion'");
+    
+  }
+  
+  
+  /**
+   * Construct a new store instance
    *
    * @param database the database connection object
    */
-  public MongoDbDeltaStore(DB database) {
-    this.database = database;
+  private MongoDbDeltaStore(DBCollection deltasCollection, 
MongoDBSnapshotStore snapshotStore) {
+    this.deltasCollection = deltasCollection;
+    this.snapshotStore = snapshotStore;
   }
+  
+  
 
   @Override
   public DeltasAccess open(WaveletName waveletName) throws 
PersistenceException {
-
-    return new MongoDbDeltaCollection(waveletName, getDeltaDbCollection());
+    return new MongoDbDeltaCollection(waveletName, deltasCollection, 
snapshotStore);
   }
 
   @Override
@@ -80,10 +114,14 @@ public class MongoDbDeltaStore implements DeltaStore {
     try {
       // Using Journaled Write Concern
       // (http://docs.mongodb.org/manual/core/write-concern/#journaled)
-      getDeltaDbCollection().remove(criteria, WriteConcern.JOURNALED);
+      deltasCollection.remove(criteria, WriteConcern.JOURNALED);
     } catch (MongoException e) {
       throw new PersistenceException(e);
     }
+    
+    // Also delete wavelet snapshots
+    snapshotStore.deleteSnapshot(waveletName);
+
   }
 
   @Override
@@ -99,7 +137,7 @@ public class MongoDbDeltaStore implements DeltaStore {
     DBCursor cursor = null;
 
     try {
-      cursor = getDeltaDbCollection().find(query, projection);
+      cursor = deltasCollection.find(query, projection);
     } catch (MongoException e) {
       throw new PersistenceException(e);
     }
@@ -126,7 +164,7 @@ public class MongoDbDeltaStore implements DeltaStore {
     try {
 
       @SuppressWarnings("rawtypes")
-      List results = 
getDeltaDbCollection().distinct(MongoDbDeltaStoreUtil.FIELD_WAVE_ID);
+      List results = 
deltasCollection.distinct(MongoDbDeltaStoreUtil.FIELD_WAVE_ID);
 
       for (Object o : results)
         builder.add(WaveId.deserialise((String) o));
@@ -138,13 +176,5 @@ public class MongoDbDeltaStore implements DeltaStore {
 
     return ExceptionalIterator.FromIterator.create(builder.build().iterator());
   }
-
-  /**
-   * Access to deltas collection
-   *
-   * @return DBCollection of deltas
-   */
-  private DBCollection getDeltaDbCollection() {
-    return database.getCollection(DELTAS_COLLECTION);
-  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java
 
b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java
index c48a8a6..bc78406 100644
--- 
a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java
@@ -159,7 +159,7 @@ public class MongoDbProvider {
    */
   public MongoDbDeltaStore provideMongoDbDeltaStore() {
     if (mongoDbDeltaStore == null) {
-      mongoDbDeltaStore = new MongoDbDeltaStore(getDatabase());
+      mongoDbDeltaStore = MongoDbDeltaStore.create(getDatabase());
     }
 
     return mongoDbDeltaStore;

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStore.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStore.java 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStore.java
index 8ca5a5d..b34e01a 100644
--- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStore.java
+++ b/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStore.java
@@ -20,6 +20,8 @@
 package org.waveprotocol.box.server.waveserver;
 
 import org.waveprotocol.box.server.persistence.PersistenceException;
+import org.waveprotocol.wave.model.version.HashedVersion;
+import org.waveprotocol.wave.model.wave.data.WaveletData;
 
 import java.io.Closeable;
 import java.util.Collection;
@@ -28,12 +30,26 @@ import java.util.Collection;
  * Stores wavelet deltas.
  *
  * @author so...@google.com (Soren Lassen)
+ * @author pablo...@gmail.com (Pablo Ojanguren)
  */
 public interface DeltaStore extends WaveletStore<DeltaStore.DeltasAccess> {
 
   /**
+   * Wavelet snapshot and resulting version together.
+   * 
+   * @author pablo...@gmail.com (Pablo Ojanguren)
+   */
+  interface Snapshot {
+    
+    /** Gets the wavelet snapshot */
+    public WaveletData getWaveletData();
+  }
+  
+  /**
    * Accesses the delta history for a wavelet.
    * Permits reading historical deltas and appending deltas to the history.
+   * 
+   * Optionally, it can provide service to load and store snapshots. 
    */
   interface DeltasAccess extends WaveletDeltaRecordReader, Closeable {
     /**
@@ -50,5 +66,24 @@ public interface DeltaStore extends 
WaveletStore<DeltaStore.DeltasAccess> {
      *         storage.
      */
     void append(Collection<WaveletDeltaRecord> deltas) throws 
PersistenceException;
+    
+    /**
+     * Loads the last snapshot of the wavelet.
+     * 
+     * @return the wavelet data object
+     * @throws PersistenceException if anything goes wrong with the underlying
+     *         storage.
+     */
+    Snapshot loadSnapshot() throws PersistenceException;
+    
+    /**
+     * Stores a wavelet snapshot as the last one.  
+     * 
+     * @param waveletData the wavelet data
+     * @throws PersistenceException if anything goes wrong with the underlying
+     *         storage.
+     */
+    void storeSnapshot(WaveletData waveletData) throws PersistenceException;
+    
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java
 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java
index d564470..0d2c449 100644
--- 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java
@@ -188,6 +188,11 @@ class DeltaStoreBasedSnapshotStore implements 
DeltaAndSnapshotStore {
       isClosed = true;
       deltasAccess.close();
     }
+
+    @Override
+    public WaveletDeltaRecord getLastDelta() throws IOException {
+      return deltasAccess.getLastDelta();
+    }
   }
 
   private final DeltaStore deltaStore;

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
index 8581fee..983aece 100644
--- 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
@@ -35,6 +35,8 @@ import 
org.waveprotocol.box.server.persistence.PersistenceException;
 import org.waveprotocol.box.server.util.WaveletDataUtil;
 import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta;
 import org.waveprotocol.wave.model.id.IdURIEncoderDecoder;
+import org.waveprotocol.wave.model.id.IdUtil;
+import org.waveprotocol.wave.model.id.ModernIdSerialiser;
 import org.waveprotocol.wave.model.id.WaveletName;
 import org.waveprotocol.wave.model.operation.OperationException;
 import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta;
@@ -47,7 +49,6 @@ import org.waveprotocol.wave.util.escapers.jvm.JavaUrlCodec;
 import org.waveprotocol.wave.util.logging.Log;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
@@ -60,18 +61,26 @@ import java.util.concurrent.atomic.AtomicReference;
 /**
  * Simplistic {@link DeltaStore}-backed wavelet state implementation
  * which goes to persistent storage for every history request.
- *
- * TODO(soren): rewire this class to be backed by {@link WaveletStore} and
- * read the snapshot from there instead of computing it in the
- * DeltaStoreBasedWaveletState constructor.
+ * <br/><br/>
+ * If implementation of {@link DeltaStore.DeltaAccess} supports snapshot 
storage,
+ * this class will take advantage of it:
+ * <br/><br/>
+ * On creation, it'll try to load a snapshot from the storage instead of 
compose all deltas.
+ * If the snapshot is built from deltas composition, persist the snapshot.
+ * <br/><br/>
+ * Snapshot updates will be persisted every time a fixed number of deltas are 
added to the wavelet.
+ * This will be done by the persistence task, after deltas are persisted.
+ * 
  *
  * @author so...@google.com (Soren Lassen)
  * @author akapla...@gmail.com (Andew Kaplanov)
+ * @author pablo...@gmail.com (Pablo Ojanguren)
  */
 class DeltaStoreBasedWaveletState implements WaveletState {
 
   private static final Log LOG = Log.get(DeltaStoreBasedWaveletState.class);
-
+  
+  
   private static final IdURIEncoderDecoder URI_CODEC =
       new IdURIEncoderDecoder(new JavaUrlCodec());
 
@@ -111,21 +120,133 @@ class DeltaStoreBasedWaveletState implements 
WaveletState {
    *         processing stored deltas
    */
   public static DeltaStoreBasedWaveletState create(DeltaStore.DeltasAccess 
deltasAccess,
-      Executor persistExecutor) throws PersistenceException {
+      Executor persistExecutor) throws PersistenceException { 
+    return create(deltasAccess, persistExecutor, 250);
+  }
+  
+  /**
+   * Creates a new delta store based state.
+   *
+   * The executor must ensure that only one thread executes at any time for 
each
+   * state instance.
+   *
+   * @param deltasAccess delta store accessor
+   * @param persistExecutor executor for making persistence calls
+   * @param persistSnapshotOnDeltasCount number of deltas to receive before 
storing the snapshot
+   * @return a state initialized from the deltas
+   * @throws PersistenceException if a failure occurs while reading or
+   *         processing stored deltas
+   */
+  public static DeltaStoreBasedWaveletState create(DeltaStore.DeltasAccess 
deltasAccess,
+      Executor persistExecutor, int persistSnapshotOnDeltasCount) throws 
PersistenceException {
     if (deltasAccess.isEmpty()) {
-      return new DeltaStoreBasedWaveletState(deltasAccess, 
ImmutableList.<WaveletDeltaRecord>of(),
-          null, persistExecutor);
+      return new DeltaStoreBasedWaveletState(deltasAccess, null, 
persistExecutor, persistSnapshotOnDeltasCount);
     } else {
-      try {
-        ImmutableList<WaveletDeltaRecord> deltas = readAll(deltasAccess, null);
-        WaveletData snapshot = 
WaveletDataUtil.buildWaveletFromDeltas(deltasAccess.getWaveletName(),
-            Iterators.transform(deltas.iterator(), TRANSFORMED));
-        return new DeltaStoreBasedWaveletState(deltasAccess, deltas, snapshot, 
persistExecutor);
-      } catch (IOException e) {
-        throw new PersistenceException("Failed to read stored deltas", e);
-      } catch (OperationException e) {
-        throw new PersistenceException("Failed to compose stored deltas", e);
+      
+      long t1 = 0;
+      long t2 = 0;
+      
+      ImmutableList<WaveletDeltaRecord> deltas = null;
+      WaveletData snapshot = null;
+
+      // Get snapshot from persistence. 
+      // Snapshot storage is optional for underlying {@link 
DeltaStore.DeltasAccess}
+      // In case of no available snapshot, build it from deltas.
+      
+      t1 = System.currentTimeMillis();       
+      DeltaStore.Snapshot persistenceSnapshot = deltasAccess.loadSnapshot();   
   
+      t2 = System.currentTimeMillis();
+      
+      if (persistenceSnapshot != null) {
+                
+        try {
+          
+          WaveletDeltaRecord lastStoredDelta = deltasAccess.getLastDelta();
+
+          // Is the persisted snapshot up to date?
+          
+          if (lastStoredDelta.getResultingVersion().getVersion() > 
persistenceSnapshot
+              .getWaveletData().getVersion()) {
+
+            long startVersion = 
persistenceSnapshot.getWaveletData().getHashedVersion().getVersion();
+            long endVersion = 
lastStoredDelta.getResultingVersion().getVersion();
+
+            deltasAccess.getDeltasInRange(startVersion, endVersion,
+                new Receiver<WaveletDeltaRecord>() {
+
+                  @Override
+                  public boolean put(WaveletDeltaRecord delta) {
+                    try {
+                      
WaveletDataUtil.applyWaveletDelta(delta.getTransformedDelta(),
+                          persistenceSnapshot.getWaveletData());
+                    } catch (OperationException e) {
+                      return false;
+                    }
+                    return true;
+                  }
+                });
+            
+            t2 = System.currentTimeMillis();
+            
+            snapshot = persistenceSnapshot.getWaveletData();
+
+          } else if (lastStoredDelta.getResultingVersion().getVersion() == 
persistenceSnapshot
+              .getWaveletData().getVersion()) {
+
+            snapshot = persistenceSnapshot.getWaveletData();
+
+          } else {
+            // Wow, the snapshot has a version higher than last delta!
+            // That's impossible, let's suppose delta history is right and
+            // ignore snapshot
+          }
+
+        } catch (IllegalStateException e) {
+          throw new PersistenceException("Failed to compose wavelet snapshot", 
e);
+        } catch (IOException e) {
+          throw new PersistenceException("Failed to compose wavelet snapshot", 
e);
+        }
+
+        String waveletName = 
ModernIdSerialiser.INSTANCE.serialiseWaveId(deltasAccess.getWaveletName().waveId)+"/"+
+            deltasAccess.getWaveletName().waveletId.getId();
+        
+        LOG.info("Snapshot loaded for "+waveletName+" in "+ (t2-t1) +"ms");
+        
       }
+        
+        // Build snapshot from deltas if it is not already built
+        if (snapshot == null) {          
+          
+        try {
+
+          t1 = System.currentTimeMillis(); 
+          
+          deltas = readAll(deltasAccess, null);
+          snapshot = 
WaveletDataUtil.buildWaveletFromDeltas(deltasAccess.getWaveletName(),
+              Iterators.transform(deltas.iterator(), TRANSFORMED));
+
+          t2 = System.currentTimeMillis();
+          
+          String waveletName = 
ModernIdSerialiser.INSTANCE.serialiseWaveId(deltasAccess.getWaveletName().waveId)+"/"+
+              deltasAccess.getWaveletName().waveletId.getId();
+          
+          LOG.info("Snapshot built for "+waveletName+" in "+ (t2-t1) +"ms");
+          
+          // Persist the snapshot if it is not user wavelet
+          if (!IdUtil.isUserDataWavelet(snapshot.getWaveletId()))
+            deltasAccess.storeSnapshot(snapshot);
+
+        } catch (IOException e) {
+          throw new PersistenceException("Failed to read stored deltas", e);
+        } catch (OperationException e) {
+          throw new PersistenceException("Failed to compose stored deltas", e);
+        }
+          
+        }
+                
+        return new DeltaStoreBasedWaveletState(deltasAccess, snapshot, 
persistExecutor, persistSnapshotOnDeltasCount);
+        
+
     }
   }
 
@@ -195,11 +316,25 @@ class DeltaStoreBasedWaveletState implements WaveletState 
{
 
   /** The persist task that will be executed next. */
   private ListenableFutureTask<Void> nextPersistTask = null;
+  
+  /** 
+   * the number of deltas to be processed before to
+   * persist the snapshot.
+   */
+  private final int persistSnapshotOnDeltasCount;
 
+  /** 
+   * Counter of processed deltas in order to persist the snapshot 
+   * if its value excess {@link persistSnapshotOnDeltaCount} 
+   */
+  private int deltasCountBeforeSnapshotStore = 0;
+  
   /**
    * Processes the persist task and checks if there is another task to do when
    * one task is done. In such a case, it writes all waiting to be persisted
    * deltas to persistent storage in one operation.
+   * 
+   * Also persist the snapshot if it is required.
    */
   private final Callable<Void> persisterTask = new Callable<Void>() {
     @Override
@@ -225,6 +360,14 @@ class DeltaStoreBasedWaveletState implements WaveletState {
         } while (v.getVersion() < version.getVersion());
         Preconditions.checkState(v.equals(version));
         deltasAccess.append(deltas.build());
+        
+        if (deltasCountBeforeSnapshotStore >= persistSnapshotOnDeltasCount) {
+          synchronized (persistLock) {
+            deltasAccess.storeSnapshot(snapshot);
+            deltasCountBeforeSnapshotStore = 0;
+          }          
+        }
+        
       }
       synchronized (persistLock) {
         Preconditions.checkState(last == lastPersistedVersion.get(),
@@ -242,9 +385,11 @@ class DeltaStoreBasedWaveletState implements WaveletState {
   };
 
   /** Keyed by appliedAtVersion. */
+  /*
   private final ConcurrentNavigableMap<HashedVersion, 
ByteStringMessage<ProtocolAppliedWaveletDelta>> appliedDeltas =
       new ConcurrentSkipListMap<HashedVersion, 
ByteStringMessage<ProtocolAppliedWaveletDelta>>();
-
+  */
+  
   /** Keyed by appliedAtVersion. */
   private final ConcurrentNavigableMap<HashedVersion, WaveletDeltaRecord> 
cachedDeltas =
       new ConcurrentSkipListMap<HashedVersion, WaveletDeltaRecord>();
@@ -261,23 +406,24 @@ class DeltaStoreBasedWaveletState implements WaveletState 
{
   private final AtomicReference<HashedVersion> lastPersistedVersion;
 
   /**
-   * Constructs a wavelet state with the given deltas and snapshot.
-   * The deltas must be the contents of deltasAccess, and they
-   * must be contiguous from version zero.
+   * Constructs a wavelet state with the given snapshot.
+
    * The snapshot must be the composition of the deltas, or null if there
    * are no deltas. The constructed object takes ownership of the
    * snapshot and will mutate it if appendDelta() is called.
+   * 
+   * The delta store is responsible to update the snapshot 
+   * but no necessarily for each appendDelta(). Hence it is expected that
+   * snapshots are stored less frequently.
    */
   @VisibleForTesting
-  DeltaStoreBasedWaveletState(DeltaStore.DeltasAccess deltasAccess,
-      List<WaveletDeltaRecord> deltas, WaveletData snapshot, Executor 
persistExecutor) {
-    Preconditions.checkArgument(deltasAccess.isEmpty() == deltas.isEmpty());
-    Preconditions.checkArgument(deltas.isEmpty() == (snapshot == null));
+  DeltaStoreBasedWaveletState(DeltaStore.DeltasAccess deltasAccess, 
WaveletData snapshot, Executor persistExecutor, int 
persistSnapshotOnDeltasCount) {
     this.persistExecutor = persistExecutor;
     this.versionZero = 
HASH_FACTORY.createVersionZero(deltasAccess.getWaveletName());
     this.deltasAccess = deltasAccess;
     this.snapshot = snapshot;
     this.lastPersistedVersion = new 
AtomicReference<HashedVersion>(deltasAccess.getEndVersion());
+    this.persistSnapshotOnDeltasCount = persistSnapshotOnDeltasCount;
   }
 
   @Override
@@ -446,15 +592,24 @@ class DeltaStoreBasedWaveletState implements WaveletState 
{
         "Applied version %s doesn't match current version %s", 
deltaRecord.getAppliedAtVersion(),
         currentVersion);
 
-    if (deltaRecord.getAppliedAtVersion().getVersion() == 0) {
-      Preconditions.checkState(lastPersistedVersion.get() == null);
-      snapshot = WaveletDataUtil.buildWaveletFromFirstDelta(getWaveletName(), 
deltaRecord.getTransformedDelta());
-    } else {
-      WaveletDataUtil.applyWaveletDelta(deltaRecord.getTransformedDelta(), 
snapshot);
-    }
+    
+
+      if (deltaRecord.getAppliedAtVersion().getVersion() == 0) {
+        Preconditions.checkState(lastPersistedVersion.get() == null);
+        snapshot = 
WaveletDataUtil.buildWaveletFromFirstDelta(getWaveletName(), 
deltaRecord.getTransformedDelta());
+      } else {
+        // Avoid to update snapshot when it has being persisted        
+        synchronized (persistLock) {
+          WaveletDataUtil.applyWaveletDelta(deltaRecord.getTransformedDelta(), 
snapshot);
+        }
+      }
 
+   
     // Now that we built the snapshot without any exceptions, we record the 
delta.
     cachedDeltas.put(deltaRecord.getAppliedAtVersion(), deltaRecord);
+    
+    // Increment counter controlling snapshot persistence
+    deltasCountBeforeSnapshotStore++;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveServerModule.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveServerModule.java
 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveServerModule.java
index 41d60bf..578f9b6 100644
--- 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveServerModule.java
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveServerModule.java
@@ -27,6 +27,8 @@ import com.google.inject.Inject;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
 import 
org.waveprotocol.box.server.executor.ExecutorAnnotations.StorageContinuationExecutor;
 import 
org.waveprotocol.box.server.executor.ExecutorAnnotations.WaveletLoadExecutor;
 import org.waveprotocol.box.server.persistence.PersistenceException;
@@ -52,6 +54,7 @@ public class WaveServerModule extends AbstractModule {
   private final Executor waveletLoadExecutor;
   private final Executor storageContinuationExecutor;
   private final boolean enableFederation;
+  private final int persistSnapshotOnDeltasCount;
 
 
   @Inject
@@ -59,6 +62,13 @@ public class WaveServerModule extends AbstractModule {
       @WaveletLoadExecutor Executor waveletLoadExecutor,
       @StorageContinuationExecutor Executor storageContinuationExecutor) {
     this.enableFederation = config.getBoolean("federation.enable_federation");
+    int deltaCountForPersistSnapshots = 250;
+    try { 
+      deltaCountForPersistSnapshots = 
config.getInt("core.persist_snapshots_on_deltas_count");
+    } catch (ConfigException.Missing e) {
+      e.printStackTrace();
+    }
+    this.persistSnapshotOnDeltasCount = deltaCountForPersistSnapshots;
     this.waveletLoadExecutor = waveletLoadExecutor;
     this.storageContinuationExecutor = storageContinuationExecutor;
   }
@@ -106,7 +116,7 @@ public class WaveServerModule extends AbstractModule {
       public LocalWaveletContainer create(WaveletNotificationSubscriber 
notifiee,
           WaveletName waveletName, String waveDomain) {
         return new LocalWaveletContainerImpl(waveletName, notifiee, 
loadWaveletState(
-            waveletLoadExecutor, deltaStore, waveletName, 
waveletLoadExecutor), waveDomain,
+            waveletLoadExecutor, deltaStore, waveletName, waveletLoadExecutor, 
persistSnapshotOnDeltasCount), waveDomain,
             storageContinuationExecutor);
       }
     };
@@ -121,7 +131,7 @@ public class WaveServerModule extends AbstractModule {
       public RemoteWaveletContainer create(WaveletNotificationSubscriber 
notifiee,
           WaveletName waveletName, String waveDomain) {
         return new RemoteWaveletContainerImpl(waveletName, notifiee, 
loadWaveletState(
-            waveletLoadExecutor, deltaStore, waveletName, waveletLoadExecutor),
+            waveletLoadExecutor, deltaStore, waveletName, waveletLoadExecutor, 
persistSnapshotOnDeltasCount),
             storageContinuationExecutor);
       }
     };
@@ -147,16 +157,16 @@ public class WaveServerModule extends AbstractModule {
    */
   @VisibleForTesting
   static ListenableFuture<DeltaStoreBasedWaveletState> 
loadWaveletState(Executor executor,
-      final DeltaStore deltaStore, final WaveletName waveletName, final 
Executor persistExecutor) {
-    ListenableFutureTask<DeltaStoreBasedWaveletState> task =
-        ListenableFutureTask.create(
-           new Callable<DeltaStoreBasedWaveletState>() {
-             @Override
-             public DeltaStoreBasedWaveletState call() throws 
PersistenceException {
-               return 
DeltaStoreBasedWaveletState.create(deltaStore.open(waveletName),
-                                                                
persistExecutor);
-             }
-           });
+      final DeltaStore deltaStore, final WaveletName waveletName, final 
Executor persistExecutor,
+      final int persistSnapshotOnDeltasCount) {
+    ListenableFutureTask<DeltaStoreBasedWaveletState> task = 
ListenableFutureTask
+        .create(new Callable<DeltaStoreBasedWaveletState>() {
+          @Override
+          public DeltaStoreBasedWaveletState call() throws 
PersistenceException {
+            return 
DeltaStoreBasedWaveletState.create(deltaStore.open(waveletName), 
persistExecutor,
+                persistSnapshotOnDeltasCount);
+          }
+        });
     executor.execute(task);
     return task;
   }

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java
----------------------------------------------------------------------
diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java
 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java
index 7ac0dd9..27617b5 100644
--- 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java
+++ 
b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java
@@ -95,13 +95,22 @@ public interface WaveletDeltaRecordReader {
 
 
   /**
-   * Returns a range of deltas.
+   * Returns a range of deltas having their result version greater than 
startVersion
+   * and less or equals than endVersion
    * 
-   * @param start
-   * @param end
+   * @param startVersion 
+   * @param endVersion
    * @param receiver
    */
   long getDeltasInRange(long startVersion, long endVersion, 
Receiver<WaveletDeltaRecord> receiver)
       throws IOException;
 
+  /**
+   * Returns the last delta of the wavelet.
+   * 
+   * @return the last delta
+   * @throws IOException 
+   */
+  WaveletDeltaRecord getLastDelta() throws IOException;
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java
----------------------------------------------------------------------
diff --git 
a/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java 
b/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java
index 9c058b6..5523535 100644
--- 
a/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java
+++ 
b/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java
@@ -113,7 +113,7 @@ public class WaveServerTest extends TestCase {
       public LocalWaveletContainer create(WaveletNotificationSubscriber 
notifiee,
           WaveletName waveletName, String waveDomain) {
         return new LocalWaveletContainerImpl(waveletName, notifiee,
-            WaveServerModule.loadWaveletState(waveletLoadExecutor, deltaStore, 
waveletName, persistExecutor),
+            WaveServerModule.loadWaveletState(waveletLoadExecutor, deltaStore, 
waveletName, persistExecutor, 100),
             waveDomain, storageContinuationExecutor);
       }
     };

Reply via email to