Author: rajith
Date: Thu Aug  9 18:49:45 2007
New Revision: 564451

URL: http://svn.apache.org/viewvc?view=rev&rev=564451
Log:
Added a Toy Exchange that does same basic routing for direct and topic.
Should be good enough for Arnaud to test atleast the basic JMS functionality.
Added a FileMessage to demo Martins requirment. Haven't tested yet.
The Toy Broker can now accept subscriptions and transfer messages to clients


Added:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
Removed:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java?view=auto&rev=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
 Thu Aug  9 18:49:45 2007
@@ -0,0 +1,108 @@
+package org.apache.qpidity.client;
+
+import java.net.URL;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpidity.Channel;
+import org.apache.qpidity.Connection;
+import org.apache.qpidity.ConnectionClose;
+import org.apache.qpidity.ConnectionDelegate;
+import org.apache.qpidity.MinaHandler;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.SessionDelegate;
+
+
+public class Client implements org.apache.qpidity.client.Connection
+{
+    private AtomicInteger _channelNo = new AtomicInteger();
+    private Connection _conn;
+    private ExceptionListener _exceptionListner;
+    private final Lock _lock = new ReentrantLock();
+    
+    public static org.apache.qpidity.client.Connection createConnection()
+    {
+        return new Client();
+    }
+    
+    public void connect(String host, int port,String virtualHost,String 
username, String password) throws QpidException
+    {
+        Condition negotiationComplete = _lock.newCondition();
+        _lock.lock();
+        
+        ConnectionDelegate connectionDelegate = new ConnectionDelegate()
+        {            
+            public SessionDelegate getSessionDelegate()
+            {
+                return new ClientSessionDelegate();
+            }
+            
+            @Override public void connectionClose(Channel context, 
ConnectionClose struct) 
+            {
+                _exceptionListner.onException(new QpidException("Server closed 
the connection: Reason " + struct.getReplyText(),struct.getReplyCode(),null));
+            }
+        };
+        
+        connectionDelegate.setCondition(_lock,negotiationComplete);
+        connectionDelegate.setUsername(username);
+        connectionDelegate.setPassword(password);
+        connectionDelegate.setVirtualHost(virtualHost);
+        
+        _conn = MinaHandler.connect(host, port,connectionDelegate);
+                
+        _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer());     
   
+        
+        try
+        {
+            negotiationComplete.await();
+        }
+        catch (Exception e)
+        {
+            //
+        }
+        finally
+        {
+            _lock.unlock();
+        }
+    }
+    
+    /*
+     * Until the dust settles with the URL disucssion
+     * I am not going to implement this.
+     */
+    public void connect(URL url) throws QpidException
+    {
+        throw new UnsupportedOperationException(); 
+    }
+    
+    public void close() throws QpidException
+    {   
+        Channel ch = _conn.getChannel(0);
+        ch.connectionClose(0, "client is closing", 0, 0);
+        //need to close the connection underneath as well
+    }
+
+    public Session createSession(long expiryInSeconds)
+    {
+        Channel ch = _conn.getChannel(_channelNo.incrementAndGet());   
+        ClientSession ssn = new ClientSession();
+        ssn.attach(ch);
+        ssn.sessionOpen(expiryInSeconds);
+        
+        return ssn;
+    }
+
+    public DtxSession createDTXSession(int expiryInSeconds)
+    {
+        // TODO Auto-generated method stub
+        return null;
+    }
+    
+    public void setExceptionListener(ExceptionListener exceptionListner)
+    {
+        _exceptionListner = exceptionListner;        
+    }
+
+}

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java?view=auto&rev=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
 Thu Aug  9 18:49:45 2007
@@ -0,0 +1,99 @@
+package org.apache.qpidity.client;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpidity.Option;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.api.Message;
+
+/**
+ * Implements a Qpid Sesion. 
+ */
+public class ClientSession extends org.apache.qpidity.Session implements 
org.apache.qpidity.client.Session
+{
+    private Map<String,MessagePartListener> _messageListeners = new 
HashMap<String,MessagePartListener>();
+    private ExceptionListener _exceptionListner;
+    private RangeSet _acquiredMessages;
+    private RangeSet _rejectedMessages;
+    private Map<String,List<RangeSet>> _unackedMessages = new 
HashMap<String,List<RangeSet>>();
+    
+    @Override public void sessionClose()
+    {
+        // release all unacked messages and then issues a close
+        super.sessionClose();
+    }
+    
+    public void messageAcknowledge(RangeSet ranges)
+    {
+        for (Range range : ranges)
+        {
+            for (long l = range.getLower(); l <= range.getUpper(); l++)
+            {
+                System.out.println("Acknowleding message for : " + 
super.getCommand((int) l));
+                super.processed(l);
+            }
+        }
+    }
+
+    public void messageSubscribe(String queue, String destination, short 
confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?> 
filter, Option... options)
+    {
+        setMessageListener(destination,listener);
+        super.messageSubscribe(queue, destination, confirmMode, acquireMode, 
filter, options);
+    }
+
+    public void messageTransfer(String exchange, Message msg, short 
confirmMode, short acquireMode)
+    {
+        // need to break it down into small pieces
+        super.messageTransfer(exchange, confirmMode, acquireMode);
+        super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+        // super.data(bytes); *
+        // super.endData()
+    }
+    
+    
+    public RangeSet getAccquiredMessages()
+    {
+        return _acquiredMessages;
+    }
+
+    public RangeSet getRejectedMessages()
+    {
+        return _rejectedMessages;
+    }
+    
+    public void setMessageListener(String destination, MessagePartListener 
listener)
+    {
+        _messageListeners.put(destination, listener);       
+    }
+    
+    public void setExceptionListener(ExceptionListener exceptionListner)
+    {
+        _exceptionListner = exceptionListner;        
+    }   
+    
+    // ugly but nessacery
+    
+    void setAccquiredMessages(RangeSet acquiredMessages)
+    {
+        _acquiredMessages = acquiredMessages;
+    }
+    
+    void setRejectedMessages(RangeSet rejectedMessages)
+    {
+        _rejectedMessages = rejectedMessages;
+    }
+    
+    void notifyException(QpidException ex)
+    {
+        _exceptionListner.onException(ex);
+    }
+    
+    Map<String,MessagePartListener> getMessageListerners()
+    {
+        return _messageListeners;
+    }
+}

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java?view=auto&rev=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
 Thu Aug  9 18:49:45 2007
@@ -0,0 +1,78 @@
+package org.apache.qpidity.client;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.Frame;
+import org.apache.qpidity.MessageAcquired;
+import org.apache.qpidity.MessageReject;
+import org.apache.qpidity.MessageTransfer;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.Session;
+import org.apache.qpidity.SessionDelegate;
+import org.apache.qpidity.Struct;
+
+
+public class ClientSessionDelegate extends SessionDelegate
+{    
+    private MessageTransfer _currentTransfer;
+    private MessagePartListener _currentMessageListener;
+    
+    @Override public void data(Session ssn, Frame frame)
+    {
+        for (ByteBuffer b : frame)
+        {    
+            _currentMessageListener.addData(b);
+        }
+        if (frame.isLastSegment() && frame.isLastFrame())
+        {
+            _currentMessageListener.messageReceived();
+        }
+        
+    }
+
+    @Override public void headers(Session ssn, Struct... headers)
+    {
+        _currentMessageListener.messageHeaders(headers);
+    }
+
+
+    @Override public void messageTransfer(Session session, MessageTransfer 
currentTransfer)
+    {
+        _currentTransfer = currentTransfer;
+        _currentMessageListener = 
((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination());
+        
+        //a better way is to tell the broker to stop the transfer
+        if (_currentMessageListener == null && 
_currentTransfer.getAcquireMode() == 1)
+        {
+            RangeSet transfers = new RangeSet();
+            transfers.add(_currentTransfer.getId());            
+            session.messageRelease(transfers);
+        }
+    }
+    
+    //  --------------------------------------------
+    //   Message methods
+    // --------------------------------------------
+    
+    
+    @Override public void messageReject(Session session, MessageReject struct) 
+    {
+        for (Range range : struct.getTransfers())
+        {
+            for (long l = range.getLower(); l <= range.getUpper(); l++)
+            {
+                System.out.println("message rejected: " +
+                        session.getCommand((int) l));
+            }
+        }
+        ((ClientSession)session).setRejectedMessages(struct.getTransfers());
+        ((ClientSession)session).notifyException(new QpidException("Message 
Rejected",0,null));
+    }
+    
+    @Override public void messageAcquired(Session session, MessageAcquired 
struct) 
+    {
+        ((ClientSession)session).setAccquiredMessages(struct.getTransfers());
+    }
+}

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java
 Thu Aug  9 18:49:45 2007
@@ -20,6 +20,7 @@
 
 
 import java.net.URL;
+import java.util.UUID;
 
 import org.apache.qpidity.QpidException;
 
@@ -46,7 +47,7 @@
      * @throws QpidException If the communication layer fails to connect with 
the broker.
      */
     public void connect(URL url) throws QpidException;
-
+    
     /**
      * Close this connection.
      *
@@ -83,5 +84,6 @@
      *
      * @param exceptionListner The execptionListener
      */
+    
     public void setExceptionListener(ExceptionListener exceptionListner);
 }

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java?view=auto&rev=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
 Thu Aug  9 18:49:45 2007
@@ -0,0 +1,84 @@
+package org.apache.qpidity.client;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+
+public class DemoClient
+{
+    public static MessagePartListenerAdapter createAdapter()
+    {
+        return new MessagePartListenerAdapter(new MessageListener()
+        {
+            public void onMessage(Message m)
+            {
+                System.out.println("\n================== Received Msg 
==================");
+                System.out.println("Message Id : " + 
m.getMessageProperties().getMessageId());
+                System.out.println(m.toString());
+                System.out.println("================== End Msg 
==================\n");
+            }
+            
+        });
+    }
+
+    public static final void main(String[] args)
+    {
+        Connection conn = Client.createConnection();
+        try{
+            conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+        }catch(Exception e){
+            e.printStackTrace();
+        }
+        
+        Session ssn = conn.createSession(50000);
+        ssn.setExceptionListener(new ExceptionListener()
+                {
+                     public void onException(QpidException e)
+                     {
+                         System.out.println(e);
+                     }
+                });
+        ssn.queueDeclare("queue1", null, null);
+        ssn.queueBind("queue1", "amq.direct", "queue1",null);
+        ssn.sync();
+        
+        ssn.messageSubscribe("queue1", "myDest", (short)0, 
(short)0,createAdapter(), null);
+
+        // queue
+        ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+        ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new 
MessageProperties().setMessageId("123"));
+        ssn.data("this is the data");
+        ssn.endData();
+
+        //reject
+        ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+        ssn.data("this should be rejected");
+        ssn.headers(new DeliveryProperties().setRoutingKey("stocks"));
+        ssn.endData();
+        ssn.sync();
+        
+        // topic subs
+        ssn.messageSubscribe("topic1", "myDest2", (short)0, 
(short)0,createAdapter(), null);
+        ssn.messageSubscribe("topic2", "myDest3", (short)0, 
(short)0,createAdapter(), null);
+        ssn.messageSubscribe("topic3", "myDest4", (short)0, 
(short)0,createAdapter(), null);
+        ssn.sync();
+        
+        ssn.queueDeclare("topic1", null, null);
+        ssn.queueBind("topic1", "amq.topic", "stock.*",null);        
+        ssn.queueDeclare("topic2", null, null);
+        ssn.queueBind("topic2", "amq.topic", "stock.us.*",null);
+        ssn.queueDeclare("topic3", null, null);
+        ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null);
+        ssn.sync();
+        
+        // topic
+        ssn.messageTransfer("amq.topic", (short) 0, (short) 1);
+        ssn.data("Topic message");
+        ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new 
MessageProperties().setMessageId("456"));
+        ssn.endData();
+        ssn.sync();
+    }
+    
+}

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
 Thu Aug  9 18:49:45 2007
@@ -20,7 +20,6 @@
 
 import java.nio.ByteBuffer;
 import java.util.Map;
-import java.util.UUID;
 
 import org.apache.qpidity.Option;
 import org.apache.qpidity.RangeSet;
@@ -66,13 +65,6 @@
      * <p> When a session is suspend any operation of this session and of the 
associated resources are unavailable.
      */
     public void sessionSuspend();
-
-    /**
-     * This will resume an existing session
-     * <p> Upon resume the session is attached with an underlying channel
-     * hence making operation on this session available.
-     */
-    public void sessionResume(UUID sessionId);
 
     //------------------------------------------------------ 
     //                 Messaging methods

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java?view=auto&rev=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
 Thu Aug  9 18:49:45 2007
@@ -0,0 +1,99 @@
+package org.apache.qpidity.client.util;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * FileMessage provides pull style semantics for
+ * larges messages backed by a disk. 
+ * Instead of loading all data into memeory it uses
+ * FileChannel to map regions of the file into memeory
+ * at a time.
+ * 
+ * The write methods are not supported. 
+ * 
+ * From the standpoint of performance it is generally 
+ * only worth mapping relatively large files into memory.
+ *    
+ * FileMessage msg = new FileMessage(in,delProps,msgProps);
+ * session.messageTransfer(dest,msg,0,0);
+ * 
+ * The messageTransfer method will read the file in chunks
+ * and stream it.
+ *
+ */
+public class FileMessage implements Message
+{
+    private MessageProperties _messageProperties;
+    private DeliveryProperties _deliveryProperties;
+    private FileChannel _fileChannel;
+    private int _chunkSize;
+    private long _fileSize;
+    private long _pos = 0;
+    
+    public FileMessage(FileInputStream in,int chunkSize,DeliveryProperties 
deliveryProperties,MessageProperties messageProperties)throws IOException
+    {
+        _messageProperties = messageProperties;
+        _deliveryProperties = deliveryProperties;
+        
+        _fileChannel = in.getChannel();
+        _chunkSize = chunkSize;
+        _fileSize = _fileChannel.size();
+        
+        if (_fileSize <= _chunkSize)
+        {
+            _chunkSize = (int)_fileSize;
+        }
+    }
+    
+    public void appendData(byte[] src)
+    {
+        throw new UnsupportedOperationException("This Message is read only 
after the initial source");
+    }
+
+    public void appendData(ByteBuffer src)
+    {
+        throw new UnsupportedOperationException("This Message is read only 
after the initial source");
+    }
+
+    public DeliveryProperties getDeliveryProperties()
+    {
+        return _deliveryProperties;
+    }
+
+    public MessageProperties getMessageProperties()
+    {
+        return _messageProperties;
+    }
+
+    public void readData(byte[] target) throws IOException
+    {        
+        int readLen = target.length <= _chunkSize ? target.length : _chunkSize;
+        if (_pos + readLen > _fileSize)
+        {
+            readLen = (int)(_fileSize - _pos);
+        }
+        MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, 
_pos, readLen);
+        _pos += readLen;
+        bb.get(target);
+    }
+    
+    public ByteBuffer readData() throws IOException
+    {
+        if (_pos + _chunkSize > _fileSize)
+        {
+            _chunkSize = (int)(_fileSize - _pos);
+        }
+        MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, 
_pos, _chunkSize);
+        _pos += _chunkSize;
+        return bb;
+    }
+
+}

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java?view=auto&rev=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
 Thu Aug  9 18:49:45 2007
@@ -0,0 +1,149 @@
+package org.apache.qpidity.client.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.client.MessageListener;
+import org.apache.qpidity.client.MessagePartListener;
+
+/** 
+ * This is a simple message assembler.
+ * Will call onMessage method of the adaptee 
+ * when all message data is read.
+ * 
+ * This is a good convinience utility for handling
+ * small messages
+ */
+public class MessagePartListenerAdapter implements MessagePartListener
+{
+       MessageListener _adaptee;
+       Message _currentMsg;
+    DeliveryProperties _currentDeliveryProps;
+    MessageProperties _currentMessageProps;
+    
+       public MessagePartListenerAdapter(MessageListener listener)
+       {
+               _adaptee = listener;
+        
+        // temp solution.
+        _currentMsg = new Message()
+        {
+            Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
+            ByteBuffer _readBuffer;
+            private int dataSize; 
+            
+            public void appendData(byte[] src) throws IOException
+            {
+                appendData(ByteBuffer.wrap(src));
+            }
+
+            public void appendData(ByteBuffer src) throws IOException
+            {
+                _data.offer(src);
+                dataSize += src.remaining();
+            }
+            
+            public DeliveryProperties getDeliveryProperties()
+            {
+                return _currentDeliveryProps;
+            }
+
+            public MessageProperties getMessageProperties()
+            {
+                return _currentMessageProps;
+            }
+
+            // since we provide the message only after completion
+            // we can assume that when this method is called we have
+            // received all data.
+            public void readData(byte[] target) throws IOException
+            {
+                if (_data.size() >0 && _readBuffer == null)
+                {
+                    buildReadBuffer();
+                }
+                
+                _readBuffer.get(target);
+            }
+            
+            public ByteBuffer readData() throws IOException
+            {
+                if (_data.size() >0 && _readBuffer == null)
+                {
+                    buildReadBuffer();
+                }
+                
+                return _readBuffer;
+            }
+            
+            private void buildReadBuffer()
+            {
+                //optimize for the simple cases
+                if(_data.size() == 1)
+                {
+                    _readBuffer = _data.element().duplicate();
+                }
+                else
+                {
+                    _readBuffer = ByteBuffer.allocate(dataSize);
+                    for(ByteBuffer buf:_data)
+                    {
+                        _readBuffer.put(buf);
+                    }
+                }
+            }
+            
+            //hack for testing
+            @Override public String toString()
+            {
+                if (_data.size() >0 && _readBuffer == null)
+                {
+                    buildReadBuffer();
+                }
+                byte[] b = new byte[_readBuffer.limit()];
+                _readBuffer.get(b);
+                return new String(b);
+            }
+        };
+    }
+    
+    public void addData(ByteBuffer src)
+    {
+        try
+        {
+            _currentMsg.appendData(src);
+        }
+        catch(IOException e)
+        {
+            // A chance for IO exception
+            // doesn't occur as we are using 
+            // a ByteBuffer
+        }
+    }
+
+       public void messageHeaders(Struct... headers)
+       {
+               for(Struct struct: headers)
+        {
+                   if(struct instanceof DeliveryProperties)
+            {
+                _currentDeliveryProps = (DeliveryProperties)struct;      
+            }
+            else if (struct instanceof MessageProperties)
+            {
+                _currentMessageProps = (MessageProperties)struct;      
+            }
+        }
+       }
+    
+       public void messageReceived()
+       {        
+        _adaptee.onMessage(_currentMsg);
+       }
+}

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 Thu Aug  9 18:49:45 2007
@@ -18,13 +18,13 @@
 package org.apache.qpidity.jms;
 
 import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.impl.MessagePartListenerAdapter;
 import org.apache.qpidity.RangeSet;
 import org.apache.qpidity.QpidException;
 import org.apache.qpidity.Option;
 import org.apache.qpidity.filter.MessageFilter;
 import org.apache.qpidity.filter.JMSSelectorFilter;
 import org.apache.qpidity.client.MessagePartListener;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
 import org.apache.qpidity.exchange.ExchangeDefaults;
 
 import javax.jms.*;

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
 Thu Aug  9 18:49:45 2007
@@ -26,9 +26,9 @@
 import javax.jms.QueueBrowser;
 
 import org.apache.qpidity.client.MessagePartListener;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
 import org.apache.qpidity.filter.JMSSelectorFilter;
 import org.apache.qpidity.filter.MessageFilter;
-import org.apache.qpidity.impl.MessagePartListenerAdapter;
 
 /**
  * Implementation of the JMS QueueBrowser interface

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java
 Thu Aug  9 18:49:45 2007
@@ -111,6 +111,8 @@
         }
         
         // not sure if this is the right place
+        System.out.println("\n--------------------Broker Start Connection 
Negotiation -----------------------\n");
+        
         getChannel(0).connectionStart(header.getMajor(), header.getMinor(), 
null, "PLAIN", "utf8");
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
 Thu Aug  9 18:49:45 2007
@@ -46,8 +46,8 @@
  */
 public abstract class ConnectionDelegate extends Delegate<Channel>
 {
-    private String _username;
-    private String _password;
+    private String _username = "guest";
+    private String _password = "guest";;
     private String _mechanism;
     private String _virtualHost;
     private SaslClient saslClient;
@@ -70,6 +70,7 @@
     //-----------------------------------------------
     @Override public void connectionStart(Channel context, ConnectionStart 
struct) 
     {
+        System.out.println("\n--------------------Client Start Connection 
Negotiation -----------------------\n");
         System.out.println("The broker has sent connection-start");
         
         String mechanism = null;
@@ -132,15 +133,19 @@
         String knownHosts = struct.getKnownHosts();
         System.out.println("The broker has opened the connection for use");
         System.out.println("The broker supplied the following hosts for 
failover " + knownHosts);
-        _negotiationCompleteLock.lock();
-        try
-        {
-            _negotiationComplete.signalAll();
-        }
-        finally
+        if(_negotiationCompleteLock != null)
         {
-            _negotiationCompleteLock.unlock();
+            _negotiationCompleteLock.lock();
+            try
+            {
+                _negotiationComplete.signalAll();
+            }
+            finally
+            {
+                _negotiationCompleteLock.unlock();
+            }
         }
+        System.out.println("\n-------------------- Client End Connection 
Negotiation -----------------------\n");
     }
     
     public void connectionRedirect(Channel context, ConnectionRedirect struct) 
@@ -240,8 +245,9 @@
     @Override public void connectionOpen(Channel context, ConnectionOpen 
struct) 
     {
        String hosts = "amqp:1223243232325";
-       System.out.println("The client has sent connection-open-ok");
+       System.out.println("The client has sent connection-open");
        context.connectionOpenOk(hosts);
+       System.out.println("\n-------------------- Broker End Connection 
Negotiation -----------------------\n");
     }
     
     

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
 Thu Aug  9 18:49:45 2007
@@ -20,19 +20,16 @@
  */
 package org.apache.qpidity;
 
-import java.io.IOException;
+import static org.apache.qpidity.Functions.str;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
-
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
-import static org.apache.qpidity.Functions.*;
-
 
 /**
  * ToyBroker
@@ -43,21 +40,28 @@
 class ToyBroker extends SessionDelegate
 {
 
-    private Map<String,Queue<Message>> queues;
+    private ToyExchange exchange;
     private MessageTransfer xfr = null;
     private DeliveryProperties props = null;
     private Struct[] headers = null;
     private List<Frame> frames = null;
-
-    public ToyBroker(Map<String,Queue<Message>> queues)
+    private Map<String,String> consumers = new HashMap<String,String>();
+    
+    public ToyBroker(ToyExchange exchange)
     {
-        this.queues = queues;
+        this.exchange = exchange;
     }
 
     @Override public void queueDeclare(Session ssn, QueueDeclare qd)
     {
-        queues.put(qd.getQueue(), new LinkedList());
-        System.out.println("declared queue: " + qd.getQueue());
+        exchange.createQueue(qd.getQueue());
+        System.out.println("\n==================> declared queue: " + 
qd.getQueue() + "\n");
+    }
+    
+    @Override public void queueBind(Session ssn, QueueBind qb)
+    {
+        exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue());
+        System.out.println("\n==================> bound queue: " + 
qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n");
     }
 
     @Override public void queueQuery(Session ssn, QueueQuery qq)
@@ -65,6 +69,12 @@
         QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
         ssn.executionResult(qq.getId(), result);
     }
+    
+    @Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
+    {
+        consumers.put(ms.getDestination(),ms.getQueue());
+        System.out.println("\n==================> message subscribe : " + 
ms.getDestination() + "\n");
+    }    
 
     @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
     {
@@ -88,16 +98,7 @@
                 props = (DeliveryProperties) hdr;
             }
         }
-
-        if (props != null && !props.getDiscardUnroutable())
-        {
-            String dest = xfr.getDestination();
-            if (!queues.containsKey(dest))
-            {
-                reject(ssn);
-            }
-        }
-
+        
         this.headers = headers;
     }
 
@@ -115,16 +116,17 @@
         if (frame.isLastSegment() && frame.isLastFrame())
         {
             String dest = xfr.getDestination();
-            Queue queue = queues.get(dest);
-            if (queue == null)
+            Message m = new Message(headers, frames);
+            
+            if (exchange.route(dest,props.getRoutingKey(),m))
             {
-                reject(ssn);
+                System.out.println("queued " + m);
+                dispatchMessages(ssn);
             }
             else
             {
-                Message m = new Message(headers, frames);
-                queue.offer(m);
-                System.out.println("queued " + m);
+                
+                reject(ssn);
             }
             ssn.processed(xfr);
             xfr = null;
@@ -145,8 +147,35 @@
             ssn.messageReject(ranges, 0, "no such destination");
         }
     }
+    
+    private void transferMessage(Session ssn,String dest, Message m)
+    {
+        System.out.println("\n==================> Transfering message to: " 
+dest + "\n");
+        ssn.messageTransfer(dest, (short)0, (short)0);
+        ssn.headers(m.headers);
+        for (Frame f : m.frames)
+        {
+            for (ByteBuffer b : f)
+            {
+                ssn.data(b);
+            }
+        }
+        ssn.endData();
+    }
+    
+    public void dispatchMessages(Session ssn)
+    {
+        for (String dest: consumers.keySet())
+        {
+            Message m = exchange.getQueue(consumers.get(dest)).poll();
+            if(m != null)
+            {
+                transferMessage(ssn,dest,m);
+            }
+        }
+    }
 
-    private class Message
+    class Message
     {
         private final Struct[] headers;
         private final List<Frame> frames;
@@ -188,14 +217,12 @@
 
     public static final void main(String[] args) throws IOException
     {
-        final Map<String,Queue<Message>> queues =
-            new HashMap<String,Queue<Message>>();
-        
+        final ToyExchange exchange = new ToyExchange();        
         ConnectionDelegate delegate = new ConnectionDelegate()
         {
             public SessionDelegate getSessionDelegate()
             {
-                return new ToyBroker(queues);
+                return new ToyBroker(exchange);
             }
         };
         

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java?view=auto&rev=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
 Thu Aug  9 18:49:45 2007
@@ -0,0 +1,132 @@
+package org.apache.qpidity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.qpidity.ToyBroker.Message;
+
+public class ToyExchange
+{
+  final static String DIRECT = "amq.direct";
+  final static String TOPIC = "amq.topic";
+  
+  private Map<String,List<Queue<Message>>> directEx = new 
HashMap<String,List<Queue<Message>>>();
+  private Map<String,List<Queue<Message>>> topicEx = new 
HashMap<String,List<Queue<Message>>>(); 
+  private Map<String,Queue<Message>> queues = new 
HashMap<String,Queue<Message>>(); 
+  
+  public void createQueue(String name)
+  {
+      queues.put(name, new LinkedList<Message>());
+  }
+  
+  public Queue<Message> getQueue(String name)
+  {
+      return queues.get(name);
+  }
+  
+  public void bindQueue(String type,String binding,String queueName)
+  {
+      Queue<Message> queue = queues.get(queueName);
+      binding = normalizeKey(binding);
+      if(DIRECT.equals(type))
+      {
+          
+          if (directEx.containsKey(binding))
+          {
+              List<Queue<Message>> list = directEx.get(binding);
+              list.add(queue);
+          }
+          else
+          {
+              List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+              list.add(queue);
+              directEx.put(binding,list);
+          }
+      }
+      else
+      {
+          if (topicEx.containsKey(binding))
+          {
+              List<Queue<Message>> list = topicEx.get(binding);
+              list.add(queue);
+          }
+          else
+          {
+              List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+              list.add(queue);
+              topicEx.put(binding,list);
+          }
+      }
+  }
+  
+  public boolean route(String dest,String routingKey,Message msg)
+  {
+      List<Queue<Message>> queues;
+      if(DIRECT.equals(dest))
+      {
+          queues = directEx.get(routingKey);          
+      }
+      else
+      {
+          queues = matchWildCard(routingKey);
+      }
+      if(queues != null && queues.size()>0)
+      {
+          System.out.println("Message stored in " + queues.size() + " queues");
+          storeMessage(msg,queues);
+          return true;
+      }
+      else
+      {
+          System.out.println("Message unroutable " + msg);
+          return false;
+      }
+  }
+  
+  private String normalizeKey(String routingKey)
+  {
+      if(routingKey.indexOf(".*")>1)
+      {
+          return routingKey.substring(0,routingKey.indexOf(".*"));
+      }
+      else
+      {
+          return routingKey;
+      }
+  }
+  
+  private List<Queue<Message>> matchWildCard(String routingKey)
+  {        
+      List<Queue<Message>> selected = new ArrayList<Queue<Message>>();
+      
+      for(String key: topicEx.keySet())
+      {
+          Pattern p = Pattern.compile(key);
+          Matcher m = p.matcher(routingKey);
+          if (m.find())
+          {
+              for(Queue<Message> queue : topicEx.get(key))
+              {
+                    selected.add(queue);
+              }
+          }
+      }
+      
+      return selected;      
+  }
+
+  private void storeMessage(Message msg,List<Queue<Message>> selected)
+  {
+      for(Queue<Message> queue : selected)
+      {
+          queue.offer(msg);
+      }
+  }
+  
+}

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
 Thu Aug  9 18:49:45 2007
@@ -1,5 +1,6 @@
 package org.apache.qpidity.api;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.qpidity.MessageProperties;
@@ -43,16 +44,16 @@
         * </ul>
         * @param src
         */
-       public void appendData(byte[] src);
+       public void appendData(byte[] src) throws IOException;
 
-    public void appendData(ByteBuffer src);
+    public void appendData(ByteBuffer src) throws IOException;
     
        /**
         * This will abstract the underlying message data.
         * The Message implementation may not hold all message
         * data in memory (especially in the case of large messages)
         * 
-        * The read function might copy data from a 
+        * The read function might copy data from
         * <ul>
         * <li> From memory (Ex: ByteBuffer)
         * <li> From Disk
@@ -60,7 +61,8 @@
         * </ul>
         * @param target
         */
-    public void readData(byte[] target);   
+    public void readData(byte[] target) throws IOException;   
 
+    public ByteBuffer readData() throws IOException; 
 }
 


Reply via email to