Repository: incubator-gobblin Updated Branches: refs/heads/master 3323543b6 -> ed91dcdae
[GOBBLIN-460] CountUpAndDownLatch allows counting back up after reaching 0. Closes #2331 from ibuenros/fix-countupdownlatch Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ed91dcda Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ed91dcda Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ed91dcda Branch: refs/heads/master Commit: ed91dcdae38f8ce0f439ddf108907f0e0fed2c36 Parents: 3323543 Author: ibuenros <issac.buenros...@gmail.com> Authored: Mon Apr 9 09:22:25 2018 -0700 Committer: Hung Tran <hut...@linkedin.com> Committed: Mon Apr 9 09:22:25 2018 -0700 ---------------------------------------------------------------------- .../gobblin/runtime/CountUpAndDownLatch.java | 22 ++++++++-- .../runtime/CountUpAndDownLatchTest.java | 46 +++++--------------- 2 files changed, 30 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ed91dcda/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java index c65aa60..120536d 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java @@ -32,27 +32,41 @@ class CountUpAndDownLatch extends CountDownLatch { public CountUpAndDownLatch(int count) { super(0); - this.phaser = new Phaser(count); + this.phaser = new Phaser(count) { + @Override + protected boolean onAdvance(int phase, int registeredParties) { + // Need to override onAdvance because phaser by default terminates whenever registered parties reaches 0 + return false; + } + }; } @Override public void await() throws InterruptedException { - this.phaser.awaitAdvance(0); + int phase = getPhase(); + this.phaser.awaitAdvance(phase); } @Override public boolean await(long timeout, TimeUnit unit) throws InterruptedException { try { - this.phaser.awaitAdvanceInterruptibly(0, timeout, unit); + int phase = getPhase(); + this.phaser.awaitAdvanceInterruptibly(phase, timeout, unit); return true; } catch (TimeoutException te) { return false; } } + private int getPhase() { + int phase = this.phaser.register(); + this.phaser.arriveAndDeregister(); + return phase; + } + @Override public void countDown() { - this.phaser.arrive(); + this.phaser.arriveAndDeregister(); } public void countUp() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ed91dcda/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/CountUpAndDownLatchTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/CountUpAndDownLatchTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/CountUpAndDownLatchTest.java index 5bad68d..5fd3664 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/CountUpAndDownLatchTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/CountUpAndDownLatchTest.java @@ -40,33 +40,25 @@ public class CountUpAndDownLatchTest { executorService = Executors.newFixedThreadPool(1); CountUpAndDownLatch countUpAndDownLatch = new CountUpAndDownLatch(1); - Future future = executorService.submit(new Waiter(countUpAndDownLatch)); - try { - future.get(50, TimeUnit.MILLISECONDS); - Assert.fail(); - } catch (TimeoutException te) { - // Expected - } + Assert.assertFalse(countUpAndDownLatch.await(50, TimeUnit.MILLISECONDS)); countUpAndDownLatch.countUp(); - try { - future.get(50, TimeUnit.MILLISECONDS); - Assert.fail(); - } catch (TimeoutException te) { - // Expected - } + Assert.assertFalse(countUpAndDownLatch.await(50, TimeUnit.MILLISECONDS)); countUpAndDownLatch.countDown(); - try { - future.get(50, TimeUnit.MILLISECONDS); - Assert.fail(); - } catch (TimeoutException te) { - // Expected - } + Assert.assertFalse(countUpAndDownLatch.await(50, TimeUnit.MILLISECONDS)); + + countUpAndDownLatch.countDown(); + Assert.assertTrue(countUpAndDownLatch.await(1, TimeUnit.SECONDS)); + + // count-up again + countUpAndDownLatch.countUp(); + // verify we will wait even though the latch was at 0 before + Assert.assertFalse(countUpAndDownLatch.await(50, TimeUnit.MILLISECONDS)); countUpAndDownLatch.countDown(); - future.get(1, TimeUnit.SECONDS); + Assert.assertTrue(countUpAndDownLatch.await(1, TimeUnit.SECONDS)); } finally { if (executorService != null) { executorService.shutdownNow(); @@ -74,18 +66,4 @@ public class CountUpAndDownLatchTest { } } - @AllArgsConstructor - public static class Waiter implements Runnable { - private final CountDownLatch countDownLatch; - - @Override - public void run() { - try { - this.countDownLatch.await(); - } catch (InterruptedException ie) { - throw new RuntimeException(); - } - } - } - }