Author: yurize
Date: Tue May  1 09:00:31 2012
New Revision: 1332579

URL: http://svn.apache.org/viewvc?rev=1332579&view=rev
Log:
Makes the threads count for executors that handle wavelet loading and
lookup configurable
https://reviews.apache.org/r/4924/

Added:
    
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/LookupExecutor.java
Modified:
    incubator/wave/trunk/server-config.xml
    incubator/wave/trunk/server.config.example
    incubator/wave/trunk/src/org/waveprotocol/box/server/CoreSettings.java
    incubator/wave/trunk/src/org/waveprotocol/box/server/ServerMain.java
    incubator/wave/trunk/src/org/waveprotocol/box/server/ServerModule.java
    
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImpl.java
    
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RemoteWaveletContainerImpl.java
    incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveMap.java
    
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java
    
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
    
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImplTest.java
    
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/MemorySearchProviderTest.java
    
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveMapTest.java
    
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java
    
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveletContainerTest.java

Modified: incubator/wave/trunk/server-config.xml
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/server-config.xml?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- incubator/wave/trunk/server-config.xml (original)
+++ incubator/wave/trunk/server-config.xml Tue May  1 09:00:31 2012
@@ -4,8 +4,7 @@
   <!-- Properties for server.config -->
   <property name="wave_server_domain" value="local.net" />
   <property name="http_frontend_public_address" value="localhost:9898" />
-  <property name="http_frontend_addresses"
-            value="${http_frontend_public_address}" />
+  <property name="http_frontend_addresses" 
value="${http_frontend_public_address}" />
   <property name="resource_bases" value="./war" />
   <property name="signer_info_store_type" value="file" />
   <property name="attachment_store_type" value="disk" />
@@ -20,8 +19,10 @@
   <property name="admin_user" value="@${wave_server_domain}" />
   <property name="welcome_wave_id" value="" />
   <property name="listener_executor_thread_count" value="1" />
-  <property name="wavelet_load_executor_thread_count" value="2" />
-  <property name="delta_persist_executor_thread_count" value="2" />
+  <property name="wavelet_load_executor_thread_count" value="1" />
+  <property name="delta_persist_executor_thread_count" value="1" />
+  <property name="storage_continuation_executor_thread_count" value="1" />
+  <property name="lookup_executor_thread_count" value="1" />
   <property name="disable_registration" value="false" />
   <property name="enable_import" value="false" />
   <property name="enable_ssl" value="false" />
@@ -87,6 +88,8 @@
           <token key="LISTENER_EXECUTOR_THREAD_COUNT" 
value="${listener_executor_thread_count}" />
           <token key="WAVELET_LOAD_EXECUTOR_THREAD_COUNT" 
value="${wavelet_load_executor_thread_count}" />
           <token key="DELTA_PERSIST_EXECUTOR_THREAD_COUNT" 
value="${delta_persist_executor_thread_count}" />
+          <token key="STORAGE_CONTINUATION_EXECUTOR_THREAD_COUNT" 
value="${storage_continuation_executor_thread_count}" />
+          <token key="LOOKUP_EXECUTOR_THREAD_COUNT" 
value="${lookup_executor_thread_count}" />
           <token key="DISABLE_REGISTRATION" value="${disable_registration}" />
           <token key="ENABLE_IMPORT" value="${enable_import}" />
           <token key="ENABLE_SSL" value="${enable_ssl}" />

Modified: incubator/wave/trunk/server.config.example
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/server.config.example?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- incubator/wave/trunk/server.config.example (original)
+++ incubator/wave/trunk/server.config.example Tue May  1 09:00:31 2012
@@ -87,14 +87,22 @@ admin_user = @ADMIN_USER@
 # Default value: "" (empty)
 welcome_wave_id = @WELCOME_WAVE_ID@
 
-# Thread counts
-#The number of threads to listen on wavelet updates. Default value: 1
+# The number of threads to listen on wavelet updates. Default value: 1
 listener_executor_thread_count = @LISTENER_EXECUTOR_THREAD_COUNT@
