Author: rajith
Date: Mon Aug 13 09:41:34 2007
New Revision: 565407

URL: http://svn.apache.org/viewvc?view=rev&rev=565407
Log:
Added support for message handling.
Sending Messages
-------------------
ByteBufferMessage for small messages - data will be in memory.
FileMessage and StreamingMessage for sending large messages.

Receiving Messages
-------------------
For small messages you can use MessageListener and receive ByteBufferMessage.
You need to use the MessageListener with the MessagePartListenerAdapter.

For large messages it is recommended to use MessagePartListener.


Added:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java?view=diff&rev=565407&r1=565406&r2=565407
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
 Mon Aug 13 09:41:34 2007
@@ -1,5 +1,7 @@
 package org.apache.qpidity.client;
 
+import java.io.EOFException;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -45,15 +47,37 @@
         super.messageSubscribe(queue, destination, confirmMode, acquireMode, 
filter, options);
     }
 
-    public void messageTransfer(String exchange, Message msg, short 
confirmMode, short acquireMode)
+    public void messageTransfer(String destination, Message msg, short 
confirmMode, short acquireMode) throws IOException
     {
-        // need to break it down into small pieces
-        super.messageTransfer(exchange, confirmMode, acquireMode);
+        // The javadoc clearly says that this method is suitable for small 
messages
+        // therefore reading the content in one shot.
+        super.messageTransfer(destination, confirmMode, acquireMode);
         super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
-        // super.data(bytes); *
-        // super.endData()
+        super.data(msg.readData());
+        super.endData();        
     }
     
