http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/CertificateManagerImpl.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/CertificateManagerImpl.java b/src/org/waveprotocol/box/server/waveserver/CertificateManagerImpl.java deleted file mode 100644 index 911576c..0000000 --- a/src/org/waveprotocol/box/server/waveserver/CertificateManagerImpl.java +++ /dev/null @@ -1,311 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.common.base.Preconditions; -import com.google.common.collect.*; -import com.google.inject.Inject; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.typesafe.config.Config; -import org.apache.commons.codec.binary.Hex; -import org.waveprotocol.box.server.common.CoreWaveletOperationSerializer; -import org.waveprotocol.wave.crypto.*; -import org.waveprotocol.wave.federation.FederationErrorProto.FederationError; -import org.waveprotocol.wave.federation.FederationErrors; -import org.waveprotocol.wave.federation.Proto.ProtocolSignature; -import org.waveprotocol.wave.federation.Proto.ProtocolSignedDelta; -import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo; -import org.waveprotocol.wave.federation.Proto.ProtocolWaveletDelta; -import org.waveprotocol.wave.federation.WaveletFederationProvider; -import org.waveprotocol.wave.federation.WaveletFederationProvider.DeltaSignerInfoResponseListener; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.util.logging.Log; - -import java.util.List; -import java.util.Map; - -/** - * Default implementation of {@link CertificateManager}. - */ -public class CertificateManagerImpl implements CertificateManager { - - private static final Log LOG = Log.get(CertificateManagerImpl.class); - - private final SignatureHandler waveSigner; - private final ImmutableSet<String> localDomains; - private final WaveSignatureVerifier verifier; - private final CertPathStore certPathStore; - private final boolean disableVerfication; - - /** - * Map of signer ids to requests for the signer info for those ids. Each signer id is mapped to - * a multimap: a domain mapped to a list of callbacks for that domain, called when the signer info - * is available for the signer id. 
It is arranged by domain to facilitate the optimisation where - * exactly 1 signer request is sent per domain. - */ - private final Map<ByteString, Multimap<String, SignerInfoPrefetchResultListener>> - signerInfoRequests; - - @Inject - public CertificateManagerImpl(Config config, SignatureHandler signer, - WaveSignatureVerifier verifier, CertPathStore certPathStore) { - this.disableVerfication = config.getBoolean("federation.waveserver_disable_verification"); - this.waveSigner = signer; - // for now, we just support a single signer - this.localDomains = ImmutableSet.of(signer.getDomain()); - this.verifier = verifier; - this.certPathStore = certPathStore; - this.signerInfoRequests = Maps.newHashMap(); - - if (disableVerfication) { - LOG.warning("** SIGNATURE VERIFICATION DISABLED ** " - + "see configuration federation.waveserver_disable_verification"); - } - } - - @Override - public ImmutableSet<String> getLocalDomains() { - return localDomains; - } - - @Override - public SignatureHandler getLocalSigner() { - return waveSigner; - } - - @Override - public ProtocolSignedDelta signDelta(ByteStringMessage<ProtocolWaveletDelta> delta) { - // TODO: support extended address paths. For now, there will be exactly - // one signature, and we don't support federated groups. 
- Preconditions.checkState(delta.getMessage().getAddressPathCount() == 0); - - ProtocolSignedDelta.Builder signedDelta = ProtocolSignedDelta.newBuilder(); - - signedDelta.setDelta(delta.getByteString()); - signedDelta.addAllSignature(waveSigner.sign(delta)); - return signedDelta.build(); - } - - @Override - public ByteStringMessage<ProtocolWaveletDelta> verifyDelta(ProtocolSignedDelta signedDelta) - throws SignatureException, UnknownSignerException { - ByteStringMessage<ProtocolWaveletDelta> delta; - try { - delta = ByteStringMessage.parseProtocolWaveletDelta(signedDelta.getDelta()); - } catch (InvalidProtocolBufferException e) { - throw new IllegalArgumentException("signed delta does not contain valid delta", e); - } - - if (disableVerfication) { - return delta; - } - - List<String> domains = getParticipantDomains(delta.getMessage()); - - if (domains.size() != signedDelta.getSignatureCount()) { - throw new SignatureException("found " + domains.size() + " domains in " + - "extended address path, but " + signedDelta.getSignatureCount() + - " signatures."); - } - - for (int i = 0; i < domains.size(); i++) { - String domain = domains.get(i); - ProtocolSignature signature = signedDelta.getSignature(i); - verifySingleSignature(delta, signature, domain); - } - - return delta; - } - - /** - * Verifies a single signature. - * @param delta the payload that we're verifying the signature on. - * @param signature the signature on the payload - * @param domain the authority (domain name) that should have signed the - * payload. - * @throws SignatureException if the signature doesn't verify. - */ - private void verifySingleSignature(ByteStringMessage<ProtocolWaveletDelta> delta, - ProtocolSignature signature, String domain) - throws SignatureException, UnknownSignerException { - verifier.verify(delta.getByteString().toByteArray(), signature, domain); - } - - /** - * Returns the domains of all the addresses in the extended address path. 
- */ - private List<String> getParticipantDomains(ProtocolWaveletDelta delta) { - Iterable<String> addresses = getExtendedAddressPath(delta); - return getDeDupedDomains(addresses); - } - - /** - * Extracts the domains from user addresses, and removes duplicates. - */ - private List<String> getDeDupedDomains(Iterable<String> addresses) { - List<String> domains = Lists.newArrayList(); - for (String address : addresses) { - String participantDomain = new ParticipantId(address).getDomain(); - if (!domains.contains(participantDomain)) { - domains.add(participantDomain); - } - } - return domains; - } - - /** - * Returns the extended address path, i.e., the addresses in the delta's - * address path, plus the author of the delta. - */ - private Iterable<String> getExtendedAddressPath(ProtocolWaveletDelta delta) { - return Iterables.concat(delta.getAddressPathList(), - ImmutableList.of(delta.getAuthor())); - } - - @Override - public synchronized void storeSignerInfo(ProtocolSignerInfo signerInfo) - throws SignatureException { - verifier.verifySignerInfo(new SignerInfo(signerInfo)); - certPathStore.putSignerInfo(signerInfo); - } - - @Override - public synchronized ProtocolSignerInfo retrieveSignerInfo(ByteString signerId) { - SignerInfo signerInfo; - try { - signerInfo = certPathStore.getSignerInfo(signerId.toByteArray()); - // null is acceptable for retrieveSignerInfo. The user of the certificate manager should call - // prefetchDeltaSignerInfo for the mechanism to actually populate the certificate manager. - return signerInfo == null ? null : signerInfo.toProtoBuf(); - } catch (SignatureException e) { - /* - * TODO: This may result in the server endlessly requesting the signer info from the - * remote server, a more graceful failure needs to be implemented. 
- */ - LOG.severe("Failed to retreive signer info for " - + new String(Hex.encodeHex(signerId.toByteArray())), e); - return null; - } - } - - @Override - public synchronized void prefetchDeltaSignerInfo(WaveletFederationProvider provider, - ByteString signerId, WaveletName waveletName, HashedVersion deltaEndVersion, - SignerInfoPrefetchResultListener callback) { - ProtocolSignerInfo signerInfo = retrieveSignerInfo(signerId); - - if (signerInfo != null) { - callback.onSuccess(signerInfo); - } else { - enqueueSignerInfoRequest(provider, signerId, waveletName, deltaEndVersion, callback); - } - } - - /** - * Enqueue a signer info request for a signed delta on a given domain. - */ - private synchronized void enqueueSignerInfoRequest(final WaveletFederationProvider provider, - final ByteString signerId, final WaveletName waveletName, - HashedVersion deltaEndVersion, SignerInfoPrefetchResultListener callback) { - final String domain = waveletName.waveletId.getDomain(); - Multimap<String, SignerInfoPrefetchResultListener> domainCallbacks = - signerInfoRequests.get(signerId); - - if (domainCallbacks == null) { - domainCallbacks = ArrayListMultimap.create(); - signerInfoRequests.put(signerId, domainCallbacks); - } - - // The thing is, we need to add multiple callbacks for the same domain, but we only want to - // have one outstanding request per domain - domainCallbacks.put(domain, callback); - - if (domainCallbacks.get(domain).size() == 1) { - provider.getDeltaSignerInfo(signerId, waveletName, - (deltaEndVersion == null) - ? 
null : CoreWaveletOperationSerializer.serialize(deltaEndVersion), - new DeltaSignerInfoResponseListener() { - @Override public void onFailure(FederationError error) { - LOG.warning("getDeltaSignerInfo failed: " + error); - // Fail all requests on this domain - dequeueSignerInfoRequestForDomain(signerId, error, domain); - } - - @Override public void onSuccess(ProtocolSignerInfo signerInfo) { - try { - storeSignerInfo(signerInfo); - dequeueSignerInfoRequest(signerId, null); - } catch (SignatureException e) { - LOG.warning("Failed to verify signer info", e); - dequeueSignerInfoRequest(signerId, FederationErrors.badRequest(e.toString())); - } - }}); - } - } - - /** - * Dequeue all signer info requests for a given signer id. - * - * @param signerId to dequeue requests for - * @param error if there was an error, null for success - */ - private synchronized void dequeueSignerInfoRequest(ByteString signerId, FederationError error) { - List<String> domains = ImmutableList.copyOf(signerInfoRequests.get(signerId).keySet()); - for (String domain : domains) { - dequeueSignerInfoRequestForDomain(signerId, error, domain); - } - } - - /** - * Dequeue all signer info requests for a given signer id and a specific domain. 
- * - * @param signerId to dequeue requests for - * @param error if there was an error, null for success - * @param domain to dequeue the signer requests for - */ - private synchronized void dequeueSignerInfoRequestForDomain(ByteString signerId, - FederationError error, String domain) { - Multimap<String, SignerInfoPrefetchResultListener> domainListeners = - signerInfoRequests.get(signerId); - if (domainListeners == null) { - LOG.info("There are no domain listeners for signer " + signerId + " domain "+ domain); - return; - } else { - LOG.info("Dequeuing " + domainListeners.size() + " listeners for domain " + domain); - } - - for (SignerInfoPrefetchResultListener listener : domainListeners.get(domain)) { - if (error == null) { - listener.onSuccess(retrieveSignerInfo(signerId)); - } else { - listener.onFailure(error); - } - } - - domainListeners.removeAll(domain); - if (domainListeners.isEmpty()) { - // No listeners for any domains, delete the signer id for the overall map - signerInfoRequests.remove(signerId); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/DeltaAndSnapshotStore.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/DeltaAndSnapshotStore.java b/src/org/waveprotocol/box/server/waveserver/DeltaAndSnapshotStore.java deleted file mode 100644 index 04fc051..0000000 --- a/src/org/waveprotocol/box/server/waveserver/DeltaAndSnapshotStore.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; - -import java.io.Closeable; -import java.util.Collection; - -/** - * Stores wavelet deltas and snapshots. - * - * @author [email protected] (Soren Lassen) - */ -public interface DeltaAndSnapshotStore extends WaveletStore<DeltaAndSnapshotStore.WaveletAccess> { - - /** - * Accesses the state for a wavelet. - * Permits reading a snapshot of the wavelet state and historical deltas, - * and appending deltas to the history. 
- * - * Callers must serialize all calls to - * {@link #appendDeltas(Collection,ReadableWaveletData)}. - */ - interface WaveletAccess extends WaveletDeltaRecordReader, Closeable { - - /** - * @return Immutable copy of the last known committed state of the wavelet. - */ - ReadableWaveletData getSnapshot(); - - /** - * Blocking call to append deltas to the end of the delta history. - * If the call returns normally (doesn't throw an exception), then - * the deltas have been successfully and "durably" stored, that is, - * the method forces the data to disk. - * - * @param deltas Contiguous deltas, beginning from the DeltaAccess object's - * end version. It is the caller's responsibility to ensure that - * everything matches up (applied and transformed deltas in the - * records match, that the hashes are correctly computed, etc). - * @param resultingSnapshot A snapshot of the state of the wavelet after - * {@code deltas} are applied. - * @throws PersistenceException if anything goes wrong with the underlying - * storage. - */ - void appendDeltas(Collection<WaveletDeltaRecord> deltas, - ReadableWaveletData resultingSnapshot) throws PersistenceException; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/DeltaStore.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/DeltaStore.java b/src/org/waveprotocol/box/server/waveserver/DeltaStore.java deleted file mode 100644 index 8ca5a5d..0000000 --- a/src/org/waveprotocol/box/server/waveserver/DeltaStore.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import org.waveprotocol.box.server.persistence.PersistenceException; - -import java.io.Closeable; -import java.util.Collection; - -/** - * Stores wavelet deltas. - * - * @author [email protected] (Soren Lassen) - */ -public interface DeltaStore extends WaveletStore<DeltaStore.DeltasAccess> { - - /** - * Accesses the delta history for a wavelet. - * Permits reading historical deltas and appending deltas to the history. - */ - interface DeltasAccess extends WaveletDeltaRecordReader, Closeable { - /** - * Blocking call to append deltas to the end of the delta history. - * If the call returns normally (doesn't throw an exception), then - * the deltas have been successfully and "durably" stored, that is, - * the method forces the data to disk. - * - * @param deltas contiguous deltas, beginning from the DeltaAccess object's - * end version. It is the caller's responsibility to ensure that - * everything matches up (applied and transformed deltas in the - * records match, that the hashes are correctly computed, etc). - * @throws PersistenceException if anything goes wrong with the underlying - * storage. 
- */ - void append(Collection<WaveletDeltaRecord> deltas) throws PersistenceException; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java b/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java deleted file mode 100644 index d564470..0000000 --- a/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedSnapshotStore.java +++ /dev/null @@ -1,226 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; -import com.google.inject.Inject; - -import org.waveprotocol.box.common.ExceptionalIterator; -import org.waveprotocol.box.server.persistence.FileNotFoundPersistenceException; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.util.WaveletDataUtil; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.OperationException; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; - -import java.io.IOException; -import java.util.Collection; -import java.util.Iterator; - -/** - * Wave store backed by a {@link DeltaStore}. - * - * @author [email protected] (Soren Lassen) - */ -class DeltaStoreBasedSnapshotStore implements DeltaAndSnapshotStore { - - /** - * Reads the transformed deltas from a {@link WaveletDeltaRecordReader}. 
- */ - private static class TransformedWaveletDeltaIterator - implements Iterator<TransformedWaveletDelta> { - private final WaveletDeltaRecordReader reader; - private long nextVersion = 0; - - public TransformedWaveletDeltaIterator(WaveletDeltaRecordReader reader) { - this.reader = reader; - } - - @Override - public boolean hasNext() { - return !reader.isEmpty() && nextVersion < reader.getEndVersion().getVersion(); - } - - /** - * {@inheritDoc} - * - * @throws RuntimeIOException if the underlying reader throws - * {@link IOException} - * @throws IllegalStateException if there are gaps between deltas or - * the next delta is empty - */ - @Override - public TransformedWaveletDelta next() { - try { - TransformedWaveletDelta delta = reader.getTransformedDelta(nextVersion); - Preconditions.checkState(delta != null, "no delta at version %s", nextVersion); - Preconditions.checkState( - delta.getAppliedAtVersion() < delta.getResultingVersion().getVersion(), - "delta [%s, %s) is empty", delta.getAppliedAtVersion(), delta.getResultingVersion()); - nextVersion = delta.getResultingVersion().getVersion(); - return delta; - } catch (IOException e) { - throw new RuntimeIOException(e); - } - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - } - - /** - * Reads all deltas and applies them all to construct the end wavelet state. 
- */ - private static ReadableWaveletData buildWaveletFromDeltaReader(WaveletDeltaRecordReader reader) - throws PersistenceException { - try { - // TODO(soren): better error handling of IllegalStateExceptions and - // OperationExceptions thrown from here - ReadableWaveletData wavelet = - WaveletDataUtil.buildWaveletFromDeltas(reader.getWaveletName(), - new TransformedWaveletDeltaIterator(reader)); - Preconditions.checkState(wavelet.getHashedVersion().equals(reader.getEndVersion())); - return wavelet; - } catch (OperationException e) { - throw new PersistenceException(e); - } catch (RuntimeIOException e) { - throw new PersistenceException(e.getIOException()); - } - } - - /** - * Creates a {@link DeltaAndSnapshotStore.WaveletAccess} instance which wraps - * {@code deltasAccess}. - * - * @throws IllegalStateException if the delta history is bad - */ - private static WaveletAccess createWaveletAccess(DeltaStore.DeltasAccess deltasAccess) - throws PersistenceException { - ReadableWaveletData wavelet; - wavelet = deltasAccess.isEmpty() ? null : buildWaveletFromDeltaReader(deltasAccess); - return new DeltasAccessBasedWaveletAccess(deltasAccess, wavelet); - } - - /** - * Wraps {@link DeltaStore.DeltasAccess}. - */ - static class DeltasAccessBasedWaveletAccess extends ForwardingWaveletDeltaRecordReader - implements WaveletAccess { - - private final DeltaStore.DeltasAccess deltasAccess; - - // TODO(soren): figure out thread safe access - // (synchronize access to snapshot, isClosed? or make them atomic types?) 
- - private ReadableWaveletData snapshot; // is null when there are no deltas - private boolean isClosed = false; - - private DeltasAccessBasedWaveletAccess(DeltaStore.DeltasAccess deltasAccess, - ReadableWaveletData snapshot) { - this.deltasAccess = deltasAccess; - this.snapshot = snapshot; - } - - @Override - protected WaveletDeltaRecordReader delegate() { - Preconditions.checkState(!isClosed, "Illegal access after closure"); - return deltasAccess; - } - - @Override - public boolean isEmpty() { - // Don't use the underlying deltasAccess method, rather let this - // be controlled by our own state. - return getSnapshot() == null; - } - - @Override - public HashedVersion getEndVersion() { - // Don't use the underlying deltasAccess method, rather let this - // be controlled by our own state. - return getSnapshot().getHashedVersion(); - } - - @Override - public ReadableWaveletData getSnapshot() { - Preconditions.checkState(!isClosed, "Illegal access after closure"); - return snapshot; - } - - @Override - public void appendDeltas(Collection<WaveletDeltaRecord> deltas, - ReadableWaveletData resultingSnapshot) throws PersistenceException { - Preconditions.checkState(!isClosed, "Illegal access after closure"); - // First append the deltas. - deltasAccess.append(deltas); - // Once the deltas have been stored, we update the wavelet data, which - // affects the result of any calls to getEndVersion() or getSnapshot(). - snapshot = resultingSnapshot; - } - - @Override - public void close() throws IOException { - // We don't check if we're already closed, it's ok to call close() twice. - isClosed = true; - deltasAccess.close(); - } - } - - private final DeltaStore deltaStore; - - /** - * Constructs a {@link DeltaAndSnapshotStore} instance which wraps {@code deltaStore}. - * - * @param deltaStore The underlying {@link DeltaStore}. 
- */ - @Inject - public DeltaStoreBasedSnapshotStore(DeltaStore deltaStore) { - this.deltaStore = deltaStore; - } - - @Override - public WaveletAccess open(WaveletName waveletName) throws PersistenceException { - return createWaveletAccess(deltaStore.open(waveletName)); - } - - @Override - public void delete(WaveletName waveletName) throws PersistenceException, - FileNotFoundPersistenceException { - deltaStore.delete(waveletName); - } - - @Override - public ImmutableSet<WaveletId> lookup(WaveId waveId) throws PersistenceException { - return deltaStore.lookup(waveId); - } - - @Override - public ExceptionalIterator<WaveId, PersistenceException> getWaveIdIterator() - throws PersistenceException { - return deltaStore.getWaveIdIterator(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java b/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java deleted file mode 100644 index 33e7e3c..0000000 --- a/src/org/waveprotocol/box/server/waveserver/DeltaStoreBasedWaveletState.java +++ /dev/null @@ -1,516 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import static java.lang.String.format; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterators; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListenableFutureTask; - -import org.waveprotocol.box.common.ListReceiver; -import org.waveprotocol.box.common.Receiver; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.box.server.util.WaveletDataUtil; -import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; -import org.waveprotocol.wave.model.id.IdURIEncoderDecoder; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.OperationException; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.version.HashedVersionFactory; -import org.waveprotocol.wave.model.version.HashedVersionFactoryImpl; -import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; -import org.waveprotocol.wave.model.wave.data.WaveletData; -import org.waveprotocol.wave.util.escapers.jvm.JavaUrlCodec; -import org.waveprotocol.wave.util.logging.Log; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import 
java.util.NavigableMap; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentNavigableMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; - -/** - * Simplistic {@link DeltaStore}-backed wavelet state implementation - * which goes to persistent storage for every history request. - * - * TODO(soren): rewire this class to be backed by {@link WaveletStore} and - * read the snapshot from there instead of computing it in the - * DeltaStoreBasedWaveletState constructor. - * - * @author [email protected] (Soren Lassen) - * @author [email protected] (Andew Kaplanov) - */ -class DeltaStoreBasedWaveletState implements WaveletState { - - private static final Log LOG = Log.get(DeltaStoreBasedWaveletState.class); - - private static final IdURIEncoderDecoder URI_CODEC = - new IdURIEncoderDecoder(new JavaUrlCodec()); - - private static final HashedVersionFactory HASH_FACTORY = - new HashedVersionFactoryImpl(URI_CODEC); - - private static final Function<WaveletDeltaRecord, TransformedWaveletDelta> TRANSFORMED = - new Function<WaveletDeltaRecord, TransformedWaveletDelta>() { - @Override - public TransformedWaveletDelta apply(WaveletDeltaRecord record) { - return record.getTransformedDelta(); - } - }; - - /** - * @return An entry keyed by a hashed version with the given version number, - * if any, otherwise null. - */ - private static <T> Map.Entry<HashedVersion, T> lookupCached(NavigableMap<HashedVersion, T> map, - long version) { - // Smallest key with version number >= version. - HashedVersion key = HashedVersion.unsigned(version); - Map.Entry<HashedVersion, T> entry = map.ceilingEntry(key); - return (entry != null && entry.getKey().getVersion() == version) ? entry : null; - } - - /** - * Creates a new delta store based state. - * - * The executor must ensure that only one thread executes at any time for each - * state instance. 
- * - * @param deltasAccess delta store accessor - * @param persistExecutor executor for making persistence calls - * @return a state initialized from the deltas - * @throws PersistenceException if a failure occurs while reading or - * processing stored deltas - */ - public static DeltaStoreBasedWaveletState create(DeltaStore.DeltasAccess deltasAccess, - Executor persistExecutor) throws PersistenceException { - if (deltasAccess.isEmpty()) { - return new DeltaStoreBasedWaveletState(deltasAccess, ImmutableList.<WaveletDeltaRecord>of(), - null, persistExecutor); - } else { - try { - ImmutableList<WaveletDeltaRecord> deltas = readAll(deltasAccess, null); - WaveletData snapshot = WaveletDataUtil.buildWaveletFromDeltas(deltasAccess.getWaveletName(), - Iterators.transform(deltas.iterator(), TRANSFORMED)); - return new DeltaStoreBasedWaveletState(deltasAccess, deltas, snapshot, persistExecutor); - } catch (IOException e) { - throw new PersistenceException("Failed to read stored deltas", e); - } catch (OperationException e) { - throw new PersistenceException("Failed to compose stored deltas", e); - } - } - } - - /** - * Reads all deltas from persistent storage. 
- */ - private static ImmutableList<WaveletDeltaRecord> readAll(WaveletDeltaRecordReader reader, - ConcurrentNavigableMap<HashedVersion, WaveletDeltaRecord> cachedDeltas) - throws IOException { - HashedVersion startVersion = HASH_FACTORY.createVersionZero(reader.getWaveletName()); - HashedVersion endVersion = reader.getEndVersion(); - ListReceiver<WaveletDeltaRecord> receiver = new ListReceiver<WaveletDeltaRecord>(); - readDeltasInRange(reader, cachedDeltas, startVersion, endVersion, receiver); - return ImmutableList.copyOf(receiver); - } - - private static void readDeltasInRange(WaveletDeltaRecordReader reader, - ConcurrentNavigableMap<HashedVersion, WaveletDeltaRecord> cachedDeltas, - HashedVersion startVersion, HashedVersion endVersion, Receiver<WaveletDeltaRecord> receiver) - throws IOException { - WaveletDeltaRecord delta = getDelta(reader, cachedDeltas, startVersion); - Preconditions.checkArgument(delta != null && delta.getAppliedAtVersion().equals(startVersion), - "invalid start version"); - for (;;) { - if (!receiver.put(delta)) { - return; - } - if (delta.getResultingVersion().getVersion() >= endVersion.getVersion()) { - break; - } - delta = getDelta(reader, cachedDeltas, delta.getResultingVersion()); - if (delta == null) { - break; - } - } - Preconditions.checkArgument(delta != null && delta.getResultingVersion().equals(endVersion), - "invalid end version"); - } - - private static WaveletDeltaRecord getDelta(WaveletDeltaRecordReader reader, - ConcurrentNavigableMap<HashedVersion, WaveletDeltaRecord> cachedDeltas, - HashedVersion version) throws IOException { - WaveletDeltaRecord delta = reader.getDelta(version.getVersion()); - if (delta == null && cachedDeltas != null) { - delta = cachedDeltas.get(version); - } - return delta; - } - - private final Executor persistExecutor; - private final HashedVersion versionZero; - private final DeltaStore.DeltasAccess deltasAccess; - - /** The lock that guards access to persistence related state. 
*/ - private final Object persistLock = new Object(); - - /** - * Indicates the version of the latest appended delta that was already requested to be - * persisted. - */ - private HashedVersion latestVersionToPersist = null; - - /** The persist task that will be executed next. */ - private ListenableFutureTask<Void> nextPersistTask = null; - - /** - * Persists pending deltas. Each run appends, in one operation, all cached - * deltas from the last persisted version up to latestVersionToPersist - * (nothing is written if that version is already persisted). When a run is - * done, it starts the next persist task if one was queued in the meantime; - * otherwise it clears latestVersionToPersist. - */ - private final Callable<Void> persisterTask = new Callable<Void>() { - @Override - public Void call() throws PersistenceException { - HashedVersion last; - HashedVersion version; - synchronized (persistLock) { - last = lastPersistedVersion.get(); - version = latestVersionToPersist; - } - if (last != null && version.getVersion() <= last.getVersion()) { - LOG.info("Attempt to persist version " + version - + " smaller than last persisted version " + last); - // Done, version is already persisted. - version = last; - } else { - ImmutableList.Builder<WaveletDeltaRecord> deltas = ImmutableList.builder(); - HashedVersion v = (last == null) ? versionZero : last; - do { - WaveletDeltaRecord d = cachedDeltas.get(v); - deltas.add(d); - v = d.getResultingVersion(); - } while (v.getVersion() < version.getVersion()); - Preconditions.checkState(v.equals(version)); - deltasAccess.append(deltas.build()); - } - synchronized (persistLock) { - Preconditions.checkState(last == lastPersistedVersion.get(), - "lastPersistedVersion changed while we were writing to storage"); - lastPersistedVersion.set(version); - if (nextPersistTask != null) { - persistExecutor.execute(nextPersistTask); - nextPersistTask = null; - } else { - latestVersionToPersist = null; - } - } - return null; - } - }; - - /** Keyed by appliedAtVersion. 
*/ - private final ConcurrentNavigableMap<HashedVersion, ByteStringMessage<ProtocolAppliedWaveletDelta>> appliedDeltas = - new ConcurrentSkipListMap<HashedVersion, ByteStringMessage<ProtocolAppliedWaveletDelta>>(); - - /** Keyed by appliedAtVersion. */ - private final ConcurrentNavigableMap<HashedVersion, WaveletDeltaRecord> cachedDeltas = - new ConcurrentSkipListMap<HashedVersion, WaveletDeltaRecord>(); - - /** Is null if the wavelet state is empty. */ - private WaveletData snapshot; - - /** - * Last version known to be persisted. It is initialized from - * deltasAccess.getEndVersion() and updated by the persist task; it may be - * null, which getLastPersistedVersion() reports as version zero. - * It's an atomic reference so we can set it in one thread (which - * asynchronously writes deltas to storage) and read it in another, - * simultaneously. - */ - private final AtomicReference<HashedVersion> lastPersistedVersion; - - /** - * Constructs a wavelet state with the given deltas and snapshot. - * The deltas must be the contents of deltasAccess, and they - * must be contiguous from version zero. - * The snapshot must be the composition of the deltas, or null if there - * are no deltas. The constructed object takes ownership of the - * snapshot and will mutate it if appendDelta() is called. 
- */ - @VisibleForTesting - DeltaStoreBasedWaveletState(DeltaStore.DeltasAccess deltasAccess, - List<WaveletDeltaRecord> deltas, WaveletData snapshot, Executor persistExecutor) { - Preconditions.checkArgument(deltasAccess.isEmpty() == deltas.isEmpty()); - Preconditions.checkArgument(deltas.isEmpty() == (snapshot == null)); - this.persistExecutor = persistExecutor; - this.versionZero = HASH_FACTORY.createVersionZero(deltasAccess.getWaveletName()); - this.deltasAccess = deltasAccess; - this.snapshot = snapshot; - this.lastPersistedVersion = new AtomicReference<HashedVersion>(deltasAccess.getEndVersion()); - } - - @Override - public WaveletName getWaveletName() { - return deltasAccess.getWaveletName(); - } - - @Override - public ReadableWaveletData getSnapshot() { - return snapshot; - } - - @Override - public HashedVersion getCurrentVersion() { - return (snapshot == null) ? versionZero : snapshot.getHashedVersion(); - } - - @Override - public HashedVersion getLastPersistedVersion() { - HashedVersion version = lastPersistedVersion.get(); - return (version == null) ? versionZero : version; - } - - @Override - public HashedVersion getHashedVersion(long version) { - final Entry<HashedVersion, WaveletDeltaRecord> cachedEntry = - lookupCached(cachedDeltas, version); - if (version == 0) { - return versionZero; - } else if (snapshot == null) { - return null; - } else if (version == snapshot.getVersion()) { - return snapshot.getHashedVersion(); - } else { - WaveletDeltaRecord delta; - try { - delta = lookup(version); - } catch (IOException e) { - throw new RuntimeIOException(new IOException(format("Version : %d", version), e)); - } - if (delta == null && cachedEntry != null) { - return cachedEntry.getKey(); - } else { - return delta != null ? 
delta.getAppliedAtVersion() : null; - } - } - } - - @Override - public TransformedWaveletDelta getTransformedDelta( - final HashedVersion beginVersion) { - WaveletDeltaRecord delta = cachedDeltas.get(beginVersion); - if (delta != null) { - return delta.getTransformedDelta(); - } else { - WaveletDeltaRecord nowDelta; - try { - nowDelta = lookup(beginVersion.getVersion()); - } catch (IOException e) { - throw new RuntimeIOException(new IOException(format("Begin version : %s", - beginVersion.toString()), e)); - } - return nowDelta != null ? nowDelta.getTransformedDelta() : null; - } - } - - @Override - public TransformedWaveletDelta getTransformedDeltaByEndVersion(final HashedVersion endVersion) { - Preconditions.checkArgument(endVersion.getVersion() > 0, "end version %s is not positive", - endVersion); - Entry<HashedVersion, WaveletDeltaRecord> transformedEntry = - cachedDeltas.lowerEntry(endVersion); - final WaveletDeltaRecord cachedDelta = - transformedEntry != null ? transformedEntry.getValue() : null; - if (snapshot == null) { - return null; - } else { - WaveletDeltaRecord deltaRecord = getDeltaRecordByEndVersion(endVersion); - TransformedWaveletDelta delta; - if (deltaRecord == null && cachedDelta != null - && cachedDelta.getResultingVersion().equals(endVersion)) { - delta = cachedDelta.getTransformedDelta(); - } else { - delta = deltaRecord != null ? 
deltaRecord.getTransformedDelta() : null; - } - return delta; - } - } - - @Override - public void getTransformedDeltaHistory(final HashedVersion startVersion, - final HashedVersion endVersion, final Receiver<TransformedWaveletDelta> receiver) { - try { - readDeltasInRange(deltasAccess, cachedDeltas, startVersion, endVersion, - new Receiver<WaveletDeltaRecord>() { - @Override - public boolean put(WaveletDeltaRecord delta) { - return receiver.put(delta.getTransformedDelta()); - } - }); - } catch (IOException e) { - throw new RuntimeIOException(new IOException(format("Start version : %s, end version: %s", - startVersion.toString(), endVersion.toString()), e)); - } - } - - @Override - public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta( - HashedVersion beginVersion) { - WaveletDeltaRecord delta = cachedDeltas.get(beginVersion); - if (delta != null) { - return delta.getAppliedDelta(); - } else { - WaveletDeltaRecord record = null; - try { - record = lookup(beginVersion.getVersion()); - } catch (IOException e) { - throw new RuntimeIOException(new IOException(format("Begin version : %s", - beginVersion.toString()), e)); - } - return record != null ? record.getAppliedDelta() : null; - } - } - - @Override - public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDeltaByEndVersion( - final HashedVersion endVersion) { - Preconditions.checkArgument(endVersion.getVersion() > 0, - "end version %s is not positive", endVersion); - Entry<HashedVersion, WaveletDeltaRecord> appliedEntry = - cachedDeltas.lowerEntry(endVersion); - final ByteStringMessage<ProtocolAppliedWaveletDelta> cachedDelta = - appliedEntry != null ? appliedEntry.getValue().getAppliedDelta() : null; - WaveletDeltaRecord deltaRecord = getDeltaRecordByEndVersion(endVersion); - ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta; - if (deltaRecord == null && isDeltaBoundary(endVersion)) { - appliedDelta = cachedDelta; - } else { - appliedDelta = deltaRecord != null ? 
deltaRecord.getAppliedDelta() : null; - } - return appliedDelta; - } - - @Override - public void getAppliedDeltaHistory(HashedVersion startVersion, HashedVersion endVersion, - final Receiver<ByteStringMessage<ProtocolAppliedWaveletDelta>> receiver) { - Preconditions.checkArgument(startVersion.getVersion() < endVersion.getVersion()); - try { - readDeltasInRange(deltasAccess, cachedDeltas, startVersion, endVersion, new Receiver<WaveletDeltaRecord>() { - @Override - public boolean put(WaveletDeltaRecord delta) { - return receiver.put(delta.getAppliedDelta()); - } - }); - } catch (IOException e) { - throw new RuntimeIOException(new IOException(format("Start version : %s, end version: %s", - startVersion.toString(), endVersion.toString()), e)); - } - } - - @Override - public void appendDelta(WaveletDeltaRecord deltaRecord) - throws OperationException { - HashedVersion currentVersion = getCurrentVersion(); - Preconditions.checkArgument(currentVersion.equals(deltaRecord.getAppliedAtVersion()), - "Applied version %s doesn't match current version %s", deltaRecord.getAppliedAtVersion(), - currentVersion); - - if (deltaRecord.getAppliedAtVersion().getVersion() == 0) { - Preconditions.checkState(lastPersistedVersion.get() == null); - snapshot = WaveletDataUtil.buildWaveletFromFirstDelta(getWaveletName(), deltaRecord.getTransformedDelta()); - } else { - WaveletDataUtil.applyWaveletDelta(deltaRecord.getTransformedDelta(), snapshot); - } - - // Now that we built the snapshot without any exceptions, we record the delta. 
- cachedDeltas.put(deltaRecord.getAppliedAtVersion(), deltaRecord); - } - - @Override - public ListenableFuture<Void> persist(final HashedVersion version) { - Preconditions.checkArgument(version.getVersion() > 0, - "Cannot persist non-positive version %s", version); - Preconditions.checkArgument(isDeltaBoundary(version), - "Version to persist %s matches no delta", version); - synchronized (persistLock) { - if (latestVersionToPersist != null) { - // There's a persist task in flight. - if (version.getVersion() <= latestVersionToPersist.getVersion()) { - LOG.info("Attempt to persist version " + version - + " smaller than last version requested " + latestVersionToPersist); - } else { - latestVersionToPersist = version; - } - if (nextPersistTask == null) { - nextPersistTask = ListenableFutureTask.<Void>create(persisterTask); - } - return nextPersistTask; - } else { - latestVersionToPersist = version; - ListenableFutureTask<Void> resultTask = ListenableFutureTask.<Void>create(persisterTask); - persistExecutor.execute(resultTask); - return resultTask; - } - } - } - - @Override - public void flush(HashedVersion version) { - cachedDeltas.remove(cachedDeltas.lowerKey(version)); - if (LOG.isFineLoggable()) { - LOG.fine("Flushed deltas up to version " + version); - } - } - - @Override - public void close() { - } - - /** - * @return The delta record that deltasAccess.getDelta() yields for the given - * version number (looked up in the delta store, not in the cache); may be null. 
- */ - private WaveletDeltaRecord lookup(long version) throws IOException { - return deltasAccess.getDelta(version); - } - - private WaveletDeltaRecord getDeltaRecordByEndVersion(HashedVersion endVersion) { - long version = endVersion.getVersion(); - try { - return deltasAccess.getDeltaByEndVersion(version); - } catch (IOException e) { - throw new RuntimeIOException(new IOException(format("Version : %d", version), e)); - } - } - - private boolean isDeltaBoundary(HashedVersion version) { - Preconditions.checkNotNull(version, "version is null"); - return version.equals(getCurrentVersion()) || cachedDeltas.containsKey(version); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/ForwardingWaveletDeltaRecordReader.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/ForwardingWaveletDeltaRecordReader.java b/src/org/waveprotocol/box/server/waveserver/ForwardingWaveletDeltaRecordReader.java deleted file mode 100644 index 7db441e..0000000 --- a/src/org/waveprotocol/box/server/waveserver/ForwardingWaveletDeltaRecordReader.java +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.version.HashedVersion; - -import java.io.IOException; - -/** - * Forwards calls to a delegate {@link WaveletDeltaRecordReader}. - * - * @author [email protected] (Soren Lassen) - */ -public abstract class ForwardingWaveletDeltaRecordReader implements WaveletDeltaRecordReader { - - protected abstract WaveletDeltaRecordReader delegate(); - - @Override - public WaveletName getWaveletName() { - return delegate().getWaveletName(); - } - - @Override - public boolean isEmpty() { - return delegate().isEmpty(); - } - - @Override - public HashedVersion getEndVersion() { - return delegate().getEndVersion(); - } - - @Override - public WaveletDeltaRecord getDelta(long version) throws IOException { - return delegate().getDelta(version); - } - - @Override - public WaveletDeltaRecord getDeltaByEndVersion(long version) throws IOException { - return delegate().getDeltaByEndVersion(version); - } - - @Override - public HashedVersion getAppliedAtVersion(long version) throws IOException { - return delegate().getAppliedAtVersion(version); - } - - @Override - public HashedVersion getResultingVersion(long version) throws IOException { - return delegate().getResultingVersion(version); - } - - @Override - public ByteStringMessage<ProtocolAppliedWaveletDelta> getAppliedDelta(long version) - throws IOException { - return delegate().getAppliedDelta(version); - } - - @Override - public TransformedWaveletDelta getTransformedDelta(long version) throws IOException { - return delegate().getTransformedDelta(version); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/FutureUtil.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/FutureUtil.java b/src/org/waveprotocol/box/server/waveserver/FutureUtil.java deleted file mode 100644 index a803730..0000000 --- a/src/org/waveprotocol/box/server/waveserver/FutureUtil.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -/** - * Utility methods for futures. - * - * @author [email protected] (Soren Lassen) - */ -public class FutureUtil { - - /** - * Wraps an unexpected exception as a {@link RuntimeException}. - */ - public static class UnexpectedExceptionFromFuture extends RuntimeException { - public UnexpectedExceptionFromFuture(Throwable cause) { - super(cause); - } - } - - /** - * Gets the result of the given future or propagates any exception of - * the expected type. 
That is, in the case of an {@link ExecutionException}, - * if the cause has the expected type, it is unwrapped and thrown directly. - * - * @param <V> the result type of the future - * @param <X> exception type expected to be wrapped - * @param future future to check - * @param expectedCauseClass class of exception expected to be wrapped - * @return the result of the future - * @throws X if the future throws an {@link ExecutionException} wrapping an - * exception of type X - * @throws UnexpectedExceptionFromFuture wrapping any cause of - * an {@link ExecutionException} not of type X - * @throws InterruptedException if the current thread was interrupted - * @throws CancellationException if the future was cancelled - */ - public static <V, X extends Exception> V getResultOrPropagateException(Future<V> future, - Class<X> expectedCauseClass) throws X, InterruptedException { - try { - return future.get(); - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - if (expectedCauseClass.isInstance(cause)) { - throw expectedCauseClass.cast(cause); - } else { - throw new UnexpectedExceptionFromFuture(cause); - } - } - } - - private FutureUtil() { } // prevent instantiation -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/ImportServlet.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/ImportServlet.java b/src/org/waveprotocol/box/server/waveserver/ImportServlet.java deleted file mode 100644 index 67c4ee4..0000000 --- a/src/org/waveprotocol/box/server/waveserver/ImportServlet.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * -*/ - http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/IndexException.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/IndexException.java b/src/org/waveprotocol/box/server/waveserver/IndexException.java deleted file mode 100644 index 2afece1..0000000 --- a/src/org/waveprotocol/box/server/waveserver/IndexException.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.waveserver; - -/** - * @author [email protected] (Yuri Zelikov) - */ -@SuppressWarnings("serial") -public class IndexException extends RuntimeException { - - public IndexException() { - } - - public IndexException(String message) { - super(message); - } - - public IndexException(Throwable cause) { - super(cause); - } - - public IndexException(String message, Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/IndexFieldType.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/IndexFieldType.java b/src/org/waveprotocol/box/server/waveserver/IndexFieldType.java deleted file mode 100644 index a2e03fa..0000000 --- a/src/org/waveprotocol/box/server/waveserver/IndexFieldType.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - - -/** - * The Wave index fields. 
- * - * @author [email protected] (Yuri Zelikov) - */ -public enum IndexFieldType { - IN, - WITH, - CREATOR, - WAVEID, - WAVELETID, - ABOUT, - TITLE, - SNIPPET, - IS, - TAG, - LMT, - ; -} - http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/InvalidHashException.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/InvalidHashException.java b/src/org/waveprotocol/box/server/waveserver/InvalidHashException.java deleted file mode 100644 index 3b581a2..0000000 --- a/src/org/waveprotocol/box/server/waveserver/InvalidHashException.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import org.waveprotocol.wave.model.version.HashedVersion; - -/** - * Indicates a caller submitted a delta with a mismatched hash. 
- * - */ -public class InvalidHashException extends WaveServerException { - private final HashedVersion expected; - private final HashedVersion target; - - public InvalidHashException(HashedVersion expectedVersion, HashedVersion targetVersion) { - // Note: The expected hash is not included in the exception message - // to avoid revealing it to clients. It's useful to keep a reference - // here for debugging though. - super("Mismatched hash at version " + expectedVersion.getVersion() + ": " + targetVersion); - this.expected = expectedVersion; - this.target = targetVersion; - } - - public HashedVersion expectedVersion() { - return expected; - } - - public HashedVersion targetVersion() { - return target; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/LocalWaveletContainer.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/LocalWaveletContainer.java b/src/org/waveprotocol/box/server/waveserver/LocalWaveletContainer.java deleted file mode 100644 index 8290147..0000000 --- a/src/org/waveprotocol/box/server/waveserver/LocalWaveletContainer.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; - -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.wave.federation.Proto.ProtocolSignedDelta; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.OperationException; -import org.waveprotocol.wave.model.version.HashedVersion; - -/** - * A local wavelet may be updated by submits. The local wavelet will perform - * operational transformation on the submitted delta and assign it the latest - * version of the wavelet. - * - * - */ -interface LocalWaveletContainer extends WaveletContainer { - - /** - * Manufactures local wavelet containers. - */ - interface Factory extends WaveletContainer.Factory<LocalWaveletContainer> { } - - /** - * Request that a given delta is submitted to the wavelet. - * - * @param waveletName name of wavelet. - * @param delta to be submitted to the server. - * @return result of application to the wavelet, both the applied result and the transformed - * result. - * @throws OperationException - * @throws InvalidProtocolBufferException - * @throws InvalidHashException - * @throws PersistenceException - * @throws WaveletStateException - */ - public WaveletDeltaRecord submitRequest(WaveletName waveletName, ProtocolSignedDelta delta) - throws OperationException, InvalidProtocolBufferException, InvalidHashException, - PersistenceException, WaveletStateException; - - /** - * Check whether a submitted delta (identified by its hashed version after application) was - * signed by a given signer. This (by design) should additionally validate that the history hash - * is valid for the delta. 
- * - * @param hashedVersion to check whether in the history of the delta - * @param signerId of the signer - */ - boolean isDeltaSigner(HashedVersion hashedVersion, ByteString signerId); -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImpl.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImpl.java b/src/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImpl.java deleted file mode 100644 index 77c8756..0000000 --- a/src/org/waveprotocol/box/server/waveserver/LocalWaveletContainerImpl.java +++ /dev/null @@ -1,205 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.box.server.waveserver; - -import com.google.common.base.Function; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; - -import org.waveprotocol.box.server.common.CoreWaveletOperationSerializer; -import org.waveprotocol.box.server.persistence.PersistenceException; -import org.waveprotocol.wave.federation.Proto.ProtocolAppliedWaveletDelta; -import org.waveprotocol.wave.federation.Proto.ProtocolSignature; -import org.waveprotocol.wave.federation.Proto.ProtocolSignedDelta; -import org.waveprotocol.wave.federation.Proto.ProtocolWaveletDelta; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.OperationException; -import org.waveprotocol.wave.model.operation.wave.RemoveParticipant; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.operation.wave.WaveletDelta; -import org.waveprotocol.wave.model.operation.wave.WaveletOperation; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.util.logging.Log; - -import java.util.concurrent.Executor; - -/** - * A local wavelet may be updated by submits. The local wavelet will perform - * operational transformation on the submitted delta and assign it the latest - * version of the wavelet. 
- */ -class LocalWaveletContainerImpl extends WaveletContainerImpl implements LocalWaveletContainer { - - private static final Log LOG = Log.get(LocalWaveletContainerImpl.class); - - private static final Function<RemoveParticipant, ParticipantId> PARTICIPANT_REMOVED_BY = - new Function<RemoveParticipant, ParticipantId>() { - @Override - public ParticipantId apply(RemoveParticipant op) { - return op.getParticipantId(); - } - }; - - private static final Function<ParticipantId, String> DOMAIN_OF = - new Function<ParticipantId, String>() { - @Override - public String apply(ParticipantId participant) { - return participant.getDomain(); - } - }; - - private static Iterable<ParticipantId> participantsRemovedBy(Iterable<WaveletOperation> ops) { - return Iterables.transform(Iterables.filter(ops, RemoveParticipant.class), - PARTICIPANT_REMOVED_BY); - } - - private static ImmutableSet<String> domainsOf(Iterable<ParticipantId> participants) { - return ImmutableSet.copyOf(Iterables.transform(participants, DOMAIN_OF)); - } - - public LocalWaveletContainerImpl(WaveletName waveletName, WaveletNotificationSubscriber notifiee, - ListenableFuture<? 
extends WaveletState> waveletStateFuture, String waveDomain, - Executor storageContinuationExecutor) { - super(waveletName, notifiee, waveletStateFuture, waveDomain, storageContinuationExecutor); - } - - @Override - public WaveletDeltaRecord submitRequest(WaveletName waveletName, ProtocolSignedDelta signedDelta) - throws OperationException, InvalidProtocolBufferException, InvalidHashException, - PersistenceException, WaveletStateException { - awaitLoad(); - acquireWriteLock(); - try { - checkStateOk(); - HashedVersion before = getCurrentVersion(); - WaveletDeltaRecord result = transformAndApplyLocalDelta(signedDelta); - HashedVersion after = getCurrentVersion(); - // Only publish and persist the delta if it wasn't transformed away - // (right now it never is since the current OT algorithm doesn't transform ops away) - // and wasn't a duplicate of a previously applied delta. - if (!after.equals(before)) { - Preconditions.checkState(!result.isEmpty()); - Preconditions.checkState(result.getAppliedAtVersion().equals(before)); - ImmutableSet<String> domainsToNotify = domainsOf(Iterables.concat( - accessSnapshot().getParticipants(), - participantsRemovedBy(result.getTransformedDelta()))); - notifyOfDeltas(ImmutableList.of(result), domainsToNotify); - // We always persist a local delta immediately after it's applied - // and after it's broadcast on the wave bus and to remote servers. - persist(result.getResultingVersion(), domainsToNotify); - } - return result; - } finally { - releaseWriteLock(); - } - } - - /** - * Apply a signed delta to a local wavelet. This assumes the caller has - * validated that the delta is at the correct version and can be applied to - * the wavelet. Must be called with writelock held. - * - * @param signedDelta the delta that is to be applied to wavelet. - * @return the transformed and applied delta. 
- * @throws OperationException if an error occurs during transformation or - * application - * @throws InvalidProtocolBufferException if the signed delta did not contain a valid delta - * @throws InvalidHashException if delta hash sanity checks fail - */ - private WaveletDeltaRecord transformAndApplyLocalDelta(ProtocolSignedDelta signedDelta) - throws OperationException, InvalidProtocolBufferException, InvalidHashException, - PersistenceException { - ProtocolWaveletDelta protocolDelta = - ByteStringMessage.parseProtocolWaveletDelta(signedDelta.getDelta()).getMessage(); - - Preconditions.checkArgument(protocolDelta.getOperationCount() > 0, "empty delta"); - - WaveletDelta transformed = maybeTransformSubmittedDelta( - CoreWaveletOperationSerializer.deserialize(protocolDelta)); - - // TODO(ljvderijk): a Clock needs to be injected here (Issue 104) - long applicationTimestamp = System.currentTimeMillis(); - - HashedVersion currentVersion = getCurrentVersion(); - - // This is always false right now because the current algorithm doesn't transform ops away. - if (transformed.size() == 0) { - Preconditions.checkState(currentVersion.getVersion() != 0, - "currentVersion can not be 0 if delta was transformed"); - Preconditions.checkState( - transformed.getTargetVersion().getVersion() <= currentVersion.getVersion()); - // The delta was transformed away. That's OK but we don't call either - // applyWaveletOperations(), because that will throw IllegalArgumentException, or - // commitAppliedDelta(), because empty deltas cannot be part of the delta history. 
- TransformedWaveletDelta emptyDelta = new TransformedWaveletDelta(transformed.getAuthor(), - transformed.getTargetVersion(), applicationTimestamp, transformed); - return new WaveletDeltaRecord(transformed.getTargetVersion(), null, emptyDelta); - } - - if (!transformed.getTargetVersion().equals(currentVersion)) { - Preconditions.checkState( - transformed.getTargetVersion().getVersion() < currentVersion.getVersion()); - // The delta was a duplicate of an existing server delta. - // We duplicate-eliminate it (don't apply it to the wavelet state and don't store it in - // the delta history) and return the server delta which it was a duplicate of - // (so delta submission becomes idempotent). - ByteStringMessage<ProtocolAppliedWaveletDelta> existingDeltaBytes = - lookupAppliedDelta(transformed.getTargetVersion()); - TransformedWaveletDelta dupDelta = lookupTransformedDelta(transformed.getTargetVersion()); - LOG.info("Duplicate delta " + dupDelta + " for wavelet " + getWaveletName()); - // TODO(anorth): Replace these comparisons with methods on delta classes. 
- Preconditions.checkState(dupDelta.getAuthor().equals(transformed.getAuthor()), - "Duplicate delta detected but mismatched author, expected %s found %s", - transformed.getAuthor(), dupDelta.getAuthor()); - Preconditions.checkState(Iterables.elementsEqual(dupDelta, transformed), - "Duplicate delta detected but mismatched ops, expected %s found %s", - transformed, dupDelta); - - return new WaveletDeltaRecord(transformed.getTargetVersion(), existingDeltaBytes, dupDelta); - } - - // Build the applied delta to commit - ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta = - AppliedDeltaUtil.buildAppliedDelta(signedDelta, transformed.getTargetVersion(), - transformed.size(), applicationTimestamp); - - return applyDelta(appliedDelta, transformed); - } - - @Override - public boolean isDeltaSigner(HashedVersion version, ByteString signerId) { - ByteStringMessage<ProtocolAppliedWaveletDelta> appliedDelta = - lookupAppliedDeltaByEndVersion(version); - if (appliedDelta == null) { - return false; - } - ProtocolSignedDelta signedDelta = appliedDelta.getMessage().getSignedOriginalDelta(); - for (ProtocolSignature signature : signedDelta.getSignatureList()) { - if (signature.getSignerId().equals(signerId)) return true; - } - return false; - } -}