-#The number of threads for loading wavelets. Default value: 2
+
+# The number of threads for loading wavelets. Default value: 1
 wavelet_load_executor_thread_count = @WAVELET_LOAD_EXECUTOR_THREAD_COUNT@
-#The number of threads to persist deltas. Default value: 2
+
+# The number of threads to persist deltas. Default value: 1
 delta_persist_executor_thread_count = @DELTA_PERSIST_EXECUTOR_THREAD_COUNT@
 
+# The number of threads to perform post wavelet loading logic. Default value: 1
+storage_continuation_executor_thread_count = 
@STORAGE_CONTINUATION_EXECUTOR_THREAD_COUNT@
+
+# The number of threads for looking up the wavelet ids
+# while creating a list of all wavelets in the persistent storage. Default 
value: 1
+lookup_executor_thread_count = @LOOKUP_EXECUTOR_THREAD_COUNT@
+
 # To enable federation, edit the server.federation.config file and include it 
here.
 # Or run ant -f server-config.xml server-federation-config
 # If not using the server-config.xml ant script - it is possible just to 
comment the line.

Modified: incubator/wave/trunk/src/org/waveprotocol/box/server/CoreSettings.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/CoreSettings.java?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- incubator/wave/trunk/src/org/waveprotocol/box/server/CoreSettings.java 
(original)
+++ incubator/wave/trunk/src/org/waveprotocol/box/server/CoreSettings.java Tue 
May  1 09:00:31 2012
@@ -53,6 +53,8 @@ public class CoreSettings {
   public static final String LISTENER_EXECUTOR_THREAD_COUNT = 
"listener_executor_thread_count";
   public static final String WAVELET_LOAD_EXECUTOR_THREAD_COUNT = 
"wavelet_load_executor_thread_count";
   public static final String DELTA_PERSIST_EXECUTOR_THREAD_COUNT = 
"delta_persist_executor_thread_count";
+  public static final String STORAGE_CONTINUATION_EXECUTOR_THREAD_COUNT = 
"storage_continuation_executor_thread_count";
+  public static final String LOOKUP_EXECUTOR_THREAD_COUNT = 
"lookup_executor_thread_count";
   public static final String DISABLE_REGISTRATION = "disable_registration";
   public static final String ENABLE_SSL = "enable_ssl";
   public static final String SSL_KEYSTORE_PATH = "ssl_keystore_path";
@@ -167,14 +169,24 @@ public class CoreSettings {
 
   @Setting(name = WAVELET_LOAD_EXECUTOR_THREAD_COUNT,
       description = "The number of threads for loading wavelets.",
-      defaultValue = "2")
+      defaultValue = "1")
   private static int waveletLoadExecutorThreadCount;
 
   @Setting(name = DELTA_PERSIST_EXECUTOR_THREAD_COUNT,
       description = "The number of threads to persist deltas.",
-      defaultValue = "2")
+      defaultValue = "1")
   private static int deltaPersistExecutorThreadCount;
 
+  @Setting(name = STORAGE_CONTINUATION_EXECUTOR_THREAD_COUNT,
+      description = "The number of threads to perform post wavelet loading 
logic.",
+      defaultValue = "1")
+  private static int storageContinuationExecutorThreadCount;
+
+  @Setting(name = LOOKUP_EXECUTOR_THREAD_COUNT,
+      description = "The number of threads to perform post wavelet loading 
logic.",
+      defaultValue = "1")
+  private static int lookupExecutorThreadCount;
+
   @Setting(name = DISABLE_REGISTRATION,
       description = "Prevents the register page from being available to 
anyone", defaultValue = "false")
   private static boolean disableRegistration;

Modified: incubator/wave/trunk/src/org/waveprotocol/box/server/ServerMain.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/ServerMain.java?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- incubator/wave/trunk/src/org/waveprotocol/box/server/ServerMain.java 
(original)
+++ incubator/wave/trunk/src/org/waveprotocol/box/server/ServerMain.java Tue 
May  1 09:00:31 2012
@@ -150,6 +150,10 @@ public class ServerMain {
         Names.named(CoreSettings.WAVELET_LOAD_EXECUTOR_THREAD_COUNT)));
     int deltaPersistCount = settingsInjector.getInstance(Key.get(Integer.class,
         Names.named(CoreSettings.DELTA_PERSIST_EXECUTOR_THREAD_COUNT)));
