Author: rhs
Date: Tue Apr 22 11:01:08 2008
New Revision: 650598

URL: http://svn.apache.org/viewvc?rev=650598&view=rev
Log:
QPID-832: moved more 0-8 specific code to 0-8 subclasses

Modified:
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=650598&r1=650597&r2=650598&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
 Tue Apr 22 11:01:08 2008
@@ -153,25 +153,7 @@
         }
     }
 
-    private void declareDestination(AMQDestination destination)
-    {
-
-        ExchangeDeclareBody body = 
getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
-                                                                               
               destination.getExchangeName(),
-                                                                               
               destination.getExchangeClass(),
-                                                                               
               false,
-                                                                               
               false,
-                                                                               
               false,
-                                                                               
               false,
-                                                                               
               true,
-                                                                               
               null);
-        // Declare the exchange
-        // Note that the durable and internal arguments are ignored since 
passive is set to false
-
-        AMQFrame declare = body.generateFrame(_channelId);
-
-        _protocolHandler.writeFrame(declare);
-    }
+    abstract void declareDestination(AMQDestination destination);
 
     public void setDisableMessageID(boolean b) throws JMSException
     {
@@ -497,81 +479,7 @@
             type = AMQDestination.UNKNOWN_TYPE;
         }
 
-      //  
message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(),
 type);
-
-
-        BasicPublishBody body = 
getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
-                                                                               
         destination.getExchangeName(),
-                                                                               
         destination.getRoutingKey(),
-                                                                               
         mandatory,
-                                                                               
         immediate);
-
-        AMQFrame publishFrame = body.generateFrame(_channelId);
-
-        message.prepareForSending();
-        ByteBuffer payload = message.getData();
-        BasicContentHeaderProperties contentHeaderProperties = 
message.getContentHeaderProperties();
-
-        if (!_disableTimestamps)
-        {
-            final long currentTime = System.currentTimeMillis();
-            contentHeaderProperties.setTimestamp(currentTime);
-
-            if (timeToLive > 0)
-            {
-                contentHeaderProperties.setExpiration(currentTime + 
timeToLive);
-            }
-            else
-            {
-                contentHeaderProperties.setExpiration(0);
-            }
-        }
-
-        contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
-        contentHeaderProperties.setPriority((byte) priority);
-
-        final int size = (payload != null) ? payload.limit() : 0;
-        final int contentBodyFrameCount = 
calculateContentBodyFrameCount(payload);
-        final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
-
-        if (payload != null)
-        {
-            createContentBodies(payload, frames, 2, _channelId);
-        }
-
-        if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
-        {
-            _logger.debug("Sending content body frames to " + destination);
-        }
-
-
-        // TODO: This is a hacky way of getting the AMQP class-id for the 
Basic class
-        int classIfForBasic = 
getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
-
-        AMQFrame contentHeaderFrame =
-            ContentHeaderBody.createAMQFrame(_channelId,
-                                             classIfForBasic, 0, 
contentHeaderProperties, size);
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Sending content header frame to " + destination);
-        }
-
-        frames[0] = publishFrame;
-        frames[1] = contentHeaderFrame;
-        CompositeAMQDataBlock compositeFrame = new 
CompositeAMQDataBlock(frames);
-
-        try
-        {
-            _session.checkFlowControl();
-        }
-        catch (InterruptedException e)
-        {
-            JMSException jmsEx = new JMSException("Interrupted while waiting 
for flow control to be removed");
-            jmsEx.setLinkedException(e);
-            throw jmsEx;
-        }
-
-        _protocolHandler.writeFrame(compositeFrame, wait);
+        sendMessage(destination, origMessage, message, deliveryMode, priority, 
timeToLive, mandatory, immediate, wait);
 
         if (message != origMessage)
         {
@@ -589,8 +497,9 @@
         }
     }
 
-    public abstract void sendMessage(AMQDestination destination, Message 
origMessage, AbstractJMSMessage message, int deliveryMode,
-            int priority, long timeToLive, boolean mandatory, boolean 
immediate, boolean wait)throws JMSException;
+    abstract void sendMessage(AMQDestination destination, Message origMessage, 
AbstractJMSMessage message,
+                              int deliveryMode, int priority, long timeToLive, 
boolean mandatory,
+                              boolean immediate, boolean wait)throws 
JMSException;
 
     private void checkTemporaryDestination(AMQDestination destination) throws 
JMSException
     {
@@ -610,60 +519,6 @@
                 throw new JMSException("Cannot send to a deleted temporary 
destination");
             }
         }
-    }
-
-    /**
-     * Create content bodies. This will split a large message into numerous 
bodies depending on the negotiated
-     * maximum frame size.
-     *
-     * @param payload
-     * @param frames
-     * @param offset
-     * @param channelId @return the array of content bodies
-     */
-    private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, 
int offset, int channelId)
-    {
-
-        if (frames.length == (offset + 1))
-        {
-            frames[offset] = ContentBody.createAMQFrame(channelId, new 
ContentBody(payload));
-        }
-        else
-        {
-
-            final long framePayloadMax = 
_session.getAMQConnection().getMaximumFrameSize() - 1;
-            long remaining = payload.remaining();
-            for (int i = offset; i < frames.length; i++)
-            {
-                payload.position((int) framePayloadMax * (i - offset));
-                int length = (remaining >= framePayloadMax) ? (int) 
framePayloadMax : (int) remaining;
-                payload.limit(payload.position() + length);
-                frames[i] = ContentBody.createAMQFrame(channelId, new 
ContentBody(payload.slice()));
-
-                remaining -= length;
-            }
-        }
-
-    }
-
-    private int calculateContentBodyFrameCount(ByteBuffer payload)
-    {
-        // we substract one from the total frame maximum size to account for 
the end of frame marker in a body frame
-        // (0xCE byte).
-        int frameCount;
-        if ((payload == null) || (payload.remaining() == 0))
-        {
-            frameCount = 0;
-        }
-        else
-        {
-            int dataLength = payload.remaining();
-            final long framePayloadMax = 
_session.getAMQConnection().getMaximumFrameSize() - 1;
-            int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
-            frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
-        }
-
-        return frameCount;
     }
 
     public void setMimeType(String mimeType) throws JMSException

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=650598&r1=650597&r2=650598&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
 Tue Apr 22 11:01:08 2008
