Author: rajith
Date: Mon Aug 20 21:31:50 2007
New Revision: 567946
URL: http://svn.apache.org/viewvc?rev=567946&view=rev
Log:
changed DtxSession to return Futures, moved MessageListener to util and added
URL support to the client
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessageListener.java
Removed:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessageListener.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java?rev=567946&r1=567945&r2=567946&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
Mon Aug 20 21:31:50 2007
@@ -1,11 +1,11 @@
package org.apache.qpidity.client;
-import java.net.URL;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.qpidity.BrokerDetails;
import org.apache.qpidity.Channel;
import org.apache.qpidity.Connection;
import org.apache.qpidity.ConnectionClose;
@@ -14,9 +14,9 @@
import org.apache.qpidity.MinaHandler;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.SessionDelegate;
-import org.apache.qpidity.url.QpidURL;
import org.apache.qpidity.client.impl.ClientSession;
import org.apache.qpidity.client.impl.ClientSessionDelegate;
+import org.apache.qpidity.url.QpidURL;
public class Client implements org.apache.qpidity.client.Connection
@@ -86,7 +86,13 @@
*/
public void connect(QpidURL url) throws QpidException
{
- throw new UnsupportedOperationException();
+ // Temporary implementation for tests: only the first broker listed in the URL is used.
+ BrokerDetails details = url.getAllBrokerDetails().get(0);
+ connect(details.getHost(),
+ details.getPort(),
+ details.getVirtualHost(),
+ details.getUserName(),
+ details.getPassword());
}
public void close() throws QpidException
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java?rev=567946&r1=567945&r2=567946&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DtxSession.java
Mon Aug 20 21:31:50 2007
@@ -18,8 +18,14 @@
*/
package org.apache.qpidity.client;
-import javax.transaction.xa.Xid;
-
+import org.apache.qpidity.DtxCoordinationCommitResult;
+import org.apache.qpidity.DtxCoordinationGetTimeoutResult;
+import org.apache.qpidity.DtxCoordinationPrepareResult;
+import org.apache.qpidity.DtxCoordinationRecoverResult;
+import org.apache.qpidity.DtxCoordinationRollbackResult;
+import org.apache.qpidity.DtxDemarcationEndResult;
+import org.apache.qpidity.DtxDemarcationStartResult;
+import org.apache.qpidity.Future;
import org.apache.qpidity.Option;
/**
@@ -40,7 +46,7 @@
* @param xid Specifies the xid of the transaction branch to be
started.
* @param options Possible options are: {@link Option#JOIN} and
{@link Option#RESUME}.
*/
- public void dtxDemarcationStart(Xid xid, Option... options);
+ public Future<DtxDemarcationStartResult> dtxDemarcationStart(String xid,
Option... options);
/**
* This method is called when the work done on behalf of a transaction branch
finishes or needs to
@@ -57,7 +63,7 @@
* @param xid Specifies the xid of the transaction branch to be ended.
* @param options Available options are: {@link Option#FAIL}
and {@link Option#SUSPEND}.
*/
- public void dtxDemarcationEnd(Xid xid, Option... options);
+ public Future<DtxDemarcationEndResult> dtxDemarcationEnd(String xid,
Option... options);
/**
* Commit the work done on behalf of a transaction branch. This method
commits the work associated
@@ -70,14 +76,14 @@
* @param xid Specifies the xid of the transaction branch to be
committed.
* @param options Available option is: {@link Option#ONE_PHASE}
*/
- public void dtxCoordinationCommit(Xid xid, Option... options);
+ public Future<DtxCoordinationCommitResult> dtxCoordinationCommit(String
xid, Option... options);
/**
* This method is called to forget about a heuristically completed
transaction branch.
*
* @param xid Specifies the xid of the transaction branch to be forgotten.
*/
- public void dtxCoordinationForget(Xid xid);
+ public void dtxCoordinationForget(String xid);
/**
* This method obtains the current transaction timeout value in seconds.
If set-timeout was not
@@ -87,7 +93,7 @@
* @param xid Specifies the xid of the transaction branch for getting the
timeout.
* @return The current transaction timeout value in seconds.
*/
- public long dtxCoordinationGetTimeout(Xid xid);
+ public Future<DtxCoordinationGetTimeoutResult>
dtxCoordinationGetTimeout(String xid);
/**
* This method prepares for commitment any message produced or consumed on
behalf of xid.
@@ -103,7 +109,7 @@
* <p/>
* xa-rbtimeout: The work represented by this transaction branch
took too long.
*/
- public short dtxCoordinationPrepare(Xid xid);
+ public Future<DtxCoordinationPrepareResult> dtxCoordinationPrepare(String
xid);
/**
* This method is called to obtain a list of transaction branches that are
in a prepared or
@@ -111,7 +117,7 @@
*
* @return an array of xids to be recovered.
*/
- public Xid[] dtxCoordinationRecover();
+ public Future<DtxCoordinationRecoverResult> dtxCoordinationRecover();
/**
* This method rolls back the work associated with xid. Any produced
messages are discarded and
@@ -119,7 +125,7 @@
*
* @param xid Specifies the xid of the transaction branch that can be
rolled back.
*/
- public void dtxCoordinationRollback(Xid xid);
+ public Future<DtxCoordinationRollbackResult>
dtxCoordinationRollback(String xid);
/**
* Sets the specified transaction branch timeout value in seconds.
@@ -127,5 +133,5 @@
* @param xid Specifies the xid of the transaction branch for setting
the timeout.
* @param timeout The transaction timeout value in seconds.
*/
- public void dtxCoordinationSetTimeout(Xid xid, long timeout);
+ public void dtxCoordinationSetTimeout(String xid, long timeout);
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java?rev=567946&r1=567945&r2=567946&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
Mon Aug 20 21:31:50 2007
@@ -3,8 +3,6 @@
import java.io.EOFException;
import java.io.IOException;
import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
import org.apache.qpidity.Option;
@@ -14,7 +12,6 @@
import org.apache.qpidity.api.Message;
import org.apache.qpidity.client.ExceptionListener;
import org.apache.qpidity.client.MessagePartListener;
-import org.apache.qpidity.client.Session;
/**
* Implements a Qpid Session.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java?rev=567946&r1=567945&r2=567946&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
Mon Aug 20 21:31:50 2007
@@ -7,8 +7,8 @@
import org.apache.qpidity.client.Client;
import org.apache.qpidity.client.Connection;
import org.apache.qpidity.client.ExceptionListener;
-import org.apache.qpidity.client.MessageListener;
import org.apache.qpidity.client.Session;
+import org.apache.qpidity.client.util.MessageListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
public class DemoClient
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java?rev=567946&r1=567945&r2=567946&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
Mon Aug 20 21:31:50 2007
@@ -9,9 +9,9 @@
import org.apache.qpidity.client.Client;
import org.apache.qpidity.client.Connection;
import org.apache.qpidity.client.ExceptionListener;
-import org.apache.qpidity.client.MessageListener;
import org.apache.qpidity.client.Session;
import org.apache.qpidity.client.util.FileMessage;
+import org.apache.qpidity.client.util.MessageListener;
import org.apache.qpidity.client.util.MessagePartListenerAdapter;
public class LargeMsgDemoClient
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessageListener.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessageListener.java?rev=567946&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessageListener.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessageListener.java
Mon Aug 20 21:31:50 2007
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.client.util;
+
+import org.apache.qpidity.api.Message;
+
+/**
+ * A message listener.
+ */
+public interface MessageListener
+{
+ /**
+ * Process an incoming message.
+ *
+ * @param message The incoming message.
+ */
+ public void onMessage(Message message);
+}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java?rev=567946&r1=567945&r2=567946&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
Mon Aug 20 21:31:50 2007
@@ -6,7 +6,6 @@
import org.apache.qpidity.DeliveryProperties;
import org.apache.qpidity.MessageProperties;
import org.apache.qpidity.Struct;
-import org.apache.qpidity.client.MessageListener;
import org.apache.qpidity.client.MessagePartListener;
/**
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java?rev=567946&r1=567945&r2=567946&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidBrowserListener.java
Mon Aug 20 21:31:50 2007
@@ -20,7 +20,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.client.MessageListener;
+import org.apache.qpidity.client.util.MessageListener;
/**
* This listener dispatches messages to its browser.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java?rev=567946&r1=567945&r2=567946&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QpidMessageListener.java
Mon Aug 20 21:31:50 2007
@@ -20,7 +20,7 @@
import org.apache.qpidity.jms.message.QpidMessage;
import org.apache.qpidity.jms.message.MessageFactory;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.client.MessageListener;
+import org.apache.qpidity.client.util.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java?rev=567946&r1=567945&r2=567946&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAResourceImpl.java
Mon Aug 20 21:31:50 2007
@@ -17,14 +17,13 @@
*/
package org.apache.qpidity.jms;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpidity.Option;
-import org.apache.qpidity.QpidException;
-
+import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import javax.transaction.xa.XAException;
+
+import org.apache.qpidity.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is an implementation of javax.transaction.xa.XAResource.
@@ -72,7 +71,7 @@
{
_logger.debug("commit ", xid);
}
- _xaSession.getQpidSession().dtxCoordinationCommit(xid, b ?
Option.ONE_PHASE : Option.NO_OPTION);
+ _xaSession.getQpidSession().dtxCoordinationCommit(new
String(xid.getGlobalTransactionId()), b ? Option.ONE_PHASE : Option.NO_OPTION);
}
/**
@@ -99,14 +98,14 @@
}
xid = null;
_xaSession.getQpidSession()
- .dtxDemarcationEnd(xid, flag == XAResource.TMFAIL ?
Option.FAIL : Option.NO_OPTION,
+ .dtxDemarcationEnd(new String(xid.getGlobalTransactionId()),
flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
flag == XAResource.TMSUSPEND ?
Option.SUSPEND : Option.NO_OPTION);
}
/**
* Tells the resource manager to forget about a heuristically completed
transaction branch.
*
- * @param xid A global transaction identifier
+ * @param xid A global transaction
identifier
* @throws XAException An error has occurred. Possible exception values
are XAER_RMERR, XAER_RMFAIL,
* XAER_NOTA, XAER_INVAL, or XAER_PROTO.
*/
@@ -116,7 +115,7 @@
{
_logger.debug("forget ", xid);
}
- _xaSession.getQpidSession().dtxCoordinationForget(xid);
+ _xaSession.getQpidSession().dtxCoordinationForget(new
String(xid.getGlobalTransactionId()));
}
/**
@@ -133,7 +132,8 @@
int result = 0;
if (_xid != null)
{
- result = (int)
_xaSession.getQpidSession().dtxCoordinationGetTimeout(_xid);
+ result = 0;
+ _xaSession.getQpidSession().dtxCoordinationGetTimeout(new
String(_xid.getGlobalTransactionId()));
}
return result;
}
@@ -169,8 +169,9 @@
_logger.debug("prepare ", xid);
}
int result;
- result = _xaSession.getQpidSession()
- .dtxCoordinationPrepare(xid);
+ result = 0;
+ _xaSession.getQpidSession()
+ .dtxCoordinationPrepare(new String(xid.getGlobalTransactionId()));
if (result == XAException.XA_RDONLY)
{
@@ -198,8 +199,10 @@
public Xid[] recover(int flag) throws XAException
{
// the flag is ignored
- return _xaSession.getQpidSession()
+
+ _xaSession.getQpidSession()
.dtxCoordinationRecover();
+ return null;
}
/**
@@ -212,7 +215,7 @@
{
// the flag is ignored
_xaSession.getQpidSession()
- .dtxCoordinationRollback(xid);
+ .dtxCoordinationRollback(new
String(xid.getGlobalTransactionId()));
}
/**
@@ -231,7 +234,7 @@
if (_xid != null)
{
_xaSession.getQpidSession()
- .dtxCoordinationSetTimeout(_xid, timeout);
+ .dtxCoordinationSetTimeout(new
String(_xid.getGlobalTransactionId()), timeout);
result = true;
}
return result;
@@ -259,7 +262,7 @@
}
_xid = xid;
_xaSession.getQpidSession()
- .dtxDemarcationStart(xid, flag == XAResource.TMJOIN ? Option.JOIN :
Option.NO_OPTION,
+ .dtxDemarcationStart(new String(xid.getGlobalTransactionId()), flag ==
XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
flag == XAResource.TMRESUME ? Option.RESUME :
Option.NO_OPTION);
}
}