| Commit in servicemix/base/src/main/java/org/servicemix/jbi/messaging on MAIN | |||
| DeliveryChannelImpl.java | +79 | -65 | 1.18 -> 1.19 |
tidied up some of the logging
servicemix/base/src/main/java/org/servicemix/jbi/messaging
diff -u -r1.18 -r1.19
--- DeliveryChannelImpl.java 2 Aug 2005 19:55:16 -0000 1.18
+++ DeliveryChannelImpl.java 3 Aug 2005 05:50:19 -0000 1.19
@@ -29,19 +29,19 @@
import org.activemq.util.IdGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.servicemix.MessageExchangeListener;
import org.servicemix.jbi.container.ActivationSpec;
import org.servicemix.jbi.container.JBIContainer;
import org.servicemix.jbi.framework.ComponentConnector;
import org.servicemix.jbi.framework.ComponentContextImpl;
import org.servicemix.jbi.framework.LocalComponentConnector;
-import org.servicemix.MessageExchangeListener;
import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;

/**
* DeliveryChannel implementation
*
- * @version $Revision: 1.18 $
+ * @version $Revision: 1.19 $
*/
public class DeliveryChannelImpl implements DeliveryChannel {
private static final Log log = LogFactory.getLog(DeliveryChannel.class);
@@ -52,7 +52,6 @@
private Map acks = new ConcurrentHashMap();
private IdGenerator idGenerator = new IdGenerator();
private MessageExchangeFactory inboundFactory;
- private String componentName;
private MessagingStats messagingStats;
private boolean exchangeThrottling;
private long throttlingTimeout = 100;
@@ -61,37 +60,37 @@
private long lastSendTime = System.currentTimeMillis();
private long lastReceiveTime = System.currentTimeMillis();
-
/**
* Constructor
+ *
* @param container
* @param componentName
*/
- public DeliveryChannelImpl(JBIContainer container,String componentName) {
+ public DeliveryChannelImpl(JBIContainer container, String componentName) {
this.container = container;
- this.componentName = componentName;
this.messagingStats = new MessagingStats(componentName);
}
-
+
/**
* @return size of the inbound Queue
*/
- public int getQueueSize(){
+ public int getQueueSize() {
return queue.size();
}
-
+
/**
* @return the capacity of the inbound queue
*/
- public int getQueueCapacity(){
+ public int getQueueCapacity() {
return queue.capacity();
}
-
+
/**
* Set the inbound queue capacity
+ *
* @param value
*/
- public void setQueueCapacity(int value){
+ public void setQueueCapacity(int value) {
queue.setCapacity(value);
}
@@ -114,24 +113,46 @@
result.setContext(context);
ActivationSpec activationSpec = context.getActivationSpec();
if (activationSpec != null) {
+ String componentName = context.getComponentName();
// lets auto-default the container-routing information
QName serviceName = activationSpec.getDestinationService();
- result.setServiceName(serviceName);
- result.setInterfaceName(activationSpec.getDestinationInterface());
+ if (serviceName != null) {
+ result.setServiceName(serviceName);
+ log.info("default destination serviceName for " + componentName + " = " + serviceName);
+ }
+ QName interfaceName = activationSpec.getDestinationInterface();
+ if (interfaceName != null) {
+ result.setInterfaceName(interfaceName);
+ log.info("default destination interfaceName for " + componentName + " = " + interfaceName);
+ }
+ QName operationName = activationSpec.getDestinationOperation();
+ if (operationName != null) {
+ result.setOperationName(operationName);
+ log.info("default destination operationName for " + componentName + " = " + operationName);
+ }
String endpointName = activationSpec.getEndpoint();
- log.info("default endpointName for factory to " + endpointName);
- if (serviceName != null && endpointName != null){
- endpointName = endpointName.trim();
- ServiceEndpoint[] endpoints = container.getRegistry().getEndpointsForService(serviceName);
- if (endpoints != null){
- for (int i = 0; i < endpoints.length; i++){
- if(endpoints[i].getEndpointName().equals(endpointName)){
- log.info("Set default endpoint for factory to " + endpoints[i]);
- result.setEndpoint(endpoints[i]);
- break;
+ if (endpointName != null) {
+ boolean endpointSet = false;
+ log.info("default destination endpointName for " + componentName + " = " + endpointName);
+ if (serviceName != null && endpointName != null) {
+ endpointName = endpointName.trim();
+ ServiceEndpoint[] endpoints = container.getRegistry().getEndpointsForService(serviceName);
+ if (endpoints != null) {
+ for (int i = 0;i < endpoints.length;i++) {
+ if (endpoints[i].getEndpointName().equals(endpointName)) {
+ result.setEndpoint(endpoints[i]);
+ log.info("Set default destination endpoint for " + componentName + " to "
+ + endpoints[i]);
+ endpointSet = true;
+ break;
+ }
}
}
}
+ if (!endpointSet) {
+ log.warn("Could not find destination endpoint for " + componentName + " service(" + serviceName
+ + ") with endpointName " + endpointName);
+ }
}
}
return result;
@@ -217,14 +238,14 @@
public void send(MessageExchange messageExchange) throws MessagingException {
MessageExchangeImpl messageExchangeImpl = (MessageExchangeImpl) messageExchange;
if (messageExchangeImpl.isOutbound()) {
- if (exchangeThrottling){
- if (throttlingInterval > intervalCount){
+ if (exchangeThrottling) {
+ if (throttlingInterval > intervalCount) {
intervalCount = 0;
try {
Thread.sleep(throttlingTimeout);
}
catch (InterruptedException e) {
- log.warn("throttling failed",e);
+ log.warn("throttling failed", e);
}
}
intervalCount++;
@@ -301,6 +322,7 @@
/**
* Get the context
+ *
* @return the context
*/
public ComponentContextImpl getContext() {
@@ -309,68 +331,73 @@
/**
* set the context
+ *
* @param context
*/
public void setContext(ComponentContextImpl context) {
this.context = context;
}
-
+
/**
* Get the MessagingStats
+ *
* @return messaging stats
*/
- public MessagingStats getMessagingStats(){
+ public MessagingStats getMessagingStats() {
return messagingStats;
}
-
+
/**
* Is MessageExchange sender throttling enabled ?
+ *
* @return true if throttling enabled
*/
- public boolean isExchangeThrottling(){
+ public boolean isExchangeThrottling() {
return exchangeThrottling;
}
-
+
/**
* Set message throttling
+ *
* @param value
- *
*/
- public void setExchangeThrottling(boolean value){
+ public void setExchangeThrottling(boolean value) {
this.exchangeThrottling = value;
}
-
+
/**
* Get the throttling timeout
+ *
* @return throttling tomeout (ms)
*/
- public long getThrottlingTimeout(){
+ public long getThrottlingTimeout() {
return throttlingTimeout;
}
-
+
/**
- * Set the throttling timout
+ * Set the throttling timout
+ *
* @param value (ms)
*/
- public void setThrottlingTimeout(long value){
+ public void setThrottlingTimeout(long value) {
throttlingTimeout = value;
}
-
+
/**
- * Get the interval for throttling -
- * number of Exchanges set before the throttling timeout is applied
+ * Get the interval for throttling - number of Exchanges set before the throttling timeout is applied
+ *
* @return interval for throttling
*/
- public int getThrottlingInterval(){
+ public int getThrottlingInterval() {
return throttlingInterval;
}
-
+
/**
- * Set the throttling interval
- * number of Exchanges set before the throttling timeout is applied
+ * Set the throttling interval number of Exchanges set before the throttling timeout is applied
+ *
* @param value
*/
- public void setThrottlingInterval(int value){
+ public void setThrottlingInterval(int value) {
throttlingInterval = value;
}
@@ -386,7 +413,7 @@
packet.setOutbound(false);
long currentTime = System.currentTimeMillis();
messagingStats.getInboundExchanges().increment();
- messagingStats.getInboundExchangeRate().addTime(currentTime-lastReceiveTime);
+ messagingStats.getInboundExchangeRate().addTime(currentTime - lastReceiveTime);
lastReceiveTime = currentTime;
MessageExchangeImpl me = createInboundExchange(packet);
me.setPacket(packet);
@@ -396,7 +423,6 @@
}
else {
try {
-
queue.put(me);
}
catch (InterruptedException e) {
@@ -408,7 +434,6 @@
AckHelper ack = (AckHelper) acks.remove(packet.getExchangeId());
if (ack != null) {
ack.getMessageExchange().setPacket(packet);
-
ack.done();
}
}
@@ -417,6 +442,7 @@
/**
* Get Inbound Factory
+ *
* @return the inbound message factory
*/
public MessageExchangeFactory getInboundFactory() {
@@ -425,27 +451,15 @@
}
return inboundFactory;
}
-
- protected MessageExchangeImpl createInboundExchange(ExchangePacket packet) throws MessagingException{
+
+ protected MessageExchangeImpl createInboundExchange(ExchangePacket packet) throws MessagingException {
MessageExchangeImpl me = (MessageExchangeImpl) getInboundFactory().createExchange(packet.getPattern());
me.setPacket(packet);
return me;
}
-
- protected boolean isOutboundRouter(){
- boolean result = false;
- ActivationSpec spec = getContext().getActivationSpec();
- if (spec != null){
- String endpoint = spec.getEndpoint();
- QName service = spec.getDestinationService();
- QName face = spec.getDestinationInterface();
- result = (endpoint != null && endpoint.length() > 0) || service != null || face != null;
- }
- return result;
- }
/**
- * @return pretty print
+ * @return pretty print
*/
public String toString() {
return "DeliveryChannel{" + componentConnector.getComponentNameSpace() + "}";
