Author: gnodet
Date: Fri Sep 22 10:28:57 2006
New Revision: 449018
URL: http://svn.apache.org/viewvc?view=rev&rev=449018
Log:
SM-595: Replace BoundedLinkedQueue by a standard queue
Removed:
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/util/BoundedLinkedQueue.java
Modified:
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ComponentMBeanImpl.java
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java
Modified:
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ComponentMBeanImpl.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ComponentMBeanImpl.java?view=diff&rev=449018&r1=449017&r2=449018
==============================================================================
---
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ComponentMBeanImpl.java
(original)
+++
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/framework/ComponentMBeanImpl.java
Fri Sep 22 10:28:57 2006
@@ -68,6 +68,7 @@
private MessagingStats messagingStats;
private ComponentNameSpace componentName;
private String description = "POJO Component";
+ private int queueCapacity = 1024;
private boolean pojo;
private boolean binding;
private boolean service;
@@ -404,12 +405,7 @@
* @return the capacity of the inbound queue
*/
public int getInboundQueueCapacity(){
- // TODO: should not be on the delivery channel
- if (getDeliveryChannel() != null) {
- return getDeliveryChannel().getQueueCapacity();
- } else {
- return 0;
- }
+ return queueCapacity;
}
/**
@@ -417,10 +413,10 @@
* @param value
*/
public void setInboundQueueCapacity(int value){
- // TODO: should not be on the delivery channel
if (getDeliveryChannel() != null) {
- getDeliveryChannel().setQueueCapacity(value);
+ throw new IllegalStateException("The component must be shut down
before changing queue capacity");
}
+ this.queueCapacity = value;
}
/**
Modified:
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java?view=diff&rev=449018&r1=449017&r2=449018
==============================================================================
---
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
(original)
+++
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/DeliveryChannelImpl.java
Fri Sep 22 10:28:57 2006
@@ -16,6 +16,7 @@
*/
package org.apache.servicemix.jbi.messaging;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -34,19 +35,21 @@
import javax.transaction.TransactionManager;
import javax.xml.namespace.QName;
-import org.apache.activemq.util.IdGenerator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.MessageExchangeListener;
+import org.apache.servicemix.id.IdGenerator;
import org.apache.servicemix.jbi.ExchangeTimeoutException;
import org.apache.servicemix.jbi.container.ActivationSpec;
import org.apache.servicemix.jbi.container.JBIContainer;
import org.apache.servicemix.jbi.framework.ComponentContextImpl;
import org.apache.servicemix.jbi.framework.ComponentMBeanImpl;
-import org.apache.servicemix.jbi.util.BoundedLinkedQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.ArrayBlockingQueue;
+import edu.emory.mathcs.backport.java.util.concurrent.BlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -61,7 +64,7 @@
private JBIContainer container;
private ComponentContextImpl context;
private ComponentMBeanImpl component;
- private BoundedLinkedQueue queue = new BoundedLinkedQueue();
+ private BlockingQueue queue;
private IdGenerator idGenerator = new IdGenerator();
private MessageExchangeFactory inboundFactory;
private int intervalCount = 0;
@@ -86,6 +89,7 @@
public DeliveryChannelImpl(ComponentMBeanImpl component) {
this.component = component;
this.container = component.getContainer();
+ this.queue = new
ArrayBlockingQueue(component.getInboundQueueCapacity());
}
/**
@@ -96,22 +100,6 @@
}
/**
- * @return the capacity of the inbound queue
- */
- public int getQueueCapacity() {
- return queue.capacity();
- }
-
- /**
- * Set the inbound queue capacity
- *
- * @param value
- */
- public void setQueueCapacity(int value) {
- queue.setCapacity(value);
- }
-
- /**
* close the delivery channel
*
* @throws MessagingException
@@ -121,7 +109,8 @@
if (log.isDebugEnabled()) {
log.debug("Closing DeliveryChannel " + this);
}
- List pending = queue.closeAndFlush();
+ List pending = new ArrayList(queue.size());
+ queue.drainTo(pending);
for (Iterator iter = pending.iterator(); iter.hasNext();) {
MessageExchangeImpl messageExchange = (MessageExchangeImpl)
iter.next();
if (messageExchange.getTransactionContext() != null &&
messageExchange.getMirror().getSyncState() ==
MessageExchangeImpl.SYNC_STATE_SYNC_SENT) {
@@ -263,7 +252,7 @@
public MessageExchange accept(long timeoutMS) throws MessagingException {
try {
checkNotClosed();
- MessageExchangeImpl me = (MessageExchangeImpl)
queue.poll(timeoutMS);
+ MessageExchangeImpl me = (MessageExchangeImpl)
queue.poll(timeoutMS, TimeUnit.MILLISECONDS);
if (me != null) {
// If the exchange has already timed out,
// do not give it to the component
Modified:
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java
URL:
http://svn.apache.org/viewvc/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java?view=diff&rev=449018&r1=449017&r2=449018
==============================================================================
---
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java
(original)
+++
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/messaging/MessageExchangeFactoryImpl.java
Fri Sep 22 10:28:57 2006
@@ -18,8 +18,8 @@
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.util.IdGenerator;
import org.apache.servicemix.JbiConstants;
+import org.apache.servicemix.id.IdGenerator;
import org.apache.servicemix.jbi.framework.ComponentContextImpl;
import javax.jbi.messaging.InOnly;