+    /**
+     * Streams a message to the given destination by pulling its content
+     * chunk by chunk and pushing every chunk as a separate data segment.
+     *
+     * <p>The transfer command and the headers are sent first. After that
+     * <code>msg.readData()</code> is called repeatedly and each returned buffer
+     * is handed to <code>data()</code> until an <code>EOFException</code> is
+     * raised, at which point <code>endData()</code> completes the transfer.</p>
+     *
+     * @param destination passed unchanged to <code>messageTransfer()</code>
+     * @param msg         the message whose properties and content are sent
+     * @param confirmMode passed unchanged to <code>messageTransfer()</code>
+     * @param acquireMode passed unchanged to <code>messageTransfer()</code>
+     * @throws IOException if reading the message content fails for any reason
+     *                     other than reaching the end of the data
+     */
+    public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
+    {
+        super.messageTransfer(destination, confirmMode, acquireMode);
+        super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+        boolean moreData = true;
+        while (moreData)
+        {
+            try
+            {
+                super.data(msg.readData());
+            }
+            catch (EOFException e)
+            {
+                // The streaming Message implementations report that their source
+                // is exhausted by throwing EOFException, so this is the normal
+                // way out of the loop rather than an error.
+                moreData = false;
+            }
+        }
+
+        super.endData();
+    }
     
     public RangeSet getAccquiredMessages()
     {

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java?view=diff&rev=565407&r1=565406&r2=565407
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
 Mon Aug 13 09:41:34 2007
@@ -18,6 +18,7 @@
  */
 package org.apache.qpidity.client;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
@@ -72,7 +73,47 @@
     //------------------------------------------------------
     /**
      * Transfer the given message to a specified exchange.
+     * 
+     * <p>This is a convenience method for providing a complete message
+     * using a single method which internally is mapped to messageTransfer(), 
headers() followed
+     * by data() and an endData(). 
+     * <b><i>This method should only be used for small messages</i></b></p>
      *
+     * @param destination The exchange the message is being sent.
+     * @param msg         The Message to be sent
+     * @param confirmMode <ul> <li>off (0): confirmation is not required, 
once a message has been transferred in pre-acquire
+     *                    mode (or once acquire has been sent in no-acquire 
mode) the message is considered
+     *                    transferred
+     *                    <p/>
+     *                    <li> on  (1): an acquired message (whether 
acquisition was implicit as in pre-acquire mode or
+     *                    explicit as in no-acquire mode) is not considered 
transferred until the original
+     *                    transfer is complete (signaled via 
execution.complete)
+     *                    </ul>
+     * @param acquireMode <ul> 
+     *                    <li> no-acquire  (0): the message must be explicitly 
acquired                    
+     *                    <li> pre-acquire (1): the message is acquired when 
the transfer starts
+     *                    </ul>                         
+     */
+    public void messageTransfer(String destination, Message msg, short 
confirmMode, short acquireMode)throws IOException;
+    
+    /**
+     * <p>This is a convenience method for streaming a message using pull 
semantics
+     * using a single method as opposed to doing a push using 
messageTransfer(), headers() followed
+     * by a series of data() and an endData().</p> 
+     * <p>Internally data will be pulled from the Message object (which wraps a 
data stream) using its readData()
+     * and pushed using messageTransfer(), headers() followed by a series of 
data() and an endData().
+     * <br><b><i>This method should only be used for large messages</i></b><br>
+     * There are two convenience Message classes provided to facilitate this.
+     * <ul>
+     * <li> <code>FileMessage</code>
+     * <li> <code>StreamingMessage</code>
+     * </ul>
+     * You could also implement the <code>Message</code> interface to 
wrap any
+     * data stream you want.
+     * </p>
+     * 
+     * @param destination The exchange the message is being sent.
+     * @param msg         The Message to be sent  
      * @param confirmMode <ul> <li>off (0): confirmation is not required, 
once a message has been transferred in pre-acquire
      *                    mode (or once acquire has been sent in no-acquire 
mode) the message is considered
      *                    transferred
@@ -85,10 +126,8 @@
      *                    <p/>
      *                    <li> pre-acquire (1): the message is acquired when 
the transfer starts
      *                    </ul>
-     * @param exchange    The exchange the message is being sent.
-     * @param msg         The Message to be sent
      */
-    public void messageTransfer(String destination, Message msg, short 
confirmMode, short acquireMode);
+    public void messageStream(String destination, Message msg, short 
confirmMode, short acquireMode)throws IOException;
 
     /**
      * Declare the beginning of a message transfer operation. This operation 
must

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java?view=auto&rev=565407
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
 Mon Aug 13 09:41:34 2007
@@ -0,0 +1,111 @@
+package org.apache.qpidity.client.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * <p>A simple implementation of the message interface
+ * for small messages. When the readData methods are called
+ * we assume the message is complete, i.e. there won't be any
+ * appendData operations after that.</p>
+ *
+ * <p>Appended buffers are queued as they arrive and are only combined
+ * into a single read buffer the first time the content is read. Once
+ * that read buffer exists it is reused for every later read, so data
+ * appended afterwards is not visible to readers.</p>
+ *
+ * <p>If you need large message support please see
+ * <code>FileMessage</code> and <code>StreamingMessage</code>
+ * </p>
+ */
+public class ByteBufferMessage implements Message
+{
+    // Chunks in arrival order; they are never consumed by this class.
+    private Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
+    // Lazily assembled view of the whole content; null until first read.
+    private ByteBuffer _readBuffer;
+    private DeliveryProperties _currentDeliveryProps;
+    private MessageProperties _currentMessageProps;
+
+    /**
+     * Appends the whole array as the next chunk of the message.
+     * The array is wrapped, not copied.
+     */
+    public void appendData(byte[] src) throws IOException
+    {
+        appendData(ByteBuffer.wrap(src));
+    }
+
+    /**
+     * Appends the remaining bytes of the buffer as the next chunk of the message.
+     * The buffer is queued as is, not copied.
+     */
+    public void appendData(ByteBuffer src) throws IOException
+    {
+        _data.offer(src);
+    }
+
+    public DeliveryProperties getDeliveryProperties()
+    {
+        return _currentDeliveryProps;
+    }
+
+    public MessageProperties getMessageProperties()
+    {
+        return _currentMessageProps;
+    }
+
+    public void setDeliveryProperties(DeliveryProperties props)
+    {
+        _currentDeliveryProps = props;
+    }
+
+    public void setMessageProperties(MessageProperties props)
+    {
+        _currentMessageProps = props;
+    }
+
+    /**
+     * Copies the next <code>target.length</code> bytes of the content into the array.
+     *
+     * @throws java.nio.BufferUnderflowException if fewer bytes than
+     *         <code>target.length</code> are left to read
+     */
+    public void readData(byte[] target) throws IOException
+    {
+        readBuffer().get(target);
+    }
+
+    /**
+     * Returns the buffer holding the message content, positioned at the
+     * first unread byte. The same buffer instance is returned on every call
+     * once data is present; an empty buffer is returned when nothing has
+     * been appended yet.
+     */
+    public ByteBuffer readData() throws IOException
+    {
+        return readBuffer();
+    }
+
+    /**
+     * Returns the assembled read buffer, building it on first use.
+     * While no data has been appended a fresh empty buffer is returned and
+     * nothing is cached, so data appended later is still picked up.
+     */
+    private ByteBuffer readBuffer()
+    {
+        if (_readBuffer == null)
+        {
+            if (_data.isEmpty())
+            {
+                return ByteBuffer.allocate(0);
+            }
+            buildReadBuffer();
+        }
+        return _readBuffer;
+    }
+
+    /** Combines the queued chunks into <code>_readBuffer</code>. */
+    private void buildReadBuffer()
+    {
+        //optimize for the simple cases
+        if(_data.size() == 1)
+        {
+            _readBuffer = _data.element().duplicate();
+        }
+        else
+        {
+            // Size from the chunks as they are now, so the allocation always
+            // matches what is about to be copied.
+            int total = 0;
+            for(ByteBuffer buf:_data)
+            {
+                total += buf.remaining();
+            }
+            _readBuffer = ByteBuffer.allocate(total);
+            for(ByteBuffer buf:_data)
+            {
+                // Copy through a duplicate so the queued chunk keeps its position.
+                _readBuffer.put(buf.duplicate());
+            }
+            // Switch from writing to reading; without this nothing is readable.
+            _readBuffer.flip();
+        }
+    }
+
+    //hack for testing
+    /**
+     * Returns the unread content decoded with the platform default charset.
+     * The read position of the message is not changed.
+     */
+    @Override public String toString()
+    {
+        ByteBuffer temp = readBuffer().duplicate();
+        byte[] b = new byte[temp.remaining()];
+        temp.get(b);
+        return new String(b);
+    }
+}

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java?view=diff&rev=565407&r1=565406&r2=565407
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
 Mon Aug 13 09:41:34 2007
@@ -1,5 +1,6 @@
 package org.apache.qpidity.client.util;
 
+import java.io.EOFException;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -29,10 +30,8 @@
  * and stream it.
  *
  */
-public class FileMessage implements Message
+public class FileMessage extends ReadOnlyMessage implements Message
 {
-    private MessageProperties _messageProperties;
-    private DeliveryProperties _deliveryProperties;
     private FileChannel _fileChannel;
     private int _chunkSize;
     private long _fileSize;
@@ -52,46 +51,24 @@
             _chunkSize = (int)_fileSize;
         }
     }
