Author: boisvert
Date: Thu Mar 22 11:11:04 2007
New Revision: 521390
URL: http://svn.apache.org/viewvc?view=rev&rev=521390
Log:
Use sendSync() rather than send() for better load conditioning
Modified:
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
Modified:
incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java
URL: http://svn.apache.org/viewvc/incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java?view=diff&rev=521390&r1=521389&r2=521390
==============================================================================
--- incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java (original)
+++ incubator/ode/trunk/jbi/src/main/java/org/apache/ode/jbi/OdeConsumer.java Thu Mar 22 11:11:04 2007
@@ -44,9 +44,13 @@
class OdeConsumer extends ServiceBridge implements JbiMessageExchangeProcessor
{
private static final Log __log = LogFactory.getLog(OdeConsumer.class);
+ private static final long DEFAULT_SENDSYNC_TIMEOUT = 2 * 60 * 1000L;
+
private OdeContext _ode;
- private Map<String, String> _outstandingExchanges = new
ConcurrentHashMap<String, String>();
+ private long _sendSyncTimeout = DEFAULT_SENDSYNC_TIMEOUT;
+
+ private Map<String, PartnerRoleMessageExchange> _outstandingExchanges =
new ConcurrentHashMap<String, PartnerRoleMessageExchange>();
OdeConsumer(OdeContext ode) {
_ode = ode;
@@ -75,7 +79,7 @@
QName opname = new QName(se.getServiceName().getNamespaceURI(),
odeMex.getOperation().getName());
MessageExchangeFactory mexf =
_ode.getChannel().createExchangeFactory(se);
- MessageExchange jbiMex;
+ final MessageExchange jbiMex;
try {
jbiMex = mexf.createExchange(isTwoWay ?
MessageExchangePattern.IN_OUT : MessageExchangePattern.IN_ONLY);
jbiMex.setEndpoint(se);
@@ -99,12 +103,16 @@
_ode._scheduler.registerSynchronizer(new
Scheduler.Synchronizer() {
public void afterCompletion(boolean success) {
if (success) {
- _ode._executorService.submit( new Runnable() {
+ _ode._executorService.submit(new Runnable() {
public void run() {
try {
- _ode.getChannel().send(inonly);
+ boolean sendOk =
_ode.getChannel().sendSync(inonly, _sendSyncTimeout);
+ if (!sendOk) {
+ __log.warn("Timeout while sending message for JBI message exchange: " + jbiMex.getExchangeId());
+ }
+ onJbiMessageExchange(inonly);
} catch (MessagingException e) {
- String errmsg = "Exception while sending in-only message to JBI for ODE mex " + odeMex;
+ String errmsg = "Error sending request-only message to JBI for ODE mex " + odeMex;
__log.error(errmsg, e);
}
}
@@ -125,13 +133,17 @@
_ode._scheduler.registerSynchronizer(new
Scheduler.Synchronizer() {
public void afterCompletion(boolean success) {
if (success) {
- _ode._executorService.submit( new Runnable() {
+ _ode._executorService.submit(new Runnable() {
public void run() {
try {
-
_outstandingExchanges.put(inout.getExchangeId(), odeMex.getMessageExchangeId());
- _ode.getChannel().send(inout);
+
_outstandingExchanges.put(inout.getExchangeId(), odeMex);
+ boolean sendOk =
_ode.getChannel().sendSync(inout, _sendSyncTimeout);
+ if (!sendOk) {
+ __log.warn("Timeout while sending message for JBI message exchange: " + jbiMex.getExchangeId());
+ }
+ onJbiMessageExchange(inout);
} catch (MessagingException e) {
- String errmsg = "Exception while sending request-only message to JBI for ODE mex " + odeMex;
+ String errmsg = "Error sending request-only message to JBI for ODE mex " + odeMex;
__log.error(errmsg, e);
}
}
@@ -159,41 +171,34 @@
}
public void onJbiMessageExchange(MessageExchange jbiMex) throws
MessagingException {
- if (jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY)) {
- // Ignore these, they're one way.
- } else if (jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
- if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) {
+ if (!jbiMex.getPattern().equals(MessageExchangePattern.IN_ONLY) &&
+ !jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
+ __log.error("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " + jbiMex.getPattern());
+ return;
+ }
+ if (jbiMex.getStatus() == ExchangeStatus.ACTIVE) {
+ if (jbiMex.getPattern().equals(MessageExchangePattern.IN_OUT)) {
outResponse((InOut) jbiMex);
- jbiMex.setStatus(ExchangeStatus.DONE);
- _ode.getChannel().send(jbiMex);
- } else if (jbiMex.getStatus() == ExchangeStatus.ERROR)
- outFailure((InOut) jbiMex);
- else
- __log.warn("Unexpected state for JBI message exchange: " +
jbiMex.getExchangeId());
+ }
+ jbiMex.setStatus(ExchangeStatus.DONE);
+ _ode.getChannel().send(jbiMex);
+ } else if (jbiMex.getStatus() == ExchangeStatus.ERROR) {
+ outFailure((InOut) jbiMex);
} else {
- __log.fatal("JBI MessageExchange " + jbiMex.getExchangeId() + " is of an unsupported pattern " + jbiMex.getPattern());
+ __log.error("Unexpected status " + jbiMex.getStatus() + " for JBI message exchange: " + jbiMex.getExchangeId());
}
-
}
private void outFailure(final InOut jbiMex) {
- final String mexref =
_outstandingExchanges.remove(jbiMex.getExchangeId());
- if (mexref == null) {
- __log.warn("Received a response for unkown JBI message exchange "
+ jbiMex.getExchangeId());
+ final PartnerRoleMessageExchange pmex =
_outstandingExchanges.remove(jbiMex.getExchangeId());
+ if (pmex == null) {
+ __log.warn("Received a response for unknown JBI message exchange "
+ jbiMex.getExchangeId());
return;
}
try {
_ode._scheduler.execTransaction(new Callable<Boolean>() {
public Boolean call() throws Exception {
- PartnerRoleMessageExchange pmex =
(PartnerRoleMessageExchange) _ode._server.getEngine().getMessageExchange(
- mexref);
-
- if (pmex == null) {
- __log.warn("Cannot locate ODE message exchange: " +
mexref + "; ignoring.");
- return null;
- }
-
pmex.replyWithFailure(FailureType.OTHER, "Error: " +
jbiMex.getError(), null);
return null;
}
@@ -205,9 +210,9 @@
}
private void outResponse(final InOut jbiMex) {
- final String mexref =
_outstandingExchanges.remove(jbiMex.getExchangeId());
- if (mexref == null) {
- __log.warn("Received a response for unkown JBI message exchange "
+ jbiMex.getExchangeId());
+ final PartnerRoleMessageExchange pmex =
_outstandingExchanges.remove(jbiMex.getExchangeId());
+ if (pmex == null) {
+ __log.warn("Received a response for unknown JBI message exchange "
+ jbiMex.getExchangeId());
return;
}
@@ -215,15 +220,6 @@
_ode._scheduler.execTransaction(new Callable<Boolean>() {
@SuppressWarnings("unchecked")
public Boolean call() throws Exception {
- PartnerRoleMessageExchange pmex =
(PartnerRoleMessageExchange) _ode._server.getEngine().getMessageExchange(
- mexref);
-
- if (pmex == null) {
- // I'm a bit unclear as to why this would occur, but it appears to be possible.
- __log.warn("Cannot locate ODE message exchange: " +
mexref + "; ignoring.");
- return null;
- }
-
String mapperName =
pmex.getProperty(Mapper.class.getName());
Mapper mapper = mapperName == null ?
_ode.getDefaultMapper() : _ode.getMapper(mapperName);
if (mapper == null) {
@@ -268,5 +264,13 @@
__log.error("error delivering RESPONSE: ", ex);
}
+ }
+
+ public void setSendSyncTimeout(long timeout) {
+ _sendSyncTimeout = timeout;
+ }
+
+ public long getSendSyncTimeout() {
+ return _sendSyncTimeout;
}
}