Repository: incubator-wave Updated Branches: refs/heads/master de3cd2460 -> 407074736
Adds shutdown manager. https://reviews.apache.org/r/45934 Project: http://git-wip-us.apache.org/repos/asf/incubator-wave/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-wave/commit/40707473 Tree: http://git-wip-us.apache.org/repos/asf/incubator-wave/tree/40707473 Diff: http://git-wip-us.apache.org/repos/asf/incubator-wave/diff/40707473 Branch: refs/heads/master Commit: 407074736d0db6f741504601379ac750bfbaea23 Parents: de3cd24 Author: Yuri Zelikov <yur...@apache.org> Authored: Mon Apr 25 21:00:29 2016 +0300 Committer: Yuri Zelikov <yur...@apache.org> Committed: Mon Apr 25 21:03:14 2016 +0300 ---------------------------------------------------------------------- wave/config/reference.conf | 8 +- .../org/waveprotocol/box/server/ServerMain.java | 14 +++ .../server/executor/RequestScopeExecutor.java | 13 +- .../executor/ScheduledRequestScopeExecutor.java | 25 ++-- .../persistence/file/FileDeltaCollection.java | 75 +++++++++--- .../box/server/shutdown/LifeCycle.java | 118 +++++++++++++++++++ .../box/server/shutdown/ShutdownManager.java | 109 +++++++++++++++++ .../box/server/shutdown/ShutdownPriority.java | 34 ++++++ .../box/server/shutdown/Shutdownable.java | 28 +++++ .../box/server/waveserver/WaveMap.java | 20 ++-- .../SimpleSearchProviderImplTest.java | 10 +- .../box/server/waveserver/WaveMapTest.java | 9 +- .../box/server/waveserver/WaveServerTest.java | 8 +- 13 files changed, 429 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/40707473/wave/config/reference.conf ---------------------------------------------------------------------- diff --git a/wave/config/reference.conf b/wave/config/reference.conf index 8142f9b..425a576 100644 --- a/wave/config/reference.conf +++ b/wave/config/reference.conf @@ -122,6 +122,12 @@ core { # Database's name. Default name: wiab mongodb_database : wiab + + # Number of waves in memory cache. + wave_cache_size = 1000 + + # Duration to keep the waves in cache. + wave_cache_expire = 60m } network { @@ -254,4 +260,4 @@ federation { # Set true to disable the verification of signers (certificates) waveserver_disable_signer_verification : true -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/40707473/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java b/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java index bfa3b92..14e2491 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java +++ b/wave/src/main/java/org/waveprotocol/box/server/ServerMain.java @@ -51,6 +51,9 @@ import org.waveprotocol.box.server.robots.dataapi.DataApiOAuthServlet; import org.waveprotocol.box.server.robots.dataapi.DataApiServlet; import org.waveprotocol.box.server.robots.passive.RobotsGateway; import org.waveprotocol.box.server.rpc.*; +import org.waveprotocol.box.server.shutdown.ShutdownManager; +import org.waveprotocol.box.server.shutdown.ShutdownPriority; +import org.waveprotocol.box.server.shutdown.Shutdownable; import org.waveprotocol.box.server.stat.RequestScopeFilter; import org.waveprotocol.box.server.stat.StatuszServlet; import org.waveprotocol.box.server.stat.TimingFilter; @@ -167,6 +170,7 @@ public class ServerMain { initializeFrontend(injector, server, waveBus); initializeFederation(injector); initializeSearch(injector, waveBus); + initializeShutdownHandler(server); LOG.info("Starting server"); server.startWebSocketServer(injector); @@ -284,4 +288,14 @@ public class ServerMain { WaveIndexer waveIndexer = injector.getInstance(WaveIndexer.class); waveIndexer.remakeIndex(); } + + private static void initializeShutdownHandler(final ServerRpcProvider server) { + ShutdownManager.getInstance().register(new Shutdownable() { + + @Override + public void shutdown() throws Exception { + server.stopServer(); + } + }, ServerMain.class.getSimpleName(), ShutdownPriority.Server); + } } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/40707473/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java b/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java index 40c2039..845b0df 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java +++ b/wave/src/main/java/org/waveprotocol/box/server/executor/RequestScopeExecutor.java @@ -25,6 +25,9 @@ import java.util.concurrent.ExecutorService; import java.util.logging.Level; import java.util.logging.Logger; +import org.waveprotocol.box.server.shutdown.ShutdownManager; +import org.waveprotocol.box.server.shutdown.ShutdownPriority; +import org.waveprotocol.box.server.shutdown.Shutdownable; import org.waveprotocol.box.stat.RequestScope; import org.waveprotocol.box.stat.Timing; import org.waveprotocol.wave.model.util.Preconditions; @@ -38,7 +41,7 @@ import org.waveprotocol.wave.model.util.Preconditions; * @author akapla...@gmail.com (A. Kaplanov) */ @SuppressWarnings("rawtypes") -public class RequestScopeExecutor implements Executor { +public class RequestScopeExecutor implements Executor, Shutdownable { private final static Logger LOG = Logger.getLogger(RequestScopeExecutor.class.getName()); private ExecutorService executor; @@ -50,6 +53,7 @@ public class RequestScopeExecutor implements Executor { public void setExecutor(ExecutorService executor, String name) { Preconditions.checkArgument(this.executor == null, "Executor is already defined."); this.executor = executor; + ShutdownManager.getInstance().register(this, name, ShutdownPriority.Task); } @Override @@ -75,4 +79,11 @@ public class RequestScopeExecutor implements Executor { } }); } + + @Override + public void shutdown() throws Exception { + if (executor != null) { + executor.shutdown(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/40707473/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java b/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java index f841667..ac1ec02 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java +++ b/wave/src/main/java/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java @@ -19,23 +19,18 @@ package org.waveprotocol.box.server.executor; import com.google.inject.Inject; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Logger; - +import org.waveprotocol.box.server.shutdown.ShutdownManager; +import org.waveprotocol.box.server.shutdown.ShutdownPriority; +import org.waveprotocol.box.server.shutdown.Shutdownable; import org.waveprotocol.box.stat.RequestScope; import org.waveprotocol.box.stat.Timing; import org.waveprotocol.wave.model.util.Preconditions; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; + /** * Session-based request scope executor. * Runs on specified executor. @@ -44,8 +39,7 @@ import org.waveprotocol.wave.model.util.Preconditions; * @author akapla...@gmail.com (A. Kaplanov) */ @SuppressWarnings("rawtypes") -public class ScheduledRequestScopeExecutor implements ScheduledExecutorService { - private final static Logger LOG = Logger.getLogger(ScheduledRequestScopeExecutor.class.getName()); +public class ScheduledRequestScopeExecutor implements ScheduledExecutorService, Shutdownable { private ScheduledExecutorService executor; @@ -59,6 +53,7 @@ public class ScheduledRequestScopeExecutor implements ScheduledExecutorService { public void setExecutor(ScheduledExecutorService executor, String name) { Preconditions.checkArgument(this.executor == null, "Executor is already defined."); this.executor = executor; + ShutdownManager.getInstance().register(this, name, ShutdownPriority.Task); } @Override http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/40707473/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java b/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java index 65683c4..2bb1f93 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java +++ b/wave/src/main/java/org/waveprotocol/box/server/persistence/file/FileDeltaCollection.java @@ -27,6 +27,9 @@ import com.google.protobuf.InvalidProtocolBufferException; import org.waveprotocol.box.server.persistence.PersistenceException; import org.waveprotocol.box.server.persistence.protos.ProtoDeltaStoreDataSerializer; import org.waveprotocol.box.server.persistence.protos.ProtoDeltaStoreData.ProtoTransformedWaveletDelta; +import org.waveprotocol.box.server.shutdown.LifeCycle; +import org.waveprotocol.box.server.shutdown.ShutdownPriority; +import org.waveprotocol.box.server.shutdown.Shutdownable; import org.waveprotocol.box.server.waveserver.AppliedDeltaUtil; import org.waveprotocol.box.server.waveserver.ByteStringMessage; import org.waveprotocol.box.server.waveserver.WaveletDeltaRecord; @@ -82,6 +85,15 @@ public class FileDeltaCollection implements DeltasAccess { private HashedVersion endVersion; private boolean isOpen; + + final private LifeCycle lifeCycle = new LifeCycle(FileDeltaCollection.class.getSimpleName(), + ShutdownPriority.Storage, new Shutdownable() { + @Override + public void shutdown() throws Exception { + close(); + } + }); + /** * A single record in the delta file. */ @@ -129,6 +141,7 @@ public class FileDeltaCollection implements DeltasAccess { index.openForCollection(collection); collection.initializeEndVersionAndTruncateTrailingJunk(); + return collection; } @@ -171,6 +184,7 @@ public class FileDeltaCollection implements DeltasAccess { this.file = deltaFile; this.index = index; this.isOpen = true; + lifeCycle.start(); } @Override @@ -185,43 +199,73 @@ public class FileDeltaCollection implements DeltasAccess { @Override public WaveletDeltaRecord getDelta(long version) throws IOException { - checkIsOpen(); - return seekToRecord(version) ? readRecord() : null; + lifeCycle.enter(); + try { + checkIsOpen(); + return seekToRecord(version) ? readRecord() : null; + } finally { + lifeCycle.leave(); + } } @Override public WaveletDeltaRecord getDeltaByEndVersion(long version) throws IOException { - checkIsOpen(); - return seekToEndRecord(version) ? readRecord() : null; + lifeCycle.enter(); + try { + checkIsOpen(); + return seekToEndRecord(version) ? readRecord() : null; + } finally { + lifeCycle.leave(); + } } @Override public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta(long version) throws IOException { - checkIsOpen(); - return seekToRecord(version) ? readAppliedDeltaFromRecord() : null; + lifeCycle.enter(); + try { + checkIsOpen(); + return seekToRecord(version) ? readAppliedDeltaFromRecord() : null; + } finally { + lifeCycle.leave(); + } } @Override public TransformedWaveletDelta getTransformedDelta(long version) throws IOException { - checkIsOpen(); - return seekToRecord(version) ? readTransformedDeltaFromRecord() : null; + lifeCycle.enter(); + try { + checkIsOpen(); + return seekToRecord(version) ? readTransformedDeltaFromRecord() : null; + } finally { + lifeCycle.leave(); + } } @Override public HashedVersion getAppliedAtVersion(long version) throws IOException { - checkIsOpen(); - ByteStringMessage<ProtocolAppliedWaveletDelta> applied = getAppliedDelta(version); + lifeCycle.enter(); + try { + checkIsOpen(); + ByteStringMessage<ProtocolAppliedWaveletDelta> applied = getAppliedDelta(version); - return (applied != null) ? AppliedDeltaUtil.getHashedVersionAppliedAt(applied) : null; + return (applied != null) ? AppliedDeltaUtil.getHashedVersionAppliedAt(applied) : null; + } finally { + lifeCycle.leave(); + } } @Override public HashedVersion getResultingVersion(long version) throws IOException { - checkIsOpen(); - TransformedWaveletDelta transformed = getTransformedDelta(version); + lifeCycle.enter(); + try { + checkIsOpen(); + TransformedWaveletDelta transformed = getTransformedDelta(version); - return (transformed != null) ? transformed.getResultingVersion() : null; + return (transformed != null) ? transformed.getResultingVersion() : null; + } finally { + lifeCycle.leave(); + } } @Override @@ -234,6 +278,7 @@ public class FileDeltaCollection implements DeltasAccess { @Override public void append(Collection<WaveletDeltaRecord> deltas) throws PersistenceException { + lifeCycle.enter(); checkIsOpen(); try { file.seek(file.length()); @@ -252,6 +297,8 @@ public class FileDeltaCollection implements DeltasAccess { endVersion = lastDelta.getTransformedDelta().getResultingVersion(); } catch (IOException e) { throw new PersistenceException(e); + } finally { + lifeCycle.leave(); } } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/40707473/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java b/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java new file mode 100644 index 0000000..eb0c893 --- /dev/null +++ b/wave/src/main/java/org/waveprotocol/box/server/shutdown/LifeCycle.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.waveprotocol.box.server.shutdown; + +import org.waveprotocol.wave.model.util.Preconditions; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Life cycle control. + * + * @author akapla...@gmail.com (A. Kaplanov) + */ +public class LifeCycle { + + private static int SHUTDOWN_TIMEOUT_SEC = 2; + + private final String name; + private final ShutdownPriority shutdownPriority; + private final Shutdownable shutdownHandler; + private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE); + private final ShutdownManager shutdownManager; + private boolean started; + + /** + * Creates lifecycle. + * + * @param name the name of task. + * @param shutdownPriority determines shutdown order. + * @param shutdownHandler the handler executed on shutdown. + */ + public LifeCycle(String name, ShutdownPriority shutdownPriority, Shutdownable shutdownHandler) { + this(name, shutdownPriority, shutdownHandler, ShutdownManager.getInstance()); + } + + /** + * Creates lifecycle. + * + * @param name the name of task. + * @param shutdownPriority determines shutdown order. + * @param shutdownHandler the handler executed on shutdown. + * @param shutdownManager the shutdown manager. + */ + public LifeCycle(String name, ShutdownPriority shutdownPriority, Shutdownable shutdownHandler, + ShutdownManager shutdownManager) { + this.name = name; + this.shutdownPriority = shutdownPriority; + this.shutdownHandler = shutdownHandler; + this.shutdownManager = shutdownManager; + } + + /** + * Starts lifecycle. + */ + public synchronized void start() { + Preconditions.checkArgument(!started, name + " is already started."); + started = true; + shutdownManager.register(new Shutdownable() { + + @Override + public void shutdown() throws Exception { + synchronized (LifeCycle.this) { + if (shutdownHandler != null) { + shutdownHandler.shutdown(); + } + if (!semaphore.tryAcquire(Integer.MAX_VALUE, SHUTDOWN_TIMEOUT_SEC, TimeUnit.SECONDS)) { + throw new TimeoutException(); + } + started = false; + } + } + }, name, shutdownPriority); + } + + /** + * Enters to execution block of task. + */ + public synchronized void enter() { + checkIsStarted(); + try { + semaphore.acquire(); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + /** + * Leaves execution block of task. + */ + public synchronized void leave() { + semaphore.release(); + } + + private void checkIsStarted() { + if (!started) { + throw new IllegalStateException(name + " is not started"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/40707473/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownManager.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownManager.java b/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownManager.java new file mode 100644 index 0000000..29ffdc0 --- /dev/null +++ b/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownManager.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.waveprotocol.box.server.shutdown; + +import org.waveprotocol.wave.util.logging.Log; + +import java.util.HashSet; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * Registers and executed by specified priority shutdown tasks. + * + * @author akapla...@gmail.com (A. Kaplanov) + */ +public class ShutdownManager extends Thread { + + interface NamedShutdownable extends Shutdownable { + String getName(); + } + + private static final Log LOG = Log.get(ShutdownManager.class); + private static ShutdownManager instance; + + private final SortedMap<ShutdownPriority, Set<NamedShutdownable>> tasks = new TreeMap<>(); + + private ShutdownManager() { + super(ShutdownManager.class.getSimpleName()); + } + + public static synchronized ShutdownManager getInstance() { + if (instance == null) { + instance = new ShutdownManager(); + } + return instance; + } + + /** + * Requsters shutdown task. + * + * @param shutdownHandler the handler to execute on shutdown. + * @param taskName the name of task. + * @param priority the priority determines shutdown order. + */ + public synchronized void register(final Shutdownable shutdownHandler, final String taskName, + ShutdownPriority priority) { + if (tasks.isEmpty()) { + Runtime.getRuntime().addShutdownHook(this); + } + Set<NamedShutdownable> priorityTasks = tasks.get(priority); + if (priorityTasks == null) { + tasks.put(priority, priorityTasks = new HashSet<>()); + } + priorityTasks.add(new NamedShutdownable() { + + @Override + public String getName() { + return taskName; + } + + @Override + public void shutdown() throws Exception { + shutdownHandler.shutdown(); + } + }); + } + + /** + * Executes on Java shutdown hook. + */ + @Override + public void run() { + LOG.info("Shutdown hook is fired."); + shutdown(); + } + + private synchronized void shutdown() { + LOG.info("Start of shutdown procedure."); + for (ShutdownPriority priority : tasks.keySet()) { + LOG.info("Shutdown priority class " + priority.name()); + for (NamedShutdownable task : tasks.get(priority)) { + LOG.info("Shutdown of " + task.getName() + " ..."); + try { + task.shutdown(); + } catch (Exception ex) { + LOG.severe("Shutdown of " + task.getName() + " error", ex); + } + } + } + LOG.info("End of shutdown procedure."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/40707473/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownPriority.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownPriority.java b/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownPriority.java new file mode 100644 index 0000000..2846f53 --- /dev/null +++ b/wave/src/main/java/org/waveprotocol/box/server/shutdown/ShutdownPriority.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.waveprotocol.box.server.shutdown; + +/** + * Priority determines shutdown order. + * + * @author akapla...@gmail.com (A. Kaplanov) + */ + +public enum ShutdownPriority { + Server(1), Waves(3), Task(2), Storage(3); + final int value; + + ShutdownPriority(int priority) { + this.value = priority; + } +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/40707473/wave/src/main/java/org/waveprotocol/box/server/shutdown/Shutdownable.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/shutdown/Shutdownable.java b/wave/src/main/java/org/waveprotocol/box/server/shutdown/Shutdownable.java new file mode 100644 index 0000000..62cc388 --- /dev/null +++ b/wave/src/main/java/org/waveprotocol/box/server/shutdown/Shutdownable.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.waveprotocol.box.server.shutdown; + +/** + * Shutdown handler. + * + * @author akapla...@gmail.com (A. Kaplanov) + */ +public interface Shutdownable { + void shutdown() throws Exception; +} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/40707473/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java index b2fa769..6217bda 100644 --- a/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java +++ b/wave/src/main/java/org/waveprotocol/box/server/waveserver/WaveMap.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFutureTask; import com.google.inject.Inject; import com.google.inject.name.Named; +import com.typesafe.config.Config; import org.waveprotocol.box.common.ExceptionalIterator; import org.waveprotocol.box.server.CoreSettingsNames; import org.waveprotocol.box.server.executor.ExecutorAnnotations.LookupExecutor; @@ -41,6 +42,7 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; /** * A collection of wavelets, local and remote, held in memory. @@ -75,18 +77,20 @@ public class WaveMap { final WaveletNotificationSubscriber notifiee, final LocalWaveletContainer.Factory localFactory, final RemoteWaveletContainer.Factory remoteFactory, - @Named(CoreSettingsNames.WAVE_SERVER_DOMAIN) final String waveDomain, + @Named(CoreSettingsNames.WAVE_SERVER_DOMAIN) final String waveDomain, + Config config, @LookupExecutor final Executor lookupExecutor) { - // NOTE(anorth): DeltaAndSnapshotStore is more specific than necessary, but - // helps Guice out. + this.store = waveletStore; - waves = CacheBuilder.newBuilder().build(new CacheLoader<WaveId, Wave>() { + waves = CacheBuilder.newBuilder() + .maximumSize(config.getInt("core.wave_cache_size")) + .expireAfterAccess(config.getDuration("core.wave_cache_expire", TimeUnit.MINUTES), TimeUnit.MINUTES) + .build(new CacheLoader<WaveId, Wave>() { @Override - public Wave load(WaveId waveId) { + public Wave load(WaveId waveId) throws Exception { ListenableFuture<ImmutableSet<WaveletId>> lookedupWavelets = - lookupWavelets(waveId, waveletStore, lookupExecutor); - return new Wave(waveId, lookedupWavelets, notifiee, localFactory, remoteFactory, - waveDomain); + lookupWavelets(waveId, waveletStore, lookupExecutor); + return new Wave(waveId, lookedupWavelets, notifiee, localFactory, remoteFactory, waveDomain); } }); } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/40707473/wave/src/test/java/org/waveprotocol/box/server/waveserver/SimpleSearchProviderImplTest.java ---------------------------------------------------------------------- diff --git a/wave/src/test/java/org/waveprotocol/box/server/waveserver/SimpleSearchProviderImplTest.java b/wave/src/test/java/org/waveprotocol/box/server/waveserver/SimpleSearchProviderImplTest.java index 1b5e5a1..5dc3043 100644 --- a/wave/src/test/java/org/waveprotocol/box/server/waveserver/SimpleSearchProviderImplTest.java +++ b/wave/src/test/java/org/waveprotocol/box/server/waveserver/SimpleSearchProviderImplTest.java @@ -22,6 +22,7 @@ package org.waveprotocol.box.server.waveserver; import static org.mockito.Mockito.when; import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; import com.google.common.util.concurrent.Futures; @@ -29,6 +30,8 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.gxp.com.google.common.collect.Maps; import com.google.wave.api.SearchResult; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import junit.framework.TestCase; import org.mockito.Mock; @@ -203,9 +206,14 @@ public class SimpleSearchProviderImplTest extends TestCase { } }; + Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>of( + "core.wave_cache_size", 1000, + "core.wave_cache_expire", "60m") + ); + waveMap = new WaveMap(waveletStore, notifiee, localWaveletContainerFactory, - remoteWaveletContainerFactory, DOMAIN, lookupExecutor); + remoteWaveletContainerFactory, DOMAIN, config, lookupExecutor); searchProvider = new SimpleSearchProviderImpl(DOMAIN, digester, waveMap, waveViewProvider); } http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/40707473/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveMapTest.java ---------------------------------------------------------------------- diff --git a/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveMapTest.java b/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveMapTest.java index 579dc7c..999b56b 100644 --- a/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveMapTest.java +++ b/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveMapTest.java @@ -22,10 +22,13 @@ package org.waveprotocol.box.server.waveserver; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.MoreExecutors; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import junit.framework.TestCase; import org.mockito.Mock; @@ -83,9 +86,13 @@ public class WaveMapTest extends TestCase { }; waveletStore = mock(DeltaAndSnapshotStore.class); + Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>of( + "core.wave_cache_size", 1000, + "core.wave_cache_expire", "60m") + ); waveMap = new WaveMap(waveletStore, notifiee, localWaveletContainerFactory, - remoteWaveletContainerFactory, "example.com", storageContinuationExecutor); + remoteWaveletContainerFactory, "example.com", config, storageContinuationExecutor); } public void testWaveMapStartsEmpty() throws WaveServerException { http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/40707473/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java ---------------------------------------------------------------------- diff --git a/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java b/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java index 8291af3..9c058b6 100644 --- a/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java +++ b/wave/src/test/java/org/waveprotocol/box/server/waveserver/WaveServerTest.java @@ -23,10 +23,12 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.MoreExecutors; import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; import junit.framework.TestCase; import org.mockito.Matchers; @@ -118,9 +120,13 @@ public class WaveServerTest extends TestCase { waveletStore = new DeltaStoreBasedSnapshotStore(deltaStore); Executor lookupExecutor = MoreExecutors.sameThreadExecutor(); + Config config = ConfigFactory.parseMap(ImmutableMap.<String, Object>of( + "core.wave_cache_size", 1000, + "core.wave_cache_expire", "60m") + ); waveMap = new WaveMap(waveletStore, notifiee, localWaveletContainerFactory, - remoteWaveletContainerFactory, "example.com", lookupExecutor); + remoteWaveletContainerFactory, "example.com", config, lookupExecutor); waveServer = new WaveServerImpl(MoreExecutors.sameThreadExecutor(), certificateManager, federationRemote, waveMap);