Author: rajith
Date: Tue Oct 9 11:15:45 2007
New Revision: 583251
URL: http://svn.apache.org/viewvc?rev=583251&view=rev
Log:
Fixed an error with the credit based flow control
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=583251&r1=583250&r2=583251&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Tue Oct 9 11:15:45 2007
@@ -393,7 +393,7 @@
private void setVirtualHost(String virtualHost)
{
- if (virtualHost.startsWith("/"))
+ if (virtualHost != null && virtualHost.startsWith("/"))
{
virtualHost = virtualHost.substring(1);
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?rev=583251&r1=583250&r2=583251&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
Tue Oct 9 11:15:45 2007
@@ -268,7 +268,8 @@
public String toString()
{
- StringBuffer sb = new StringBuffer();
+ return _url;
+ /*StringBuffer sb = new StringBuffer();
sb.append(AMQ_PROTOCOL);
sb.append("://");
@@ -299,7 +300,7 @@
sb.append(optionsToString());
- return sb.toString();
+ return sb.toString();*/
}
private String optionsToString()
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=583251&r1=583250&r2=583251&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
Tue Oct 9 11:15:45 2007
@@ -345,6 +345,7 @@
consumer.isNoLocal() ?
Option.NO_LOCAL : Option.NO_OPTION,
consumer.isExclusive() ?
Option.EXCLUSIVE : Option.NO_OPTION);
+ getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
// We need to sync so that we get notify of an error.
getQpidSession().sync();
getCurrentException();
@@ -438,7 +439,7 @@
{
getQpidSession().messageStop(consumer.getConsumerTag().toString());
getQpidSession().messageFlowMode(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_MODE_CREDIT);
-
+
getQpidSession().messageFlow(consumer.getConsumerTag().toString(),
Session.MESSAGE_FLOW_UNIT_BYTE, 0xFFFFFFFF);
}
}
else
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=583251&r1=583250&r2=583251&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
Tue Oct 9 11:15:45 2007
@@ -174,7 +174,7 @@
{
((AMQSession_0_10)
getSession()).getQpidSession().messageStop(getConsumerTag().toString());
((AMQSession_0_10) getSession()).getQpidSession().sync();
- // confirm cancel
+ // confirm cancel
getSession().confirmConsumerCancelled(getConsumerTag());
try
{
@@ -303,7 +303,7 @@
{
// do nothing as the rollback operation will do the job.
}
-
+
/**
* Acquire a message
*
@@ -338,8 +338,11 @@
super.setMessageListener(messageListener);
if (messageListener == null)
{
-
_0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(),
Session.MESSAGE_FLOW_MODE_CREDIT);
_0_10session.getQpidSession().messageStop(getConsumerTag().toString());
+
_0_10session.getQpidSession().messageFlowMode(getConsumerTag().toString(),
Session.MESSAGE_FLOW_MODE_CREDIT);
+
_0_10session.getQpidSession().messageFlow(getConsumerTag().toString(),
+ org.apache.qpidity.nclient.Session.MESSAGE_FLOW_UNIT_BYTE,
+ 0xFFFFFFFF);
_0_10session.getQpidSession().sync();
}
else
@@ -367,14 +370,19 @@
if (l > 0)
{
o = _synchronousQueue.poll(l, TimeUnit.MILLISECONDS);
-
_0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
- _0_10session.getQpidSession().sync();
- o = _synchronousQueue.poll();
+ if (o == null)
+ {
+ _logger.debug("Message Didn't arrive in time, checking if one
is inflight");
+ // checking if one is inflight
+
_0_10session.getQpidSession().messageFlush(getConsumerTag().toString());
+ _0_10session.getQpidSession().sync();
+ o = _synchronousQueue.poll();
+ }
}
else
{
o = _synchronousQueue.take();
}
- return null;
+ return o;
}
}