http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/WaveServerImpl.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/WaveServerImpl.java b/src/org/waveprotocol/box/server/waveserver/WaveServerImpl.java deleted file mode 100644 index 0c6a7a6..0000000 --- a/src/org/waveprotocol/box/server/waveserver/WaveServerImpl.java +++ /dev/null @@ -1,657 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.waveprotocol.box.server.waveserver;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import org.waveprotocol.box.common.ExceptionalIterator;
import org.waveprotocol.box.server.common.CoreWaveletOperationSerializer;
import org.waveprotocol.box.server.frontend.CommittedWaveletSnapshot;
import org.waveprotocol.box.common.Receiver;
import org.waveprotocol.box.server.persistence.PersistenceException;
import org.waveprotocol.wave.crypto.SignatureException;
import org.waveprotocol.wave.crypto.SignerInfo;
import org.waveprotocol.wave.crypto.UnknownSignerException;
import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
import org.waveprotocol.wave.federation.FederationErrors;
import org.waveprotocol.wave.federation.FederationException;
import org.waveprotocol.wave.federation.FederationRemoteBridge;
import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta;
import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion;
import org.waveprotocol.wave.federation.Proto.ProtocolSignature;
import org.waveprotocol.wave.federation.Proto.ProtocolSignedDelta;
import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo;
import org.waveprotocol.wave.federation.Proto.ProtocolWaveletDelta;
import org.waveprotocol.wave.federation.WaveletFederationListener;
import org.waveprotocol.wave.federation.WaveletFederationProvider;
import org.waveprotocol.wave.model.id.WaveId;
import org.waveprotocol.wave.model.id.WaveletId;
import org.waveprotocol.wave.model.id.WaveletName;
import org.waveprotocol.wave.model.operation.OperationException;
import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta;
import org.waveprotocol.wave.model.version.HashedVersion;
import org.waveprotocol.wave.model.wave.InvalidParticipantAddress;
import org.waveprotocol.wave.model.wave.ParticipantId;
import org.waveprotocol.wave.model.wave.data.ReadableWaveletData;
import org.waveprotocol.wave.model.operation.TransformException;
import org.waveprotocol.wave.util.logging.Log;

import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.waveprotocol.box.server.executor.ExecutorAnnotations;
import org.waveprotocol.box.server.executor.ExecutorAnnotations.ListenerExecutor;

/**
 * The main class that services the FederationHost, FederationRemote and ClientFrontend.
 *
 * <p>A wavelet is treated as local when the domain of its wavelet id is one of the
 * certificate manager's local domains (see {@link #isLocalWavelet}); every other
 * wavelet is treated as remote.
 */
