Wavelet snapshot storage with MongoDB Project: http://git-wip-us.apache.org/repos/asf/incubator-wave/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-wave/commit/94fa1878 Tree: http://git-wip-us.apache.org/repos/asf/incubator-wave/tree/94fa1878 Diff: http://git-wip-us.apache.org/repos/asf/incubator-wave/diff/94fa1878
Branch: refs/heads/swellrt Commit: 94fa187895df34e4c9319db74634be8d537ceede Parents: 95f178b Author: Pablo Ojanguren <pablo...@gmail.com> Authored: Mon Oct 17 19:06:46 2016 +0200 Committer: Pablo Ojanguren <pablo...@gmail.com> Committed: Mon Oct 17 19:06:46 2016 +0200 ---------------------------------------------------------------------- wave/config/reference.conf | 3 + .../persistence/file/FileDeltaCollection.java | 30 ++- .../memory/MemoryDeltaCollection.java | 19 ++ .../mongodb/MongoDBSnapshotStore.java | 195 +++++++++++++++++ .../mongodb/MongoDbDeltaCollection.java | 97 +++++--- .../persistence/mongodb/MongoDbDeltaStore.java | 74 +++++-- .../persistence/mongodb/MongoDbProvider.java | 2 +- .../box/server/waveserver/DeltaStore.java | 35 +++ .../DeltaStoreBasedSnapshotStore.java | 5 + .../waveserver/DeltaStoreBasedWaveletState.java | 219 ++++++++++++++++--- .../box/server/waveserver/WaveServerModule.java | 34 ++- .../waveserver/WaveletDeltaRecordReader.java | 15 +- .../box/server/waveserver/WaveServerTest.java | 2 +- 13 files changed, 629 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/config/reference.conf ---------------------------------------------------------------------- diff --git a/wave/config/reference.conf b/wave/config/reference.conf index f751ca8..005872d 100644 --- a/wave/config/reference.conf +++ b/wave/config/reference.conf @@ -137,6 +137,9 @@ core { # Duration to keep the waves in cache. 
wave_cache_expire = 60m + + # Persist wavelet snapshot after a number of deltas received + persist_snapshots_on_deltas_count = 100 } network { http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java index 11dbb31..6677d33 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java +++ b/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java @@ -35,11 +35,13 @@ import org.waveprotocol.box.server.waveserver.AppliedDeltaUtil; import org.waveprotocol.box.server.waveserver.ByteStringMessage; import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord; import org.waveprotocol.box.server.waveserver.DeltaStore.DeltasAccess; +import org.waveprotocol.box.server.waveserver.DeltaStore.Snapshot; import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; import org.waveprotocol.wave.model.id.WaveletName; import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; import org.waveprotocol.wave.model.util.Pair; import org.waveprotocol.wave.model.version.HashedVersion; +import org.waveprotocol.wave.model.wave.data.WaveletData; import org.waveprotocol.wave.util.logging.Log; import java.io.File; @@ -641,9 +643,27 @@ public class FileDeltaCollection implements DeltasAccess { file.setLength(file.getFilePointer()); } -@Override -public long getAllDeltas(Receiver<WaveletDeltaRecord> receiver) throws IOException { - // TODO Auto-generated method stub - return 0; -} + @Override + public long getAllDeltas(Receiver<WaveletDeltaRecord> receiver) throws IOException { + return getDeltasInRange(0, endVersion.getVersion(), 
receiver); + } + + @Override + public Snapshot loadSnapshot() throws PersistenceException { + // Not supported! + return null; + } + + @Override + public void storeSnapshot(WaveletData waveletData) + throws PersistenceException { + // No-op + } + + @Override + public WaveletDeltaRecord getLastDelta() throws IOException { + return getDeltaByEndVersion(endVersion.getVersion()); + } + + } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java index 135b99a..471ff6b 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java +++ b/wave/src/main/java/org/waveprotocol/box/server/persistence/memory/MemoryDeltaCollection.java @@ -24,13 +24,16 @@ import com.google.common.collect.Maps; import com.google.protobuf.InvalidProtocolBufferException; import org.waveprotocol.box.common.Receiver; +import org.waveprotocol.box.server.persistence.PersistenceException; import org.waveprotocol.box.server.waveserver.ByteStringMessage; import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord; import org.waveprotocol.box.server.waveserver.DeltaStore.DeltasAccess; +import org.waveprotocol.box.server.waveserver.DeltaStore.Snapshot; import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; import org.waveprotocol.wave.model.id.WaveletName; import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; import org.waveprotocol.wave.model.version.HashedVersion; +import org.waveprotocol.wave.model.wave.data.WaveletData; import java.io.IOException; import java.util.Collection; @@ -150,4 +153,20 @@ public class MemoryDeltaCollection implements 
DeltasAccess { return count; } + + @Override + public Snapshot loadSnapshot() throws PersistenceException { + return null; + } + + @Override + public void storeSnapshot(WaveletData waveletData) + throws PersistenceException { + // No op + } + + @Override + public WaveletDeltaRecord getLastDelta() throws IOException { + return endDeltas.get(endVersion.getVersion()); + } } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDBSnapshotStore.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDBSnapshotStore.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDBSnapshotStore.java new file mode 100644 index 0000000..5b6aa8d --- /dev/null +++ b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDBSnapshotStore.java @@ -0,0 +1,195 @@ +package org.waveprotocol.box.server.persistence.mongodb; + +import org.waveprotocol.box.common.comms.WaveClientRpc.WaveletSnapshot; +import org.waveprotocol.box.server.common.SnapshotSerializer; +import org.waveprotocol.box.server.persistence.PersistenceException; +import org.waveprotocol.box.server.waveserver.DeltaStore; +import org.waveprotocol.wave.model.id.InvalidIdException; +import org.waveprotocol.wave.model.id.ModernIdSerialiser; +import org.waveprotocol.wave.model.id.WaveletName; +import org.waveprotocol.wave.model.operation.OperationException; +import org.waveprotocol.wave.model.util.Preconditions; +import org.waveprotocol.wave.model.wave.InvalidParticipantAddress; +import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; +import org.waveprotocol.wave.model.wave.data.WaveletData; +import org.waveprotocol.wave.util.logging.Log; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.mongodb.BasicDBObject; +import com.mongodb.DB; +import com.mongodb.DBCollection; 
+import com.mongodb.DBObject; +import com.mongodb.MongoException; +import com.mongodb.WriteConcern; + +/** + * A MongoDB-backed store of Wavelet snapshots. + * The aim is to avoid the whole processing of deltas when a + * wavelet is loaded into server's memory for the first time. + * + * This class is not thread-safe. + * + * @author pablo...@gmail.com (Pablo Ojanguren) + * + */ +public class MongoDBSnapshotStore { + + private static final Log LOG = Log.get(MongoDBSnapshotStore.class); + + protected static final String WAVE_ID_FIELD = "waveid"; + protected static final String WAVELET_ID_FIELD = "waveletid"; + protected static final String VERSION_FIELD = "version"; + protected static final String VERSION_HASH_FIELD = "versionhash"; + protected static final String LASTMOD_FIELD = "lastmod"; + + protected static final String SNAPSHOT_DATA = "data"; + + + /** Name of the MongoDB collection to store wavelet snapshots */ + protected static final String SNAPSHOT_COLLECTION = "snapshots"; + + + private final DBCollection collection; + + /** + * Get a reference to the snapshots store. + * + * @param database the MongoDB database holding the snapshots collection; must not be null + * @return a snapshot store backed by the "snapshots" collection of the given database + */ + public static MongoDBSnapshotStore create(DB database) { + Preconditions.checkArgument(database != null, "Unable to get reference to mongoDB snapshots collection"); + DBCollection collection = database.getCollection(SNAPSHOT_COLLECTION); + return new MongoDBSnapshotStore(collection); + } + + /** + * Construct a new snapshots store. 
+ * + * @param collection the MongoDB collection where snapshots are stored + */ + protected MongoDBSnapshotStore(DBCollection collection) { + this.collection = collection; + } + + protected void deleteSnapshot(WaveletName waveletName) throws PersistenceException { + + DBObject criteria = new BasicDBObject(); + criteria.put(WAVE_ID_FIELD, ModernIdSerialiser.INSTANCE.serialiseWaveId(waveletName.waveId)); + criteria.put(WAVELET_ID_FIELD, ModernIdSerialiser.INSTANCE.serialiseWaveletId(waveletName.waveletId)); + + try { + // Using Journaled Write Concern + // (http://docs.mongodb.org/manual/core/write-concern/#journaled) + collection.remove(criteria, WriteConcern.JOURNALED); + } catch (MongoException e) { + throw new PersistenceException(e); + } + } + + /** + * Stores a snapshot of the wavelet and then removes its older snapshots. + * + * @param waveletData the wavelet to snapshot; the stored version is + * taken from its {@code getHashedVersion()} + * @throws PersistenceException if MongoDB fails while inserting or deleting snapshots + */ + public void store(ReadableWaveletData waveletData) throws PersistenceException { + + String waveId = ModernIdSerialiser.INSTANCE.serialiseWaveId(waveletData.getWaveId()); + String waveletId = ModernIdSerialiser.INSTANCE.serialiseWaveletId(waveletData.getWaveletId()); + + + // store new snapshot + BasicDBObject dbo = new BasicDBObject(); + dbo.put(WAVE_ID_FIELD, ModernIdSerialiser.INSTANCE.serialiseWaveId(waveletData.getWaveId())); + dbo.put(WAVELET_ID_FIELD, ModernIdSerialiser.INSTANCE.serialiseWaveletId(waveletData.getWaveletId())); + + dbo.put(VERSION_FIELD, waveletData.getHashedVersion().getVersion()); + dbo.put(VERSION_HASH_FIELD, waveletData.getHashedVersion().getHistoryHash()); + dbo.put(LASTMOD_FIELD,waveletData.getLastModifiedTime()); + + + WaveletSnapshot snapshot = SnapshotSerializer.serializeWavelet(waveletData, waveletData.getHashedVersion()); + dbo.put(SNAPSHOT_DATA,snapshot.toByteArray()); + + try { + collection.insert(dbo); + } catch (MongoException e) { + LOG.warning("Error storing wavelet snapshot for "+waveId+"/"+waveletId, e); + throw new PersistenceException(e); + } + + + // Delete previous snapshots of 
this wavelet + DBObject query = new BasicDBObject(); + query.put(WAVE_ID_FIELD, waveId); + query.put(WAVELET_ID_FIELD, waveletId); + query.put(VERSION_FIELD, new BasicDBObject("$lt", waveletData.getHashedVersion().getVersion())); + + try { + collection.remove(query); + } catch (MongoException e) { + LOG.warning("Error deleting outdated wavelet snapshots for "+waveId+"/"+waveletId, e); + throw new PersistenceException(e); + } + + + + LOG.info("Stored snaphost for "+waveId+"/"+waveletId+" version "+ waveletData.getHashedVersion().getVersion()); + } + + + public DeltaStore.Snapshot load(WaveletName waveletName) throws PersistenceException { + + String waveId = ModernIdSerialiser.INSTANCE.serialiseWaveId(waveletName.waveId); + String waveletId = ModernIdSerialiser.INSTANCE.serialiseWaveletId(waveletName.waveletId); + + // find last snapshot stored + DBObject query = new BasicDBObject(); + query.put(WAVE_ID_FIELD, waveId); + query.put(WAVELET_ID_FIELD, waveletId); + + DBObject snapshotDBObject = null; + + try { + snapshotDBObject = collection.findOne(query); + } catch (MongoException e) { + LOG.warning("Error querying wavelet snapshots for "+waveId+"/"+waveletId, e); + throw new PersistenceException(e); + } + + if (snapshotDBObject == null) + return null; + + + WaveletSnapshot snapshot = null; + try { + snapshot = WaveletSnapshot.parseFrom((byte[]) snapshotDBObject.get(SNAPSHOT_DATA)); + } catch (InvalidProtocolBufferException e) { + throw new PersistenceException(e); + } + + try { + + final WaveletData waveletData = SnapshotSerializer.deserializeWavelet(snapshot, waveletName.waveId); + + return new DeltaStore.Snapshot() { + + @Override + public WaveletData getWaveletData() { + + return waveletData; + } + + }; + + } catch (OperationException e) { + throw new PersistenceException(e); + } catch (InvalidParticipantAddress e) { + throw new PersistenceException(e); + } catch (InvalidIdException e) { + throw new PersistenceException(e); + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java index 0ed4a48..aacb793 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java +++ b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaCollection.java @@ -19,11 +19,8 @@ package org.waveprotocol.box.server.persistence.mongodb; -import com.mongodb.BasicDBObject; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; -import com.mongodb.WriteConcern; +import java.io.IOException; +import java.util.Collection; import org.waveprotocol.box.common.Receiver; import org.waveprotocol.box.server.persistence.PersistenceException; @@ -34,10 +31,15 @@ import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; import org.waveprotocol.wave.model.id.WaveletName; import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; import org.waveprotocol.wave.model.version.HashedVersion; +import org.waveprotocol.wave.model.wave.data.WaveletData; import org.waveprotocol.wave.util.logging.Log; -import java.io.IOException; -import java.util.Collection; +import com.google.gwt.thirdparty.guava.common.base.Preconditions; +import com.mongodb.BasicDBObject; +import com.mongodb.DBCollection; +import com.mongodb.DBCursor; +import com.mongodb.DBObject; +import com.mongodb.WriteConcern; /** * A MongoDB based Delta Access implementation using a simple <b>deltas</b> @@ -51,21 +53,31 @@ public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess { private static final Log LOG = Log.get(MongoDbDeltaCollection.class); 
+ /** Wavelet name to work with. */ private final WaveletName waveletName; /** MongoDB Collection object for delta storage */ - private final DBCollection deltaDbCollection; - + private final DBCollection deltasCollection; + + /** MongoDB based wavelet snapshot store */ + private final MongoDBSnapshotStore snapshotStore; + + + public static MongoDbDeltaCollection create(WaveletName waveletName, DBCollection deltasCollection, MongoDBSnapshotStore snapshotStore) { + return new MongoDbDeltaCollection(waveletName, deltasCollection, snapshotStore); + } + /** * Construct a new Delta Access object for the wavelet * * @param waveletName The wavelet name. * @param deltaDbCollection The MongoDB deltas collection */ - public MongoDbDeltaCollection(WaveletName waveletName, DBCollection deltaDbCollection) { + public MongoDbDeltaCollection(WaveletName waveletName, DBCollection deltasCollection, MongoDBSnapshotStore snapshotStore) { this.waveletName = waveletName; - this.deltaDbCollection = deltaDbCollection; + this.deltasCollection = deltasCollection; + this.snapshotStore = snapshotStore; } @Override @@ -90,7 +102,7 @@ public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess { @Override public boolean isEmpty() { - return deltaDbCollection.count(createWaveletDBQuery()) == 0; + return deltasCollection.count(createWaveletDBQuery()) == 0; } @Override @@ -106,7 +118,7 @@ public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess { DBObject field = new BasicDBObject(); field.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION, 1); - DBObject result = deltaDbCollection.findOne(query, field, sort); + DBObject result = deltasCollection.findOne(query, field, sort); return result != null ? 
MongoDbDeltaStoreUtil .deserializeHashedVersion((DBObject) ((DBObject) result @@ -120,7 +132,7 @@ public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess { DBObject query = createWaveletDBQuery(); query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, version); - DBObject result = deltaDbCollection.findOne(query); + DBObject result = deltasCollection.findOne(query); WaveletDeltaRecord waveletDelta = null; @@ -137,7 +149,7 @@ public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess { DBObject query = createWaveletDBQuery(); query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION, version); - DBObject result = deltaDbCollection.findOne(query); + DBObject result = deltasCollection.findOne(query); WaveletDeltaRecord waveletDelta = null; @@ -156,7 +168,7 @@ public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess { DBObject query = createWaveletDBQuery(); query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, version); - DBObject result = deltaDbCollection.findOne(query); + DBObject result = deltasCollection.findOne(query); if (result != null) return MongoDbDeltaStoreUtil.deserializeHashedVersion((DBObject) result @@ -169,7 +181,7 @@ public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess { DBObject query = createWaveletDBQuery(); query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, version); - DBObject result = deltaDbCollection.findOne(query); + DBObject result = deltasCollection.findOne(query); if (result != null) return MongoDbDeltaStoreUtil.deserializeHashedVersion((DBObject) result @@ -203,7 +215,7 @@ public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess { for (WaveletDeltaRecord delta : newDeltas) { // Using Journaled Write Concern // (http://docs.mongodb.org/manual/core/write-concern/#journaled) - deltaDbCollection.insert(MongoDbDeltaStoreUtil.serialize(delta, + deltasCollection.insert(MongoDbDeltaStoreUtil.serialize(delta, 
waveletName.waveId.serialise(), waveletName.waveletId.serialise()), WriteConcern.JOURNALED); } @@ -219,7 +231,7 @@ public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess { sort.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED + "." + MongoDbDeltaStoreUtil.FIELD_APPLICATIONTIMESTAMP, 1); - DBCursor result = deltaDbCollection.find(query).sort(sort); + DBCursor result = deltasCollection.find(query).sort(sort); long count = 0; HashedVersion lastResultingVersion = null; @@ -258,15 +270,13 @@ public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess { Receiver<WaveletDeltaRecord> receiver) throws IOException { BasicDBObject sort = new BasicDBObject(); - sort.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, 1); - sort.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED + "." - + MongoDbDeltaStoreUtil.FIELD_APPLICATIONTIMESTAMP, 1); + sort.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION, 1); DBObject query = createWaveletDBQuery(); - query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_APPLIEDATVERSION, - new BasicDBObject("$gte", startVersion).append("$lte", endVersion)); + query.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION, + new BasicDBObject("$gt", startVersion).append("$lte", endVersion)); - DBCursor result = deltaDbCollection.find(query).sort(sort); + DBCursor result = deltasCollection.find(query).sort(sort); long count = 0; HashedVersion lastResultingVersion = null; @@ -298,4 +308,41 @@ public class MongoDbDeltaCollection implements DeltaStore.DeltasAccess { return count; } + + @Override + public WaveletDeltaRecord getLastDelta() throws IOException { + + // Search the max of delta.getTransformedDelta().getResultingVersion() + + DBObject query = createWaveletDBQuery(); + + DBObject sort = new BasicDBObject(); + sort.put(MongoDbDeltaStoreUtil.FIELD_TRANSFORMED_RESULTINGVERSION_VERSION, -1); // Descending + + + DBObject result = deltasCollection.findOne(query, null, sort); + + try { + return result 
!= null ? MongoDbDeltaStoreUtil + .deserializeWaveletDeltaRecord(result) : null; + } catch (PersistenceException e) { + throw new IOException(e); + } + + } + + @Override + public DeltaStore.Snapshot loadSnapshot() throws PersistenceException { + return snapshotStore.load(waveletName); + } + + @Override + public void storeSnapshot(WaveletData waveletData) + throws PersistenceException { + + Preconditions.checkArgument(waveletName.equals(WaveletName.of(waveletData.getWaveId(), waveletData.getWaveletId())), + "Can't store snapshots for different wavelet"); + + snapshotStore.store(waveletData); + } } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java index 05aacbd..313b928 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java +++ b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbDeltaStore.java @@ -36,37 +36,71 @@ import org.waveprotocol.box.server.waveserver.DeltaStore; import org.waveprotocol.wave.model.id.WaveId; import org.waveprotocol.wave.model.id.WaveletId; import org.waveprotocol.wave.model.id.WaveletName; +import org.waveprotocol.wave.util.logging.Log; import java.util.List; /** - * A MongoDB based Delta Store implementation using a simple <b>deltas</b> - * collection, storing a delta record per each MongoDb document. + * A MongoDB based Delta Store implementation using a <b>deltas</b> + * collection and a snapshots collection. + * + * Each deltas is stored as a MongoDb document. 
+ * + * For snapshots store details check out {@link MongoDBSnapshotStore} * * @author pablo...@gmail.com (Pablo Ojanguren) * */ public class MongoDbDeltaStore implements DeltaStore { - + + private static final Log LOG = Log.get(MongoDbDeltaStore.class); + /** Name of the MongoDB collection to store Deltas */ private static final String DELTAS_COLLECTION = "deltas"; - /** Database connection object */ - private final DB database; + /** MongoDB Collection for deltas */ + private final DBCollection deltasCollection; + + /** A specific class handling snapshots */ + private final MongoDBSnapshotStore snapshotStore; /** - * Construct a new store + * Creates a mongoDB based delta/snapshot store. + * + * @param database + * @return + */ + public static MongoDbDeltaStore create(DB database) { + + DBCollection deltasCollection = database.getCollection(DELTAS_COLLECTION); + checkDeltasCollectionIndexes(deltasCollection); + MongoDBSnapshotStore snapshotStore = MongoDBSnapshotStore.create(database); + + return new MongoDbDeltaStore(deltasCollection, snapshotStore); + } + + private static void checkDeltasCollectionIndexes(DBCollection deltasCollection) { + + LOG.info("For production environments, set MongoDB index in 'deltas' collection with fields 'waveid', 'waveletid' and 'transformed.resultingversion'"); + + } + + + /** + * Construct a new store instance * * @param database the database connection object */ - public MongoDbDeltaStore(DB database) { - this.database = database; + private MongoDbDeltaStore(DBCollection deltasCollection, MongoDBSnapshotStore snapshotStore) { + this.deltasCollection = deltasCollection; + this.snapshotStore = snapshotStore; } + + @Override public DeltasAccess open(WaveletName waveletName) throws PersistenceException { - - return new MongoDbDeltaCollection(waveletName, getDeltaDbCollection()); + return new MongoDbDeltaCollection(waveletName, deltasCollection, snapshotStore); } @Override @@ -80,10 +114,14 @@ public class MongoDbDeltaStore implements 
DeltaStore { try { // Using Journaled Write Concern // (http://docs.mongodb.org/manual/core/write-concern/#journaled) - getDeltaDbCollection().remove(criteria, WriteConcern.JOURNALED); + deltasCollection.remove(criteria, WriteConcern.JOURNALED); } catch (MongoException e) { throw new PersistenceException(e); } + + // Also delete wavelet snapshots + snapshotStore.deleteSnapshot(waveletName); + } @Override @@ -99,7 +137,7 @@ public class MongoDbDeltaStore implements DeltaStore { DBCursor cursor = null; try { - cursor = getDeltaDbCollection().find(query, projection); + cursor = deltasCollection.find(query, projection); } catch (MongoException e) { throw new PersistenceException(e); } @@ -126,7 +164,7 @@ public class MongoDbDeltaStore implements DeltaStore { try { @SuppressWarnings("rawtypes") - List results = getDeltaDbCollection().distinct(MongoDbDeltaStoreUtil.FIELD_WAVE_ID); + List results = deltasCollection.distinct(MongoDbDeltaStoreUtil.FIELD_WAVE_ID); for (Object o : results) builder.add(WaveId.deserialise((String) o)); @@ -138,13 +176,5 @@ public class MongoDbDeltaStore implements DeltaStore { return ExceptionalIterator.FromIterator.create(builder.build().iterator()); } - - /** - * Access to deltas collection - * - * @return DBCollection of deltas - */ - private DBCollection getDeltaDbCollection() { - return database.getCollection(DELTAS_COLLECTION); - } + } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java index c48a8a6..bc78406 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java +++ 
b/wave/src/main/java/org/waveprotocol/box/server/persistence/mongodb/MongoDbProvider.java @@ -159,7 +159,7 @@ public class MongoDbProvider { */ public MongoDbDeltaStore provideMongoDbDeltaStore() { if (mongoDbDeltaStore == null) { - mongoDbDeltaStore = new MongoDbDeltaStore(getDatabase()); + mongoDbDeltaStore = MongoDbDeltaStore.create(getDatabase()); } return mongoDbDeltaStore; http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStore.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStore.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStore.java index 8ca5a5d..b34e01a 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStore.java +++ b/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStore.java @@ -20,6 +20,8 @@ package org.waveprotocol.box.server.waveserver; import org.waveprotocol.box.server.persistence.PersistenceException; +import org.waveprotocol.wave.model.version.HashedVersion; +import org.waveprotocol.wave.model.wave.data.WaveletData; import java.io.Closeable; import java.util.Collection; @@ -28,12 +30,26 @@ import java.util.Collection; * Stores wavelet deltas. * * @author so...@google.com (Soren Lassen) + * @author pablo...@gmail.com (Pablo Ojanguren) */ public interface DeltaStore extends WaveletStore<DeltaStore.DeltasAccess> { /** + * Wavelet snapshot and resulting version together. + * + * @author pablo...@gmail.com (Pablo Ojanguren) + */ + interface Snapshot { + + /** Gets the wavelet snapshot */ + public WaveletData getWaveletData(); + } + + /** * Accesses the delta history for a wavelet. * Permits reading historical deltas and appending deltas to the history. + * + * Optionally, it can provide service to load and store snapshots. 
*/ interface DeltasAccess extends WaveletDeltaRecordReader, Closeable { /** @@ -50,5 +66,24 @@ public interface DeltaStore extends WaveletStore<DeltaStore.DeltasAccess> { * storage. */ void append(Collection<WaveletDeltaRecord> deltas) throws PersistenceException; + + /** + * Loads the last snapshot of the wavelet. + * + * @return the wavelet data object + * @throws PersistenceException if anything goes wrong with the underlying + * storage. + */ + Snapshot loadSnapshot() throws PersistenceException; + + /** + * Stores a wavelet snapshot as the last one. + * + * @param waveletData the wavelet data + * @throws PersistenceException if anything goes wrong with the underlying + * storage. + */ + void storeSnapshot(WaveletData waveletData) throws PersistenceException; + } } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java index d564470..0d2c449 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java +++ b/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java @@ -188,6 +188,11 @@ class DeltaStoreBasedSnapshotStore implements DeltaAndSnapshotStore { isClosed = true; deltasAccess.close(); } + + @Override + public WaveletDeltaRecord getLastDelta() throws IOException { + return deltasAccess.getLastDelta(); + } } private final DeltaStore deltaStore; http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java ---------------------------------------------------------------------- diff --git 
a/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java index 8581fee..983aece 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java +++ b/wave/src/main/java/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java @@ -35,6 +35,8 @@ import org.waveprotocol.box.server.persistence.PersistenceException; import org.waveprotocol.box.server.util.WaveletDataUtil; import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; import org.waveprotocol.wave.model.id.IdURIEncoderDecoder; +import org.waveprotocol.wave.model.id.IdUtil; +import org.waveprotocol.wave.model.id.ModernIdSerialiser; import org.waveprotocol.wave.model.id.WaveletName; import org.waveprotocol.wave.model.operation.OperationException; import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; @@ -47,7 +49,6 @@ import org.waveprotocol.wave.util.escapers.jvm.JavaUrlCodec; import org.waveprotocol.wave.util.logging.Log; import java.io.IOException; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.NavigableMap; @@ -60,18 +61,26 @@ import java.util.concurrent.atomic.AtomicReference; /** * Simplistic {@link DeltaStore}-backed wavelet state implementation * which goes to persistent storage for every history request. - * - * TODO(soren): rewire this class to be backed by {@link WaveletStore} and - * read the snapshot from there instead of computing it in the - * DeltaStoreBasedWaveletState constructor. + * <br/><br/> + * If the implementation of {@link DeltaStore.DeltasAccess} supports snapshot storage, + * this class will take advantage of it: + * <br/><br/> + * On creation, it'll try to load a snapshot from the storage instead of composing all deltas. + * If the snapshot has to be built by composing deltas, it is then persisted. 
+ * <br/><br/> + * Snapshot updates will be persisted every time a fixed number of deltas are added to the wavelet. + * This will be done by the persistence task, after deltas are persisted. + * * * @author so...@google.com (Soren Lassen) * @author akapla...@gmail.com (Andew Kaplanov) + * @author pablo...@gmail.com (Pablo Ojanguren) */ class DeltaStoreBasedWaveletState implements WaveletState { private static final Log LOG = Log.get(DeltaStoreBasedWaveletState.class); - + + private static final IdURIEncoderDecoder URI_CODEC = new IdURIEncoderDecoder(new JavaUrlCodec()); @@ -111,21 +120,133 @@ class DeltaStoreBasedWaveletState implements WaveletState { * processing stored deltas */ public static DeltaStoreBasedWaveletState create(DeltaStore.DeltasAccess deltasAccess, - Executor persistExecutor) throws PersistenceException { + Executor persistExecutor) throws PersistenceException { + return create(deltasAccess, persistExecutor, 250); + } + + /** + * Creates a new delta store based state. + * + * The executor must ensure that only one thread executes at any time for each + * state instance. 
+ * + * @param deltasAccess delta store accessor + * @param persistExecutor executor for making persistence calls + * @param persistSnapshotOnDeltasCount number of deltas to receive before storing the snapshot + * @return a state initialized from the deltas + * @throws PersistenceException if a failure occurs while reading or + * processing stored deltas + */ + public static DeltaStoreBasedWaveletState create(DeltaStore.DeltasAccess deltasAccess, + Executor persistExecutor, int persistSnapshotOnDeltasCount) throws PersistenceException { if (deltasAccess.isEmpty()) { - return new DeltaStoreBasedWaveletState(deltasAccess, ImmutableList.<WaveletDeltaRecord>of(), - null, persistExecutor); + return new DeltaStoreBasedWaveletState(deltasAccess, null, persistExecutor, persistSnapshotOnDeltasCount); } else { - try { - ImmutableList<WaveletDeltaRecord> deltas = readAll(deltasAccess, null); - WaveletData snapshot = WaveletDataUtil.buildWaveletFromDeltas(deltasAccess.getWaveletName(), - Iterators.transform(deltas.iterator(), TRANSFORMED)); - return new DeltaStoreBasedWaveletState(deltasAccess, deltas, snapshot, persistExecutor); - } catch (IOException e) { - throw new PersistenceException("Failed to read stored deltas", e); - } catch (OperationException e) { - throw new PersistenceException("Failed to compose stored deltas", e); + + long t1 = 0; + long t2 = 0; + + ImmutableList<WaveletDeltaRecord> deltas = null; + WaveletData snapshot = null; + + // Get snapshot from persistence. + // Snapshot storage is optional for underlying {@link DeltaStore.DeltasAccess} + // In case of no available snapshot, build it from deltas. + + t1 = System.currentTimeMillis(); + DeltaStore.Snapshot persistenceSnapshot = deltasAccess.loadSnapshot(); + t2 = System.currentTimeMillis(); + + if (persistenceSnapshot != null) { + + try { + + WaveletDeltaRecord lastStoredDelta = deltasAccess.getLastDelta(); + + // Is the persisted snapshot up to date? 
+ + if (lastStoredDelta.getResultingVersion().getVersion() > persistenceSnapshot + .getWaveletData().getVersion()) { + + long startVersion = persistenceSnapshot.getWaveletData().getHashedVersion().getVersion(); + long endVersion = lastStoredDelta.getResultingVersion().getVersion(); + + deltasAccess.getDeltasInRange(startVersion, endVersion, + new Receiver<WaveletDeltaRecord>() { + + @Override + public boolean put(WaveletDeltaRecord delta) { + try { + WaveletDataUtil.applyWaveletDelta(delta.getTransformedDelta(), + persistenceSnapshot.getWaveletData()); + } catch (OperationException e) { + return false; + } + return true; + } + }); + + t2 = System.currentTimeMillis(); + + snapshot = persistenceSnapshot.getWaveletData(); + + } else if (lastStoredDelta.getResultingVersion().getVersion() == persistenceSnapshot + .getWaveletData().getVersion()) { + + snapshot = persistenceSnapshot.getWaveletData(); + + } else { + // Wow, the snapshot has a version higher than last delta! + // That's impossible, let's suppose delta history is right and + // ignore snapshot + } + + } catch (IllegalStateException e) { + throw new PersistenceException("Failed to compose wavelet snapshot", e); + } catch (IOException e) { + throw new PersistenceException("Failed to compose wavelet snapshot", e); + } + + String waveletName = ModernIdSerialiser.INSTANCE.serialiseWaveId(deltasAccess.getWaveletName().waveId)+"/"+ + deltasAccess.getWaveletName().waveletId.getId(); + + LOG.info("Snapshot loaded for "+waveletName+" in "+ (t2-t1) +"ms"); + } + + // Build snapshot from deltas if it is not already built + if (snapshot == null) { + + try { + + t1 = System.currentTimeMillis(); + + deltas = readAll(deltasAccess, null); + snapshot = WaveletDataUtil.buildWaveletFromDeltas(deltasAccess.getWaveletName(), + Iterators.transform(deltas.iterator(), TRANSFORMED)); + + t2 = System.currentTimeMillis(); + + String waveletName = ModernIdSerialiser.INSTANCE.serialiseWaveId(deltasAccess.getWaveletName().waveId)+"/"+ + 
deltasAccess.getWaveletName().waveletId.getId(); + + LOG.info("Snapshot built for "+waveletName+" in "+ (t2-t1) +"ms"); + + // Persist the snapshot if it is not user wavelet + if (!IdUtil.isUserDataWavelet(snapshot.getWaveletId())) + deltasAccess.storeSnapshot(snapshot); + + } catch (IOException e) { + throw new PersistenceException("Failed to read stored deltas", e); + } catch (OperationException e) { + throw new PersistenceException("Failed to compose stored deltas", e); + } + + } + + return new DeltaStoreBasedWaveletState(deltasAccess, snapshot, persistExecutor, persistSnapshotOnDeltasCount); + + } } @@ -195,11 +316,25 @@ class DeltaStoreBasedWaveletState implements WaveletState { /** The persist task that will be executed next. */ private ListenableFutureTask<Void> nextPersistTask = null; + + /** + * the number of deltas to be processed before to + * persist the snapshot. + */ + private final int persistSnapshotOnDeltasCount; + /** + * Counter of processed deltas in order to persist the snapshot + * if its value excess {@link persistSnapshotOnDeltaCount} + */ + private int deltasCountBeforeSnapshotStore = 0; + /** * Processes the persist task and checks if there is another task to do when * one task is done. In such a case, it writes all waiting to be persisted * deltas to persistent storage in one operation. + * + * Also persist the snapshot if it is required. 
*/ private final Callable<Void> persisterTask = new Callable<Void>() { @Override @@ -225,6 +360,14 @@ class DeltaStoreBasedWaveletState implements WaveletState { } while (v.getVersion() < version.getVersion()); Preconditions.checkState(v.equals(version)); deltasAccess.append(deltas.build()); + + if (deltasCountBeforeSnapshotStore >= persistSnapshotOnDeltasCount) { + synchronized (persistLock) { + deltasAccess.storeSnapshot(snapshot); + deltasCountBeforeSnapshotStore = 0; + } + } + } synchronized (persistLock) { Preconditions.checkState(last == lastPersistedVersion.get(), @@ -242,9 +385,11 @@ class DeltaStoreBasedWaveletState implements WaveletState { }; /** Keyed by appliedAtVersion. */ + /* private final ConcurrentNavigableMap<HashedVersion, ByteStringMessage<ProtocolAppliedWaveletDelta>> appliedDeltas = new ConcurrentSkipListMap<HashedVersion, ByteStringMessage<ProtocolAppliedWaveletDelta>>(); - + */ + /** Keyed by appliedAtVersion. */ private final ConcurrentNavigableMap<HashedVersion, WaveletDeltaRecord> cachedDeltas = new ConcurrentSkipListMap<HashedVersion, WaveletDeltaRecord>(); @@ -261,23 +406,24 @@ class DeltaStoreBasedWaveletState implements WaveletState { private final AtomicReference<HashedVersion> lastPersistedVersion; /** - * Constructs a wavelet state with the given deltas and snapshot. - * The deltas must be the contents of deltasAccess, and they - * must be contiguous from version zero. + * Constructs a wavelet state with the given snapshot. + * The snapshot must be the composition of the deltas, or null if there * are no deltas. The constructed object takes ownership of the * snapshot and will mutate it if appendDelta() is called. + * + * The delta store is responsible to update the snapshot + * but no necessarily for each appendDelta(). Hence it is expected that + * snapshots are stored less frequently. 
*/ @VisibleForTesting - DeltaStoreBasedWaveletState(DeltaStore.DeltasAccess deltasAccess, - List<WaveletDeltaRecord> deltas, WaveletData snapshot, Executor persistExecutor) { - Preconditions.checkArgument(deltasAccess.isEmpty() == deltas.isEmpty()); - Preconditions.checkArgument(deltas.isEmpty() == (snapshot == null)); + DeltaStoreBasedWaveletState(DeltaStore.DeltasAccess deltasAccess, WaveletData snapshot, Executor persistExecutor, int persistSnapshotOnDeltasCount) { this.persistExecutor = persistExecutor; this.versionZero = HASH_FACTORY.createVersionZero(deltasAccess.getWaveletName()); this.deltasAccess = deltasAccess; this.snapshot = snapshot; this.lastPersistedVersion = new AtomicReference<HashedVersion>(deltasAccess.getEndVersion()); + this.persistSnapshotOnDeltasCount = persistSnapshotOnDeltasCount; } @Override @@ -446,15 +592,24 @@ class DeltaStoreBasedWaveletState implements WaveletState { "Applied version %s doesn't match current version %s", deltaRecord.getAppliedAtVersion(), currentVersion); - if (deltaRecord.getAppliedAtVersion().getVersion() == 0) { - Preconditions.checkState(lastPersistedVersion.get() == null); - snapshot = WaveletDataUtil.buildWaveletFromFirstDelta(getWaveletName(), deltaRecord.getTransformedDelta()); - } else { - WaveletDataUtil.applyWaveletDelta(deltaRecord.getTransformedDelta(), snapshot); - } + + + if (deltaRecord.getAppliedAtVersion().getVersion() == 0) { + Preconditions.checkState(lastPersistedVersion.get() == null); + snapshot = WaveletDataUtil.buildWaveletFromFirstDelta(getWaveletName(), deltaRecord.getTransformedDelta()); + } else { + // Avoid to update snapshot when it has being persisted + synchronized (persistLock) { + WaveletDataUtil.applyWaveletDelta(deltaRecord.getTransformedDelta(), snapshot); + } + } + // Now that we built the snapshot without any exceptions, we record the delta. 
cachedDeltas.put(deltaRecord.getAppliedAtVersion(), deltaRecord); + + // Increment counter controlling snapshot persistence + deltasCountBeforeSnapshotStore++; } @Override http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveServerModule.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveServerModule.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveServerModule.java index 41d60bf..578f9b6 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveServerModule.java +++ b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveServerModule.java @@ -27,6 +27,8 @@ import com.google.inject.Inject; import com.google.inject.Provides; import com.google.inject.Singleton; import com.typesafe.config.Config; +import com.typesafe.config.ConfigException; + import org.waveprotocol.box.server.executor.ExecutorAnnotations.StorageContinuationExecutor; import org.waveprotocol.box.server.executor.ExecutorAnnotations.WaveletLoadExecutor; import org.waveprotocol.box.server.persistence.PersistenceException; @@ -52,6 +54,7 @@ public class WaveServerModule extends AbstractModule { private final Executor waveletLoadExecutor; private final Executor storageContinuationExecutor; private final boolean enableFederation; + private final int persistSnapshotOnDeltasCount; @Inject @@ -59,6 +62,13 @@ public class WaveServerModule extends AbstractModule { @WaveletLoadExecutor Executor waveletLoadExecutor, @StorageContinuationExecutor Executor storageContinuationExecutor) { this.enableFederation = config.getBoolean("federation.enable_federation"); + int deltaCountForPersistSnapshots = 250; + try { + deltaCountForPersistSnapshots = config.getInt("core.persist_snapshots_on_deltas_count"); + } catch (ConfigException.Missing e) { + e.printStackTrace(); + } + this.persistSnapshotOnDeltasCount 
= deltaCountForPersistSnapshots; this.waveletLoadExecutor = waveletLoadExecutor; this.storageContinuationExecutor = storageContinuationExecutor; } @@ -106,7 +116,7 @@ public class WaveServerModule extends AbstractModule { public LocalWaveletContainer create(WaveletNotificationSubscriber notifiee, WaveletName waveletName, String waveDomain) { return new LocalWaveletContainerImpl(waveletName, notifiee, loadWaveletState( - waveletLoadExecutor, deltaStore, waveletName, waveletLoadExecutor), waveDomain, + waveletLoadExecutor, deltaStore, waveletName, waveletLoadExecutor, persistSnapshotOnDeltasCount), waveDomain, storageContinuationExecutor); } }; @@ -121,7 +131,7 @@ public class WaveServerModule extends AbstractModule { public RemoteWaveletContainer create(WaveletNotificationSubscriber notifiee, WaveletName waveletName, String waveDomain) { return new RemoteWaveletContainerImpl(waveletName, notifiee, loadWaveletState( - waveletLoadExecutor, deltaStore, waveletName, waveletLoadExecutor), + waveletLoadExecutor, deltaStore, waveletName, waveletLoadExecutor, persistSnapshotOnDeltasCount), storageContinuationExecutor); } }; @@ -147,16 +157,16 @@ public class WaveServerModule extends AbstractModule { */ @VisibleForTesting static ListenableFuture<DeltaStoreBasedWaveletState> loadWaveletState(Executor executor, - final DeltaStore deltaStore, final WaveletName waveletName, final Executor persistExecutor) { - ListenableFutureTask<DeltaStoreBasedWaveletState> task = - ListenableFutureTask.create( - new Callable<DeltaStoreBasedWaveletState>() { - @Override - public DeltaStoreBasedWaveletState call() throws PersistenceException { - return DeltaStoreBasedWaveletState.create(deltaStore.open(waveletName), - persistExecutor); - } - }); + final DeltaStore deltaStore, final WaveletName waveletName, final Executor persistExecutor, + final int persistSnapshotOnDeltasCount) { + ListenableFutureTask<DeltaStoreBasedWaveletState> task = ListenableFutureTask + .create(new 
Callable<DeltaStoreBasedWaveletState>() { + @Override + public DeltaStoreBasedWaveletState call() throws PersistenceException { + return DeltaStoreBasedWaveletState.create(deltaStore.open(waveletName), persistExecutor, + persistSnapshotOnDeltasCount); + } + }); executor.execute(task); return task; } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java index 7ac0dd9..27617b5 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java +++ b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java @@ -95,13 +95,22 @@ public interface WaveletDeltaRecordReader { /** - * Returns a range of deltas. + * Returns a range of deltas having their result version greater than startVersion + * and less or equals than endVersion * - * @param start - * @param end + * @param startVersion + * @param endVersion * @param receiver */ long getDeltasInRange(long startVersion, long endVersion, Receiver<WaveletDeltaRecord> receiver) throws IOException; + /** + * Returns the last delta of the wavelet. 
+ * + * @return the last delta + * @throws IOException + */ + WaveletDeltaRecord getLastDelta() throws IOException; + } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/94fa1878/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java ---------------------------------------------------------------------- diff --git a/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java b/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java index 9c058b6..5523535 100644 --- a/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java +++ b/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java @@ -113,7 +113,7 @@ public class WaveServerTest extends TestCase { public LocalWaveletContainer create(WaveletNotificationSubscriber notifiee, WaveletName waveletName, String waveDomain) { return new LocalWaveletContainerImpl(waveletName, notifiee, - WaveServerModule.loadWaveletState(waveletLoadExecutor, deltaStore, waveletName, persistExecutor), + WaveServerModule.loadWaveletState(waveletLoadExecutor, deltaStore, waveletName, persistExecutor, 100), waveDomain, storageContinuationExecutor); } };