Author: gnodet
Date: Fri Apr 27 05:07:00 2007
New Revision: 533071
URL: http://svn.apache.org/viewvc?view=rev&rev=533071
Log:
SM-940: OneWay operation are not supported on jsr181
Modified:
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/xfire/JbiChannel.java
Modified:
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/xfire/JbiChannel.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/xfire/JbiChannel.java?view=diff&rev=533071&r1=533070&r2=533071
==============================================================================
---
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/xfire/JbiChannel.java
(original)
+++
incubator/servicemix/branches/servicemix-3.1/deployables/serviceengines/servicemix-jsr181/src/main/java/org/apache/servicemix/jsr181/xfire/JbiChannel.java
Fri Apr 27 05:07:00 2007
@@ -20,10 +20,10 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.net.URI;
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
-import javax.jbi.messaging.InOut;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessageExchangeFactory;
import javax.jbi.messaging.NormalizedMessage;
@@ -36,6 +36,7 @@
import org.apache.servicemix.jbi.jaxp.StAXSourceTransformer;
import org.apache.servicemix.jbi.jaxp.StringSource;
+import org.apache.servicemix.jbi.messaging.MessageExchangeSupport;
import org.apache.servicemix.jsr181.JBIContext;
import org.codehaus.xfire.MessageContext;
import org.codehaus.xfire.XFireException;
@@ -59,10 +60,15 @@
public static final String JBI_ENDPOINT = "jbi.endpoint";
public static final String JBI_SECURITY_PROPAGATATION =
"jbi.security.propagation";
- private static ThreadLocal transformer = new ThreadLocal();
+ private static ThreadLocal<StAXSourceTransformer> transformer = new
ThreadLocal<StAXSourceTransformer>();
+
+ public JbiChannel(String uri, JbiTransport transport) {
+ setTransport(transport);
+ setUri(uri);
+ }
protected static StAXSourceTransformer getTransformer() {
- StAXSourceTransformer t = (StAXSourceTransformer) transformer.get();
+ StAXSourceTransformer t = transformer.get();
if (t == null) {
t = new StAXSourceTransformer();
transformer.set(t);
@@ -70,11 +76,6 @@
return t;
}
- public JbiChannel(String uri, JbiTransport transport) {
- setTransport(transport);
- setUri(uri);
- }
-
public void open() throws Exception {
}
@@ -83,7 +84,8 @@
final OutputStream out = (OutputStream)
context.getProperty(Channel.BACKCHANNEL_URI);
if (out != null) {
try {
- final XMLStreamWriter writer =
getTransformer().getOutputFactory().createXMLStreamWriter(out,
message.getEncoding());
+ final XMLStreamWriter writer =
getTransformer().getOutputFactory()
+ .createXMLStreamWriter(out, message.getEncoding());
message.getSerializer().writeMessage(message, writer,
context);
writer.close();
} catch (XMLStreamException e) {
@@ -95,52 +97,57 @@
try {
DeliveryChannel channel = ((JbiTransport)
getTransport()).getContext().getDeliveryChannel();
MessageExchangeFactory factory =
channel.createExchangeFactory();
- if (context.getExchange().hasOutMessage()) {
- InOut me = factory.createInOutExchange();
- me.setInterfaceName((QName)
context.getService().getProperty(JBI_INTERFACE_NAME));
-
me.setOperation(context.getExchange().getOperation().getQName());
- me.setService((QName)
context.getService().getProperty(JBI_SERVICE_NAME));
- me.setEndpoint((ServiceEndpoint)
context.getService().getProperty(JBI_ENDPOINT));
- NormalizedMessage msg = me.createMessage();
- me.setInMessage(msg);
- if
(Boolean.TRUE.equals(context.getService().getProperty(JBI_SECURITY_PROPAGATATION)))
{
- MessageExchange oldMe =
JBIContext.getMessageExchange();
- NormalizedMessage oldMsg = (oldMe != null) ?
oldMe.getMessage("in") : null;
- if (oldMsg != null) {
-
msg.setSecuritySubject(oldMsg.getSecuritySubject());
- }
+ URI mep = null;
+ if (context.getExchange().getOperation().getOutputMessage() !=
null) {
+ mep = MessageExchangeSupport.IN_OUT;
+ } else if
(context.getExchange().getOperation().getFaults().size() > 0) {
+ mep = MessageExchangeSupport.ROBUST_IN_ONLY;
+ } else {
+ mep = MessageExchangeSupport.IN_ONLY;
+ }
+ MessageExchange me = factory.createExchange(mep);
+ me.setInterfaceName((QName)
context.getService().getProperty(JBI_INTERFACE_NAME));
+
me.setOperation(context.getExchange().getOperation().getQName());
+ me.setService((QName)
context.getService().getProperty(JBI_SERVICE_NAME));
+ me.setEndpoint((ServiceEndpoint)
context.getService().getProperty(JBI_ENDPOINT));
+ NormalizedMessage msg = me.createMessage();
+ me.setMessage(msg, "in");
+ if
(Boolean.TRUE.equals(context.getService().getProperty(JBI_SECURITY_PROPAGATATION)))
{
+ MessageExchange oldMe = JBIContext.getMessageExchange();
+ NormalizedMessage oldMsg = (oldMe != null) ?
oldMe.getMessage("in") : null;
+ if (oldMsg != null) {
+ msg.setSecuritySubject(oldMsg.getSecuritySubject());
}
- msg.setContent(getContent(context, message));
- if (!channel.sendSync(me)) {
- throw new XFireException("Unable to send jbi exchange:
sendSync returned false");
+ }
+ msg.setContent(getContent(context, message));
+ if (!channel.sendSync(me)) {
+ throw new XFireException("Unable to send jbi exchange:
sendSync returned false");
+ }
+ if (me.getStatus() == ExchangeStatus.ERROR) {
+ if (me.getError() != null) {
+ throw new XFireFault(me.getError(),
XFireFault.RECEIVER);
+ } else {
+ throw new XFireFault("Unkown Error",
XFireFault.RECEIVER);
}
- if (me.getStatus() == ExchangeStatus.ERROR) {
- me.setStatus(ExchangeStatus.DONE);
- channel.send(me);
- if (me.getError() != null) {
- throw new XFireFault(me.getError(),
XFireFault.RECEIVER);
- } else {
- throw new XFireFault("Unkown Error",
XFireFault.RECEIVER);
- }
- } else if (me.getFault() != null){
+ } else if (me.getStatus() == ExchangeStatus.ACTIVE) {
+ if (me.getFault() != null) {
JDOMResult result = new JDOMResult();
String str =
getTransformer().contentToString(me.getFault());
getTransformer().toResult(new StringSource(str),
result);
Element e = result.getDocument().getRootElement();
e = (Element) e.clone();
+ me.setStatus(ExchangeStatus.DONE);
+ channel.send(me);
XFireFault xfireFault = new XFireFault(str,
XFireFault.RECEIVER);
xfireFault.getDetail().addContent(e);
throw xfireFault;
+ } else if (me.getMessage("out") != null) {
+ Source outSrc = me.getMessage("out").getContent();
+ InMessage inMessage = new
InMessage(getTransformer().toXMLStreamReader(outSrc), getUri());
+ getEndpoint().onReceive(context, inMessage);
+ me.setStatus(ExchangeStatus.DONE);
+ channel.send(me);
}
- Source outSrc = me.getOutMessage().getContent();
-
- InMessage inMessage = new
InMessage(getTransformer().toXMLStreamReader(outSrc), getUri());
- getEndpoint().onReceive(context, inMessage);
-
- me.setStatus(ExchangeStatus.DONE);
- channel.send(me);
- } else {
- // TODO
}
} catch (XFireException e) {
throw e;
@@ -150,15 +157,15 @@
}
}
- protected Source getContent(MessageContext context, OutMessage message)
throws XMLStreamException, IOException, XFireException {
+ protected Source getContent(MessageContext context,
+ OutMessage message) throws XMLStreamException,
IOException, XFireException {
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
- XMLStreamWriter writer =
getTransformer().getOutputFactory().createXMLStreamWriter(outStream,
message.getEncoding());
+ XMLStreamWriter writer = getTransformer().getOutputFactory()
+ .createXMLStreamWriter(outStream, message.getEncoding());
MessageSerializer serializer = context.getOutMessage().getSerializer();
- if (serializer == null)
- {
- AbstractSoapBinding binding = (AbstractSoapBinding)
context.getBinding();
- if (binding == null)
- {
+ if (serializer == null) {
+ AbstractSoapBinding binding = (AbstractSoapBinding)
context.getBinding();
+ if (binding == null) {
throw new XFireException("Couldn't find the binding!");
}
serializer = AbstractSoapBinding.getSerializer(binding.getStyle(),
binding.getUse());
@@ -166,8 +173,7 @@
serializer.writeMessage(message, writer, context);
writer.close();
outStream.close();
- StreamSource src = new StreamSource(new
ByteArrayInputStream(outStream.toByteArray()));
- return src;
+ return new StreamSource(new
ByteArrayInputStream(outStream.toByteArray()));
}
}