Author: rhs
Date: Tue Apr 22 11:01:08 2008
New Revision: 650598
URL: http://svn.apache.org/viewvc?rev=650598&view=rev
Log:
QPID-832: moved more 0-8 specific code to 0-8 subclasses
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=650598&r1=650597&r2=650598&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
Tue Apr 22 11:01:08 2008
@@ -153,25 +153,7 @@
}
}
- private void declareDestination(AMQDestination destination)
- {
-
- ExchangeDeclareBody body =
getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
-
destination.getExchangeName(),
-
destination.getExchangeClass(),
-
false,
-
false,
-
false,
-
false,
-
true,
-
null);
- // Declare the exchange
- // Note that the durable and internal arguments are ignored since passive is set to false
-
- AMQFrame declare = body.generateFrame(_channelId);
-
- _protocolHandler.writeFrame(declare);
- }
+ abstract void declareDestination(AMQDestination destination);
public void setDisableMessageID(boolean b) throws JMSException
{
@@ -497,81 +479,7 @@
type = AMQDestination.UNKNOWN_TYPE;
}
- //
message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(),
type);
-
-
- BasicPublishBody body =
getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
-
destination.getExchangeName(),
-
destination.getRoutingKey(),
-
mandatory,
-
immediate);
-
- AMQFrame publishFrame = body.generateFrame(_channelId);
-
- message.prepareForSending();
- ByteBuffer payload = message.getData();
- BasicContentHeaderProperties contentHeaderProperties =
message.getContentHeaderProperties();
-
- if (!_disableTimestamps)
- {
- final long currentTime = System.currentTimeMillis();
- contentHeaderProperties.setTimestamp(currentTime);
-
- if (timeToLive > 0)
- {
- contentHeaderProperties.setExpiration(currentTime +
timeToLive);
- }
- else
- {
- contentHeaderProperties.setExpiration(0);
- }
- }
-
- contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
- contentHeaderProperties.setPriority((byte) priority);
-
- final int size = (payload != null) ? payload.limit() : 0;
- final int contentBodyFrameCount =
calculateContentBodyFrameCount(payload);
- final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
-
- if (payload != null)
- {
- createContentBodies(payload, frames, 2, _channelId);
- }
-
- if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
- {
- _logger.debug("Sending content body frames to " + destination);
- }
-
-
- // TODO: This is a hacky way of getting the AMQP class-id for the Basic class
- int classIfForBasic =
getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
-
- AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
- classIfForBasic, 0,
contentHeaderProperties, size);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending content header frame to " + destination);
- }
-
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new
CompositeAMQDataBlock(frames);
-
- try
- {
- _session.checkFlowControl();
- }
- catch (InterruptedException e)
- {
- JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed");
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
-
- _protocolHandler.writeFrame(compositeFrame, wait);
+ sendMessage(destination, origMessage, message, deliveryMode, priority,
timeToLive, mandatory, immediate, wait);
if (message != origMessage)
{
@@ -589,8 +497,9 @@
}
}
- public abstract void sendMessage(AMQDestination destination, Message
origMessage, AbstractJMSMessage message, int deliveryMode,
- int priority, long timeToLive, boolean mandatory, boolean
immediate, boolean wait)throws JMSException;
+ abstract void sendMessage(AMQDestination destination, Message origMessage,
AbstractJMSMessage message,
+ int deliveryMode, int priority, long timeToLive,
boolean mandatory,
+ boolean immediate, boolean wait)throws
JMSException;
private void checkTemporaryDestination(AMQDestination destination) throws
JMSException
{
@@ -610,60 +519,6 @@
throw new JMSException("Cannot send to a deleted temporary destination");
}
}
- }
-
- /**
- * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
- * maximum frame size.
- *
- * @param payload
- * @param frames
- * @param offset
- * @param channelId @return the array of content bodies
- */
- private void createContentBodies(ByteBuffer payload, AMQFrame[] frames,
int offset, int channelId)
- {
-
- if (frames.length == (offset + 1))
- {
- frames[offset] = ContentBody.createAMQFrame(channelId, new
ContentBody(payload));
- }
- else
- {
-
- final long framePayloadMax =
_session.getAMQConnection().getMaximumFrameSize() - 1;
- long remaining = payload.remaining();
- for (int i = offset; i < frames.length; i++)
- {
- payload.position((int) framePayloadMax * (i - offset));
- int length = (remaining >= framePayloadMax) ? (int)
framePayloadMax : (int) remaining;
- payload.limit(payload.position() + length);
- frames[i] = ContentBody.createAMQFrame(channelId, new
ContentBody(payload.slice()));
-
- remaining -= length;
- }
- }
-
- }
-
- private int calculateContentBodyFrameCount(ByteBuffer payload)
- {
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
- int frameCount;
- if ((payload == null) || (payload.remaining() == 0))
- {
- frameCount = 0;
- }
- else
- {
- int dataLength = payload.remaining();
- final long framePayloadMax =
_session.getAMQConnection().getMaximumFrameSize() - 1;
- int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
- frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
- }
-
- return frameCount;
}
public void setMimeType(String mimeType) throws JMSException
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=650598&r1=650597&r2=650598&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
Tue Apr 22 11:01:08 2008
@@ -55,7 +55,7 @@
mandatory, waitUntilSent);
}
- public void declareDestination(AMQDestination destination)
+ void declareDestination(AMQDestination destination)
{
((AMQSession_0_10)
getSession()).getQpidSession().exchangeDeclare(destination.getExchangeName().toString(),
destination.getExchangeClass().toString(),
@@ -67,9 +67,9 @@
/**
* Sends a message to a given destination
*/
- public void sendMessage(AMQDestination destination, Message origMessage,
AbstractJMSMessage message,
- int deliveryMode, int priority, long timeToLive,
boolean mandatory, boolean immediate,
- boolean wait) throws JMSException
+ void sendMessage(AMQDestination destination, Message origMessage,
AbstractJMSMessage message,
+ int deliveryMode, int priority, long timeToLive, boolean
mandatory, boolean immediate,
+ boolean wait) throws JMSException
{
message.prepareForSending();
if (message.get010Message() == null)
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=650598&r1=650597&r2=650598&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
Tue Apr 22 11:01:08 2008
@@ -45,17 +45,18 @@
super(connection, destination,transacted,channelId,session,
protocolHandler, producerId, immediate, mandatory,waitUntilSent);
}
- public void declareDestination(AMQDestination destination)
+ void declareDestination(AMQDestination destination)
{
+
ExchangeDeclareBody body =
getSession().getMethodRegistry().createExchangeDeclareBody(_session.getTicket(),
- destination.getExchangeName(),
- destination.getExchangeClass(),
- false,
- false,
- false,
- false,
- true,
- null);
+
destination.getExchangeName(),
+
destination.getExchangeClass(),
+
false,
+
false,
+
false,
+
false,
+
true,
+
null);
// Declare the exchange
// Note that the durable and internal arguments are ignored since passive is set to false
@@ -64,17 +65,15 @@
_protocolHandler.writeFrame(declare);
}
- public void sendMessage(AMQDestination destination, Message
origMessage,AbstractJMSMessage message,
- int deliveryMode,int priority, long timeToLive, boolean mandatory,
boolean immediate, boolean wait) throws JMSException
+ void sendMessage(AMQDestination destination, Message
origMessage,AbstractJMSMessage message,
+ int deliveryMode,int priority, long timeToLive, boolean
mandatory, boolean immediate,
+ boolean wait) throws JMSException
{
-// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
BasicPublishBody body =
getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
- destination.getExchangeName(),
- destination.getRoutingKey(),
- mandatory,
- immediate);
+
destination.getExchangeName(),
+
destination.getRoutingKey(),
+
mandatory,
+
immediate);
AMQFrame publishFrame = body.generateFrame(_channelId);
@@ -114,17 +113,13 @@
_logger.debug("Sending content body frames to " + destination);
}
+
// TODO: This is a hacky way of getting the AMQP class-id for the Basic class
int classIfForBasic =
getSession().getMethodRegistry().createBasicQosOkBody().getClazz();
- // weight argument of zero indicates no child content headers, just
bodies
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
-
AMQFrame contentHeaderFrame =
ContentHeaderBody.createAMQFrame(_channelId,
classIfForBasic, 0,
contentHeaderProperties, size);
-
if (_logger.isDebugEnabled())
{
_logger.debug("Sending content header frame to " + destination);
@@ -133,17 +128,19 @@
frames[0] = publishFrame;
frames[1] = contentHeaderFrame;
CompositeAMQDataBlock compositeFrame = new
CompositeAMQDataBlock(frames);
- _protocolHandler.writeFrame(compositeFrame, wait);
- if (message != origMessage)
+ try
{
- _logger.debug("Updating original message");
- origMessage.setJMSPriority(message.getJMSPriority());
- origMessage.setJMSTimestamp(message.getJMSTimestamp());
- _logger.debug("Setting JMSExpiration:" +
message.getJMSExpiration());
- origMessage.setJMSExpiration(message.getJMSExpiration());
- origMessage.setJMSMessageID(message.getJMSMessageID());
+ _session.checkFlowControl();
}
+ catch (InterruptedException e)
+ {
+ JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed");
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+
+ _protocolHandler.writeFrame(compositeFrame, wait);
}
/**