-    
-    public void appendData(byte[] src)
-    {
-        throw new UnsupportedOperationException("This Message is read only 
after the initial source");
-    }
-
-    public void appendData(ByteBuffer src)
-    {
-        throw new UnsupportedOperationException("This Message is read only 
after the initial source");
-    }
-
-    public DeliveryProperties getDeliveryProperties()
-    {
-        return _deliveryProperties;
-    }
-
-    public MessageProperties getMessageProperties()
-    {
-        return _messageProperties;
-    }
 
     public void readData(byte[] target) throws IOException
     {        
-        int readLen = target.length <= _chunkSize ? target.length : _chunkSize;
-        if (_pos + readLen > _fileSize)
-        {
-            readLen = (int)(_fileSize - _pos);
-        }
-        MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, 
_pos, readLen);
-        _pos += readLen;
-        bb.get(target);
+        throw new UnsupportedOperationException();              
     }
     
     public ByteBuffer readData() throws IOException
     {
+        if (_pos == _fileSize)
+        {
+            throw new EOFException();
+        }
+        
         if (_pos + _chunkSize > _fileSize)
         {
             _chunkSize = (int)(_fileSize - _pos);
         }
-        MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, 
_pos, _chunkSize);
+        MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, 
_pos, _chunkSize);        
         _pos += _chunkSize;
         return bb;
     }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java?view=diff&rev=565407&r1=565406&r2=565407
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
 Mon Aug 13 09:41:34 2007