+    int storageContinuationCount = 
settingsInjector.getInstance(Key.get(Integer.class,
+        Names.named(CoreSettings.STORAGE_CONTINUATION_EXECUTOR_THREAD_COUNT)));
+    int lookupCount = settingsInjector.getInstance(Key.get(Integer.class,
+        Names.named(CoreSettings.LOOKUP_EXECUTOR_THREAD_COUNT)));
 
     if (enableFederation) {
       Module federationSettings =
@@ -162,8 +166,8 @@ public class ServerMain {
     PersistenceModule persistenceModule = 
settingsInjector.getInstance(PersistenceModule.class);
     Injector injector =
         settingsInjector.createChildInjector(new 
ServerModule(enableFederation, listenerCount,
-            waveletLoadCount, deltaPersistCount), new RobotApiModule(), 
federationModule,
-            persistenceModule);
+            waveletLoadCount, deltaPersistCount, storageContinuationCount, 
lookupCount),
+            new RobotApiModule(), federationModule, persistenceModule);
 
     ServerRpcProvider server = injector.getInstance(ServerRpcProvider.class);
     WaveBus waveBus = injector.getInstance(WaveBus.class);

Modified: incubator/wave/trunk/src/org/waveprotocol/box/server/ServerModule.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/ServerModule.java?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- incubator/wave/trunk/src/org/waveprotocol/box/server/ServerModule.java 
(original)
+++ incubator/wave/trunk/src/org/waveprotocol/box/server/ServerModule.java Tue 
May  1 09:00:31 2012
@@ -32,6 +32,7 @@ import org.waveprotocol.box.server.robot
 import org.waveprotocol.box.server.robots.register.RobotRegistrarImpl;
 import org.waveprotocol.box.server.rpc.ProtoSerializer;
 import org.waveprotocol.box.server.rpc.ServerRpcProvider;
+import org.waveprotocol.box.server.waveserver.LookupExecutor;
 import org.waveprotocol.box.server.waveserver.WaveServerImpl;
 import org.waveprotocol.box.server.waveserver.WaveServerModule;
 import org.waveprotocol.wave.federation.FederationHostBridge;
@@ -47,6 +48,8 @@ import org.waveprotocol.wave.model.id.To
 import java.security.SecureRandom;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 
 import javax.security.auth.login.Configuration;
 
@@ -60,13 +63,17 @@ public class ServerModule extends Abstra
   private final int listenerCount;
   private final int waveletLoadCount;
   private final int deltaPersistCount;
+  private final int storageContinuationCount;
+  private final int lookupCount;
 
   public ServerModule(boolean enableFederation, int listenerCount, int 
waveletLoadCount,
-      int deltaPersistCount) {
+      int deltaPersistCount, int storageContinuationCount, int lookupCount) {
     this.enableFederation = enableFederation;
     this.listenerCount = listenerCount;
     this.waveletLoadCount = waveletLoadCount;
     this.deltaPersistCount = deltaPersistCount;
+    this.storageContinuationCount = storageContinuationCount;
+    this.lookupCount = lookupCount;
   }
 
   @Override
@@ -80,8 +87,11 @@ public class ServerModule extends Abstra
     
bind(WaveletFederationProvider.class).annotatedWith(FederationHostBridge.class).to(
         WaveServerImpl.class);
 
