Author: ritchiem
Date: Tue Nov 6 03:12:37 2007
New Revision: 592374
URL: http://svn.apache.org/viewvc?rev=592374&view=rev
Log:
QPID-662 Transactional state not correctly reported after failover. We now
record whether any messages have been sent in the current transaction; from
this we can check whether failover has occurred since they were sent, in which
case those messages have been lost from the transaction, making it invalid.
Added:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
(with props)
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Tue Nov 6 03:12:37 2007
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.client;
+import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverNoopSupport;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -100,6 +100,7 @@
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
@@ -293,6 +294,11 @@
private final boolean _strictAMQPFATAL;
private final Object _messageDeliveryLock = new Object();
+ /** Session state : used to detect if commit is a) required b) allowed ,
i.e. does the tx span failover. */
+ private boolean _dirty;
+ /** Has failover occured on this session */
+ private boolean _failedOver;
+
/**
* Creates a new session on a connection.
*
@@ -610,30 +616,65 @@
{
checkTransacted();
- try
+ new FailoverNoopSupport<Object, JMSException>(new
FailoverProtectedOperation<Object, JMSException>()
{
- // Acknowledge up to message last delivered (if any) for each
consumer.
- // need to send ack for messages delivered to consumers so far
- for (Iterator<BasicMessageConsumer> i =
_consumers.values().iterator(); i.hasNext();)
+ public Object execute() throws JMSException, FailoverException
{
- // Sends acknowledgement to server
- i.next().acknowledgeLastDelivered();
- }
+ //Check that we are clean to commit.
+ if (_failedOver && _dirty)
+ {
+ rollback();
- // Commits outstanding messages sent and outstanding
acknowledgements.
- final AMQProtocolHandler handler = getProtocolHandler();
+ throw new TransactionRolledBackException("Connection
failover has occured since last send. " +
+ "Forced
rollback");
+ }
- handler.syncWrite(TxCommitBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion()),
- TxCommitOkBody.class);
- }
- catch (AMQException e)
- {
- throw new JMSAMQException("Failed to commit: " + e.getMessage(),
e);
- }
- catch (FailoverException e)
- {
- throw new JMSAMQException("Fail-over interrupted commit. Status of
the commit is uncertain.", e);
- }
+ try
+ {
+ // Acknowledge up to message last delivered (if any) on
this session.
+ // We only need to find the highest value and ack that as
commit is session level.
+ Long lastTag = -1L;
+
+ for (Iterator<BasicMessageConsumer> i =
_consumers.values().iterator(); i.hasNext();)
+ {
+// i.next().acknowledgeLastDelivered();
+// }
+
+ // get next acknowledgement to server
+ Long next = i.next().getLastDelivered();
+ if (next != null && next > lastTag)
+ {
+ lastTag = next;
+ }
+ }
+
+ if (lastTag != -1)
+ {
+ acknowledgeMessage(lastTag, true);
+ }
+
+ // Commits outstanding messages sent and outstanding
acknowledgements.
+ final AMQProtocolHandler handler = getProtocolHandler();
+
+ handler.syncWrite(TxCommitBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion()),
+ TxCommitOkBody.class);
+
+ markClean();
+ }
+
+ catch (AMQException e)
+ {
+ throw new JMSAMQException("Failed to commit: " +
e.getMessage(), e);
+ }
+
+ catch (FailoverException e)
+ {
+ throw new JMSAMQException("Fail-over interrupted commit.
Status of the commit is uncertain.", e);
+ }
+
+ return null;
+ }
+ }, _connection).execute();
}
public void confirmConsumerCancelled(AMQShortString consumerTag)
@@ -1431,6 +1472,8 @@
_connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
getProtocolMajorVersion(), getProtocolMinorVersion()),
TxRollbackOkBody.class);
+ markClean();
+
if (!isSuspended)
{
suspendChannel(false);
@@ -1731,6 +1774,7 @@
*/
void resubscribe() throws AMQException
{
+ _failedOver = true;
resubscribeProducers();
resubscribeConsumers();
}
@@ -2530,6 +2574,41 @@
Object getMessageDeliveryLock()
{
return _messageDeliveryLock;
+ }
+
+ /**
+ * Signifies that the session has pending sends to commit.
+ */
+ public void markDirty()
+ {
+ _dirty = true;
+ }
+
+ /**
+ * Signifies that the session has no pending sends to commit.
+ */
+ public void markClean()
+ {
+ _dirty = false;
+ _failedOver = false;
+ }
+
+ /**
+ * Check to see if failover has occured since the last call to
markClean(commit or rollback).
+ * @return boolean true if failover has occured.
+ */
+ public boolean hasFailedOver()
+ {
+ return _failedOver;
+ }
+
+ /**
+ * Check to see if any message have been sent in this transaction and have
not been commited.
+ * @return boolean true if a message has been sent but not commited
+ */
+ public boolean isDirty()
+ {
+ return _dirty;
}
/** Responsible for decoding a message fragment and passing it to the
appropriate message consumer. */
Added:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java?rev=592374&view=auto
==============================================================================
---
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
(added)
+++
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
Tue Nov 6 03:12:37 2007
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * AMQSessionDirtyException represents all failures to send data on a
transacted session that is
+ * no longer in a state that the client expects. i.e. failover has occured so
previously sent messages
+ * will not be part of the transaction.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent attempt to perform additional sends on a dirty session.
+ * </table>
+ */
+public class AMQSessionDirtyException extends AMQException
+{
+ public AMQSessionDirtyException(String msg)
+ {
+ super(AMQConstant.RESOURCE_ERROR, msg);
+ }
+}
Propchange:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Tue Nov 6 03:12:37 2007
@@ -754,6 +754,30 @@
}
}
+ /**
+ * Acknowledge up to last message delivered (if any). Used when commiting.
+ *
+ * @return the lastDeliveryTag to acknowledge
+ */
+ Long getLastDelivered()
+ {
+ if (!_receivedDeliveryTags.isEmpty())
+ {
+ Long lastDeliveryTag = _receivedDeliveryTags.poll();
+
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ lastDeliveryTag = _receivedDeliveryTags.poll();
+ }
+
+ assert _receivedDeliveryTags.isEmpty();
+
+ return lastDeliveryTag;
+ }
+
+ return null;
+ }
+
/** Acknowledge up to last message delivered (if any). Used when
commiting. */
void acknowledgeLastDelivered()
{
@@ -772,6 +796,7 @@
}
}
+
void notifyError(Throwable cause)
{
// synchronized (_closed)
@@ -783,7 +808,7 @@
if (_closedStack != null)
{
_logger.trace(_consumerTag + " notifyError():"
- + Arrays.asList(stackTrace).subList(3,
stackTrace.length - 1));
+ + Arrays.asList(stackTrace).subList(3,
stackTrace.length - 1));
_logger.trace(_consumerTag + " previously" +
_closedStack.toString());
}
else
@@ -904,7 +929,7 @@
if (_logger.isDebugEnabled())
{
_logger.debug("Rejecting the messages(" +
_receivedDeliveryTags.size() + ") in _receivedDTs (RQ)"
- + "for consumer with tag:" + _consumerTag);
+ + "for consumer with tag:" + _consumerTag);
}
Long tag = _receivedDeliveryTags.poll();
@@ -934,7 +959,7 @@
if (_logger.isDebugEnabled())
{
_logger.debug("Rejecting the messages(" +
_synchronousQueue.size() + ") in _syncQueue (PRQ)"
- + "for consumer with tag:" + _consumerTag);
+ + "for consumer with tag:" + _consumerTag);
}
Iterator iterator = _synchronousQueue.iterator();
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
(original)
+++
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
Tue Nov 6 03:12:37 2007
@@ -60,46 +60,30 @@
private AMQConnection _connection;
- /**
- * If true, messages will not get a timestamp.
- */
+ /** If true, messages will not get a timestamp. */
private boolean _disableTimestamps;
- /**
- * Priority of messages created by this producer.
- */
+ /** Priority of messages created by this producer. */
private int _messagePriority;
- /**
- * Time to live of messages. Specified in milliseconds but AMQ has 1
second resolution.
- */
+ /** Time to live of messages. Specified in milliseconds but AMQ has 1
second resolution. */
private long _timeToLive;
- /**
- * Delivery mode used for this producer.
- */
+ /** Delivery mode used for this producer. */
private int _deliveryMode = DeliveryMode.PERSISTENT;
- /**
- * The Destination used for this consumer, if specified upon creation.
- */
+ /** The Destination used for this consumer, if specified upon creation. */
protected AMQDestination _destination;
- /**
- * Default encoding used for messages produced by this producer.
- */
+ /** Default encoding used for messages produced by this producer. */
private String _encoding;
- /**
- * Default encoding used for message produced by this producer.
- */
+ /** Default encoding used for message produced by this producer. */
private String _mimeType;
private AMQProtocolHandler _protocolHandler;
- /**
- * True if this producer was created from a transacted session
- */
+ /** True if this producer was created from a transacted session */
private boolean _transacted;
private int _channelId;
@@ -112,9 +96,7 @@
*/
private long _producerId;
- /**
- * The session used to create this producer
- */
+ /** The session used to create this producer */
private AMQSession _session;
private final boolean _immediate;
@@ -128,8 +110,8 @@
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
protected BasicMessageProducer(AMQConnection connection, AMQDestination
destination, boolean transacted, int channelId,
- AMQSession session, AMQProtocolHandler protocolHandler, long
producerId, boolean immediate, boolean mandatory,
- boolean waitUntilSent)
+ AMQSession session, AMQProtocolHandler
protocolHandler, long producerId, boolean immediate, boolean mandatory,
+ boolean waitUntilSent)
{
_connection = connection;
_destination = destination;
@@ -162,16 +144,16 @@
// Note that the durable and internal arguments are ignored since
passive is set to false
// TODO: Be aware of possible changes to parameter order as versions
change.
AMQFrame declare =
- ExchangeDeclareBody.createAMQFrame(_channelId,
_protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), null, // arguments
- false, // autoDelete
- false, // durable
- destination.getExchangeName(), // exchange
- false, // internal
- true, // nowait
- false, // passive
- _session.getTicket(), // ticket
- destination.getExchangeClass()); // type
+ ExchangeDeclareBody.createAMQFrame(_channelId,
_protocolHandler.getProtocolMajorVersion(),
+
_protocolHandler.getProtocolMinorVersion(), null, // arguments
+ false, // autoDelete
+ false, // durable
+
destination.getExchangeName(), // exchange
+ false, // internal
+ true, // nowait
+ false, // passive
+ _session.getTicket(), //
ticket
+
destination.getExchangeClass()); // type
_protocolHandler.writeFrame(declare);
}
@@ -208,7 +190,7 @@
if ((i != DeliveryMode.NON_PERSISTENT) && (i !=
DeliveryMode.PERSISTENT))
{
throw new JMSException("DeliveryMode must be either NON_PERSISTENT
or PERSISTENT. Value of " + i
- + " is illegal");
+ + " is illegal");
}
_deliveryMode = i;
@@ -320,12 +302,12 @@
{
validateDestination(destination);
sendImpl((AMQDestination) destination, message, _deliveryMode,
_messagePriority, _timeToLive, _mandatory,
- _immediate);
+ _immediate);
}
}
public void send(Destination destination, Message message, int
deliveryMode, int priority, long timeToLive)
- throws JMSException
+ throws JMSException
{
checkPreConditions();
checkDestination(destination);
@@ -337,7 +319,7 @@
}
public void send(Destination destination, Message message, int
deliveryMode, int priority, long timeToLive,
- boolean mandatory) throws JMSException
+ boolean mandatory) throws JMSException
{
checkPreConditions();
checkDestination(destination);
@@ -349,7 +331,7 @@
}
public void send(Destination destination, Message message, int
deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate) throws JMSException
+ boolean mandatory, boolean immediate) throws JMSException
{
checkPreConditions();
checkDestination(destination);
@@ -361,7 +343,7 @@
}
public void send(Destination destination, Message message, int
deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate, boolean waitUntilSent) throws
JMSException
+ boolean mandatory, boolean immediate, boolean
waitUntilSent) throws JMSException
{
checkPreConditions();
checkDestination(destination);
@@ -369,7 +351,7 @@
{
validateDestination(destination);
sendImpl((AMQDestination) destination, message, deliveryMode,
priority, timeToLive, mandatory, immediate,
- waitUntilSent);
+ waitUntilSent);
}
}
@@ -415,7 +397,7 @@
else
{
throw new JMSException("Unable to send message, due to class
conversion error: "
- + message.getClass().getName());
+ + message.getClass().getName());
}
}
}
@@ -425,14 +407,14 @@
if (!(destination instanceof AMQDestination))
{
throw new JMSException("Unsupported destination class: "
- + ((destination != null) ? destination.getClass() : null));
+ + ((destination != null) ?
destination.getClass() : null));
}
declareDestination((AMQDestination) destination);
}
protected void sendImpl(AMQDestination destination, Message message, int
deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate) throws JMSException
+ boolean mandatory, boolean immediate) throws
JMSException
{
sendImpl(destination, message, deliveryMode, priority, timeToLive,
mandatory, immediate, _waitUntilSent);
}
@@ -447,16 +429,27 @@
* @param timeToLive
* @param mandatory
* @param immediate
+ *
* @throws JMSException
*/
protected void sendImpl(AMQDestination destination, Message origMessage,
int deliveryMode, int priority, long timeToLive,
- boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ boolean mandatory, boolean immediate, boolean
wait) throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
AbstractJMSMessage message = convertToNativeMessage(origMessage);
+ if (_transacted)
+ {
+ if (_session.hasFailedOver() && _session.isDirty())
+ {
+ throw new JMSAMQException("Failover has occurred and session
is dirty so unable to send.",
+ new
AMQSessionDirtyException("Failover has occurred and session is dirty " +
+ "so
unable to send."));
+ }
+ }
+
if (_disableMessageId)
{
message.setJMSMessageID(null);
@@ -489,12 +482,12 @@
// TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
AMQFrame publishFrame =
- BasicPublishBody.createAMQFrame(_channelId,
_protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(),
destination.getExchangeName(), // exchange
- immediate, // immediate
- mandatory, // mandatory
- destination.getRoutingKey(), // routingKey
- _session.getTicket()); // ticket
+ BasicPublishBody.createAMQFrame(_channelId,
_protocolHandler.getProtocolMajorVersion(),
+
_protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), //
exchange
+ immediate, // immediate
+ mandatory, // mandatory
+ destination.getRoutingKey(),
// routingKey
+ _session.getTicket()); //
ticket
message.prepareForSending();
ByteBuffer payload = message.getData();
@@ -536,9 +529,9 @@
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
AMQFrame contentHeaderFrame =
- ContentHeaderBody.createAMQFrame(_channelId,
-
BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()), 0,
contentHeaderProperties, size);
+ ContentHeaderBody.createAMQFrame(_channelId,
+
BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
+
_protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size);
if (_logger.isDebugEnabled())
{
_logger.debug("Sending content header frame to " + destination);
@@ -558,6 +551,11 @@
origMessage.setJMSExpiration(message.getJMSExpiration());
origMessage.setJMSMessageID(message.getJMSMessageID());
}
+
+ if (_transacted)
+ {
+ _session.markDirty();
+ }
}
private void checkTemporaryDestination(AMQDestination destination) throws
JMSException
@@ -669,7 +667,7 @@
if ((_destination != null) && (suppliedDestination != null))
{
throw new UnsupportedOperationException(
- "This message producer was created with a Destination,
therefore you cannot use an unidentified Destination");
+ "This message producer was created with a Destination,
therefore you cannot use an unidentified Destination");
}
if (suppliedDestination == null)
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
(original)
+++
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
Tue Nov 6 03:12:37 2007
@@ -104,23 +104,22 @@
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Create the filter chain to filter this handlers events.
- * <td> {@link ProtocolCodecFilter}, {@link
SSLContextFactory}, {@link SSLFilter}, {@link
ReadWriteThreadModel}.
+ * <td> {@link ProtocolCodecFilter}, {@link
SSLContextFactory}, {@link SSLFilter}, {@link
ReadWriteThreadModel}.
*
* <tr><td> Maintain fail-over state.
* <tr><td>
* </table>
*
* @todo Explain the system property: amqj.shared_read_write_pool. How does
putting the protocol codec filter before the
- * async write filter make it a shared pool? The pooling filter uses the
same thread pool for reading and writing
- * anyway, see {@link org.apache.qpid.pool.PoolingFilter},
docs for comments. Will putting the protocol codec
- * filter before it mean not doing the read/write asynchronously but in
the main filter thread?
- *
+ * async write filter make it a shared pool? The pooling filter uses the same
thread pool for reading and writing
+ * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for
comments. Will putting the protocol codec
+ * filter before it mean not doing the read/write asynchronously but in the
main filter thread?
* @todo Use a single handler instance, by shifting everything to do with the
'protocol session' state, including
- * failover state, into AMQProtocolSession, and tracking that from
AMQConnection? The lifecycles of
- * AMQProtocolSesssion and AMQConnection will be the same, so if there
is high cohesion between them, they could
- * be merged, although there is sense in keeping the session model
seperate. Will clarify things by having data
- * held per protocol handler, per protocol session, per network
connection, per channel, in seperate classes, so
- * that lifecycles of the fields match lifecycles of their containing
objects.
+ * failover state, into AMQProtocolSession, and tracking that from
AMQConnection? The lifecycles of
+ * AMQProtocolSesssion and AMQConnection will be the same, so if there is high
cohesion between them, they could
+ * be merged, although there is sense in keeping the session model seperate.
Will clarify things by having data
+ * held per protocol handler, per protocol session, per network connection,
per channel, in seperate classes, so
+ * that lifecycles of the fields match lifecycles of their containing objects.
*/
public class AMQProtocolHandler extends IoHandlerAdapter
{
@@ -200,7 +199,7 @@
{
SSLConfiguration sslConfig = _connection.getSSLConfiguration();
SSLContextFactory sslFactory =
- new SSLContextFactory(sslConfig.getKeystorePath(),
sslConfig.getKeystorePassword(), sslConfig.getCertType());
+ new SSLContextFactory(sslConfig.getKeystorePath(),
sslConfig.getKeystorePassword(), sslConfig.getCertType());
SSLFilter sslFilter = new
SSLFilter(sslFactory.buildClientContext());
sslFilter.setUseClientMode(true);
session.getFilterChain().addBefore("protocolFilter", "ssl",
sslFilter);
@@ -235,7 +234,7 @@
* @param session The MINA session.
*
* @todo Clarify: presumably exceptionCaught is called when the client is
sending during a connection failure and
- * not otherwise? The above comment doesn't make that clear.
+ * not otherwise? The above comment doesn't make that clear.
*/
public void sessionClosed(IoSession session)
{
@@ -413,74 +412,74 @@
switch (bodyFrame.getFrameType())
{
- case AMQMethodBody.TYPE:
+ case AMQMethodBody.TYPE:
- if (debug)
- {
- _logger.debug("(" + System.identityHashCode(this) + ")Method
frame received: " + frame);
- }
+ if (debug)
+ {
+ _logger.debug("(" + System.identityHashCode(this) +
")Method frame received: " + frame);
+ }
- final AMQMethodEvent<AMQMethodBody> evt =
- new AMQMethodEvent<AMQMethodBody>(frame.getChannel(),
(AMQMethodBody) bodyFrame);
+ final AMQMethodEvent<AMQMethodBody> evt =
+ new AMQMethodEvent<AMQMethodBody>(frame.getChannel(),
(AMQMethodBody) bodyFrame);
- try
- {
-
- boolean wasAnyoneInterested =
getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
+ try
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+
+ boolean wasAnyoneInterested =
getStateManager().methodReceived(evt);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener)
it.next();
- wasAnyoneInterested = listener.methodReceived(evt) ||
wasAnyoneInterested;
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener =
(AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt)
|| wasAnyoneInterested;
+ }
}
- }
- if (!wasAnyoneInterested)
- {
- throw new AMQException("AMQMethodEvent " + evt + " was not
processed by any listener. Listeners:"
- + _frameListeners);
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException("AMQMethodEvent " + evt + " was
not processed by any listener. Listeners:"
+ + _frameListeners);
+ }
}
- }
- catch (AMQException e)
- {
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
+ catch (AMQException e)
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+ getStateManager().error(e);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener)
it.next();
- listener.error(e);
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener =
(AMQMethodListener) it.next();
+ listener.error(e);
+ }
}
- }
- exceptionCaught(session, e);
- }
+ exceptionCaught(session, e);
+ }
- break;
+ break;
- case ContentHeaderBody.TYPE:
+ case ContentHeaderBody.TYPE:
- _protocolSession.messageContentHeaderReceived(frame.getChannel(),
(ContentHeaderBody) bodyFrame);
- break;
+
_protocolSession.messageContentHeaderReceived(frame.getChannel(),
(ContentHeaderBody) bodyFrame);
+ break;
- case ContentBody.TYPE:
+ case ContentBody.TYPE:
- _protocolSession.messageContentBodyReceived(frame.getChannel(),
(ContentBody) bodyFrame);
- break;
+
_protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody)
bodyFrame);
+ break;
- case HeartbeatBody.TYPE:
+ case HeartbeatBody.TYPE:
- if (debug)
- {
- _logger.debug("Received heartbeat");
- }
+ if (debug)
+ {
+ _logger.debug("Received heartbeat");
+ }
- break;
+ break;
- default:
+ default:
}
@@ -491,6 +490,8 @@
public void messageSent(IoSession session, Object message) throws Exception
{
+// System.err.println("Sent PS:" +
System.identityHashCode(_protocolSession) + ":" + message);
+
final long sentMessages = _messagesOut++;
final boolean debug = _logger.isDebugEnabled();
@@ -547,7 +548,7 @@
* @param listener the blocking listener. Note the calling thread will
block.
*/
public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
BlockingMethodFrameListener listener)
- throws AMQException, FailoverException
+ throws AMQException, FailoverException
{
return writeCommandFrameAndWaitForReply(frame, listener,
DEFAULT_SYNC_TIMEOUT);
}
@@ -560,7 +561,7 @@
* @param listener the blocking listener. Note the calling thread will
block.
*/
public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
BlockingMethodFrameListener listener,
- long timeout) throws AMQException, FailoverException
+ long timeout)
throws AMQException, FailoverException
{
try
{
@@ -570,8 +571,8 @@
AMQMethodEvent e = listener.blockForFrame(timeout);
return e;
- // When control resumes before this line, a reply will have
been received
- // that matches the criteria defined in the blocking listener
+ // When control resumes before this line, a reply will have been
received
+ // that matches the criteria defined in the blocking listener
}
catch (AMQException e)
{
@@ -595,7 +596,7 @@
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long
timeout) throws AMQException, FailoverException
{
return writeCommandFrameAndWaitForReply(frame, new
SpecificMethodFrameListener(frame.getChannel(), responseClass),
- timeout);
+ timeout);
}
public void closeSession(AMQSession session) throws AMQException
@@ -621,12 +622,12 @@
// TODO: Connect this to the session version obtained from
ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
final AMQFrame frame =
- ConnectionCloseBody.createAMQFrame(0,
_protocolSession.getProtocolMajorVersion(),
- _protocolSession.getProtocolMinorVersion(), // AMQP version
(major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection."));
// replyText
+ ConnectionCloseBody.createAMQFrame(0,
_protocolSession.getProtocolMajorVersion(),
+
_protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+
AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS
client is closing the connection.")); // replyText
try
{
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
(original)
+++
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
Tue Nov 6 03:12:37 2007
@@ -255,9 +255,9 @@
if (_currentState != s)
{
_logger.warn("State not achieved within permitted time.
Current state " + _currentState
- + ", desired state: " + s);
+ + ", desired state: " + s);
throw new AMQException("State not achieved within permitted
time. Current state " + _currentState
- + ", desired state: " + s);
+ + ", desired state: " + s);
}
}
Modified:
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
(original)
+++
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
Tue Nov 6 03:12:37 2007
@@ -22,23 +22,26 @@
package org.apache.qpid.server.txn;
import junit.framework.TestCase;
-import junit.framework.Assert;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQSessionDirtyException;
import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
-import org.apache.log4j.Logger;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
-import javax.jms.Session;
+import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
-import javax.jms.ConnectionFactory;
-import javax.jms.Connection;
-import javax.jms.Message;
+import javax.jms.Session;
import javax.jms.TextMessage;
-import javax.jms.MessageListener;
-import javax.naming.spi.InitialContextFactory;
+import javax.jms.TransactionRolledBackException;
import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
@@ -49,7 +52,8 @@
private static final Logger _logger = Logger.getLogger(TxnTest.class);
- protected final String BROKER = "vm://:1";//"localhost";
+ //Set retries quite high to ensure that it continues to retry whilst the
InVM broker is restarted.
+ protected final String BROKER = "vm://:1?retries='1000'";
protected final String VHOST = "/test";
protected final String QUEUE = "TxnTestQueue";
@@ -75,7 +79,11 @@
Hashtable<String, String> env = new Hashtable<String, String>();
env.put("connectionfactory.connection", "amqp://guest:[EMAIL
PROTECTED]" + VHOST + "?brokerlist='" + BROKER + "'");
- env.put("queue.queue", QUEUE);
+
+ // Ensure that the queue is unique for each test run.
+ // There appear to be other old sessions/consumers when looping the
tests; this means that sometimes a message
+ // will seem to disappear when it has actually gone to the old client.
+ env.put("queue.queue", QUEUE + "-" + System.currentTimeMillis());
_context = factory.getInitialContext(env);
@@ -109,7 +117,7 @@
{
_producerConnection.close();
}
-
+
super.tearDown();
if (BROKER.startsWith("vm://"))
@@ -124,10 +132,8 @@
_consumer.setMessageListener(this);
_clientConnection.start();
- //Set TTL
_producer.send(_producerSession.createTextMessage("TxtTestML"));
-
try
{
//Wait for message to arrive
@@ -150,7 +156,6 @@
public void onMessage(Message message)
{
-
try
{
assertEquals("Incorrect Message Received.", "TxtTestML",
((TextMessage) message).getText());
@@ -170,19 +175,235 @@
{
_clientConnection.start();
- //Set TTL
_producer.send(_producerSession.createTextMessage("TxtTestReceive"));
//Receive Message
Message received = _consumer.receive(1000);
+ _clientSession.commit();
+
assertEquals("Incorrect Message Received.", "TxtTestReceive",
((TextMessage) received).getText());
- //Receive Message
+ //Receive Message
received = _consumer.receive(1000);
assertNull("More messages received", received);
_consumer.close();
}
+
+ /**
+ * Test that after the connection has failed over a sent message is
still correctly received.
+ * Using Auto-Ack consumer.
+ *
+ * @throws JMSException
+ */
+ public void testReceiveAfterFailover() throws JMSException
+ {
+// System.err.println("testReceiveAfterFailover");
+ _clientConnection.close();
+
+ MessageConsumer consumer = _producerSession.createConsumer(_queue);
+
+ failServer();
+
+// System.err.println("Server restarted");
+
+ String MESSAGE_TXT = "TxtTestReceiveAfterFailoverTX";
+
+// System.err.println("Prod Session:" + _producerSession + ":" +
((AMQSession) _producerSession).isClosed());
+
+ Message sent = _producerSession.createTextMessage(MESSAGE_TXT);
+// System.err.println("Created message");
+
+ _producer.send(sent);
+// System.err.println("Sent message");
+
+ //Verify correct message received
+ Message received = consumer.receive(10000);
+// System.err.println("Message Receieved:" + received);
+
+ assertNotNull("Message should be received.", received);
+ assertEquals("Incorrect Message Received.", MESSAGE_TXT,
((TextMessage) received).getText());
+
+ //Check no more messages are received
+ received = consumer.receive(1000);
+ System.err.println("Second receive completed.");
+
+ assertNull("More messages received", received);
+
+ _producer.close();
+// System.err.println("Close producer");
+
+ consumer.close();
+// System.err.println("Close consumer");
+
+ _producerConnection.close();
+ }
+
+ /**
+ * Test that after the connection has failed over the dirty transaction is
reported when commit is called
+ *
+ * @throws JMSException
+ */
+ public void testSendBeforeFailoverThenCommitTx() throws JMSException
+ {
+// System.err.println("testSendBeforeFailoverThenCommitTx");
+ _clientConnection.start();
+
+ //Create a transacted producer.
+ MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+ String MESSAGE_TXT = "testSendBeforeFailoverThenCommitTx";
+
+ //Send the first message
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+ failServer();
+
+ //Check that the message isn't received.
+ Message received = _consumer.receive(1000);
+ assertNull("Message received after failover to clean broker!",
received);
+
+ //Attempt to commit session
+ try
+ {
+ _clientSession.commit();
+ fail("TransactionRolledBackException not thrown");
+ }
+ catch (JMSException jmse)
+ {
+ if (!(jmse instanceof TransactionRolledBackException))
+ {
+ fail(jmse.toString());
+ }
+ }
+
+ //Close consumer & producer
+ _consumer.close();
+ txProducer.close();
+ }
+
+ /**
+ * Test that after the connection has failed over the dirty transaction is
fast failed by throwing an
+ * Exception on the next send.
+ *
+ * @throws JMSException
+ */
+ public void testSendBeforeFailoverThenSendTx() throws JMSException
+ {
+// System.err.println("testSendBeforeFailoverThenSendTx");
+
+ _clientConnection.start();
+ MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+ String MESSAGE_TXT = "TxtTestSendBeforeFailoverTx";
+
+ //Send the first message
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+ failServer();
+
+ //Check that the message isn't received.
+ Message received = _consumer.receive(1000);
+ assertNull("Message received after failover to clean broker!",
received);
+
+ //Attempt to send another message on the session, here we should fast
fail.
+ try
+ {
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+ fail("JMSException not thrown");
+ }
+ catch (JMSException jmse)
+ {
+ if (!(jmse.getLinkedException() instanceof
AMQSessionDirtyException))
+ {
+ fail(jmse.toString());
+ }
+ }
+
+
+ _consumer.close();
+ }
+
+ public void testSendBeforeFailoverThenSend2Tx() throws JMSException
+ {
+// System.err.println("testSendBeforeFailoverThenSendTx");
+
+ _clientConnection.start();
+ MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+ String MESSAGE_TXT = "TxtTestSendBeforeFailoverTx";
+
+ //Send the first message
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+ failServer();
+
+ //Check that the message isn't received.
+ Message received = _consumer.receive(1000);
+ assertNull("Message received after failover to clean broker!",
received);
+
+ _clientSession.rollback();
+
+ //Attempt to send more messages on the session; after the rollback the
sends should not fail with AMQSessionDirtyException.
+ try
+ {
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+ txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+ }
+ catch (JMSException jmse)
+ {
+ if (jmse.getLinkedException() instanceof AMQSessionDirtyException)
+ {
+ fail(jmse.toString());
+ }
+ }
+
+
+ _consumer.close();
+ }
+
+
+ private void failServer()
+ {
+ if (BROKER.startsWith("vm://"))
+ {
+ //Work around for MessageStore not being initialised and the send
not fully completing before the failover occurs.
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+
+ TransportConnection.killAllVMBrokers();
+ ApplicationRegistry.remove(1);
+ try
+ {
+ TransportConnection.createVMBroker(1);
+ }
+ catch (AMQVMBrokerCreationException e)
+ {
+ _logger.error("Unable to restart broker due to :" + e);
+ }
+
+ //Work around for receive not being failover aware. Because it is
the first receive it tries to
+ // unsuspend the channel, but in this case the ChannelFlow command
goes on the old session and the response on the
+ // new one ... though I thought the statemanager recorded the
listeners so should be ok.???
+ try
+ {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e)
+ {
+
+ }
+
+ }
+
+ }
+
}