Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 3323543b6 -> ed91dcdae


[GOBBLIN-460] CountUpAndDownLatch allows counting back up after reaching 0.

Closes #2331 from ibuenros/fix-countupdownlatch


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/ed91dcda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/ed91dcda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/ed91dcda

Branch: refs/heads/master
Commit: ed91dcdae38f8ce0f439ddf108907f0e0fed2c36
Parents: 3323543
Author: ibuenros <issac.buenros...@gmail.com>
Authored: Mon Apr 9 09:22:25 2018 -0700
Committer: Hung Tran <hut...@linkedin.com>
Committed: Mon Apr 9 09:22:25 2018 -0700

----------------------------------------------------------------------
 .../gobblin/runtime/CountUpAndDownLatch.java    | 22 ++++++++--
 .../runtime/CountUpAndDownLatchTest.java        | 46 +++++---------------
 2 files changed, 30 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ed91dcda/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java
index c65aa60..120536d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/CountUpAndDownLatch.java
@@ -32,27 +32,41 @@ class CountUpAndDownLatch extends CountDownLatch {
 
   public CountUpAndDownLatch(int count) {
     super(0);
-    this.phaser = new Phaser(count);
+    this.phaser = new Phaser(count) {
+      @Override
+      protected boolean onAdvance(int phase, int registeredParties) {
+        // Need to override onAdvance because phaser by default terminates whenever registered parties reaches 0
+        return false;
+      }
+    };
   }
 
   @Override
   public void await() throws InterruptedException {
-    this.phaser.awaitAdvance(0);
+    int phase = getPhase();
+    this.phaser.awaitAdvance(phase);
   }
 
   @Override
   public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
     try {
-      this.phaser.awaitAdvanceInterruptibly(0, timeout, unit);
+      int phase = getPhase();
+      this.phaser.awaitAdvanceInterruptibly(phase, timeout, unit);
       return true;
     } catch (TimeoutException te) {
       return false;
     }
   }
 
+  private int getPhase() {
+    int phase = this.phaser.register();
+    this.phaser.arriveAndDeregister();
+    return phase;
+  }
+
   @Override
   public void countDown() {
-    this.phaser.arrive();
+    this.phaser.arriveAndDeregister();
   }
 
   public void countUp() {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/ed91dcda/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/CountUpAndDownLatchTest.java
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/CountUpAndDownLatchTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/CountUpAndDownLatchTest.java
index 5bad68d..5fd3664 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/CountUpAndDownLatchTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/CountUpAndDownLatchTest.java
@@ -40,33 +40,25 @@ public class CountUpAndDownLatchTest {
       executorService = Executors.newFixedThreadPool(1);
 
       CountUpAndDownLatch countUpAndDownLatch = new CountUpAndDownLatch(1);
-      Future future = executorService.submit(new Waiter(countUpAndDownLatch));
 
-      try {
-        future.get(50, TimeUnit.MILLISECONDS);
-        Assert.fail();
-      } catch (TimeoutException te) {
-        // Expected
-      }
+      Assert.assertFalse(countUpAndDownLatch.await(50, TimeUnit.MILLISECONDS));
 
       countUpAndDownLatch.countUp();
-      try {
-        future.get(50, TimeUnit.MILLISECONDS);
-        Assert.fail();
-      } catch (TimeoutException te) {
-        // Expected
-      }
+      Assert.assertFalse(countUpAndDownLatch.await(50, TimeUnit.MILLISECONDS));
 
       countUpAndDownLatch.countDown();
-      try {
-        future.get(50, TimeUnit.MILLISECONDS);
-        Assert.fail();
-      } catch (TimeoutException te) {
-        // Expected
-      }
+      Assert.assertFalse(countUpAndDownLatch.await(50, TimeUnit.MILLISECONDS));
+
+      countUpAndDownLatch.countDown();
+      Assert.assertTrue(countUpAndDownLatch.await(1, TimeUnit.SECONDS));
+
+      // count-up again
+      countUpAndDownLatch.countUp();
+      // verify we will wait even though the latch was at 0 before
+      Assert.assertFalse(countUpAndDownLatch.await(50, TimeUnit.MILLISECONDS));
 
       countUpAndDownLatch.countDown();
-      future.get(1, TimeUnit.SECONDS);
+      Assert.assertTrue(countUpAndDownLatch.await(1, TimeUnit.SECONDS));
     } finally {
       if (executorService != null) {
         executorService.shutdownNow();
@@ -74,18 +66,4 @@ public class CountUpAndDownLatchTest {
     }
   }
 
-  @AllArgsConstructor
-  public static class Waiter implements Runnable {
-    private final CountDownLatch countDownLatch;
-
-    @Override
-    public void run() {
-      try {
-        this.countDownLatch.await();
-      } catch (InterruptedException ie) {
-        throw new RuntimeException();
-      }
-    }
-  }
-
 }

Reply via email to