Author: arnaudsimon
Date: Mon Oct 1 05:06:17 2007
New Revision: 580929
URL: http://svn.apache.org/viewvc?rev=580929&view=rev
Log:
Set the message flow when a message listener is registered on a consumer whose connection has already been started
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=580929&r1=580928&r2=580929&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Oct 1 05:06:17 2007
@@ -53,7 +53,7 @@
/**
* The maximum number of pre-fetched messages per destination
*/
- private static final long MAX_PREFETCH = 100;
+ public static final long MAX_PREFETCH = 100;
/**
* The underlying QpidSession
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=580929&r1=580928&r2=580929&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Mon Oct 1 05:06:17 2007
@@ -51,7 +51,7 @@
/**
* The connection being used by this consumer
*/
- private AMQConnection _connection;
+ protected AMQConnection _connection;
private String _messageSelector;
@@ -86,7 +86,7 @@
protected MessageFactoryRegistry _messageFactory;
- private final AMQSession _session;
+ protected final AMQSession _session;
protected AMQProtocolHandler _protocolHandler;
@@ -354,7 +354,7 @@
return null;
}
- Object o = null;
+ Object o ;
if (l > 0)
{
o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=580929&r1=580928&r2=580929&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Mon Oct 1 05:06:17 2007
@@ -35,6 +35,7 @@
import org.apache.qpidity.filter.JMSSelectorFilter;
import javax.jms.JMSException;
+import javax.jms.MessageListener;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -139,7 +140,7 @@
try
{
ByteBuffer buff = message.readData();
- ByteBuffer newBuf = ByteBuffer.allocate(buff.remaining()) ;
+ ByteBuffer newBuf = ByteBuffer.allocate(buff.remaining());
newBuf.put(buff);
newMessage.receiveBody(newBuf);
}
@@ -324,5 +325,21 @@
_0_10session.getCurrentException();
}
return result;
+ }
+
+
+ public void setMessageListener(final MessageListener messageListener) throws JMSException
+ {
+ super.setMessageListener(messageListener);
+ if (_connection.started())
+ {
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_MESSAGE,
+ AMQSession_0_10.MAX_PREFETCH);
+ _0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
+ 0xFFFFFFFF);
+ _0_10session.getQpidSession().sync();
+ }
}
}