Hi James,
Thank you for taking the time explaining async in great details. Your
example is exactly the direction I want to go, but I'm getting a
different result. Is it be possible that there's some jar files
missing?
Here is what I got after I applied your mod:
// Test async call:
3. Wed Feb 01 14:58:45 SGT 2012: Saying Hello (async)...
4. Wed Feb 01 14:58:50 SGT 2012: Chat.hello(Callback<CharSequence>) returned
5. Wed Feb 01 14:58:50 SGT 2012: Callback<CharSequence>.get() returned
"Hello-17"
// The Client code
NettyTransceiver transceiver = new NettyTransceiver(new
InetSocketAddress(6666));
Chat.Callback client =
SpecificRequestor.getClient(Chat.Callback.class, transceiver);
final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
System.out.println("\n3. " + new Date() + ": Saying Hello (async)...");
client.hello(future1); // This should not block.
System.out.println("4. " + new Date() + ":
Chat.hello(Callback<CharSequence>) returned");
CharSequence asyncResult = future1.get(); // This should block for 5 seconds
System.out.println("5. " + new Date() + ":
Callback<CharSequence>.get() returned \"" + asyncResult + "\"");
transceiver.close();
// Jar Libraries
avro-1.6.1.jar
avro-ipc-1.6.1.jar
netty-3.3.0.Final.jar
slf4j-log4j12-1.6.1.jar
slf4j-api-1.6.1.jar
log4j-1.2.15.jar
jackson-core-asl-1.4.2.jar
jackson-mapper-asl-1.4.2.jar
On Wed, Feb 1, 2012 at 2:24 PM, James Baldassari <[email protected]> wrote:
> Hi William,
>
> Great test. I ran your code, and it worked as expected for me, but I made
> some slight changes to the client side to demonstrate what's happening:
>
> // Test sync call:
> System.out.println("1. " + new Date() + ": Saying Hello (sync)...");
> CharSequence syncResult = client.hello(); // This should block for 5
> seconds
> System.out.println("2. " new Date() + ": Chat.hello() returned \"" +
> syncResult + "\"");
>
> // Test async call:
>
> final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
> System.out.println("\n3. " + new Date() + ": Saying Hello (async)...");
> client.hello(future1); // This should not block.
> System.out.println("4. " + new Date() + ":
> Chat.hello(Callback<CharSequence>) returned");
> CharSequence asyncResult = future1.get(); // This should block for 5
> seconds
> System.out.println("5. " + new Date() + ": Callback<CharSequence>.get()
> returned \"" + asyncResult + "\"");
>
> When I ran that I got the following output:
>
> 1. Wed Feb 01 00:13:36 EST 2012: Saying Hello (sync)...
> 2. Wed Feb 01 00:13:41 EST 2012: Chat.hello() returned "Hello"
>
> 3. Wed Feb 01 00:13:41 EST 2012: Saying Hello (async)...
> 4. Wed Feb 01 00:13:41 EST 2012: Chat.hello(Callback<CharSequence>)
> returned
> 5. Wed Feb 01 00:13:46 EST 2012: Callback<CharSequence>.get() returned
> "Hello"
>
> As you can see, the synchronous call (lines 1-2) blocked for about 5 seconds
> as expected. When the asynchronous call was invoked it returned immediately
> (note timestamps on lines 3-4). The part that blocked was the
> CallFuture.get() on line 5 of the output. The result of the callback can't
> be obtained until the server returns it (after waiting 5 seconds).
>
> I think I may know why this behavior seems confusing. In practice I don't
> think many people will use CallFuture. It's basically an adapter to make an
> asynchronous call synchronous by blocking until the result returns. This is
> useful in unit tests and in situations where the client can't proceed until
> the result is available. However, to really take advantage of the
> asynchronous API you never want to wait for the result of an RPC. The
> client should just invoke async RPCs with some Callback instance and then
> move onto other things, such as invoking more RPCs!
>
> Here's an example. Let's say we have an e-mail server with an Avro protocol
> that allows us to access the users' mailboxes. We might have a method to
> allow us to search for all messages with a subject line that matches some
> regular expression. In IDL it might look something like this:
>
> protocol Mail {
> record Message {
> string from;
> array<string> to;
> union { string, null } subject;
> union { string, null } body;
> }
> array<Message> findBySubject(string regexp);
> }
>
> It doesn't really matter what the implementation of this protocol looks like
> on the server side. Searching through all messages is likely to take some
> time, so what we would want to do is to fire off an async RPC as soon as the
> user clicks the search button, then return control to the UI immediately so
> that the user can continue doing other things while the search is running.
> Whenever the results come back we would then notify the user or populate the
> search results in the UI, e.g. via ajax/comet if it's a web app. So we
> would have a Callback implementation that would look something like this:
>
> public class FindBySubjectCallback implements Callback<List<Message>> {
> private final RequestContext context; // RequestContext is some class
> that allows us to send events back to the user
> public FindBySubjectCallback(RequestContext context) {
> this.context = context;
> }
> @Override
> public void handleResult(List<Message> result) {
> // Notify user with results:
> requestContext.fireSearchResultReadyEvent(result);
> }
> @Override
> public void handleError(Throwable error) {
> // Notify user that an error occurred:
> requestContext.fireErrorEvent(error);
> }
> }
>
> The client, which might be running in a servlet container, would then just
> invoke the RPC like this:
>
> private Mail.Callback mailClient; // Client is initialized/injected
> somewhere
> ...
> public void findBySubject(String regexp, RequestContext context) {
> mailClient.findBySubject(regexp, new FindBySubjectCallback(context));
> // return immediately without waiting for the search to complete!
> }
> ...
>
> Anyway, hope that makes some sense. Let me know if you have any questions.
>
> -James
>
>
>
> On Tue, Jan 31, 2012 at 11:23 PM, William Afendy <[email protected]> wrote:
>>
>> Hi James,
>>
>> Thank you for your quick response. I'm still fairly new to the async
>> stuff. I fixed the ChatImpl as suggested to implement Chat instead of
>> Chat.Callback. I also added a 5 secs delay in the method hello().
>>
>> There is still something missing, I can't really see the non-blocking
>> (async) part from Netty implementation. Please take a look at the
>> AvroClient.java code below, I understand when the client.hello() gets
>> called, this is the synchronous (blocking) part of the code. It blocked for
>> 5 seconds as expected. Now, when I'm testing the async method by creating
>> future1 then pass it in client.hello(future1), this method also blocks for 5
>> seconds. I do not know how to implement the async part properly.
>>
>> I appreciate the link you provided but it will take me sometime to digest
>> your sample code. In the mean time, it would be great if you can set me
>> straight by explaining why the async method is also blocking.
>>
>> Thanks,
>>
>> William
>>
>>
>> =======================
>> AvroClient.java - Client code.
>> =======================
>>
>> public class AvroClient {
>> public static void main(String[] args) throws InterruptedException,
>> ExecutionException, TimeoutException {
>> try {
>> NettyTransceiver transceiver = new NettyTransceiver(new
>> InetSocketAddress(6666));
>> Chat.Callback client = SpecificRequestor.getClient(
>> Chat.Callback.class, transceiver);
>>
>> System.out.println(client.hello()); //This should block for 5
>> seconds
>> System.out.println("This should print out 5 seconds later");
>>
>> final CallFuture<CharSequence> future1 = new
>> CallFuture<CharSequence>();
>> client.hello(future1); //This should not block.
>> System.out.println("This should print out immediately");
>> System.out.println(future1.get());
>>
>> transceiver.close();
>>
>> } catch (IOException ex) {
>> System.err.println(ex);
>> }
>> }
>> }
>>
>>
>>
>> ===========
>> ChatImpl.java
>> ===========
>>
>> public class ChatImpl implements Chat {
>> @Override
>> public CharSequence hello() throws AvroRemoteException {
>> try {
>> Thread.sleep(5000);
>> } catch (InterruptedException ex) {}
>> return new Utf8("Hello");
>> }
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Feb 1, 2012 at 11:09 AM, James Baldassari <[email protected]>
>> wrote:
>>>
>>> Hi William,
>>>
>>> The documentation around the async interface is definitely lacking.
>>> There should probably be a separate page on the Avro site for that. I'll
>>> try to find some time to work on it. In the meantime you can see some
>>> examples I put up on github:https://github.com/jbaldassari/Avro-RPC
>>>
>>> As for the problem you're having, there are no major issues with your
>>> code. The only thing wrong is that the server side (ChatImpl) should
>>> implement Chat, not Chat.Callback. One of the nice things about the async
>>> interface is that it only affects the client side of the RPC; the server
>>> doesn't have to have any knowledge that it's async. So the server
>>> implements the regular sync interface (Chat), and then the client is free to
>>> use either the sync or async version when invoking RPCs. Does that answer
>>> your question?
>>>
>>> -James
>>>
>>>
>>>
>>> On Tue, Jan 31, 2012 at 8:50 PM, William Afendy <[email protected]>
>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm trying to implement Asynchronous calls by using NettyServer
>>>> implementation. After digging the source code, I found an example on how to
>>>> use NettyServer from TestNettyServerWithCallbacks.java
>>>>
>>>> When running a few test, I realize that NettyServer never calls
>>>> hello(Callback) method, instead it keeps calling the synchronous hello()
>>>> method. The client program prints out "Hello" but I'm expecting
>>>> "Hello-ASYNC" as a result. I really have no clue what's going on.
>>>>
>>>> I hope someone can shine some light on me and perhaps point out the
>>>> mistake or correct my logic. Below are the codes I use to perform a simple
>>>> asynchronous test.
>>>>
>>>> =======================
>>>> AvroClient.java - Client code.
>>>> =======================
>>>>
>>>> public class AvroClient {
>>>> public static void main(String[] args) throws InterruptedException,
>>>> ExecutionException, TimeoutException {
>>>> try {
>>>> NettyTransceiver transceiver = new NettyTransceiver(new
>>>> InetSocketAddress(6666));
>>>> Chat.Callback client =
>>>> SpecificRequestor.getClient(Chat.Callback.class, transceiver);
>>>>
>>>> final CallFuture<CharSequence> future1 = new
>>>> CallFuture<CharSequence>();
>>>> client.hello(future1);
>>>>
>>>> System.out.println(future1.get());
>>>>
>>>> transceiver.close();
>>>>
>>>> } catch (IOException ex) {
>>>> System.err.println(ex);
>>>> }
>>>> }
>>>> }
>>>>
>>>> ===========================
>>>> AvroNetty.java - The Server Code
>>>> ===========================
>>>>
>>>> public class AvroNetty {
>>>> public static void main(String[] args) {
>>>> Index indexImpl = new AsyncIndexImpl();
>>>> Chat chatImpl = new ChatImpl();
>>>>
>>>> Server server = new NettyServer(new
>>>> SpecificResponder(Chat.class, chatImpl), new InetSocketAddress(6666));
>>>> server.start();
>>>> System.out.println("Server is listening at port " +
>>>> server.getPort());
>>>> }
>>>> }
>>>>
>>>> ===========
>>>> ChatImpl.java
>>>> ===========
>>>>
>>>> public class ChatImpl implements Chat.Callback {
>>>> @Override
>>>> public void hello(org.apache.avro.ipc.Callback<CharSequence>
>>>> callback) throws IOException {
>>>> callback.handleResult("Hello-ASYNC");
>>>> }
>>>>
>>>> @Override
>>>> public CharSequence hello() throws AvroRemoteException {
>>>> return new Utf8("Hello");
>>>> }
>>>> }
>>>>
>>>> =============================================
>>>> Chat.java - This interface is auto-generated by avro-tool
>>>> =============================================
>>>>
>>>> @SuppressWarnings("all")
>>>> public interface Chat {
>>>> public static final org.apache.avro.Protocol PROTOCOL =
>>>> org.apache.avro.Protocol.parse("{\"protocol\":\"Chat\",\"namespace\":\"avro.test\",\"types\":[],\"messages\":{\"hello\":{\"request\":[],\"response\":\"string\"}}}");
>>>> java.lang.CharSequence hello() throws
>>>> org.apache.avro.AvroRemoteException;
>>>>
>>>> @SuppressWarnings("all")
>>>> public interface Callback extends Chat {
>>>> public static final org.apache.avro.Protocol PROTOCOL =
>>>> avro.test.Chat.PROTOCOL;
>>>> void hello(org.apache.avro.ipc.Callback<java.lang.CharSequence>
>>>> callback) throws java.io.IOException;
>>>> }
>>>> }
>>>>
>>>> ====================
>>>> Here is the Avro Schema
>>>> ====================
>>>>
>>>> {
>>>> "namespace": "avro.test",
>>>> "protocol": "Chat",
>>>>
>>>> "types" : [],
>>>>
>>>> "messages": {
>>>> "hello": {
>>>> "request": [],
>>>> "response": "string"
>>>> }
>>>> }
>>>> }
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> --
>>>> William Afendy
>>>
>>>
>>
>>
>>
>> --
>> William Afendy
>
>
--
William Afendy