Repository: incubator-beam
Updated Branches:
  refs/heads/master c59ca3868 -> d299e2c25


Add DirtyBit to represent whether Counters have been committed.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/72cfa709
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/72cfa709
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/72cfa709

Branch: refs/heads/master
Commit: 72cfa709689496f02cbe00fdc3f281e45fa11b23
Parents: c59ca38
Author: Pei He <pe...@google.com>
Authored: Tue Apr 19 18:21:40 2016 -0700
Committer: bchambers <bchamb...@google.com>
Committed: Tue Apr 26 08:24:30 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/common/Counter.java    | 459 ++++++++++++-------
 .../beam/sdk/util/common/CounterTest.java       | 146 +++++-
 2 files changed, 448 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/72cfa709/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
index 6024576..9f9b0c1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java
@@ -25,6 +25,7 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 
 import org.apache.beam.sdk.values.TypeDescriptor;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.AtomicDouble;
 
 import java.util.Objects;
@@ -44,6 +45,14 @@ import javax.annotation.Nullable;
  * <p>Counters compare using value equality of their name, kind, and
  * cumulative value.  Equal counters should have equal toString()s.
  *
+ * <p>After all possible mutations have completed, the reader should check
+ * {@link #isDirty} for every counter; otherwise, updates may be lost.
+ *
+ * <p>A counter may become dirty without a corresponding update to the value.
+ * This generally will occur when the calls to {@code addValue()}, {@code 
committing()},
+ * and {@code committed()} are interleaved such that the value is updated
+ * between the calls to committing and the read of the value.
+ *
  * @param <T> the type of values aggregated by this counter
  */
 public abstract class Counter<T> {
@@ -295,6 +304,76 @@ public abstract class Counter<T> {
   public abstract CounterMean<T> getMean();
 
   /**
+   * Represents whether counters' values have been committed to the backend.
+   *
+   * <p>Runners can use this information to optimize counter updates.
+   * For example, if counters are committed, runners may choose to skip the 
updates.
+   *
+   * <p>Counters' state transition table:
+   * {@code
+   * Action\Current State         COMMITTED        DIRTY        COMMITTING
+   * addValue()                   DIRTY            DIRTY        DIRTY
+   * committing()                 None             COMMITTING   None
+   * committed()                  None             None         COMMITTED
+   * }
+   */
+  @VisibleForTesting
+  enum CommitState {
+    /**
+     * There are no local updates that haven't been committed to the backend.
+     */
+    COMMITTED,
+    /**
+     * There are local updates that haven't been committed to the backend.
+     */
+    DIRTY,
+    /**
+     * Local updates are committing to the backend, but are still pending.
+     */
+    COMMITTING,
+  }
+
+  /**
+   * Returns whether the counter contains a non-committed aggregate.
+   */
+  public boolean isDirty() {
+    return commitState.get() != CommitState.COMMITTED;
+  }
+
+  /**
+   * Changes the counter from {@code CommitState.DIRTY} to {@code 
CommitState.COMMITTING}.
+   *
+   * @return true if successful. False return indicates that the commit state
+   * was not in {@code CommitState.DIRTY}.
+   */
+  public boolean committing() {
+    return commitState.compareAndSet(CommitState.DIRTY, 
CommitState.COMMITTING);
+  }
+
+  /**
+   * Changes the counter from {@code CommitState.COMMITTING} to {@code 
CommitState.COMMITTED}.
+   *
+   * @return true if successful.
+   *
+   * <p>False return indicates that the counter was updated while the 
commit was pending.
+   * That counter update might or might not have been committed. The {@code 
commitState} has to
+   * stay in {@code CommitState.DIRTY}.
+   */
+  public boolean committed() {
+    return commitState.compareAndSet(CommitState.COMMITTING, 
CommitState.COMMITTED);
+  }
+
+  /**
+   * Sets the counter to {@code CommitState.DIRTY}.
+   *
+   * <p>Must be called at the end of {@link #addValue}, {@link #resetToValue},
+   * {@link #resetMeanToValue}, and {@link #merge}.
+   */
+  protected void setDirty() {
+    commitState.set(CommitState.DIRTY);
+  }
+
+  /**
    * Returns a string representation of the Counter. Useful for debugging logs.
    * Example return value: "ElementCount:SUM(15)".
    */
@@ -382,9 +461,13 @@ public abstract class Counter<T> {
   /** The kind of aggregation function to apply to this counter. */
   protected final AggregationKind kind;
 
+  /** The commit state of this counter. **/
+  protected final AtomicReference<CommitState> commitState;
+
   protected Counter(CounterName name, AggregationKind kind) {
     this.name = name;
     this.kind = kind;
+    this.commitState = new AtomicReference<>(CommitState.COMMITTED);
   }
 
   
//////////////////////////////////////////////////////////////////////////////
@@ -425,27 +508,31 @@ public abstract class Counter<T> {
 
     @Override
     public LongCounter addValue(Long value) {
-      switch (kind) {
-        case SUM:
-          aggregate.addAndGet(value);
-          deltaAggregate.addAndGet(value);
-          break;
-        case MEAN:
-          addToMeanAndSet(value, mean);
-          addToMeanAndSet(value, deltaMean);
-          break;
-        case MAX:
-          maxAndSet(value, aggregate);
-          maxAndSet(value, deltaAggregate);
-          break;
-        case MIN:
-          minAndSet(value, aggregate);
-          minAndSet(value, deltaAggregate);
-          break;
-        default:
-          throw illegalArgumentException();
+      try {
+        switch (kind) {
+          case SUM:
+            aggregate.addAndGet(value);
+            deltaAggregate.addAndGet(value);
+            break;
+          case MEAN:
+            addToMeanAndSet(value, mean);
+            addToMeanAndSet(value, deltaMean);
+            break;
+          case MAX:
+            maxAndSet(value, aggregate);
+            maxAndSet(value, deltaAggregate);
+            break;
+          case MIN:
+            minAndSet(value, aggregate);
+            minAndSet(value, deltaAggregate);
+            break;
+          default:
+            throw illegalArgumentException();
+        }
+        return this;
+      } finally {
+        setDirty();
       }
-      return this;
     }
 
     private void minAndSet(Long value, AtomicLong target) {
@@ -500,26 +587,34 @@ public abstract class Counter<T> {
 
     @Override
     public Counter<Long> resetToValue(Long value) {
-      if (kind == MEAN) {
-        throw illegalArgumentException();
+      try {
+        if (kind == MEAN) {
+          throw illegalArgumentException();
+        }
+        aggregate.set(value);
+        deltaAggregate.set(value);
+        return this;
+      } finally {
+        setDirty();
       }
-      aggregate.set(value);
-      deltaAggregate.set(value);
-      return this;
     }
 
     @Override
     public Counter<Long> resetMeanToValue(long elementCount, Long value) {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      if (elementCount < 0) {
-        throw new IllegalArgumentException("elementCount must be 
non-negative");
+      try {
+        if (kind != MEAN) {
+          throw illegalArgumentException();
+        }
+        if (elementCount < 0) {
+          throw new IllegalArgumentException("elementCount must be 
non-negative");
+        }
+        LongCounterMean counterMean = new LongCounterMean(value, elementCount);
+        mean.set(counterMean);
+        deltaMean.set(counterMean);
+        return this;
+      } finally {
+        setDirty();
       }
-      LongCounterMean counterMean = new LongCounterMean(value, elementCount);
-      mean.set(counterMean);
-      deltaMean.set(counterMean);
-      return this;
     }
 
     @Override
@@ -541,20 +636,25 @@ public abstract class Counter<T> {
 
     @Override
     public Counter<Long> merge(Counter<Long> that) {
-      checkArgument(this.isCompatibleWith(that), "Counters %s and %s are 
incompatible", this, that);
-      switch (kind) {
-        case SUM:
-        case MIN:
-        case MAX:
-          return addValue(that.getAggregate());
-        case MEAN:
-          CounterMean<Long> thisCounterMean = this.getMean();
-          CounterMean<Long> thatCounterMean = that.getMean();
-          return resetMeanToValue(
-              thisCounterMean.getCount() + thatCounterMean.getCount(),
-              thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
-        default:
-          throw illegalArgumentException();
+      try {
+        checkArgument(
+            this.isCompatibleWith(that), "Counters %s and %s are 
incompatible", this, that);
+        switch (kind) {
+          case SUM:
+          case MIN:
+          case MAX:
+            return addValue(that.getAggregate());
+          case MEAN:
+            CounterMean<Long> thisCounterMean = this.getMean();
+            CounterMean<Long> thatCounterMean = that.getMean();
+            return resetMeanToValue(
+                thisCounterMean.getCount() + thatCounterMean.getCount(),
+                thisCounterMean.getAggregate() + 
thatCounterMean.getAggregate());
+          default:
+            throw illegalArgumentException();
+        }
+      } finally {
+        setDirty();
       }
     }
 
@@ -620,27 +720,31 @@ public abstract class Counter<T> {
 
     @Override
     public DoubleCounter addValue(Double value) {
-      switch (kind) {
-        case SUM:
-          aggregate.addAndGet(value);
-          deltaAggregate.addAndGet(value);
-          break;
-        case MEAN:
-          addToMeanAndSet(value, mean);
-          addToMeanAndSet(value, deltaMean);
-          break;
-        case MAX:
-          maxAndSet(value, aggregate);
-          maxAndSet(value, deltaAggregate);
-          break;
-        case MIN:
-          minAndSet(value, aggregate);
-          minAndSet(value, deltaAggregate);
-          break;
-        default:
-          throw illegalArgumentException();
+      try {
+        switch (kind) {
+          case SUM:
+            aggregate.addAndGet(value);
+            deltaAggregate.addAndGet(value);
+            break;
+          case MEAN:
+            addToMeanAndSet(value, mean);
+            addToMeanAndSet(value, deltaMean);
+            break;
+          case MAX:
+            maxAndSet(value, aggregate);
+            maxAndSet(value, deltaAggregate);
+            break;
+          case MIN:
+            minAndSet(value, aggregate);
+            minAndSet(value, deltaAggregate);
+            break;
+          default:
+            throw illegalArgumentException();
+        }
+        return this;
+      } finally {
+        setDirty();
       }
-      return this;
     }
 
     private void addToMeanAndSet(Double value, 
AtomicReference<DoubleCounterMean> target) {
@@ -686,26 +790,34 @@ public abstract class Counter<T> {
 
     @Override
     public Counter<Double> resetToValue(Double value) {
-      if (kind == MEAN) {
-        throw illegalArgumentException();
+      try {
+        if (kind == MEAN) {
+          throw illegalArgumentException();
+        }
+        aggregate.set(value);
+        deltaAggregate.set(value);
+        return this;
+      } finally {
+        setDirty();
       }
-      aggregate.set(value);
-      deltaAggregate.set(value);
-      return this;
     }
 
     @Override
     public Counter<Double> resetMeanToValue(long elementCount, Double value) {
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      if (elementCount < 0) {
-        throw new IllegalArgumentException("elementCount must be 
non-negative");
+      try {
+        if (kind != MEAN) {
+          throw illegalArgumentException();
+        }
+        if (elementCount < 0) {
+          throw new IllegalArgumentException("elementCount must be 
non-negative");
+        }
+        DoubleCounterMean counterMean = new DoubleCounterMean(value, 
elementCount);
+        mean.set(counterMean);
+        deltaMean.set(counterMean);
+        return this;
+      } finally {
+        setDirty();
       }
-      DoubleCounterMean counterMean = new DoubleCounterMean(value, 
elementCount);
-      mean.set(counterMean);
-      deltaMean.set(counterMean);
-      return this;
     }
 
     @Override
@@ -736,20 +848,25 @@ public abstract class Counter<T> {
 
     @Override
     public Counter<Double> merge(Counter<Double> that) {
-      checkArgument(this.isCompatibleWith(that), "Counters %s and %s are 
incompatible", this, that);
-      switch (kind) {
-        case SUM:
-        case MIN:
-        case MAX:
-          return addValue(that.getAggregate());
-        case MEAN:
-          CounterMean<Double> thisCounterMean = this.getMean();
-          CounterMean<Double> thatCounterMean = that.getMean();
-          return resetMeanToValue(
-              thisCounterMean.getCount() + thatCounterMean.getCount(),
-              thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
-        default:
-          throw illegalArgumentException();
+      try {
+        checkArgument(
+            this.isCompatibleWith(that), "Counters %s and %s are 
incompatible", this, that);
+        switch (kind) {
+          case SUM:
+          case MIN:
+          case MAX:
+            return addValue(that.getAggregate());
+          case MEAN:
+            CounterMean<Double> thisCounterMean = this.getMean();
+            CounterMean<Double> thatCounterMean = that.getMean();
+            return resetMeanToValue(
+                thisCounterMean.getCount() + thatCounterMean.getCount(),
+                thisCounterMean.getAggregate() + 
thatCounterMean.getAggregate());
+          default:
+            throw illegalArgumentException();
+        }
+      } finally {
+        setDirty();
       }
     }
 
@@ -797,14 +914,18 @@ public abstract class Counter<T> {
 
     @Override
     public BooleanCounter addValue(Boolean value) {
-      if (kind.equals(AND) && !value) {
-        aggregate.set(value);
-        deltaAggregate.set(value);
-      } else if (kind.equals(OR) && value) {
-        aggregate.set(value);
-        deltaAggregate.set(value);
+      try {
+        if (kind.equals(AND) && !value) {
+          aggregate.set(value);
+          deltaAggregate.set(value);
+        } else if (kind.equals(OR) && value) {
+          aggregate.set(value);
+          deltaAggregate.set(value);
+        }
+        return this;
+      } finally {
+        setDirty();
       }
-      return this;
     }
 
     @Override
@@ -821,9 +942,13 @@ public abstract class Counter<T> {
 
     @Override
     public Counter<Boolean> resetToValue(Boolean value) {
-      aggregate.set(value);
-      deltaAggregate.set(value);
-      return this;
+      try {
+        aggregate.set(value);
+        deltaAggregate.set(value);
+        return this;
+      } finally {
+        setDirty();
+      }
     }
 
     @Override
@@ -849,8 +974,13 @@ public abstract class Counter<T> {
 
     @Override
     public Counter<Boolean> merge(Counter<Boolean> that) {
-      checkArgument(this.isCompatibleWith(that), "Counters %s and %s are 
incompatible", this, that);
-      return addValue(that.getAggregate());
+      try {
+        checkArgument(
+            this.isCompatibleWith(that), "Counters %s and %s are 
incompatible", this, that);
+        return addValue(that.getAggregate());
+      } finally {
+        setDirty();
+      }
     }
   }
 
@@ -968,27 +1098,31 @@ public abstract class Counter<T> {
 
     @Override
     public IntegerCounter addValue(Integer value) {
-      switch (kind) {
-        case SUM:
-          aggregate.getAndAdd(value);
-          deltaAggregate.getAndAdd(value);
-          break;
-        case MEAN:
-          addToMeanAndSet(value, mean);
-          addToMeanAndSet(value, deltaMean);
-          break;
-        case MAX:
-          maxAndSet(value, aggregate);
-          maxAndSet(value, deltaAggregate);
-          break;
-        case MIN:
-          minAndSet(value, aggregate);
-          minAndSet(value, deltaAggregate);
-          break;
-        default:
-          throw illegalArgumentException();
+      try {
+        switch (kind) {
+          case SUM:
+            aggregate.getAndAdd(value);
+            deltaAggregate.getAndAdd(value);
+            break;
+          case MEAN:
+            addToMeanAndSet(value, mean);
+            addToMeanAndSet(value, deltaMean);
+            break;
+          case MAX:
+            maxAndSet(value, aggregate);
+            maxAndSet(value, deltaAggregate);
+            break;
+          case MIN:
+            minAndSet(value, aggregate);
+            minAndSet(value, deltaAggregate);
+            break;
+          default:
+            throw illegalArgumentException();
+        }
+        return this;
+      } finally {
+        setDirty();
       }
-      return this;
     }
 
     private void addToMeanAndSet(int value, 
AtomicReference<IntegerCounterMean> target) {
@@ -1034,26 +1168,34 @@ public abstract class Counter<T> {
 
     @Override
     public Counter<Integer> resetToValue(Integer value) {
-      if (kind == MEAN) {
-        throw illegalArgumentException();
+      try {
+        if (kind == MEAN) {
+          throw illegalArgumentException();
+        }
+        aggregate.set(value);
+        deltaAggregate.set(value);
+        return this;
+      } finally {
+        setDirty();
       }
-      aggregate.set(value);
-      deltaAggregate.set(value);
-      return this;
     }
 
     @Override
     public Counter<Integer> resetMeanToValue(long elementCount, Integer value) 
{
-      if (kind != MEAN) {
-        throw illegalArgumentException();
-      }
-      if (elementCount < 0) {
-        throw new IllegalArgumentException("elementCount must be 
non-negative");
+      try {
+        if (kind != MEAN) {
+          throw illegalArgumentException();
+        }
+        if (elementCount < 0) {
+          throw new IllegalArgumentException("elementCount must be 
non-negative");
+        }
+        IntegerCounterMean counterMean = new IntegerCounterMean(value, 
elementCount);
+        mean.set(counterMean);
+        deltaMean.set(counterMean);
+        return this;
+      } finally {
+        setDirty();
       }
-      IntegerCounterMean counterMean = new IntegerCounterMean(value, 
elementCount);
-      mean.set(counterMean);
-      deltaMean.set(counterMean);
-      return this;
     }
 
     @Override
@@ -1084,20 +1226,25 @@ public abstract class Counter<T> {
 
     @Override
     public Counter<Integer> merge(Counter<Integer> that) {
-      checkArgument(this.isCompatibleWith(that), "Counters %s and %s are 
incompatible", this, that);
-      switch (kind) {
-        case SUM:
-        case MIN:
-        case MAX:
-          return addValue(that.getAggregate());
-        case MEAN:
-          CounterMean<Integer> thisCounterMean = this.getMean();
-          CounterMean<Integer> thatCounterMean = that.getMean();
-          return resetMeanToValue(
-              thisCounterMean.getCount() + thatCounterMean.getCount(),
-              thisCounterMean.getAggregate() + thatCounterMean.getAggregate());
-        default:
-          throw illegalArgumentException();
+      try {
+        checkArgument(
+            this.isCompatibleWith(that), "Counters %s and %s are 
incompatible", this, that);
+        switch (kind) {
+          case SUM:
+          case MIN:
+          case MAX:
+            return addValue(that.getAggregate());
+          case MEAN:
+            CounterMean<Integer> thisCounterMean = this.getMean();
+            CounterMean<Integer> thatCounterMean = that.getMean();
+            return resetMeanToValue(
+                thisCounterMean.getCount() + thatCounterMean.getCount(),
+                thisCounterMean.getAggregate() + 
thatCounterMean.getAggregate());
+          default:
+            throw illegalArgumentException();
+        }
+      } finally {
+        setDirty();
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/72cfa709/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java
index 5f75bb8..fb002de 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java
@@ -29,8 +29,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.beam.sdk.util.common.Counter.CommitState;
 import org.apache.beam.sdk.util.common.Counter.CounterMean;
-
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -588,4 +588,148 @@ public class CounterTest {
     thrown.expectMessage("are incompatible");
     left.merge(right);
   }
+
+  @Test
+  public void testDirtyBit() {
+    Counter<Long> longSum = Counter.longs("long-sum", SUM);
+    Counter<Long> longMean = Counter.longs("long-mean", MEAN);
+    Counter<Double> doubleSum = Counter.doubles("double-sum", SUM);
+    Counter<Double> doubleMean = Counter.doubles("double-sum", MEAN);
+    Counter<Integer> intSum = Counter.ints("int-sum", SUM);
+    Counter<Integer> intMean = Counter.ints("int-sum", MEAN);
+    Counter<Boolean> boolAnd = Counter.booleans("and", AND);
+
+    // Test counters are not dirty and are COMMITTED initially.
+    assertFalse(longSum.isDirty());
+    assertFalse(longMean.isDirty());
+    assertFalse(doubleSum.isDirty());
+    assertFalse(doubleMean.isDirty());
+    assertFalse(intSum.isDirty());
+    assertFalse(intMean.isDirty());
+    assertFalse(boolAnd.isDirty());
+
+    assertEquals(CommitState.COMMITTED, longSum.commitState.get());
+    assertEquals(CommitState.COMMITTED, longMean.commitState.get());
+    assertEquals(CommitState.COMMITTED, doubleSum.commitState.get());
+    assertEquals(CommitState.COMMITTED, doubleMean.commitState.get());
+    assertEquals(CommitState.COMMITTED, intSum.commitState.get());
+    assertEquals(CommitState.COMMITTED, intMean.commitState.get());
+    assertEquals(CommitState.COMMITTED, boolAnd.commitState.get());
+
+    // Test counters are dirty after mutating.
+    longSum.addValue(1L);
+    longMean.resetMeanToValue(1L, 1L);
+    doubleSum.addValue(1.0);
+    doubleMean.resetMeanToValue(1L, 1.0);
+    intSum.addValue(1);
+    intMean.resetMeanToValue(1, 1);
+    boolAnd.addValue(true);
+
+    assertTrue(longSum.isDirty());
+    assertTrue(longMean.isDirty());
+    assertTrue(doubleSum.isDirty());
+    assertTrue(doubleMean.isDirty());
+    assertTrue(intSum.isDirty());
+    assertTrue(intMean.isDirty());
+    assertTrue(boolAnd.isDirty());
+
+    assertEquals(CommitState.DIRTY, longSum.commitState.get());
+    assertEquals(CommitState.DIRTY, longMean.commitState.get());
+    assertEquals(CommitState.DIRTY, doubleSum.commitState.get());
+    assertEquals(CommitState.DIRTY, doubleMean.commitState.get());
+    assertEquals(CommitState.DIRTY, intSum.commitState.get());
+    assertEquals(CommitState.DIRTY, intMean.commitState.get());
+    assertEquals(CommitState.DIRTY, boolAnd.commitState.get());
+
+    // Test counters are dirty and are COMMITTING.
+    assertTrue(longSum.committing());
+    assertTrue(longMean.committing());
+    assertTrue(doubleSum.committing());
+    assertTrue(doubleMean.committing());
+    assertTrue(intSum.committing());
+    assertTrue(intMean.committing());
+    assertTrue(boolAnd.committing());
+
+    assertTrue(longSum.isDirty());
+    assertTrue(longMean.isDirty());
+    assertTrue(doubleSum.isDirty());
+    assertTrue(doubleMean.isDirty());
+    assertTrue(intSum.isDirty());
+    assertTrue(intMean.isDirty());
+    assertTrue(boolAnd.isDirty());
+
+    assertEquals(CommitState.COMMITTING, longSum.commitState.get());
+    assertEquals(CommitState.COMMITTING, longMean.commitState.get());
+    assertEquals(CommitState.COMMITTING, doubleSum.commitState.get());
+    assertEquals(CommitState.COMMITTING, doubleMean.commitState.get());
+    assertEquals(CommitState.COMMITTING, intSum.commitState.get());
+    assertEquals(CommitState.COMMITTING, intMean.commitState.get());
+    assertEquals(CommitState.COMMITTING, boolAnd.commitState.get());
+
+    // Test counters are dirty again after mutating.
+    longSum.addValue(1L);
+    longMean.resetMeanToValue(1L, 1L);
+    doubleSum.addValue(1.0);
+    doubleMean.resetMeanToValue(1L, 1.0);
+    intSum.addValue(1);
+    intMean.resetMeanToValue(1, 1);
+    boolAnd.addValue(true);
+
+    assertFalse(longSum.committed());
+    assertFalse(longMean.committed());
+    assertFalse(doubleSum.committed());
+    assertFalse(doubleMean.committed());
+    assertFalse(intSum.committed());
+    assertFalse(intMean.committed());
+    assertFalse(boolAnd.committed());
+
+    assertTrue(longSum.isDirty());
+    assertTrue(longMean.isDirty());
+    assertTrue(doubleSum.isDirty());
+    assertTrue(doubleMean.isDirty());
+    assertTrue(intSum.isDirty());
+    assertTrue(intMean.isDirty());
+    assertTrue(boolAnd.isDirty());
+
+    assertEquals(CommitState.DIRTY, longSum.commitState.get());
+    assertEquals(CommitState.DIRTY, longMean.commitState.get());
+    assertEquals(CommitState.DIRTY, doubleSum.commitState.get());
+    assertEquals(CommitState.DIRTY, doubleMean.commitState.get());
+    assertEquals(CommitState.DIRTY, intSum.commitState.get());
+    assertEquals(CommitState.DIRTY, intMean.commitState.get());
+    assertEquals(CommitState.DIRTY, boolAnd.commitState.get());
+
+    // Test counters are not dirty and are COMMITTED.
+    assertTrue(longSum.committing());
+    assertTrue(longMean.committing());
+    assertTrue(doubleSum.committing());
+    assertTrue(doubleMean.committing());
+    assertTrue(intSum.committing());
+    assertTrue(intMean.committing());
+    assertTrue(boolAnd.committing());
+
+    assertTrue(longSum.committed());
+    assertTrue(longMean.committed());
+    assertTrue(doubleSum.committed());
+    assertTrue(doubleMean.committed());
+    assertTrue(intSum.committed());
+    assertTrue(intMean.committed());
+    assertTrue(boolAnd.committed());
+
+    assertFalse(longSum.isDirty());
+    assertFalse(longMean.isDirty());
+    assertFalse(doubleSum.isDirty());
+    assertFalse(doubleMean.isDirty());
+    assertFalse(intSum.isDirty());
+    assertFalse(intMean.isDirty());
+    assertFalse(boolAnd.isDirty());
+
+    assertEquals(CommitState.COMMITTED, longSum.commitState.get());
+    assertEquals(CommitState.COMMITTED, longMean.commitState.get());
+    assertEquals(CommitState.COMMITTED, doubleSum.commitState.get());
+    assertEquals(CommitState.COMMITTED, doubleMean.commitState.get());
+    assertEquals(CommitState.COMMITTED, intSum.commitState.get());
+    assertEquals(CommitState.COMMITTED, intMean.commitState.get());
+    assertEquals(CommitState.COMMITTED, boolAnd.commitState.get());
+  }
 }

Reply via email to