http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/common/CoreWaveletOperationSerializer.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/common/CoreWaveletOperationSerializer.java b/src/org/waveprotocol/box/server/common/CoreWaveletOperationSerializer.java deleted file mode 100644 index 3522fd4..0000000 --- a/src/org/waveprotocol/box/server/common/CoreWaveletOperationSerializer.java +++ /dev/null @@ -1,534 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.common; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; - -import org.waveprotocol.box.common.comms.WaveClientRpc.DocumentSnapshot; -import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolWaveletUpdate; -import org.waveprotocol.box.common.comms.WaveClientRpc.WaveletSnapshot; -import org.waveprotocol.box.server.util.WaveletDataUtil; -import org.waveprotocol.wave.federation.Proto.ProtocolDocumentOperation; -import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion; -import org.waveprotocol.wave.federation.Proto.ProtocolWaveletDelta; -import org.waveprotocol.wave.federation.Proto.ProtocolWaveletOperation; -import org.waveprotocol.wave.model.document.operation.AnnotationBoundaryMap; -import org.waveprotocol.wave.model.document.operation.Attributes; -import org.waveprotocol.wave.model.document.operation.AttributesUpdate; -import org.waveprotocol.wave.model.document.operation.DocOp; -import org.waveprotocol.wave.model.document.operation.DocOpCursor; -import org.waveprotocol.wave.model.document.operation.impl.AnnotationBoundaryMapImpl; -import org.waveprotocol.wave.model.document.operation.impl.AttributesImpl; -import org.waveprotocol.wave.model.document.operation.impl.AttributesUpdateImpl; -import org.waveprotocol.wave.model.document.operation.impl.DocOpBuilder; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.OperationException; -import org.waveprotocol.wave.model.operation.core.CoreAddParticipant; -import org.waveprotocol.wave.model.operation.core.CoreWaveletDocumentOperation; -import org.waveprotocol.wave.model.operation.core.CoreWaveletOperation; -import org.waveprotocol.wave.model.operation.wave.AddParticipant; -import org.waveprotocol.wave.model.operation.wave.BlipContentOperation; -import org.waveprotocol.wave.model.operation.wave.BlipOperationVisitor; -import org.waveprotocol.wave.model.operation.wave.NoOp; -import org.waveprotocol.wave.model.operation.wave.RemoveParticipant; -import org.waveprotocol.wave.model.operation.wave.SubmitBlip; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.operation.wave.WaveletBlipOperation; -import org.waveprotocol.wave.model.operation.wave.WaveletDelta; -import org.waveprotocol.wave.model.operation.wave.WaveletOperation; -import org.waveprotocol.wave.model.operation.wave.WaveletOperationContext; -import org.waveprotocol.wave.model.schema.SchemaCollection; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.Constants; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.model.wave.data.ObservableWaveletData; -import org.waveprotocol.wave.model.wave.data.core.CoreWaveletData; -import org.waveprotocol.wave.model.wave.data.core.impl.CoreWaveletDataImpl; -import org.waveprotocol.wave.model.wave.data.impl.DataUtil; - -import java.util.List; -import java.util.Map; - -/** - * Utility class for serialising/deserialising wavelet operations (and their components) to/from - * their protocol buffer representations (and their components). - * - * - */ -public class CoreWaveletOperationSerializer { - - private CoreWaveletOperationSerializer() { - } - - /** - * Serializes an untransformed delta as a {@link ProtocolWaveletDelta}. - * - * @param delta to serialize - * @return serialized protocol buffer wavelet delta - */ - public static ProtocolWaveletDelta serialize(WaveletDelta delta) { - ProtocolWaveletDelta.Builder protobufDelta = ProtocolWaveletDelta.newBuilder(); - - for (WaveletOperation waveletOp : delta) { - protobufDelta.addOperation(serialize(waveletOp)); - } - protobufDelta.setAuthor(delta.getAuthor().getAddress()); - protobufDelta.setHashedVersion(serialize(delta.getTargetVersion())); - - return protobufDelta.build(); - } - - /** - * Serializes a transformed delta as a {@link ProtocolWaveletDelta}. - * - * The target version of the result does not have an accompanying hash; - * server delta hashes are redundant in the client/server protocol. - * - * @param delta to serialize - * @return serialized protocol buffer wavelet delta - */ - public static ProtocolWaveletDelta serialize(TransformedWaveletDelta delta) { - ProtocolWaveletDelta.Builder protobufDelta = ProtocolWaveletDelta.newBuilder(); - - for (WaveletOperation waveletOp : delta) { - protobufDelta.addOperation(serialize(waveletOp)); - } - protobufDelta.setAuthor(delta.getAuthor().getAddress()); - protobufDelta.setHashedVersion(serialize(HashedVersion.unsigned(delta.getAppliedAtVersion()))); - - return protobufDelta.build(); - } - - /** - * Serializes a wavelet operation as a {@link ProtocolWaveletOperation}. - * - * @param waveletOp wavelet operation to serialize - * @return serialized protocol buffer wavelet operation - */ - public static ProtocolWaveletOperation serialize(WaveletOperation waveletOp) { - ProtocolWaveletOperation.Builder protobufOp = ProtocolWaveletOperation.newBuilder(); - - if (waveletOp instanceof NoOp) { - protobufOp.setNoOp(true); - } else if (waveletOp instanceof AddParticipant) { - protobufOp.setAddParticipant( - ((AddParticipant) waveletOp).getParticipantId().getAddress()); - } else if (waveletOp instanceof RemoveParticipant) { - protobufOp.setRemoveParticipant( - ((RemoveParticipant) waveletOp).getParticipantId().getAddress()); - } else if (waveletOp instanceof WaveletBlipOperation) { - final WaveletBlipOperation wbOp = (WaveletBlipOperation) waveletOp; - final ProtocolWaveletOperation.MutateDocument.Builder mutation = - ProtocolWaveletOperation.MutateDocument.newBuilder(); - mutation.setDocumentId(wbOp.getBlipId()); - wbOp.getBlipOp().acceptVisitor(new BlipOperationVisitor() { - @Override - public void visitBlipContentOperation(BlipContentOperation blipOp) { - mutation.setDocumentOperation(serialize(blipOp.getContentOp())); - } - - @Override - public void visitSubmitBlip(SubmitBlip op) { - throw new IllegalArgumentException("Unsupported blip operation: " + wbOp.getBlipOp()); - } - }); - protobufOp.setMutateDocument(mutation.build()); - } else { - throw new IllegalArgumentException("Unsupported wavelet operation: " + waveletOp); - } - - return protobufOp.build(); - } - - /** - * Serialize a {@link DocOp} as a {@link ProtocolDocumentOperation}. - * - * @param inputOp document operation to serialize - * @return serialized protocol buffer document operation - */ - public static ProtocolDocumentOperation serialize(DocOp inputOp) { - final ProtocolDocumentOperation.Builder output = ProtocolDocumentOperation.newBuilder(); - - inputOp.apply(new DocOpCursor() { - private ProtocolDocumentOperation.Component.Builder newComponentBuilder() { - return ProtocolDocumentOperation.Component.newBuilder(); - } - - @Override public void retain(int itemCount) { - output.addComponent(newComponentBuilder().setRetainItemCount(itemCount).build()); - } - - @Override public void characters(String characters) { - output.addComponent(newComponentBuilder().setCharacters(characters).build()); - } - - @Override public void deleteCharacters(String characters) { - output.addComponent(newComponentBuilder().setDeleteCharacters(characters).build()); - } - - @Override public void elementStart(String type, Attributes attributes) { - ProtocolDocumentOperation.Component.ElementStart e = makeElementStart(type, attributes); - output.addComponent(newComponentBuilder().setElementStart(e).build()); - } - - @Override public void deleteElementStart(String type, Attributes attributes) { - ProtocolDocumentOperation.Component.ElementStart e = makeElementStart(type, attributes); - output.addComponent(newComponentBuilder().setDeleteElementStart(e).build()); - } - - private ProtocolDocumentOperation.Component.ElementStart makeElementStart( - String type, Attributes attributes) { - ProtocolDocumentOperation.Component.ElementStart.Builder e = - ProtocolDocumentOperation.Component.ElementStart.newBuilder(); - - e.setType(type); - - for (String name : attributes.keySet()) { - e.addAttribute(ProtocolDocumentOperation.Component.KeyValuePair.newBuilder() - .setKey(name).setValue(attributes.get(name)).build()); - } - - return e.build(); - } - - @Override public void elementEnd() { - output.addComponent(newComponentBuilder().setElementEnd(true).build()); - } - - @Override public void deleteElementEnd() { - output.addComponent(newComponentBuilder().setDeleteElementEnd(true).build()); - } - - @Override public void replaceAttributes(Attributes oldAttributes, Attributes newAttributes) { - ProtocolDocumentOperation.Component.ReplaceAttributes.Builder r = - ProtocolDocumentOperation.Component.ReplaceAttributes.newBuilder(); - - if (oldAttributes.isEmpty() && newAttributes.isEmpty()) { - r.setEmpty(true); - } else { - for (String name : oldAttributes.keySet()) { - r.addOldAttribute(ProtocolDocumentOperation.Component.KeyValuePair.newBuilder() - .setKey(name).setValue(oldAttributes.get(name)).build()); - } - - for (String name : newAttributes.keySet()) { - r.addNewAttribute(ProtocolDocumentOperation.Component.KeyValuePair.newBuilder() - .setKey(name).setValue(newAttributes.get(name)).build()); - } - } - - output.addComponent(newComponentBuilder().setReplaceAttributes(r.build()).build()); - } - - @Override public void updateAttributes(AttributesUpdate attributes) { - ProtocolDocumentOperation.Component.UpdateAttributes.Builder u = - ProtocolDocumentOperation.Component.UpdateAttributes.newBuilder(); - - if (attributes.changeSize() == 0) { - u.setEmpty(true); - } else { - for (int i = 0; i < attributes.changeSize(); i++) { - u.addAttributeUpdate(makeKeyValueUpdate( - attributes.getChangeKey(i), attributes.getOldValue(i), attributes.getNewValue(i))); - } - } - - output.addComponent(newComponentBuilder().setUpdateAttributes(u.build()).build()); - } - - @Override public void annotationBoundary(AnnotationBoundaryMap map) { - ProtocolDocumentOperation.Component.AnnotationBoundary.Builder a = - ProtocolDocumentOperation.Component.AnnotationBoundary.newBuilder(); - - if (map.endSize() == 0 && map.changeSize() == 0) { - a.setEmpty(true); - } else { - for (int i = 0; i < map.endSize(); i++) { - a.addEnd(map.getEndKey(i)); - } - for (int i = 0; i < map.changeSize(); i++) { - a.addChange(makeKeyValueUpdate( - map.getChangeKey(i), map.getOldValue(i), map.getNewValue(i))); - } - } - - output.addComponent(newComponentBuilder().setAnnotationBoundary(a.build()).build()); - } - - private ProtocolDocumentOperation.Component.KeyValueUpdate makeKeyValueUpdate( - String key, String oldValue, String newValue) { - ProtocolDocumentOperation.Component.KeyValueUpdate.Builder kvu = - ProtocolDocumentOperation.Component.KeyValueUpdate.newBuilder(); - kvu.setKey(key); - if (oldValue != null) { - kvu.setOldValue(oldValue); - } - if (newValue != null) { - kvu.setNewValue(newValue); - } - - return kvu.build(); - } - }); - - return output.build(); - } - - /** - * Deserializes a {@link ProtocolWaveletDelta} as an untransformed wavelet - * delta. - */ - public static WaveletDelta deserialize(ProtocolWaveletDelta delta) { - List<WaveletOperation> ops = Lists.newArrayList(); - for (ProtocolWaveletOperation op : delta.getOperationList()) { - WaveletOperationContext context = new WaveletOperationContext( - ParticipantId.ofUnsafe(delta.getAuthor()), Constants.NO_TIMESTAMP, 1); - ops.add(deserialize(op, context)); - } - HashedVersion hashedVersion = deserialize(delta.getHashedVersion()); - return new WaveletDelta(new ParticipantId(delta.getAuthor()), hashedVersion, ops); - } - - /** - * Deserializes a {@link ProtocolWaveletDelta} as a transformed wavelet delta. - */ - public static TransformedWaveletDelta deserialize(ProtocolWaveletDelta delta, - HashedVersion resultingVersion, long applicationTimestamp) { - ParticipantId author = ParticipantId.ofUnsafe(delta.getAuthor()); - int count = delta.getOperationCount(); - Preconditions.checkArgument(count > 0, "Cannot deserialize an empty delta"); - List<WaveletOperation> ops = Lists.newArrayListWithCapacity(count); - if (count > 1) { - WaveletOperationContext context = - new WaveletOperationContext(author, applicationTimestamp, 1); - for (int i = 0; i < count - 1; i++) { - ProtocolWaveletOperation op = delta.getOperation(i); - ops.add(deserialize(op, context)); - } - } - WaveletOperationContext context = - new WaveletOperationContext(author, applicationTimestamp, 1, resultingVersion); - ops.add(deserialize(delta.getOperation(count - 1), context)); - return new TransformedWaveletDelta(author, resultingVersion, applicationTimestamp, ops); - } - - /** Deserializes a protobuf to a HashedVersion POJO. */ - public static HashedVersion deserialize(ProtocolHashedVersion hashedVersion) { - final ByteString historyHash = hashedVersion.getHistoryHash(); - return HashedVersion.of(hashedVersion.getVersion(), historyHash.toByteArray()); - } - - /** Serializes a HashedVersion POJO to a protobuf. */ - public static ProtocolHashedVersion serialize(HashedVersion hashedVersion) { - return ProtocolHashedVersion.newBuilder().setVersion(hashedVersion.getVersion()). - setHistoryHash(ByteString.copyFrom(hashedVersion.getHistoryHash())).build(); - } - - /** - * Deserialize a {@link ProtocolWaveletOperation} as a {@link WaveletOperation}. - * - * @param protobufOp protocol buffer wavelet operation to deserialize - * @return deserialized wavelet operation - */ - public static WaveletOperation deserialize(ProtocolWaveletOperation protobufOp, - WaveletOperationContext context) { - if (protobufOp.hasNoOp()) { - return new NoOp(context); - } else if (protobufOp.hasAddParticipant()) { - return new AddParticipant(context, new ParticipantId(protobufOp.getAddParticipant())); - } else if (protobufOp.hasRemoveParticipant()) { - return new RemoveParticipant(context, new ParticipantId(protobufOp.getRemoveParticipant())); - } else if (protobufOp.hasMutateDocument()) { - return new WaveletBlipOperation(protobufOp.getMutateDocument().getDocumentId(), - new BlipContentOperation(context, - deserialize(protobufOp.getMutateDocument().getDocumentOperation()))); - } else { - throw new IllegalArgumentException("Unsupported operation: " + protobufOp); - } - } - - /** - * Deserialize a {@link WaveletSnapshot} into a list of - * {@link WaveletOperation}s. - * - * @param snapshot snapshot protocol buffer to deserialize - * @return a list of operations - */ - public static List<CoreWaveletOperation> deserialize(WaveletSnapshot snapshot) { - List<CoreWaveletOperation> ops = Lists.newArrayList(); - for (String participant : snapshot.getParticipantIdList()) { - CoreAddParticipant addOp = new CoreAddParticipant(new ParticipantId(participant)); - ops.add(addOp); - } - for (DocumentSnapshot document : snapshot.getDocumentList()) { - CoreWaveletDocumentOperation docOp = new CoreWaveletDocumentOperation( - document.getDocumentId(), deserialize(document.getDocumentOperation())); - ops.add(docOp); - } - return ops; - } - - /** - * Deserialize a {@link ProtocolDocumentOperation} into a {@link DocOp}. - * - * @param op protocol buffer document operation to deserialize - * @return deserialized DocOp - */ - public static DocOp deserialize(ProtocolDocumentOperation op) { - DocOpBuilder output = new DocOpBuilder(); - - for (ProtocolDocumentOperation.Component c : op.getComponentList()) { - if (c.hasAnnotationBoundary()) { - if (c.getAnnotationBoundary().getEmpty()) { - output.annotationBoundary(AnnotationBoundaryMapImpl.EMPTY_MAP); - } else { - String[] ends = new String[c.getAnnotationBoundary().getEndCount()]; - String[] changeKeys = new String[c.getAnnotationBoundary().getChangeCount()]; - String[] oldValues = new String[c.getAnnotationBoundary().getChangeCount()]; - String[] newValues = new String[c.getAnnotationBoundary().getChangeCount()]; - if (c.getAnnotationBoundary().getEndCount() > 0) { - c.getAnnotationBoundary().getEndList().toArray(ends); - } - for (int i = 0; i < changeKeys.length; i++) { - ProtocolDocumentOperation.Component.KeyValueUpdate kvu = - c.getAnnotationBoundary().getChange(i); - changeKeys[i] = kvu.getKey(); - oldValues[i] = kvu.hasOldValue() ? kvu.getOldValue() : null; - newValues[i] = kvu.hasNewValue() ? kvu.getNewValue() : null; - } - output.annotationBoundary( - new AnnotationBoundaryMapImpl(ends, changeKeys, oldValues, newValues)); - } - } else if (c.hasCharacters()) { - output.characters(c.getCharacters()); - } else if (c.hasElementStart()) { - Map<String, String> attributesMap = Maps.newHashMap(); - for (ProtocolDocumentOperation.Component.KeyValuePair pair : - c.getElementStart().getAttributeList()) { - attributesMap.put(pair.getKey(), pair.getValue()); - } - output.elementStart(c.getElementStart().getType(), new AttributesImpl(attributesMap)); - } else if (c.hasElementEnd()) { - output.elementEnd(); - } else if (c.hasRetainItemCount()) { - output.retain(c.getRetainItemCount()); - } else if (c.hasDeleteCharacters()) { - output.deleteCharacters(c.getDeleteCharacters()); - } else if (c.hasDeleteElementStart()) { - Map<String, String> attributesMap = Maps.newHashMap(); - for (ProtocolDocumentOperation.Component.KeyValuePair pair : - c.getDeleteElementStart().getAttributeList()) { - attributesMap.put(pair.getKey(), pair.getValue()); - } - output.deleteElementStart(c.getDeleteElementStart().getType(), - new AttributesImpl(attributesMap)); - } else if (c.hasDeleteElementEnd()) { - output.deleteElementEnd(); - } else if (c.hasReplaceAttributes()) { - if (c.getReplaceAttributes().getEmpty()) { - output.replaceAttributes(AttributesImpl.EMPTY_MAP, AttributesImpl.EMPTY_MAP); - } else { - Map<String, String> oldAttributesMap = Maps.newHashMap(); - Map<String, String> newAttributesMap = Maps.newHashMap(); - for (ProtocolDocumentOperation.Component.KeyValuePair pair : - c.getReplaceAttributes().getOldAttributeList()) { - oldAttributesMap.put(pair.getKey(), pair.getValue()); - } - for (ProtocolDocumentOperation.Component.KeyValuePair pair : - c.getReplaceAttributes().getNewAttributeList()) { - newAttributesMap.put(pair.getKey(), pair.getValue()); - } - output.replaceAttributes(new AttributesImpl(oldAttributesMap), - new AttributesImpl(newAttributesMap)); - } - } else if (c.hasUpdateAttributes()) { - if (c.getUpdateAttributes().getEmpty()) { - output.updateAttributes(AttributesUpdateImpl.EMPTY_MAP); - } else { - String[] triplets = new String[c.getUpdateAttributes().getAttributeUpdateCount()*3]; - for (int i = 0, j = 0; i < c.getUpdateAttributes().getAttributeUpdateCount(); i++) { - ProtocolDocumentOperation.Component.KeyValueUpdate kvu = - c.getUpdateAttributes().getAttributeUpdate(i); - triplets[j++] = kvu.getKey(); - triplets[j++] = kvu.hasOldValue() ? kvu.getOldValue() : null; - triplets[j++] = kvu.hasNewValue() ? kvu.getNewValue() : null; - } - output.updateAttributes(new AttributesUpdateImpl(triplets)); - } - } else { - //throw new IllegalArgumentException("Unsupported operation component: " + c); - } - } - - return output.build(); - } - - /** - * Deserializes the snapshot contained in the {@link ProtocolWaveletUpdate} - * into a {@link ObservableWaveletData}. - * - * @param snapshot the {@link WaveletSnapshot} to deserialize. - * @param version the version of the wavelet after all deltas have been - * applied. - * @param waveletName the name of the wavelet contained in the update. - * @throws OperationException if the ops in the snapshot can not be applied. - */ - public static ObservableWaveletData deserializeSnapshot( - WaveletSnapshot snapshot, ProtocolHashedVersion version, WaveletName waveletName) - throws OperationException { - // TODO(ljvderijk): This method does too many steps to get to the - // ObservableWaveletData. - // We need something simpler when operations and the protocol have been - // edited. - - // Creating a CoreWaveletData because the current protocol lacks the - // meta-data required to construct an ObservableWaveletData directly. - // But this results in unnecessary object creation and copies. - CoreWaveletData coreWavelet = - new CoreWaveletDataImpl(waveletName.waveId, waveletName.waveletId); - - Preconditions.checkArgument(snapshot.getParticipantIdCount() > 0); - // Have to add a single participant for the copying to complete without a - // NPE. - coreWavelet.addParticipant(ParticipantId.ofUnsafe(snapshot.getParticipantId(0))); - - for (DocumentSnapshot document : snapshot.getDocumentList()) { - DocOp op = - CoreWaveletOperationSerializer.deserialize(document.getDocumentOperation()); - coreWavelet.modifyDocument(document.getDocumentId(), op); - } - - HashedVersion hashedVersion = CoreWaveletOperationSerializer.deserialize(version); - ObservableWaveletData immutableWaveletData = - DataUtil.fromCoreWaveletData(coreWavelet, hashedVersion, SchemaCollection.empty()); - - ObservableWaveletData wavelet = WaveletDataUtil.copyWavelet(immutableWaveletData); - - for (String participant : snapshot.getParticipantIdList()) { - wavelet.addParticipant(new ParticipantId(participant)); - } - - return wavelet; - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/common/SnapshotSerializer.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/common/SnapshotSerializer.java b/src/org/waveprotocol/box/server/common/SnapshotSerializer.java deleted file mode 100644 index 9cee394..0000000 --- a/src/org/waveprotocol/box/server/common/SnapshotSerializer.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.common; - -import org.waveprotocol.box.common.comms.WaveClientRpc.DocumentSnapshot; -import org.waveprotocol.box.common.comms.WaveClientRpc.WaveletSnapshot; -import org.waveprotocol.wave.model.document.operation.DocInitialization; -import org.waveprotocol.wave.model.document.operation.DocOp; -import org.waveprotocol.wave.model.document.operation.impl.DocOpUtil; -import org.waveprotocol.wave.model.id.InvalidIdException; -import org.waveprotocol.wave.model.id.ModernIdSerialiser; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.operation.OperationException; -import org.waveprotocol.wave.model.schema.SchemaCollection; -import org.waveprotocol.wave.model.util.CollectionUtils; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.InvalidParticipantAddress; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.model.wave.data.ObservableWaveletData; -import org.waveprotocol.wave.model.wave.data.ReadableBlipData; -import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; -import org.waveprotocol.wave.model.wave.data.WaveletData; -import org.waveprotocol.wave.model.wave.data.impl.EmptyWaveletSnapshot; -import org.waveprotocol.wave.model.wave.data.impl.ObservablePluggableMutableDocument; -import org.waveprotocol.wave.model.wave.data.impl.WaveletDataImpl; - -import java.util.Collection; - - -/** - * Utility class for serialising/deserialising model objects (and their - * components) to/from their protocol buffer representations. - * - * NOTE: This class is mirrored in the client. Any changes here should also be - * made in - * {@link org.waveprotocol.box.webclient.common.SnapshotSerializer} - * - * @author Joseph Gentle ([email protected]) - */ -public class SnapshotSerializer { - private SnapshotSerializer() { - } - - /** - * Serializes a snapshot for a wavelet. - * - * @param wavelet wavelet to snapshot - * @param hashedVersion hashed version of the wavelet - * @return a wavelet snapshot that contains all the information in the - * original wavelet. - */ - public static WaveletSnapshot serializeWavelet(ReadableWaveletData wavelet, - HashedVersion hashedVersion) { - WaveletSnapshot.Builder builder = WaveletSnapshot.newBuilder(); - - builder.setWaveletId(ModernIdSerialiser.INSTANCE.serialiseWaveletId(wavelet.getWaveletId())); - for (ParticipantId participant : wavelet.getParticipants()) { - builder.addParticipantId(participant.toString()); - } - for (String id : wavelet.getDocumentIds()) { - ReadableBlipData data = wavelet.getDocument(id); - builder.addDocument(serializeDocument(data)); - } - - builder.setVersion(CoreWaveletOperationSerializer.serialize(hashedVersion)); - builder.setLastModifiedTime(wavelet.getLastModifiedTime()); - builder.setCreator(wavelet.getCreator().getAddress()); - builder.setCreationTime(wavelet.getCreationTime()); - - return builder.build(); - } - - /** - * Deserializes the snapshot contained in the {@link WaveletSnapshot} - * into an {@link ObservableWaveletData}. - * - * @param snapshot the {@link WaveletSnapshot} to deserialize. - * @throws OperationException if the ops in the snapshot can not be applied. - * @throws InvalidParticipantAddress - * @throws InvalidIdException - */ - public static ObservableWaveletData deserializeWavelet(WaveletSnapshot snapshot, WaveId waveId) - throws OperationException, InvalidParticipantAddress, InvalidIdException { - ObservableWaveletData.Factory<? extends ObservableWaveletData> factory = - WaveletDataImpl.Factory.create( - ObservablePluggableMutableDocument.createFactory(SchemaCollection.empty())); - - ParticipantId author = ParticipantId.of(snapshot.getCreator()); - WaveletId waveletId = ModernIdSerialiser.INSTANCE.deserialiseWaveletId(snapshot.getWaveletId()); - long creationTime = snapshot.getCreationTime(); - - ObservableWaveletData wavelet = factory.create(new EmptyWaveletSnapshot(waveId, waveletId, - author, CoreWaveletOperationSerializer.deserialize(snapshot.getVersion()), creationTime)); - - for (String participant : snapshot.getParticipantIdList()) { - wavelet.addParticipant(getParticipantId(participant)); - } - - for (DocumentSnapshot document : snapshot.getDocumentList()) { - addDocumentSnapshotToWavelet(document, wavelet); - } - - wavelet.setVersion(snapshot.getVersion().getVersion()); - wavelet.setLastModifiedTime(snapshot.getLastModifiedTime()); - // The creator and creation time are set when the empty wavelet template is - // created above. - - return wavelet; - } - - /** - * Serializes a document to a document snapshot. - * - * @param document The document to serialize - * @return A snapshot of the given document - */ - public static DocumentSnapshot serializeDocument(ReadableBlipData document) { - DocumentSnapshot.Builder builder = DocumentSnapshot.newBuilder(); - - builder.setDocumentId(document.getId()); - builder.setDocumentOperation(CoreWaveletOperationSerializer.serialize( - document.getContent().asOperation())); - - builder.setAuthor(document.getAuthor().getAddress()); - for (ParticipantId participant : document.getContributors()) { - builder.addContributor(participant.getAddress()); - } - builder.setLastModifiedVersion(document.getLastModifiedVersion()); - builder.setLastModifiedTime(document.getLastModifiedTime()); - - return builder.build(); - } - - // TODO(ljvderijk): Should be removed once the AbstractWaveletData changes - private static ParticipantId getParticipantId(String address) throws InvalidParticipantAddress { - return ParticipantId.of(address); - } - - private static void addDocumentSnapshotToWavelet( - DocumentSnapshot snapshot, WaveletData container) throws InvalidParticipantAddress { - DocOp op = CoreWaveletOperationSerializer.deserialize(snapshot.getDocumentOperation()); - DocInitialization docInit = DocOpUtil.asInitialization(op); - - Collection<ParticipantId> contributors = CollectionUtils.newArrayList(); - for (String p : snapshot.getContributorList()) { - contributors.add(getParticipantId(p)); - } - container.createDocument( - snapshot.getDocumentId(), - getParticipantId(snapshot.getAuthor()), - contributors, - docInit, - snapshot.getLastModifiedTime(), - snapshot.getLastModifiedVersion()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/executor/ExecutorAnnotations.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/executor/ExecutorAnnotations.java b/src/org/waveprotocol/box/server/executor/ExecutorAnnotations.java deleted file mode 100644 index a002c76..0000000 --- a/src/org/waveprotocol/box/server/executor/ExecutorAnnotations.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.executor; - -import com.google.inject.BindingAnnotation; -import java.lang.annotation.Retention; -import static java.lang.annotation.RetentionPolicy.RUNTIME; - -/** - * Executor annotations. - * - * @author [email protected] (A. Kaplanov) - */ -public interface ExecutorAnnotations { - - @Retention(RUNTIME) - @BindingAnnotation - public @interface ClientServerExecutor { - } - - @Retention(RUNTIME) - @BindingAnnotation - public @interface DeltaPersistExecutor { - } - - @Retention(RUNTIME) - @BindingAnnotation - public @interface IndexExecutor { - } - - @Retention(RUNTIME) - @BindingAnnotation - public @interface ListenerExecutor { - } - - @Retention(RUNTIME) - @BindingAnnotation - public @interface LookupExecutor { - } - - @Retention(RUNTIME) - @BindingAnnotation - public @interface StorageContinuationExecutor { - } - - @Retention(RUNTIME) - @BindingAnnotation - public @interface WaveletLoadExecutor { - } - - @Retention(RUNTIME) - @BindingAnnotation - public @interface ContactExecutor { - } - - @Retention(RUNTIME) - @BindingAnnotation - public @interface RobotConnectionExecutor { - } - - @Retention(RUNTIME) - @BindingAnnotation - public @interface RobotGatewayExecutor { - } - - @Retention(RUNTIME) - @BindingAnnotation - public @interface XmppExecutor { - } - - @BindingAnnotation - public @interface SolrExecutor { - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/executor/ExecutorsModule.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/executor/ExecutorsModule.java b/src/org/waveprotocol/box/server/executor/ExecutorsModule.java deleted file mode 100644 index bfcd345..0000000 --- a/src/org/waveprotocol/box/server/executor/ExecutorsModule.java +++ /dev/null @@ -1,188 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.executor; - -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.inject.*; -import com.typesafe.config.Config; -import org.waveprotocol.box.server.executor.ExecutorAnnotations.*; - -import java.util.concurrent.*; - -/** - * Module with executors. - * - * @author [email protected] (A. Kaplanov) - */ -public class ExecutorsModule extends AbstractModule { - - @Inject - public ExecutorsModule() { - } - - @Override - public void configure() { - bind(RequestScopeExecutor.class); - bind(ScheduledRequestScopeExecutor.class); - } - - @Provides - @Singleton - @ClientServerExecutor - protected Executor provideClientServerExecutor(Provider<RequestScopeExecutor> executorProvider) { - return provideThreadPoolExecutor(executorProvider, -1, ClientServerExecutor.class - .getSimpleName()); - } - - @Provides - @Singleton - @DeltaPersistExecutor - protected Executor provideDeltaPersistExecutor(Provider<RequestScopeExecutor> executorProvider, - Config config) { - return provideThreadPoolExecutor(executorProvider, config - .getInt("threads.delta_persist_executor_thread_count"), DeltaPersistExecutor.class - .getSimpleName()); - } - - @Provides - @Singleton - @IndexExecutor - protected Executor provideIndexExecutor(Provider<RequestScopeExecutor> executorProvider) { - return provideThreadPoolExecutor(executorProvider, 1, IndexExecutor.class.getSimpleName()); - } - - @Provides - @Singleton - @ListenerExecutor - protected Executor provideListenerExecutor(Provider<RequestScopeExecutor> executorProvider, - Config config) { - return provideThreadPoolExecutor(executorProvider, config - .getInt("threads.listener_executor_thread_count"), ListenerExecutor.class.getSimpleName()); - } - - @Provides - @Singleton - @LookupExecutor - protected Executor provideLookupExecutor(Provider<RequestScopeExecutor> executorProvider, - Config config) { - return provideThreadPoolExecutor(executorProvider, config - .getInt("threads.lookup_executor_thread_count"), LookupExecutor.class.getSimpleName()); - } - - @Provides - @Singleton - @StorageContinuationExecutor - protected Executor provideStorageContinuationExecutor( - Provider<RequestScopeExecutor> executorProvider, Config config) { - return provideThreadPoolExecutor(executorProvider, config - .getInt("threads.storage_continuation_executor_thread_count"), - StorageContinuationExecutor.class.getSimpleName()); - } - - @Provides - @Singleton - @WaveletLoadExecutor - protected Executor provideWaveletLoadExecutor(Provider<RequestScopeExecutor> executorProvider, - Config config) { - return provideThreadPoolExecutor(executorProvider, config - .getInt("threads.wavelet_load_executor_thread_count"), WaveletLoadExecutor.class - .getSimpleName()); - } - - @Provides - @Singleton - @ContactExecutor - protected ScheduledExecutorService provideContactExecutor( - Provider<ScheduledRequestScopeExecutor> executorProvider, Config config) { - return provideScheduledThreadPoolExecutor(executorProvider, config - .getInt("threads.contact_executor_thread_count"), ContactExecutor.class.getSimpleName()); - } - - @Provides - @Singleton - @RobotConnectionExecutor - protected ScheduledExecutorService provideRobotConnectionExecutor( - Provider<ScheduledRequestScopeExecutor> executorProvider, Config config) { - return provideScheduledThreadPoolExecutor(executorProvider, config - .getInt("threads.robot_connection_thread_count"), RobotConnectionExecutor.class - .getSimpleName()); - } - - @Provides - @Singleton - @RobotGatewayExecutor - protected Executor provideRobotGatewayExecutor(Provider<RequestScopeExecutor> executorProvider, - Config config) { - return provideThreadPoolExecutor(executorProvider, config - .getInt("threads.robot_gateway_thread_count"), RobotGatewayExecutor.class.getSimpleName()); - } - - @Provides - @Singleton - @XmppExecutor - protected ScheduledExecutorService provideXmppExecutor( - Provider<ScheduledRequestScopeExecutor> executorProvider) { - return provideScheduledThreadPoolExecutor(executorProvider, 1, XmppExecutor.class - .getSimpleName()); - } - - @Provides - @Singleton - @SolrExecutor - protected Executor provideSolrExecutor(Provider<RequestScopeExecutor> executorProvider, - Config config) { - return provideThreadPoolExecutor(executorProvider, config.getInt("threads.solr_thread_count"), - SolrExecutor.class.getSimpleName()); - } - - private Executor provideThreadPoolExecutor(Provider<RequestScopeExecutor> executorProvider, - int threadCount, String name) { - if (threadCount == 0) { - return MoreExecutors.sameThreadExecutor(); - } - ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(name).build(); - ExecutorService executor; - if (threadCount < 0) { - executor = Executors.newCachedThreadPool(threadFactory); - } else if (threadCount == 1) { - executor = Executors.newSingleThreadExecutor(threadFactory); - } else { - executor = Executors.newFixedThreadPool(threadCount, threadFactory); - } - RequestScopeExecutor scopeExecutor = executorProvider.get(); - scopeExecutor.setExecutor(executor, name); - return scopeExecutor; - } - - private ScheduledExecutorService provideScheduledThreadPoolExecutor( - Provider<ScheduledRequestScopeExecutor> executorProvider, int threadCount, String name) { - ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(name).build(); - ScheduledExecutorService executor; - if (threadCount == 1) { - executor = Executors.newSingleThreadScheduledExecutor(threadFactory); - } else { - executor = Executors.newScheduledThreadPool(threadCount, threadFactory); - } - ScheduledRequestScopeExecutor scopeExecutor = executorProvider.get(); - scopeExecutor.setExecutor(executor, name); - return scopeExecutor; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/executor/RequestScopeExecutor.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/executor/RequestScopeExecutor.java b/src/org/waveprotocol/box/server/executor/RequestScopeExecutor.java deleted file mode 100644 index 40c2039..0000000 --- a/src/org/waveprotocol/box/server/executor/RequestScopeExecutor.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.waveprotocol.box.server.executor; - -import com.google.inject.Inject; -import java.util.Map; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.waveprotocol.box.stat.RequestScope; -import org.waveprotocol.box.stat.Timing; -import org.waveprotocol.wave.model.util.Preconditions; - -/** - * Session-based request scope executor. - * Runs on specified executor. - * Clones request scope on scheduling and restores scope on executing of task. - * - * @author (David Byttow) - * @author [email protected] (A. Kaplanov) - */ -@SuppressWarnings("rawtypes") -public class RequestScopeExecutor implements Executor { - private final static Logger LOG = Logger.getLogger(RequestScopeExecutor.class.getName()); - - private ExecutorService executor; - - @Inject - public RequestScopeExecutor() { - } - - public void setExecutor(ExecutorService executor, String name) { - Preconditions.checkArgument(this.executor == null, "Executor is already defined."); - this.executor = executor; - } - - @Override - public void execute(final Runnable runnable) { - Preconditions.checkNotNull(executor, "Executor is not defined."); - - final Map<Class, RequestScope.Value> values = - Timing.isEnabled() ? Timing.cloneScopeValues() : null; - - executor.submit(new Runnable() { - @Override - public void run() { - if (values != null) { - Timing.enterScope(values); - } - try { - runnable.run(); - } catch (Exception e) { - LOG.log(Level.WARNING, "Error executing with portable scope.", e); - } finally { - Timing.exitScope(); - } - } - }); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java b/src/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java deleted file mode 100644 index f841667..0000000 --- a/src/org/waveprotocol/box/server/executor/ScheduledRequestScopeExecutor.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.waveprotocol.box.server.executor; - -import com.google.inject.Inject; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.logging.Logger; - -import org.waveprotocol.box.stat.RequestScope; -import org.waveprotocol.box.stat.Timing; -import org.waveprotocol.wave.model.util.Preconditions; - -/** - * Session-based request scope executor. - * Runs on specified executor. - * Clones request scope on scheduling and restores scope on executing of task. - * - * @author [email protected] (A. Kaplanov) - */ -@SuppressWarnings("rawtypes") -public class ScheduledRequestScopeExecutor implements ScheduledExecutorService { - private final static Logger LOG = Logger.getLogger(ScheduledRequestScopeExecutor.class.getName()); - - private ScheduledExecutorService executor; - - @Inject - public ScheduledRequestScopeExecutor() { - } - - /** - * Sets the original executor. - */ - public void setExecutor(ScheduledExecutorService executor, String name) { - Preconditions.checkArgument(this.executor == null, "Executor is already defined."); - this.executor = executor; - } - - @Override - public ScheduledFuture<?> schedule(final Runnable runnable, final long delay, final TimeUnit unit) { - Preconditions.checkNotNull(executor, "Executor is not defined."); - return executor.schedule(makeScopedRunnable(runnable), delay, unit); - } - - @Override - public <V> ScheduledFuture<V> schedule(Callable<V> callable, long l, TimeUnit tu) { - Preconditions.checkNotNull(executor, "Executor is not defined."); - return executor.schedule(makeScopedCallable(callable), l, tu); - } - - @Override - public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long l, long l1, TimeUnit tu) { - Preconditions.checkNotNull(executor, "Executor is not defined."); - return executor.scheduleAtFixedRate(makeScopedRunnable(runnable), l, l1, tu); - } - - @Override - public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long l, long l1, TimeUnit tu) { - Preconditions.checkNotNull(executor, "Executor is not defined."); - return executor.scheduleAtFixedRate(makeScopedRunnable(runnable), l, l1, tu); - } - - @Override - public void shutdown() { - executor.shutdown(); - } - - @Override - public List<Runnable> shutdownNow() { - return executor.shutdownNow(); - } - - @Override - public boolean isShutdown() { - return executor.isShutdown(); - } - - @Override - public boolean isTerminated() { - return executor.isTerminated(); - } - - @Override - public boolean awaitTermination(long l, TimeUnit tu) throws InterruptedException { - return executor.awaitTermination(l, tu); - } - - @Override - public <T> Future<T> submit(Callable<T> clbl) { - throw new UnsupportedOperationException(); - } - - @Override - public <T> Future<T> submit(Runnable r, T t) { - throw new UnsupportedOperationException(); - } - - @Override - public Future<?> submit(Runnable r) { - throw new UnsupportedOperationException(); - } - - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> clctn) throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> clctn, long l, TimeUnit tu) throws InterruptedException { - throw new UnsupportedOperationException(); - } - - @Override - public <T> T invokeAny(Collection<? extends Callable<T>> clctn) throws InterruptedException, ExecutionException { - throw new UnsupportedOperationException(); - } - - @Override - public <T> T invokeAny(Collection<? extends Callable<T>> clctn, long l, TimeUnit tu) throws InterruptedException, ExecutionException, TimeoutException { - throw new UnsupportedOperationException(); - } - - @Override - public void execute(Runnable r) { - throw new UnsupportedOperationException(); - } - - private Runnable makeScopedRunnable(final Runnable runnable) { - final Map<Class, RequestScope.Value> scopeValues - = (Timing.isEnabled()) ? Timing.cloneScopeValues() : null; - - return new Runnable() { - @Override - public void run() { - if (scopeValues != null) { - Timing.enterScope(scopeValues); - } - try { - runnable.run(); - } finally { - Timing.exitScope(); - } - } - }; - } - - private <T> Callable<T> makeScopedCallable(final Callable<T> callable) { - final Map<Class, RequestScope.Value> scopeValues - = (Timing.isEnabled()) ? Timing.cloneScopeValues() : null; - - return new Callable<T> () { - @Override - public T call() throws Exception { - if (scopeValues != null) { - Timing.enterScope(scopeValues); - } - try { - return callable.call(); - } finally { - Timing.exitScope(); - } - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/frontend/ClientFrontend.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/frontend/ClientFrontend.java b/src/org/waveprotocol/box/server/frontend/ClientFrontend.java deleted file mode 100644 index 2f752df..0000000 --- a/src/org/waveprotocol/box/server/frontend/ClientFrontend.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.frontend; - -import org.waveprotocol.box.common.comms.WaveClientRpc; -import org.waveprotocol.box.server.waveserver.WaveletProvider.SubmitRequestListener; -import org.waveprotocol.wave.federation.Proto.ProtocolWaveletDelta; -import org.waveprotocol.wave.model.id.IdFilter; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.ParticipantId; - -import java.util.Collection; -import java.util.List; - -import javax.annotation.Nullable; - -/** - * The client front-end handles requests from clients and directs them to - * appropriate back-ends. - * - * Provides updates for wavelets that a client has opened and access to. - */ -public interface ClientFrontend { - - /** - * Listener provided to open requests. - */ - interface OpenListener { - /** - * Called when an update is received. - * - * @param waveletName wavelet receiving the update - * @param snapshot optional snapshot - * @param deltas optional deltas, not necessarily contiguous - * @param committedVersion optional commit notice - * @param marker optional (true/false/absent) marker - * @param channelId channel id (first message only) - */ - void onUpdate(WaveletName waveletName, @Nullable CommittedWaveletSnapshot snapshot, - List<TransformedWaveletDelta> deltas, @Nullable HashedVersion committedVersion, - @Nullable Boolean marker, String channelId); - - /** - * Called when the stream fails. No further updates will be received. - */ - void onFailure(String errorMessage); - } - - /** - * Request submission of a delta. - * - * @param loggedInUser which is doing the requesting. - * @param waveletName name of wavelet. - * @param delta the wavelet delta to submit. - * @param channelId the client's channel ID - * @param listener callback for the result. - */ - void submitRequest(ParticipantId loggedInUser, WaveletName waveletName, - ProtocolWaveletDelta delta, String channelId, SubmitRequestListener listener); - - /** - * Request to open a Wave. Optional waveletIdPrefixes allows the requester to - * constrain which wavelets to include in the updates. - * - * @param loggedInUser which is doing the requesting. - * @param waveId the wave id. - * @param waveletIdFilter filter over wavelets to open - * @param knownWavelets a collection of wavelet versions the client already - * knows - * @param openListener callback for updates. - */ - void openRequest(ParticipantId loggedInUser, WaveId waveId, IdFilter waveletIdFilter, - Collection<WaveClientRpc.WaveletVersion> knownWavelets, OpenListener openListener); -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/frontend/ClientFrontendImpl.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/frontend/ClientFrontendImpl.java b/src/org/waveprotocol/box/server/frontend/ClientFrontendImpl.java deleted file mode 100644 index 3875b20..0000000 --- a/src/org/waveprotocol/box/server/frontend/ClientFrontendImpl.java +++ /dev/null @@ -1,298 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.frontend; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Sets; - -import org.waveprotocol.box.common.DeltaSequence; -import org.waveprotocol.box.common.comms.WaveClientRpc; -import org.waveprotocol.box.server.waveserver.WaveBus; -import org.waveprotocol.box.server.waveserver.WaveServerException; -import org.waveprotocol.box.server.waveserver.WaveletProvider; -import org.waveprotocol.box.server.waveserver.WaveletProvider.SubmitRequestListener; -import org.waveprotocol.wave.federation.Proto.ProtocolWaveletDelta; -import org.waveprotocol.wave.model.id.IdFilter; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.wave.AddParticipant; -import org.waveprotocol.wave.model.operation.wave.RemoveParticipant; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.operation.wave.WaveletOperation; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; -import org.waveprotocol.wave.util.logging.Log; - -import java.util.Collection; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - - -/** - * Implements {@link ClientFrontend}. - * - * When a wavelet is added and it's not at version 0, buffer updates until a - * request for the wavelet's history has completed. - */ -public class ClientFrontendImpl implements ClientFrontend, WaveBus.Subscriber { - private static final Log LOG = Log.get(ClientFrontendImpl.class); - - private final static AtomicInteger channel_counter = new AtomicInteger(0); - - private final WaveletProvider waveletProvider; - private final WaveletInfo waveletInfo; - - /** - * Creates a client frontend and subscribes it to the wave bus. - * - * @throws WaveServerException if the server fails during initialization. - */ - public static ClientFrontendImpl create(WaveletProvider waveletProvider, WaveBus wavebus, - WaveletInfo waveletInfo) throws WaveServerException { - - ClientFrontendImpl impl = - new ClientFrontendImpl(waveletProvider, waveletInfo); - - wavebus.subscribe(impl); - return impl; - } - - /** - * Constructor. - * - * @param waveletProvider - * @param waveDomain the server wave domain. It is assumed that the wave domain is valid. - */ - @VisibleForTesting - ClientFrontendImpl( - WaveletProvider waveletProvider, WaveletInfo waveletInfo) { - this.waveletProvider = waveletProvider; - this.waveletInfo = waveletInfo; - } - - @Override - public void openRequest(ParticipantId loggedInUser, WaveId waveId, IdFilter waveletIdFilter, - Collection<WaveClientRpc.WaveletVersion> knownWavelets, OpenListener openListener) { - LOG.info("received openRequest from " + loggedInUser + " for " + waveId + ", filter " - + waveletIdFilter + ", known wavelets: " + knownWavelets); - - // TODO(josephg): Make it possible for this to succeed & return public - // waves. - if (loggedInUser == null) { - openListener.onFailure("Not logged in"); - return; - } - - if (!knownWavelets.isEmpty()) { - openListener.onFailure("Known wavelets not supported"); - return; - } - - try { - waveletInfo.initialiseWave(waveId); - } catch (WaveServerException e) { - LOG.severe("Wave server failed lookup for " + waveId, e); - openListener.onFailure("Wave server failed to look up wave"); - return; - } - - String channelId = generateChannelID(); - UserManager userManager = waveletInfo.getUserManager(loggedInUser); - WaveViewSubscription subscription = - userManager.subscribe(waveId, waveletIdFilter, channelId, openListener); - LOG.info("Subscribed " + loggedInUser + " to " + waveId + " channel " + channelId); - - Set<WaveletId> waveletIds; - try { - waveletIds = waveletInfo.visibleWaveletsFor(subscription, loggedInUser); - } catch (WaveServerException e1) { - waveletIds = Sets.newHashSet(); - LOG.warning("Failed to retrieve visible wavelets for " + loggedInUser, e1); - } - for (WaveletId waveletId : waveletIds) { - WaveletName waveletName = WaveletName.of(waveId, waveletId); - // Ensure that implicit participants will also receive updates. - // TODO (Yuri Z.) If authorizing participant was removed from the wave - // (the shared domain participant), then all implicit participant that - // were authorized should be unsubsrcibed. - waveletInfo.notifyAddedImplcitParticipant(waveletName, loggedInUser); - // The WaveletName by which the waveletProvider knows the relevant deltas - - // TODO(anorth): if the client provides known wavelets, calculate - // where to start sending deltas from. - - CommittedWaveletSnapshot snapshotToSend; - - // Send a snapshot of the current state. - // TODO(anorth): calculate resync point if the client already knows - // a snapshot. - try { - snapshotToSend = waveletProvider.getSnapshot(waveletName); - } catch (WaveServerException e) { - LOG.warning("Failed to retrieve snapshot for wavelet " + waveletName, e); - openListener.onFailure("Wave server failure retrieving wavelet"); - return; - } - - LOG.info("snapshot in response is: " + (snapshotToSend != null)); - if (snapshotToSend == null) { - // Send deltas. - openListener.onUpdate(waveletName, snapshotToSend, DeltaSequence.empty(), null, null, - channelId); - } else { - // Send the snapshot. - openListener.onUpdate(waveletName, snapshotToSend, DeltaSequence.empty(), - snapshotToSend.committedVersion, null, channelId); - } - } - - WaveletName dummyWaveletName = createDummyWaveletName(waveId); - if (waveletIds.size() == 0) { - // Send message with just the channel id. - LOG.info("sending just a channel id for " + dummyWaveletName); - openListener.onUpdate(dummyWaveletName, null, DeltaSequence.empty(), null, null, channelId); - } - LOG.info("sending marker for " + dummyWaveletName); - openListener.onUpdate(dummyWaveletName, null, DeltaSequence.empty(), null, true, null); - } - - private String generateChannelID() { - return "ch" + channel_counter.addAndGet(1); - } - - @Override - public void submitRequest(ParticipantId loggedInUser, final WaveletName waveletName, - final ProtocolWaveletDelta delta, final String channelId, - final SubmitRequestListener listener) { - final ParticipantId author = new ParticipantId(delta.getAuthor()); - - if (!author.equals(loggedInUser)) { - listener.onFailure("Author field on delta must match logged in user"); - return; - } - - waveletInfo.getUserManager(author).submitRequest(channelId, waveletName); - waveletProvider.submitRequest(waveletName, delta, new SubmitRequestListener() { - @Override - public void onSuccess(int operationsApplied, - HashedVersion hashedVersionAfterApplication, long applicationTimestamp) { - listener.onSuccess(operationsApplied, hashedVersionAfterApplication, - applicationTimestamp); - waveletInfo.getUserManager(author).submitResponse(channelId, waveletName, - hashedVersionAfterApplication); - } - - @Override - public void onFailure(String error) { - listener.onFailure(error); - waveletInfo.getUserManager(author).submitResponse(channelId, waveletName, null); - } - }); - } - - @Override - public void waveletCommitted(WaveletName waveletName, HashedVersion version) { - for (ParticipantId participant : waveletInfo.getWaveletParticipants(waveletName)) { - waveletInfo.getUserManager(participant).onCommit(waveletName, version); - } - } - - /** - * Sends new deltas to a particular user on a particular wavelet. - * Updates the participants of the specified wavelet if the participant was added or removed. - * - * @param waveletName the waveletName which the deltas belong to. - * @param participant on the wavelet. - * @param newDeltas newly arrived deltas of relevance for participant. Must - * not be empty. - * @param add whether the participant is added by the first delta. - * @param remove whether the participant is removed by the last delta. - */ - private void participantUpdate(WaveletName waveletName, ParticipantId participant, - DeltaSequence newDeltas, boolean add, boolean remove) { - if(LOG.isFineLoggable()) { - LOG.fine("Notifying " + participant + " for " + waveletName); - } - if (add) { - waveletInfo.notifyAddedExplicitWaveletParticipant(waveletName, participant); - } - waveletInfo.getUserManager(participant).onUpdate(waveletName, newDeltas); - if (remove) { - waveletInfo.notifyRemovedExplicitWaveletParticipant(waveletName, participant); - } - } - - /** - * Tracks wavelet versions and ensures that the deltas are contiguous. Updates - * wavelet subscribers with new new deltas. - */ - @Override - public void waveletUpdate(ReadableWaveletData wavelet, DeltaSequence newDeltas) { - if (newDeltas.isEmpty()) { - return; - } - - WaveletName waveletName = WaveletName.of(wavelet.getWaveId(), wavelet.getWaveletId()); - - if(waveletInfo.getCurrentWaveletVersion(waveletName).getVersion() == 0 && LOG.isWarningLoggable()) { - LOG.warning("Wavelet does not appear to have been initialized by client. Continuing anyway."); - } - - waveletInfo.syncWaveletVersion(waveletName, newDeltas); - - Set<ParticipantId> remainingparticipants = - Sets.newHashSet(waveletInfo.getWaveletParticipants(waveletName)); - // Participants added during the course of newDeltas. - Set<ParticipantId> newParticipants = Sets.newHashSet(); - for (int i = 0; i < newDeltas.size(); i++) { - TransformedWaveletDelta delta = newDeltas.get(i); - // Participants added or removed in this delta get the whole delta. - for (WaveletOperation op : delta) { - if (op instanceof AddParticipant) { - ParticipantId p = ((AddParticipant) op).getParticipantId(); - remainingparticipants.add(p); - newParticipants.add(p); - } - if (op instanceof RemoveParticipant) { - ParticipantId p = ((RemoveParticipant) op).getParticipantId(); - remainingparticipants.remove(p); - participantUpdate(waveletName, p, newDeltas.subList(0, i + 1), newParticipants.remove(p), - true); - } - } - } - - // Send out deltas to those who end up being participants at the end - // (either because they already were, or because they were added). - for (ParticipantId p : remainingparticipants) { - boolean isNew = newParticipants.contains(p); - participantUpdate(waveletName, p, newDeltas, isNew, false); - } - } - - @VisibleForTesting - static WaveletName createDummyWaveletName(WaveId waveId) { - final WaveletName dummyWaveletName = - WaveletName.of(waveId, WaveletId.of(waveId.getDomain(), "dummy+root")); - return dummyWaveletName; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/frontend/CommittedWaveletSnapshot.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/frontend/CommittedWaveletSnapshot.java b/src/org/waveprotocol/box/server/frontend/CommittedWaveletSnapshot.java deleted file mode 100644 index 6409a38..0000000 --- a/src/org/waveprotocol/box/server/frontend/CommittedWaveletSnapshot.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.frontend; - - -import com.google.common.base.Preconditions; - -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.data.ReadableWaveletData; - -/** - * A wavelet snapshot with committed version. - * - * @author [email protected] (Alex North) -*/ -public final class CommittedWaveletSnapshot { - public final ReadableWaveletData snapshot; - public final HashedVersion committedVersion; - - public CommittedWaveletSnapshot(ReadableWaveletData snapshot, - HashedVersion committedVersion) { - Preconditions.checkNotNull(snapshot); - Preconditions.checkNotNull(committedVersion); - this.snapshot = snapshot; - this.committedVersion = committedVersion; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) return true; - if (obj != null && getClass() == obj.getClass()) { - CommittedWaveletSnapshot other = (CommittedWaveletSnapshot) obj; - return committedVersion.equals(other.committedVersion) && snapshot.equals(other.snapshot); - } - return false; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((committedVersion == null) ? 0 : committedVersion.hashCode()); - result = prime * result + ((snapshot == null) ? 0 : snapshot.hashCode()); - return result; - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/frontend/UserManager.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/frontend/UserManager.java b/src/org/waveprotocol/box/server/frontend/UserManager.java deleted file mode 100644 index a0d23ce..0000000 --- a/src/org/waveprotocol/box/server/frontend/UserManager.java +++ /dev/null @@ -1,139 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.frontend; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.LinkedListMultimap; -import com.google.common.collect.ListMultimap; -import com.google.common.collect.Lists; - -import org.waveprotocol.box.common.DeltaSequence; -import org.waveprotocol.wave.model.id.IdFilter; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.version.HashedVersion; - -import java.util.List; - -/** - * Collects active wave view subscriptions for a single participant. - */ -final class UserManager { - private final ListMultimap<WaveId, WaveViewSubscription> subscriptions = - LinkedListMultimap.create(); - - /** The listeners interested in the specified wavelet. */ - @VisibleForTesting - synchronized List<WaveViewSubscription> matchSubscriptions(WaveletName waveletName) { - List<WaveViewSubscription> result = Lists.newArrayList(); - for (WaveViewSubscription subscription : subscriptions.get(waveletName.waveId)) { - if (subscription.includes(waveletName.waveletId)) { - result.add(subscription); - } - } - return result; - } - - /** Returns the subscription (if it exists) for a given wavelet and channel */ - private synchronized WaveViewSubscription findSubscription(WaveletName waveletName, String channelId) { - for (WaveViewSubscription subscription : subscriptions.get(waveletName.waveId)) { - if (subscription.includes(waveletName.waveletId)) { - if (subscription.getChannelId().equals(channelId)) { - return subscription; - } - } - } - return null; - } - - /** - * Receives additional deltas for the specified wavelet, of which we must be a - * participant. Delta updates must be received in contiguous version order. - */ - public synchronized void onUpdate(WaveletName waveletName, DeltaSequence deltas) { - Preconditions.checkNotNull(waveletName); - if (deltas.isEmpty()) { - return; - } - List<WaveViewSubscription> subscriptions = matchSubscriptions(waveletName); - for (WaveViewSubscription subscription : subscriptions) { - subscription.onUpdate(waveletName, deltas); - } - } - - /** - * Receives notification that the specified wavelet has been committed at the - * specified version. - */ - public void onCommit(WaveletName waveletName, HashedVersion version) { - Preconditions.checkNotNull(waveletName); - Preconditions.checkNotNull(version); - List<WaveViewSubscription> listeners = matchSubscriptions(waveletName); - for (WaveViewSubscription listener : listeners) { - listener.onCommit(waveletName, version); - } - } - - /** - * Subscribes a listener to updates on a wave, filtered by waveletId. - * - * @return a subscription - */ - public synchronized WaveViewSubscription subscribe(WaveId waveId, IdFilter waveletIdFilter, - String channelId, ClientFrontend.OpenListener listener) { - WaveViewSubscription subscription = - new WaveViewSubscription(waveId, waveletIdFilter, channelId, listener); - subscriptions.put(waveId, subscription); - return subscription; - } - - /** - * Tell the user manager that we have a submit request outstanding. While a - * submit request is outstanding, all wavelet updates are queued. - * - * @param channelId the channel identifying the specific client - * @param waveletName the name of the wavelet - */ - public void submitRequest(String channelId, WaveletName waveletName) { - WaveViewSubscription subscription = findSubscription(waveletName, channelId); - if (subscription != null) { - subscription.submitRequest(waveletName); - } - } - - /** - * Signal the user manager that a submit response has been sent for the given - * wavelet and version. Any pending wavelet updates will be sent. A matching - * wavelet update for the given wavelet name and version will be discarded. - * - * @param channelId the channel identifying the specific client - * @param waveletName the name of the wavelet - * @param hashedVersionAfterApplication the version of the wavelet in the - * response (or null if the submit request failed) - */ - public void submitResponse(String channelId, WaveletName waveletName, - HashedVersion hashedVersionAfterApplication) { - WaveViewSubscription subscription = findSubscription(waveletName, channelId); - if (subscription != null) { - subscription.submitResponse(waveletName, hashedVersionAfterApplication); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/7d8609e7/src/org/waveprotocol/box/server/frontend/WaveClientRpcImpl.java ---------------------------------------------------------------------- diff --git a/src/org/waveprotocol/box/server/frontend/WaveClientRpcImpl.java b/src/org/waveprotocol/box/server/frontend/WaveClientRpcImpl.java deleted file mode 100644 index 649e275..0000000 --- a/src/org/waveprotocol/box/server/frontend/WaveClientRpcImpl.java +++ /dev/null @@ -1,200 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.box.server.frontend; - -import com.google.common.base.Preconditions; -import com.google.protobuf.RpcCallback; -import com.google.protobuf.RpcController; - -import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticate; -import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolAuthenticationResult; -import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolOpenRequest; -import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolSubmitRequest; -import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolSubmitResponse; -import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolWaveClientRpc; -import org.waveprotocol.box.common.comms.WaveClientRpc.ProtocolWaveletUpdate; -import org.waveprotocol.box.server.common.CoreWaveletOperationSerializer; -import org.waveprotocol.box.server.common.SnapshotSerializer; -import org.waveprotocol.box.server.rpc.ServerRpcController; -import org.waveprotocol.box.server.waveserver.WaveletProvider.SubmitRequestListener; -import org.waveprotocol.wave.model.id.IdFilter; -import org.waveprotocol.wave.model.id.InvalidIdException; -import org.waveprotocol.wave.model.id.ModernIdSerialiser; -import org.waveprotocol.wave.model.id.WaveId; -import org.waveprotocol.wave.model.id.WaveletId; -import org.waveprotocol.wave.model.id.WaveletName; -import org.waveprotocol.wave.model.operation.wave.TransformedWaveletDelta; -import org.waveprotocol.wave.model.version.HashedVersion; -import org.waveprotocol.wave.model.wave.ParticipantId; -import org.waveprotocol.wave.util.logging.Log; - -import java.util.Collections; -import java.util.List; - -import javax.annotation.Nullable; - -/** - * RPC interface implementation for the wave server. Adapts incoming and - * outgoing RPCs to the client frontend interface. - * - * - */ -public class WaveClientRpcImpl implements ProtocolWaveClientRpc.Interface { - - private static final Log LOG = Log.get(WaveClientRpcImpl.class); - - private final ClientFrontend frontend; - private final boolean handleAuthentication; - - /** - * Creates a new RPC interface to the front-end. - * - * @param frontend front-end to which to forward requests - * @param handleAuthentication whether to handle authentication; it's - * otherwise expected to be handled before this class - */ - public static WaveClientRpcImpl create(ClientFrontend frontend, - boolean handleAuthentication) { - return new WaveClientRpcImpl(frontend, handleAuthentication); - } - - private WaveClientRpcImpl(ClientFrontend frontend, boolean handleAuthentication) { - this.frontend = frontend; - this.handleAuthentication = handleAuthentication; - } - - @Override - public void open(final RpcController controller, ProtocolOpenRequest request, - final RpcCallback<ProtocolWaveletUpdate> done) { - WaveId waveId; - try { - waveId = ModernIdSerialiser.INSTANCE.deserialiseWaveId(request.getWaveId()); - } catch (InvalidIdException e) { - LOG.warning("Invalid id in open", e); - controller.setFailed(e.getMessage()); - return; - } - IdFilter waveletIdFilter = - IdFilter.of(Collections.<WaveletId>emptySet(), request.getWaveletIdPrefixList()); - - ParticipantId loggedInUser = asBoxController(controller).getLoggedInUser(); - frontend.openRequest(loggedInUser, waveId, waveletIdFilter, request.getKnownWaveletList(), - new ClientFrontend.OpenListener() { - @Override - public void onFailure(String errorMessage) { - LOG.warning("openRequest failure: " + errorMessage); - controller.setFailed(errorMessage); - } - - @Override - public void onUpdate(WaveletName waveletName, - @Nullable CommittedWaveletSnapshot snapshot, List<TransformedWaveletDelta> deltas, - @Nullable HashedVersion committedVersion, Boolean hasMarker, String channel_id) { - ProtocolWaveletUpdate.Builder builder = ProtocolWaveletUpdate.newBuilder(); - if (hasMarker != null) { - builder.setMarker(hasMarker.booleanValue()); - } - if (channel_id != null) { - builder.setChannelId(channel_id); - } - builder.setWaveletName(ModernIdSerialiser.INSTANCE.serialiseWaveletName(waveletName)); - for (TransformedWaveletDelta d : deltas) { - // TODO(anorth): Add delta application metadata to the result - // when the c/s protocol supports it. - builder.addAppliedDelta(CoreWaveletOperationSerializer.serialize(d)); - } - if (!deltas.isEmpty()) { - builder.setResultingVersion(CoreWaveletOperationSerializer.serialize( - deltas.get((deltas.size() - 1)).getResultingVersion())); - } - if (snapshot != null) { - Preconditions.checkState(committedVersion.equals(snapshot.committedVersion), - "Mismatched commit versions, snapshot: " + snapshot.committedVersion - + " expected: " + committedVersion); - builder.setSnapshot(SnapshotSerializer.serializeWavelet(snapshot.snapshot, - snapshot.committedVersion)); - builder.setResultingVersion(CoreWaveletOperationSerializer.serialize( - snapshot.snapshot.getHashedVersion())); - builder.setCommitNotice(CoreWaveletOperationSerializer.serialize( - snapshot.committedVersion)); - } else { - if (committedVersion != null) { - builder.setCommitNotice( - CoreWaveletOperationSerializer.serialize(committedVersion)); - } - } - done.run(builder.build()); - } - }); - } - - @Override - public void submit(RpcController controller, ProtocolSubmitRequest request, - final RpcCallback<ProtocolSubmitResponse> done) { - WaveletName waveletName = null; - try { - waveletName = ModernIdSerialiser.INSTANCE.deserialiseWaveletName(request.getWaveletName()); - } catch (InvalidIdException e) { - LOG.warning("Invalid id in submit", e); - controller.setFailed(e.getMessage()); - return; - } - String channelId; - if (request.hasChannelId()) { - channelId = request.getChannelId(); - } else { - channelId = null; - } - ParticipantId loggedInUser = asBoxController(controller).getLoggedInUser(); - frontend.submitRequest(loggedInUser, waveletName, request.getDelta(), channelId, - new SubmitRequestListener() { - @Override - public void onFailure(String error) { - done.run(ProtocolSubmitResponse.newBuilder() - .setOperationsApplied(0).setErrorMessage(error).build()); - } - - @Override - public void onSuccess(int operationsApplied, - HashedVersion hashedVersionAfterApplication, long applicationTimestamp) { - done.run(ProtocolSubmitResponse.newBuilder() - .setOperationsApplied(operationsApplied) - .setHashedVersionAfterApplication( - CoreWaveletOperationSerializer.serialize(hashedVersionAfterApplication)) - .build()); - // TODO(arb): applicationTimestamp?? - } - }); - } - - @Override - public void authenticate(RpcController controller, ProtocolAuthenticate request, - RpcCallback<ProtocolAuthenticationResult> done) { - Preconditions.checkState(handleAuthentication, - "ProtocolAuthenticate should be handled in ServerRpcProvider"); - done.run(ProtocolAuthenticationResult.getDefaultInstance()); - } - - ServerRpcController asBoxController(RpcController controller) { - // This cast is safe (because of how the WaveClientRpcImpl is instantiated). We need to do this - // because ServerRpcController implements an autogenerated interface. - return (ServerRpcController) controller; - } -}
