Hi James,

Thank you for taking the time explaining async in great details. Your
example is exactly the direction I want to go, but I'm getting a
different result. Is it be possible that there's some jar files
missing?

Here is what I got after I applied your mod:

// Test async call:

3. Wed Feb 01 14:58:45 SGT 2012: Saying Hello (async)...
4. Wed Feb 01 14:58:50 SGT 2012: Chat.hello(Callback<CharSequence>) returned
5. Wed Feb 01 14:58:50 SGT 2012: Callback<CharSequence>.get() returned
"Hello-17"

// The Client code

NettyTransceiver transceiver = new NettyTransceiver(new
InetSocketAddress(6666));
Chat.Callback client =
SpecificRequestor.getClient(Chat.Callback.class, transceiver);

final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
System.out.println("\n3. " + new Date() + ": Saying Hello (async)...");
client.hello(future1); // This should not block.
System.out.println("4. " + new Date() + ":
Chat.hello(Callback<CharSequence>) returned");
CharSequence asyncResult = future1.get(); // This should block for 5 seconds
System.out.println("5. " + new Date() + ":
Callback<CharSequence>.get() returned \"" + asyncResult + "\"");
transceiver.close();

// Jar Libraries

avro-1.6.1.jar
avro-ipc-1.6.1.jar
netty-3.3.0.Final.jar
slf4j-log4j12-1.6.1.jar
slf4j-api-1.6.1.jar
log4j-1.2.15.jar
jackson-core-asl-1.4.2.jar
jackson-mapper-asl-1.4.2.jar



On Wed, Feb 1, 2012 at 2:24 PM, James Baldassari <[email protected]> wrote:
> Hi William,
>
> Great test.  I ran your code, and it worked as expected for me, but I made
> some slight changes to the client side to demonstrate what's happening:
>
>     // Test sync call:
>     System.out.println("1. " + new Date() + ": Saying Hello (sync)...");
>     CharSequence syncResult = client.hello(); // This should block for 5
> seconds
>     System.out.println("2. " new Date() + ": Chat.hello() returned \"" +
> syncResult + "\"");
>
>     // Test async call:
>
>     final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
>     System.out.println("\n3. " + new Date() + ": Saying Hello (async)...");
>     client.hello(future1); // This should not block.
>     System.out.println("4. " + new Date() + ":
> Chat.hello(Callback<CharSequence>) returned");
>     CharSequence asyncResult = future1.get(); // This should block for 5
> seconds
>     System.out.println("5. " + new Date() + ": Callback<CharSequence>.get()
> returned \"" + asyncResult + "\"");
>
> When I ran that I got the following output:
>
>     1. Wed Feb 01 00:13:36 EST 2012: Saying Hello (sync)...
>     2. Wed Feb 01 00:13:41 EST 2012: Chat.hello() returned "Hello"
>
>     3. Wed Feb 01 00:13:41 EST 2012: Saying Hello (async)...
>     4. Wed Feb 01 00:13:41 EST 2012: Chat.hello(Callback<CharSequence>)
> returned
>     5. Wed Feb 01 00:13:46 EST 2012: Callback<CharSequence>.get() returned
> "Hello"
>
> As you can see, the synchronous call (lines 1-2) blocked for about 5 seconds
> as expected.  When the asynchronous call was invoked it returned immediately
> (note timestamps on lines 3-4).  The part that blocked was the
> CallFuture.get() on line 5 of the output.  The result of the callback can't
> be obtained until the server returns it (after waiting 5 seconds).
>
> I think I may know why this behavior seems confusing.  In practice I don't
> think many people will use CallFuture.  It's basically an adapter to make an
> asynchronous call synchronous by blocking until the result returns.  This is
> useful in unit tests and in situations where the client can't proceed until
> the result is available.  However, to really take advantage of the
> asynchronous API you never want to wait for the result of an RPC.  The
> client should just invoke async RPCs with some Callback instance and then
> move onto other things, such as invoking more RPCs!
>
> Here's an example.  Let's say we have an e-mail server with an Avro protocol
> that allows us to access the users' mailboxes.  We might have a method to
> allow us to search for all messages with a subject line that matches some
> regular expression.  In IDL it might look something like this:
>
>     protocol Mail {
>       record Message {
>         string from;
>         array<string> to;
>         union { string, null } subject;
>         union { string, null } body;
>       }
>       array<Message> findBySubject(string regexp);
>     }
>
> It doesn't really matter what the implementation of this protocol looks like
> on the server side.  Searching through all messages is likely to take some
> time, so what we would want to do is to fire off an async RPC as soon as the
> user clicks the search button, then return control to the UI immediately so
> that the user can continue doing other things while the search is running.
> Whenever the results come back we would then notify the user or populate the
> search results in the UI, e.g. via ajax/comet if it's a web app.  So we
> would have a Callback implementation that would look something like this:
>
>   public class FindBySubjectCallback implements Callback<List<Message>> {
>     private final RequestContext context;  // RequestContext is some class
> that allows us to send events back to the user
>     public FindBySubjectCallback(RequestContext context) {
>       this.context = context;
>     }
>     @Override
>     public void handleResult(List<Message> result) {
>       // Notify user with results:
>       requestContext.fireSearchResultReadyEvent(result);
>     }
>     @Override
>     public void handleError(Throwable error) {
>       // Notify user that an error occurred:
>       requestContext.fireErrorEvent(error);
>     }
>   }
>
> The client, which might be running in a servlet container, would then just
> invoke the RPC like this:
>
>     private Mail.Callback mailClient; // Client is initialized/injected
> somewhere
>     ...
>     public void findBySubject(String regexp, RequestContext context) {
>       mailClient.findBySubject(regexp, new FindBySubjectCallback(context));
>       // return immediately without waiting for the search to complete!
>     }
>     ...
>
> Anyway, hope that makes some sense.  Let me know if you have any questions.
>
> -James
>
>
>
> On Tue, Jan 31, 2012 at 11:23 PM, William Afendy <[email protected]> wrote:
>>
>> Hi James,
>>
>> Thank you for your quick response. I'm still fairly new to the async
>> stuff. I fixed the ChatImpl as suggested to implement Chat instead of
>> Chat.Callback. I also added a 5 secs delay in the method hello().
>>
>> There is still something missing, I can't really see the non-blocking
>> (async) part from Netty implementation. Please take a look at the
>> AvroClient.java code below, I understand when the client.hello() gets
>> called, this is the synchronous (blocking) part of the code. It blocked for
>> 5 seconds as expected. Now, when I'm testing the async method by creating
>> future1 then pass it in client.hello(future1), this method also blocks for 5
>> seconds. I do not know how to implement the async part properly.
>>
>> I appreciate the link you provided but it will take me sometime to digest
>> your sample code. In the mean time, it would be great if you can set me
>> straight by explaining why the async method is also blocking.
>>
>> Thanks,
>>
>> William
>>
>>
>> =======================
>> AvroClient.java - Client code.
>> =======================
>>
>> public class AvroClient {
>>     public static void main(String[] args) throws InterruptedException,
>> ExecutionException, TimeoutException {
>>         try {
>>             NettyTransceiver transceiver = new NettyTransceiver(new
>> InetSocketAddress(6666));
>>             Chat.Callback client = SpecificRequestor.getClient(
>> Chat.Callback.class, transceiver);
>>
>>             System.out.println(client.hello()); //This should block for 5
>> seconds
>>             System.out.println("This should print out 5 seconds later");
>>
>>             final CallFuture<CharSequence> future1 = new
>> CallFuture<CharSequence>();
>>             client.hello(future1); //This should not block.
>>             System.out.println("This should print out immediately");
>>             System.out.println(future1.get());
>>
>>             transceiver.close();
>>
>>         } catch (IOException ex) {
>>             System.err.println(ex);
>>         }
>>     }
>> }
>>
>>
>>
>> ===========
>> ChatImpl.java
>> ===========
>>
>> public class ChatImpl implements Chat {
>>     @Override
>>     public CharSequence hello() throws AvroRemoteException {
>>         try {
>>             Thread.sleep(5000);
>>         } catch (InterruptedException ex) {}
>>         return new Utf8("Hello");
>>     }
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Feb 1, 2012 at 11:09 AM, James Baldassari <[email protected]>
>> wrote:
>>>
>>> Hi William,
>>>
>>> The documentation around the async interface is definitely lacking.
>>> There should probably be a separate page on the Avro site for that.  I'll
>>> try to find some time to work on it.  In the meantime you can see some
>>> examples I put up on github:https://github.com/jbaldassari/Avro-RPC
>>>
>>> As for the problem you're having, there are no major issues with your
>>> code.  The only thing wrong is that the server side (ChatImpl) should
>>> implement Chat, not Chat.Callback.  One of the nice things about the async
>>> interface is that it only affects the client side of the RPC; the server
>>> doesn't have to have any knowledge that it's async.  So the server
>>> implements the regular sync interface (Chat), and then the client is free to
>>> use either the sync or async version when invoking RPCs.  Does that answer
>>> your question?
>>>
>>> -James
>>>
>>>
>>>
>>> On Tue, Jan 31, 2012 at 8:50 PM, William Afendy <[email protected]>
>>> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm trying to implement Asynchronous calls by using NettyServer
>>>> implementation. After digging the source code, I found an example on how to
>>>> use NettyServer from TestNettyServerWithCallbacks.java
>>>>
>>>> When running a few test, I realize that NettyServer never calls
>>>> hello(Callback) method, instead it keeps calling the synchronous hello()
>>>> method. The client program prints out "Hello" but I'm expecting
>>>> "Hello-ASYNC" as a result. I really have no clue what's going on.
>>>>
>>>> I hope someone can shine some light on me and perhaps point out the
>>>> mistake or correct my logic. Below are the codes I use to perform a simple
>>>> asynchronous test.
>>>>
>>>> =======================
>>>> AvroClient.java - Client code.
>>>> =======================
>>>>
>>>> public class AvroClient {
>>>>     public static void main(String[] args) throws InterruptedException,
>>>> ExecutionException, TimeoutException {
>>>>         try {
>>>>             NettyTransceiver transceiver = new NettyTransceiver(new
>>>> InetSocketAddress(6666));
>>>>             Chat.Callback client =
>>>> SpecificRequestor.getClient(Chat.Callback.class, transceiver);
>>>>
>>>>             final CallFuture<CharSequence> future1 = new
>>>> CallFuture<CharSequence>();
>>>>             client.hello(future1);
>>>>
>>>>             System.out.println(future1.get());
>>>>
>>>>             transceiver.close();
>>>>
>>>>         } catch (IOException ex) {
>>>>             System.err.println(ex);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> ===========================
>>>> AvroNetty.java - The Server Code
>>>> ===========================
>>>>
>>>> public class AvroNetty {
>>>>     public static void main(String[] args) {
>>>>         Index indexImpl = new AsyncIndexImpl();
>>>>         Chat chatImpl = new ChatImpl();
>>>>
>>>>         Server server = new NettyServer(new
>>>> SpecificResponder(Chat.class, chatImpl), new InetSocketAddress(6666));
>>>>         server.start();
>>>>         System.out.println("Server is listening at port " +
>>>> server.getPort());
>>>>     }
>>>> }
>>>>
>>>> ===========
>>>> ChatImpl.java
>>>> ===========
>>>>
>>>> public class ChatImpl implements Chat.Callback {
>>>>     @Override
>>>>     public void hello(org.apache.avro.ipc.Callback<CharSequence>
>>>> callback) throws IOException {
>>>>         callback.handleResult("Hello-ASYNC");
>>>>     }
>>>>
>>>>     @Override
>>>>     public CharSequence hello() throws AvroRemoteException {
>>>>         return new Utf8("Hello");
>>>>     }
>>>> }
>>>>
>>>> =============================================
>>>> Chat.java - This interface is auto-generated by avro-tool
>>>> =============================================
>>>>
>>>> @SuppressWarnings("all")
>>>> public interface Chat {
>>>>     public static final org.apache.avro.Protocol PROTOCOL =
>>>> org.apache.avro.Protocol.parse("{\"protocol\":\"Chat\",\"namespace\":\"avro.test\",\"types\":[],\"messages\":{\"hello\":{\"request\":[],\"response\":\"string\"}}}");
>>>>     java.lang.CharSequence hello() throws
>>>> org.apache.avro.AvroRemoteException;
>>>>
>>>>     @SuppressWarnings("all")
>>>>     public interface Callback extends Chat {
>>>>         public static final org.apache.avro.Protocol PROTOCOL =
>>>> avro.test.Chat.PROTOCOL;
>>>>         void hello(org.apache.avro.ipc.Callback<java.lang.CharSequence>
>>>> callback) throws java.io.IOException;
>>>>     }
>>>> }
>>>>
>>>> ====================
>>>> Here is the Avro Schema
>>>> ====================
>>>>
>>>> {
>>>>   "namespace": "avro.test",
>>>>   "protocol": "Chat",
>>>>
>>>>   "types" : [],
>>>>
>>>>   "messages": {
>>>>       "hello": {
>>>>                     "request": [],
>>>>                     "response": "string"
>>>>       }
>>>>   }
>>>> }
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> --
>>>> William Afendy
>>>
>>>
>>
>>
>>
>> --
>> William Afendy
>
>



-- 
William Afendy

Reply via email to