Well, that's really strange. I'm not sure why you would be seeing a different result there. Just to be sure we're both running the exact same code, I'm going to send you a Maven project off-list that I've been using to run my tests. See if the tests pass for you. If so, you can compare the code and find out where the difference is.
-James

On Wed, Feb 1, 2012 at 2:22 AM, William Afendy wrote:
> Hi James,
>
> Thank you for taking the time to explain async in great detail. Your
> example is exactly the direction I want to go, but I'm getting a
> different result. Is it possible that some jar files are missing?
>
> Here is what I got after I applied your mod:
>
> // Test async call:
>
> 3. Wed Feb 01 14:58:45 SGT 2012: Saying Hello (async)...
> 4. Wed Feb 01 14:58:50 SGT 2012: Chat.hello(Callback<CharSequence>) returned
> 5. Wed Feb 01 14:58:50 SGT 2012: Callback<CharSequence>.get() returned "Hello-17"
>
> // The Client code
>
> NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(6666));
> Chat.Callback client = SpecificRequestor.getClient(Chat.Callback.class, transceiver);
>
> final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
> System.out.println("\n3. " + new Date() + ": Saying Hello (async)...");
> client.hello(future1); // This should not block.
> System.out.println("4. " + new Date() + ": Chat.hello(Callback<CharSequence>) returned");
> CharSequence asyncResult = future1.get(); // This should block for 5 seconds
> System.out.println("5. " + new Date() + ": Callback<CharSequence>.get() returned \"" + asyncResult + "\"");
> transceiver.close();
>
> // Jar Libraries
>
> avro-1.6.1.jar
> avro-ipc-1.6.1.jar
> netty-3.3.0.Final.jar
> slf4j-log4j12-1.6.1.jar
> slf4j-api-1.6.1.jar
> log4j-1.2.15.jar
> jackson-core-asl-1.4.2.jar
> jackson-mapper-asl-1.4.2.jar
>
> On Wed, Feb 1, 2012 at 2:24 PM, James Baldassari wrote:
> > Hi William,
> >
> > Great test. I ran your code, and it worked as expected for me, but I made
> > some slight changes to the client side to demonstrate what's happening:
> >
> > // Test sync call:
> > System.out.println("1. " + new Date() + ": Saying Hello (sync)...");
> > CharSequence syncResult = client.hello(); // This should block for 5 seconds
> > System.out.println("2. " + new Date() + ": Chat.hello() returned \"" + syncResult + "\"");
> >
> > // Test async call:
> >
> > final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
> > System.out.println("\n3. " + new Date() + ": Saying Hello (async)...");
> > client.hello(future1); // This should not block.
> > System.out.println("4. " + new Date() + ": Chat.hello(Callback<CharSequence>) returned");
> > CharSequence asyncResult = future1.get(); // This should block for 5 seconds
> > System.out.println("5. " + new Date() + ": Callback<CharSequence>.get() returned \"" + asyncResult + "\"");
> >
> > When I ran that I got the following output:
> >
> > 1. Wed Feb 01 00:13:36 EST 2012: Saying Hello (sync)...
> > 2. Wed Feb 01 00:13:41 EST 2012: Chat.hello() returned "Hello"
> >
> > 3. Wed Feb 01 00:13:41 EST 2012: Saying Hello (async)...
> > 4. Wed Feb 01 00:13:41 EST 2012: Chat.hello(Callback<CharSequence>) returned
> > 5. Wed Feb 01 00:13:46 EST 2012: Callback<CharSequence>.get() returned "Hello"
> >
> > As you can see, the synchronous call (lines 1-2) blocked for about 5 seconds
> > as expected. When the asynchronous call was invoked it returned immediately
> > (note timestamps on lines 3-4). The part that blocked was the
> > CallFuture.get() on line 5 of the output. The result of the callback can't
> > be obtained until the server returns it (after waiting 5 seconds).
> >
> > I think I may know why this behavior seems confusing. In practice I don't
> > think many people will use CallFuture. It's basically an adapter to make an
> > asynchronous call synchronous by blocking until the result returns. This is
> > useful in unit tests and in situations where the client can't proceed until
> > the result is available.
> > However, to really take advantage of the asynchronous API you never want
> > to wait for the result of an RPC. The client should just invoke async RPCs
> > with some Callback instance and then move on to other things, such as
> > invoking more RPCs!
> >
> > Here's an example. Let's say we have an e-mail server with an Avro protocol
> > that allows us to access the users' mailboxes. We might have a method to
> > allow us to search for all messages with a subject line that matches some
> > regular expression. In IDL it might look something like this:
> >
> > protocol Mail {
> >     record Message {
> >         string from;
> >         array<string> to;
> >         union { string, null } subject;
> >         union { string, null } body;
> >     }
> >     array<Message> findBySubject(string regexp);
> > }
> >
> > It doesn't really matter what the implementation of this protocol looks like
> > on the server side. Searching through all messages is likely to take some
> > time, so what we would want to do is to fire off an async RPC as soon as the
> > user clicks the search button, then return control to the UI immediately so
> > that the user can continue doing other things while the search is running.
> > Whenever the results come back we would then notify the user or populate the
> > search results in the UI, e.g. via ajax/comet if it's a web app.
> > So we would have a Callback implementation that would look something like
> > this:
> >
> > public class FindBySubjectCallback implements Callback<List<Message>> {
> >     // RequestContext is some class that allows us to send events back to the user
> >     private final RequestContext context;
> >
> >     public FindBySubjectCallback(RequestContext context) {
> >         this.context = context;
> >     }
> >
> >     @Override
> >     public void handleResult(List<Message> result) {
> >         // Notify user with results:
> >         context.fireSearchResultReadyEvent(result);
> >     }
> >
> >     @Override
> >     public void handleError(Throwable error) {
> >         // Notify user that an error occurred:
> >         context.fireErrorEvent(error);
> >     }
> > }
> >
> > The client, which might be running in a servlet container, would then just
> > invoke the RPC like this:
> >
> > private Mail.Callback mailClient; // Client is initialized/injected somewhere
> > ...
> > public void findBySubject(String regexp, RequestContext context) {
> >     mailClient.findBySubject(regexp, new FindBySubjectCallback(context));
> >     // return immediately without waiting for the search to complete!
> > }
> > ...
> >
> > Anyway, hope that makes some sense. Let me know if you have any questions.
> >
> > -James
> >
> > On Tue, Jan 31, 2012 at 11:23 PM, William Afendy wrote:
> >> Hi James,
> >>
> >> Thank you for your quick response. I'm still fairly new to the async
> >> stuff. I fixed the ChatImpl as suggested to implement Chat instead of
> >> Chat.Callback. I also added a 5-second delay in the method hello().
> >>
> >> There is still something missing: I can't really see the non-blocking
> >> (async) part in the Netty implementation. Please take a look at the
> >> AvroClient.java code below. I understand that when client.hello() gets
> >> called, this is the synchronous (blocking) part of the code. It blocked for
> >> 5 seconds as expected.
> >> Now, when I test the async method by creating future1 and passing it to
> >> client.hello(future1), this method also blocks for 5 seconds. I do not
> >> know how to implement the async part properly.
> >>
> >> I appreciate the link you provided, but it will take me some time to digest
> >> your sample code. In the meantime, it would be great if you could set me
> >> straight by explaining why the async method is also blocking.
> >>
> >> Thanks,
> >>
> >> William
> >>
> >> =======================
> >> AvroClient.java - Client code.
> >> =======================
> >>
> >> public class AvroClient {
> >>     public static void main(String[] args) throws InterruptedException,
> >>             ExecutionException, TimeoutException {
> >>         try {
> >>             NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(6666));
> >>             Chat.Callback client = SpecificRequestor.getClient(
> >>                     Chat.Callback.class, transceiver);
> >>
> >>             System.out.println(client.hello()); //This should block for 5 seconds
> >>             System.out.println("This should print out 5 seconds later");
> >>
> >>             final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
> >>             client.hello(future1); //This should not block.
> >>             System.out.println("This should print out immediately");
> >>             System.out.println(future1.get());
> >>
> >>             transceiver.close();
> >>
> >>         } catch (IOException ex) {
> >>             System.err.println(ex);
> >>         }
> >>     }
> >> }
> >>
> >> ===========
> >> ChatImpl.java
> >> ===========
> >>
> >> public class ChatImpl implements Chat {
> >>     @Override
> >>     public CharSequence hello() throws AvroRemoteException {
> >>         try {
> >>             Thread.sleep(5000);
> >>         } catch (InterruptedException ex) {}
> >>         return new Utf8("Hello");
> >>     }
> >> }
> >>
> >> On Wed, Feb 1, 2012 at 11:09 AM, James Baldassari wrote:
> >>> Hi William,
> >>>
> >>> The documentation around the async interface is definitely lacking.
> >>> There should probably be a separate page on the Avro site for that. I'll
> >>> try to find some time to work on it. In the meantime you can see some
> >>> examples I put up on github: https://github.com/jbaldassari/Avro-RPC
> >>>
> >>> As for the problem you're having, there are no major issues with your
> >>> code. The only thing wrong is that the server side (ChatImpl) should
> >>> implement Chat, not Chat.Callback. One of the nice things about the async
> >>> interface is that it only affects the client side of the RPC; the server
> >>> doesn't have to have any knowledge that it's async. So the server
> >>> implements the regular sync interface (Chat), and then the client is free to
> >>> use either the sync or async version when invoking RPCs. Does that answer
> >>> your question?
> >>>
> >>> -James
> >>>
> >>> On Tue, Jan 31, 2012 at 8:50 PM, William Afendy wrote:
> >>>> Hi,
> >>>>
> >>>> I'm trying to implement Asynchronous calls by using NettyServer
> >>>> implementation.
> >>>> After digging through the source code, I found an example of how to
> >>>> use NettyServer in TestNettyServerWithCallbacks.java.
> >>>>
> >>>> When running a few tests, I realized that NettyServer never calls the
> >>>> hello(Callback) method; instead it keeps calling the synchronous hello()
> >>>> method. The client program prints out "Hello" but I'm expecting
> >>>> "Hello-ASYNC" as a result. I really have no clue what's going on.
> >>>>
> >>>> I hope someone can shed some light on this and perhaps point out the
> >>>> mistake or correct my logic. Below is the code I use to perform a simple
> >>>> asynchronous test.
> >>>>
> >>>> =======================
> >>>> AvroClient.java - Client code.
> >>>> =======================
> >>>>
> >>>> public class AvroClient {
> >>>>     public static void main(String[] args) throws InterruptedException,
> >>>>             ExecutionException, TimeoutException {
> >>>>         try {
> >>>>             NettyTransceiver transceiver = new NettyTransceiver(new InetSocketAddress(6666));
> >>>>             Chat.Callback client =
> >>>>                     SpecificRequestor.getClient(Chat.Callback.class, transceiver);
> >>>>
> >>>>             final CallFuture<CharSequence> future1 = new CallFuture<CharSequence>();
> >>>>             client.hello(future1);
> >>>>
> >>>>             System.out.println(future1.get());
> >>>>
> >>>>             transceiver.close();
> >>>>
> >>>>         } catch (IOException ex) {
> >>>>             System.err.println(ex);
> >>>>         }
> >>>>     }
> >>>> }
> >>>>
> >>>> ===========================
> >>>> AvroNetty.java - The Server Code
> >>>> ===========================
> >>>>
> >>>> public class AvroNetty {
> >>>>     public static void main(String[] args) {
> >>>>         Index indexImpl = new AsyncIndexImpl();
> >>>>         Chat chatImpl = new ChatImpl();
> >>>>
> >>>>         Server server = new NettyServer(new SpecificResponder(Chat.class, chatImpl),
> >>>>                 new InetSocketAddress(6666));
> >>>>         server.start();
> >>>>         System.out.println("Server is listening at port " + server.getPort());
> >>>>     }
> >>>> }
> >>>>
> >>>> ===========
> >>>> ChatImpl.java
> >>>> ===========
> >>>>
> >>>> public class ChatImpl implements Chat.Callback {
> >>>>     @Override
> >>>>     public void hello(org.apache.avro.ipc.Callback<CharSequence> callback)
> >>>>             throws IOException {
> >>>>         callback.handleResult("Hello-ASYNC");
> >>>>     }
> >>>>
> >>>>     @Override
> >>>>     public CharSequence hello() throws AvroRemoteException {
> >>>>         return new Utf8("Hello");
> >>>>     }
> >>>> }
> >>>>
> >>>> =============================================
> >>>> Chat.java - This interface is auto-generated by avro-tool
> >>>> =============================================
> >>>>
> >>>> @SuppressWarnings("all")
> >>>> public interface Chat {
> >>>>     public static final org.apache.avro.Protocol PROTOCOL =
> >>>>         org.apache.avro.Protocol.parse("{\"protocol\":\"Chat\",\"namespace\":\"avro.test\",\"types\":[],\"messages\":{\"hello\":{\"request\":[],\"response\":\"string\"}}}");
> >>>>     java.lang.CharSequence hello() throws org.apache.avro.AvroRemoteException;
> >>>>
> >>>>     @SuppressWarnings("all")
> >>>>     public interface Callback extends Chat {
> >>>>         public static final org.apache.avro.Protocol PROTOCOL = avro.test.Chat.PROTOCOL;
> >>>>         void hello(org.apache.avro.ipc.Callback<java.lang.CharSequence> callback)
> >>>>             throws java.io.IOException;
> >>>>     }
> >>>> }
> >>>>
> >>>> ====================
> >>>> Here is the Avro Schema
> >>>> ====================
> >>>>
> >>>> {
> >>>>     "namespace": "avro.test",
> >>>>     "protocol": "Chat",
> >>>>
> >>>>     "types" : [],
> >>>>
> >>>>     "messages": {
> >>>>         "hello": {
> >>>>             "request": [],
> >>>>             "response": "string"
> >>>>         }
> >>>>     }
> >>>> }
> >>>>
> >>>> Thanks,
> >>>>
> >>>> --
> >>>> William Afendy
> >>
> >> --
> >> William Afendy
>
> --
> William Afendy
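
For anyone who wants to see the two client styles from this thread side by side without an Avro server, here is a minimal sketch using only the JDK. It is not Avro code. ResultCallback is a local stand-in that only mirrors the handleResult/handleError method names seen in the thread, helloAsync simulates the slow server with a sleep on a background thread, and callAndWait plays roughly the role that CallFuture plays above (turning the async call back into a blocking one). All of these names are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AsyncCallDemo {

    /** Hypothetical stand-in for a callback type; not the Avro interface. */
    public interface ResultCallback<T> {
        void handleResult(T result);
        void handleError(Throwable error);
    }

    /**
     * Simulates an async RPC: the "server" work (a sleep) runs on a
     * background thread, and this method returns right away.
     */
    public static void helloAsync(long delayMillis, ResultCallback<String> callback) {
        CompletableFuture
            .supplyAsync(() -> {
                try {
                    Thread.sleep(delayMillis); // pretend the server is slow
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                }
                return "Hello";
            })
            .whenComplete((result, error) -> {
                if (error != null) {
                    callback.handleError(error);
                } else {
                    callback.handleResult(result);
                }
            });
    }

    /** Blocking adapter over the async call: waits until the callback fires. */
    public static String callAndWait(long delayMillis) {
        CompletableFuture<String> bridge = new CompletableFuture<>();
        helloAsync(delayMillis, new ResultCallback<String>() {
            @Override public void handleResult(String result) { bridge.complete(result); }
            @Override public void handleError(Throwable error) { bridge.completeExceptionally(error); }
        });
        return bridge.join(); // this is the part that blocks
    }

    /** Measures how long the helloAsync invocation itself takes, in milliseconds. */
    public static long timeInvocation(long delayMillis, ResultCallback<String> callback) {
        long start = System.nanoTime();
        helloAsync(delayMillis, callback);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        // Style 1: block until the result is available.
        System.out.println("Blocking style got: " + callAndWait(500));

        // Style 2: hand over a callback and carry on.
        CountDownLatch done = new CountDownLatch(1);
        long invokeMillis = timeInvocation(500, new ResultCallback<String>() {
            @Override public void handleResult(String result) {
                System.out.println("Callback got: " + result);
                done.countDown();
            }
            @Override public void handleError(Throwable error) {
                System.err.println("Callback error: " + error);
                done.countDown();
            }
        });
        System.out.println("helloAsync returned after " + invokeMillis
                + " ms; the caller is free to do other work");

        // Only here so the demo does not exit before the callback runs.
        done.await(5, TimeUnit.SECONDS);
    }
}
```

The thing to look at is that the time reported for the helloAsync invocation should be far smaller than the simulated 500 ms delay, while callAndWait takes at least that long. That is the same split described earlier in the thread: the invocation returns at once, and only waiting on the result blocks.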
