Author: keith Date: Wed Aug 6 03:31:43 2008 New Revision: 20465 URL: http://wso2.org/svn/browse/wso2?view=rev&revision=20465
Log: Fixing Mashup-1068. Need to patch axis2 again Added: branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/ branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/XMPPListener.java branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/XMPPSender.java branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/util/ branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/util/XMPPClientSidePacketListener.java branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/util/XMPPPacketListener.java Modified: branches/mashup/java/1.5/java/modules/patches/axis2/resources/axis2-patches.txt Modified: branches/mashup/java/1.5/java/modules/patches/axis2/resources/axis2-patches.txt URL: http://wso2.org/svn/browse/wso2/branches/mashup/java/1.5/java/modules/patches/axis2/resources/axis2-patches.txt?rev=20465&r1=20464&r2=20465&view=diff ============================================================================== --- branches/mashup/java/1.5/java/modules/patches/axis2/resources/axis2-patches.txt (original) +++ branches/mashup/java/1.5/java/modules/patches/axis2/resources/axis2-patches.txt Wed Aug 6 03:31:43 2008 @@ -8,3 +8,4 @@ https://wso2.org/jira/browse/MASHUP-1048 https://wso2.org/jira/browse/MASHUP-1056 https://wso2.org/jira/browse/MASHUP-1061 +https://wso2.org/jira/browse/MASHUP-1068 Added: branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/XMPPListener.java URL: http://wso2.org/svn/browse/wso2/branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/XMPPListener.java?pathrev=20465 ============================================================================== --- (empty file) +++ branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/XMPPListener.java Wed Aug 6 03:31:43 2008 @@ 
-0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.transport.xmpp; + +import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; +import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue; +import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; +import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; +import org.apache.axiom.om.OMElement; +import org.apache.axis2.AxisFault; +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.context.SessionContext; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.ParameterIncludeImpl; +import org.apache.axis2.description.TransportInDescription; +import org.apache.axis2.transport.TransportListener; +import org.apache.axis2.transport.xmpp.util.XMPPConnectionFactory; +import org.apache.axis2.transport.xmpp.util.XMPPConstants; +import org.apache.axis2.transport.xmpp.util.XMPPPacketListener; +import org.apache.axis2.transport.xmpp.util.XMPPServerCredentials; +import org.apache.commons.logging.Log; +import 
org.apache.commons.logging.LogFactory; +import org.jivesoftware.smack.XMPPConnection; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + + +public class XMPPListener implements TransportListener { + private static Log log = LogFactory.getLog(XMPPListener.class); + private ConfigurationContext configurationContext = null; + private String replyTo = ""; + + /** + * A Map containing the connection factories managed by this, + * keyed by userName-at-jabberServerURL + */ + private Map connectionFactories = new HashMap(); + private ExecutorService workerPool; + private static final int WORKERS_MAX_THREADS = 5; + private static final long WORKER_KEEP_ALIVE = 60L; + private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS; + private XMPPConnection xmppConnection = null; + + + + public XMPPListener() { + } + + /** + * Initializing the XMPPListener. Retrieve connection details provided in + * xmpp transport receiver, connect to those servers & start listening in + * for messages. + */ + public void init(ConfigurationContext configurationCtx, TransportInDescription transportIn) + throws AxisFault { + log.info("Initializing XMPPListener..."); + configurationContext = configurationCtx; + initializeConnectionFactories(configurationContext,transportIn); + if (connectionFactories.isEmpty()) { + log.warn("No XMPP connection factories defined." + + "Will not listen for any XMPP messages"); + return; + } + } + + /** + * Extract connection details & connect to those xmpp servers. 
+ * @see init(ConfigurationContext configurationCtx, TransportInDescription transportIn) + * @param configurationContext + * @param transportIn + */ + private void initializeConnectionFactories( + ConfigurationContext configurationContext, + TransportInDescription transportIn) throws AxisFault{ + + Iterator serversToListenOn = transportIn.getParameters().iterator(); + while (serversToListenOn.hasNext()) { + Parameter connection = (Parameter) serversToListenOn.next(); + log.info("Trying to establish connection for : "+connection.getName()); + ParameterIncludeImpl pi = new ParameterIncludeImpl(); + try { + pi.deserializeParameters((OMElement) connection.getValue()); + } catch (AxisFault axisFault) { + log.error("Error reading parameters"); + } + + Iterator params = pi.getParameters().iterator(); + XMPPServerCredentials serverCredentials = + new XMPPServerCredentials(); + + while (params.hasNext()) { + Parameter param = (Parameter) params.next(); + if(XMPPConstants.XMPP_SERVER_URL.equals(param.getName())){ + serverCredentials.setServerUrl((String)param.getValue()); + }else if(XMPPConstants.XMPP_SERVER_USERNAME.equals(param.getName())){ + serverCredentials.setAccountName((String)param.getValue()); + }else if(XMPPConstants.XMPP_SERVER_PASSWORD.equals(param.getName())){ + serverCredentials.setPassword((String)param.getValue()); + }else if(XMPPConstants.XMPP_SERVER_TYPE.equals(param.getName())){ + serverCredentials.setServerType((String)param.getValue()); + } + } + XMPPConnectionFactory xmppConnectionFactory = new XMPPConnectionFactory(); + xmppConnectionFactory.connect(serverCredentials); + + connectionFactories.put(serverCredentials.getAccountName() + "@" + + serverCredentials.getServerUrl(), xmppConnectionFactory); + } + } + + /** + * Stop XMPP listener & disconnect from all XMPP Servers + */ + public void stop() { + if (workerPool != null || !workerPool.isShutdown()) { + workerPool.shutdown(); + } + //TODO : Iterate through all connections in connectionFactories & 
call disconnect() + } + + /** + * Returns Default EPR for a given Service name & IP + * @param serviceName + * @param ip + */ + public EndpointReference getEPRForService(String serviceName, String ip) throws AxisFault { + return getEPRsForService(serviceName, ip)[0]; + } + + /** + * Returns all EPRs for a given Service name & IP + * @param serviceName + * @param ip + */ + public EndpointReference[] getEPRsForService(String serviceName, String ip) throws AxisFault { + return new EndpointReference[]{new EndpointReference(XMPPConstants.XMPP_PREFIX + + replyTo + "?" + configurationContext + .getServiceContextPath() + "/" + serviceName)}; + } + + + public SessionContext getSessionContext(MessageContext messageContext) { + return null; + } + + public void destroy() { + if(xmppConnection != null && xmppConnection.isConnected()){ + xmppConnection.disconnect(); + } + } + + /** + * Start a pool of Workers. For each connection in connectionFactories, + * assign a packet listener. This packet listener will trigger when a + * message arrives. 
+ */ + public void start() throws AxisFault { + // create thread pool of workers + ExecutorService workerPool = new ThreadPoolExecutor( + 1, + WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TIME_UNIT, + new LinkedBlockingQueue(), + new org.apache.axis2.util.threadpool.DefaultThreadFactory( + new ThreadGroup("XMPP Worker thread group"), + "XMPPWorker")); + + Iterator iter = connectionFactories.values().iterator(); + while (iter.hasNext()) { + XMPPConnectionFactory connectionFactory = (XMPPConnectionFactory) iter.next(); + XMPPPacketListener xmppPacketListener = + new XMPPPacketListener(connectionFactory,this.configurationContext,workerPool); + connectionFactory.listen(xmppPacketListener); + } + } +} Added: branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/XMPPSender.java URL: http://wso2.org/svn/browse/wso2/branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/XMPPSender.java?pathrev=20465 ============================================================================== --- (empty file) +++ branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/XMPPSender.java Wed Aug 6 03:31:43 2008 @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.transport.xmpp; + +import org.apache.axiom.om.OMElement; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.client.Options; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.description.Parameter; +import org.apache.axis2.description.TransportOutDescription; +import org.apache.axis2.description.WSDL2Constants; +import org.apache.axis2.handlers.AbstractHandler; +import org.apache.axis2.transport.OutTransportInfo; +import org.apache.axis2.transport.TransportSender; +import org.apache.axis2.transport.xmpp.util.XMPPClientSidePacketListener; +import org.apache.axis2.transport.xmpp.util.XMPPConnectionFactory; +import org.apache.axis2.transport.xmpp.util.XMPPConstants; +import org.apache.axis2.transport.xmpp.util.XMPPOutTransportInfo; +import org.apache.axis2.transport.xmpp.util.XMPPServerCredentials; +import org.apache.axis2.transport.xmpp.util.XMPPUtils; +import org.apache.axis2.util.Utils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jivesoftware.smack.Chat; +import org.jivesoftware.smack.ChatManager; +import org.jivesoftware.smack.XMPPConnection; +import org.jivesoftware.smack.XMPPException; +import org.jivesoftware.smack.filter.PacketFilter; +import org.jivesoftware.smack.filter.PacketTypeFilter; +import org.jivesoftware.smack.packet.Message; + +public class XMPPSender extends AbstractHandler implements TransportSender { + Log log = null; + XMPPConnectionFactory connectionFactory; + XMPPServerCredentials serverCredentials; + + public XMPPSender() { + log = LogFactory.getLog(XMPPSender.class); + } + + public void cleanup(MessageContext msgContext) throws AxisFault { + } + + /** + * Initialize the transport sender by reading pre-defined connection 
factories for + * outgoing messages. These will create sessions (one per each destination dealt with) + * to be used when messages are being sent. + * @param confContext the configuration context + * @param transportOut the transport sender definition from axis2.xml + * @throws AxisFault on error + */ + public void init(ConfigurationContext confContext, + TransportOutDescription transportOut) throws AxisFault { + //if connection details are available from axis configuration + //use those & connect to jabber server(s) + serverCredentials = new XMPPServerCredentials(); + getConnectionDetailsFromAxisConfiguration(transportOut); + connectionFactory = new XMPPConnectionFactory(); + connectionFactory.connect(serverCredentials); + } + + /** + * Extract connection details from Client options + * @param msgCtx + */ + private void connectUsingClientOptions(MessageContext msgCtx) throws AxisFault{ + getConnectionDetailsFromClientOptions(msgCtx); + connectionFactory = new XMPPConnectionFactory(); + connectionFactory.connect(serverCredentials); + } + + public void stop() {} + + public InvocationResponse invoke(MessageContext msgContext) + throws AxisFault { + String targetAddress = (String) msgContext.getProperty( + Constants.Configuration.TRANSPORT_URL); + if (targetAddress != null) { + sendMessage(msgContext, targetAddress, null); + } else if (msgContext.getTo() != null && !msgContext.getTo().hasAnonymousAddress()) { + targetAddress = msgContext.getTo().getAddress(); + + if (!msgContext.getTo().hasNoneAddress()) { + sendMessage(msgContext, targetAddress, null); + } else { + //Don't send the message. 
+ return InvocationResponse.CONTINUE; + } + } else if (msgContext.isServerSide()) { + // get the out transport info for server side when target EPR is unknown + sendMessage(msgContext, null, + (OutTransportInfo) msgContext.getProperty(Constants.OUT_TRANSPORT_INFO)); + } + return InvocationResponse.CONTINUE; + } + + /** + * Send the given message over XMPP transport + * + * @param msgCtx the axis2 message context + * @throws AxisFault on error + */ + public void sendMessage(MessageContext msgCtx, String targetAddress, + OutTransportInfo outTransportInfo) throws AxisFault { + XMPPConnection xmppConnection = null; + XMPPOutTransportInfo xmppOutTransportInfo = null; + + //if on client side,create connection to xmpp server + if(!msgCtx.isServerSide()){ + connectUsingClientOptions(msgCtx); + } + + Message message = new Message(); + Options options = msgCtx.getOptions(); + String serviceName = XMPPUtils.getServiceName(targetAddress); + + if (targetAddress != null) { + xmppOutTransportInfo = new XMPPOutTransportInfo(targetAddress); + xmppOutTransportInfo.setConnectionFactory(connectionFactory); + } else if (msgCtx.getTo() != null && + !msgCtx.getTo().hasAnonymousAddress()) { + //TODO + } else if (msgCtx.isServerSide()) { + xmppOutTransportInfo = (XMPPOutTransportInfo) + msgCtx.getProperty(Constants.OUT_TRANSPORT_INFO); + } + + + if(msgCtx.isServerSide()){ + xmppConnection = xmppOutTransportInfo.getConnectionFactory().getXmppConnection(); + message.setProperty(XMPPConstants.IS_SERVER_SIDE, new Boolean(false)); + message.setProperty(XMPPConstants.IN_REPLY_TO, xmppOutTransportInfo.getInReplyTo()); + }else{ + xmppConnection = xmppOutTransportInfo.getConnectionFactory().getXmppConnection(); + message.setProperty(XMPPConstants.IS_SERVER_SIDE, new Boolean(true)); + message.setProperty(XMPPConstants.SERVICE_NAME, serviceName); + message.setProperty(XMPPConstants.ACTION, options.getAction()); + } + + if(xmppConnection == null){ + handleException("Connection to XMPP Server is not 
established."); + } + + //initialize the chat manager using connection + ChatManager chatManager = xmppConnection.getChatManager(); + Chat chat = chatManager.createChat(xmppOutTransportInfo.getDestinationAccount(), null); + + try + { + OMElement msgElement = msgCtx.getEnvelope(); + if (msgCtx.isDoingREST()) { + msgElement = msgCtx.getEnvelope().getBody().getFirstElement(); + } + boolean waitForResponse = + msgCtx.getOperationContext() != null && + WSDL2Constants.MEP_URI_OUT_IN.equals( + msgCtx.getOperationContext().getAxisOperation().getMessageExchangePattern()); + + + String soapMessage = msgElement.toString(); + //int endOfXMLDeclaration = soapMessage.indexOf("?>"); + //String modifiedSOAPMessage = soapMessage.substring(endOfXMLDeclaration+2); + message.setBody(soapMessage); + + XMPPClientSidePacketListener xmppClientSidePacketListener = null; + if(waitForResponse && !msgCtx.isServerSide()){ + PacketFilter filter = new PacketTypeFilter(message.getClass()); + xmppClientSidePacketListener = new XMPPClientSidePacketListener(msgCtx); + xmppConnection.addPacketListener(xmppClientSidePacketListener,filter); + } + + chat.sendMessage(message); + log.debug("Sent message :"+message.toXML()); + + //If this is on client side, wait for the response from server. + //Is this the best way to do this? + if(waitForResponse && !msgCtx.isServerSide()){ + while(! 
xmppClientSidePacketListener.isResponseReceived()){
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // Restore the interrupt flag and stop waiting: swallowing the
+                        // interrupt left this thread polling with no way to cancel it.
+                        Thread.currentThread().interrupt();
+                        handleException("Interrupted while waiting for the XMPP response", e);
+                    }
+                }
+                // the client-side connection is closed once, in the finally block below
+            }
+
+        } catch (XMPPException e) {
+            // handleException logs the error itself before throwing the AxisFault
+            handleException("Error occurred while sending the message : "+message.toXML(),e);
+        }finally{
+            if(!msgCtx.isServerSide()){
+                xmppConnection.disconnect();
+            }
+        }
+    }
+
+
+    /**
+     * Extract connection details from axis2.xml's transportsender section
+     * @param serverCredentials
+     * @param transportOut
+     */
+    private void getConnectionDetailsFromAxisConfiguration(TransportOutDescription transportOut){
+        if(transportOut != null){
+            Parameter serverUrl = transportOut.getParameter(XMPPConstants.XMPP_SERVER_URL);
+            if (serverUrl != null) {
+                serverCredentials.setServerUrl(Utils.getParameterValue(serverUrl));
+            }
+
+            Parameter userName = transportOut.getParameter(XMPPConstants.XMPP_SERVER_USERNAME);
+            if (userName != null) {
+                serverCredentials.setAccountName(Utils.getParameterValue(userName));
+            }
+
+            Parameter password = transportOut.getParameter(XMPPConstants.XMPP_SERVER_PASSWORD);
+            if (password != null) {
+                serverCredentials.setPassword(Utils.getParameterValue(password));
+            }
+
+            Parameter serverType = transportOut.getParameter(XMPPConstants.XMPP_SERVER_TYPE);
+            if (serverType != null) {
+                serverCredentials.setServerType(Utils.getParameterValue(serverType));
+            }
+        }
+    }
+
+    /**
+     * Extract connection details from client options
+     * @param serverCredentials
+     * @param msgContext
+     */
+    private void getConnectionDetailsFromClientOptions(MessageContext msgContext){
+        Options clientOptions = msgContext.getOptions();
+
+        if (clientOptions.getProperty(XMPPConstants.XMPP_SERVER_USERNAME) != null){
+            serverCredentials.setAccountName((String)clientOptions.getProperty(XMPPConstants.XMPP_SERVER_USERNAME));
+        }
+        if 
(clientOptions.getProperty(XMPPConstants.XMPP_SERVER_PASSWORD) != null){ + serverCredentials.setPassword((String)clientOptions.getProperty(XMPPConstants.XMPP_SERVER_PASSWORD)); + } + if (clientOptions.getProperty(XMPPConstants.XMPP_SERVER_URL) != null){ + serverCredentials.setServerUrl((String)clientOptions.getProperty(XMPPConstants.XMPP_SERVER_URL)); + } + if (clientOptions.getProperty(XMPPConstants.XMPP_SERVER_TYPE) != null){ + serverCredentials.setServerType((String)clientOptions.getProperty(XMPPConstants.XMPP_SERVER_TYPE)); + } + } + + private void handleException(String msg, Exception e) throws AxisFault { + log.error(msg, e); + throw new AxisFault(msg, e); + } + private void handleException(String msg) throws AxisFault { + log.error(msg); + throw new AxisFault(msg); + } +} \ No newline at end of file Added: branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/util/XMPPClientSidePacketListener.java URL: http://wso2.org/svn/browse/wso2/branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/util/XMPPClientSidePacketListener.java?pathrev=20465 ============================================================================== --- (empty file) +++ branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/util/XMPPClientSidePacketListener.java Wed Aug 6 03:31:43 2008 @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.axis2.transport.xmpp.util; + +import org.apache.axis2.context.MessageContext; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jivesoftware.smack.PacketListener; +import org.jivesoftware.smack.packet.Message; +import org.jivesoftware.smack.packet.Packet; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; + +public class XMPPClientSidePacketListener implements PacketListener { + private static Log log = LogFactory.getLog(XMPPClientSidePacketListener.class); + private MessageContext messageContext = null; + private boolean responseReceived; + + public XMPPClientSidePacketListener(MessageContext messageContext){ + this.messageContext = messageContext; + } + + /** + * This method will be triggered, when a message is arrived at client side + */ + public void processPacket(Packet packet) { + Message message = (Message)packet; + String xml = StringEscapeUtils.unescapeXml(message.getBody()); + log.debug("Client received message : "+xml); + this.responseReceived = true; + InputStream inputStream = new ByteArrayInputStream(xml.getBytes()); + messageContext.setProperty(MessageContext.TRANSPORT_IN, inputStream); + } + + /** + * Indicates response message is received at client side. 
+ * @see processPacket(Packet packet) + * @return + */ + public boolean isResponseReceived() { + return responseReceived; + } + +} Added: branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/util/XMPPPacketListener.java URL: http://wso2.org/svn/browse/wso2/branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/util/XMPPPacketListener.java?pathrev=20465 ============================================================================== --- (empty file) +++ branches/mashup/java/1.5/java/modules/patches/axis2/src/org/apache/axis2/transport/xmpp/util/XMPPPacketListener.java Wed Aug 6 03:31:43 2008 @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.axis2.transport.xmpp.util; + +import edu.emory.mathcs.backport.java.util.concurrent.Executor; +import org.apache.axiom.om.OMException; +import org.apache.axiom.soap.SOAPEnvelope; +import org.apache.axis2.AxisFault; +import org.apache.axis2.Constants; +import org.apache.axis2.addressing.EndpointReference; +import org.apache.axis2.context.ConfigurationContext; +import org.apache.axis2.context.MessageContext; +import org.apache.axis2.description.AxisService; +import org.apache.axis2.description.TransportInDescription; +import org.apache.axis2.description.TransportOutDescription; +import org.apache.axis2.engine.AxisEngine; +import org.apache.axis2.transport.TransportUtils; +import org.apache.axis2.util.MessageContextBuilder; +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.jivesoftware.smack.PacketListener; +import org.jivesoftware.smack.packet.Message; +import org.jivesoftware.smack.packet.Packet; + +import javax.xml.parsers.FactoryConfigurationError; +import javax.xml.stream.XMLStreamException; +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.HashMap; + +public class XMPPPacketListener implements PacketListener { + private static final Log log = LogFactory.getLog(XMPPPacketListener.class); + private XMPPConnectionFactory xmppConnectionFactory = null; + private ConfigurationContext configurationContext = null; + private Executor workerPool = null; + + public XMPPPacketListener(XMPPConnectionFactory xmppConnectionFactory, ConfigurationContext configurationContext, Executor workerPool) { + this.xmppConnectionFactory = xmppConnectionFactory; + this.configurationContext = configurationContext; + this.workerPool = workerPool; + } + + /** + * This method gets triggered when server side gets a message + */ + public void processPacket(Packet packet) { + log.debug("Received : "+packet.toXML()); + if(packet 
instanceof Message){ + workerPool.execute(new Worker(packet)); + } + } + + /** + * Creates message context using values received in XMPP packet + * @param packet + * @return MessageContext + * @throws AxisFault + */ + private MessageContext createMessageContext(Packet packet) throws AxisFault { + Message message = (Message) packet; + + Boolean isServerSide = (Boolean) message + .getProperty(XMPPConstants.IS_SERVER_SIDE); + String serviceName = (String) message + .getProperty(XMPPConstants.SERVICE_NAME); + String action = (String) message.getProperty(XMPPConstants.ACTION); + MessageContext msgContext = null; + + TransportInDescription transportIn = configurationContext + .getAxisConfiguration().getTransportIn("xmpp"); + TransportOutDescription transportOut = configurationContext + .getAxisConfiguration().getTransportOut("xmpp"); + if ((transportIn != null) && (transportOut != null)) { + msgContext = configurationContext.createMessageContext(); + msgContext.setTransportIn(transportIn); + msgContext.setTransportOut(transportOut); + if (isServerSide != null) { + msgContext.setServerSide(isServerSide.booleanValue()); + } + msgContext.setProperty( + org.apache.axis2.transport.mail.Constants.CONTENT_TYPE, + "text/xml"); + msgContext.setProperty( + Constants.Configuration.CHARACTER_SET_ENCODING, "UTF-8"); + msgContext.setIncomingTransportName("xmpp"); + + HashMap services = configurationContext.getAxisConfiguration() + .getServices(); + + AxisService axisService = (AxisService) services.get(serviceName); + msgContext.setAxisService(axisService); + msgContext.setSoapAction(action); + + // pass the configurationFactory to transport sender + msgContext.setProperty("XMPPConfigurationFactory", + this.xmppConnectionFactory); + + if (packet.getFrom() != null) { + msgContext.setFrom(new EndpointReference(packet.getFrom())); + } + if (packet.getTo() != null) { + msgContext.setTo(new EndpointReference(packet.getTo())); + } + + XMPPOutTransportInfo xmppOutTransportInfo = new 
XMPPOutTransportInfo(); + xmppOutTransportInfo + .setConnectionFactory(this.xmppConnectionFactory); + + String packetFrom = packet.getFrom(); + if (packetFrom != null) { + EndpointReference fromEPR = new EndpointReference(packetFrom); + xmppOutTransportInfo.setFrom(fromEPR); + xmppOutTransportInfo.setDestinationAccount(packetFrom); + } + + // Save Message-Id to set as In-Reply-To on reply + String xmppMessageId = packet.getPacketID(); + if (xmppMessageId != null) { + xmppOutTransportInfo.setInReplyTo(xmppMessageId); + } + msgContext.setProperty( + org.apache.axis2.Constants.OUT_TRANSPORT_INFO, + xmppOutTransportInfo); + buildSOAPEnvelope(packet, msgContext); + } else { + throw new AxisFault("Either transport in or transport out is null"); + } + return msgContext; + } + + /** + * builds SOAP envelop using message contained in packet + * @param packet + * @param msgContext + * @throws AxisFault + */ + private void buildSOAPEnvelope(Packet packet, MessageContext msgContext) throws AxisFault{ + Message message = (Message)packet; + String xml = StringEscapeUtils.unescapeXml(message.getBody()); + InputStream inputStream = new ByteArrayInputStream(xml.getBytes()); + SOAPEnvelope envelope; + try { + envelope = TransportUtils.createSOAPMessage(msgContext, inputStream, "text/xml"); + if(msgContext.isServerSide()){ + log.debug("Received Envelope : "+xml); + } + msgContext.setEnvelope(envelope); + }catch (OMException e) { + log.error("Error occured while trying to create " + + "message content using XMPP message received :"+packet.toXML(), e); + throw new AxisFault("Error occured while trying to create " + + "message content using XMPP message received :"+packet.toXML()); + }catch (XMLStreamException e) { + log.error("Error occured while trying to create " + + "message content using XMPP message received :"+packet.toXML(), e); + throw new AxisFault("Error occured while trying to create " + + "message content using XMPP message received :"+packet.toXML()); + }catch 
(FactoryConfigurationError e) { + log.error("Error occured while trying to create " + + "message content using XMPP message received :"+packet.toXML(), e); + throw new AxisFault("Error occured while trying to create " + + "message content using XMPP message received :"+packet.toXML()); + }catch (AxisFault e){ + log.error("Error occured while trying to create " + + "message content using XMPP message received :"+packet.toXML(), e); + throw new AxisFault("Error occured while trying to create " + + "message content using XMPP message received :"+packet.toXML()); + } + } + + + /** + * The actual Runnable Worker implementation which will process the + * received XMPP messages in the worker thread pool + */ + class Worker implements Runnable { + private Packet packet = null; + Worker(Packet packet) { + this.packet = packet; + } + + public void run() { + MessageContext msgCtx = null; + try { + msgCtx = createMessageContext(packet); + if(msgCtx.isProcessingFault() && msgCtx.isServerSide()){ + AxisEngine.sendFault(msgCtx); + }else{ + AxisEngine.receive(msgCtx); + } + } catch (AxisFault e) { + log.error("Error occurred while sending message"+e); + if (msgCtx != null && msgCtx.isServerSide()) { + MessageContext faultContext; + try { + faultContext = MessageContextBuilder.createFaultMessageContext(msgCtx, e); + AxisEngine.sendFault(faultContext); + } catch (AxisFault e1) { + log.error("Error occurred while creating SOAPFault message"+e1); + } + } + } + } + } +} _______________________________________________ Mashup-dev mailing list [email protected] http://mailman.wso2.org/cgi-bin/mailman/listinfo/mashup-dev