+    bind(Executor.class).annotatedWith(LookupExecutor.class).toInstance(
+        Executors.newFixedThreadPool(lookupCount));
+
     install(new WaveServerModule(enableFederation, listenerCount, 
waveletLoadCount,
-        deltaPersistCount));
+        deltaPersistCount, storageContinuationCount));
     TypeLiteral<List<String>> certs = new TypeLiteral<List<String>>() {};
     bind(certs).annotatedWith(Names.named("certs")).toInstance(Arrays.<String> 
asList());
 

Modified: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImpl.java?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImpl.java
 (original)
+++ 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImpl.java
 Tue May  1 09:00:31 2012
@@ -42,6 +42,8 @@ import org.waveprotocol.wave.model.versi
 import org.waveprotocol.wave.model.wave.ParticipantId;
 import org.waveprotocol.wave.util.logging.Log;
 
+import java.util.concurrent.Executor;
+
 /**
  * A local wavelet may be updated by submits. The local wavelet will perform
  * operational transformation on the submitted delta and assign it the latest
@@ -77,8 +79,9 @@ class LocalWaveletContainerImpl extends 
   }
 
   public LocalWaveletContainerImpl(WaveletName waveletName, 
WaveletNotificationSubscriber notifiee,
-      ListenableFuture<? extends WaveletState> waveletStateFuture, String 
waveDomain) {
-    super(waveletName, notifiee, waveletStateFuture, waveDomain);
+      ListenableFuture<? extends WaveletState> waveletStateFuture, String 
waveDomain,
+      Executor storageContinuationExecutor) {
+    super(waveletName, notifiee, waveletStateFuture, waveDomain, 
storageContinuationExecutor);
   }
 
   @Override

Added: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/LookupExecutor.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/LookupExecutor.java?rev=1332579&view=auto
==============================================================================
--- 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/LookupExecutor.java
 (added)
+++ 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/LookupExecutor.java
 Tue May  1 09:00:31 2012
@@ -0,0 +1,35 @@
+/**
+ * Copyright 2012 Apache Wave.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.waveprotocol.box.server.waveserver;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+/**
+ * Used to mark the lookup executor class.
+ *
+ * @author [email protected] (Yuri Zelikov)
+ */
+@BindingAnnotation @Target({ FIELD, PARAMETER, METHOD }) @Retention(RUNTIME)
+public @interface LookupExecutor {}
\ No newline at end of file

Modified: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RemoteWaveletContainerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RemoteWaveletContainerImpl.java?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RemoteWaveletContainerImpl.java
 (original)
+++ 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/RemoteWaveletContainerImpl.java
 Tue May  1 09:00:31 2012
@@ -52,6 +52,7 @@ import org.waveprotocol.wave.util.loggin
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -74,11 +75,11 @@ class RemoteWaveletContainerImpl extends
    * constructor.
    */
   public RemoteWaveletContainerImpl(WaveletName waveletName, 
WaveletNotificationSubscriber notifiee,
-      ListenableFuture<? extends WaveletState> waveletStateFuture) {
+      ListenableFuture<? extends WaveletState> waveletStateFuture,
+      Executor storageContinuationExecutor) {
     // We pass here null for waveDomain because you have to be explicit
     // participant on remote wavelet to have access permission.
-    // TODO (Yuri Z.): check if the assumption above is correct.
-    super(waveletName, notifiee, waveletStateFuture, null);
+    super(waveletName, notifiee, waveletStateFuture, null, 
storageContinuationExecutor);
   }
 
   @Override