@@ -2,13 +2,10 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.Queue;
 
 import org.apache.qpidity.DeliveryProperties;
 import org.apache.qpidity.MessageProperties;
 import org.apache.qpidity.Struct;
-import org.apache.qpidity.api.Message;
 import org.apache.qpidity.client.MessageListener;
 import org.apache.qpidity.client.MessagePartListener;
 
@@ -23,94 +20,12 @@
 public class MessagePartListenerAdapter implements MessagePartListener
 {
        MessageListener _adaptee;
-       Message _currentMsg;
-    DeliveryProperties _currentDeliveryProps;
-    MessageProperties _currentMessageProps;
+    ByteBufferMessage _currentMsg;
     
        public MessagePartListenerAdapter(MessageListener listener)
        {
                _adaptee = listener;
-        
-        // temp solution.
-        _currentMsg = new Message()
-        {
-            Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
-            ByteBuffer _readBuffer;
-            private int dataSize; 
-            
-            public void appendData(byte[] src) throws IOException
-            {
-                appendData(ByteBuffer.wrap(src));
-            }
-
-            public void appendData(ByteBuffer src) throws IOException
-            {
-                _data.offer(src);
-                dataSize += src.remaining();
-            }
-            
-            public DeliveryProperties getDeliveryProperties()
-            {
-                return _currentDeliveryProps;
-            }
-
-            public MessageProperties getMessageProperties()
-            {
-                return _currentMessageProps;
-            }
-
-            // since we provide the message only after completion
-            // we can assume that when this method is called we have
-            // received all data.
-            public void readData(byte[] target) throws IOException
-            {
-                if (_data.size() >0 && _readBuffer == null)
-                {
-                    buildReadBuffer();
-                }
-                
-                _readBuffer.get(target);
-            }
-            
-            public ByteBuffer readData() throws IOException
-            {
-                if (_data.size() >0 && _readBuffer == null)
-                {
-                    buildReadBuffer();
-                }
-                
-                return _readBuffer;
-            }
-            
-            private void buildReadBuffer()
-            {
-                //optimize for the simple cases
-                if(_data.size() == 1)
-                {
-                    _readBuffer = _data.element().duplicate();
-                }
-                else
-                {
-                    _readBuffer = ByteBuffer.allocate(dataSize);
-                    for(ByteBuffer buf:_data)
-                    {
-                        _readBuffer.put(buf);
-                    }
-                }
-            }
-            
-            //hack for testing
-            @Override public String toString()
-            {
-                if (_data.size() >0 && _readBuffer == null)
-                {
-                    buildReadBuffer();
-                }
-                byte[] b = new byte[_readBuffer.limit()];
-                _readBuffer.get(b);
-                return new String(b);
-            }
-        };
+        _currentMsg = new ByteBufferMessage();        
     }
     
     public void addData(ByteBuffer src)
@@ -133,11 +48,11 @@
         {
                    if(struct instanceof DeliveryProperties)
             {
-                _currentDeliveryProps = (DeliveryProperties)struct;      
+                _currentMsg.setDeliveryProperties((DeliveryProperties)struct); 
     
             }
             else if (struct instanceof MessageProperties)
             {
-                _currentMessageProps = (MessageProperties)struct;      
+                _currentMsg.setMessageProperties((MessageProperties)struct);   
   
             }
         }
        }

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java?view=auto&rev=565407
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
 Mon Aug 13 09:41:34 2007
