Author: rajith
Date: Thu Jan 18 14:54:31 2007
New Revision: 497616

URL: http://svn.apache.org/viewvc?view=rev&rev=497616
Log:
implemented the logic for MessageTransfer and MessageAppend

Modified:
    
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
    
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
    
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
    
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
    
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
    
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
    
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
    
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
    
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java

Modified: 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageAppendMethodHandler.java
 Thu Jan 18 14:54:31 2007
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.client.handler;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageAppendBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -30,6 +31,7 @@
 public class MessageAppendMethodHandler implements StateAwareMethodListener
 {
     private static MessageAppendMethodHandler _instance = new 
MessageAppendMethodHandler();
+    private static final Logger _logger = 
Logger.getLogger(AMQProtocolSession.class);
 
     public static MessageAppendMethodHandler getInstance()
     {
@@ -44,7 +46,11 @@
                                        AMQMethodEvent evt)
                                 throws AMQException
     {
-               // TODO
+       try {
+                       
protocolSession.messageAppendBodyReceived((MessageAppendBody)evt.getMethod());
+               } catch (Exception e) {
+                       _logger.error("Unable to add data from 
MessageAppendBody",e); 
+               }
     }
 }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageCloseMethodHandler.java
 Thu Jan 18 14:54:31 2007
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.client.handler;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.MessageCloseBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -30,7 +31,8 @@
 public class MessageCloseMethodHandler implements StateAwareMethodListener
 {
     private static MessageCloseMethodHandler _instance = new 
MessageCloseMethodHandler();
-
+    private static final Logger _logger = 
Logger.getLogger(AMQProtocolSession.class);
+    
     public static MessageCloseMethodHandler getInstance()
     {
         return _instance;
@@ -44,7 +46,10 @@
                                        AMQMethodEvent evt)
                                 throws AMQException
     {
-               // TODO
+               MessageCloseBody body = (MessageCloseBody)evt.getMethod();
+               String referenceId = new String(body.getReference());
+               protocolSession.deliverMessageToAMQSession(evt.getChannelId(), 
referenceId);
+               _logger.debug("Method Close Body received, notify session to 
accept unprocessed message");
     }
 }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
 Thu Jan 18 14:54:31 2007
@@ -22,13 +22,14 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.MessageTransferBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.client.message.MessageHeaders;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
 
 public class MessageTransferMethodHandler implements StateAwareMethodListener
 {
@@ -50,7 +51,7 @@
     {
        final UnprocessedMessage msg = new UnprocessedMessage();
        MessageTransferBody transferBody = (MessageTransferBody) 
evt.getMethod();
-        msg.content = transferBody.getBody();
+        
         msg.channelId = evt.getChannelId();
         msg.deliveryTag = evt.getRequestId();
         _logger.debug("New JmsDeliver method received");
@@ -74,7 +75,16 @@
         
         msg.contentHeader = messageHeaders;
         
-        protocolSession.unprocessedMessageReceived(msg);
+        if(transferBody.getBody().contentType == 
Content.ContentTypeEnum.CONTENT_TYPE_INLINE)
+        {
+               msg.addContent(transferBody.getBody().getContentAsByteArray());
+               protocolSession.deliverMessageToAMQSession(evt.getChannelId(), 
msg);
+        }
+        else
+        {
+               String referenceId = new 
String(transferBody.getBody().getContentAsByteArray());
+               
protocolSession.deliverMessageToAMQSession(evt.getChannelId(),referenceId);
+        }
         
     }
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
 Thu Jan 18 14:54:31 2007
@@ -20,15 +20,15 @@
  */
 package org.apache.qpid.client.message;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.Content;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-
-import javax.jms.JMSException;
 import java.util.Iterator;
 import java.util.List;
 
+import javax.jms.JMSException;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+
 public abstract class AbstractJMSMessageFactory implements MessageFactory
 {
     private static final Logger _logger = 
Logger.getLogger(AbstractJMSMessageFactory.class);
@@ -38,11 +38,14 @@
                        ByteBuffer data, MessageHeaders contentHeader) throws 
AMQException;
 
        protected AbstractJMSMessage createMessageWithBody(long messageNbr,
-                       MessageHeaders contentHeader, Content body) throws 
AMQException {
-        ByteBuffer data;
-
-        data = ByteBuffer.allocate(body.content.remaining());
-        data.put(body.content);
+                       MessageHeaders contentHeader, List contents) throws 
AMQException {
+               
+               ByteBuffer data = 
ByteBuffer.allocate((int)contentHeader.getSize());        
+        for (final Iterator it = contents.iterator();it.hasNext();)
+        {
+            byte[] bytes = (byte[]) it.next();
+            data.put(bytes);
+        }
         data.flip();
         
         _logger.debug("Creating message from buffer with position=" + 
data.position() + " and remaining=" + data.remaining());
@@ -52,9 +55,9 @@
 
     public AbstractJMSMessage createMessage(long messageNbr, boolean 
redelivered,
                                                                                
MessageHeaders contentHeader,
-                                                                               
Content body) throws JMSException, AMQException
+                                                                               
List contents) throws JMSException, AMQException
     {
-        final AbstractJMSMessage msg = createMessageWithBody(messageNbr, 
contentHeader, body);
+        final AbstractJMSMessage msg = createMessageWithBody(messageNbr, 
contentHeader, contents);
         msg.setJMSRedelivered(redelivered);
         return msg;
     }

Modified: 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
 Thu Jan 18 14:54:31 2007
@@ -20,8 +20,9 @@
  */
 package org.apache.qpid.client.message;
 
+import java.util.List;
+
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.Content;
 
 import javax.jms.JMSException;
 
@@ -30,7 +31,7 @@
 {
     AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
                                      MessageHeaders contentHeader,
-                                     Content body)
+                                     List contents)
         throws JMSException, AMQException;
 
     AbstractJMSMessage createMessage() throws JMSException;

Modified: 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
 Thu Jan 18 14:54:31 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.client.message;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import javax.jms.JMSException;
@@ -59,7 +60,7 @@
      */
     public AbstractJMSMessage createMessage(long deliveryTag, boolean 
redelivered,
                                             MessageHeaders contentHeader,
-                                            Content body) throws AMQException, 
JMSException
+                                            List contents) throws 
AMQException, JMSException
     {
         MessageFactory mf = (MessageFactory) 
_mimeToFactoryMap.get(contentHeader.getContentType());
         if (mf == null)
@@ -68,7 +69,7 @@
         }
         else
         {
-            return mf.createMessage(deliveryTag, redelivered, contentHeader, 
body);
+            return mf.createMessage(deliveryTag, redelivered, contentHeader, 
contents);
         }
     }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
 Thu Jan 18 14:54:31 2007