Modified: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveMap.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveMap.java?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveMap.java 
(original)
+++ 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveMap.java 
Tue May  1 09:00:31 2012
@@ -38,7 +38,6 @@ import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 
 /**
  * A collection of wavelets, local and remote, held in memory.
@@ -75,12 +74,10 @@ public class WaveMap {
       final LocalWaveletContainer.Factory localFactory,
       final RemoteWaveletContainer.Factory remoteFactory,
       @Named(CoreSettings.WAVE_SERVER_DOMAIN) final String waveDomain,
-      WaveDigester digester) {
+      @LookupExecutor final Executor lookupExecutor) {
     // NOTE(anorth): DeltaAndSnapshotStore is more specific than necessary, but
     // helps Guice out.
-    // TODO(soren): inject a proper executor (with a pool of configurable size)
     this.store = waveletStore;
-    final Executor lookupExecutor = Executors.newSingleThreadExecutor();
     waves = new MapMaker().makeComputingMap(new Function<WaveId, Wave>() {
       @Override
       public Wave apply(WaveId waveId) {

Modified: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java
 (original)
+++ 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java
 Tue May  1 09:00:31 2012
@@ -61,14 +61,16 @@ public class WaveServerModule extends Ab
   private final int listenerExecutorThreadCount;
   private final Executor waveletLoadExecutor;
   private final Executor persistExecutor;
+  private final Executor storageContinuationExecutor;
   private final boolean enableFederation;
 
   public WaveServerModule(boolean enableFederation, int listenerCount, int 
waveletLoadCount,
-      int deltaPersistCount) {
+      int deltaPersistCount, int storageContinuationCount) {
     this.enableFederation = enableFederation;
     this.listenerExecutorThreadCount = listenerCount;
     waveletLoadExecutor = Executors.newFixedThreadPool(waveletLoadCount);
     persistExecutor = Executors.newFixedThreadPool(deltaPersistCount);
+    storageContinuationExecutor = 
Executors.newFixedThreadPool(storageContinuationCount);
   }
 
   @Override
@@ -116,7 +118,8 @@ public class WaveServerModule extends Ab
       public LocalWaveletContainer create(WaveletNotificationSubscriber 
notifiee,
           WaveletName waveletName, String waveDomain) {
         return new LocalWaveletContainerImpl(waveletName, notifiee, 
loadWaveletState(
-            waveletLoadExecutor, deltaStore, waveletName, persistExecutor), 
waveDomain);
+            waveletLoadExecutor, deltaStore, waveletName, persistExecutor), 
waveDomain,
+            storageContinuationExecutor);
       }
     };
   }
@@ -129,8 +132,9 @@ public class WaveServerModule extends Ab
       @Override
       public RemoteWaveletContainer create(WaveletNotificationSubscriber 
notifiee,
           WaveletName waveletName, String waveDomain) {
-        return new RemoteWaveletContainerImpl(waveletName, notifiee,
-            loadWaveletState(waveletLoadExecutor, deltaStore, waveletName, 
persistExecutor));
+        return new RemoteWaveletContainerImpl(waveletName, notifiee, 
loadWaveletState(
+            waveletLoadExecutor, deltaStore, waveletName, persistExecutor),
+            storageContinuationExecutor);
       }
     };
   }

Modified: 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
 (original)
+++ 
incubator/wave/trunk/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java
 Tue May  1 09:00:31 2012
@@ -50,7 +50,6 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -85,9 +84,7 @@ abstract class WaveletContainerImpl impl
     CORRUPTED
   }
 
-  // TODO(soren): inject an executor which can be shared with other wavelets
-  private final Executor storageContinuationExecutor =
-      Executors.newSingleThreadExecutor();
+  private final Executor storageContinuationExecutor;
 
   private final Lock readLock;
   private final ReentrantReadWriteLock.WriteLock writeLock;
@@ -107,14 +104,17 @@ abstract class WaveletContainerImpl impl
    * @param notifiee the subscriber to notify of wavelet updates and commits.
    * @param waveletState the wavelet's delta history and current state.
    * @param waveDomain the wave server domain.
+   * @param storageContinuationExecutor the executor used to perform post 
wavelet loading logic.
    */
   public WaveletContainerImpl(WaveletName waveletName, 
WaveletNotificationSubscriber notifiee,
-      final ListenableFuture<? extends WaveletState> waveletStateFuture, 
String waveDomain) {
+      final ListenableFuture<? extends WaveletState> waveletStateFuture, 
String waveDomain,
+      Executor storageContinuationExecutor) {
     this.waveletName = waveletName;
     this.notifiee = notifiee;
     this.sharedDomainParticipantId =
         waveDomain != null ? 
ParticipantIdUtil.makeUnsafeSharedDomainParticipantId(waveDomain)
             : null;
+    this.storageContinuationExecutor = storageContinuationExecutor;
     ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     this.readLock = readWriteLock.readLock();
     this.writeLock = readWriteLock.writeLock();

Modified: 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImplTest.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImplTest.java?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImplTest.java
 (original)
+++ 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImplTest.java
 Tue May  1 09:00:31 2012
@@ -54,6 +54,7 @@ public class LocalWaveletContainerImplTe
       new IdURIEncoderDecoder(new JavaUrlCodec());
   private static final HashedVersionFactory HASH_FACTORY = new 
HashedVersionFactoryImpl(URI_CODEC);
   private static final Executor PERSIST_EXECUTOR = 
MoreExecutors.sameThreadExecutor();
+  private static final Executor STORAGE_CONTINUATION_EXECUTOR = 
MoreExecutors.sameThreadExecutor();
 
   private static final WaveletName WAVELET_NAME = WaveletName.of("a", "a", 
"b", "b");
   private static final ProtocolSignature SIGNATURE = 
ProtocolSignature.newBuilder()
@@ -87,7 +88,7 @@ public class LocalWaveletContainerImplTe
     WaveletState waveletState = 
DeltaStoreBasedWaveletState.create(deltaStore.open(WAVELET_NAME),
         PERSIST_EXECUTOR);
     wavelet = new LocalWaveletContainerImpl(WAVELET_NAME, notifiee,
-        Futures.immediateFuture(waveletState), null);
+        Futures.immediateFuture(waveletState), null, 
STORAGE_CONTINUATION_EXECUTOR);
     wavelet.awaitLoad();
   }
 

