Author: rajith
Date: Fri Aug 24 15:26:42 2007
New Revision: 569547
URL: http://svn.apache.org/viewvc?rev=569547&view=rev
Log:
Added basic test case to test JMS
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java?rev=569547&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
Fri Aug 24 15:26:42 2007
@@ -0,0 +1,34 @@
+package org.apache.qpidity.client;
+
+import org.apache.qpidity.jms.ConnectionFactoryImpl;
+import org.apache.qpidity.jms.TopicImpl;
+
+public class JMSTestCase
+{
+ public static void main(String[] args)
+ {
+ try
+ {
+ javax.jms.Connection con = (new
ConnectionFactoryImpl("localhost",5672, "test",
"guest","guest")).createConnection();
+ con.start();
+
+ javax.jms.Session ssn = con.createSession(false, 1);
+
+ javax.jms.Destination dest = new TopicImpl("myTopic");
+ javax.jms.MessageProducer prod = ssn.createProducer(dest);
+ javax.jms.MessageConsumer cons = ssn.createConsumer(dest);
+
+ javax.jms.BytesMessage msg = ssn.createBytesMessage();
+ msg.writeInt(123);
+ prod.send(msg);
+
+ javax.jms.Message m = cons.receive();
+ System.out.println(m);
+
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
Fri Aug 24 15:26:42 2007
@@ -87,22 +87,12 @@
public void readData(byte[] target) throws IOException
{
- if (_data.size() >0 && _readBuffer == null)
- {
- buildReadBuffer();
- }
-
- _readBuffer.get(target);
+ getReadBuffer().get(target);
}
public ByteBuffer readData() throws IOException
- {
- if (_data.size() >0 && _readBuffer == null)
- {
- buildReadBuffer();
- }
-
- return _readBuffer;
+ {
+ return getReadBuffer();
}
private void buildReadBuffer()
@@ -122,16 +112,39 @@
}
}
+ private ByteBuffer getReadBuffer() throws IOException
+ {
+ if (_readBuffer != null )
+ {
+ return _readBuffer.slice();
+ }
+ else
+ {
+ if (_data.size() >0)
+ {
+ buildReadBuffer();
+ return _readBuffer.slice();
+ }
+ else
+ {
+ throw new IOException("No Data to read");
+ }
+ }
+ }
+
//hack for testing
@Override public String toString()
{
- if (_data.size() >0 && _readBuffer == null)
+ try
+ {
+ ByteBuffer temp = getReadBuffer();
+ byte[] b = new byte[temp.remaining()];
+ temp.get(b);
+ return new String(b);
+ }
+ catch(IOException e)
{
- buildReadBuffer();
+ return "No data";
}
- ByteBuffer temp = _readBuffer.duplicate();
- byte[] b = new byte[temp.remaining()];
- temp.get(b);
- return new String(b);
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
Fri Aug 24 15:26:42 2007
@@ -368,7 +368,9 @@
// if this consumer is stopped then this will be call when
starting
requestOneMessage();
//When sync() returns we know whether we have received a
message or not.
+ System.out.println("Internal receive -- Called sync()");
getSession().getQpidSession().sync();
+ System.out.println("Internal receive -- Returned from
sync()");
}
if (_messageReceived.get() && timeout < 0)
{
@@ -492,26 +494,32 @@
* @param message The message delivered to this consumer.
*/
protected synchronized void onMessage(QpidMessage message)
- {
+ {
try
{
// if there is a message selector then we need to evaluate it.
boolean messageOk = true;
if (_messageSelector != null)
{
- messageOk = _filter.matches((Message) message);
+ messageOk = _filter.matches((Message) message);
}
+
+ System.out.println("Received a message- onMessage in message
consumer Impl");
if (!messageOk && _preAcquire)
{
// this is the case for topics
// We need to ack this message
+ System.out.println("onMessage - trying to ack message");
acknowledgeMessage(message);
+ System.out.println("onMessage - acked message");
}
// now we need to acquire this message if needed
// this is the case of queue with a message selector set
if (!_preAcquire && messageOk)
{
+ System.out.println("onMessage - trying to acquire message");
messageOk = acquireMessage(message);
+ System.out.println("onMessage - acquired message");
}
// if this consumer is synchronous then set the current message and
@@ -520,15 +528,17 @@
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Received a message- onMessage in message
consumer Impl");
+ _logger.debug("Received a message- onMessage in message
consumer Impl");
}
synchronized (_incomingMessageLock)
{
+ System.out.println("got incomming message lock");
if (messageOk)
{
// we have received a proper message that we can
deliver
if (_isReceiving)
{
+ System.out.println("Is receiving true, setting
message and notifying");
_incomingMessage = message;
_incomingMessageLock.notify();
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java
Fri Aug 24 15:26:42 2007
@@ -140,7 +140,10 @@
method = m;
}
- System.out.println("sent " + m);
+ if (m.getEncodedTrack() != Frame.L4)
+ {
+ System.out.println("sent control " + m.getClass().getName());
+ }
}
public void headers(Struct ... headers)
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
Fri Aug 24 15:26:42 2007
@@ -42,7 +42,7 @@
Session ssn = event.context;
Method method = event.target;
method.setId(ssn.nextCommandId());
- System.out.println("delegating " + method + "[" + method.getId() + "]
to " + delegate);
+ System.out.println("\n Delegating " + method.getClass().getName() +
"[" + method.getId() + "] to " + delegate.getClass().getName() + "\n");
method.delegate(ssn, delegate);
if (!method.hasPayload())
{
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
Fri Aug 24 15:26:42 2007
@@ -50,7 +50,7 @@
public void handle(Event<C,Segment> event)
{
- System.out.println("got method segment:\n " + event.target);
+ //System.out.println("got method segment:\n " + event.target);
Iterator<ByteBuffer> fragments = event.target.getFragments();
Decoder dec = new FragmentDecoder(major, minor, fragments);
int type = (int) dec.readLong();
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
Fri Aug 24 15:26:42 2007
@@ -40,7 +40,7 @@
public void handle(Event<C,Method> event)
{
Method method = event.target;
- System.out.println("delegating " + method + " to " + delegate);
+ System.out.println("\nDelegating " + method.getClass().getName() + "
to " + delegate.getClass().getName() + "\n");
method.delegate(event.context, delegate);
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
Fri Aug 24 15:26:42 2007
@@ -104,12 +104,19 @@
}
void flushProcessed()
- {
+ {
+ for (Range r: processed)
+ {
+ System.out.println("Completed Range [" + r.getLower() + "," +
r.getUpper() +"]" );
+ }
+ System.out.println("Notifying peer with execution complete");
executionComplete(0, processed);
}
void syncPoint()
{
+ System.out.println("===========Request received to
sync==========================");
+
Range range = new Range(0, getCommandsIn() - 1);
boolean flush;
synchronized (processed)
@@ -147,9 +154,11 @@
for (long id = lower; id <= upper; id++)
{
commands.remove(id);
- }
+ }
+
if (commands.isEmpty())
{
+ System.out.println("\n All outstanding commands are completed
!!!! \n");
commands.notifyAll();
}
}
@@ -167,7 +176,8 @@
{
synchronized (commands)
{
- commands.put(commandsOut++, m);
+ System.out.println("sent command " + m.getClass().getName() +
" command Id" + commandsOut);
+ commands.put(commandsOut++, m);
}
}
channel.method(m);
@@ -200,6 +210,7 @@
public void sync()
{
+ System.out.println("calling sync()");
synchronized (commands)
{
if (!commands.isEmpty())
@@ -210,7 +221,9 @@
while (!commands.isEmpty())
{
try {
+ System.out.println("\n============sync() waiting for
commmands to be completed ==============\n");
commands.wait();
+ System.out.println("\n============sync() got
notified=========================================\n");
}
catch (InterruptedException e)
{
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
Fri Aug 24 15:26:42 2007
@@ -46,7 +46,8 @@
{
for (Range range : ranges)
{
- ssn.complete(range.getLower(), range.getUpper());
+ System.out.println("completed command range: " +
range.getLower() + " to " + range.getUpper());
+ ssn.complete(range.getLower(), range.getUpper());
}
}
ssn.complete(excmp.getCumulativeExecutionMark());
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
Fri Aug 24 15:26:42 2007
@@ -28,6 +28,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
/**
@@ -44,7 +46,7 @@
private DeliveryProperties props = null;
private Struct[] headers = null;
private List<Frame> frames = null;
- private Map<String,String> consumers = new HashMap<String,String>();
+ private Map<String,Consumer> consumers = new
ConcurrentHashMap<String,Consumer>();
public ToyBroker(ToyExchange exchange)
{
@@ -71,14 +73,30 @@
@Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
{
- consumers.put(ms.getDestination(),ms.getQueue());
- System.out.println("\n==================> message subscribe : " +
ms.getDestination() + "\n");
- }
+ Consumer c = new Consumer();
+ c._queueName = ms.getQueue();
+ consumers.put(ms.getDestination(),c);
+ System.out.println("\n==================> message subscribe : " +
ms.getDestination() + " queue: " + ms.getQueue() + "\n");
+ }
+
+ @Override public void messageFlow(Session ssn,MessageFlow struct)
+ {
+ Consumer c = consumers.get(struct.getDestination());
+ c._credit = struct.getValue();
+ System.out.println("\n==================> message flow : " +
struct.getDestination() + " credit: " + struct.getValue() + "\n");
+ }
+
+ @Override public void messageFlush(Session ssn,MessageFlush struct)
+ {
+ System.out.println("\n==================> message flush for consumer :
" + struct.getDestination() + "\n");
+ checkAndSendMessagesToConsumer(ssn,struct.getDestination());
+ }
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
this.xfr = xfr;
- frames = new ArrayList();
+ frames = new ArrayList<Frame>();
+ System.out.println("received transfer " + xfr.getDestination());
}
public void headers(Session ssn, Struct ... headers)
@@ -95,6 +113,7 @@
if (hdr instanceof DeliveryProperties)
{
props = (DeliveryProperties) hdr;
+ System.out.println("received headers routing_key " +
props.getRoutingKey());
}
}
@@ -147,7 +166,7 @@
}
}
- private void transferMessage(Session ssn,String dest, Message m)
+ private void transferMessageToPeer(Session ssn,String dest, Message m)
{
System.out.println("\n==================> Transfering message to: "
+dest + "\n");
ssn.messageTransfer(dest, (short)0, (short)0);
@@ -162,15 +181,24 @@
ssn.endData();
}
- public void dispatchMessages(Session ssn)
+ private void dispatchMessages(Session ssn)
{
for (String dest: consumers.keySet())
{
- Message m = exchange.getQueue(consumers.get(dest)).poll();
- if(m != null)
- {
- transferMessage(ssn,dest,m);
- }
+ checkAndSendMessagesToConsumer(ssn,dest);
+ }
+ }
+
+ private void checkAndSendMessagesToConsumer(Session ssn,String dest)
+ {
+ Consumer c = consumers.get(dest);
+ LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName);
+ Message m = queue.poll();
+ while (m != null && c._credit>0)
+ {
+ transferMessageToPeer(ssn,dest,m);
+ c._credit--;
+ m = queue.poll();
}
}
@@ -212,6 +240,15 @@
return sb.toString();
}
+ }
+
+ // ugly, but who cares :)
+ // assumes unit is always no of messages, not bytes
+ // assumes it's credit mode and not window
+ private class Consumer
+ {
+ long _credit;
+ String _queueName;
}
public static final void main(String[] args) throws IOException
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
Fri Aug 24 15:26:42 2007
@@ -5,7 +5,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -16,35 +16,35 @@
final static String DIRECT = "amq.direct";
final static String TOPIC = "amq.topic";
- private Map<String,List<Queue<Message>>> directEx = new
HashMap<String,List<Queue<Message>>>();
- private Map<String,List<Queue<Message>>> topicEx = new
HashMap<String,List<Queue<Message>>>();
- private Map<String,Queue<Message>> queues = new
HashMap<String,Queue<Message>>();
+ private Map<String,List<LinkedBlockingQueue<Message>>> directEx = new
HashMap<String,List<LinkedBlockingQueue<Message>>>();
+ private Map<String,List<LinkedBlockingQueue<Message>>> topicEx = new
HashMap<String,List<LinkedBlockingQueue<Message>>>();
+ private Map<String,LinkedBlockingQueue<Message>> queues = new
HashMap<String,LinkedBlockingQueue<Message>>();
public void createQueue(String name)
{
- queues.put(name, new LinkedList<Message>());
+ queues.put(name, new LinkedBlockingQueue<Message>());
}
- public Queue<Message> getQueue(String name)
+ public LinkedBlockingQueue<Message> getQueue(String name)
{
return queues.get(name);
}
public void bindQueue(String type,String binding,String queueName)
{
- Queue<Message> queue = queues.get(queueName);
+ LinkedBlockingQueue<Message> queue = queues.get(queueName);
binding = normalizeKey(binding);
if(DIRECT.equals(type))
{
if (directEx.containsKey(binding))
{
- List<Queue<Message>> list = directEx.get(binding);
+ List<LinkedBlockingQueue<Message>> list = directEx.get(binding);
list.add(queue);
}
else
{
- List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+ List<LinkedBlockingQueue<Message>> list = new
LinkedList<LinkedBlockingQueue<Message>>();
list.add(queue);
directEx.put(binding,list);
}
@@ -53,12 +53,12 @@
{
if (topicEx.containsKey(binding))
{
- List<Queue<Message>> list = topicEx.get(binding);
+ List<LinkedBlockingQueue<Message>> list = topicEx.get(binding);
list.add(queue);
}
else
{
- List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+ List<LinkedBlockingQueue<Message>> list = new
LinkedList<LinkedBlockingQueue<Message>>();
list.add(queue);
topicEx.put(binding,list);
}
@@ -67,7 +67,7 @@
public boolean route(String dest,String routingKey,Message msg)
{
- List<Queue<Message>> queues;
+ List<LinkedBlockingQueue<Message>> queues;
if(DIRECT.equals(dest))
{
queues = directEx.get(routingKey);
@@ -101,9 +101,9 @@
}
}
- private List<Queue<Message>> matchWildCard(String routingKey)
+ private List<LinkedBlockingQueue<Message>> matchWildCard(String routingKey)
{
- List<Queue<Message>> selected = new ArrayList<Queue<Message>>();
+ List<LinkedBlockingQueue<Message>> selected = new
ArrayList<LinkedBlockingQueue<Message>>();
for(String key: topicEx.keySet())
{
@@ -111,7 +111,7 @@
Matcher m = p.matcher(routingKey);
if (m.find())
{
- for(Queue<Message> queue : topicEx.get(key))
+ for(LinkedBlockingQueue<Message> queue : topicEx.get(key))
{
selected.add(queue);
}
@@ -121,9 +121,9 @@
return selected;
}
- private void storeMessage(Message msg,List<Queue<Message>> selected)
+ private void storeMessage(Message msg,List<LinkedBlockingQueue<Message>>
selected)
{
- for(Queue<Message> queue : selected)
+ for(LinkedBlockingQueue<Message> queue : selected)
{
queue.offer(msg);
}