@@ -55,7 +55,7 @@
               mandatory, waitUntilSent);
     }
 
-    public void declareDestination(AMQDestination destination)
+    void declareDestination(AMQDestination destination)
     {
         ((AMQSession_0_10) 
getSession()).getQpidSession().exchangeDeclare(destination.getExchangeName().toString(),
                                                                           
destination.getExchangeClass().toString(),
@@ -67,9 +67,9 @@
     /**
      * Sends a message to a given destination
      */
-    public void sendMessage(AMQDestination destination, Message origMessage, 
AbstractJMSMessage message,
-                            int deliveryMode, int priority, long timeToLive, 
boolean mandatory, boolean immediate,
-                            boolean wait) throws JMSException
+    void sendMessage(AMQDestination destination, Message origMessage, 
AbstractJMSMessage message,
+                     int deliveryMode, int priority, long timeToLive, boolean 
mandatory, boolean immediate,
+                     boolean wait) throws JMSException
     {
         message.prepareForSending();
         if (message.get010Message() == null)

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=650598&r1=650597&r2=650598&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
 Tue Apr 22 11:01:08 2008
@@ -45,17 +45,18 @@
         super(connection, destination,transacted,channelId,session, 
protocolHandler, producerId, immediate, mandatory,waitUntilSent);
     }
 
-    public void declareDestination(AMQDestination destination)
+    void declareDestination(AMQDestination destination)
     {
+
         ExchangeDeclareBody body = 
getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
-                destination.getExchangeName(),
-                destination.getExchangeClass(),
-                false,
-                false,
-                false,
-                false,
-                true,
-                null);
+                                                                               
               destination.getExchangeName(),
+                                                                               
               destination.getExchangeClass(),
+                                                                               
               false,
+                                                                               
               false,
+                                                                               
               false,
+                                                                               
               false,
+                                                                               
               true,
+                                                                               
               null);
         // Declare the exchange
         // Note that the durable and internal arguments are ignored since 
passive is set to false
 
@@ -64,17 +65,15 @@
         _protocolHandler.writeFrame(declare);
     }
 
-    public void sendMessage(AMQDestination destination, Message 
origMessage,AbstractJMSMessage message,
-            int deliveryMode,int priority, long timeToLive, boolean mandatory, 
boolean immediate, boolean wait) throws JMSException
+    void sendMessage(AMQDestination destination, Message 
origMessage,AbstractJMSMessage message,
+                     int deliveryMode,int priority, long timeToLive, boolean 
mandatory, boolean immediate,
+                     boolean wait) throws JMSException
     {
-//      AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
         BasicPublishBody body = 
getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
-                destination.getExchangeName(),
-                destination.getRoutingKey(),
-                mandatory,
-                immediate);
+                                                                               
         destination.getExchangeName(),
+                                                                               
         destination.getRoutingKey(),
+                                                                               
         mandatory,
+                                                                               
         immediate);
 
         AMQFrame publishFrame = body.generateFrame(_channelId);
 
@@ -114,17 +113,13 @@
             _logger.debug("Sending content body frames to " + destination);
         }
 
+
         // TODO: This is a hacky way of getting the AMQP class-id for the 
Basic class
         int classIfForBasic = 
getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
 
-        // weight argument of zero indicates no child content headers, just 
bodies
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from 
ProtocolInitiation for this session.
-
         AMQFrame contentHeaderFrame =
             ContentHeaderBody.createAMQFrame(_channelId,
                                              classIfForBasic, 0, 
contentHeaderProperties, size);
-
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Sending content header frame to " + destination);
@@ -133,17 +128,19 @@
         frames[0] = publishFrame;
         frames[1] = contentHeaderFrame;
         CompositeAMQDataBlock compositeFrame = new 
CompositeAMQDataBlock(frames);
-        _protocolHandler.writeFrame(compositeFrame, wait);
 
-        if (message != origMessage)
+        try
         {
-            _logger.debug("Updating original message");
-            origMessage.setJMSPriority(message.getJMSPriority());
-            origMessage.setJMSTimestamp(message.getJMSTimestamp());
-            _logger.debug("Setting JMSExpiration:" + 
message.getJMSExpiration());
-            origMessage.setJMSExpiration(message.getJMSExpiration());
-            origMessage.setJMSMessageID(message.getJMSMessageID());
+            _session.checkFlowControl();
         }
+        catch (InterruptedException e)
+        {
+            JMSException jmsEx = new JMSException("Interrupted while waiting 
for flow control to be removed");
+            jmsEx.setLinkedException(e);
+            throw jmsEx;
+        }
+
+        _protocolHandler.writeFrame(compositeFrame, wait);
     }
 
     /**


Reply via email to