@@ -0,0 +1,34 @@
+package org.apache.qpidity.client.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * Base class for messages whose content cannot be extended after creation.
+ * Both <code>appendData</code> variants always fail, and the two property
+ * getters simply return the values held in the fields below.
+ */
+public abstract class ReadOnlyMessage implements Message
+{
+    private static final String READ_ONLY = "This Message is read only after the initial source";
+
+    // Package-private: subclasses in this package assign these directly.
+    DeliveryProperties _deliveryProperties;
+    MessageProperties _messageProperties;
+
+    /**
+     * Always rejects the call.
+     *
+     * @throws UnsupportedOperationException on every invocation
+     */
+    public void appendData(ByteBuffer src)
+    {
+        throw new UnsupportedOperationException(READ_ONLY);
+    }
+
+    /**
+     * Always rejects the call.
+     *
+     * @throws UnsupportedOperationException on every invocation
+     */
+    public void appendData(byte[] src)
+    {
+        throw new UnsupportedOperationException(READ_ONLY);
+    }
+
+    /** Returns the message properties currently held by this message. */
+    public MessageProperties getMessageProperties()
+    {
+        return _messageProperties;
+    }
+
+    /** Returns the delivery properties currently held by this message. */
+    public DeliveryProperties getDeliveryProperties()
+    {
+        return _deliveryProperties;
+    }
+
+}

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java?view=auto&rev=565407
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
 Mon Aug 13 09:41:34 2007
@@ -0,0 +1,48 @@
+package org.apache.qpidity.client.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * A read-only message whose content is pulled, one chunk at a time, from a
+ * <code>SocketChannel</code>.
+ *
+ * <p>A single internal buffer of <code>chunkSize</code> bytes is reused for
+ * every read. The buffer returned by {@link #readData()} shares its content
+ * with that internal buffer, so it must be consumed before the next call.</p>
+ */
+public class StreamingMessage extends ReadOnlyMessage implements Message
+{
+    SocketChannel _socChannel;
+    private int _chunkSize;
+    private ByteBuffer _readBuf;
+
+    /**
+     * @param in                 the channel the content is read from
+     * @param chunkSize          capacity of the reusable read buffer, i.e. the
+     *                           maximum number of bytes returned per read
+     * @param deliveryProperties delivery properties reported by this message
+     * @param messageProperties  message properties reported by this message
+     */
+    public StreamingMessage(SocketChannel in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException
+    {
+        _messageProperties = messageProperties;
+        _deliveryProperties = deliveryProperties;
+
+        _socChannel = in;
+        _chunkSize = chunkSize;
+        _readBuf = ByteBuffer.allocate(_chunkSize);
+    }
+
+    /**
+     * Not supported; use {@link #readData()} instead.
+     *
+     * @throws UnsupportedOperationException on every invocation
+     */
+    public void readData(byte[] target) throws IOException
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Reads the next chunk from the channel.
+     *
+     * @return a buffer positioned at the first byte read and limited to the
+     *         number of bytes read; it may be empty if the channel had no
+     *         bytes available for this read
+     * @throws EOFException if the channel is closed or not connected, or if
+     *                      the channel has reached end-of-stream
+     * @throws IOException  if the read itself fails
+     */
+    public ByteBuffer readData() throws IOException
+    {
+        if (!(_socChannel.isConnected() && _socChannel.isOpen()))
+        {
+            throw new EOFException("The underlying socket/channel has closed");
+        }
+
+        _readBuf.clear();
+        // read() reports end-of-stream with -1; the channel still counts as
+        // open and connected then, so this is the only way to notice that
+        // no more data will ever arrive.
+        if (_socChannel.read(_readBuf) == -1)
+        {
+            throw new EOFException("The underlying socket/channel has reached end of stream");
+        }
+        // Expose exactly the bytes just read instead of the unfilled tail.
+        _readBuf.flip();
+
+        return _readBuf.duplicate();
+    }
+
+}


Reply via email to