Author: gnodet
Date: Fri Sep 22 10:28:57 2006
New Revision: 449018

URL: http://svn.apache.org/viewvc?view=rev&rev=449018
Log:
SM-595: Replace BoundedLinkedQueue by a standard queue

Removed:
    
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/BoundedLinkedQueue.java
Modified:
    
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ComponentMBeanImpl.java
    
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
    
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java

Modified: 
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ComponentMBeanImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ComponentMBeanImpl.java?view=diff&rev=449018&r1=449017&r2=449018
==============================================================================
--- 
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ComponentMBeanImpl.java
 (original)
+++ 
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ComponentMBeanImpl.java
 Fri Sep 22 10:28:57 2006
@@ -68,6 +68,7 @@
     private MessagingStats messagingStats;
     private ComponentNameSpace componentName;
     private String description = "POJO Component";
+    private int queueCapacity = 1024;
     private boolean pojo;
     private boolean binding;
     private boolean service;
@@ -404,12 +405,7 @@
      * @return the capacity of the inbound queue
      */
     public int getInboundQueueCapacity(){
-        // TODO: should not be on the delivery channel
-        if (getDeliveryChannel() != null) {
-            return getDeliveryChannel().getQueueCapacity();
-        } else {
-            return 0;
-        }
+        return queueCapacity;
     }
     
     /**
@@ -417,10 +413,10 @@
      * @param value
      */
     public void setInboundQueueCapacity(int value){
-        // TODO: should not be on the delivery channel
         if (getDeliveryChannel() != null) {
-            getDeliveryChannel().setQueueCapacity(value);
+            throw new IllegalStateException("The component must be shut down 
before changing queue capacity");
         }
+        this.queueCapacity = value;
     }
     
     /**

Modified: 
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java?view=diff&rev=449018&r1=449017&r2=449018
==============================================================================
--- 
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
 (original)
+++ 
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
 Fri Sep 22 10:28:57 2006
@@ -16,6 +16,7 @@
  */
 package org.apache.servicemix.jbi.messaging;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -34,19 +35,21 @@
 import javax.transaction.TransactionManager;
 import javax.xml.namespace.QName;
 
-import org.apache.activemq.util.IdGenerator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.id.IdGenerator;
 import org.apache.servicemix.jbi.ExchangeTimeoutException;
 import org.apache.servicemix.jbi.container.ActivationSpec;
 import org.apache.servicemix.jbi.container.JBIContainer;
 import org.apache.servicemix.jbi.framework.ComponentContextImpl;
 import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
-import org.apache.servicemix.jbi.util.BoundedLinkedQueue;
 
+import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
 import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -61,7 +64,7 @@
     private JBIContainer container;
     private ComponentContextImpl context;
     private ComponentMBeanImpl component;
-    private BoundedLinkedQueue queue = new BoundedLinkedQueue();
+    private BlockingQueue queue;
     private IdGenerator idGenerator = new IdGenerator();
     private MessageExchangeFactory inboundFactory;
     private int intervalCount = 0;
@@ -86,6 +89,7 @@
     public DeliveryChannelImpl(ComponentMBeanImpl component) {
         this.component = component;
         this.container = component.getContainer();
+        this.queue = new 
ArrayBlockingQueue(component.getInboundQueueCapacity());
     }
 
     /**
@@ -96,22 +100,6 @@
     }
 
     /**
-     * @return the capacity of the inbound queue
-     */
-    public int getQueueCapacity() {
-        return queue.capacity();
-    }
-
-    /**
-     * Set the inbound queue capacity
-     * 
-     * @param value
-     */
-    public void setQueueCapacity(int value) {
-        queue.setCapacity(value);
-    }
-
-    /**
      * close the delivery channel
      * 
      * @throws MessagingException
@@ -121,7 +109,8 @@
             if (log.isDebugEnabled()) {
                 log.debug("Closing DeliveryChannel " + this);
             }
-            List pending = queue.closeAndFlush();
+            List pending = new ArrayList(queue.size());
+            queue.drainTo(pending);
             for (Iterator iter = pending.iterator(); iter.hasNext();) {
                 MessageExchangeImpl messageExchange = (MessageExchangeImpl) 
iter.next();
                 if (messageExchange.getTransactionContext() != null && 
messageExchange.getMirror().getSyncState() == 
MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
@@ -263,7 +252,7 @@
     public MessageExchange accept(long timeoutMS) throws MessagingException {
         try {
             checkNotClosed();
-            MessageExchangeImpl me = (MessageExchangeImpl) 
queue.poll(timeoutMS);
+            MessageExchangeImpl me = (MessageExchangeImpl) 
queue.poll(timeoutMS, TimeUnit.MILLISECONDS);
             if (me != null) {
                 // If the exchange has already timed out,
                 // do not give it to the component

Modified: 
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java?view=diff&rev=449018&r1=449017&r2=449018
==============================================================================
--- 
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java
 (original)
+++ 
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java
 Fri Sep 22 10:28:57 2006
@@ -18,8 +18,8 @@
 
 import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.util.IdGenerator;
 import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.id.IdGenerator;
 import org.apache.servicemix.jbi.framework.ComponentContextImpl;
 
 import javax.jbi.messaging.InOnly;


Reply via email to