@@ -69,7 +69,17 @@
 
     private String _routingKey;
     
-    public MessageHeaders()
+    private int _size;
+    
+    public int getSize() {
+               return _size;
+       }
+
+       public void setSize(int size) {
+               this._size = size;
+       }
+
+       public MessageHeaders()
     {
     }
 

Modified: 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
 Thu Jan 18 14:54:31 2007
@@ -20,10 +20,8 @@
  */
 package org.apache.qpid.client.message;
 
-import org.apache.qpid.framing.*;
-
-import java.util.List;
 import java.util.LinkedList;
+import java.util.List;
 
 /**
  * This class contains everything needed to process a JMS message. It 
assembles the
@@ -34,13 +32,20 @@
  * the MINA dispatcher thread.
  *
  */
-public class UnprocessedMessage
-{
-    private long _bytesReceived = 0;
-
-    public Content content;
-    public int channelId;
-    public long deliveryTag;
-    public MessageHeaders contentHeader;
-    
+public class UnprocessedMessage {
+       public int bytesReceived = 0;
+
+       public List contents = new LinkedList();
+
+       public int channelId;
+
+       public long deliveryTag;
+
+       public MessageHeaders contentHeader;
+
+       public void addContent(byte[] content) {
+               contents.add(content);
+               bytesReceived = bytesReceived + content.length;
+       }
+
 }

Modified: 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=497616&r1=497615&r2=497616
==============================================================================
--- 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
 (original)
+++ 
incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
 Thu Jan 18 14:54:31 2007
@@ -42,6 +42,7 @@
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQRequestBody;
 import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.MessageAppendBody;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersionList;
 import org.apache.qpid.framing.RequestManager;
@@ -94,7 +95,7 @@
      * Maps from a channel id to an unprocessed message. This is used to tie 
together the
      * JmsDeliverBody (which arrives first) with the subsequent content header 
and content bodies.
      */
-    protected ConcurrentMap _channelId2UnprocessedMsgMap = new 
ConcurrentHashMap();
+    protected ConcurrentMap _referenceId2UnprocessedMsgMap = new 
ConcurrentHashMap();
 
     protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap();
     protected ConcurrentMap _channelId2ResponseMgrMap = new 
ConcurrentHashMap();
@@ -244,15 +245,21 @@
     }
 
     /**
-     * Callback invoked from the BasicDeliverMethodHandler when a message has 
been received.
+     * This is involed from MessageTransferMethodHandler if type is 
CONTENT_TYPE_REFERENCE
      * This is invoked on the MINA dispatcher thread.
      * @param message
      * @throws AMQException if this was not expected
      */