Modified: 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/MemorySearchProviderTest.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/MemorySearchProviderTest.java?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/MemorySearchProviderTest.java
 (original)
+++ 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/MemorySearchProviderTest.java
 Tue May  1 09:00:31 2012
@@ -184,6 +184,7 @@ public class MemorySearchProviderTest ex
 
     final DeltaStore deltaStore = new MemoryDeltaStore();
     final Executor persistExecutor = MoreExecutors.sameThreadExecutor();
+    final Executor storageContinuationExecutor = 
MoreExecutors.sameThreadExecutor();
     LocalWaveletContainer.Factory localWaveletContainerFactory =
         new LocalWaveletContainer.Factory() {
           @Override
@@ -197,13 +198,13 @@ public class MemorySearchProviderTest ex
               throw new RuntimeException(e);
             }
             return new LocalWaveletContainerImpl(waveletName, notifiee,
-                Futures.immediateFuture(waveletState), DOMAIN);
+                Futures.immediateFuture(waveletState), DOMAIN, 
storageContinuationExecutor);
           }
         };
 
     waveMap =
         new WaveMap(waveletStore, notifiee, notifiee, 
localWaveletContainerFactory,
-            remoteWaveletContainerFactory, "example.com", digester);
+            remoteWaveletContainerFactory, "example.com", 
storageContinuationExecutor);
     searchProvider = new MemorySearchProvider(notifiee, DOMAIN, digester, 
waveMap, subscriber);
   }
 

Modified: 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveMapTest.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveMapTest.java?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveMapTest.java
 (original)
+++ 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveMapTest.java
 Tue May  1 09:00:31 2012
@@ -31,8 +31,6 @@ import org.mockito.MockitoAnnotations;
 import org.waveprotocol.box.common.ExceptionalIterator;
 import org.waveprotocol.box.server.persistence.PersistenceException;
 import org.waveprotocol.box.server.persistence.memory.MemoryDeltaStore;
-import org.waveprotocol.box.server.robots.util.ConversationUtil;
-import org.waveprotocol.wave.model.id.IdGenerator;
 import org.waveprotocol.wave.model.id.WaveId;
 import org.waveprotocol.wave.model.id.WaveletId;
 import org.waveprotocol.wave.model.id.WaveletName;
