Author: yurize
Date: Tue Nov 1 19:04:58 2011
New Revision: 1196213
URL: http://svn.apache.org/viewvc?rev=1196213&view=rev
Log:
Improves persistence logic and addresses some old TODOs. In collaboration with
[email protected]
Modified:
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java
Modified:
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java
URL:
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java?rev=1196213&r1=1196212&r2=1196213&view=diff
==============================================================================
--- incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java (original)
+++ incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java Tue Nov 1 19:04:58 2011
@@ -53,17 +53,11 @@ import java.util.concurrent.atomic.Atomi
/**
* Simplistic {@link DeltaStore}-backed wavelet state implementation
- * which keeps the entire delta history in memory.
- *
- * TODO(soren): only keep in memory what's not persisted
+ * which goes to persistent storage for every history request.
*
* TODO(soren): rewire this class to be backed by {@link WaveletStore} and
* read the snapshot from there instead of computing it in the
- * DeltaStoreBasedWaveletState constructor
- *
- * TODO(soren): refine the persist() logic to make it batch successive
- * writes to storage, when write latency exceeds the intervals between
- * calls to persist()
+ * DeltaStoreBasedWaveletState constructor.
*
* @author [email protected] (Soren Lassen)
*/
@@ -99,11 +93,6 @@ class DeltaStoreBasedWaveletState implem
*/
public static DeltaStoreBasedWaveletState create(DeltaStore.DeltasAccess
deltasAccess,
Executor persistExecutor) throws PersistenceException {
- // Note that the logic in persist() depends on persistExecutor being single-threaded.
- // TODO(soren): finesse the logic in persist() so it
- // doesn't require it to be single-threaded, because it would be useful to be
- // able to use a shared executor with a thread-count set to the appropriate level
- // of write parallelism for the storage subsystem.
if (deltasAccess.isEmpty()) {
return new DeltaStoreBasedWaveletState(deltasAccess,
ImmutableList.<WaveletDeltaRecord>of(),
null, persistExecutor);
@@ -122,7 +111,7 @@ class DeltaStoreBasedWaveletState implem
}
/**
- * Reads all deltas from
+ * Reads all deltas from persistent storage.
*/
private static ImmutableList<WaveletDeltaRecord>
readAll(WaveletDeltaRecordReader reader)
throws IOException{
@@ -153,6 +142,64 @@ class DeltaStoreBasedWaveletState implem
private final Executor persistExecutor;
private final HashedVersion versionZero;
private final DeltaStore.DeltasAccess deltasAccess;
+
+ /** The lock that guards access to persistence related state. */
+ private Object persistLock = new Object();
+
+ /**
+ * Indicates the version of the latest appended delta that was already requested to be
+ * persisted.
+ */
+ private HashedVersion latestVersionToPersist = null;
+
+ /** The persist task that will be executed next. */
+ private ListenableFutureTask<Void> nextPersistTask = null;
+
+ /**
+ * Processes the persist task and checks if there is another task to do when
+ * one task is done. In such a case, it writes all waiting to be persisted
+ * deltas to persistent storage in one operation.
+ */
+ private final Callable<Void> persisterTask = new Callable<Void>() {
+ @Override
+ public Void call() throws PersistenceException {
+ HashedVersion last;
+ HashedVersion version;
+ synchronized (persistLock) {
+ last = lastPersistedVersion.get();
+ version = latestVersionToPersist;
+ }
+ if (last != null && version.getVersion() <= last.getVersion()) {
+ LOG.info("Attempt to persist version " + version + " smaller than last persisted version "
+ + last);
+ // Done, version is already persisted.
+ version = last;
+ } else {
+ ImmutableList.Builder<WaveletDeltaRecord> deltas =
ImmutableList.builder();
+ HashedVersion v = (last == null) ? versionZero : last;
+ do {
+ WaveletDeltaRecord d =
+ new WaveletDeltaRecord(v, appliedDeltas.get(v),
transformedDeltas.get(v));
+ deltas.add(d);
+ v = d.getResultingVersion();
+ } while (v.getVersion() < version.getVersion());
+ Preconditions.checkState(v.equals(version));
+ deltasAccess.append(deltas.build());
+ }
+ synchronized (persistLock) {
+ Preconditions.checkState(last == lastPersistedVersion.get(),
+ "lastPersistedVersion changed while we were writing to storage");
+ lastPersistedVersion.set(version);
+ if (nextPersistTask != null) {
+ persistExecutor.execute(nextPersistTask);
+ nextPersistTask = null;
+ } else {
+ latestVersionToPersist = null;
+ }
+ }
+ return null;
+ }
+ };
/** Keyed by appliedAtVersion. */
private final NavigableMap<HashedVersion,
ByteStringMessage<ProtocolAppliedWaveletDelta>>
@@ -314,42 +361,30 @@ class DeltaStoreBasedWaveletState implem
@Override
public ListenableFuture<Void> persist(final HashedVersion version) {
- Preconditions.checkArgument(version.getVersion() > 0,
- "Cannot persist non-positive version %s", version);
- Preconditions.checkArgument(isDeltaBoundary(version),
- "Version to persist %s matches no delta", version);
-
- // The following logic relies on persistExecutor being single-threaded,
- // so no two tasks execute in parallel.
- ListenableFutureTask<Void> resultTask = new ListenableFutureTask<Void>(
- new Callable<Void>() {
- @Override
- public Void call() throws PersistenceException {
- HashedVersion last = lastPersistedVersion.get();
- if (last != null && version.getVersion() <= last.getVersion()) {
- LOG.info("Attempt to persist version " + version
- + " smaller than last persisted version " + last);
- // done, version is already persisted
- } else {
- ImmutableList.Builder<WaveletDeltaRecord> deltas =
ImmutableList.builder();
- HashedVersion v = (last == null) ? versionZero : last;
- do {
- WaveletDeltaRecord d =
- new WaveletDeltaRecord(v, appliedDeltas.get(v),
transformedDeltas.get(v));
- deltas.add(d);
- v = d.getResultingVersion();
- } while (v.getVersion() < version.getVersion());
- Preconditions.checkState(v.equals(version));
- deltasAccess.append(deltas.build());
- Preconditions.checkState(last == lastPersistedVersion.get(),
- "lastPersistedVersion changed while we were writing to storage");
- lastPersistedVersion.set(version);
- }
- return null;
- }
- });
- persistExecutor.execute(resultTask);
- return resultTask;
+ Preconditions.checkArgument(version.getVersion() > 0, "Cannot persist non-positive version %s",
+ version);
+ Preconditions.checkArgument(isDeltaBoundary(version), "Version to persist %s matches no delta",
+ version);
+ synchronized (persistLock) {
+ if (latestVersionToPersist != null) {
+ // There's a persist task in flight.
+ if (version.getVersion() <= latestVersionToPersist.getVersion()) {
+ LOG.info("Attempt to persist version " + version
+ + " smaller than last version requested " + latestVersionToPersist);
+ } else {
+ latestVersionToPersist = version;
+ }
+ if (nextPersistTask == null) {
+ nextPersistTask = new ListenableFutureTask<Void>(persisterTask);
+ }
+ return nextPersistTask;
+ } else {
+ latestVersionToPersist = version;
+ ListenableFutureTask<Void> resultTask = new
ListenableFutureTask<Void>(persisterTask);
+ persistExecutor.execute(resultTask);
+ return resultTask;
+ }
+ }
}
@Override
Modified:
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java
URL:
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java?rev=1196213&r1=1196212&r2=1196213&view=diff
==============================================================================
--- incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java (original)
+++ incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java Tue Nov 1 19:04:58 2011
@@ -57,12 +57,15 @@ public class WaveServerModule extends Ab
// TODO(soren): move to global config file
private static final int LISTENER_EXECUTOR_THREAD_COUNT = 2;
private static final int WAVELET_LOAD_EXECUTOR_THREAD_COUNT = 2;
+ private static final int DELTA_PERSIST_EXECUTOR_THREAD_COUNT = 2;
private static final IdURIEncoderDecoder URI_CODEC =
new IdURIEncoderDecoder(new JavaUrlCodec());
private static final HashedVersionFactory HASH_FACTORY = new
HashedVersionFactoryImpl(URI_CODEC);
private final Executor waveletLoadExecutor =
Executors.newFixedThreadPool(WAVELET_LOAD_EXECUTOR_THREAD_COUNT);
+ private final Executor persistExecutor =
+ Executors.newFixedThreadPool(DELTA_PERSIST_EXECUTOR_THREAD_COUNT);
private final boolean enableFederation;
public WaveServerModule(boolean enableFederation) {
@@ -114,7 +117,7 @@ public class WaveServerModule extends Ab
public LocalWaveletContainer create(WaveletNotificationSubscriber
notifiee,
WaveletName waveletName, String waveDomain) {
return new LocalWaveletContainerImpl(waveletName, notifiee,
loadWaveletState(
- waveletLoadExecutor, deltaStore, waveletName), waveDomain);
+ waveletLoadExecutor, deltaStore, waveletName, persistExecutor),
waveDomain);
}
};
}
@@ -128,7 +131,7 @@ public class WaveServerModule extends Ab
public RemoteWaveletContainer create(WaveletNotificationSubscriber
notifiee,
WaveletName waveletName, String waveDomain) {
return new RemoteWaveletContainerImpl(waveletName, notifiee,
- loadWaveletState(waveletLoadExecutor, deltaStore, waveletName));
+ loadWaveletState(waveletLoadExecutor, deltaStore, waveletName,
persistExecutor));
}
};
}
@@ -147,21 +150,18 @@ public class WaveServerModule extends Ab
}
/**
- * Returns a future whose result is the state of the wavelet after it has
- * been loaded from storage.
- * Any failure is reported as a {@link PersistenceException}.
+ * Returns a future whose result is the state of the wavelet after it has been
+ * loaded from storage. Any failure is reported as a
+ * {@link PersistenceException}.
*/
@VisibleForTesting
- static ListenableFuture<DeltaStoreBasedWaveletState> loadWaveletState(
- Executor executor, final DeltaStore deltaStore, final WaveletName
waveletName) {
+ static ListenableFuture<DeltaStoreBasedWaveletState>
loadWaveletState(Executor executor,
+ final DeltaStore deltaStore, final WaveletName waveletName, final
Executor persistExecutor) {
ListenableFutureTask<DeltaStoreBasedWaveletState> task =
new ListenableFutureTask<DeltaStoreBasedWaveletState>(
new Callable<DeltaStoreBasedWaveletState>() {
@Override
public DeltaStoreBasedWaveletState call() throws
PersistenceException {
- // One executor per wave is inefficient; see comment in
- // DeltaStoreBasedWaveletState.
- Executor persistExecutor = Executors.newSingleThreadExecutor();
return
DeltaStoreBasedWaveletState.create(deltaStore.open(waveletName),
persistExecutor);
}
Modified:
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java
URL:
http://svn.apache.org/viewvc/incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java?rev=1196213&r1=1196212&r2=1196213&view=diff
==============================================================================
--- incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java (original)
+++ incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java Tue Nov 1 19:04:58 2011
@@ -106,7 +106,8 @@ public class WaveServerTest extends Test
public LocalWaveletContainer create(WaveletNotificationSubscriber
notifiee,
WaveletName waveletName, String waveDomain) {
return new LocalWaveletContainerImpl(waveletName, notifiee,
- WaveServerModule.loadWaveletState(executor, deltaStore,
waveletName), waveDomain);
+ WaveServerModule.loadWaveletState(executor, deltaStore,
waveletName, executor),
+ waveDomain);
}
};