@Singleton
public class WaveServerImpl implements WaveletProvider, ReadableWaveletDataProvider,
    WaveletFederationProvider, WaveletFederationListener.Factory {

  private static final Log LOG = Log.get(WaveServerImpl.class);

  private final Executor listenerExecutor;
  private final CertificateManager certificateManager;
  private final WaveletFederationProvider federationRemote;
  private final WaveMap waveMap;
  /** Set by {@link #initialize()}; the WaveletProvider entry points check it first. */
  private boolean initialized = false;

  //
  // WaveletFederationListener.Factory implementation.
  //

  /**
   * Listener for notifications coming from the Federation Remote. For now we accept updates
   * for wavelets on any domain.
   *
   * @param domain passed through to the remote wavelet container when applying updates
   * @return a listener that rejects updates addressed to local wavelets and applies
   *         all others to the matching remote wavelet container
   */
  @Override
  public WaveletFederationListener listenerForDomain(final String domain) {
    return new WaveletFederationListener() {
      @Override
      public void waveletDeltaUpdate(final WaveletName waveletName,
          List<ByteString> deltas, final WaveletUpdateCallback callback) {
        Preconditions.checkArgument(!deltas.isEmpty());

        if (isLocalWavelet(waveletName)) {
          LOG.warning("Remote tried to update local wavelet " + waveletName);
          callback.onFailure(FederationErrors.badRequest("Received update to local wavelet"));
          return;
        }

        // Update wavelet container with the applied deltas
        final RemoteWaveletContainer remoteWavelet = getOrCreateRemoteWavelet(waveletName);

        // Update this remote wavelet with the immediately incoming delta,
        // providing a callback so that incoming historic deltas (as well as
        // this delta) can be provided to the wave bus.
        final ListenableFuture<Void> result =
            remoteWavelet.update(deltas, domain, federationRemote, certificateManager);
        result.addListener(
            new Runnable() {
              @Override
              public void run() {
                try {
                  FutureUtil.getResultOrPropagateException(result, FederationException.class);
                  callback.onSuccess();
                } catch (FederationException e) {
                  LOG.warning("Failed updating " + waveletName, e);
                  callback.onFailure(e.getError());
                } catch (InterruptedException e) {
                  // Restore the interrupt flag before reporting the failure.
                  Thread.currentThread().interrupt();
                  LOG.severe("Interrupted updating " + waveletName, e);
                  callback.onFailure(FederationErrors.internalServerError("Interrupted"));
                }
              }
            },
            listenerExecutor);
      }

      @Override
      public void waveletCommitUpdate(WaveletName waveletName,
          ProtocolHashedVersion committedVersion, WaveletUpdateCallback callback) {
        Preconditions.checkNotNull(committedVersion);

        if (isLocalWavelet(waveletName)) {
          LOG.warning("Got commit update for local wavelet " + waveletName);
          callback.onFailure(
              FederationErrors.badRequest("Received commit update to local wavelet"));
          return;
        }

        RemoteWaveletContainer wavelet;
        try {
          wavelet = getRemoteWavelet(waveletName);
        } catch (WaveletStateException e) {
          LOG.warning("Failed to access wavelet " + waveletName + " for commit update", e);
          callback.onFailure(FederationErrors.internalServerError("Storage access failure"));
          return;
        }
        if (wavelet != null) {
          wavelet.commit(CoreWaveletOperationSerializer.deserialize(committedVersion));
        } else {
          if (LOG.isInfoLoggable()) {
            LOG.info("Got commit update for missing wavelet " + waveletName);
          }
          createAndCommitRemoteWavelet(waveletName, committedVersion);
        }
        callback.onSuccess();
      }
    };
  }

  /**
   * Creates the non-existent remote wavelet container at this server and commits it.
   * Calling commit at this known version, forces the history to be fetched up to this point.
   * TODO (alown): Possible race condition here with update? (Though I don't think it would
   * result in anything more serious than repeated history fetches.)
   *
   * @param waveletName name of the remote wavelet to create
   * @param committedVersion version reported as committed by the remote
   */
  private void createAndCommitRemoteWavelet(WaveletName waveletName,
      ProtocolHashedVersion committedVersion) {
    RemoteWaveletContainer wavelet = getOrCreateRemoteWavelet(waveletName);
    HashedVersion v = CoreWaveletOperationSerializer.deserialize(committedVersion);
    wavelet.commit(v);
    LOG.info("Passed commit message for version " + v.getVersion() + " to RemoteWavelet");
  }

  //
  // WaveletFederationProvider implementation.
  //

  /**
   * Handles a signed delta submitted through federation. The request is rejected when the
   * wavelet is not local, the delta cannot be parsed, the delta targets version 0, or the
   * signature cannot be verified; otherwise it is handed to {@link #submitDelta}.
   *
   * @param waveletName wavelet the delta is addressed to
   * @param signedDelta the signed delta to verify and apply
   * @param listener notified of the outcome
   */
  @Override
  public void submitRequest(WaveletName waveletName, ProtocolSignedDelta signedDelta,
      SubmitResultListener listener) {
    if (!isLocalWavelet(waveletName)) {
      LOG.warning("Remote tried to submit to non-local wavelet " + waveletName);
      listener.onFailure(FederationErrors.badRequest("Non-local wavelet update"));
      return;
    }

    ProtocolWaveletDelta delta;
    try {
      delta = ByteStringMessage.parseProtocolWaveletDelta(signedDelta.getDelta()).getMessage();
    } catch (InvalidProtocolBufferException e) {
      LOG.warning("Submit request: Invalid delta protobuf. WaveletName: " + waveletName, e);
      listener.onFailure(FederationErrors.badRequest("Signed delta contains invalid delta"));
      return;
    }

    // Disallow creation of wavelets by remote users.
    if (delta.getHashedVersion().getVersion() == 0) {
      LOG.warning("Remote user tried to submit delta at version 0 - disallowed. " + signedDelta);
      listener.onFailure(FederationErrors.badRequest("Remote users may not create wavelets."));
      return;
    }

    try {
      certificateManager.verifyDelta(signedDelta);
      submitDelta(waveletName, delta, signedDelta, listener);
    } catch (SignatureException e) {
      LOG.warning("Submit request: Delta failed verification. WaveletName: " + waveletName +
          " delta: " + signedDelta, e);
      listener.onFailure(FederationErrors.badRequest("Remote verification failed"));
    } catch (UnknownSignerException e) {
      // The separating space before "delta:" was missing here, unlike the branch above.
      LOG.warning("Submit request: unknown signer. WaveletName: " + waveletName +
          " delta: " + signedDelta, e);
      listener.onFailure(FederationErrors.internalServerError("Unknown signer"));
    }
  }

  /**
   * Collects the serialised applied deltas of a local wavelet between two versions and
   * passes them to the listener. Collection stops after the delta that brings the
   * accumulated byte count to {@code lengthLimit} or beyond.
   *
   * @param waveletName local wavelet to read history from
   * @param domain requesting domain (only used for logging here)
   * @param startVersion first version of the requested range
   * @param endVersion last version of the requested range
   * @param lengthLimit byte budget after which no further deltas are collected
   * @param listener receives the deltas, or a failure
   */
  @Override
  public void requestHistory(WaveletName waveletName, String domain,
      ProtocolHashedVersion startVersion, ProtocolHashedVersion endVersion,
      final long lengthLimit, HistoryResponseListener listener) {
    LocalWaveletContainer wavelet = loadLocalWavelet(waveletName, listener);
    if (wavelet != null) {
      final ImmutableList.Builder<ByteString> deltaHistoryBytes = ImmutableList.builder();
      final AtomicInteger length = new AtomicInteger(0);
      try {
        wavelet.requestHistory(
            CoreWaveletOperationSerializer.deserialize(startVersion),
            CoreWaveletOperationSerializer.deserialize(endVersion),
            new Receiver<ByteStringMessage<ProtocolAppliedWaveletDelta>>() {

              @Override
              public boolean put(ByteStringMessage<ProtocolAppliedWaveletDelta> delta) {
                ByteString bytes = delta.getByteString();
                deltaHistoryBytes.add(bytes);
                // Keep receiving only while we are still under the byte budget.
                return length.addAndGet(bytes.size()) < lengthLimit;
              }
            });
      } catch (WaveServerException e) {
        // Log the exception itself so the cause of the failure is not lost.
        LOG.severe("Error retrieving wavelet history: " + waveletName + " " + startVersion +
            " - " + endVersion, e);
        // TODO(soren): choose a better error code (depending on e)
        listener.onFailure(FederationErrors.badRequest(
            "Server error while retrieving wavelet history."));
        return;
      }

      // Now determine whether we received the entire requested wavelet history.
      LOG.info("Found deltaHistory between " + startVersion + " - " + endVersion
          + ", returning to requester domain " + domain);
      listener.onSuccess(deltaHistoryBytes.build(), endVersion, endVersion.getVersion());
    }
  }

  /**
   * Returns the stored signer info for {@code signerId}, provided that signer signed the
   * delta of the given local wavelet ending at {@code deltaEndVersion}.
   *
   * @param signerId id of the signer whose info is requested
   * @param waveletName local wavelet containing the delta
   * @param deltaEndVersion end version of the delta in question
   * @param listener receives the signer info, or a failure
   */
  @Override
  public void getDeltaSignerInfo(ByteString signerId,
      WaveletName waveletName, ProtocolHashedVersion deltaEndVersion,
      DeltaSignerInfoResponseListener listener) {
    LocalWaveletContainer wavelet = loadLocalWavelet(waveletName, listener);
    if (wavelet != null) {
      HashedVersion endVersion = CoreWaveletOperationSerializer.deserialize(deltaEndVersion);
      if (wavelet.isDeltaSigner(endVersion, signerId)) {
        ProtocolSignerInfo signerInfo = certificateManager.retrieveSignerInfo(signerId);
        if (signerInfo == null) {
          // Oh no! We are supposed to store it, and we already know they did sign this delta.
          LOG.severe("No stored signer info for valid getDeltaSignerInfo on " + waveletName);
          listener.onFailure(FederationErrors.badRequest("Unknown signer info"));
        } else {
          listener.onSuccess(signerInfo);
        }
      } else {
        LOG.info("getDeltaSignerInfo was not authrorised for wavelet " + waveletName
            + ", end version " + deltaEndVersion);
        listener.onFailure(FederationErrors.badRequest("Not authorised to get signer info"));
      }
    }
  }

  /**
   * Stores signer info posted by a remote domain in the certificate manager.
   *
   * @param destinationDomain not used by this implementation
   * @param signerInfo signer info to store
   * @param listener notified of success, or of a verification failure
   */
  @Override
  public void postSignerInfo(String destinationDomain, ProtocolSignerInfo signerInfo,
      PostSignerInfoResponseListener listener) {
    try {
      certificateManager.storeSignerInfo(signerInfo);
    } catch (SignatureException e) {
      String error = "verification failure from domain " + signerInfo.getDomain();
      LOG.warning("incoming postSignerInfo: " + error, e);
      listener.onFailure(FederationErrors.badRequest(error));
      return;
    }
    listener.onSuccess();
  }

  //
  // WaveletProvider implementation.
  //

  /**
   * Marks the server as initialized.
   *
   * @throws IllegalStateException if called more than once
   */
  @Override
  public void initialize() throws WaveServerException {
    Preconditions.checkState(!initialized, "Wave server already initialized");
    initialized = true;
  }

  /**
   * Streams the transformed deltas of a wavelet between two versions to {@code receiver}.
   *
   * @throws AccessControlException if the wavelet does not exist
   * @throws IllegalStateException if the server is not yet initialized
   */
  @Override
  public void getHistory(WaveletName waveletName, HashedVersion startVersion,
      HashedVersion endVersion, Receiver<TransformedWaveletDelta> receiver)
      throws WaveServerException {
    Preconditions.checkState(initialized, "Wave server not yet initialized");
    WaveletContainer wavelet = getWavelet(waveletName);
    if (wavelet == null) {
      throw new AccessControlException(
          "Client request for history made for non-existent wavelet: " + waveletName);
    }
    wavelet.requestTransformedHistory(startVersion, endVersion, receiver);
  }

  /** Returns the wave ids known to the wave map. */
  @Override
  public ExceptionalIterator<WaveId, WaveServerException> getWaveIds() {
    Preconditions.checkState(initialized, "Wave server not yet initialized");
    return waveMap.getWaveIds();
  }

  /** Returns the wavelet ids the wave map reports for {@code waveId}. */
  @Override
  public ImmutableSet<WaveletId> getWaveletIds(WaveId waveId) throws WaveServerException {
    Preconditions.checkState(initialized, "Wave server not yet initialized");
    return waveMap.lookupWavelets(waveId);
  }

  /**
   * Returns the snapshot of the named wavelet.
   *
   * @return the snapshot, or {@code null} if the wavelet does not exist
   */
  @Override
  public CommittedWaveletSnapshot getSnapshot(WaveletName waveletName)
      throws WaveServerException {
    Preconditions.checkState(initialized, "Wave server not yet initialized");
    WaveletContainer wavelet = getWavelet(waveletName);
    if (wavelet == null) {
      LOG.info("client requested snapshot for non-existent wavelet: " + waveletName);
      return null;
    } else {
      return wavelet.getSnapshot();
    }
  }

  /**
   * Returns the wavelet data of the named wavelet's snapshot.
   *
   * @return the wavelet data, or {@code null} if the wavelet does not exist
   */
  @Override
  public ReadableWaveletData getReadableWaveletData(WaveletName waveletName)
      throws WaveServerException {
    // getSnapshot() returns null for a non-existent wavelet; do not dereference it.
    CommittedWaveletSnapshot committedSnapshot = getSnapshot(waveletName);
    return committedSnapshot == null ? null : committedSnapshot.snapshot;
  }

  /**
   * Signs a client delta and submits it via {@link #submitDelta}. Empty deltas are
   * rejected through the listener without being signed.
   *
   * @param waveletName wavelet the delta is addressed to
   * @param delta the client's delta
   * @param listener notified of the outcome
   */
  @Override
  public void submitRequest(WaveletName waveletName, ProtocolWaveletDelta delta,
      final SubmitRequestListener listener) {
    Preconditions.checkState(initialized, "Wave server not yet initialized");
    if (delta.getOperationCount() == 0) {
      listener.onFailure("Empty delta at version " + delta.getHashedVersion().getVersion());
      return;
    }

    // The serialised version of this delta happens now. This should be the only place, ever!
    ProtocolSignedDelta signedDelta =
        certificateManager.signDelta(ByteStringMessage.serializeMessage(delta));

    // Adapt the federation-style result listener to the client-facing one.
    submitDelta(waveletName, delta, signedDelta, new SubmitResultListener() {
      @Override
      public void onFailure(FederationError errorMessage) {
        listener.onFailure(errorMessage.getErrorMessage());
      }

      @Override
      public void onSuccess(int operationsApplied,
          ProtocolHashedVersion hashedVersionAfterApplication, long applicationTimestamp) {
        listener.onSuccess(operationsApplied,
            CoreWaveletOperationSerializer.deserialize(hashedVersionAfterApplication),
            applicationTimestamp);
      }
    });
  }

  /**
   * Checks whether a participant may access a wavelet.
   *
   * @return {@code false} if the wavelet does not exist, otherwise the container's verdict
   */
  @Override
  public boolean checkAccessPermission(WaveletName waveletName, ParticipantId participantId)
      throws WaveServerException {
    Preconditions.checkState(initialized, "Wave server not yet initialized");
    WaveletContainer wavelet = getWavelet(waveletName);
    return wavelet != null && wavelet.checkAccessPermission(participantId);
  }

  /**
   * Constructor.
   *
   * @param listenerExecutor executes callback listeners
   * @param certificateManager provider of certificates; it also determines which
   *        domains this wave server regards as local wavelets.
   * @param federationRemote federation remote interface
   * @param waveMap records the waves and wavelets in memory
   */
  @Inject
  WaveServerImpl(@ListenerExecutor Executor listenerExecutor,
      CertificateManager certificateManager,
      @FederationRemoteBridge WaveletFederationProvider federationRemote, WaveMap waveMap) {
    this.listenerExecutor = listenerExecutor;
    this.certificateManager = certificateManager;
    this.federationRemote = federationRemote;
    this.waveMap = waveMap;

    LOG.info("Wave Server configured to host local domains: "
        + certificateManager.getLocalDomains());

    // Preemptively add our own signer info to the certificate manager
    SignerInfo signerInfo = certificateManager.getLocalSigner().getSignerInfo();
    if (signerInfo != null) {
      try {
        certificateManager.storeSignerInfo(signerInfo.toProtoBuf());
      } catch (SignatureException e) {
        LOG.severe("Failed to add our own signer info to the certificate store", e);
      }
    }
  }

  /**
   * Loads a local wavelet. If the request is invalid then the listener is
   * notified and null returned.
   *
   * @param waveletName wavelet to load
   * @param listener listener to notify on failure
   * @return the wavelet container, or null on failure
   */
  private LocalWaveletContainer loadLocalWavelet(WaveletName waveletName,
      FederationListener listener) {
    // TODO(soren): once we support federated groups, expand support to request remote wavelets
    // too.
    if (!isLocalWavelet(waveletName)) {
      LOG.warning("Attempt to get delta signer info for remote wavelet " + waveletName);
      listener.onFailure(FederationErrors.badRequest("Wavelet not hosted here."));
      return null;
    }

    LocalWaveletContainer wavelet = null;
    try {
      wavelet = getLocalWavelet(waveletName);
    } catch (WaveletStateException e) {
      LOG.warning("Failed to access wavelet " + waveletName, e);
      listener.onFailure(FederationErrors.internalServerError("Storage access failure"));
      return null;
    }
    if (wavelet == null) {
      LOG.info("Non-existent wavelet " + waveletName);
      // TODO(soren): determine if it's ok to leak the fact that the wavelet doesn't exist,
      // or if we should hide this information and return the same error code as for wrong
      // version number or history hash
      listener.onFailure(FederationErrors.badRequest("Wavelet does not exist"));
    }
    return wavelet;
  }

  /**
   * Returns whether the domain of the wavelet's id is one of this server's local domains.
   */
  private boolean isLocalWavelet(WaveletName waveletName) {
    boolean isLocal = getLocalDomains().contains(waveletName.waveletId.getDomain());
    LOG.fine("" + waveletName + " is " + (isLocal? "" : "not") + " local");
    return isLocal;
  }

  /** Returns the local domains as reported by the certificate manager. */
  private Set<String> getLocalDomains() {
    return certificateManager.getLocalDomains();
  }

  /**
   * Returns a container for a remote wavelet. If it doesn't exist, it will be created.
   * This method is only called in response to a Federation Remote doing an update
   * or commit on this wavelet.
   *
   * @param waveletName name of wavelet
   * @return an existing or new instance.
   * @throws IllegalArgumentException if the name refers to a local wavelet.
   */
  private RemoteWaveletContainer getOrCreateRemoteWavelet(WaveletName waveletName) {
    Preconditions.checkArgument(!isLocalWavelet(waveletName), "%s is not remote", waveletName);
    return waveMap.getOrCreateRemoteWavelet(waveletName);
  }

  /**
   * Returns a container for a local wavelet. If it doesn't exist, it will be created.
   *
   * @param waveletName name of wavelet
   * @return an existing or new instance.
   * @throws IllegalArgumentException if the name refers to a remote wavelet.
   */
  private LocalWaveletContainer getOrCreateLocalWavelet(WaveletName waveletName) {
    Preconditions.checkArgument(isLocalWavelet(waveletName), "%s is not local", waveletName);
    return waveMap.getOrCreateLocalWavelet(waveletName);
  }

  /**
   * Looks up the container of a remote wavelet in the wave map without creating it.
   *
   * @throws IllegalArgumentException if the name refers to a local wavelet.
   */
  private RemoteWaveletContainer getRemoteWavelet(WaveletName waveletName)
      throws WaveletStateException {
    Preconditions.checkArgument(!isLocalWavelet(waveletName), "%s is not remote", waveletName);
    return waveMap.getRemoteWavelet(waveletName);
  }

  /**
   * Looks up the container of a local wavelet in the wave map without creating it.
   *
   * @throws IllegalArgumentException if the name refers to a remote wavelet.
   */
  private LocalWaveletContainer getLocalWavelet(WaveletName waveletName)
      throws WaveletStateException {
    Preconditions.checkArgument(isLocalWavelet(waveletName), "%s is not local", waveletName);
    return waveMap.getLocalWavelet(waveletName);
  }

  /**
   * Returns a generic wavelet container, when the caller doesn't need to validate whether
   * it's a local or remote wavelet.
   *
   * @param waveletName name of wavelet.
   * @return a wavelet container or null if it doesn't exist.
   * @throws WaveServerException if storage lookup fails
   */
  private WaveletContainer getWavelet(WaveletName waveletName) throws WaveServerException {
    return isLocalWavelet(waveletName) ?
        waveMap.getLocalWavelet(waveletName) : waveMap.getRemoteWavelet(waveletName);
  }

  /**
   * Callback interface for sending a list of certificates to a domain.
   */
  private interface PostSignerInfoCallback {
    public void done(int successCount);
  }

  /**
   * Submit the delta to local or remote wavelets, return results via listener.
   * Also broadcast updates to federationHosts and clientFrontend.
   *
   * @param waveletName the wavelet to apply the delta to
   * @param delta the {@link ProtocolWaveletDelta} inside {@code signedDelta}
   * @param signedDelta the signed delta
   * @param resultListener callback
   *
   * TODO: For now the WaveletFederationProvider will have to ensure this is a
   * local wavelet. Once we support federated groups, that test should be
   * removed.
   */
  private void submitDelta(final WaveletName waveletName, ProtocolWaveletDelta delta,
      final ProtocolSignedDelta signedDelta, final SubmitResultListener resultListener) {
    Preconditions.checkArgument(delta.getOperationCount() > 0, "empty delta");

    if (isLocalWavelet(waveletName)) {
      LOG.info("Submit to " + waveletName + " by " + delta.getAuthor() + " @ "
          + delta.getHashedVersion().getVersion() + " with " + delta.getOperationCount()
          + " ops");

      // TODO(arb): add v0 policer here.
      LocalWaveletContainer wavelet = getOrCreateLocalWavelet(waveletName);
      try {
        if (!wavelet.checkAccessPermission(ParticipantId.of(delta.getAuthor()))) {
          resultListener.onFailure(FederationErrors.badRequest(
              delta.getAuthor() + " is not a participant of " + waveletName));
          return;
        }
      } catch (InvalidParticipantAddress e) {
        resultListener.onFailure(FederationErrors.badRequest(
            "Invalid author address: " + e.getMessage()));
        return;
      } catch (WaveServerException e) {
        resultListener.onFailure(FederationErrors.internalServerError(e.getMessage()));
        return;
      }

      try {
        WaveletDeltaRecord submitResult = wavelet.submitRequest(waveletName, signedDelta);
        TransformedWaveletDelta transformedDelta = submitResult.getTransformedDelta();
        LOG.info("Submit result for " + waveletName + " by "
            + transformedDelta.getAuthor() + " applied "
            + transformedDelta.size() + " ops at v: "
            + transformedDelta.getAppliedAtVersion() + " t: "
            + transformedDelta.getApplicationTimestamp());
        resultListener.onSuccess(transformedDelta.size(),
            CoreWaveletOperationSerializer.serialize(transformedDelta.getResultingVersion()),
            transformedDelta.getApplicationTimestamp());
      } catch (OperationException e) {
        resultListener.onFailure(FederationErrors.badRequest(e.getMessage()));
      } catch (InvalidProtocolBufferException e) {
        resultListener.onFailure(FederationErrors.badRequest(e.getMessage()));
      } catch (InvalidHashException e) {
        resultListener.onFailure(FederationErrors.badRequest(e.getMessage()));
      } catch (PersistenceException e) {
        resultListener.onFailure(FederationErrors.internalServerError(e.getMessage()));
      } catch (WaveletStateException e) {
        resultListener.onFailure(FederationErrors.internalServerError(e.getMessage()));
      }
    } else {
      // For remote wavelets post required signatures to the authoritative server then send
      // the delta
      postAllSignerInfo(signedDelta.getSignatureList(), waveletName.waveletId.getDomain(),
          new PostSignerInfoCallback() {
            @Override public void done(int successCount) {
              LOG.info("Remote: successfully sent " + successCount + " of "
                  + signedDelta.getSignatureCount() + " certs to "
                  + waveletName.waveletId.getDomain());
              federationRemote.submitRequest(waveletName, signedDelta, resultListener);
            }
          });
    }
  }

  /**
   * Post a list of certificates to a domain and run a callback when all are finished. The
   * callback will run whether or not all posts succeed, including when {@code sigs} is
   * empty (in which case it runs immediately with a success count of 0).
   *
   * @param sigs list of signatures to post signer info for
   * @param domain to post signature to
   * @param callback to run when all signatures have been posted, successfully or unsuccessfully
   */
  private void postAllSignerInfo(final List<ProtocolSignature> sigs, final String domain,
      final PostSignerInfoCallback callback) {

    // In the current implementation there should only be a single signer
    if (sigs.size() != 1) {
      LOG.warning(sigs.size() + " signatures to broadcast, expecting exactly 1");
    }

    // With nothing to post, no response would ever count the pending total down to zero,
    // so the callback has to be run here or the caller's delta would never be forwarded.
    if (sigs.isEmpty()) {
      callback.done(0);
      return;
    }

    final AtomicInteger resultCount = new AtomicInteger(sigs.size());
    final AtomicInteger successCount = new AtomicInteger(0);

    for (final ProtocolSignature sig : sigs) {
      final ProtocolSignerInfo psi = certificateManager.retrieveSignerInfo(sig.getSignerId());

      if (psi == null) {
        LOG.warning("Couldn't find signer info for " + sig);
        signerInfoPostFinished(resultCount, successCount, callback);
      } else {
        federationRemote.postSignerInfo(domain, psi, new PostSignerInfoResponseListener() {
          @Override
          public void onFailure(FederationError error) {
            LOG.warning("Failed to post " + sig + " to " + domain + ": " + error);
            signerInfoPostFinished(resultCount, successCount, callback);
          }

          @Override
          public void onSuccess() {
            LOG.info("Successfully broadcasted " + sig + " to " + domain);
            successCount.incrementAndGet();
            signerInfoPostFinished(resultCount, successCount, callback);
          }
        });
      }
    }
  }

  /**
   * Records that one signer-info post has finished and runs the callback once none remain.
   *
   * @param resultCount number of posts still outstanding; decremented by this call
   * @param successCount number of posts that have succeeded so far
   * @param callback run with the success count when the outstanding count reaches zero
   */
  private void signerInfoPostFinished(AtomicInteger resultCount, AtomicInteger successCount,
      PostSignerInfoCallback callback) {
    if (resultCount.decrementAndGet() == 0) {
      LOG.info("Finished signature broadcast with " + successCount.get()
          + " successful, running callback");
      callback.done(successCount.get());
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java b/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java deleted file mode 100644 index 41d60bf..0000000 --- a/src/org/waveprotocol/box/server/waveserver/WaveServerModule.java +++ /dev/null @@ -1,163 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.waveprotocol.box.server.waveserver;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.typesafe.config.Config;
import org.waveprotocol.box.server.executor.ExecutorAnnotations.StorageContinuationExecutor;
import org.waveprotocol.box.server.executor.ExecutorAnnotations.WaveletLoadExecutor;
import org.waveprotocol.box.server.persistence.PersistenceException;
import org.waveprotocol.wave.crypto.*;
import org.waveprotocol.wave.model.id.IdURIEncoderDecoder;
import org.waveprotocol.wave.model.id.WaveletName;
import org.waveprotocol.wave.model.version.HashedVersionFactory;
import org.waveprotocol.wave.model.version.HashedVersionFactoryImpl;
import org.waveprotocol.wave.util.escapers.jvm.JavaUrlCodec;

import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

/**
 * Guice module wiring up the wave server: signature handling, certificate
 * verification, the wavelet notification dispatcher, the wave map and the
 * wavelet container factories.
 */
public class WaveServerModule extends AbstractModule {
  private static final IdURIEncoderDecoder URI_CODEC =
      new IdURIEncoderDecoder(new JavaUrlCodec());
  private static final HashedVersionFactory HASH_FACTORY = new HashedVersionFactoryImpl(URI_CODEC);

  private final Executor waveletLoadExecutor;
  private final Executor storageContinuationExecutor;
  private final boolean enableFederation;

  /**
   * @param config configuration; {@code federation.enable_federation} is read from it
   * @param waveletLoadExecutor executor on which wavelet state is loaded
   * @param storageContinuationExecutor executor handed to the wavelet containers
   */
  @Inject
  WaveServerModule(Config config,
      @WaveletLoadExecutor Executor waveletLoadExecutor,
      @StorageContinuationExecutor Executor storageContinuationExecutor) {
    this.waveletLoadExecutor = waveletLoadExecutor;
    this.storageContinuationExecutor = storageContinuationExecutor;
    this.enableFederation = config.getBoolean("federation.enable_federation");
  }

  @Override
  protected void configure() {
    bind(TimeSource.class).to(DefaultTimeSource.class).in(Singleton.class);
    bindSignatureHandler();
    bindCertificateVerification();
    bindWaveServerComponents();
  }

  /** Binds the signing handler when federation is enabled, the non-signing one otherwise. */
  private void bindSignatureHandler() {
    if (!enableFederation) {
      bind(SignatureHandler.class)
          .toProvider(NonSigningSignatureHandler.NonSigningSignatureHandlerProvider.class);
      return;
    }
    bind(SignatureHandler.class)
        .toProvider(SigningSignatureHandler.SigningSignatureHandlerProvider.class);
  }

  /**
   * Binds the signature verifier and the verified-cert-chain cache to explicitly
   * selected constructors.
   *
   * @throws IllegalStateException if one of the expected constructors does not exist
   */
  private void bindCertificateVerification() {
    try {
      bind(WaveSignatureVerifier.class).toConstructor(
          WaveSignatureVerifier.class.getConstructor(
              WaveCertPathValidator.class, CertPathStore.class));
      bind(VerifiedCertChainCache.class).to(DefaultCacheImpl.class).in(Singleton.class);
      bind(DefaultCacheImpl.class).toConstructor(
          DefaultCacheImpl.class.getConstructor(TimeSource.class));
    } catch (NoSuchMethodException missingConstructor) {
      throw new IllegalStateException(missingConstructor);
    }
  }

  /** Binds the dispatcher, certificate manager, stores, wave map and wave server itself. */
  private void bindWaveServerComponents() {
    bind(WaveletNotificationDispatcher.class).in(Singleton.class);
    bind(WaveBus.class).to(WaveletNotificationDispatcher.class);
    bind(WaveletNotificationSubscriber.class).to(WaveletNotificationDispatcher.class);
    bind(TrustRootsProvider.class).to(DefaultTrustRootsProvider.class).in(Singleton.class);
    bind(CertificateManager.class).to(CertificateManagerImpl.class).in(Singleton.class);
    bind(DeltaAndSnapshotStore.class).to(DeltaStoreBasedSnapshotStore.class).in(Singleton.class);
    bind(WaveMap.class).in(Singleton.class);
    bind(WaveletProvider.class).to(WaveServerImpl.class).asEagerSingleton();
    bind(ReadableWaveletDataProvider.class).to(WaveServerImpl.class).in(Singleton.class);
    bind(HashedVersionFactory.class).toInstance(HASH_FACTORY);
  }

  /**
   * Provides a factory whose local wavelet containers load their state from
   * {@code deltaStore} on the wavelet load executor.
   */
  @Provides
  @SuppressWarnings("unused")
  private LocalWaveletContainer.Factory provideLocalWaveletContainerFactory(
      final DeltaStore deltaStore) {
    return new LocalWaveletContainer.Factory() {
      @Override
      public LocalWaveletContainer create(WaveletNotificationSubscriber notifiee,
          WaveletName waveletName, String waveDomain) {
        ListenableFuture<DeltaStoreBasedWaveletState> pendingState = loadWaveletState(
            waveletLoadExecutor, deltaStore, waveletName, waveletLoadExecutor);
        return new LocalWaveletContainerImpl(
            waveletName, notifiee, pendingState, waveDomain, storageContinuationExecutor);
      }
    };
  }

  /**
   * Provides a factory whose remote wavelet containers load their state from
   * {@code deltaStore} on the wavelet load executor.
   */
  @Provides
  @SuppressWarnings("unused")
  private RemoteWaveletContainer.Factory provideRemoteWaveletContainerFactory(
      final DeltaStore deltaStore) {
    return new RemoteWaveletContainer.Factory() {
      @Override
      public RemoteWaveletContainer create(WaveletNotificationSubscriber notifiee,
          WaveletName waveletName, String waveDomain) {
        ListenableFuture<DeltaStoreBasedWaveletState> pendingState = loadWaveletState(
            waveletLoadExecutor, deltaStore, waveletName, waveletLoadExecutor);
        return new RemoteWaveletContainerImpl(
            waveletName, notifiee, pendingState, storageContinuationExecutor);
      }
    };
  }

  /**
   * Provides the certificate path validator: a disabled one when
   * {@code federation.waveserver_disable_signer_verification} is set, a caching one
   * otherwise.
   */
  @Provides
  @SuppressWarnings("unused")
  private WaveCertPathValidator provideWaveCertPathValidator(Config config,
      TimeSource timeSource,
      VerifiedCertChainCache certCache,
      TrustRootsProvider trustRootsProvider) {
    boolean verificationDisabled =
        config.getBoolean("federation.waveserver_disable_signer_verification");
    if (verificationDisabled) {
      return new DisabledCertPathValidator();
    }
    return new CachedCertPathValidator(certCache, timeSource, trustRootsProvider);
  }

  /**
   * Schedules the loading of a wavelet's state from the delta store and returns
   * a future for the result. Any failure is reported as a
   * {@link PersistenceException}.
   *
   * @param executor runs the load task
   * @param deltaStore store the wavelet's deltas are opened from
   * @param waveletName wavelet to load
   * @param persistExecutor executor passed on to the created wavelet state
   * @return a future whose result is the loaded wavelet state
   */
  @VisibleForTesting
  static ListenableFuture<DeltaStoreBasedWaveletState> loadWaveletState(Executor executor,
      final DeltaStore deltaStore, final WaveletName waveletName, final Executor persistExecutor) {
    Callable<DeltaStoreBasedWaveletState> loader =
        new Callable<DeltaStoreBasedWaveletState>() {
          @Override
          public DeltaStoreBasedWaveletState call() throws PersistenceException {
            return DeltaStoreBasedWaveletState.create(deltaStore.open(waveletName),
                persistExecutor);
          }
        };
    ListenableFutureTask<DeltaStoreBasedWaveletState> loadTask =
        ListenableFutureTask.create(loader);
    executor.execute(loadTask);
    return loadTask;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/WaveletContainer.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/WaveletContainer.java b/src/org/waveprotocol/box/server/waveserver/WaveletContainer.java deleted file mode 100644 index 7cf0763..0000000 --- a/src/org/waveprotocol/box/server/waveserver/WaveletContainer.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.waveserver; - -import org.waveprotocol.box.common.Receiver; -import org.waveprotocol.box.server.frontend.CommittedWaveletSnapshot; -import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.model.wave.data.ObservableWaveletData; -import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; - -import com.google.common.base.Function; - -/** - * Interface for a container class for a Wavelet's current state as well as its - * delta history. Local and remote wavelet interfaces inherit from this one. - */ -interface WaveletContainer { - - /** - * Manufactures wavelet containers. - * - * @param <T> type manufactured by this factory. - */ - interface Factory<T extends WaveletContainer> { - /** - * @return a new wavelet container with the given wavelet name - */ - T create(WaveletNotificationSubscriber notifiee, WaveletName waveletName, String waveDomain); - } - - /** Returns the name of the wavelet. */ - WaveletName getWaveletName(); - - /** Returns a snapshot copy of the wavelet state. */ - ObservableWaveletData copyWaveletData() throws WaveletStateException; - - /** Returns a snapshot of the wavelet state, last committed version. */ - CommittedWaveletSnapshot getSnapshot() throws WaveletStateException; - - /** - * Provides read access to the inner state of the {@link WaveletContainer}. - * - * @param <T> the return type of the method. - * @param function the function to apply on the {@link ReadableWaveletData}. - * @return the output of the function. - * @throws WaveletStateException if the wavelet is in an unsuitable state. 
- */ - <T> T applyFunction(Function<ReadableWaveletData, T> function) throws WaveletStateException; - - /** - * Retrieve the wavelet history of deltas applied to the wavelet. - * - * @param versionStart start version (inclusive), minimum 0. - * @param versionEnd end version (exclusive). - * @param receiver the deltas receiver. - * @throws AccessControlException if {@code versionStart} or - * {@code versionEnd} are not in the wavelet history. - * @throws WaveletStateException if the wavelet is in a state unsuitable for - * retrieving history. - */ - void requestHistory(HashedVersion versionStart, HashedVersion versionEnd, - Receiver<ByteStringMessage<ProtocolAppliedWaveletDelta>> receiver) - throws AccessControlException, WaveletStateException; - - /** - * Retrieve the wavelet history of deltas applied to the wavelet, in their - * transformed form. - * - * @param versionStart start version (inclusive), minimum 0. - * @param versionEnd end version (exclusive). - * @param receiver the transformed deltas receiver. - * @throws AccessControlException if {@code versionStart} or - * {@code versionEnd} are not in the wavelet history. - * @throws WaveletStateException if the wavelet is in a state unsuitable for - * retrieving history. - */ - void requestTransformedHistory(HashedVersion versionStart, HashedVersion versionEnd, - Receiver<TransformedWaveletDelta> receiver) - throws AccessControlException, WaveletStateException; - - /** - * @param participantId id of participant attempting to gain access to - * wavelet, or null if the user isn't logged in. - * @throws WaveletStateException if the wavelet is in a state unsuitable for - * checking permissions. - * @return true if the participant is a participant on the wavelet or if the - * wavelet is empty or if a shared domain participant is a participant - * on the wavelet.
- */ - boolean checkAccessPermission(ParticipantId participantId) throws WaveletStateException; - - /** - * Returns the Last Committed Version (LCV): the version at which the local or - * remote wave server last committed the wavelet. - * - * @throws WaveletStateException if the wavelet is in a state unsuitable for - * getting LCV. - */ - HashedVersion getLastCommittedVersion() throws WaveletStateException; - - /** - * @return true if the participant id is a current participant of the wavelet. - * Each invocation acquires and releases the lock. - */ - boolean hasParticipant(ParticipantId participant) throws WaveletStateException; - - /** - * @return the wavelet creator. This method doesn't acquire - * {@link WaveletContainer} lock since wavelet creator cannot change - * after wavelet creation and therefore it is safe to concurrently - * read this property without lock. - */ - ParticipantId getCreator(); - - /** - * This method doesn't acquire {@link WaveletContainer} lock since shared - * domain participant cannot change and therefore it is safe to concurrently - * read this property without lock. - * - * @return the shared domain participant. - */ - public ParticipantId getSharedDomainParticipant(); - - /** - * @return true if the wavelet is at version zero, i.e., has no delta history - */ - boolean isEmpty() throws WaveletStateException; - -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java b/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java deleted file mode 100644 index fe08fe0..0000000 --- a/src/org/waveprotocol/box/server/waveserver/WaveletContainerImpl.java +++ /dev/null @@ -1,564 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import org.waveprotocol.box.common.Receiver; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.InvalidProtocolBufferException; - -import org.waveprotocol.box.common.DeltaSequence; -import org.waveprotocol.box.server.frontend.CommittedWaveletSnapshot; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.util.WaveletDataUtil; -import org.waveprotocol.box.common.ListReceiver; -import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.OperationException; -import org.waveprotocol.wave.model.operation.OperationPair; -import org.waveprotocol.wave.model.operation.TransformException; -import org.waveprotocol.wave.model.operation.wave.Transform; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.operation.wave.WaveletDelta; -import 
org.waveprotocol.wave.model.operation.wave.WaveletOperation; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.model.wave.ParticipantIdUtil; -import org.waveprotocol.wave.model.wave.data.ObservableWaveletData; -import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; -import org.waveprotocol.wave.util.logging.Log; - -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import javax.annotation.Nullable; - -/** - * Contains the history of a wavelet - applied and transformed deltas plus the - * content of the wavelet. - * - * TODO(soren): Unload the wavelet (remove it from WaveMap) if it becomes - * corrupt or fails to load from storage. - */ -abstract class WaveletContainerImpl implements WaveletContainer { - - private static final Log LOG = Log.get(WaveletContainerImpl.class); - - private static final int AWAIT_LOAD_TIMEOUT_SECONDS = 1000; - - protected enum State { - /** Everything is working fine. */ - OK, - - /** Wavelet state is being loaded from storage. */ - LOADING, - - /** Wavelet has been deleted, the instance will not contain any data. */ - DELETED, - - /** - * For some reason this instance is broken, e.g. a remote wavelet update - * signature failed. - */ - CORRUPTED - } - - private final Executor storageContinuationExecutor; - - private final Lock readLock; - private final ReentrantReadWriteLock.WriteLock writeLock; - private final WaveletName waveletName; - private final WaveletNotificationSubscriber notifiee; - private final ParticipantId sharedDomainParticipantId; - /** Is counted down when initial loading from storage completes. 
*/ - private final CountDownLatch loadLatch = new CountDownLatch(1); - /** Is set at most once, before loadLatch is counted down. */ - private WaveletState waveletState; - private State state = State.LOADING; - - /** - * Constructs an empty WaveletContainer for a wavelet. - * WaveletData is not set until a delta has been applied. - * - * @param waveletName the name of the wavelet held by this container. - * @param notifiee the subscriber to notify of wavelet updates and commits. - * @param waveletStateFuture future yielding the wavelet's delta history and current state. - * @param waveDomain the wave server domain, or null if there is no shared domain participant. - * @param storageContinuationExecutor the executor used to perform post wavelet loading logic. - */ - public WaveletContainerImpl(WaveletName waveletName, WaveletNotificationSubscriber notifiee, - final ListenableFuture<? extends WaveletState> waveletStateFuture, String waveDomain, - Executor storageContinuationExecutor) { - this.waveletName = waveletName; - this.notifiee = notifiee; - this.sharedDomainParticipantId = - waveDomain != null ? ParticipantIdUtil.makeUnsafeSharedDomainParticipantId(waveDomain) - : null; - this.storageContinuationExecutor = storageContinuationExecutor; - ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); - this.readLock = readWriteLock.readLock(); - this.writeLock = readWriteLock.writeLock(); - - waveletStateFuture.addListener( - new Runnable() { - @Override - public void run() { - acquireWriteLock(); - try { - Preconditions.checkState(waveletState == null, - "Repeat attempts to set wavelet state"); - Preconditions.checkState(state == State.LOADING, "Unexpected state %s", state); - waveletState = FutureUtil.getResultOrPropagateException( - waveletStateFuture, PersistenceException.class); - Preconditions.checkState(waveletState.getWaveletName().equals(getWaveletName()), - "Wrong wavelet state, named %s, expected %s", - waveletState.getWaveletName(), getWaveletName()); - state = State.OK; - } catch (PersistenceException e) { - LOG.warning("Failed to load wavelet " + getWaveletName(), e); - state =
State.CORRUPTED; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warning("Interrupted loading wavelet " + getWaveletName(), e); - state = State.CORRUPTED; - } catch (RuntimeException e) { - // TODO(soren): would be better to terminate the process in this case - LOG.severe("Unexpected exception loading wavelet " + getWaveletName(), e); - state = State.CORRUPTED; - } finally { - releaseWriteLock(); - } - loadLatch.countDown(); - } - }, - storageContinuationExecutor); - } - - protected void acquireReadLock() { - readLock.lock(); - } - - protected void releaseReadLock() { - readLock.unlock(); - } - - protected void acquireWriteLock() { - writeLock.lock(); - } - - protected void releaseWriteLock() { - writeLock.unlock(); - } - - protected void notifyOfDeltas(ImmutableList<WaveletDeltaRecord> deltas, - ImmutableSet<String> domainsToNotify) { - Preconditions.checkState(writeLock.isHeldByCurrentThread(), "must hold write lock"); - Preconditions.checkArgument(!deltas.isEmpty(), "empty deltas"); - HashedVersion endVersion = deltas.get(deltas.size() - 1).getResultingVersion(); - HashedVersion currentVersion = getCurrentVersion(); - Preconditions.checkArgument(endVersion.equals(currentVersion), - "cannot notify of deltas ending in %s != current version %s", endVersion, currentVersion); - notifiee.waveletUpdate(waveletState.getSnapshot(), deltas, domainsToNotify); - } - - protected void notifyOfCommit(HashedVersion version, ImmutableSet<String> domainsToNotify) { - Preconditions.checkState(writeLock.isHeldByCurrentThread(), "must hold write lock"); - notifiee.waveletCommitted(getWaveletName(), version, domainsToNotify); - } - - /** - * Blocks until the initial load of the wavelet state from storage completes. - * Should be called without the read or write lock held. - * - * @throws WaveletStateException if the wavelet fails to load, - * either because of a storage access failure or timeout, - * or because the current thread is interrupted. 
- */ - protected void awaitLoad() throws WaveletStateException { - Preconditions.checkState(!writeLock.isHeldByCurrentThread(), "should not hold write lock"); - try { - if (!loadLatch.await(AWAIT_LOAD_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { - throw new WaveletStateException("Timed out waiting for wavelet to load"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new WaveletStateException("Interrupted waiting for wavelet to load"); - } - } - - /** - * Verifies that the wavelet is in an operational state (not loading, - * not corrupt). - * - * Should be preceded by a call to awaitLoad() so that the initial load from - * storage has completed. Should be called with the read or write lock held. - * - * @throws WaveletStateException if the wavelet is loading or marked corrupt. - */ - protected void checkStateOk() throws WaveletStateException { - if (state != State.OK) { - throw new WaveletStateException("The wavelet is in an unusable state: " + state); - } - } - - /** - * Flags the wavelet corrupted so future calls to checkStateOk() will fail. 
- */ - protected void markStateCorrupted() { - Preconditions.checkState(writeLock.isHeldByCurrentThread(), "must hold write lock"); - state = State.CORRUPTED; - } - - protected void persist(final HashedVersion version, final ImmutableSet<String> domainsToNotify) { - Preconditions.checkState(writeLock.isHeldByCurrentThread(), "must hold write lock"); - final ListenableFuture<Void> result = waveletState.persist(version); - result.addListener( - new Runnable() { - @Override - public void run() { - try { - result.get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (ExecutionException e) { - LOG.severe("Version " + version, e); - } - acquireWriteLock(); - try { - waveletState.flush(version); - notifyOfCommit(version, domainsToNotify); - } finally { - releaseWriteLock(); - } - } - }, - storageContinuationExecutor); - } - - @Override - public WaveletName getWaveletName() { - return waveletName; - } - - @Override - public boolean checkAccessPermission(ParticipantId participantId) throws WaveletStateException { - awaitLoad(); - acquireReadLock(); - try { - checkStateOk(); - // ParticipantId will be null if the user isn't logged in. A user who isn't logged in should - // have access to public waves once they've been implemented. - // If the wavelet is empty, everyone has access (to write the first delta). 
- // TODO(soren): determine if off-domain participants should be denied access if empty - ReadableWaveletData snapshot = waveletState.getSnapshot(); - return WaveletDataUtil.checkAccessPermission(snapshot, participantId, sharedDomainParticipantId); - } finally { - releaseReadLock(); - } - } - - @Override - public HashedVersion getLastCommittedVersion() throws WaveletStateException { - awaitLoad(); - acquireReadLock(); - try { - checkStateOk(); - return waveletState.getLastPersistedVersion(); - } finally { - releaseReadLock(); - } - } - - @Override - public ObservableWaveletData copyWaveletData() throws WaveletStateException { - awaitLoad(); - acquireReadLock(); - try { - checkStateOk(); - return WaveletDataUtil.copyWavelet(waveletState.getSnapshot()); - } finally { - releaseReadLock(); - } - } - - @Override - public CommittedWaveletSnapshot getSnapshot() throws WaveletStateException { - awaitLoad(); - acquireReadLock(); - try { - checkStateOk(); - return new CommittedWaveletSnapshot(waveletState.getSnapshot(), - waveletState.getLastPersistedVersion()); - } finally { - releaseReadLock(); - } - } - - @Override - public <T> T applyFunction(Function<ReadableWaveletData, T> function) - throws WaveletStateException { - awaitLoad(); - acquireReadLock(); - try { - checkStateOk(); - return function.apply(waveletState.getSnapshot()); - } finally { - releaseReadLock(); - } - } - - /** - * Transform a wavelet delta if it has been submitted against a different head (currentVersion). - * Must be called with write lock held. 
- * - * @param delta to possibly transform - * @return the transformed delta and the version it was applied at - * (the version is the current version of the wavelet, unless the delta is - * a duplicate in which case it is the version at which it was originally - * applied) - * @throws InvalidHashException if submitting against same version but different hash - * @throws OperationException if transformation fails - */ - protected WaveletDelta maybeTransformSubmittedDelta(WaveletDelta delta) - throws InvalidHashException, OperationException { - HashedVersion targetVersion = delta.getTargetVersion(); - HashedVersion currentVersion = getCurrentVersion(); - if (targetVersion.equals(currentVersion)) { - // Applied version is the same, we're submitting against head, don't need to do OT - return delta; - } else { - // Not submitting against head, we need to do OT, but check the versions really are different - if (targetVersion.getVersion() == currentVersion.getVersion()) { - // Same version number but unequal hashed versions: reject rather than transform. - LOG.warning("Mismatched hash, expected (" + currentVersion + ") but delta targets (" + - targetVersion + ")"); - throw new InvalidHashException(currentVersion, targetVersion); - } else { - return transformSubmittedDelta(delta); - } - } - } - - /** - * Finds range of server deltas needed to transform against, then transforms all client - * ops against the server ops.
- */ - private WaveletDelta transformSubmittedDelta(WaveletDelta submittedDelta) - throws OperationException, InvalidHashException { - HashedVersion targetVersion = submittedDelta.getTargetVersion(); - HashedVersion currentVersion = getCurrentVersion(); - Preconditions.checkArgument(!targetVersion.equals(currentVersion)); - ListReceiver<TransformedWaveletDelta> receiver = new ListReceiver<TransformedWaveletDelta>(); - waveletState.getTransformedDeltaHistory(targetVersion, currentVersion, receiver); - DeltaSequence serverDeltas = DeltaSequence.of(receiver); - Preconditions.checkState(!serverDeltas.isEmpty(), - "No deltas between valid versions %s and %s", targetVersion, currentVersion); - - ParticipantId clientAuthor = submittedDelta.getAuthor(); - // TODO(anorth): remove this copy somehow; currently, it's necessary to - // ensure that clientOps.equals() works correctly below (because - // WaveletDelta breaks the List.equals() contract) - List<WaveletOperation> clientOps = Lists.newArrayList(submittedDelta); - for (TransformedWaveletDelta serverDelta : serverDeltas) { - // If the client delta transforms to nothing before we've traversed all - // the server deltas, return the version at which the delta was - // obliterated (rather than the current version) to ensure that delta - // submission is idempotent. - if (clientOps.isEmpty()) { - return new WaveletDelta(clientAuthor, targetVersion, clientOps); - } - ParticipantId serverAuthor = serverDelta.getAuthor(); - if (clientAuthor.equals(serverAuthor) && clientOps.equals(serverDelta)) { - // This is a duplicate of the server delta. 
- return new WaveletDelta(clientAuthor, targetVersion, clientOps); - } - clientOps = transformOps(clientOps, serverDelta); - targetVersion = serverDelta.getResultingVersion(); - } - Preconditions.checkState(targetVersion.equals(currentVersion)); - return new WaveletDelta(clientAuthor, targetVersion, clientOps); - } - - /** - * Transforms the specified client operations against the specified server operations, - * returning the transformed client operations in a new list. - * - * @param clientOps may be unmodifiable - * @param serverOps may be unmodifiable - * @return transformed client ops - */ - private List<WaveletOperation> transformOps(List<WaveletOperation> clientOps, - List<WaveletOperation> serverOps) throws OperationException { - List<WaveletOperation> transformedClientOps = Lists.newArrayList(); - - for (WaveletOperation c : clientOps) { - for (WaveletOperation s : serverOps) { - OperationPair<WaveletOperation> pair; - try { - pair = Transform.transform(c, s); - } catch (TransformException e) { - throw new OperationException(e); - } - c = pair.clientOp(); - } - transformedClientOps.add(c); - } - return transformedClientOps; - } - - /** - * Builds a {@link WaveletDeltaRecord} and applies it to the wavelet container. - * The delta must be non-empty. 
- */ - protected WaveletDeltaRecord applyDelta( - ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta, WaveletDelta transformed) - throws InvalidProtocolBufferException, OperationException { - TransformedWaveletDelta transformedDelta = - AppliedDeltaUtil.buildTransformedDelta(appliedDelta, transformed); - - WaveletDeltaRecord deltaRecord = new WaveletDeltaRecord(transformed.getTargetVersion(), - appliedDelta, transformedDelta); - waveletState.appendDelta(deltaRecord); - - return deltaRecord; - } - - /** - * @param versionActuallyAppliedAt the version to look up - * @return the applied delta applied at the specified hashed version - */ - protected ByteStringMessage<ProtocolAppliedWaveletDelta> lookupAppliedDelta( - HashedVersion versionActuallyAppliedAt) { - return waveletState.getAppliedDelta(versionActuallyAppliedAt); - } - - /** - * @param endVersion the version to look up - * @return the applied delta with the given resulting version - */ - protected ByteStringMessage<ProtocolAppliedWaveletDelta> lookupAppliedDeltaByEndVersion( - HashedVersion endVersion) { - return waveletState.getAppliedDeltaByEndVersion(endVersion); - } - - protected TransformedWaveletDelta lookupTransformedDelta(HashedVersion appliedAtVersion) { - return waveletState.getTransformedDelta(appliedAtVersion); - } - - /** - * @throws AccessControlException with the given message if version does not - * match a delta boundary in the wavelet history. - */ - private void checkVersionIsDeltaBoundary(HashedVersion version, String message) - throws AccessControlException { - HashedVersion actual = waveletState.getHashedVersion(version.getVersion()); - if (!version.equals(actual)) { - LOG.info("Unrecognized " + message + " at version " + version + ", actual " + actual); - // We omit the hash from the message to avoid leaking it. 
- throw new AccessControlException( - "Unrecognized " + message + " at version " + version.getVersion()); - } - } - - @Override - public void requestHistory(HashedVersion startVersion, HashedVersion endVersion, - Receiver<ByteStringMessage<ProtocolAppliedWaveletDelta>> receiver) - throws AccessControlException, WaveletStateException { - // Wait for the initial load from storage, as requestTransformedHistory() does; - // otherwise a request arriving while loading fails the checkStateOk() below. - awaitLoad(); - acquireReadLock(); - try { - checkStateOk(); - checkVersionIsDeltaBoundary(startVersion, "start version"); - checkVersionIsDeltaBoundary(endVersion, "end version"); - waveletState.getAppliedDeltaHistory(startVersion, endVersion, receiver); - } finally { - releaseReadLock(); - } - } - - @Override - public void requestTransformedHistory(HashedVersion startVersion, HashedVersion endVersion, - Receiver<TransformedWaveletDelta> receiver) throws AccessControlException, WaveletStateException { - awaitLoad(); - acquireReadLock(); - try { - checkStateOk(); - checkVersionIsDeltaBoundary(startVersion, "start version"); - checkVersionIsDeltaBoundary(endVersion, "end version"); - waveletState.getTransformedDeltaHistory(startVersion, endVersion, receiver); - } finally { - releaseReadLock(); - } - } - - @Override - public boolean hasParticipant(ParticipantId participant) throws WaveletStateException { - awaitLoad(); - acquireReadLock(); - try { - checkStateOk(); - ReadableWaveletData snapshot = waveletState.getSnapshot(); - return snapshot != null && snapshot.getParticipants().contains(participant); - } finally { - releaseReadLock(); - } - } - - @Override - public ParticipantId getSharedDomainParticipant() { - return sharedDomainParticipantId; - } - - @Override - public ParticipantId getCreator() { - ReadableWaveletData snapshot = waveletState.getSnapshot(); - return snapshot != null ?
snapshot.getCreator() : null; - } - - @Override - public boolean isEmpty() throws WaveletStateException { - awaitLoad(); - acquireReadLock(); - try { - checkStateOk(); - return waveletState.getSnapshot() == null; - } finally { - releaseReadLock(); - } - } - - @Nullable - protected HashedVersion getCurrentVersion() { - if(waveletState == null) - return null; - - return waveletState.getCurrentVersion(); - } - - protected ReadableWaveletData accessSnapshot() { - return waveletState.getSnapshot(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/WaveletDeltaRecord.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/WaveletDeltaRecord.java b/src/org/waveprotocol/box/server/waveserver/WaveletDeltaRecord.java deleted file mode 100644 index 5244091..0000000 --- a/src/org/waveprotocol/box/server/waveserver/WaveletDeltaRecord.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.common.base.Preconditions; - -import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.ParticipantId; - -import javax.annotation.Nullable; - -/** - * Bundles an applied delta (an original signed delta with information about how - * it was applied) and its transformed operations. - * - * @author [email protected] (Soren Lassen) - */ -public class WaveletDeltaRecord { - private final HashedVersion appliedAtVersion; - @Nullable private final ByteStringMessage<ProtocolAppliedWaveletDelta> applied; - private final TransformedWaveletDelta transformed; - - /** - * @param appliedAtVersion the version which the transformed delta applies at - * @param applied is the applied delta which transforms to {@code transformed}, may be null - * @param transformed is the transformed result of {@code applied} - * @throws NullPointerException if {@code appliedAtVersion} or {@code transformed} is null - */ - public WaveletDeltaRecord( - HashedVersion appliedAtVersion, - @Nullable ByteStringMessage<ProtocolAppliedWaveletDelta> applied, - TransformedWaveletDelta transformed) { - Preconditions.checkNotNull(appliedAtVersion, "null appliedAtVersion"); - Preconditions.checkNotNull(transformed, "null transformed delta"); - this.appliedAtVersion = appliedAtVersion; - this.applied = applied; - this.transformed = transformed; - } - - public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta() { - return applied; - } - - public TransformedWaveletDelta getTransformedDelta() { - return transformed; - } - - // Convenience methods: - - /** @return true if the transformed delta has no operations */ - public boolean isEmpty() { - return transformed.isEmpty(); - } - - /** - * @return the hashed version which this delta was applied at - */ - public HashedVersion getAppliedAtVersion() { - return appliedAtVersion; - } - - /** @return the
author of the delta */ - public ParticipantId getAuthor() { - return transformed.getAuthor(); - } - - /** @return the hashed version after the delta is applied */ - public HashedVersion getResultingVersion() { - return transformed.getResultingVersion(); - } - - /** @return the timestamp when this delta was applied */ - public long getApplicationTimestamp() { - return transformed.getApplicationTimestamp(); - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((applied == null) ? 0 : applied.hashCode()); - result = prime * result + ((transformed == null) ? 0 : transformed.hashCode()); - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - WaveletDeltaRecord other = (WaveletDeltaRecord) obj; - if (applied == null) { - if (other.applied != null) { - return false; - } - } else if (!applied.equals(other.applied)) { - return false; - } - if (transformed == null) { - if (other.transformed != null) { - return false; - } - } else if (!transformed.equals(other.transformed)) { - return false; - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java b/src/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java deleted file mode 100644 index ca03528..0000000 --- a/src/org/waveprotocol/box/server/waveserver/WaveletDeltaRecordReader.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.version.HashedVersion; - -import java.io.IOException; -
/**
 * Read-only access to the deltas of a single wavelet.
 *
 * <p>The per-version lookups declare {@link IOException}; the conditions under
 * which it is thrown are left to implementations.
 *
 * @author [email protected] (Soren Lassen)
 */
public interface WaveletDeltaRecordReader {

  /**
   * @return the name of the wavelet whose deltas this reader reads
   */
  WaveletName getWaveletName();

  /**
   * @return true if the collection contains no deltas
   */
  boolean isEmpty();

  /**
   * @return the resulting version of the last delta
   */
  HashedVersion getEndVersion();

  /**
   * Looks up a delta record by the version it was applied at.
   *
   * @return the delta applied at this version, if any, otherwise null
   */
  WaveletDeltaRecord getDelta(long version) throws IOException;

  /**
   * Looks up a delta record by the version it results in.
   *
   * @return the delta leading to this version, if any, otherwise null
   */
  WaveletDeltaRecord getDeltaByEndVersion(long version) throws IOException;

  /**
   * Shortcut that is approximately equivalent to
   * {@code AppliedDeltaUtil.getHashedVersionAppliedAt(getAppliedDelta(version))}.
   *
   * @return the hashed version for the specified version number, if any delta
   *         was applied at this version, otherwise null
   */
  HashedVersion getAppliedAtVersion(long version) throws IOException;

  /**
   * Shortcut that is approximately equivalent to
   * {@code getTransformedDelta(version).getResultingVersion()}.
   *
   * @return the resulting hashed version of the delta applied at the given
   *         version, if any, otherwise null
   */
  HashedVersion getResultingVersion(long version) throws IOException;

  /**
   * Shortcut that is approximately equivalent to
   * {@code getDelta(version).getAppliedDelta()}.
   *
   * @return the applied delta applied at this version, if any, otherwise null
   */
  ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta(long version) throws IOException;

  /**
   * Shortcut that is approximately equivalent to
   * {@code getDelta(version).getTransformedDelta()}.
   *
   * @return the transformed delta applied at this version, if any, otherwise
   *         null
   */
  TransformedWaveletDelta getTransformedDelta(long version) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/WaveletNotificationDispatcher.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/WaveletNotificationDispatcher.java b/src/org/waveprotocol/box/server/waveserver/WaveletNotificationDispatcher.java deleted file mode 100644 index c0eee13..0000000 --- a/src/org/waveprotocol/box/server/waveserver/WaveletNotificationDispatcher.java +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; -import com.google.inject.Inject; -import com.google.protobuf.ByteString; - -import org.waveprotocol.box.common.DeltaSequence; -import org.waveprotocol.box.server.common.CoreWaveletOperationSerializer; -import org.waveprotocol.box.server.util.WaveletDataUtil; -import org.waveprotocol.wave.federation.FederationErrorProto.FederationError; -import org.waveprotocol.wave.federation.FederationHostBridge; -import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion; -import org.waveprotocol.wave.federation.WaveletFederationListener; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; -import org.waveprotocol.wave.util.logging.Log; - -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ExecutionException; - -/** - * Forwards wave notifications to wave bus subscribers and remote wave servers. - * - * Swallows any runtime exception from a wave bus subscriber but not removes that - * subscriber. 
The wave server used to do this swallowing but really things are - * in bad shape if a subscriber throws a runtime exception. - * TODO(anorth): Remove this catch and let the server crash. - * - * @author [email protected] (Soren Lassen) - */ -class WaveletNotificationDispatcher implements WaveBus, WaveletNotificationSubscriber { - - private static final Log LOG = Log.get(WaveletNotificationDispatcher.class); - - /** Picks out the transformed deltas from a list of delta records. */ - private static ImmutableList<TransformedWaveletDelta> transformedDeltasOf( - Iterable<WaveletDeltaRecord> deltaRecords) { - ImmutableList.Builder<TransformedWaveletDelta> transformedDeltas = ImmutableList.builder(); - for (WaveletDeltaRecord deltaRecord : deltaRecords) { - transformedDeltas.add(deltaRecord.getTransformedDelta()); - } - return transformedDeltas.build(); - } - - /** Picks out the byte strings of the applied deltas from a list of delta records. */ - private static ImmutableList<ByteString> serializedAppliedDeltasOf( - Iterable<WaveletDeltaRecord> deltaRecords) { - ImmutableList.Builder<ByteString> serializedAppliedDeltas = ImmutableList.builder(); - for (WaveletDeltaRecord deltaRecord : deltaRecords) { - serializedAppliedDeltas.add(deltaRecord.getAppliedDelta().getByteString()); - } - return serializedAppliedDeltas.build(); - } - - private final ImmutableSet<String> localDomains; - private final WaveletFederationListener.Factory federationHostFactory; - private final CopyOnWriteArraySet<WaveBus.Subscriber> subscribers = - new CopyOnWriteArraySet<WaveBus.Subscriber>(); - - /** Maps remote domains to wave server stubs for those domains. */ - private final LoadingCache<String, WaveletFederationListener> federationHosts = - CacheBuilder.newBuilder().build(new CacheLoader<String, WaveletFederationListener>() { - @Override - public WaveletFederationListener load(String domain) { - return federationHostFactory.listenerForDomain(domain); - } - }); - - /** - * Constructor. 
- * - * @param certificateManager knows what the local domains are - * @param federationHostFactory manufactures federation host instances for - * remote domains - */ - @Inject - public WaveletNotificationDispatcher( - CertificateManager certificateManager, - @FederationHostBridge WaveletFederationListener.Factory federationHostFactory) { - this.localDomains = certificateManager.getLocalDomains(); - this.federationHostFactory = federationHostFactory; - } - - @Override - public void subscribe(Subscriber s) { - subscribers.add(s); - } - - @Override - public void unsubscribe(Subscriber s) { - subscribers.remove(s); - } - - @Override - public void waveletUpdate(ReadableWaveletData wavelet, ImmutableList<WaveletDeltaRecord> deltas, - ImmutableSet<String> domainsToNotify) { - DeltaSequence sequence = DeltaSequence.of(transformedDeltasOf(deltas)); - for (WaveBus.Subscriber s : subscribers) { - try { - s.waveletUpdate(wavelet, sequence); - } catch (RuntimeException e) { - LOG.severe("Runtime exception in update to wave bus subscriber " + s, e); - } - } - - Set<String> remoteDomainsToNotify = Sets.difference(domainsToNotify, localDomains); - if (!remoteDomainsToNotify.isEmpty()) { - ImmutableList<ByteString> serializedAppliedDeltas = serializedAppliedDeltasOf(deltas); - for (String domain : remoteDomainsToNotify) { - try { - federationHosts.get(domain).waveletDeltaUpdate(WaveletDataUtil.waveletNameOf(wavelet), - serializedAppliedDeltas, federationCallback("delta update")); - } catch (ExecutionException ex) { - throw new RuntimeException(ex); - } - } - } - } - - @Override - public void waveletCommitted(WaveletName waveletName, HashedVersion version, - ImmutableSet<String> domainsToNotify) { - for (WaveBus.Subscriber s : subscribers) { - try { - s.waveletCommitted(waveletName, version); - } catch (RuntimeException e) { - LOG.severe("Runtime exception in commit to wave bus subscriber " + s, e); - } - } - - Set<String> remoteDomainsToNotify = Sets.difference(domainsToNotify, 
localDomains); - if (!remoteDomainsToNotify.isEmpty()) { - ProtocolHashedVersion serializedVersion = CoreWaveletOperationSerializer.serialize(version); - for (String domain : remoteDomainsToNotify) { - try { - federationHosts.get(domain).waveletCommitUpdate( - waveletName, serializedVersion, federationCallback("commit notice")); - } catch (ExecutionException ex) { - throw new RuntimeException(ex); - } - } - } - } - - private WaveletFederationListener.WaveletUpdateCallback federationCallback( - final String description) { - return new WaveletFederationListener.WaveletUpdateCallback() { - @Override - public void onSuccess() { - LOG.info(description + " success"); - } - - @Override - public void onFailure(FederationError error) { - LOG.warning(description + " failure: " + error); - } - }; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/WaveletNotificationSubscriber.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/WaveletNotificationSubscriber.java b/src/org/waveprotocol/box/server/waveserver/WaveletNotificationSubscriber.java deleted file mode 100644 index c36f456..0000000 --- a/src/org/waveprotocol/box/server/waveserver/WaveletNotificationSubscriber.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; - -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; -
/**
 * Callback interface for parties that want to hear about wavelet updates and
 * wavelet commits.
 *
 * @author [email protected] (Soren Lassen)
 */
public interface WaveletNotificationSubscriber {

  /**
   * Called when deltas have been applied to a wavelet.
   *
   * @param wavelet the state of the wavelet after the deltas have been applied
   * @param deltas the deltas applied to the wavelet
   * @param domainsToNotify the domains that should be told about the update;
   *        the empty set if the wavelet is remote
   */
  void waveletUpdate(ReadableWaveletData wavelet, ImmutableList<WaveletDeltaRecord> deltas,
      ImmutableSet<String> domainsToNotify);

  /**
   * Called when a wavelet has been committed to persistent storage.
   *
   * @param waveletName the name of the wavelet
   * @param version the version and hash of the wavelet as it was committed
   * @param domainsToNotify the domains that should be told about the commit;
   *        the empty set if the wavelet is remote
   */
  void waveletCommitted(WaveletName waveletName, HashedVersion version,
      ImmutableSet<String> domainsToNotify);
}