@@ -55,12 +53,8 @@ public class WaveMapTest extends TestCas
   @Mock private WaveletNotificationDispatcher notifiee;
   @Mock private RemoteWaveletContainer.Factory remoteWaveletContainerFactory;
 
-  @Mock private IdGenerator idGenerator;
-
   private DeltaAndSnapshotStore waveletStore;
   private WaveMap waveMap;
-  private ConversationUtil conversationUtil;
-  private WaveDigester digester;
 
   @Override
   protected void setUp() throws Exception {
@@ -68,6 +62,7 @@ public class WaveMapTest extends TestCas
 
     final DeltaStore deltaStore = new MemoryDeltaStore();
     final Executor persistExecutor = MoreExecutors.sameThreadExecutor();
+    final Executor storageContinuationExecutor = 
MoreExecutors.sameThreadExecutor();
     LocalWaveletContainer.Factory localWaveletContainerFactory =
         new LocalWaveletContainer.Factory() {
           @Override
@@ -81,16 +76,14 @@ public class WaveMapTest extends TestCas
               throw new RuntimeException(e);
             }
             return new LocalWaveletContainerImpl(waveletName, notifiee,
-                Futures.immediateFuture(waveletState), DOMAIN);
+                Futures.immediateFuture(waveletState), DOMAIN, 
storageContinuationExecutor);
           }
         };
 