-    public void unprocessedMessageReceived(UnprocessedMessage message) throws 
AMQException
+    public void unprocessedMessageReceived(String referenceId, 
UnprocessedMessage message) throws AMQException
     {
-        //_channelId2UnprocessedMsgMap.put(message.channelId, message);
-       deliverMessageToAMQSession(message.channelId, message);
+        _referenceId2UnprocessedMsgMap.put(referenceId, message);
+    }
+    
+    public void messageAppendBodyReceived(MessageAppendBody appendBody) throws 
Exception
+    {
+       String referenceId = new String(appendBody.getReference());
+       UnprocessedMessage msg = 
(UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId);
+       msg.addContent(appendBody.bytes);
     }
     
     public void messageRequestBodyReceived(int channelId, AMQRequestBody 
requestBody) throws Exception
@@ -279,63 +286,30 @@
         requestManager.responseReceived(responseBody);
     }
 
-//     public void messageContentHeaderReceived(int channelId, 
ContentHeaderBody contentHeader)
-//             throws AMQException
-//     {
-//         UnprocessedMessage msg = (UnprocessedMessage) 
_channelId2UnprocessedMsgMap.get(channelId);
-//         if (msg == null)
-//         {
-//             throw new AMQException("Error: received content header without 
having received a BasicDeliver frame first");
-//         }
-//         if (msg.contentHeader != null)
-//         {
-//             throw new AMQException("Error: received duplicate content 
header or did not receive correct number of content body frames");
-//         }
-//         msg.contentHeader = contentHeader;
-//         if (contentHeader.bodySize == 0)
-//         {
-//             deliverMessageToAMQSession(channelId, msg);
-//         }
-//     }
-// 
-//     public void messageContentBodyReceived(int channelId, ContentBody 
contentBody) throws AMQException
-//     {
-//         UnprocessedMessage msg = (UnprocessedMessage) 
_channelId2UnprocessedMsgMap.get(channelId);
-//         if (msg == null)
-//         {
-//             throw new AMQException("Error: received content body without 
having received a JMSDeliver frame first");
-//         }
-//         if (msg.contentHeader == null)
-//         {
-//             _channelId2UnprocessedMsgMap.remove(channelId);
-//             throw new AMQException("Error: received content body without 
having received a ContentHeader frame first");
-//         }
-//         try
-//         {
-//             msg.receiveBody(contentBody);
-//         }
-//         catch (UnexpectedBodyReceivedException e)
-//         {
-//             _channelId2UnprocessedMsgMap.remove(channelId);
-//             throw e;
-//         }
-//         if (msg.isAllBodyDataReceived())
-//         {
-//             deliverMessageToAMQSession(channelId, msg);
-//         }
-//     }
-
     /**
+     * This is involed from MessageTransferMethodHandler if type is 
CONTENT_TYPE_INLINE
      * Deliver a message to the appropriate session, removing the unprocessed 
message
      * from our map
      * @param channelId the channel id the message should be delivered to
      * @param msg the message
      */
-    private void deliverMessageToAMQSession(int channelId, UnprocessedMessage 
msg)
+    public void deliverMessageToAMQSession(int channelId, UnprocessedMessage 
msg)
     {
         AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
+        msg.contentHeader.setSize(msg.bytesReceived);
         session.messageReceived(msg);
-        //_channelId2UnprocessedMsgMap.remove(channelId);
+    }
+    
+    /**
+     * This is involed from MessageCloseMethodHandler if type is 
CONTENT_TYPE_REFERENCE
+     * In this case we use the reference id to obtain the unprocessed message
+     * The channel id is used to retrive a session
+     */
+    public void deliverMessageToAMQSession(int channelId, String referenceId)
+    {
+       UnprocessedMessage msg = 
(UnprocessedMessage)_referenceId2UnprocessedMsgMap.get(referenceId);
+       deliverMessageToAMQSession(channelId,msg);
+       _referenceId2UnprocessedMsgMap.remove(referenceId);
     }
     
     public long writeRequest(int channelNum, AMQMethodBody methodBody,


Reply via email to