http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemote.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemote.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemote.java deleted file mode 100644 index a28bd72..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationRemote.java +++ /dev/null @@ -1,633 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.wave.federation.xmpp; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.inject.Inject; -import com.google.protobuf.ByteString; -import com.typesafe.config.Config; -import org.apache.commons.codec.binary.Base64; -import org.dom4j.Attribute; -import org.dom4j.Element; -import org.waveprotocol.wave.federation.FederationErrorProto.FederationError; -import org.waveprotocol.wave.federation.FederationErrors; -import org.waveprotocol.wave.federation.FederationRemoteBridge; -import org.waveprotocol.wave.federation.Proto.ProtocolHashedVersion; -import org.waveprotocol.wave.federation.Proto.ProtocolSignedDelta; -import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo; -import org.waveprotocol.wave.federation.WaveletFederationListener; -import org.waveprotocol.wave.federation.WaveletFederationProvider; -import org.waveprotocol.wave.federation.xmpp.XmppUtil.UnknownSignerType; -import org.waveprotocol.wave.model.id.URIEncoderDecoder.EncodingException; -import org.waveprotocol.wave.model.id.WaveletName; -import org.xmpp.packet.IQ; -import org.xmpp.packet.Message; -import org.xmpp.packet.Packet; - -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Logger; - -/** - * Remote implementation. Receives submit and history requests from the local - * wave server and sends them to a remote wave server Host, and also receives - * update messages from a remote wave server Host and sends them to the local - * wave server. - */ -public class XmppFederationRemote implements WaveletFederationProvider { - private static final Logger LOG = Logger.getLogger(XmppFederationRemote.class.getCanonicalName()); - - // Timeout for outstanding provider calls sent over XMPP. 
- private static final int XMPP_PROVIDER_TIMEOUT = 30; - - private final WaveletFederationListener.Factory updatesListenerFactory; - private final XmppDisco disco; - private final String jid; - - private XmppManager manager = null; - - /** - * Constructor. Note that {@link #setManager} must be called before this class - * is ready to use. - * - * @param updatesListenerFactory used to communicate back to the local wave - * server when an update arrives. - */ - @Inject - public XmppFederationRemote( - @FederationRemoteBridge WaveletFederationListener.Factory updatesListenerFactory, - XmppDisco disco, Config config) { - this.updatesListenerFactory = updatesListenerFactory; - this.disco = disco; - this.jid = config.getString("federation.xmpp_jid"); - } - - /** - * Set the manager instance for this class. Must be invoked before any other - * methods are used. - */ - public void setManager(XmppManager manager) { - this.manager = manager; - } - - /** - * Request submission of signed delta. This is part of the Federation Remote - * interface - sends a submit request on behalf of the wave server. Part of - * the WaveletFederationProvider interface. - * - * @param waveletName name of wavelet. - * @param signedDelta delta signed by the submitting wave server. - * @param listener callback for the result of the submit. 
- */ - @Override - public void submitRequest(final WaveletName waveletName, - final ProtocolSignedDelta signedDelta, - final SubmitResultListener listener) { - - final IQ submitIq = new IQ(IQ.Type.set); - submitIq.setID(XmppUtil.generateUniqueId()); - - LOG.info("Submitting delta to remote server, wavelet " + waveletName); - submitIq.setFrom(jid); - - Element pubsub = submitIq.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB); - Element publish = pubsub.addElement("publish"); - publish.addAttribute("node", "wavelet"); - Element submitRequest = publish.addElement("item").addElement("submit-request", - XmppNamespace.NAMESPACE_WAVE_SERVER); - Element deltaElement = submitRequest.addElement("delta"); - - deltaElement.addCDATA(Base64Util.encode(signedDelta.toByteArray())); - try { - deltaElement.addAttribute("wavelet-name", - XmppUtil.waveletNameCodec.waveletNameToURI(waveletName)); - } catch (EncodingException e) { - listener.onFailure(FederationErrors.badRequest( - "Couldn't encode wavelet name " + waveletName)); - return; - } - - final PacketCallback callback = new PacketCallback() { - @Override - public void error(FederationError error) { - listener.onFailure(error); - } - - @Override - public void run(Packet packet) { - processSubmitResponse(packet, listener); - } - }; - - disco.discoverRemoteJid(waveletName.waveletId.getDomain(), - new SuccessFailCallback<String, String>() { - @Override - public void onSuccess(String remoteJid) { - Preconditions.checkNotNull(remoteJid); - submitIq.setTo(remoteJid); - manager.send(submitIq, callback, XMPP_PROVIDER_TIMEOUT); - } - - @Override - public void onFailure(String errorMessage) { - // TODO(thorogood): Broken, Disco should return the error (and it - // should be timeout/etc) - listener.onFailure(FederationErrors.badRequest( - "No such wave server " + waveletName.waveletId.getDomain() + ": " + errorMessage)); - } - }); - } - - /** - * Retrieve delta history for the given wavelet. 
<p/> Part of the - * WaveletFederationProvider interface. - * - * @param waveletName name of wavelet. - * @param domain the remote Federation Host - * @param startVersion beginning of range (inclusive), minimum 0. - * @param endVersion end of range (exclusive). - * @param lengthLimit estimated size, in bytes, as an upper limit on the - * amount of data returned. - * @param listener callback for the result. - */ - public void requestHistory(final WaveletName waveletName, - final String domain, - ProtocolHashedVersion startVersion, - ProtocolHashedVersion endVersion, - long lengthLimit, - final WaveletFederationProvider.HistoryResponseListener listener) { - final IQ submitIq = new IQ(IQ.Type.get); - submitIq.setID(XmppUtil.generateUniqueId()); - - LOG.info("Getting history from remote server, wavelet " + waveletName - + " version " + startVersion + " (inc) through " + endVersion - + " (ex)"); - submitIq.setFrom(jid); - - Element pubsub = - submitIq.setChildElement("pubsub", - XmppNamespace.NAMESPACE_PUBSUB); - Element items = pubsub.addElement("items"); - items.addAttribute("node", "wavelet"); - Element historyDelta = - items.addElement("delta-history", - XmppNamespace.NAMESPACE_WAVE_SERVER); - - historyDelta.addAttribute("start-version", Long.toString(startVersion - .getVersion())); - historyDelta.addAttribute("start-version-hash", Base64Util - .encode(startVersion.getHistoryHash())); - historyDelta.addAttribute("end-version", Long.toString(endVersion - .getVersion())); - historyDelta.addAttribute("end-version-hash", Base64Util.encode(endVersion - .getHistoryHash())); - if (lengthLimit > 0) { - historyDelta.addAttribute("response-length-limit", Long - .toString(lengthLimit)); - } - try { - historyDelta.addAttribute("wavelet-name", - XmppUtil.waveletNameCodec.waveletNameToURI(waveletName)); - } catch (EncodingException e) { - listener.onFailure( - FederationErrors.badRequest("Couldn't encode wavelet name " + waveletName)); - return; - } - - final PacketCallback 
callback = new PacketCallback() { - public void error(FederationError error) { - listener.onFailure(error); - } - - @Override - public void run(Packet packet) { - processHistoryResponse(packet, listener); - } - }; - - disco.discoverRemoteJid(domain, new SuccessFailCallback<String, String>() { - @Override - public void onSuccess(String remoteJid) { - Preconditions.checkNotNull(remoteJid); - submitIq.setTo(remoteJid); - manager.send(submitIq, callback, XMPP_PROVIDER_TIMEOUT); - } - - @Override - public void onFailure(String errorMessage) { - listener.onFailure(FederationErrors.badRequest( - "No such wave server " + domain + ": " + errorMessage)); - } - }); - } - - @Override - public void getDeltaSignerInfo(ByteString signerId, WaveletName waveletName, - ProtocolHashedVersion deltaEndVersion, - final DeltaSignerInfoResponseListener listener) { - final IQ getSignerIq = new IQ(IQ.Type.get); - getSignerIq.setID(XmppUtil.generateUniqueId()); - - getSignerIq.setFrom(jid); - // Extract domain from waveletId - final String remoteDomain = waveletName.waveletId.getDomain(); - Element pubsub = - getSignerIq.setChildElement("pubsub", - XmppNamespace.NAMESPACE_PUBSUB); - Element items = pubsub.addElement("items"); - items.addAttribute("node", "signer"); - // TODO: should allow multiple requests in the same packet - Element signerRequest = - items.addElement("signer-request", - XmppNamespace.NAMESPACE_WAVE_SERVER); - signerRequest.addAttribute("signer-id", Base64Util.encode(signerId)); - signerRequest.addAttribute("history-hash", Base64Util - .encode(deltaEndVersion.getHistoryHash())); - signerRequest.addAttribute("version", String.valueOf(deltaEndVersion - .getVersion())); - try { - signerRequest.addAttribute("wavelet-name", - XmppUtil.waveletNameCodec.waveletNameToURI(waveletName)); - } catch (EncodingException e) { - listener.onFailure(FederationErrors.badRequest( - "Couldn't encode wavelet name " + waveletName)); - return; - } - - final PacketCallback callback = new 
PacketCallback() { - @Override - public void error(FederationError error) { - listener.onFailure(error); - } - - @Override - public void run(Packet packet) { - processGetSignerResponse(packet, listener); - } - }; - - disco.discoverRemoteJid( - remoteDomain, new SuccessFailCallback<String, String>() { - @Override - public void onSuccess(String remoteJid) { - Preconditions.checkNotNull(remoteJid); - getSignerIq.setTo(remoteJid); - manager.send(getSignerIq, callback, XMPP_PROVIDER_TIMEOUT); - } - - @Override - public void onFailure(String errorMessage) { - listener.onFailure(FederationErrors.badRequest( - "No such wave server " + remoteDomain + ": " + errorMessage)); - } - }); - } - - @Override - public void postSignerInfo( - final String remoteDomain, - ProtocolSignerInfo signerInfo, - final WaveletFederationProvider.PostSignerInfoResponseListener listener) { - final IQ request = new IQ(IQ.Type.set); - request.setID(XmppUtil.generateUniqueId()); - - request.setFrom(jid); - Element pubsub = request.setChildElement("pubsub", XmppNamespace.NAMESPACE_PUBSUB); - Element publish = pubsub.addElement("publish"); - publish.addAttribute("node", "signer"); - XmppUtil.protocolSignerInfoToXml(signerInfo, publish.addElement("item")); - - final PacketCallback callback = new PacketCallback() { - @Override - public void error(FederationError error) { - listener.onFailure(error); - } - - @Override - public void run(Packet packet) { - processPostSignerResponse(packet, listener); - } - }; - - disco.discoverRemoteJid( - remoteDomain, new SuccessFailCallback<String, String>() { - @Override - public void onSuccess(String remoteJid) { - Preconditions.checkNotNull(remoteJid); - request.setTo(remoteJid); - manager.send(request, callback, XMPP_PROVIDER_TIMEOUT); - } - - @Override - public void onFailure(String errorMessage) { - listener.onFailure(FederationErrors.badRequest( - "No such wave server " + remoteDomain + ": " + errorMessage)); - } - }); - } - - /** - * Handles a wavelet update 
message from a foreign Federation Host. Passes the - * message to the local waveserver (synchronously) and replies. - * - * @param updateMessage the incoming XMPP message. - * @param responseCallback response callback for acks and errors - */ - public void update(final Message updateMessage, final PacketCallback responseCallback) { - final Element receiptRequested = - updateMessage.getChildElement("request", XmppNamespace.NAMESPACE_XMPP_RECEIPTS); - - // Check existence of <event> - Element event = updateMessage.getChildElement("event", XmppNamespace.NAMESPACE_PUBSUB_EVENT); - if (event == null) { - responseCallback.error(FederationErrors.badRequest("Event element missing from message")); - return; - } - - // Check existence of <items> within <event> - Element items = event.element("items"); - if (items == null) { - responseCallback.error(FederationErrors.badRequest( - "Items element missing from update message")); - return; - } - - // Complain if no items have been included. - List<Element> elements = XmppUtil.toSafeElementList(items.elements("item")); - if (elements.isEmpty()) { - responseCallback.error(FederationErrors.badRequest("No items included")); - return; - } - - // Create a callback latch counter and corresponding countDown runnable. - // When the latch reaches zero, send receipt (if it was requested). 
- final AtomicInteger callbackCount = new AtomicInteger(1); - final Runnable countDown = new Runnable() { - @Override - public void run() { - if (callbackCount.decrementAndGet() == 0 && receiptRequested != null) { - Message response = XmppUtil.createResponseMessage(updateMessage); - response.addChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS); - responseCallback.run(response); - } - } - }; - - WaveletFederationListener.WaveletUpdateCallback callback = - new WaveletFederationListener.WaveletUpdateCallback() { - @Override - public void onSuccess() { - countDown.run(); - } - - @Override - public void onFailure(FederationError error) { - // Note that we don't propogate the error, we just ack the stanza - // and continue. - // TODO(thorogood): We may want to rate-limit misbehaving servers - // that are sending us invalid/malicious data. - LOG.warning("Incoming XMPP waveletUpdate failure: " + error); - countDown.run(); - } - }; - - // We must call callback once on every iteration to ensure that we send - // response if receiptRequested != null. - for (Element item : elements) { - Element waveletUpdate = item.element("wavelet-update"); - - if (waveletUpdate == null) { - callback.onFailure(FederationErrors.badRequest( - "wavelet-update element missing from message: " + updateMessage)); - continue; - } - - final WaveletName waveletName; - try { - waveletName = XmppUtil.waveletNameCodec.uriToWaveletName( - waveletUpdate.attributeValue("wavelet-name")); - } catch (EncodingException e) { - callback.onFailure(FederationErrors.badRequest( - "Couldn't decode wavelet name: " + waveletUpdate.attributeValue("wavelet-name"))); - continue; - } - - WaveletFederationListener listener = - updatesListenerFactory.listenerForDomain(waveletName.waveletId.getDomain()); - - // Submit all applied deltas to the domain-focused listener. 
- ImmutableList.Builder<ByteString> builder = ImmutableList.builder(); - for (Element appliedDeltaElement : - XmppUtil.toSafeElementList(waveletUpdate.elements("applied-delta"))) { - builder.add(Base64Util.decode(appliedDeltaElement.getText())); - } - ImmutableList<ByteString> deltas = builder.build(); - if (!deltas.isEmpty()) { - callbackCount.incrementAndGet(); // Increment required callbacks. - listener.waveletDeltaUpdate(waveletName, deltas, callback); - } - - // Optionally submit any received last committed notice. - Element commitNoticeElement = waveletUpdate.element("commit-notice"); - if (commitNoticeElement != null) { - ProtocolHashedVersion version = ProtocolHashedVersion.newBuilder() - .setHistoryHash(Base64Util.decode(commitNoticeElement.attributeValue("history-hash"))) - .setVersion(Long.parseLong(commitNoticeElement.attributeValue("version"))).build(); - callbackCount.incrementAndGet(); // Increment required callbacks. - listener.waveletCommitUpdate(waveletName, version, callback); - } - } - - // Release sentinel so that 'expected' callbacks from the WS don't invoke - // sending a receipt. - countDown.run(); - } - - /** - * Parses the response to a submitRequest and passes the result to the correct - * wave server. - * - * @param result the XMPP Packet - * @param listener the listener to invoke with the response. 
- */ - private void processSubmitResponse(Packet result, SubmitResultListener listener) { - Element publish = null; - Element item = null; - Element submitResponse = null; - Element hashedVersionElement = null; - Element pubsub = ((IQ) result).getChildElement(); - if (pubsub != null) { - publish = pubsub.element("publish"); - if (publish != null) { - item = publish.element("item"); - if (item != null) { - submitResponse = item.element("submit-response"); - if (submitResponse != null) { - hashedVersionElement = submitResponse.element("hashed-version"); - } - } - } - } - - if (pubsub == null || publish == null || item == null - || submitResponse == null || hashedVersionElement == null - || hashedVersionElement.attribute("history-hash") == null - || hashedVersionElement.attribute("version") == null - || submitResponse.attribute("application-timestamp") == null - || submitResponse.attribute("operations-applied") == null) { - LOG.severe("Unexpected submitResponse to submit request: " + result); - listener.onFailure(FederationErrors.badRequest("Invalid submitResponse: " + result)); - return; - } - - ProtocolHashedVersion.Builder hashedVersion = ProtocolHashedVersion.newBuilder(); - hashedVersion.setHistoryHash( - Base64Util.decode(hashedVersionElement.attributeValue("history-hash"))); - hashedVersion.setVersion(Long.parseLong(hashedVersionElement.attributeValue("version"))); - long applicationTimestamp = - Long.parseLong(submitResponse.attributeValue("application-timestamp")); - int operationsApplied = Integer.parseInt(submitResponse.attributeValue("operations-applied")); - listener.onSuccess(operationsApplied, hashedVersion.build(), applicationTimestamp); - } - - /** - * Parses a response to a history request and passes the result to the wave - * server. 
- * - * @param historyResponse the XMPP packet - * @param listener interface to the wave server - */ - @SuppressWarnings("unchecked") - private void processHistoryResponse(Packet historyResponse, - WaveletFederationProvider.HistoryResponseListener listener) { - Element pubsubResponse = historyResponse.getElement().element("pubsub"); - Element items = pubsubResponse.element("items"); - long versionTruncatedAt = -1; - long lastCommittedVersion = -1; - List<ByteString> deltaList = Lists.newArrayList(); - - if (items != null) { - for (Element itemElement : (List<Element>) items.elements()) { - for (Element element : (List<Element>) itemElement.elements()) { - String elementName = element.getQName().getName(); - switch (elementName) { - case "applied-delta": - String deltaBody = element.getText(); - deltaList.add(ByteString.copyFrom(Base64.decodeBase64(deltaBody.getBytes()))); - break; - case "commit-notice": - Attribute commitVersion = element.attribute("version"); - if (commitVersion != null) { - try { - lastCommittedVersion = Long.parseLong(commitVersion.getValue()); - } catch (NumberFormatException e) { - lastCommittedVersion = -1; - } - } - break; - case "history-truncated": - Attribute truncVersion = element.attribute("version"); - if (truncVersion != null) { - try { - versionTruncatedAt = Long.parseLong(truncVersion.getValue()); - } catch (NumberFormatException e) { - versionTruncatedAt = -1; - } - } - break; - default: - listener.onFailure(FederationErrors.badRequest( - "Bad response packet: " + historyResponse)); - break; - } - } - } - } else { - listener.onFailure(FederationErrors.badRequest("Bad response packet: " + historyResponse)); - } - - final ProtocolHashedVersion lastCommitted; - if (lastCommittedVersion > -1) { - // TODO(thorogood): fedone doesn't send a history hash, and it's arguable - // that it's even sane to include it. 
- // Can't set it to null - NPE - lastCommitted = - ProtocolHashedVersion.newBuilder() - .setVersion(lastCommittedVersion).setHistoryHash(ByteString.EMPTY) - .build(); - } else { - lastCommitted = null; - } - listener.onSuccess(deltaList, lastCommitted, versionTruncatedAt); - } - - /** - * Parses a GetSigner response, passes result to the waveserver. - * - * @param packet the response packet - * @param listener the interface to the wave server - */ - private void processGetSignerResponse(Packet packet, DeltaSignerInfoResponseListener listener) { - IQ response = (IQ) packet; - Element items = response.getChildElement().element("items"); - Element signature = items.element("signature"); - if (signature == null) { - LOG.severe("Empty getDeltaSignerRequest response: " + response); - listener.onFailure(FederationErrors.badRequest("Bad getDeltaSignatureRequest response")); - return; - } - String domain = signature.attributeValue("domain"); - String hashName = signature.attributeValue("algorithm"); - if (domain == null || hashName == null || signature.element("certificate") == null) { - LOG.severe("Bad getDeltaSignerRequest response: " + response); - listener.onFailure(FederationErrors.badRequest("Bad getDeltaSignatureRequest response")); - return; - } - ProtocolSignerInfo signer; - try { - signer = XmppUtil.xmlToProtocolSignerInfo(signature); - } catch (UnknownSignerType e) { - listener.onFailure(FederationErrors.badRequest(e.toString())); - return; - } - listener.onSuccess(signer); - } - - /** - * Parses a response to a PostSigner request, passes result to wave server. 
- * - * @param packet the response XMPP packet - * @param listener the listener to invoke - */ - private void processPostSignerResponse( - Packet packet, - WaveletFederationProvider.PostSignerInfoResponseListener listener) { - IQ response = (IQ) packet; - Element pubsub = response.getChildElement(); - Element item = pubsub.element("publish").element("item"); - if (item.element("signature-response") != null) { - listener.onSuccess(); - } else { - listener.onFailure(FederationErrors.badRequest("No valid response")); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationTransport.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationTransport.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationTransport.java deleted file mode 100644 index bb4e654..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppFederationTransport.java +++ /dev/null @@ -1,50 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.wave.federation.xmpp; - -import com.google.inject.Inject; - -import org.waveprotocol.wave.federation.FederationTransport; -import org.waveprotocol.wave.util.logging.Log; -import org.xmpp.component.ComponentException; - -/** - * An implementation of {@link FederationManger} for XMPP federation. 
- * - * @author tad.gli...@gmail.com (Tad Glines) - */ -public class XmppFederationTransport implements FederationTransport { - private static final Log LOG = Log.get(XmppFederationTransport.class); - private final ComponentPacketTransport transport; - - @Inject - XmppFederationTransport(ComponentPacketTransport transport) { - this.transport = transport; - } - - @Override - public void startFederation() { - try { - transport.run(); - } catch (ComponentException e) { - LOG.warning("couldn't connect to XMPP server:", e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppManager.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppManager.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppManager.java deleted file mode 100644 index e0f2fb7..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppManager.java +++ /dev/null @@ -1,474 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.wave.federation.xmpp; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.MapMaker; -import com.google.inject.Inject; -import com.typesafe.config.Config; -import org.dom4j.Element; -import org.waveprotocol.wave.federation.FederationErrorProto.FederationError; -import org.waveprotocol.wave.federation.FederationErrors; -import org.xmpp.packet.IQ; -import org.xmpp.packet.Message; -import org.xmpp.packet.Packet; -import org.xmpp.packet.PacketError; - -import java.util.concurrent.*; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Provides abstraction between Federation-specific code and the backing XMPP - * transport, including support for reliable outgoing calls (i.e. calls that are - * guaranteed to time out) and sending error responses. - * - * TODO(thorogood): Find a better name for this class. Suggestions include - * PacketHandler, Switchbox, TransportConnector, ReliableRouter, ... - * - * @author thorog...@google.com (Sam Thorogood) - */ -public class XmppManager implements IncomingPacketHandler { - private static final Logger LOG = Logger.getLogger(XmppManager.class.getCanonicalName()); - - /** - * Inner static class representing a single outgoing call. - */ - private static class OutgoingCall { - final Class<? extends Packet> responseType; - PacketCallback callback; - ScheduledFuture<?> timeout; - - OutgoingCall(Class<? extends Packet> responseType, PacketCallback callback) { - this.responseType = responseType; - this.callback = callback; - } - - void start(ScheduledFuture<?> timeout) { - Preconditions.checkState(this.timeout == null); - this.timeout = timeout; - } - } - - /** - * Inner non-static class representing a single incoming call. These are not - * cancellable and do not time out; this is just a helper class so success and - * failure responses may be more cleanly invoked. 
- */ - private class IncomingCallback implements PacketCallback { - private final Packet request; - private boolean complete = false; - - IncomingCallback(Packet request) { - this.request = request; - } - - @Override - public void error(FederationError error) { - Preconditions.checkState(!complete, - "Must not callback multiple times for incoming packet: %s", request); - complete = true; - sendErrorResponse(request, error); - } - - @Override - public void run(Packet response) { - Preconditions.checkState(!complete, - "Must not callback multiple times for incoming packet: %s", request); - // TODO(thorogood): Check outgoing response versus stored incoming request - // to ensure that to/from are paired correctly? - complete = true; - transport.sendPacket(response); - } - } - - // Injected types that handle incoming XMPP packet types. - private final XmppFederationHost host; - private final XmppFederationRemote remote; - private final XmppDisco disco; - private final OutgoingPacketTransport transport; - private final String jid; - - // Pending callbacks to outgoing requests. - private final ConcurrentMap<String, OutgoingCall> callbacks = new MapMaker().makeMap(); - private final ScheduledExecutorService timeoutExecutor = - Executors.newSingleThreadScheduledExecutor(); - - @Inject - public XmppManager(XmppFederationHost host, XmppFederationRemote remote, XmppDisco disco, - OutgoingPacketTransport transport, Config config) { - this.host = host; - this.remote = remote; - this.disco = disco; - this.transport = transport; - this.jid = config.getString("federation.xmpp_jid"); - - // Configure all related objects with this manager. Eventually, this should - // be replaced by better Guice interface bindings. 
- host.setManager(this); - remote.setManager(this); - disco.setManager(this); - } - - @Override - public void receivePacket(final Packet packet) { - if (LOG.isLoggable(Level.FINE)) { - LOG.fine("Received incoming XMPP packet:\n" + packet); - } - - if (packet instanceof IQ) { - IQ iq = (IQ) packet; - if (iq.getType().equals(IQ.Type.result) || iq.getType().equals(IQ.Type.error)) { - // Result type, hand off to callback handler. - response(packet); - } else { - processIqGetSet(iq); - } - } else if (packet instanceof Message) { - Message message = (Message) packet; - if (message.getType().equals(Message.Type.error) - || message.getChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS) != null) { - // Response type, hand off to callback handler. - response(packet); - } else { - processMessage(message); - } - } else { - sendErrorResponse(packet, FederationError.Code.BAD_REQUEST, "Unhandled packet type: " - + packet.getElement().getQName().getName()); - } - } - - /** - * Populate the given request subclass of Packet and return it. - */ - private <V extends Packet> V createRequest(V packet, String toJid) { - packet.setTo(toJid); - packet.setID(XmppUtil.generateUniqueId()); - packet.setFrom(jid); - return packet; - } - - /** - * Create a request IQ stanza with the given toJid. - * - * @param toJid target JID - * @return new IQ stanza - */ - public IQ createRequestIQ(String toJid) { - return createRequest(new IQ(), toJid); - } - - /** - * Create a request Message stanza with the given toJid. - * - * @param toJid target JID - * @return new Message stanza - */ - public Message createRequestMessage(String toJid) { - return createRequest(new Message(), toJid); - } - - /** - * Sends the given XMPP packet over the backing transport. This accepts a - * callback which is guaranteed to be invoked at a later point, either through - * a normal response, error response, or timeout. 
- * - * @param packet packet to be sent - * @param callback callback to be invoked on response or timeout - * @param timeout timeout, in seconds, for this callback - */ - public void send(Packet packet, final PacketCallback callback, int timeout) { - final String key = packet.getID() + "#" + packet.getTo() + "#" + packet.getFrom(); - - final OutgoingCall call = new OutgoingCall(packet.getClass(), callback); - if (callbacks.putIfAbsent(key, call) == null) { - // Timeout runnable to be invoked on packet expiry. - Runnable timeoutTask = new Runnable() { - @Override - public void run() { - if (callbacks.remove(key, call)) { - callback.error( - FederationErrors.newFederationError(FederationError.Code.REMOTE_SERVER_TIMEOUT)); - } else { - // Likely race condition where success has actually occurred. Ignore. - } - } - }; - call.start(timeoutExecutor.schedule(timeoutTask, timeout, TimeUnit.SECONDS)); - transport.sendPacket(packet); - } else { - String msg = "Could not send packet, ID already in-flight: " + key; - LOG.warning(msg); - - // Invoke the callback with an internal error. - callback.error( - FederationErrors.newFederationError(FederationError.Code.UNDEFINED_CONDITION, msg)); - } - } - - /** - * Cause an immediate timeout for the given packet, which is presumed to have - * already been sent via {@link #send}. - */ - @VisibleForTesting - void causeImmediateTimeout(Packet packet) { - String key = packet.getID() + "#" + packet.getTo() + "#" + packet.getFrom(); - OutgoingCall call = callbacks.remove(key); - if (call != null) { - call.callback.error(FederationErrors.newFederationError( - FederationError.Code.REMOTE_SERVER_TIMEOUT, "Forced immediate timeout")); - } - } - - /** - * Invoke the callback for a packet already identified as a response. This may - * either invoke the error or normal callback as necessary. 
- */ - private void response(Packet packet) { - String key = packet.getID() + "#" + packet.getFrom() + "#" + packet.getTo(); - OutgoingCall call = callbacks.remove(key); - - if (call == null) { - LOG.warning("Received response packet without paired request: " + packet.getID()); - } else { - // Cancel the outstanding timeout. - call.timeout.cancel(false); - - // Look for error condition and invoke the relevant callback. - Element element = packet.getElement().element("error"); - if (element != null) { - LOG.fine("Invoking error callback for: " + packet.getID()); - call.callback.error(toFederationError(new PacketError(element))); - } else { - if (call.responseType.equals(packet.getClass())) { - LOG.fine("Invoking normal callback for: " + packet.getID()); - call.callback.run(packet); - } else { - String msg = - "Received mismatched response packet type: expected " + call.responseType - + ", given " + packet.getClass(); - LOG.warning(msg); - call.callback.error(FederationErrors.newFederationError( - FederationError.Code.UNDEFINED_CONDITION, msg)); - } - } - - // Clear call's reference to callback, otherwise callback only - // becomes eligible for GC once the timeout expires, because - // timeoutExecutor holds on to the call object till then, even - // though we cancelled the timeout. - call.callback = null; - } - } - - /** - * Process IQ request stanzas. This encompasses XMPP disco, submit and history - * requests/responses, and get/post signer info requests/responses. 
- */ - private void processIqGetSet(IQ iq) { - Element body = iq.getChildElement(); - if (body == null) { - sendErrorResponse(iq, FederationErrors.badRequest("Malformed request, no IQ child")); - return; - } - - final String namespace = body.getQName().getNamespace().getURI(); - final boolean isIQSet; - if (iq.getType().equals(IQ.Type.get)) { - isIQSet = false; - } else if (iq.getType().equals(IQ.Type.set)) { - isIQSet = true; - } else { - throw new IllegalArgumentException("Can only process an IQ get/set."); - } - PacketCallback responseCallback = new IncomingCallback(iq); - - if (namespace.equals(XmppNamespace.NAMESPACE_PUBSUB)) { - final Element pubsub = iq.getChildElement(); - final Element element = pubsub.element(isIQSet ? "publish" : "items"); - - if (element.attributeValue("node").equals("wavelet")) { - if (isIQSet) { - host.processSubmitRequest(iq, responseCallback); - } else { - host.processHistoryRequest(iq, responseCallback); - } - } else if (element.attributeValue("node").equals("signer")) { - if (isIQSet) { - host.processPostSignerRequest(iq, responseCallback); - } else { - host.processGetSignerRequest(iq, responseCallback); - } - } else { - sendErrorResponse(iq, FederationError.Code.BAD_REQUEST, "Unhandled pubsub request"); - } - } else if (!isIQSet) { - switch (namespace) { - case XmppNamespace.NAMESPACE_DISCO_INFO: - disco.processDiscoInfoGet(iq, responseCallback); - break; - case XmppNamespace.NAMESPACE_DISCO_ITEMS: - disco.processDiscoItemsGet(iq, responseCallback); - break; - default: - sendErrorResponse(iq, FederationError.Code.BAD_REQUEST, "Unhandled IQ get"); - break; - } - } else { - sendErrorResponse(iq, FederationError.Code.BAD_REQUEST, "Unhandled IQ set"); - } - } - - /** - * Processes Message stanzas. This encompasses wavelet updates, update acks, - * and ping messages. 
- */ - private void processMessage(Message message) { - if (message.getChildElement("event", XmppNamespace.NAMESPACE_PUBSUB_EVENT) != null) { - remote.update(message, new IncomingCallback(message)); - } else if (message.getChildElement("ping", XmppNamespace.NAMESPACE_WAVE_SERVER) != null) { - // Respond inline to the ping. - LOG.info("Responding to ping from: " + message.getFrom()); - Message response = XmppUtil.createResponseMessage(message); - response.addChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS); - transport.sendPacket(response); - } else { - sendErrorResponse(message, FederationError.Code.BAD_REQUEST, "Unhandled message type"); - } - } - - /** - * Helper method to send generic error responses, backed onto - * {@link #sendErrorResponse(Packet, FederationError)}. - */ - void sendErrorResponse(Packet request, FederationError.Code code) { - sendErrorResponse(request, FederationErrors.newFederationError(code)); - } - - /** - * Helper method to send error responses, backed onto - * {@link #sendErrorResponse(Packet, FederationError)}. - */ - void sendErrorResponse(Packet request, FederationError.Code code, String text) { - sendErrorResponse(request, FederationErrors.newFederationError(code, text)); - } - - /** - * Send an error request to the passed incoming request. - * - * @param request packet request, target is derived from its to/from - * @param error error to be contained in response - */ - void sendErrorResponse(Packet request, FederationError error) { - if (error.getErrorCode() == FederationError.Code.OK) { - throw new IllegalArgumentException("Can't send an error of OK!"); - } - sendErrorResponse(request, toPacketError(error)); - } - - /** - * Send an error response to the passed incoming request. Throws - * IllegalArgumentException if the original packet is also an error, or is of - * the IQ result type. - * - * According to RFC 3920 (9.3.1), the error packet may contain the original - * packet. 
However, this implementation does not include it. - * - * @param request packet request, to/from is inverted for response - * @param error packet error describing error condition - */ - void sendErrorResponse(Packet request, PacketError error) { - if (request instanceof IQ) { - IQ.Type type = ((IQ) request).getType(); - if (!(type.equals(IQ.Type.get) || type.equals(IQ.Type.set))) { - throw new IllegalArgumentException("May only return an error to IQ get/set, not: " + type); - } - } else if (request instanceof Message) { - Message message = (Message) request; - if (message.getType().equals(Message.Type.error)) { - throw new IllegalArgumentException("Can't return an error to another message error"); - } - } else { - throw new IllegalArgumentException("Unexpected Packet subclass, expected Message/IQ: " - + request.getClass()); - } - - LOG.fine("Sending error condition in response to " + request.getID() + ": " - + error.getCondition().name()); - - // Note that this does not include the original packet; just the ID. - final Packet response = XmppUtil.createResponsePacket(request); - response.setError(error); - - transport.sendPacket(response); - } - - /** - * Convert a FederationError instance to a PacketError. This may return - * <undefined-condition> if the incoming error can't be understood. 
- * - * @param error the incoming error - * @return a generated PacketError instance - * @throws IllegalArgumentException if the OK error code is given - */ - private static PacketError toPacketError(FederationError error) { - Preconditions.checkArgument(error.getErrorCode() != FederationError.Code.OK); - - String tag = error.getErrorCode().name().toLowerCase().replace('_', '-'); - PacketError.Condition condition; - try { - condition = PacketError.Condition.fromXMPP(tag); - } catch (IllegalArgumentException e) { - condition = PacketError.Condition.undefined_condition; - LOG.warning("Did not understand error condition, defaulting to: " + condition.name()); - } - PacketError result = new PacketError(condition); - if (error.hasErrorMessage()) { - // TODO(thorogood): Hide this behind a flag so we don't always broadcast error cases. - result.setText(error.getErrorMessage(), "en"); - } - return result; - } - - /** - * Convert a PacketError instance to an internal FederationError. This may - * return an error code of UNDEFINED_CONDITION if the incoming error can't be - * understood. 
- * - * @param error the incoming PacketError - * @return the generated FederationError instance - */ - private static FederationError toFederationError(PacketError error) { - String tag = error.getCondition().name().toUpperCase().replace('-', '_'); - FederationError.Code code; - try { - code = FederationError.Code.valueOf(tag); - } catch (IllegalArgumentException e) { - code = FederationError.Code.UNDEFINED_CONDITION; - } - FederationError.Builder builder = FederationError.newBuilder().setErrorCode(code); - if (error.getText() != null) { - builder.setErrorMessage(error.getText()); - } - return builder.build(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppNamespace.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppNamespace.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppNamespace.java deleted file mode 100644 index 7656a3b..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppNamespace.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.wave.federation.xmpp; - -/** - * Namespace definitions for the XMPP package. - * - * @author thorog...@google.com (Sam Thorogood) - */ -final class XmppNamespace { - - // Namespace definitions for packet types - static final String NAMESPACE_XMPP_RECEIPTS = "urn:xmpp:receipts"; - static final String NAMESPACE_DISCO_INFO = "http://jabber.org/protocol/disco#info"; - static final String NAMESPACE_DISCO_ITEMS = "http://jabber.org/protocol/disco#items"; - static final String NAMESPACE_PUBSUB = "http://jabber.org/protocol/pubsub"; - static final String NAMESPACE_PUBSUB_EVENT = "http://jabber.org/protocol/pubsub#event"; - static final String NAMESPACE_WAVE_SERVER = "http://waveprotocol.org/protocol/0.2/waveserver"; - - /** - * Uninstantiable class. - */ - private XmppNamespace() { - } - -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppUtil.java ---------------------------------------------------------------------- diff --git a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppUtil.java b/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppUtil.java deleted file mode 100644 index 7ca2971..0000000 --- a/wave/src/main/java/org/waveprotocol/wave/federation/xmpp/XmppUtil.java +++ /dev/null @@ -1,182 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.wave.federation.xmpp; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ByteString; - -import org.dom4j.Element; -import org.waveprotocol.wave.federation.Proto.ProtocolSignerInfo; -import org.waveprotocol.wave.model.id.IdURIEncoderDecoder; -import org.waveprotocol.wave.util.escapers.jvm.JavaUrlCodec; -import org.xmpp.packet.IQ; -import org.xmpp.packet.Message; -import org.xmpp.packet.Packet; - -import java.nio.ByteBuffer; -import java.security.SecureRandom; -import java.util.List; -import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Common utility code for XMPP packet generation and parsing. - */ -public class XmppUtil { - private static final AtomicLong idSequenceNo = new AtomicLong(0); - private static final Random random = new SecureRandom(); - - public static final IdURIEncoderDecoder waveletNameCodec = - new IdURIEncoderDecoder(new JavaUrlCodec()); - - // If non-null, this fake unique ID will be returned from generateUniqueId() - // rather than a random base64 string. - @VisibleForTesting - public static String fakeUniqueId = null; - - // Alternately, and better, this callable will be called each time an ID is needed, if non-null. - @VisibleForTesting - static Callable<String> fakeIdGenerator = null; - - private XmppUtil() { - } - - /** - * Helper method to translate from the XMPP package (1.4 without generics) to - * type-safe element lists. 
- */ - @SuppressWarnings({"cast", "unchecked", "rawtypes"}) - public static List<Element> toSafeElementList(List elements) { - return (List<Element>) elements; - } - - /** - * Checked exception thrown by signer conversion code. - */ - public static class UnknownSignerType extends Exception { - public UnknownSignerType(String algorithm) { - super(algorithm); - } - - public UnknownSignerType(String algorithm, Throwable stacked) { - super(algorithm, stacked); - } - } - - /** - * Convert the signer information to XML and place the result within the - * passed Element. This method should never fail. - */ - public static void protocolSignerInfoToXml(ProtocolSignerInfo signerInfo, Element parent) { - Element signature = parent.addElement("signature", XmppNamespace.NAMESPACE_WAVE_SERVER); - signature.addAttribute("domain", signerInfo.getDomain()); - ProtocolSignerInfo.HashAlgorithm hashValue = signerInfo.getHashAlgorithm(); - - signature.addAttribute("algorithm", hashValue.name()); - for (ByteString cert : signerInfo.getCertificateList()) { - signature.addElement("certificate").addCDATA(Base64Util.encode(cert)); - } - } - - /** - * Convert the given Element to a signer information XML element. 
- * - * @throws UnknownSignerType when the given hash algorithm is not understood - */ - public static ProtocolSignerInfo xmlToProtocolSignerInfo(Element signature) - throws UnknownSignerType { - ProtocolSignerInfo.HashAlgorithm hash; - String algorithm = signature.attributeValue("algorithm").toUpperCase(); - try { - hash = ProtocolSignerInfo.HashAlgorithm.valueOf(algorithm); - } catch (IllegalArgumentException e) { - throw new UnknownSignerType(algorithm, e); - } - - ProtocolSignerInfo.Builder builder = ProtocolSignerInfo.newBuilder(); - builder.setHashAlgorithm(hash); - builder.setDomain(signature.attributeValue("domain")); - for (Element certElement : toSafeElementList(signature.elements("certificate"))) { - builder.addCertificate(Base64Util.decode(certElement.getText())); - } - return builder.build(); - } - - /** - * Convenience method to create a response {@link Message} instance based on - * the passed request. Simply returns a new message instance with the same ID, - * but with inverse to/from addresses. - * - * @param request the request message - * @return the new response message - */ - public static Message createResponseMessage(Message request) { - Message response = new Message(); - response.setID(request.getID()); - response.setTo(request.getFrom()); - response.setFrom(request.getTo()); - return response; - } - - /** - * Convenience method to create a response {@link Packet} implementation from - * the given source packet. This will return either an {@link IQ} or - * {@link Message} depending on the passed type. 
- * - * @param request the request message - * @return the new response message - */ - public static Packet createResponsePacket(Packet request) { - if (request instanceof Message) { - return createResponseMessage((Message) request); - } else if (request instanceof IQ) { - return IQ.createResultIQ((IQ) request); - } else { - throw new IllegalArgumentException("Can't respond to unsupported packet type: " - + request.getClass()); - } - } - - /** - * Generate a unique string identifier for use in stanzas. - * - * @return unique string identifier - */ - public static String generateUniqueId() { - if (fakeIdGenerator != null) { - try { - return fakeIdGenerator.call(); - } catch (Exception e) { - // This is used in tests only. - throw new RuntimeException(e); - } - } - // TODO(arb): deprecate this. - if (fakeUniqueId != null) { - return fakeUniqueId; - } - - // Generate a base64 ID based on raw bytes. - byte[] bytes = ByteBuffer.allocate(16) - .putLong(random.nextLong()).putLong(idSequenceNo.incrementAndGet()).array(); - return Base64Util.encode(bytes); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java ---------------------------------------------------------------------- diff --git a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java index c1bd5f0..6868ee3 100644 --- a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java +++ b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AccountStoreTest.java @@ -19,6 +19,7 @@ package org.waveprotocol.box.server.persistence.file; +import com.google.common.collect.ImmutableMap; import com.typesafe.config.ConfigFactory; import org.waveprotocol.box.server.persistence.AccountStore; import org.waveprotocol.box.server.persistence.AccountStoreTestBase; @@ -42,7 
+43,7 @@ public class AccountStoreTest extends AccountStoreTestBase { @Override protected AccountStore newAccountStore() { return new FileAccountStore( - ConfigFactory.parseString("core.account_store_directory : " + path.getAbsolutePath())); + ConfigFactory.parseMap (ImmutableMap.of("core.account_store_directory", path.getAbsolutePath()))); } @Override http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java ---------------------------------------------------------------------- diff --git a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java index 6ace3ee..53a68d3 100644 --- a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java +++ b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/AttachmentStoreTest.java @@ -19,6 +19,7 @@ package org.waveprotocol.box.server.persistence.file; +import com.google.common.collect.ImmutableMap; import com.typesafe.config.ConfigFactory; import org.waveprotocol.box.server.persistence.AttachmentStore; import org.waveprotocol.box.server.persistence.AttachmentStoreTestBase; @@ -41,7 +42,7 @@ public class AttachmentStoreTest extends AttachmentStoreTestBase { @Override protected AttachmentStore newAttachmentStore() { return new FileAttachmentStore( - ConfigFactory.parseString("core.attachment_store_directory : " + path.getAbsolutePath())); + ConfigFactory.parseMap (ImmutableMap.of("core.attachment_store_directory", path.getAbsolutePath()))); } @Override http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java ---------------------------------------------------------------------- diff --git 
a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java index 0065d80..c10a774 100644 --- a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java +++ b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/CertPathStoreTest.java @@ -19,6 +19,7 @@ package org.waveprotocol.box.server.persistence.file; +import com.google.common.collect.ImmutableMap; import com.typesafe.config.ConfigFactory; import org.waveprotocol.box.server.persistence.CertPathStoreTestBase; import org.waveprotocol.wave.crypto.CertPathStore; @@ -46,7 +47,8 @@ public class CertPathStoreTest extends CertPathStoreTestBase { @Override protected CertPathStore newCertPathStore() { - return new FileSignerInfoStore(ConfigFactory.parseString("core.signer_info_store_directory : " + path.getAbsolutePath())); + return new FileSignerInfoStore(ConfigFactory.parseMap ( + ImmutableMap.of("core.signer_info_store_directory", path.getAbsolutePath()))); } @Override http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java ---------------------------------------------------------------------- diff --git a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java index 1bc18ee..3dbfaef 100644 --- a/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java +++ b/wave/src/test/java/org/waveprotocol/box/server/persistence/file/DeltaStoreTest.java @@ -21,6 +21,7 @@ package org.waveprotocol.box.server.persistence.file; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.typesafe.config.ConfigFactory; import org.waveprotocol.box.server.persistence.DeltaStoreTestBase; import 
org.waveprotocol.box.server.waveserver.DeltaStore; @@ -55,8 +56,8 @@ public class DeltaStoreTest extends DeltaStoreTestBase { @Override protected DeltaStore newDeltaStore() { - return new FileDeltaStore(ConfigFactory.parseString("core.delta_store_directory : " + path - .getAbsolutePath())); + return new FileDeltaStore(ConfigFactory.parseMap ( + ImmutableMap.of("core.delta_store_directory", path.getAbsolutePath()))); } @Override http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockDisco.java ---------------------------------------------------------------------- diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockDisco.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockDisco.java deleted file mode 100644 index 5824cc8..0000000 --- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockDisco.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.waveprotocol.wave.federation.xmpp; - -import com.google.common.base.Function; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import com.google.common.collect.MapMaker; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -import java.util.HashMap; -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * Tiny MockDisco class that wraps XmppDisco. - * - * Use {@link #testInjectInDomainToJidMap} to configure custom immediate responses, otherwise - * responses will be placed on a pending queue. - * - * @author thorog...@google.com (Sam Thorogood) - */ -public class MockDisco extends XmppDisco { - - private static final int FAIL_EXPIRY_SECS = 5 * 60; - private static final int SUCCESS_EXPIRY_SECS = 2 * 60 * 60; - private static final int DISCO_EXPIRY_HOURS = 6; - - public static final Config config; - - static { - Map<String, Object> props = new HashMap<>(); - props.put("federation.xmpp_server_description", "Wave in a Box"); - props.put("federation.disco_info_category", "collaboration"); - props.put("federation.disco_info_type", "apache-wave"); - props.put("federation.xmpp_disco_failed_expiry", FAIL_EXPIRY_SECS + "s"); - props.put("federation.xmpp_disco_successful_expiry", SUCCESS_EXPIRY_SECS + "s"); - props.put("federation.disco_expiration", DISCO_EXPIRY_HOURS + "h"); - - config = ConfigFactory.parseMap(props); - } - - MockDisco() { - super(config); - } - - public static class PendingMockDisco { - public final String remoteDomain; - public final Queue<SuccessFailCallback<String, String>> callbacks = Lists.newLinkedList(); - - private PendingMockDisco(String remoteDomain) { - 
this.remoteDomain = remoteDomain; - } - - private void addCallback(SuccessFailCallback<String, String> callback) { - callbacks.add(callback); - } - } - - public LoadingCache<String, PendingMockDisco> pending = CacheBuilder.newBuilder() - .build(new CacheLoader<String, PendingMockDisco>() { - @Override - public PendingMockDisco load(String domain) { - return new PendingMockDisco(domain); - } - }); - - @Override - public void discoverRemoteJid(String remoteDomain, SuccessFailCallback<String, String> callback) { - if (isDiscoRequestAvailable(remoteDomain)) { - // Note: tiny race condition in case this is purged between above and - // below, but since this is only used in tests, we can probably ignore it. - super.discoverRemoteJid(remoteDomain, callback); - } else { - try { - pending.get(remoteDomain).addCallback(callback); - } catch (ExecutionException ex) { - throw new RuntimeException(ex); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockOutgoingPacketTransport.java ---------------------------------------------------------------------- diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockOutgoingPacketTransport.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockOutgoingPacketTransport.java deleted file mode 100644 index 588fbe8..0000000 --- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/MockOutgoingPacketTransport.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.wave.federation.xmpp; - -import org.xmpp.packet.Packet; - -import java.util.LinkedList; -import java.util.Queue; - -/** - * Dummy implementation of {@link OutgoingPacketTransport} that stores packets - * being sent over-the-wire. May optionally accept a {@link Router} instance - * where packets are automatically forwarded. - * - * @author thorog...@google.com (Sam Thorogood) - */ -public class MockOutgoingPacketTransport implements OutgoingPacketTransport { - - public interface Router { - public void route(Packet packet); - } - - // wrapped router object, if null then packets are not routed - public Router router; - - // pending outgoing packets - public final Queue<Packet> packets = new LinkedList<Packet>(); - - // last packet sent - public Packet lastPacketSent = null; - - // total number of packets sent here - public long packetsSent = 0; - - public MockOutgoingPacketTransport() { - router = null; - } - - public MockOutgoingPacketTransport(Router router) { - this.router = router; - } - - @Override - public void sendPacket(Packet packet) { - if (!packets.offer(packet)) { - throw new IllegalStateException("Can't offer packet to queue: " + packets); - } - lastPacketSent = packet; - packetsSent++; - - if (router != null) { - router.route(packet); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RemoteDiscoTest.java ---------------------------------------------------------------------- diff --git 
a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RemoteDiscoTest.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RemoteDiscoTest.java deleted file mode 100644 index 2668a74..0000000 --- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RemoteDiscoTest.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.waveprotocol.wave.federation.xmpp; - -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import org.xmpp.packet.IQ; -import org.joda.time.DateTimeUtils; -import org.waveprotocol.wave.federation.FederationErrors; - - -import junit.framework.TestCase; - -/** - * Performs naive tests over RemoteDisco. Integration testing is performed in - * {@link XmppDiscoTest}. 
- * - * @author thorog...@google.com (Sam Thorogood) - */ - -public class RemoteDiscoTest extends TestCase { - - private final static String REMOTE_DOMAIN = "acmewave.com"; - private final static String REMOTE_JID = "wave.acmewave.com"; - private static final int SUCCESS_EXPIRY_SECS = 600; - private static final int FAIL_EXPIRY_SECS = 120; - private RemoteDisco remoteDisco; - private SuccessFailCallback<String, String> callback; - - protected void setUp() throws Exception { - super.setUp(); - XmppManager manager = mock(XmppManager.class); - when(manager.createRequestIQ(eq(REMOTE_DOMAIN))).thenReturn(new IQ()); - - DateTimeUtils.setCurrentMillisFixed(0); - remoteDisco = new RemoteDisco(manager, REMOTE_DOMAIN, FAIL_EXPIRY_SECS, - SUCCESS_EXPIRY_SECS); - callback = mockDiscoCallback(); - } - - @Override - protected void tearDown() throws Exception { - super.tearDown(); - DateTimeUtils.setCurrentMillisSystem(); - } - - @SuppressWarnings("unchecked") - private SuccessFailCallback<String, String> mockDiscoCallback() { - return mock(SuccessFailCallback.class); - } - - /** - * Test a RemoteDisco created with a forced success case. - */ - public void testForcedSuccess() { - RemoteDisco remoteDisco = new RemoteDisco(REMOTE_DOMAIN, REMOTE_JID, null); - - SuccessFailCallback<String, String> callback = mockDiscoCallback(); - remoteDisco.discoverRemoteJID(callback); - verify(callback).onSuccess(eq(REMOTE_JID)); - verify(callback, never()).onFailure(anyString()); - } - - /** - * Test a RemoteDisco created with a forced failure case. 
- */ - public void testForcedFailure() { - RemoteDisco remoteDisco = new RemoteDisco(REMOTE_DOMAIN, null, - FederationErrors.badRequest("irrelevant")); - - SuccessFailCallback<String, String> callback = mockDiscoCallback(); - remoteDisco.discoverRemoteJID(callback); - verify(callback, never()).onSuccess(anyString()); - verify(callback).onFailure(anyString()); - callback = mockDiscoCallback(); - remoteDisco.discoverRemoteJID(callback); - verify(callback, never()).onSuccess(anyString()); - verify(callback).onFailure(anyString()); - } - - /** - * Tests the disco expiry code for successful disco results. - */ - public void testTimeToLive() { - remoteDisco.discoverRemoteJID(callback); - assertFalse(remoteDisco.ttlExceeded()); - remoteDisco.finish(REMOTE_JID, null); // successful disco - assertFalse(remoteDisco.ttlExceeded()); - tick((SUCCESS_EXPIRY_SECS - 1) * 1000); // not quite expired - assertFalse(remoteDisco.ttlExceeded()); - tick(20 * 1000); // should now be expired - assertTrue(remoteDisco.ttlExceeded()); - } - - /** - * Tests the disco expiry code for failed disco results. - */ - public void testTimeToLiveDiscoFailed() { - remoteDisco.discoverRemoteJID(callback); - assertFalse(remoteDisco.ttlExceeded()); - remoteDisco.finish(null, FederationErrors.badRequest("test failure")); // failed disco - assertFalse(remoteDisco.ttlExceeded()); - tick((FAIL_EXPIRY_SECS - 1) * 1000); // not quite expired - assertFalse(remoteDisco.ttlExceeded()); - tick(20 * 1000); // should now be expired - assertTrue(remoteDisco.ttlExceeded()); - } - - /** - * Advance the clock. 
- * - * @param millis milliseconds to advance clock - */ - private void tick(int millis) { - DateTimeUtils.setCurrentMillisFixed(DateTimeUtils.currentTimeMillis() + millis); - } -} http://git-wip-us.apache.org/repos/asf/incubator-wave/blob/ed4feb70/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RoundTripTest.java ---------------------------------------------------------------------- diff --git a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RoundTripTest.java b/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RoundTripTest.java deleted file mode 100644 index fc928a6..0000000 --- a/wave/src/test/java/org/waveprotocol/wave/federation/xmpp/RoundTripTest.java +++ /dev/null @@ -1,378 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
*/

package org.waveprotocol.wave.federation.xmpp;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;


import com.typesafe.config.ConfigFactory;
import junit.framework.TestCase;

import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.xmpp.packet.IQ;
import org.xmpp.packet.Message;
import org.xmpp.packet.Packet;
import org.xmpp.packet.PacketError;
import org.waveprotocol.wave.federation.FederationErrorProto.FederationError;
import org.waveprotocol.wave.federation.FederationErrors;
import org.waveprotocol.wave.federation.xmpp.MockOutgoingPacketTransport.Router;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Test round-trips between two XmppManager instances pointed at each other.
 *
 * This class is not intended to test specific calls; it is primarily to test
 * reliable calls made by the manager along with error handling. Any specific
 * call coverage is purely a side-effect of wanting real test data.
 *
 * @author thorog...@google.com (Sam Thorogood)
 */
public class RoundTripTest extends TestCase {

  private static final String SERVER1_DOMAIN = "google.com";
  private static final String SERVER2_DOMAIN = "acmewave.com";

  // Timeout passed to XmppManager.send() for calls that are expected to be
  // answered. NOTE(review): unit is whatever send() expects -- presumably
  // seconds; confirm against XmppManager.
  private static final int PACKET_TIMEOUT = 10;

  /**
   * One simulated server: a real XmppManager wired to mocked host, remote and
   * disco components and to a recording transport.
   */
  private static class ServerInstances {
    final String jid;
    final XmppManager manager;
    final XmppFederationHost host;
    final XmppFederationRemote remote;
    final XmppDisco disco;
    final MockOutgoingPacketTransport transport;

    ServerInstances(String domain, MockOutgoingPacketTransport.Router router) {
      // Mocks.
      host = mock(XmppFederationHost.class);
      remote = mock(XmppFederationRemote.class);
      disco = mock(XmppDisco.class);

      // 'Real' instantiated classes!
      jid = "wave." + domain;
      transport = new MockOutgoingPacketTransport(router);

      final Map<String, Object> props = new HashMap<>();
      props.put("federation.xmpp_disco_successful_expiry", "6s");
      props.put("federation.xmpp_jid", jid);
      manager = new XmppManager(
          host, remote, disco, transport, ConfigFactory.parseMap(props));

      // Verify manager callback.
      verify(host).setManager(eq(manager));
      verify(remote).setManager(eq(manager));
      verify(disco).setManager(eq(manager));
    }
  }

  private ServerInstances server1;
  private ServerInstances server2;

  @Override
  public void setUp() throws Exception {
    super.setUp();

    // Each router dereferences the *other* server lazily, at route() time, so
    // it is fine that server2 does not exist yet when server1 is built.
    server1 = new ServerInstances(SERVER1_DOMAIN, new Router() {
      @Override
      public void route(Packet packet) {
        server2.manager.receivePacket(packet);
      }
    });
    server2 = new ServerInstances(SERVER2_DOMAIN, new Router() {
      @Override
      public void route(Packet packet) {
        server1.manager.receivePacket(packet);
      }
    });
  }

  /**
   * Builds the disco#items "get" request, addressed from server1 to server2,
   * that several tests use as a valid packet.
   */
  private IQ newDiscoItemsGet() {
    IQ request = new IQ();
    request.setFrom(server1.jid);
    request.setID("disco");
    request.setTo(server2.jid);
    request.setType(IQ.Type.get);
    request.setChildElement("query", XmppNamespace.NAMESPACE_DISCO_ITEMS);
    return request;
  }

  /**
   * Stubs {@code callback.error(..)} to store its argument in {@code sink[0]}
   * and then release {@code latch}.
   *
   * <p>The callback may be invoked on a thread other than the test thread. An
   * assertion failing there would not fail the test directly, so the value is
   * only recorded here and asserted by the caller after {@code latch.await()}.
   * The write to {@code sink[0]} happens before {@code countDown()}, so it is
   * visible to the thread returning from {@code await()}.
   */
  private static void captureErrorAndRelease(PacketCallback callback,
      final FederationError[] sink, final CountDownLatch latch) {
    Mockito.doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        sink[0] = (FederationError) invocation.getArguments()[0];
        latch.countDown();
        return null;
      }
    }).when(callback).error(any(FederationError.class));
  }

  /**
   * Test the simple case of packet send/receive by sending a malformed request.
   */
  public void testPacketSendMalformedFailure() {
    Packet packet = new IQ();
    packet.setFrom(server1.jid);
    packet.setID("irrelevant");
    packet.setTo(server2.jid);

    PacketCallback callback = mock(PacketCallback.class);

    // Send an outgoing packet from server1 -> server2
    server1.manager.send(packet, callback, PACKET_TIMEOUT);
    assertEquals("First transport should have a single packet pending",
        1, server1.transport.packets.size());
    assertEquals("First transport should have unmodified outgoing packet",
        packet, server1.transport.packets.peek());

    // Confirm that server2 sent back an error
    assertEquals("Second transport should have a single packet pending",
        1, server2.transport.packets.size());
    assertNotNull("Second transport should be an error packet",
        server2.transport.packets.peek().getError());

    // Ensure the error is interpreted correctly and returned to the callback
    ArgumentCaptor<FederationError> errorCaptor = ArgumentCaptor.forClass(FederationError.class);
    verify(callback).error(errorCaptor.capture());
    verify(callback, never()).run(any(Packet.class));
    assertEquals("Invalid packet was sent, error should be BAD_REQUEST",
        FederationError.Code.BAD_REQUEST, errorCaptor.getValue().getErrorCode());
  }

  /**
   * Test the simple case of having a response invoked based entirely on the
   * timeout case.
   */
  public void testPacketTimeout() throws Exception {
    final int timeoutDelay = 0;
    final int timeoutWaitSecs = 5;

    // Send a valid packet, so it is received by the remote Disco mock, but not processed.
    IQ packet = newDiscoItemsGet();

    PacketCallback callback = mock(PacketCallback.class);
    final CountDownLatch finished = new CountDownLatch(1);
    final FederationError[] reported = new FederationError[1];
    captureErrorAndRelease(callback, reported, finished);

    server1.manager.send(packet, callback, timeoutDelay);
    assertTrue(finished.await(timeoutWaitSecs, TimeUnit.SECONDS));
    assertNotNull(reported[0]);
    assertEquals(FederationError.Code.REMOTE_SERVER_TIMEOUT, reported[0].getErrorCode());
    verify(callback, never()).run(any(Packet.class));

    // Mockito says never to reset a mock, but we have to as the callback is
    // already used deep in XmppManager.
    reset(callback);

    // For fun, process the request by the remote disco and return a response
    // that will never be processed.
    ArgumentCaptor<PacketCallback> server2Callback = ArgumentCaptor.forClass(PacketCallback.class);
    verify(server2.disco).processDiscoItemsGet(eq(packet), server2Callback.capture());
    XmppDisco realDisco = new XmppDisco(MockDisco.config);
    realDisco.setManager(server2.manager);
    realDisco.processDiscoItemsGet(packet, server2Callback.getValue());

    // Confirm disco on server2 has replied with a packet.
    assertEquals(1, server2.transport.packets.size());
    assertNull(server2.transport.packets.peek().getError());

    // Confirm, however, that the packet is dropped by the first manager (no pending call!).
    verifyZeroInteractions(callback);
  }

  /**
   * Test that an arbitrary error response is properly returned when generated
   * by the second server. Also ensure that the second server can't invoke its
   * callback twice.
   */
  public void testErrorResponse() {
    final FederationError.Code expectedCode = FederationError.Code.NOT_AUTHORIZED;
    final PacketError.Condition expectedCondition = PacketError.Condition.not_authorized;

    // Send a valid packet, so it is received by the remote Disco mock, but not
    // explicitly processed.
    IQ packet = newDiscoItemsGet();

    PacketCallback callback = mock(PacketCallback.class);
    server1.manager.send(packet, callback, PACKET_TIMEOUT);

    // Accept the disco request and return the expected error code.
    ArgumentCaptor<PacketCallback> server2Callback = ArgumentCaptor.forClass(PacketCallback.class);
    verify(server2.disco).processDiscoItemsGet(eq(packet), server2Callback.capture());
    server2Callback.getValue().error(FederationErrors.newFederationError(expectedCode));

    // Try to then complete the message, but cause an IllegalStateException.
    IQ fakeResponse = IQ.createResultIQ(packet);
    try {
      server2Callback.getValue().run(fakeResponse);
      fail("Should not be able to invoke callback twice");
    } catch (IllegalStateException e) {
      // pass
    }

    // Check the outgoing packet log.
    assertEquals(1, server2.transport.packets.size());
    Packet errorResponse = server2.transport.packets.peek();
    PacketError error = errorResponse.getError();
    assertNotNull(error);
    assertEquals(expectedCondition, error.getCondition());

    // Assert that the error response does *not* include the original packet.
    assertTrue(errorResponse instanceof IQ);
    IQ errorIQ = (IQ) errorResponse;
    assertNull(errorIQ.getChildElement());

    // Confirm that the error is received properly on the first server.
    ArgumentCaptor<FederationError> returnedError = ArgumentCaptor.forClass(FederationError.class);
    verify(callback).error(returnedError.capture());
    verify(callback, never()).run(any(Packet.class));
    assertEquals(expectedCode, returnedError.getValue().getErrorCode());

    // If we push the error again, it should be dropped. Note that resetting the
    // callback here is the simplest way to test this, since it is already
    // registered inside the manager.
    reset(callback);
    server1.manager.receivePacket(errorResponse);
    verifyZeroInteractions(callback);
  }

  /**
   * Test that an unhandled error (e.g. {@code <forbidden>}) is translated to
   * UNDEFINED_CONDITION before being returned to the mocked callback.
   */
  public void testUnhandledErrorResponse() {
    IQ packet = new IQ();
    packet.setFrom(server1.jid);
    packet.setID("foo");
    packet.setTo(server2.jid);

    // Disable routing so we can intercept the packet.
    server1.transport.router = null;
    PacketCallback callback = mock(PacketCallback.class);
    server1.manager.send(packet, callback, PACKET_TIMEOUT);

    // Generate an explicit error <forbidden>.
    IQ errorPacket = IQ.createResultIQ(packet);
    errorPacket.setError(PacketError.Condition.forbidden);
    server1.manager.receivePacket(errorPacket);

    // Confirm that <forbidden> is transformed to UNDEFINED_CONDITION.
    ArgumentCaptor<FederationError> returnedError = ArgumentCaptor.forClass(FederationError.class);
    verify(callback).error(returnedError.capture());
    verify(callback, never()).run(any(Packet.class));
    assertEquals(FederationError.Code.UNDEFINED_CONDITION, returnedError.getValue().getErrorCode());
  }

  /**
   * Test that packet IDs cannot be re-used while in-flight, and also that they
   * may be re-used later.
   */
  public void testReusePacketId() throws Exception {
    final int reuseFailWaitSecs = 5;

    IQ packet = new IQ();
    packet.setFrom(server1.jid);
    packet.setID("foo-packet");
    packet.setTo(server2.jid);

    // Disable routing so we can intercept the packet.
    server1.transport.router = null;
    PacketCallback callback = mock(PacketCallback.class);
    server1.manager.send(packet, callback, PACKET_TIMEOUT);
    assertEquals(1, server1.transport.packets.size());
    assertEquals(packet, server1.transport.packets.poll());

    // Try sending another packet with the same ID - must fail (called back in
    // another thread)!
    PacketCallback invalidCallback = mock(PacketCallback.class);
    final CountDownLatch finished = new CountDownLatch(1);
    final FederationError[] reported = new FederationError[1];
    captureErrorAndRelease(invalidCallback, reported, finished);

    server1.manager.send(packet, invalidCallback, PACKET_TIMEOUT);
    assertTrue(finished.await(reuseFailWaitSecs, TimeUnit.SECONDS));
    assertNotNull(reported[0]);
    assertEquals(FederationError.Code.UNDEFINED_CONDITION, reported[0].getErrorCode());
    verify(invalidCallback, never()).run(any(Packet.class));

    // Generate an explicit success response.
    IQ successPacket = IQ.createResultIQ(packet);
    server1.manager.receivePacket(successPacket);
    verify(callback).run(eq(successPacket));
    verify(callback, never()).error(any(FederationError.class));

    // Again, re-use the ID: should succeed since it is cleared from callbacks.
    PacketCallback zeroCallback = mock(PacketCallback.class);
    server1.manager.send(packet, zeroCallback, PACKET_TIMEOUT);
    assertEquals(1, server1.transport.packets.size());
    assertEquals(packet, server1.transport.packets.poll());
    verifyZeroInteractions(zeroCallback);
  }

  /**
   * Test that if (e.g.) an IQ is sent, then an IQ must be returned as a
   * response. If a Message is returned instead, this should invoke an error
   * callback.
   */
  public void testDropInvalidResponseType() throws Exception {
    IQ packet = server1.manager.createRequestIQ(server2.jid);

    // Disable routing so we can intercept the packet.
    server1.transport.router = null;
    PacketCallback callback = mock(PacketCallback.class);
    server1.manager.send(packet, callback, PACKET_TIMEOUT);

    // Generate an explicit Message receipt.
    Message response = new Message();
    response.setTo(packet.getFrom());
    response.setID(packet.getID());
    response.setFrom(packet.getTo());
    response.addChildElement("received", XmppNamespace.NAMESPACE_XMPP_RECEIPTS);
    server1.manager.receivePacket(response);

    // Confirm that an error callback is invoked.
    ArgumentCaptor<FederationError> returnedError = ArgumentCaptor.forClass(FederationError.class);
    verify(callback).error(returnedError.capture());
    verify(callback, never()).run(any(Packet.class));
    assertEquals(FederationError.Code.UNDEFINED_CONDITION, returnedError.getValue().getErrorCode());

    // Confirm that sending a correct response now does nothing.
    reset(callback);
    IQ correctResponse = IQ.createResultIQ(packet);
    server1.manager.receivePacket(correctResponse);
    verifyZeroInteractions(callback);
  }

}