[hotfix] [tests] Stabilize AsyncCallsTest by ensuring delayed messages are not executed before their time
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0391e414 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0391e414 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0391e414 Branch: refs/heads/master Commit: 0391e41464d9cbe35168d9b4b51fa4325eebb8af Parents: c962c45 Author: Stephan Ewen <se...@apache.org> Authored: Wed Apr 19 18:21:43 2017 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Thu Apr 20 10:52:37 2017 +0200 ---------------------------------------------------------------------- .../runtime/rpc/akka/AkkaInvocationHandler.java | 9 +++--- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 33 +++++++++++--------- .../runtime/rpc/akka/messages/RunAsync.java | 23 ++++++-------- .../flink/runtime/rpc/AsyncCallsTest.java | 6 ++-- 4 files changed, 37 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0391e414/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java index 56505f9..c21accf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java @@ -177,12 +177,13 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, MainThrea } @Override - public void scheduleRunAsync(Runnable runnable, long delay) { + public void scheduleRunAsync(Runnable runnable, long delayMillis) { checkNotNull(runnable, "runnable"); - checkArgument(delay >= 0, "delay must be zero or greater"); - + checkArgument(delayMillis >= 0, "delay must be zero or greater"); + if (isLocal) { - rpcEndpoint.tell(new RunAsync(runnable, delay), ActorRef.noSender()); + long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000); + rpcEndpoint.tell(new RunAsync(runnable, atTimeNanos), ActorRef.noSender()); } else { throw new RuntimeException("Trying to send a Runnable to a remote actor at " + rpcEndpoint.path() + ". This is not supported."); http://git-wip-us.apache.org/repos/asf/flink/blob/0391e414/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 99f8211..86cd83e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -268,22 +268,27 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp runAsync.getClass().getName(), runAsync.getClass().getName()); } - else if (runAsync.getDelay() == 0) { - // run immediately - try { - runAsync.getRunnable().run(); - } catch (Throwable t) { - LOG.error("Caught exception while executing runnable in main thread.", t); - ExceptionUtils.rethrowIfFatalErrorOrOOM(t); - } - } else { - // schedule for later. send a new message after the delay, which will then be immediately executed - FiniteDuration delay = new FiniteDuration(runAsync.getDelay(), TimeUnit.MILLISECONDS); - RunAsync message = new RunAsync(runAsync.getRunnable(), 0); + final long timeToRun = runAsync.getTimeNanos(); + final long delayNanos; + + if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime()) <= 0) { + // run immediately + try { + runAsync.getRunnable().run(); + } catch (Throwable t) { + LOG.error("Caught exception while executing runnable in main thread.", t); + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); + } + } + else { + // schedule for later. send a new message after the delay, which will then be immediately executed + FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS); + RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun); - getContext().system().scheduler().scheduleOnce(delay, getSelf(), message, - getContext().dispatcher(), ActorRef.noSender()); + getContext().system().scheduler().scheduleOnce(delay, getSelf(), message, + getContext().dispatcher(), ActorRef.noSender()); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/0391e414/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java index ce4f9d6..4b8a0b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/RunAsync.java @@ -18,40 +18,37 @@ package org.apache.flink.runtime.rpc.akka.messages; -import java.io.Serializable; - import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkArgument; /** * Message for asynchronous runnable invocations */ -public final class RunAsync implements Serializable { - private static final long serialVersionUID = -3080595100695371036L; +public final class RunAsync { /** The runnable to be executed. Transient, so it gets lost upon serialization */ - private final transient Runnable runnable; + private final Runnable runnable; /** The delay after which the runnable should be called */ - private final long delay; + private final long atTimeNanos; /** * Creates a new {@code RunAsync} message. * - * @param runnable The Runnable to run. - * @param delay The delay in milliseconds. Zero indicates immediate execution. + * @param runnable The Runnable to run. + * @param atTimeNanos The time (as for System.nanoTime()) when to execute the runnable. */ - public RunAsync(Runnable runnable, long delay) { - checkArgument(delay >= 0); + public RunAsync(Runnable runnable, long atTimeNanos) { + checkArgument(atTimeNanos >= 0); this.runnable = checkNotNull(runnable); - this.delay = delay; + this.atTimeNanos = atTimeNanos; } public Runnable getRunnable() { return runnable; } - public long getDelay() { - return delay; + public long getTimeNanos() { + return atTimeNanos; } } http://git-wip-us.apache.org/repos/asf/flink/blob/0391e414/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index 7affdb9..e636d6c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -23,10 +23,10 @@ import akka.actor.ActorSystem; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; - import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.util.TestLogger; + import org.junit.AfterClass; import org.junit.Test; @@ -121,7 +121,7 @@ public class AsyncCallsTest extends TestLogger { final AtomicBoolean concurrentAccess = new AtomicBoolean(false); final OneShotLatch latch = new OneShotLatch(); - final long delay = 200; + final long delay = 100; TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, lock); testEndpoint.start(); @@ -161,7 +161,7 @@ public class AsyncCallsTest extends TestLogger { assertFalse("Rpc Endpoint had concurrent access", testEndpoint.hasConcurrentAccess()); assertFalse("Rpc Endpoint had concurrent access", concurrentAccess.get()); - assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay); + assertTrue("call was not properly delayed", ((stop - start) / 1_000_000) >= delay); } // ------------------------------------------------------------------------