[hotfix] [tests] Stabilize AsyncCallsTest by ensuring delayed messages are not executed before their time


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0391e414
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0391e414
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0391e414

Branch: refs/heads/master
Commit: 0391e41464d9cbe35168d9b4b51fa4325eebb8af
Parents: c962c45
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Apr 19 18:21:43 2017 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Apr 20 10:52:37 2017 +0200

----------------------------------------------------------------------
 .../runtime/rpc/akka/AkkaInvocationHandler.java |  9 +++---
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 33 +++++++++++---------
 .../runtime/rpc/akka/messages/RunAsync.java     | 23 ++++++--------
 .../flink/runtime/rpc/AsyncCallsTest.java       |  6 ++--
 4 files changed, 37 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0391e414/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 56505f9..c21accf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -177,12 +177,13 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea
        }
 
        @Override
-       public void scheduleRunAsync(Runnable runnable, long delay) {
+       public void scheduleRunAsync(Runnable runnable, long delayMillis) {
                checkNotNull(runnable, "runnable");
-               checkArgument(delay >= 0, "delay must be zero or greater");
-               
+               checkArgument(delayMillis >= 0, "delay must be zero or 
greater");
+
                if (isLocal) {
-                       rpcEndpoint.tell(new RunAsync(runnable, delay), 
ActorRef.noSender());
+                       long atTimeNanos = delayMillis == 0 ? 0 : 
System.nanoTime() + (delayMillis * 1_000_000);
+                       rpcEndpoint.tell(new RunAsync(runnable, atTimeNanos), 
ActorRef.noSender());
                } else {
                        throw new RuntimeException("Trying to send a Runnable 
to a remote actor at " +
                                rpcEndpoint.path() + ". This is not 
supported.");

http://git-wip-us.apache.org/repos/asf/flink/blob/0391e414/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 99f8211..86cd83e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -268,22 +268,27 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
                                runAsync.getClass().getName(),
                                runAsync.getClass().getName());
                }
-               else if (runAsync.getDelay() == 0) {
-                       // run immediately
-                       try {
-                               runAsync.getRunnable().run();
-                       } catch (Throwable t) {
-                               LOG.error("Caught exception while executing 
runnable in main thread.", t);
-                               ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
-                       }
-               }
                else {
-                       // schedule for later. send a new message after the 
delay, which will then be immediately executed 
-                       FiniteDuration delay = new 
FiniteDuration(runAsync.getDelay(), TimeUnit.MILLISECONDS);
-                       RunAsync message = new RunAsync(runAsync.getRunnable(), 
0);
+                       final long timeToRun = runAsync.getTimeNanos(); 
+                       final long delayNanos;
+
+                       if (timeToRun == 0 || (delayNanos = timeToRun - 
System.nanoTime()) <= 0) {
+                               // run immediately
+                               try {
+                                       runAsync.getRunnable().run();
+                               } catch (Throwable t) {
+                                       LOG.error("Caught exception while 
executing runnable in main thread.", t);
+                                       
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+                               }
+                       }
+                       else {
+                               // schedule for later. send a new message after 
the delay, which will then be immediately executed 
+                               FiniteDuration delay = new 
FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
+                               RunAsync message = new 
RunAsync(runAsync.getRunnable(), timeToRun);
 
-                       getContext().system().scheduler().scheduleOnce(delay, 
getSelf(), message,
-                                       getContext().dispatcher(), 
ActorRef.noSender());
+                               
getContext().system().scheduler().scheduleOnce(delay, getSelf(), message,
+                                               getContext().dispatcher(), 
ActorRef.noSender());
+                       }
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0391e414/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
index ce4f9d6..4b8a0b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java
@@ -18,40 +18,37 @@
 
 package org.apache.flink.runtime.rpc.akka.messages;
 
-import java.io.Serializable;
-
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * Message for asynchronous runnable invocations
  */
-public final class RunAsync implements Serializable {
-       private static final long serialVersionUID = -3080595100695371036L;
+public final class RunAsync {
 
        /** The runnable to be executed. Transient, so it gets lost upon 
serialization */ 
-       private final transient Runnable runnable;
+       private final Runnable runnable;
 
        /** The delay after which the runnable should be called */
-       private final long delay;
+       private final long atTimeNanos;
 
        /**
         * Creates a new {@code RunAsync} message.
         * 
-        * @param runnable  The Runnable to run.
-        * @param delay     The delay in milliseconds. Zero indicates immediate 
execution.
+        * @param runnable    The Runnable to run.
+        * @param atTimeNanos The time (as for System.nanoTime()) when to 
execute the runnable.
         */
-       public RunAsync(Runnable runnable, long delay) {
-               checkArgument(delay >= 0);
+       public RunAsync(Runnable runnable, long atTimeNanos) {
+               checkArgument(atTimeNanos >= 0);
                this.runnable = checkNotNull(runnable);
-               this.delay = delay;
+               this.atTimeNanos = atTimeNanos;
        }
 
        public Runnable getRunnable() {
                return runnable;
        }
 
-       public long getDelay() {
-               return delay;
+       public long getTimeNanos() {
+               return atTimeNanos;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0391e414/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 7affdb9..e636d6c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -23,10 +23,10 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
-
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.Test;
 
@@ -121,7 +121,7 @@ public class AsyncCallsTest extends TestLogger {
                final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
                final OneShotLatch latch = new OneShotLatch();
 
-               final long delay = 200;
+               final long delay = 100;
 
                TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, 
lock);
                testEndpoint.start();
@@ -161,7 +161,7 @@ public class AsyncCallsTest extends TestLogger {
                assertFalse("Rpc Endpoint had concurrent access", 
testEndpoint.hasConcurrentAccess());
                assertFalse("Rpc Endpoint had concurrent access", 
concurrentAccess.get());
 
-               assertTrue("call was not properly delayed", ((stop - start) / 
1000000) >= delay);
+               assertTrue("call was not properly delayed", ((stop - start) / 
1_000_000) >= delay);
        }
 
        // 
------------------------------------------------------------------------

Reply via email to