-    conversationUtil = new ConversationUtil(idGenerator);
-    digester = new WaveDigester(conversationUtil);
     waveletStore = mock(DeltaAndSnapshotStore.class);
     waveMap =
         new WaveMap(waveletStore, notifiee, notifiee, 
localWaveletContainerFactory,
-            remoteWaveletContainerFactory, "example.com", digester);
+            remoteWaveletContainerFactory, "example.com", 
storageContinuationExecutor);
   }
 
   public void testWaveMapStartsEmpty() throws WaveServerException {

Modified: 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java
 (original)
+++ 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveServerTest.java
 Tue May  1 09:00:31 2012
@@ -33,14 +33,12 @@ import org.mockito.MockitoAnnotations;
 import org.waveprotocol.box.common.ExceptionalIterator;
 import org.waveprotocol.box.server.common.CoreWaveletOperationSerializer;
 import org.waveprotocol.box.server.persistence.memory.MemoryDeltaStore;
-import org.waveprotocol.box.server.robots.util.ConversationUtil;
 import org.waveprotocol.box.server.waveserver.LocalWaveletContainer.Factory;
 import 
org.waveprotocol.box.server.waveserver.WaveletProvider.SubmitRequestListener;
 import org.waveprotocol.wave.federation.Proto.ProtocolSignature;
 import org.waveprotocol.wave.federation.Proto.ProtocolSignedDelta;
 import org.waveprotocol.wave.federation.Proto.ProtocolWaveletDelta;
 import org.waveprotocol.wave.federation.WaveletFederationProvider;
-import org.waveprotocol.wave.model.id.IdGenerator;
 import org.waveprotocol.wave.model.id.IdURIEncoderDecoder;
 import org.waveprotocol.wave.model.id.WaveId;
 import org.waveprotocol.wave.model.id.WaveletId;
@@ -57,7 +55,6 @@ import org.waveprotocol.wave.model.wave.
 import org.waveprotocol.wave.util.escapers.jvm.JavaUrlCodec;
 
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 
 /**
  * @author [email protected] (Joseph Gentle)
@@ -85,11 +82,6 @@ public class WaveServerTest extends Test
   @Mock private WaveletFederationProvider federationRemote;
   @Mock private WaveletNotificationDispatcher notifiee;
   @Mock private RemoteWaveletContainer.Factory remoteWaveletContainerFactory;
-  
-  @Mock private IdGenerator idGenerator;
-
-  private ConversationUtil conversationUtil;
-  private WaveDigester digester;
 
   private CertificateManager certificateManager;
   private DeltaAndSnapshotStore waveletStore;
@@ -107,23 +99,24 @@ public class WaveServerTest extends Test
 
     certificateManager = new CertificateManagerImpl(true, localSigner, null, 
null);
     final DeltaStore deltaStore = new MemoryDeltaStore();
-    final Executor executor = Executors.newSingleThreadExecutor();
+    final Executor waveletLoadExecutor = MoreExecutors.sameThreadExecutor();
+    final Executor persistExecutor = MoreExecutors.sameThreadExecutor();
+    final Executor storageContinuationExecutor = 
MoreExecutors.sameThreadExecutor();
     Factory localWaveletContainerFactory = new LocalWaveletContainer.Factory() 
{
       @Override
       public LocalWaveletContainer create(WaveletNotificationSubscriber 
notifiee,
           WaveletName waveletName, String waveDomain) {
         return new LocalWaveletContainerImpl(waveletName, notifiee,
-            WaveServerModule.loadWaveletState(executor, deltaStore, 
waveletName, executor),
-            waveDomain);
+            WaveServerModule.loadWaveletState(waveletLoadExecutor, deltaStore, 
waveletName, persistExecutor),
+            waveDomain, storageContinuationExecutor);
       }
     };
 
     waveletStore = new DeltaStoreBasedSnapshotStore(deltaStore);
-    conversationUtil = new ConversationUtil(idGenerator);
-    digester = new WaveDigester(conversationUtil);
+    Executor lookupExecutor = MoreExecutors.sameThreadExecutor();
     waveMap =
         new WaveMap(waveletStore, notifiee, notifiee, 
localWaveletContainerFactory,
-            remoteWaveletContainerFactory, "example.com", digester);
+            remoteWaveletContainerFactory, "example.com", lookupExecutor);
     waveServer =
         new WaveServerImpl(MoreExecutors.sameThreadExecutor(), 
certificateManager,
             federationRemote, waveMap);

Modified: 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveletContainerTest.java
URL: 
http://svn.apache.org/viewvc/incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveletContainerTest.java?rev=1332579&r1=1332578&r2=1332579&view=diff
==============================================================================
--- 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveletContainerTest.java
 (original)
+++ 
incubator/wave/trunk/test/org/waveprotocol/box/server/waveserver/WaveletContainerTest.java
 Tue May  1 09:00:31 2012
@@ -70,6 +70,7 @@ public class WaveletContainerTest extend
       new IdURIEncoderDecoder(new JavaUrlCodec());
   private static final HashedVersionFactory HASH_FACTORY = new 
HashedVersionFactoryImpl(URI_CODEC);
   private static final Executor PERSIST_EXECUTOR = 
MoreExecutors.sameThreadExecutor();
+  private static final Executor STORAGE_CONTINUATION_EXECUTOR = 
MoreExecutors.sameThreadExecutor();
 
   private static final String localDomain = "example.com";
   private static final WaveletName localWaveletName = WaveletName.of(
@@ -123,12 +124,12 @@ public class WaveletContainerTest extend
     WaveletState localWaveletState =
         DeltaStoreBasedWaveletState.create(deltaStore.open(localWaveletName), 
PERSIST_EXECUTOR);
     localWavelet = new LocalWaveletContainerImpl(localWaveletName, notifiee,
-        Futures.immediateFuture(localWaveletState), localDomain);
+        Futures.immediateFuture(localWaveletState), localDomain, 
STORAGE_CONTINUATION_EXECUTOR);
     localWavelet.awaitLoad();
     WaveletState remoteWaveletState =
         DeltaStoreBasedWaveletState.create(deltaStore.open(remoteWaveletName), 
PERSIST_EXECUTOR);
     remoteWavelet = new RemoteWaveletContainerImpl(remoteWaveletName, notifiee,
-        Futures.immediateFuture(remoteWaveletState));
+        Futures.immediateFuture(remoteWaveletState), 
STORAGE_CONTINUATION_EXECUTOR);
     remoteWavelet.awaitLoad();
   }
 


Reply via email to