Author: arnaudsimon
Date: Mon Aug 6 10:15:56 2007
New Revision: 563197
URL: http://svn.apache.org/viewvc?view=rev&rev=563197
Log:
Added dtx classes
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
(with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
(with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
(with props)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/DtxSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/DtxSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/DtxSession.java?view=diff&rev=563197&r1=563196&r2=563197
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/DtxSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/DtxSession.java
Mon Aug 6 10:15:56 2007
@@ -18,7 +18,7 @@
*/
package org.apache.qpidity;
-import org.apache.qpidity.QpidException;
+import javax.transaction.xa.Xid;
/**
* This session's resources are control under the scope of a distributed
transaction.
@@ -27,11 +27,112 @@
{
/**
- * Get the XA resource associated with this session.
+ * This method is called when messages should be produced and consumed on
behalf of a transaction
+ * branch identified by xid.
+ * possible options are:
+ * <ul>
+ * <li> {@link Option#JOIN}: Indicate that the start applies
to joining a transaction previously seen.
+ * <li> {@link Option#RESUME}: Indicate that the start applies
to resuming a suspended transaction branch specified.
+ * </ul>
*
- * @return this session XA resource.
- * @throws QpidException If the session fails to retrieve its associated
XA resource
- * due to some error.
+ * @param xid Specifies the xid of the transaction branch to be
started.
+ * @param options Possible options are: {@link Option#JOIN} and
{@link Option#RESUME}.
+ * @throws QpidException If the session fails to start due to some error
*/
- public javax.transaction.xa.XAResource getDTXResource() throws
QpidException;
+ public void dtxDemarcationStart(Xid xid, Option... options) throws
QpidException;
+
+ /**
+ * This method is called when the work done on behalf of a transaction branch
finishes or needs to
+ * be suspended.
+ * possible options are:
+ * <ul>
+ * <li> {@link Option#FAIL}: indicates that this portion of
work has failed;
+ * otherwise this portion of work has
+ * completed successfully.
+ * <li> {@link Option#SUSPEND}: Indicates that the transaction
branch is
+ * temporarily suspended in an incomplete state.
+ * </ul>
+ *
+ * @param xid Specifies the xid of the transaction branch to be ended.
+ * @param options Available options are: {@link Option#FAIL}
and {@link Option#SUSPEND}.
+ * @throws QpidException If the session fails to end due to some error
+ */
+ public void dtxDemarcationEnd(Xid xid, Option... options) throws
QpidException;
+
+ /**
+ * Commit the work done on behalf of a transaction branch. This method
commits the work associated
+ * with xid. Any produced messages are made available and any consumed
messages are discarded.
+ * possible option is:
+ * <ul>
+ * <li> {@link Option#ONE_PHASE}: When set then one-phase
commit optimization is used.
+ * </ul>
+ *
+ * @param xid Specifies the xid of the transaction branch to be
committed.
+ * @param options Available option is: {@link Option#ONE_PHASE}
+ * @throws QpidException If the session fails to commit due to some error
+ */
+ public void dtxCoordinationCommit(Xid xid, Option... options) throws
QpidException;
+
+ /**
+ * This method is called to forget about a heuristically completed
transaction branch.
+ *
+ * @param xid Specifies the xid of the transaction branch to be forgotten.
+ * @throws QpidException If the session fails to forget due to some error
+ */
+ public void dtxCoordinationForget(Xid xid) throws QpidException;
+
+ /**
+ * This method obtains the current transaction timeout value in seconds.
If set-timeout was not
+ * used prior to invoking this method, the return value is the default
timeout; otherwise, the
+ * value used in the previous set-timeout call is returned.
+ *
+ * @param xid Specifies the xid of the transaction branch for getting the
timeout.
+ * @return The current transaction timeout value in seconds.
+ * @throws QpidException If the session fails to get the timeout due to
some error
+ */
+ public long dtxCoordinationGetTimeout(Xid xid) throws QpidException;
+
+ /**
+ * This method prepares for commitment any message produced or consumed on
behalf of xid.
+ *
+ * @param xid Specifies the xid of the transaction branch that can be
prepared.
+ * @return The status of the prepare operation: can be one of those:
+ * xa-ok: Normal execution.
+ * <p/>
+ * xa-rdonly: The transaction branch was read-only and has been
committed.
+ * <p/>
+ * xa-rbrollback: The broker marked the transaction branch
rollback-only for an unspecified
+ * reason.
+ * <p/>
+ * xa-rbtimeout: The work represented by this transaction branch
took too long.
+ * @throws QpidException If the session fails to prepare due to some error
+ */
+ public short dtxCoordinationPrepare(Xid xid) throws QpidException;
+
+ /**
+ * This method is called to obtain a list of transaction branches that are
in a prepared or
+ * heuristically completed state.
+ *
+ * @return an array of xids to be recovered.
+ * @throws QpidException If the session fails to recover due to some error
+ */
+ public Xid[] dtxCoordinationRecover() throws QpidException;
+
+ /**
+ * This method rolls back the work associated with xid. Any produced
messages are discarded and
+ * any consumed messages are re-enqueued.
+ *
+ * @param xid Specifies the xid of the transaction branch that can be
rolled back.
+ * @throws QpidException If the session fails to rollback due to some error
+ */
+ public void dtxCoordinationRollback(Xid xid) throws QpidException;
+
+ /**
+ * Sets the specified transaction branch timeout value in seconds.
+ *
+ * @param xid Specifies the xid of the transaction branch for setting
the timeout.
+ * @param timeout The transaction timeout value in seconds.
+ * @throws QpidException If the session fails to set the timeout due to
some error
+ */
+ public void dtxCoordinationSetTimeout(Xid xid, long timeout) throws
QpidException;
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java?view=diff&rev=563197&r1=563196&r2=563197
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
Mon Aug 6 10:15:56 2007
@@ -42,7 +42,7 @@
/**
* Maps from session id (Integer) to SessionImpl instance
*/
- private final Vector<SessionImpl> _sessions = new Vector<SessionImpl>();
+ protected final Vector<SessionImpl> _sessions = new Vector<SessionImpl>();
/**
* This is the clientID
@@ -113,10 +113,18 @@
* @return A newly created session
* @throws JMSException If the Connection object fails to create a session
due to some internal error.
*/
- public Session createSession(boolean transacted, int acknowledgeMode)
throws JMSException
+ public synchronized Session createSession(boolean transacted, int
acknowledgeMode) throws JMSException
{
checkNotClosed();
- SessionImpl session = new SessionImpl(this, transacted,
acknowledgeMode);
+ SessionImpl session = null;
+ try
+ {
+ session = new SessionImpl(this, transacted, acknowledgeMode,
false);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
// add this session with the list of session that are handled by this
connection
_sessions.add(session);
return session;
@@ -178,7 +186,7 @@
* @return the <CODE>ExceptionListener</CODE> for this connection
* @throws JMSException In case of unforeseen problem
*/
- public ExceptionListener getExceptionListener() throws JMSException
+ public synchronized ExceptionListener getExceptionListener() throws
JMSException
{
checkNotClosed();
return _exceptionListener;
@@ -203,7 +211,7 @@
* @param exceptionListener The connection listener.
* @throws JMSException If the connection is closed.
*/
- public void setExceptionListener(ExceptionListener exceptionListener)
throws JMSException
+ public synchronized void setExceptionListener(ExceptionListener
exceptionListener) throws JMSException
{
checkNotClosed();
_exceptionListener = exceptionListener;
@@ -217,7 +225,7 @@
*
* @throws JMSException In case of a problem due to some internal error.
*/
- public void start() throws JMSException
+ public synchronized void start() throws JMSException
{
checkNotClosed();
if (!_started)
@@ -231,7 +239,7 @@
}
catch (Exception e)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ throw
ExceptionHelper.convertQpidExceptionToJMSException(e);
}
}
_started = true;
@@ -248,7 +256,7 @@
*
* @throws JMSException In case of a problem due to some internal error.
*/
- public void stop() throws JMSException
+ public synchronized void stop() throws JMSException
{
checkNotClosed();
if (_started)
@@ -262,7 +270,7 @@
}
catch (Exception e)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ throw
ExceptionHelper.convertQpidExceptionToJMSException(e);
}
}
_started = false;
@@ -284,7 +292,7 @@
*
* @throws JMSException In case of a problem due to some internal error.
*/
- public void close() throws JMSException
+ public synchronized void close() throws JMSException
{
checkNotClosed();
if (!_isClosed)
@@ -320,8 +328,8 @@
* @throws JMSException In case of a problem due to some internal error.
*/
public ConnectionConsumer createConnectionConsumer(Destination
destination, String messageSelector,
- ServerSessionPool
sessionPool, int maxMessages) throws
-
JMSException
+ ServerSessionPool
sessionPool, int maxMessages)
+ throws JMSException
{
checkNotClosed();
return null;
@@ -359,7 +367,7 @@
* @return A queueSession object/
* @throws JMSException If creating a QueueSession fails due to some
internal error.
*/
- public QueueSession createQueueSession(boolean transacted, int
acknowledgeMode) throws JMSException
+ public synchronized QueueSession createQueueSession(boolean transacted,
int acknowledgeMode) throws JMSException
{
checkNotClosed();
QueueSessionImpl queueSession = new QueueSessionImpl(this, transacted,
acknowledgeMode);
@@ -380,8 +388,8 @@
* @throws JMSException In case of a problem due to some internal error.
*/
public ConnectionConsumer createConnectionConsumer(Queue queue, String
messageSelector,
- ServerSessionPool
sessionPool, int maxMessages) throws
-
JMSException
+ ServerSessionPool
sessionPool, int maxMessages)
+ throws JMSException
{
return createConnectionConsumer((Destination) queue, messageSelector,
sessionPool, maxMessages);
}
@@ -396,7 +404,7 @@
* @return a newly created topic session
* @throws JMSException If creating the session fails due to some internal
error.
*/
- public TopicSession createTopicSession(boolean transacted, int
acknowledgeMode) throws JMSException
+ public synchronized TopicSession createTopicSession(boolean transacted,
int acknowledgeMode) throws JMSException
{
checkNotClosed();
TopicSessionImpl session = new TopicSessionImpl(this, transacted,
acknowledgeMode);
@@ -418,8 +426,8 @@
* @throws JMSException In case of a problem due to some internal error.
*/
public ConnectionConsumer createConnectionConsumer(Topic topic, String
messageSelector,
- ServerSessionPool
sessionPool, int maxMessages) throws
-
JMSException
+ ServerSessionPool
sessionPool, int maxMessages)
+ throws JMSException
{
return createConnectionConsumer((Destination) topic, messageSelector,
sessionPool, maxMessages);
}
@@ -442,7 +450,8 @@
{
_logger.debug("Connection has been closed. Cannot invoke any
further operations.");
}
- throw new javax.jms.IllegalStateException("Connection has been
closed. Cannot invoke any further operations.");
+ throw new javax.jms.IllegalStateException(
+ "Connection has been closed. Cannot invoke any further
operations.");
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java?view=diff&rev=563197&r1=563196&r2=563197
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ExceptionHelper.java
Mon Aug 6 10:15:56 2007
@@ -20,6 +20,7 @@
import org.apache.qpidity.QpidException;
import javax.jms.JMSException;
+import javax.transaction.xa.XAException;
/**
* Helper class for handling exceptions
@@ -46,5 +47,13 @@
jmsException = (JMSException) exception;
}
return jmsException;
+ }
+
+    /**
+     * Converts a QpidException into an XAException.
+     * <p/>
+     * The Qpid error code is not mapped yet, so every failure is reported as
+     * XAER_PROTO; the original QpidException is attached as the cause so that the
+     * underlying failure is not lost.
+     *
+     * @param exception The QpidException to convert.
+     * @return An XAException with error code XAER_PROTO whose cause is <code>exception</code>.
+     */
+    static public XAException convertQpidExceptionToXAException(QpidException exception)
+    {
+        // todo map exception.getErrorCode() to an XA code
+        XAException xaException = new XAException(XAException.XAER_PROTO);
+        // XAException has no constructor taking a cause, so chain it explicitly
+        xaException.initCause(exception);
+        return xaException;
     }
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java?view=diff&rev=563197&r1=563196&r2=563197
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageProducerImpl.java
Mon Aug 6 10:15:56 2007
@@ -305,10 +305,10 @@
throw new IllegalArgumentException("Time to live must be
non-negative - supplied value was " + timeToLive);
}
// check that the message is not a foreign one
-
+ // todo
// set the properties
- //
+ // todo
// dispatch it
// todo
getSession().getQpidSession().messageTransfer(((DestinationImpl)
destination).getExchangeName(), message, Option);
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java?view=diff&rev=563197&r1=563196&r2=563197
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/SessionImpl.java
Mon Aug 6 10:15:56 2007
@@ -128,10 +128,12 @@
* @param transacted Indicates if the session transacted.
* @param acknowledgeMode The session's acknowledgement mode. This value
is ignored and set to
* {@link Session#SESSION_TRANSACTED} if
the <code>transacted</code> parameter is true.
+ * @param isXA Indicates whether this session is an XA session.
* @throws JMSSecurityException If the user could not be authenticated.
- * @throws JMSException In case of internal error.
+ * @throws QpidException In case of internal error.
*/
- protected SessionImpl(ConnectionImpl connection, boolean transacted, int
acknowledgeMode) throws JMSException
+ protected SessionImpl(ConnectionImpl connection, boolean transacted, int
acknowledgeMode, boolean isXA)
+ throws QpidException
{
_connection = connection;
_transacted = transacted;
@@ -141,19 +143,12 @@
acknowledgeMode = Session.SESSION_TRANSACTED;
}
_acknowledgeMode = acknowledgeMode;
- try
- {
- // create the qpid session with an expiry <= 0 so that the
session does not expire
- _qpidSession = _connection.getQpidConnection().createSession(0);
- // set transacted if required
- if (_transacted)
- {
- //_qpidSession.setTransacted();
- }
- }
- catch (QpidException e)
+ // create the qpid session with an expiry <= 0 so that the session
does not expire
+ _qpidSession = _connection.getQpidConnection().createSession(0);
+ // set transacted if required
+ if (_transacted && !isXA)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ _qpidSession.txSelect();
}
// init the message dispatcher.
initMessageDispatcherThread();
@@ -314,7 +309,6 @@
// commit the underlying Qpid Session
try
{
- // Note: this operation makes sure that asynch message processing
has returned
_qpidSession.txCommit();
}
catch (QpidException e)
@@ -341,7 +335,6 @@
// rollback the underlying Qpid Session
try
{
- // Note: this operation makes sure that asynch message processing
has returned
_qpidSession.txRollback();
}
catch (org.apache.qpidity.QpidException e)
@@ -640,7 +633,7 @@
}
catch (QpidException e)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
return result;
}
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java?view=auto&rev=563197
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
Mon Aug 6 10:15:56 2007
@@ -0,0 +1,54 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.XAConnection;
+import javax.jms.JMSException;
+import javax.jms.XASession;
+
+/**
+ * Implementation of the javax.jms.XAConnection interface, built on ConnectionImpl.
+ */
+public class XAConnectionImpl extends ConnectionImpl implements XAConnection
+{
+    /**
+     * Creates an XASession and registers it with this connection.
+     *
+     * @return A newly created XASession.
+     * @throws JMSException If the connection is closed or the XAConnection fails to
+     *                      create an XASession due to some internal error.
+     */
+    public synchronized XASession createXASession() throws JMSException
+    {
+        checkNotClosed();
+        try
+        {
+            final XASessionImpl created = new XASessionImpl(this);
+            // record the new session among the sessions handled by this connection
+            _sessions.add(created);
+            return created;
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+        }
+    }
+}
Propchange:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java?view=auto&rev=563197
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
Mon Aug 6 10:15:56 2007
@@ -0,0 +1,329 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.QpidException;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import javax.transaction.xa.XAException;
+
+/**
+ * This is an implementation of javax.transaction.xa.XAResource.
+ */
+public class XAResourceImpl implements XAResource
+{
+ /**
+ * this XAResourceImpl's logger
+ */
+ private static final Logger _logger =
LoggerFactory.getLogger(XAResourceImpl.class);
+
+ /**
+ * Reference to the associated XASession
+ */
+ private XASessionImpl _xaSession = null;
+
+ /**
+ * The XID of this resource
+ */
+ private Xid _xid;
+
+ //--- constructor
+
+    /**
+     * Builds the XAResource bound to the given XASession.
+     *
+     * @param xaSession The XASession whose Qpid DTX session this resource drives.
+     */
+    protected XAResourceImpl(XASessionImpl xaSession)
+    {
+        this._xaSession = xaSession;
+    }
+
+ //--- The XAResource
+    /**
+     * Commits the global transaction specified by xid.
+     *
+     * @param xid A global transaction identifier
+     * @param b   If true, use a one-phase commit protocol to commit the work done on behalf of xid.
+     * @throws XAException If the Qpid DTX session reports a failure; the QpidException is
+     *                     converted through ExceptionHelper.
+     */
+    public void commit(Xid xid, boolean b) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // the "{}" placeholder is required for SLF4J to render the xid argument
+            _logger.debug("commit {}", xid);
+        }
+        try
+        {
+            _xaSession.getQpidSession().dtxCoordinationCommit(xid, b ? Option.ONE_PHASE : Option.NO_OPTION);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(e);
+        }
+    }
+
+    /**
+     * Ends the work performed on behalf of a transaction branch.
+     * The resource manager disassociates the XA resource from the transaction branch specified
+     * and lets the transaction complete.
+     * <ul>
+     * <li> If TMSUSPEND is specified in the flags, the transaction branch is temporarily suspended
+     * in an incomplete state. The transaction context is in a suspended state and must be resumed
+     * via the start method with TMRESUME specified.
+     * <li> If TMFAIL is specified, the portion of work has failed. The resource manager may mark
+     * the transaction as rollback-only
+     * <li> If TMSUCCESS is specified, the portion of work has completed successfully.
+     * </ul>
+     * The xid remembered by this resource is cleared before the broker is contacted.
+     *
+     * @param xid  A global transaction identifier that is the same as the identifier used
+     *             previously in the start method
+     * @param flag One of TMSUCCESS, TMFAIL, or TMSUSPEND.
+     * @throws XAException If the Qpid DTX session reports a failure; the QpidException is
+     *                     converted through ExceptionHelper.
+     */
+    public void end(Xid xid, int flag) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // the "{}" placeholder is required for SLF4J to render the xid argument
+            _logger.debug("end {}", xid);
+        }
+        try
+        {
+            _xid = null;
+            _xaSession.getQpidSession()
+                    .dtxDemarcationEnd(xid, flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
+                                       flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(e);
+        }
+    }
+
+    /**
+     * Tells the resource manager to forget about a heuristically completed transaction branch.
+     *
+     * @param xid A global transaction identifier
+     * @throws XAException If the Qpid DTX session reports a failure; the QpidException is
+     *                     converted through ExceptionHelper.
+     */
+    public void forget(Xid xid) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // the "{}" placeholder is required for SLF4J to render the xid argument
+            _logger.debug("forget {}", xid);
+        }
+        try
+        {
+            _xaSession.getQpidSession().dtxCoordinationForget(xid);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(e);
+        }
+    }
+
+    /**
+     * Obtains the current transaction timeout value for the branch associated with this
+     * XAResource instance.
+     * <p/>
+     * When no branch is associated with this resource (start has not been called, or end
+     * has cleared the association) the broker is not contacted and 0 is returned.
+     * Otherwise the timeout of the current branch is read from the Qpid DTX session.
+     *
+     * @return The transaction timeout value in seconds, limited to the <code>int</code> range.
+     * @throws XAException If the Qpid DTX session reports a failure; the QpidException is
+     *                     converted through ExceptionHelper.
+     */
+    public int getTransactionTimeout() throws XAException
+    {
+        int result = 0;
+        if (_xid != null)
+        {
+            try
+            {
+                long timeout = _xaSession.getQpidSession().dtxCoordinationGetTimeout(_xid);
+                // saturate rather than narrow blindly: a plain (int) cast would wrap
+                // values outside the int range into unrelated numbers
+                result = (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, timeout));
+            }
+            catch (QpidException e)
+            {
+                throw ExceptionHelper.convertQpidExceptionToXAException(e);
+            }
+        }
+        return result;
+    }
+
+ /**
+ * Determines whether the resource manager instance represented by this object is the
+ * same as the resource manager instance represented by the parameter xaResource.
+ * <p/>
+ * NOTE: the comparison is not implemented yet; this method currently returns
+ * <code>false</code> for every argument.
+ *
+ * @param xaResource An XAResource object whose resource manager instance is to
+ * be compared with the resource manager instance of this object.
+ * @return currently always <code>false</code>.
+ * @throws XAException Declared in the signature; this implementation never throws it.
+ */
+ public boolean isSameRM(XAResource xaResource) throws XAException
+ {
+ // TODO : get the server identity of xaResource and compare it with our own one
+ return false;
+ }
+
+    /**
+     * Prepare for a transaction commit of the transaction specified in <code>Xid</code>.
+     *
+     * @param xid A global transaction identifier.
+     * @return A value indicating the resource manager's vote on the outcome of the transaction.
+     *         The possible values are: XA_RDONLY or XA_OK.
+     * @throws XAException If the broker answered with one of the XA_RB* rollback codes (the
+     *                     code is carried by the exception), or if the Qpid DTX session reports
+     *                     a failure (the QpidException is converted through ExceptionHelper).
+     */
+    public int prepare(Xid xid) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // the "{}" placeholder is required for SLF4J to render the xid argument
+            _logger.debug("prepare {}", xid);
+        }
+        int result;
+        try
+        {
+            result = _xaSession.getQpidSession().dtxCoordinationPrepare(xid);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(e);
+        }
+        // Any XA_RB* status (rollback, timeout, ...) means the branch cannot be committed
+        // and must be reported as an exception, never returned as a vote.
+        if (result >= XAException.XA_RBBASE && result <= XAException.XA_RBEND)
+        {
+            throw new XAException(result);
+        }
+        // XA_RDONLY is a legitimate vote and is returned to the transaction manager,
+        // exactly like XA_OK; it must not be raised as an error.
+        return result;
+    }
+
+    /**
+     * Obtains a list of prepared transaction branches.
+     * <p/>
+     * The transaction manager calls this method during recovery to obtain the list of
+     * transaction branches that are currently in prepared or heuristically completed states.
+     *
+     * @param flag One of TMSTARTRSCAN, TMENDRSCAN, TMNOFLAGS. This implementation does not
+     *             use the value.
+     * @return the xids handed back by the Qpid DTX session for recovery.
+     * @throws XAException If the Qpid DTX session reports a failure; the QpidException is
+     *                     converted through ExceptionHelper.
+     */
+    public Xid[] recover(int flag) throws XAException
+    {
+        Xid[] recoverable;
+        try
+        {
+            // the scan flag is not forwarded to the broker
+            recoverable = _xaSession.getQpidSession().dtxCoordinationRecover();
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(e);
+        }
+        return recoverable;
+    }
+
+    /**
+     * Informs the resource manager to roll back work done on behalf of a transaction branch
+     *
+     * @param xid A global transaction identifier.
+     * @throws XAException If the Qpid DTX session reports a failure; the QpidException is
+     *                     converted through ExceptionHelper.
+     */
+    public void rollback(Xid xid) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // traced like the other branch operations of this resource
+            _logger.debug("rollback {}", xid);
+        }
+        try
+        {
+            _xaSession.getQpidSession().dtxCoordinationRollback(xid);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(e);
+        }
+    }
+
+    /**
+     * Sets the transaction timeout value of the branch currently associated with this
+     * XAResource instance.
+     * <p/>
+     * When no branch is associated with this resource (start has not been called, or end
+     * has cleared the association) nothing is sent to the broker and <code>false</code>
+     * is returned.
+     *
+     * @param timeout The transaction timeout value in seconds.
+     * @return true if the timeout was handed to the Qpid DTX session; otherwise false.
+     * @throws XAException If the Qpid DTX session reports a failure; the QpidException is
+     *                     converted through ExceptionHelper.
+     */
+    public boolean setTransactionTimeout(int timeout) throws XAException
+    {
+        if (_xid == null)
+        {
+            // no branch to apply the timeout to
+            return false;
+        }
+        try
+        {
+            _xaSession.getQpidSession().dtxCoordinationSetTimeout(_xid, timeout);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(e);
+        }
+        return true;
+    }
+
+    /**
+     * Starts work on behalf of a transaction branch specified in xid.
+     * <ul>
+     * <li> If TMJOIN is specified, Option.JOIN is forwarded to the Qpid DTX session.
+     * <li> If TMRESUME is specified, Option.RESUME is forwarded to the Qpid DTX session, i.e. the
+     * start applies to resuming a suspended transaction specified in the parameter xid.
+     * <li> For any other flag value no option is forwarded.
+     * </ul>
+     * The xid is remembered by this resource (it is used by the timeout accessors) only once
+     * the Qpid DTX session has accepted the start.
+     *
+     * @param xid  A global transaction identifier to be associated with the resource
+     * @param flag One of TMNOFLAGS, TMJOIN, or TMRESUME
+     * @throws XAException If the Qpid DTX session reports a failure; the QpidException is
+     *                     converted through ExceptionHelper.
+     */
+    public void start(Xid xid, int flag) throws XAException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            // the "{}" placeholder is required for SLF4J to render the xid argument
+            _logger.debug("start {}", xid);
+        }
+        try
+        {
+            _xaSession.getQpidSession()
+                    .dtxDemarcationStart(xid, flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
+                                         flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+        }
+        catch (QpidException e)
+        {
+            throw ExceptionHelper.convertQpidExceptionToXAException(e);
+        }
+        // record the association only after a successful start, so that a failed start
+        // does not leave this resource pointing at a branch that was never started
+        _xid = xid;
+    }
+}
Propchange:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java?view=auto&rev=563197
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
Mon Aug 6 10:15:56 2007
@@ -0,0 +1,118 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.jms;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.DtxSession;
+
+import javax.jms.XASession;
+import javax.jms.Session;
+import javax.jms.JMSException;
+import javax.jms.TransactionInProgressException;
+import javax.transaction.xa.XAResource;
+
+/**
+ * This is an implementation of the javax.jms.XASession interface.
+ */
+public class XASessionImpl extends SessionImpl implements XASession
+{
+    /**
+     * XAResource associated with this XASession
+     */
+    private final XAResourceImpl _xaResource;
+
+    /**
+     * The Qpid DtxSession of this XASession
+     */
+    private final DtxSession _qpidDtxSession;
+
+    //-- Constructors
+    /**
+     * Create a JMS XASession
+     *
+     * @param connection The ConnectionImpl object from which the Session is created.
+     * @throws QpidException In case of internal error.
+     */
+    protected XASessionImpl(ConnectionImpl connection) throws QpidException
+    {
+        super(connection, true,  // this is a transacted session
+              Session.SESSION_TRANSACTED, // the ack mode is transacted
+              true); // this is an XA session so do not set tx
+        _qpidDtxSession = getConnection().getQpidConnection().createDTXSession(0);
+        _xaResource = new XAResourceImpl(this);
+    }
+
+    //--- javax.jms.XASession API
+
+    /**
+     * Gets the session associated with this XASession.
+     *
+     * @return the session object, which is this XASession itself.
+     * @throws JMSException if an internal error occurs.
+     * @since 1.1
+     */
+    public Session getSession() throws JMSException
+    {
+        return this;
+    }
+
+    /**
+     * Returns an XA resource.
+     *
+     * @return The XA resource created together with this session.
+     */
+    public XAResource getXAResource()
+    {
+        return _xaResource;
+    }
+
+    //-- overridden methods
+    /**
+     * Throws a {@link TransactionInProgressException}, since it should
+     * not be called for an XASession object.
+     *
+     * @throws TransactionInProgressException always.
+     */
+    public void commit() throws JMSException
+    {
+        throw new TransactionInProgressException(
+                "XASession: A direct invocation of the commit operation is prohibited!");
+    }
+
+    /**
+     * Throws a {@link TransactionInProgressException}, since it should
+     * not be called for an XASession object.
+     *
+     * @throws TransactionInProgressException always.
+     */
+    public void rollback() throws JMSException
+    {
+        throw new TransactionInProgressException(
+                "XASession: A direct invocation of the rollback operation is prohibited!");
+    }
+
+    /**
+     * Access to the underlying Qpid DtxSession
+     *
+     * @return The associated Qpid DtxSession.
+     */
+    protected org.apache.qpidity.DtxSession getQpidSession()
+    {
+        return _qpidDtxSession;
+    }
+}
Propchange:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XASessionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native