Author: bryanduxbury
Date: Wed Oct 6 20:04:40 2010
New Revision: 1005221
URL: http://svn.apache.org/viewvc?rev=1005221&view=rev
Log:
THRIFT-862. java: Async client issues / improvements
This patch improves quite a large number of things about the async client code.
Patch: Ning Liang
Modified:
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClient.java
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
Modified:
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java?rev=1005221&r1=1005220&r2=1005221&view=diff
==============================================================================
---
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java
(original)
+++
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java
Wed Oct 6 20:04:40 2010
@@ -18,6 +18,7 @@
*/
package org.apache.thrift.async;
+
public interface AsyncMethodCallback<T> {
/**
* This method will be called when the remote side has completed invoking
@@ -32,7 +33,7 @@ public interface AsyncMethodCallback<T>
* This method will be called when there is an unexpected clientside
* exception. This does not include application-defined exceptions that
* appear in the IDL, but rather things like IOExceptions.
- * @param throwable
+ * @param exception
*/
- public void onError(Throwable throwable);
+ public void onError(Exception exception);
}
Modified:
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClient.java
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClient.java?rev=1005221&r1=1005220&r2=1005221&view=diff
==============================================================================
---
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClient.java
(original)
+++
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClient.java
Wed Oct 6 20:04:40 2010
@@ -26,7 +26,7 @@ public abstract class TAsyncClient {
protected final TNonblockingTransport transport;
protected final TAsyncClientManager manager;
protected TAsyncMethodCall currentMethod;
- private Throwable error;
+ private Exception error;
private long timeout;
public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager
manager, TNonblockingTransport transport) {
@@ -44,7 +44,7 @@ public abstract class TAsyncClient {
return protocolFactory;
}
- public long getTimeout() {
+ public long getTimeout() {
return timeout;
}
@@ -68,7 +68,7 @@ public abstract class TAsyncClient {
* Get the client's error - returns null if no error
* @return
*/
- public Throwable getError() {
+ public Exception getError() {
return error;
}
@@ -94,9 +94,9 @@ public abstract class TAsyncClient {
/**
* Called by delegate method on error
*/
- protected void onError(Throwable throwable) {
+ protected void onError(Exception exception) {
transport.close();
currentMethod = null;
- error = throwable;
+ error = exception;
}
}
Modified:
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java?rev=1005221&r1=1005220&r2=1005221&view=diff
==============================================================================
---
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
(original)
+++
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
Wed Oct 6 20:04:40 2010
@@ -23,12 +23,13 @@ import java.nio.channels.ClosedSelectorE
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
-import java.util.HashSet;
+import java.util.Comparator;
import java.util.Iterator;
-import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang.ObjectUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ import org.slf4j.LoggerFactory;
*/
public class TAsyncClientManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(TAsyncClientManager.class.getName());
-
+
private final SelectThread selectThread;
private final ConcurrentLinkedQueue<TAsyncMethodCall> pendingCalls = new
ConcurrentLinkedQueue<TAsyncMethodCall>();
@@ -48,6 +49,9 @@ public class TAsyncClientManager {
}
public void call(TAsyncMethodCall method) throws TException {
+ if (!isRunning()) {
+ throw new TException("SelectThread is not running");
+ }
method.prepareMethodCall();
pendingCalls.add(method);
selectThread.getSelector().wakeup();
@@ -56,18 +60,21 @@ public class TAsyncClientManager {
public void stop() {
selectThread.finish();
}
-
+
+ public boolean isRunning() {
+ return selectThread.isAlive();
+ }
+
private class SelectThread extends Thread {
- // Selector waits at most SELECT_TIME milliseconds before waking
- private static final long SELECT_TIME = 5;
-
private final Selector selector;
private volatile boolean running;
- private final Set<TAsyncMethodCall> timeoutWatchSet = new
HashSet<TAsyncMethodCall>();
+ private final TreeSet<TAsyncMethodCall> timeoutWatchSet = new
TreeSet<TAsyncMethodCall>(new TAsyncMethodCallTimeoutComparator());
public SelectThread() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
this.running = true;
+ this.setName("TAsyncClientManager#SelectorThread " + this.getId());
+
// We don't want to hold up the JVM when shutting down
setDaemon(true);
}
@@ -85,16 +92,29 @@ public class TAsyncClientManager {
while (running) {
try {
try {
- selector.select(SELECT_TIME);
+ if (timeoutWatchSet.size() == 0) {
+ // No timeouts, so select indefinitely
+ selector.select();
+ } else {
+ // We have a timeout pending, so calculate the time until then
and select appropriately
+ long nextTimeout = timeoutWatchSet.first().getTimeoutTimestamp();
+ long selectTime = nextTimeout - System.currentTimeMillis();
+ if (selectTime > 0) {
+ // Next timeout is in the future, select and wake up then
+ selector.select(selectTime);
+ } else {
+ // Next timeout is now or in past, select immediately so we
can time out
+ selector.selectNow();
+ }
+ }
} catch (IOException e) {
LOGGER.error("Caught IOException in TAsyncClientManager!", e);
}
-
transitionMethods();
- timeoutIdleMethods();
+ timeoutMethods();
startPendingMethods();
- } catch (Throwable throwable) {
- LOGGER.error("Ignoring uncaught exception in SelectThread",
throwable);
+ } catch (Exception exception) {
+ LOGGER.error("Ignoring uncaught exception in SelectThread",
exception);
}
}
}
@@ -126,18 +146,16 @@ public class TAsyncClientManager {
}
// Timeout any existing method calls
- private void timeoutIdleMethods() {
+ private void timeoutMethods() {
Iterator<TAsyncMethodCall> iterator = timeoutWatchSet.iterator();
+ long currentTime = System.currentTimeMillis();
while (iterator.hasNext()) {
TAsyncMethodCall methodCall = iterator.next();
- long clientTimeout = methodCall.getClient().getTimeout();
- long timeElapsed = System.currentTimeMillis() -
methodCall.getLastTransitionTime();
-
- if (timeElapsed > clientTimeout) {
+ if (currentTime >= methodCall.getTimeoutTimestamp()) {
iterator.remove();
- methodCall.onError(new TimeoutException("Operation " +
- methodCall.getClass() + " timed out after " + timeElapsed +
- " milliseconds."));
+ methodCall.onError(new TimeoutException("Operation " +
methodCall.getClass() + " timed out after " + (currentTime -
methodCall.getStartTime()) + " ms."));
+ } else {
+ break;
}
}
}
@@ -149,17 +167,30 @@ public class TAsyncClientManager {
// Catch registration errors. method will catch transition errors and
cleanup.
try {
methodCall.start(selector);
-
+
// If timeout specified and first transition went smoothly, add to
timeout watch set
TAsyncClient client = methodCall.getClient();
if (client.hasTimeout() && !client.hasError()) {
timeoutWatchSet.add(methodCall);
}
- } catch (Throwable e) {
- LOGGER.warn("Caught throwable in TAsyncClientManager!", e);
- methodCall.onError(e);
+ } catch (Exception exception) {
+ LOGGER.warn("Caught exception in TAsyncClientManager!", exception);
+ methodCall.onError(exception);
}
}
}
}
+
+ // Comparator used in TreeSet
+ private static class TAsyncMethodCallTimeoutComparator implements
Comparator<TAsyncMethodCall> {
+ @Override
+ public int compare(TAsyncMethodCall left, TAsyncMethodCall right) {
+ if (left.getTimeoutTimestamp() == right.getTimeoutTimestamp()) {
+ return (int)(left.getSequenceId() - right.getSequenceId());
+ } else {
+ return (int)(left.getTimeoutTimestamp() - right.getTimeoutTimestamp());
+ }
+ }
+ }
+
}
Modified:
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java?rev=1005221&r1=1005220&r2=1005221&view=diff
==============================================================================
---
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
(original)
+++
incubator/thrift/trunk/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
Wed Oct 6 20:04:40 2010
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
@@ -38,9 +39,10 @@ import org.apache.thrift.transport.TTran
* - public T getResult() throws <Exception_1>, <Exception_2>, ...
* @param <T>
*/
-public abstract class TAsyncMethodCall<T extends TAsyncMethodCall> {
+public abstract class TAsyncMethodCall<T> {
private static final int INITIAL_MEMORY_BUFFER_SIZE = 128;
+ private static AtomicLong sequenceIdCounter = new AtomicLong(0);
public static enum State {
CONNECTING,
@@ -62,20 +64,21 @@ public abstract class TAsyncMethodCall<T
protected final TAsyncClient client;
private final AsyncMethodCallback<T> callback;
private final boolean isOneway;
-
- private long lastTransitionTime;
-
+ private long sequenceId;
+
private ByteBuffer sizeBuffer;
private final byte[] sizeBufferArray = new byte[4];
private ByteBuffer frameBuffer;
+ private long startTime = System.currentTimeMillis();
+
protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory
protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T>
callback, boolean isOneway) {
this.transport = transport;
this.callback = callback;
this.protocolFactory = protocolFactory;
this.client = client;
this.isOneway = isOneway;
- this.lastTransitionTime = System.currentTimeMillis();
+ this.sequenceId = TAsyncMethodCall.sequenceIdCounter.getAndIncrement();
}
protected State getState() {
@@ -86,13 +89,25 @@ public abstract class TAsyncMethodCall<T
return state == State.RESPONSE_READ;
}
- protected long getLastTransitionTime() {
- return lastTransitionTime;
+ protected long getStartTime() {
+ return startTime;
+ }
+
+ protected long getSequenceId() {
+ return sequenceId;
}
public TAsyncClient getClient() {
return client;
}
+
+ public boolean hasTimeout() {
+ return client.hasTimeout();
+ }
+
+ public long getTimeoutTimestamp() {
+ return client.getTimeout() + startTime;
+ }
protected abstract void write_args(TProtocol protocol) throws TException;
@@ -181,15 +196,14 @@ public abstract class TAsyncMethodCall<T
throw new IllegalStateException("Method call in state " + state
+ " but selector called transition method. Seems like a bug...");
}
- lastTransitionTime = System.currentTimeMillis();
- } catch (Throwable e) {
+ } catch (Exception e) {
key.cancel();
key.attach(null);
onError(e);
}
}
- protected void onError(Throwable e) {
+ protected void onError(Exception e) {
client.onError(e);
callback.onError(e);
state = State.ERROR;
Modified:
incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java?rev=1005221&r1=1005220&r2=1005221&view=diff
==============================================================================
---
incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
(original)
+++
incubator/thrift/trunk/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
Wed Oct 6 20:04:40 2010
@@ -30,10 +30,15 @@ import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Wrapper around ServerSocketChannel
*/
public class TNonblockingServerSocket extends TNonblockingServerTransport {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TNonblockingServerTransport.class.getName());
/**
* This channel is where all the nonblocking magic happens.
@@ -152,8 +157,7 @@ public class TNonblockingServerSocket ex
try {
serverSocket_.close();
} catch (IOException iox) {
- System.err.println("WARNING: Could not close server socket: " +
- iox.getMessage());
+ LOGGER.warn("WARNING: Could not close server socket: " +
iox.getMessage());
}
serverSocket_ = null;
}
Modified:
incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java?rev=1005221&r1=1005220&r2=1005221&view=diff
==============================================================================
---
incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
(original)
+++
incubator/thrift/trunk/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
Wed Oct 6 20:04:40 2010
@@ -18,6 +18,7 @@
*/
package org.apache.thrift.async;
+import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -45,31 +46,182 @@ import thrift.test.Srv.AsyncClient.primi
import thrift.test.Srv.AsyncClient.voidMethod_call;
public class TestTAsyncClientManager extends TestCase {
- private static void fail(Throwable throwable) {
- StringWriter sink = new StringWriter();
- throwable.printStackTrace(new PrintWriter(sink, true));
- fail("unexpected error " + sink.toString());
+
+ private THsHaServer server_;
+ private Thread serverThread_;
+ private TAsyncClientManager clientManager_;
+
+ public void setUp() throws Exception {
+ server_ = new THsHaServer(new Srv.Processor(new SrvHandler()), new
TNonblockingServerSocket(ServerTestBase.PORT));
+ serverThread_ = new Thread(new Runnable() {
+ public void run() {
+ server_.serve();
+ }
+ });
+ serverThread_.start();
+ clientManager_ = new TAsyncClientManager();
+ Thread.sleep(500);
}
-
- private static abstract class FailureLessCallback<T extends
TAsyncMethodCall> implements AsyncMethodCallback<T> {
- @Override
- public void onError(Throwable throwable) {
- fail(throwable);
+
+ public void tearDown() throws Exception {
+ server_.stop();
+ clientManager_.stop();
+ serverThread_.join();
+ }
+
+ public void testBasicCall() throws Exception {
+ Srv.AsyncClient client = getClient();
+ basicCall(client);
+ }
+
+ public void testBasicCallWithTimeout() throws Exception {
+ Srv.AsyncClient client = getClient();
+ client.setTimeout(5000);
+ basicCall(client);
+ }
+
+ public void testTimeoutCall() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ Srv.AsyncClient client = getClient();
+ client.setTimeout(100);
+ client.primitiveMethod(new AsyncMethodCallback<primitiveMethod_call>() {
+ @Override
+ public void onError(Exception exception) {
+ try {
+ if (!(exception instanceof TimeoutException)) {
+ StringWriter sink = new StringWriter();
+ exception.printStackTrace(new PrintWriter(sink, true));
+ fail("expected TimeoutException but got " + sink.toString());
+ }
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void onComplete(primitiveMethod_call response) {
+ try {
+ fail("Should not have finished timed out call.");
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ latch.await(2, TimeUnit.SECONDS);
+ assertTrue(client.hasError());
+ assertTrue(client.getError() instanceof TimeoutException);
+ }
+
+ public void testVoidCall() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicBoolean returned = new AtomicBoolean(false);
+ Srv.AsyncClient client = getClient();
+ client.voidMethod(new
FailureLessCallback<Srv.AsyncClient.voidMethod_call>() {
+ @Override
+ public void onComplete(voidMethod_call response) {
+ try {
+ response.getResult();
+ returned.set(true);
+ } catch (TException e) {
+ fail(e);
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ latch.await(1, TimeUnit.SECONDS);
+ assertTrue(returned.get());
+ }
+
+ public void testOnewayCall() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicBoolean returned = new AtomicBoolean(false);
+ Srv.AsyncClient client = getClient();
+ client.onewayMethod(new FailureLessCallback<onewayMethod_call>() {
+ @Override
+ public void onComplete(onewayMethod_call response) {
+ try {
+ response.getResult();
+ returned.set(true);
+ } catch (TException e) {
+ fail(e);
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ latch.await(1, TimeUnit.SECONDS);
+ assertTrue(returned.get());
+ }
+
+ public void testParallelCalls() throws Exception {
+ // make multiple calls with deserialization in the selector thread (repro
Eric's issue)
+ int numThreads = 50;
+ int numCallsPerThread = 100;
+ List<JankyRunnable> runnables = new ArrayList<JankyRunnable>();
+ List<Thread> threads = new ArrayList<Thread>();
+ for (int i = 0; i < numThreads; i++) {
+ JankyRunnable runnable = new JankyRunnable(numCallsPerThread);
+ Thread thread = new Thread(runnable);
+ thread.start();
+ threads.add(thread);
+ runnables.add(runnable);
}
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ int numSuccesses = 0;
+ for (JankyRunnable runnable : runnables) {
+ numSuccesses += runnable.getNumSuccesses();
+ }
+ assertEquals(numThreads * numCallsPerThread, numSuccesses);
+ }
+
+ private Srv.AsyncClient getClient() throws IOException {
+ TNonblockingSocket clientSocket = new
TNonblockingSocket(ServerTestBase.HOST, ServerTestBase.PORT);
+ return new Srv.AsyncClient(new TBinaryProtocol.Factory(), clientManager_,
clientSocket);
}
-
+
+ private void basicCall(Srv.AsyncClient client) throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicBoolean returned = new AtomicBoolean(false);
+ client.Janky(1, new FailureLessCallback<Srv.AsyncClient.Janky_call>() {
+ @Override
+ public void onComplete(Janky_call response) {
+ try {
+ assertEquals(3, response.getResult());
+ returned.set(true);
+ } catch (TException e) {
+ fail(e);
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void onError(Exception exception) {
+ try {
+ StringWriter sink = new StringWriter();
+ exception.printStackTrace(new PrintWriter(sink, true));
+ fail("unexpected onError with exception " + sink.toString());
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ latch.await(100, TimeUnit.SECONDS);
+ assertTrue(returned.get());
+ }
+
public class SrvHandler implements Iface {
+ // Use this method for a standard call testing
@Override
public int Janky(int arg) throws TException {
assertEquals(1, arg);
return 3;
}
- @Override
- public void methodWithDefaultArgs(int something) throws TException {
- }
-
- // Using this method for timeout testing
+ // Using this method for timeout testing - sleeps for 1 second before
returning
@Override
public int primitiveMethod() throws TException {
try {
@@ -79,6 +231,9 @@ public class TestTAsyncClientManager ext
}
return 0;
}
+
+ @Override
+ public void methodWithDefaultArgs(int something) throws TException { }
@Override
public CompactProtoTestStruct structMethod() throws TException {
@@ -93,20 +248,29 @@ public class TestTAsyncClientManager ext
public void onewayMethod() throws TException {
}
}
-
- public class JankyRunnable implements Runnable {
- private TAsyncClientManager acm_;
+
+ private static abstract class FailureLessCallback<T extends
TAsyncMethodCall> implements AsyncMethodCallback<T> {
+ @Override
+ public void onError(Exception exception) {
+ fail(exception);
+ }
+ }
+
+ private static void fail(Exception exception) {
+ StringWriter sink = new StringWriter();
+ exception.printStackTrace(new PrintWriter(sink, true));
+ fail("unexpected error " + sink.toString());
+ }
+
+ private class JankyRunnable implements Runnable {
private int numCalls_;
private int numSuccesses_ = 0;
private Srv.AsyncClient client_;
- private TNonblockingSocket clientSocket_;
- public JankyRunnable(TAsyncClientManager acm, int numCalls) throws
Exception {
- this.acm_ = acm;
- this.numCalls_ = numCalls;
- this.clientSocket_ = new TNonblockingSocket(ServerTestBase.HOST,
ServerTestBase.PORT);
- this.client_ = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm_,
clientSocket_);
- this.client_.setTimeout(20000);
+ public JankyRunnable(int numCalls) throws Exception {
+ numCalls_ = numCalls;
+ client_ = getClient();
+ client_.setTimeout(20000);
}
public int getNumSuccesses() {
@@ -119,14 +283,14 @@ public class TestTAsyncClientManager ext
try {
// connect an async client
final CountDownLatch latch = new CountDownLatch(1);
- final AtomicBoolean jankyReturned = new AtomicBoolean(false);
+ final AtomicBoolean returned = new AtomicBoolean(false);
client_.Janky(1, new
AsyncMethodCallback<Srv.AsyncClient.Janky_call>() {
-
+
@Override
public void onComplete(Janky_call response) {
try {
assertEquals(3, response.getResult());
- jankyReturned.set(true);
+ returned.set(true);
latch.countDown();
} catch (TException e) {
latch.countDown();
@@ -135,10 +299,10 @@ public class TestTAsyncClientManager ext
}
@Override
- public void onError(Throwable throwable) {
+ public void onError(Exception exception) {
try {
StringWriter sink = new StringWriter();
- throwable.printStackTrace(new PrintWriter(sink, true));
+ exception.printStackTrace(new PrintWriter(sink, true));
fail("unexpected onError on iteration " + iteration + ": " +
sink.toString());
} finally {
latch.countDown();
@@ -148,7 +312,7 @@ public class TestTAsyncClientManager ext
boolean calledBack = latch.await(30, TimeUnit.SECONDS);
assertTrue("wasn't called back in time on iteration " + iteration,
calledBack);
- assertTrue("onComplete not called on iteration " + iteration,
jankyReturned.get());
+ assertTrue("onComplete not called on iteration " + iteration,
returned.get());
this.numSuccesses_++;
} catch (Exception e) {
fail(e);
@@ -156,173 +320,4 @@ public class TestTAsyncClientManager ext
}
}
}
-
- public void standardCallTest(Srv.AsyncClient client) throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
- final AtomicBoolean jankyReturned = new AtomicBoolean(false);
- client.Janky(1, new FailureLessCallback<Srv.AsyncClient.Janky_call>() {
- @Override
- public void onComplete(Janky_call response) {
- try {
- assertEquals(3, response.getResult());
- jankyReturned.set(true);
- } catch (TException e) {
- fail(e);
- } finally {
- latch.countDown();
- }
- }
- });
-
- latch.await(100, TimeUnit.SECONDS);
- assertTrue(jankyReturned.get());
- }
-
- public void testIt() throws Exception {
- // put up a server
- final THsHaServer s = new THsHaServer(new Srv.Processor(new SrvHandler()),
- new TNonblockingServerSocket(ServerTestBase.PORT));
- new Thread(new Runnable() {
- @Override
- public void run() {
- s.serve();
- }
- }).start();
- Thread.sleep(1000);
-
- // set up async client manager
- TAsyncClientManager acm = new TAsyncClientManager();
-
- // connect an async client
- TNonblockingSocket clientSock = new TNonblockingSocket(
- ServerTestBase.HOST, ServerTestBase.PORT);
- Srv.AsyncClient client = new Srv.AsyncClient(new
TBinaryProtocol.Factory(), acm, clientSock);
-
- // make a standard method call
- standardCallTest(client);
-
- // make a standard method call that succeeds within timeout
- assertFalse(s.isStopped());
- client.setTimeout(5000);
- standardCallTest(client);
-
- // make a void method call
- assertFalse(s.isStopped());
- final CountDownLatch voidLatch = new CountDownLatch(1);
- final AtomicBoolean voidMethodReturned = new AtomicBoolean(false);
- client.voidMethod(new
FailureLessCallback<Srv.AsyncClient.voidMethod_call>() {
- @Override
- public void onComplete(voidMethod_call response) {
- try {
- response.getResult();
- voidMethodReturned.set(true);
- } catch (TException e) {
- fail(e);
- } finally {
- voidLatch.countDown();
- }
- }
- });
- voidLatch.await(1, TimeUnit.SECONDS);
- assertTrue(voidMethodReturned.get());
-
- // make a oneway method call
- assertFalse(s.isStopped());
- final CountDownLatch onewayLatch = new CountDownLatch(1);
- final AtomicBoolean onewayReturned = new AtomicBoolean(false);
- client.onewayMethod(new FailureLessCallback<onewayMethod_call>() {
- @Override
- public void onComplete(onewayMethod_call response) {
- try {
- response.getResult();
- onewayReturned.set(true);
- } catch (TException e) {
- fail(e);
- } finally {
- onewayLatch.countDown();
- }
- }
- });
- onewayLatch.await(1, TimeUnit.SECONDS);
- assertTrue(onewayReturned.get());
-
- // make another standard method call
- assertFalse(s.isStopped());
- final CountDownLatch voidAfterOnewayLatch = new CountDownLatch(1);
- final AtomicBoolean voidAfterOnewayReturned = new AtomicBoolean(false);
- client.voidMethod(new FailureLessCallback<voidMethod_call>() {
- @Override
- public void onComplete(voidMethod_call response) {
- try {
- response.getResult();
- voidAfterOnewayReturned.set(true);
- } catch (TException e) {
- fail(e);
- } finally {
- voidAfterOnewayLatch.countDown();
- }
- }
- });
- voidAfterOnewayLatch.await(1, TimeUnit.SECONDS);
- assertTrue(voidAfterOnewayReturned.get());
-
- // make multiple calls with deserialization in the selector thread (repro
Eric's issue)
- assertFalse(s.isStopped());
- int numThreads = 50;
- int numCallsPerThread = 100;
- List<JankyRunnable> runnables = new ArrayList<JankyRunnable>();
- List<Thread> threads = new ArrayList<Thread>();
- for (int i = 0; i < numThreads; i++) {
- JankyRunnable runnable = new JankyRunnable(acm, numCallsPerThread);
- Thread thread = new Thread(runnable);
- thread.start();
- threads.add(thread);
- runnables.add(runnable);
- }
- for (Thread thread : threads) {
- thread.join();
- }
- int numSuccesses = 0;
- for (JankyRunnable runnable : runnables) {
- numSuccesses += runnable.getNumSuccesses();
- }
- assertEquals(numThreads * numCallsPerThread, numSuccesses);
-
- // check that timeouts work
- assertFalse(s.isStopped());
- assertTrue(clientSock.isOpen());
- final CountDownLatch timeoutLatch = new CountDownLatch(1);
- client.setTimeout(100);
- client.primitiveMethod(new AsyncMethodCallback<primitiveMethod_call>() {
-
- @Override
- public void onError(Throwable throwable) {
- try {
- if (!(throwable instanceof TimeoutException)) {
- StringWriter sink = new StringWriter();
- throwable.printStackTrace(new PrintWriter(sink, true));
- fail("expected TimeoutException but got " + sink.toString());
- }
- } finally {
- timeoutLatch.countDown();
- }
- }
-
- @Override
- public void onComplete(primitiveMethod_call response) {
- try {
- fail("should not have finished timed out call.");
- } finally {
- timeoutLatch.countDown();
- }
- }
-
- });
- timeoutLatch.await(2, TimeUnit.SECONDS);
- assertTrue(client.hasError());
- assertTrue(client.getError() instanceof TimeoutException);
-
- // error closes socket and make sure isOpen reflects that
- assertFalse(clientSock.isOpen());
- }
-}
+}
\ No newline at end of file