Author: yurize
Date: Fri Nov 18 10:46:53 2011
New Revision: 1203579
URL: http://svn.apache.org/viewvc?rev=1203579&view=rev
Log:
Evicts old deltas. part 2. https://reviews.apache.org/r/2662/
Added:
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RuntimeIOException.java
(with props)
Modified:
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletState.java
Modified:
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java
URL:
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java?rev=1203579&r1=1203578&r2=1203579&view=diff
==============================================================================
---
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java
(original)
+++
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java
Fri Nov 18 10:46:53 2011
@@ -45,22 +45,6 @@ import java.util.Iterator;
class DeltaStoreBasedSnapshotStore implements DeltaAndSnapshotStore {
/**
- * Wraps an {@link IOException} in a {@link RuntimeException}.
- */
- private static class RuntimeIOException extends RuntimeException {
- private final IOException cause;
-
- public RuntimeIOException(IOException cause) {
- super(cause);
- this.cause = cause;
- }
-
- public IOException getIOException() {
- return cause;
- }
- }
-
- /**
* Reads the transformed deltas from a {@link WaveletDeltaRecordReader}.
*/
private static class TransformedWaveletDeltaIterator
Modified:
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
URL:
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java?rev=1203579&r1=1203578&r2=1203579&view=diff
==============================================================================
---
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
(original)
+++
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
Fri Nov 18 10:46:53 2011
@@ -22,9 +22,12 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.gxp.compiler.io.RuntimeIOException;
import org.waveprotocol.box.common.DeltaSequence;
import org.waveprotocol.box.server.persistence.PersistenceException;
@@ -46,8 +49,12 @@ import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.NavigableMap;
+import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
@@ -80,6 +87,18 @@ class DeltaStoreBasedWaveletState implem
};
/**
+ * @return An entry keyed by a hashed version with the given version number,
+ * if any, otherwise null.
+ */
+ private static <T> Map.Entry<HashedVersion, T>
lookupCached(NavigableMap<HashedVersion, T> map,
+ long version) {
+ // Smallest key with version number >= version.
+ HashedVersion key = HashedVersion.unsigned(version);
+ Map.Entry<HashedVersion, T> entry = map.ceilingEntry(key);
+ return (entry != null && entry.getKey().getVersion() == version) ? entry :
null;
+ }
+
+ /**
* Creates a new delta store based state.
*
* The executor must ensure that only one thread executes at any time for
each
@@ -114,35 +133,30 @@ class DeltaStoreBasedWaveletState implem
* Reads all deltas from persistent storage.
*/
private static ImmutableList<WaveletDeltaRecord>
readAll(WaveletDeltaRecordReader reader)
- throws IOException{
+ throws IOException {
+ long startVersion = 0;
+ long endVersion = reader.getEndVersion().getVersion();
+ return readDeltasInRange(reader, startVersion, endVersion);
+ }
+
+ private static ImmutableList<WaveletDeltaRecord> readDeltasInRange(
+ final WaveletDeltaRecordReader reader, final long startVersion, final
long endVersion) throws IOException {
Preconditions.checkArgument(!reader.isEmpty());
ImmutableList.Builder<WaveletDeltaRecord> result = ImmutableList.builder();
- HashedVersion endVersion = reader.getEndVersion();
- long version = 0;
- while (version < endVersion.getVersion()) {
- WaveletDeltaRecord delta = reader.getDelta(version);
+ long i = startVersion;
+ while (i < endVersion) {
+ WaveletDeltaRecord delta;
+ delta = reader.getDelta(i);
result.add(delta);
- version = delta.getResultingVersion().getVersion();
+ i = delta.getResultingVersion().getVersion();
}
return result.build();
}
- /**
- * @return An entry keyed by a hashed version with the given version number,
- * if any, otherwise null.
- */
- private static <T> Map.Entry<HashedVersion, T> lookup(
- NavigableMap<HashedVersion, T> map, long version) {
- // Smallest key with version number >= version.
- HashedVersion key = HashedVersion.unsigned(version);
- Map.Entry<HashedVersion, T> entry = map.ceilingEntry(key);
- return (entry != null && entry.getKey().getVersion() == version) ? entry :
null;
- }
-
private final Executor persistExecutor;
private final HashedVersion versionZero;
private final DeltaStore.DeltasAccess deltasAccess;
-
+
/** The lock that guards access to persistence related state. */
private Object persistLock = new Object();
@@ -170,8 +184,8 @@ class DeltaStoreBasedWaveletState implem
version = latestVersionToPersist;
}
if (last != null && version.getVersion() <= last.getVersion()) {
- LOG.info("Attempt to persist version " + version + " smaller than last
persisted version "
- + last);
+ LOG.info("Attempt to persist version " + version
+ + " smaller than last persisted version " + last);
// Done, version is already persisted.
version = last;
} else {
@@ -202,12 +216,12 @@ class DeltaStoreBasedWaveletState implem
};
/** Keyed by appliedAtVersion. */
- private final NavigableMap<HashedVersion,
ByteStringMessage<ProtocolAppliedWaveletDelta>>
- appliedDeltas = Maps.newTreeMap();
+ private final ConcurrentNavigableMap<HashedVersion,
ByteStringMessage<ProtocolAppliedWaveletDelta>> appliedDeltas =
+ new ConcurrentSkipListMap<HashedVersion,
ByteStringMessage<ProtocolAppliedWaveletDelta>>();
/** Keyed by appliedAtVersion. */
- private final NavigableMap<HashedVersion, TransformedWaveletDelta>
transformedDeltas =
- Maps.newTreeMap();
+ private final ConcurrentNavigableMap<HashedVersion, TransformedWaveletDelta>
transformedDeltas =
+ new ConcurrentSkipListMap<HashedVersion, TransformedWaveletDelta>();
/** Is null if the wavelet state is empty. */
private WaveletData snapshot;
@@ -236,11 +250,6 @@ class DeltaStoreBasedWaveletState implem
this.persistExecutor = persistExecutor;
this.versionZero =
HASH_FACTORY.createVersionZero(deltasAccess.getWaveletName());
this.deltasAccess = deltasAccess;
- for (WaveletDeltaRecord delta : deltas) {
- HashedVersion hashedVersion = delta.getAppliedAtVersion();
- appliedDeltas.put(hashedVersion, delta.getAppliedDelta());
- transformedDeltas.put(hashedVersion, delta.getTransformedDelta());
- }
this.snapshot = snapshot;
this.lastPersistedVersion = new
AtomicReference<HashedVersion>(deltasAccess.getEndVersion());
}
@@ -268,6 +277,8 @@ class DeltaStoreBasedWaveletState implem
@Override
public HashedVersion getHashedVersion(long version) {
+ final Entry<HashedVersion, TransformedWaveletDelta> cachedEntry =
+ lookupCached(transformedDeltas, version);
if (version == 0) {
return versionZero;
} else if (snapshot == null) {
@@ -275,58 +286,123 @@ class DeltaStoreBasedWaveletState implem
} else if (version == snapshot.getVersion()) {
return snapshot.getHashedVersion();
} else {
- Map.Entry<HashedVersion, TransformedWaveletDelta> entry =
lookup(transformedDeltas, version);
- return (entry == null) ? null : entry.getKey();
+ WaveletDeltaRecord delta;
+ try {
+ delta = lookup(version);
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ if (delta == null && cachedEntry != null) {
+ return cachedEntry.getKey();
+ } else {
+ return delta != null ? delta.getAppliedAtVersion() : null;
+ }
}
}
@Override
- public TransformedWaveletDelta getTransformedDelta(HashedVersion
beginVersion) {
- return transformedDeltas.get(beginVersion);
+ public TransformedWaveletDelta getTransformedDelta(
+ final HashedVersion beginVersion) {
+ TransformedWaveletDelta delta = transformedDeltas.get(beginVersion);
+ if (delta != null) {
+ return delta;
+ } else {
+ WaveletDeltaRecord nowDelta;
+ try {
+ nowDelta = lookup(beginVersion.getVersion());
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ return nowDelta != null ? nowDelta.transformed : null;
+ }
}
@Override
- public TransformedWaveletDelta getTransformedDeltaByEndVersion(HashedVersion
endVersion) {
- Preconditions.checkArgument(endVersion.getVersion() > 0,
- "end version %s is not positive", endVersion);
+ public TransformedWaveletDelta getTransformedDeltaByEndVersion(final
HashedVersion endVersion) {
+ Preconditions.checkArgument(endVersion.getVersion() > 0, "end version %s
is not positive",
+ endVersion);
+ final Entry<HashedVersion, TransformedWaveletDelta> cachedEntry =
transformedDeltas.lowerEntry(endVersion); // null when no cached delta is keyed below endVersion
if (snapshot == null) {
return null;
- } else if (endVersion.equals(snapshot.getHashedVersion())) {
- return transformedDeltas.lastEntry().getValue();
} else {
- TransformedWaveletDelta delta =
transformedDeltas.lowerEntry(endVersion).getValue();
- return delta.getResultingVersion().equals(endVersion) ? delta : null;
+ WaveletDeltaRecord deltaRecord = getDeltaRecordByEndVersion(endVersion);
+ TransformedWaveletDelta delta;
+ if (deltaRecord == null && cachedEntry != null &&
cachedEntry.getValue().getResultingVersion().equals(endVersion)) {
+ delta = cachedEntry.getValue(); // not in the store: use the in-memory delta ending at endVersion
+ } else {
+ delta = deltaRecord != null ? deltaRecord.getTransformedDelta() : null;
+ }
+ return delta;
}
}
@Override
- public DeltaSequence getTransformedDeltaHistory(HashedVersion startVersion,
- HashedVersion endVersion) {
+ public DeltaSequence getTransformedDeltaHistory(final HashedVersion
startVersion,
+ final HashedVersion endVersion) {
Preconditions.checkArgument(startVersion.getVersion() <
endVersion.getVersion(),
"Start version %s should be smaller than end version %s",
startVersion, endVersion);
- NavigableMap<HashedVersion, TransformedWaveletDelta> deltas =
- transformedDeltas.subMap(startVersion, true, endVersion, false);
- return
- (!deltas.isEmpty() &&
- deltas.firstKey().equals(startVersion) &&
-
deltas.lastEntry().getValue().getResultingVersion().equals(endVersion))
- ? DeltaSequence.of(deltas.values())
- : null;
+ // The history deltas can be either in the memory - waiting to be
persisted,
+ // or already persisted. We take both and merge into one list.
+ final NavigableMap<HashedVersion, TransformedWaveletDelta> cachedDeltas =
Maps.newTreeMap();
+ cachedDeltas.putAll(transformedDeltas.subMap(startVersion, true,
endVersion, false));
+ ImmutableList<WaveletDeltaRecord> persistedDeltas;
+ try {
+ persistedDeltas =
+ readDeltasInRange(deltasAccess, startVersion.getVersion(),
endVersion.getVersion());
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ NavigableMap<HashedVersion, TransformedWaveletDelta>
allTransformedDeltasMap =
+ Maps.newTreeMap();
+ allTransformedDeltasMap.putAll(cachedDeltas);
+ for (WaveletDeltaRecord d : persistedDeltas) {
+ allTransformedDeltasMap.put(d.getAppliedAtVersion(),
d.getTransformedDelta());
+ }
+ DeltaSequence nowDeltaSequence;
+ if (!allTransformedDeltasMap.isEmpty()
+ && allTransformedDeltasMap.firstKey().equals(startVersion)
+ &&
allTransformedDeltasMap.lastEntry().getValue().getResultingVersion().equals(endVersion))
{
+ List<TransformedWaveletDelta> cachedAndPersitentDeltasList =
+ Lists.newArrayList(allTransformedDeltasMap.values());
+ nowDeltaSequence = DeltaSequence.of(cachedAndPersitentDeltasList);
+ } else {
+ nowDeltaSequence = null;
+ }
+ return nowDeltaSequence;
}
@Override
public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta(
HashedVersion beginVersion) {
- return appliedDeltas.get(beginVersion);
+ ByteStringMessage<ProtocolAppliedWaveletDelta> delta =
appliedDeltas.get(beginVersion);
+ if (delta != null) { // still held in the in-memory map
+ return delta;
+ } else {
+ WaveletDeltaRecord record = null;
+ try {
+ record = lookup(beginVersion.getVersion()); // fall back to the delta store
+ } catch (IOException e) {
+ throw new RuntimeIOException(e); // was constructed but never thrown, turning read failures into null
+ }
+ return record != null ? record.applied : null;
+ }
}
@Override
public ByteStringMessage<ProtocolAppliedWaveletDelta>
getAppliedDeltaByEndVersion(
- HashedVersion endVersion) {
+ final HashedVersion endVersion) {
Preconditions.checkArgument(endVersion.getVersion() > 0,
"end version %s is not positive", endVersion);
- return isDeltaBoundary(endVersion) ?
appliedDeltas.lowerEntry(endVersion).getValue() : null;
-
+ final Entry<HashedVersion, ByteStringMessage<ProtocolAppliedWaveletDelta>> cachedEntry =
+ appliedDeltas.lowerEntry(endVersion); // null when no cached delta is keyed below endVersion
+ WaveletDeltaRecord deltaRecord = getDeltaRecordByEndVersion(endVersion);
+ ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta;
+ if (deltaRecord == null && cachedEntry != null && isDeltaBoundary(endVersion)) {
+ appliedDelta = cachedEntry.getValue(); // not in the store: use the in-memory delta
+ } else {
+ appliedDelta = deltaRecord != null ? deltaRecord.getAppliedDelta() :
null;
+ }
+ return appliedDelta;
}
@Override
@@ -338,6 +414,31 @@ class DeltaStoreBasedWaveletState implem
: null;
}
+ public Collection<ByteStringMessage<ProtocolAppliedWaveletDelta>>
getAppliedDeltaHistory1(
+ final HashedVersion startVersion, final HashedVersion endVersion) {
+ Preconditions.checkArgument(startVersion.getVersion() <
endVersion.getVersion());
+ final Set<ByteStringMessage<ProtocolAppliedWaveletDelta>> allDeltas =
Sets.newHashSet();
+ allDeltas.addAll(appliedDeltas.subMap(startVersion, endVersion).values());
+ ImmutableList<WaveletDeltaRecord> persistedDeltas;
+ try {
+ persistedDeltas =
+ readDeltasInRange(deltasAccess, startVersion.getVersion(),
endVersion.getVersion());
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ for (WaveletDeltaRecord d : persistedDeltas) {
+ allDeltas.add(d.getAppliedDelta());
+ }
+ Collection<ByteStringMessage<ProtocolAppliedWaveletDelta>> deltaCollection
=
+ Lists.newArrayList();
+ if (isDeltaBoundary(startVersion) && isDeltaBoundary(endVersion)) {
+ for (ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta :
allDeltas) {
+ deltaCollection.add(appliedDelta);
+ }
+ }
+ return deltaCollection;
+ }
+
@Override
public void appendDelta(HashedVersion appliedAtVersion,
TransformedWaveletDelta transformedDelta,
@@ -361,10 +462,10 @@ class DeltaStoreBasedWaveletState implem
@Override
public ListenableFuture<Void> persist(final HashedVersion version) {
- Preconditions.checkArgument(version.getVersion() > 0, "Cannot persist
non-positive version %s",
- version);
- Preconditions.checkArgument(isDeltaBoundary(version), "Version to persist
%s matches no delta",
- version);
+ Preconditions.checkArgument(version.getVersion() > 0,
+ "Cannot persist non-positive version %s", version);
+ Preconditions.checkArgument(isDeltaBoundary(version),
+ "Version to persist %s matches no delta", version);
synchronized (persistLock) {
if (latestVersionToPersist != null) {
// There's a persist task in flight.
@@ -388,9 +489,35 @@ class DeltaStoreBasedWaveletState implem
}
@Override
+ public void flush(HashedVersion version) {
+ transformedDeltas.remove(version);
+ appliedDeltas.remove(version);
+ if (LOG.isFineLoggable()) {
+ LOG.fine("Flushed delta " + version);
+ }
+ }
+
+ @Override
public void close() {
}
+ /**
+ * @return the delta record that {@code deltasAccess.getDelta(version)} yields for the given
+ * version number. Callers in this class treat a null result as "no such delta".
+ */
+ private WaveletDeltaRecord lookup(long version) throws IOException {
+ return deltasAccess.getDelta(version);
+ }
+ }
+
+ private WaveletDeltaRecord getDeltaRecordByEndVersion(HashedVersion
endVersion) {
+ long version = endVersion.getVersion();
+ try {
+ return deltasAccess.getDeltaByEndVersion(version);
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ }
+
private boolean isDeltaBoundary(HashedVersion version) {
Preconditions.checkNotNull(version, "version is null");
return version.equals(getCurrentVersion()) ||
transformedDeltas.containsKey(version);
Added:
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RuntimeIOException.java
URL:
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RuntimeIOException.java?rev=1203579&view=auto
==============================================================================
---
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RuntimeIOException.java
(added)
+++
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RuntimeIOException.java
Fri Nov 18 10:46:53 2011
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2011 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.waveprotocol.box.server.waveserver;
+
+import java.io.IOException;
+
+/**
+ * Wraps an {@link IOException} in a {@link RuntimeException}; {@link #getIOException()} returns it.
+ *
+ * @author [email protected] (Yuri Zelikov)
+ */
+@SuppressWarnings("serial")
+public class RuntimeIOException extends RuntimeException {
+ private final IOException cause; // typed reference to the wrapped exception, so no cast is needed
+
+ public RuntimeIOException(IOException cause) {
+ super(cause); // also passed to RuntimeException as the cause
+ this.cause = cause;
+ }
+
+ public IOException getIOException() { // the exception given to the constructor
+ return cause;
+ }
+}
Propchange:
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RuntimeIOException.java
------------------------------------------------------------------------------
svn:executable = *
Modified:
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java?rev=1203579&r1=1203578&r2=1203579&view=diff
==============================================================================
---
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
(original)
+++
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
Fri Nov 18 10:46:53 2011
@@ -237,7 +237,7 @@ abstract class WaveletContainerImpl impl
public void run() {
acquireWriteLock();
try {
- // waveletState.flush(version); // TODO(soren): implement this
+ waveletState.flush(version);
notifyOfCommit(version, domainsToNotify);
} finally {
releaseWriteLock();
Modified:
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletState.java
URL:
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletState.java?rev=1203579&r1=1203578&r2=1203579&view=diff
==============================================================================
---
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletState.java
(original)
+++
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletState.java
Fri Nov 18 10:46:53 2011
@@ -135,6 +135,9 @@ interface WaveletState {
*/
ListenableFuture<Void> persist(HashedVersion version);
+ /** Flushes persisted delta from memory. */
+ void flush(HashedVersion version);
+
/**
* Closes the object. No other methods on the object should be invoked after
* this class.