Repository: incubator-beam Updated Branches: refs/heads/master c59ca3868 -> d299e2c25
Add DirtyBit to represent whether Counters have been committed. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/72cfa709 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/72cfa709 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/72cfa709 Branch: refs/heads/master Commit: 72cfa709689496f02cbe00fdc3f281e45fa11b23 Parents: c59ca38 Author: Pei He <pe...@google.com> Authored: Tue Apr 19 18:21:40 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Tue Apr 26 08:24:30 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/util/common/Counter.java | 459 ++++++++++++------- .../beam/sdk/util/common/CounterTest.java | 146 +++++- 2 files changed, 448 insertions(+), 157 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/72cfa709/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java index 6024576..9f9b0c1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/Counter.java @@ -25,6 +25,7 @@ import static com.google.common.base.Preconditions.checkArgument; import org.apache.beam.sdk.values.TypeDescriptor; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.AtomicDouble; import java.util.Objects; @@ -44,6 +45,14 @@ import javax.annotation.Nullable; * <p>Counters compare using value equality of their name, kind, and * cumulative value. Equal counters should have equal toString()s. * + * <p>After all possible mutations have completed, the reader should check + * {@link #isDirty} for every counter, otherwise updates may be lost. + * + * <p>A counter may become dirty without a corresponding update to the value. + * This generally will occur when the calls to {@code addValue()}, {@code committing()}, + * and {@code committed()} are interleaved such that the value is updated + * between the calls to committing and the read of the value. + * * @param <T> the type of values aggregated by this counter */ public abstract class Counter<T> { @@ -295,6 +304,76 @@ public abstract class Counter<T> { public abstract CounterMean<T> getMean(); /** + * Represents whether counters' values have been committed to the backend. + * + * <p>Runners can use this information to optimize counters updates. + * For example, if counters are committed, runners may choose to skip the updates. + * + * <p>Counters' state transition table: + * {@code + * Action\Current State COMMITTED DIRTY COMMITTING + * addValue() DIRTY DIRTY DIRTY + * committing() None COMMITTING None + * committed() None None COMMITTED + * } + */ + @VisibleForTesting + enum CommitState { + /** + * There are no local updates that haven't been committed to the backend. + */ + COMMITTED, + /** + * There are local updates that haven't been committed to the backend. + */ + DIRTY, + /** + * Local updates are committing to the backend, but are still pending. + */ + COMMITTING, + } + + /** + * Returns if the counter contains non-committed aggregate. + */ + public boolean isDirty() { + return commitState.get() != CommitState.COMMITTED; + } + + /** + * Changes the counter from {@code CommitState.DIRTY} to {@code CommitState.COMMITTING}. + * + * @return true if successful. False return indicates that the commit state + * was not in {@code CommitState.DIRTY}. + */ + public boolean committing() { + return commitState.compareAndSet(CommitState.DIRTY, CommitState.COMMITTING); + } + + /** + * Changes the counter from {@code CommitState.COMMITTING} to {@code CommitState.COMMITTED}. + * + * @return true if successful. + * + * <p>False return indicates that the counter was updated while the committing is pending. + * That counter update might or might not has been committed. The {@code commitState} has to + * stay in {@code CommitState.DIRTY}. + */ + public boolean committed() { + return commitState.compareAndSet(CommitState.COMMITTING, CommitState.COMMITTED); + } + + /** + * Sets the counter to {@code CommitState.DIRTY}. + * + * <p>Must be called at the end of {@link #addValue}, {@link #resetToValue}, + * {@link #resetMeanToValue}, and {@link #merge}. + */ + protected void setDirty() { + commitState.set(CommitState.DIRTY); + } + + /** * Returns a string representation of the Counter. Useful for debugging logs. * Example return value: "ElementCount:SUM(15)". */ @@ -382,9 +461,13 @@ public abstract class Counter<T> { /** The kind of aggregation function to apply to this counter. */ protected final AggregationKind kind; + /** The commit state of this counter. **/ + protected final AtomicReference<CommitState> commitState; + protected Counter(CounterName name, AggregationKind kind) { this.name = name; this.kind = kind; + this.commitState = new AtomicReference<>(CommitState.COMMITTED); } ////////////////////////////////////////////////////////////////////////////// @@ -425,27 +508,31 @@ public abstract class Counter<T> { @Override public LongCounter addValue(Long value) { - switch (kind) { - case SUM: - aggregate.addAndGet(value); - deltaAggregate.addAndGet(value); - break; - case MEAN: - addToMeanAndSet(value, mean); - addToMeanAndSet(value, deltaMean); - break; - case MAX: - maxAndSet(value, aggregate); - maxAndSet(value, deltaAggregate); - break; - case MIN: - minAndSet(value, aggregate); - minAndSet(value, deltaAggregate); - break; - default: - throw illegalArgumentException(); + try { + switch (kind) { + case SUM: + aggregate.addAndGet(value); + deltaAggregate.addAndGet(value); + break; + case MEAN: + addToMeanAndSet(value, mean); + addToMeanAndSet(value, deltaMean); + break; + case MAX: + maxAndSet(value, aggregate); + maxAndSet(value, deltaAggregate); + break; + case MIN: + minAndSet(value, aggregate); + minAndSet(value, deltaAggregate); + break; + default: + throw illegalArgumentException(); + } + return this; + } finally { + setDirty(); } - return this; } private void minAndSet(Long value, AtomicLong target) { @@ -500,26 +587,34 @@ public abstract class Counter<T> { @Override public Counter<Long> resetToValue(Long value) { - if (kind == MEAN) { - throw illegalArgumentException(); + try { + if (kind == MEAN) { + throw illegalArgumentException(); + } + aggregate.set(value); + deltaAggregate.set(value); + return this; + } finally { + setDirty(); } - aggregate.set(value); - deltaAggregate.set(value); - return this; } @Override public Counter<Long> resetMeanToValue(long elementCount, Long value) { - if (kind != MEAN) { - throw illegalArgumentException(); - } - if (elementCount < 0) { - throw new IllegalArgumentException("elementCount must be non-negative"); + try { + if (kind != MEAN) { + throw illegalArgumentException(); + } + if (elementCount < 0) { + throw new IllegalArgumentException("elementCount must be non-negative"); + } + LongCounterMean counterMean = new LongCounterMean(value, elementCount); + mean.set(counterMean); + deltaMean.set(counterMean); + return this; + } finally { + setDirty(); } - LongCounterMean counterMean = new LongCounterMean(value, elementCount); - mean.set(counterMean); - deltaMean.set(counterMean); - return this; } @Override @@ -541,20 +636,25 @@ public abstract class Counter<T> { @Override public Counter<Long> merge(Counter<Long> that) { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - switch (kind) { - case SUM: - case MIN: - case MAX: - return addValue(that.getAggregate()); - case MEAN: - CounterMean<Long> thisCounterMean = this.getMean(); - CounterMean<Long> thatCounterMean = that.getMean(); - return resetMeanToValue( - thisCounterMean.getCount() + thatCounterMean.getCount(), - thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); - default: - throw illegalArgumentException(); + try { + checkArgument( + this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + switch (kind) { + case SUM: + case MIN: + case MAX: + return addValue(that.getAggregate()); + case MEAN: + CounterMean<Long> thisCounterMean = this.getMean(); + CounterMean<Long> thatCounterMean = that.getMean(); + return resetMeanToValue( + thisCounterMean.getCount() + thatCounterMean.getCount(), + thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); + default: + throw illegalArgumentException(); + } + } finally { + setDirty(); } } @@ -620,27 +720,31 @@ public abstract class Counter<T> { @Override public DoubleCounter addValue(Double value) { - switch (kind) { - case SUM: - aggregate.addAndGet(value); - deltaAggregate.addAndGet(value); - break; - case MEAN: - addToMeanAndSet(value, mean); - addToMeanAndSet(value, deltaMean); - break; - case MAX: - maxAndSet(value, aggregate); - maxAndSet(value, deltaAggregate); - break; - case MIN: - minAndSet(value, aggregate); - minAndSet(value, deltaAggregate); - break; - default: - throw illegalArgumentException(); + try { + switch (kind) { + case SUM: + aggregate.addAndGet(value); + deltaAggregate.addAndGet(value); + break; + case MEAN: + addToMeanAndSet(value, mean); + addToMeanAndSet(value, deltaMean); + break; + case MAX: + maxAndSet(value, aggregate); + maxAndSet(value, deltaAggregate); + break; + case MIN: + minAndSet(value, aggregate); + minAndSet(value, deltaAggregate); + break; + default: + throw illegalArgumentException(); + } + return this; + } finally { + setDirty(); } - return this; } private void addToMeanAndSet(Double value, AtomicReference<DoubleCounterMean> target) { @@ -686,26 +790,34 @@ public abstract class Counter<T> { @Override public Counter<Double> resetToValue(Double value) { - if (kind == MEAN) { - throw illegalArgumentException(); + try { + if (kind == MEAN) { + throw illegalArgumentException(); + } + aggregate.set(value); + deltaAggregate.set(value); + return this; + } finally { + setDirty(); } - aggregate.set(value); - deltaAggregate.set(value); - return this; } @Override public Counter<Double> resetMeanToValue(long elementCount, Double value) { - if (kind != MEAN) { - throw illegalArgumentException(); - } - if (elementCount < 0) { - throw new IllegalArgumentException("elementCount must be non-negative"); + try { + if (kind != MEAN) { + throw illegalArgumentException(); + } + if (elementCount < 0) { + throw new IllegalArgumentException("elementCount must be non-negative"); + } + DoubleCounterMean counterMean = new DoubleCounterMean(value, elementCount); + mean.set(counterMean); + deltaMean.set(counterMean); + return this; + } finally { + setDirty(); } - DoubleCounterMean counterMean = new DoubleCounterMean(value, elementCount); - mean.set(counterMean); - deltaMean.set(counterMean); - return this; } @Override @@ -736,20 +848,25 @@ public abstract class Counter<T> { @Override public Counter<Double> merge(Counter<Double> that) { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - switch (kind) { - case SUM: - case MIN: - case MAX: - return addValue(that.getAggregate()); - case MEAN: - CounterMean<Double> thisCounterMean = this.getMean(); - CounterMean<Double> thatCounterMean = that.getMean(); - return resetMeanToValue( - thisCounterMean.getCount() + thatCounterMean.getCount(), - thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); - default: - throw illegalArgumentException(); + try { + checkArgument( + this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + switch (kind) { + case SUM: + case MIN: + case MAX: + return addValue(that.getAggregate()); + case MEAN: + CounterMean<Double> thisCounterMean = this.getMean(); + CounterMean<Double> thatCounterMean = that.getMean(); + return resetMeanToValue( + thisCounterMean.getCount() + thatCounterMean.getCount(), + thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); + default: + throw illegalArgumentException(); + } + } finally { + setDirty(); } } @@ -797,14 +914,18 @@ public abstract class Counter<T> { @Override public BooleanCounter addValue(Boolean value) { - if (kind.equals(AND) && !value) { - aggregate.set(value); - deltaAggregate.set(value); - } else if (kind.equals(OR) && value) { - aggregate.set(value); - deltaAggregate.set(value); + try { + if (kind.equals(AND) && !value) { + aggregate.set(value); + deltaAggregate.set(value); + } else if (kind.equals(OR) && value) { + aggregate.set(value); + deltaAggregate.set(value); + } + return this; + } finally { + setDirty(); } - return this; } @Override @@ -821,9 +942,13 @@ public abstract class Counter<T> { @Override public Counter<Boolean> resetToValue(Boolean value) { - aggregate.set(value); - deltaAggregate.set(value); - return this; + try { + aggregate.set(value); + deltaAggregate.set(value); + return this; + } finally { + setDirty(); + } } @Override @@ -849,8 +974,13 @@ public abstract class Counter<T> { @Override public Counter<Boolean> merge(Counter<Boolean> that) { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - return addValue(that.getAggregate()); + try { + checkArgument( + this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + return addValue(that.getAggregate()); + } finally { + setDirty(); + } } } @@ -968,27 +1098,31 @@ public abstract class Counter<T> { @Override public IntegerCounter addValue(Integer value) { - switch (kind) { - case SUM: - aggregate.getAndAdd(value); - deltaAggregate.getAndAdd(value); - break; - case MEAN: - addToMeanAndSet(value, mean); - addToMeanAndSet(value, deltaMean); - break; - case MAX: - maxAndSet(value, aggregate); - maxAndSet(value, deltaAggregate); - break; - case MIN: - minAndSet(value, aggregate); - minAndSet(value, deltaAggregate); - break; - default: - throw illegalArgumentException(); + try { + switch (kind) { + case SUM: + aggregate.getAndAdd(value); + deltaAggregate.getAndAdd(value); + break; + case MEAN: + addToMeanAndSet(value, mean); + addToMeanAndSet(value, deltaMean); + break; + case MAX: + maxAndSet(value, aggregate); + maxAndSet(value, deltaAggregate); + break; + case MIN: + minAndSet(value, aggregate); + minAndSet(value, deltaAggregate); + break; + default: + throw illegalArgumentException(); + } + return this; + } finally { + setDirty(); } - return this; } private void addToMeanAndSet(int value, AtomicReference<IntegerCounterMean> target) { @@ -1034,26 +1168,34 @@ public abstract class Counter<T> { @Override public Counter<Integer> resetToValue(Integer value) { - if (kind == MEAN) { - throw illegalArgumentException(); + try { + if (kind == MEAN) { + throw illegalArgumentException(); + } + aggregate.set(value); + deltaAggregate.set(value); + return this; + } finally { + setDirty(); } - aggregate.set(value); - deltaAggregate.set(value); - return this; } @Override public Counter<Integer> resetMeanToValue(long elementCount, Integer value) { - if (kind != MEAN) { - throw illegalArgumentException(); - } - if (elementCount < 0) { - throw new IllegalArgumentException("elementCount must be non-negative"); + try { + if (kind != MEAN) { + throw illegalArgumentException(); + } + if (elementCount < 0) { + throw new IllegalArgumentException("elementCount must be non-negative"); + } + IntegerCounterMean counterMean = new IntegerCounterMean(value, elementCount); + mean.set(counterMean); + deltaMean.set(counterMean); + return this; + } finally { + setDirty(); } - IntegerCounterMean counterMean = new IntegerCounterMean(value, elementCount); - mean.set(counterMean); - deltaMean.set(counterMean); - return this; } @Override @@ -1084,20 +1226,25 @@ public abstract class Counter<T> { @Override public Counter<Integer> merge(Counter<Integer> that) { - checkArgument(this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); - switch (kind) { - case SUM: - case MIN: - case MAX: - return addValue(that.getAggregate()); - case MEAN: - CounterMean<Integer> thisCounterMean = this.getMean(); - CounterMean<Integer> thatCounterMean = that.getMean(); - return resetMeanToValue( - thisCounterMean.getCount() + thatCounterMean.getCount(), - thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); - default: - throw illegalArgumentException(); + try { + checkArgument( + this.isCompatibleWith(that), "Counters %s and %s are incompatible", this, that); + switch (kind) { + case SUM: + case MIN: + case MAX: + return addValue(that.getAggregate()); + case MEAN: + CounterMean<Integer> thisCounterMean = this.getMean(); + CounterMean<Integer> thatCounterMean = that.getMean(); + return resetMeanToValue( + thisCounterMean.getCount() + thatCounterMean.getCount(), + thisCounterMean.getAggregate() + thatCounterMean.getAggregate()); + default: + throw illegalArgumentException(); + } + } finally { + setDirty(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/72cfa709/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java index 5f75bb8..fb002de 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java @@ -29,8 +29,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import org.apache.beam.sdk.util.common.Counter.CommitState; import org.apache.beam.sdk.util.common.Counter.CounterMean; - import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -588,4 +588,148 @@ public class CounterTest { thrown.expectMessage("are incompatible"); left.merge(right); } + + @Test + public void testDirtyBit() { + Counter<Long> longSum = Counter.longs("long-sum", SUM); + Counter<Long> longMean = Counter.longs("long-mean", MEAN); + Counter<Double> doubleSum = Counter.doubles("double-sum", SUM); + Counter<Double> doubleMean = Counter.doubles("double-sum", MEAN); + Counter<Integer> intSum = Counter.ints("int-sum", SUM); + Counter<Integer> intMean = Counter.ints("int-sum", MEAN); + Counter<Boolean> boolAnd = Counter.booleans("and", AND); + + // Test counters are not dirty and are COMMITTED initially. + assertFalse(longSum.isDirty()); + assertFalse(longMean.isDirty()); + assertFalse(doubleSum.isDirty()); + assertFalse(doubleMean.isDirty()); + assertFalse(intSum.isDirty()); + assertFalse(intMean.isDirty()); + assertFalse(boolAnd.isDirty()); + + assertEquals(CommitState.COMMITTED, longSum.commitState.get()); + assertEquals(CommitState.COMMITTED, longMean.commitState.get()); + assertEquals(CommitState.COMMITTED, doubleSum.commitState.get()); + assertEquals(CommitState.COMMITTED, doubleMean.commitState.get()); + assertEquals(CommitState.COMMITTED, intSum.commitState.get()); + assertEquals(CommitState.COMMITTED, intMean.commitState.get()); + assertEquals(CommitState.COMMITTED, boolAnd.commitState.get()); + + // Test counters are dirty after mutating. + longSum.addValue(1L); + longMean.resetMeanToValue(1L, 1L); + doubleSum.addValue(1.0); + doubleMean.resetMeanToValue(1L, 1.0); + intSum.addValue(1); + intMean.resetMeanToValue(1, 1); + boolAnd.addValue(true); + + assertTrue(longSum.isDirty()); + assertTrue(longMean.isDirty()); + assertTrue(doubleSum.isDirty()); + assertTrue(doubleMean.isDirty()); + assertTrue(intSum.isDirty()); + assertTrue(intMean.isDirty()); + assertTrue(boolAnd.isDirty()); + + assertEquals(CommitState.DIRTY, longSum.commitState.get()); + assertEquals(CommitState.DIRTY, longMean.commitState.get()); + assertEquals(CommitState.DIRTY, doubleSum.commitState.get()); + assertEquals(CommitState.DIRTY, doubleMean.commitState.get()); + assertEquals(CommitState.DIRTY, intSum.commitState.get()); + assertEquals(CommitState.DIRTY, intMean.commitState.get()); + assertEquals(CommitState.DIRTY, boolAnd.commitState.get()); + + // Test counters are dirty and are COMMITTING. + assertTrue(longSum.committing()); + assertTrue(longMean.committing()); + assertTrue(doubleSum.committing()); + assertTrue(doubleMean.committing()); + assertTrue(intSum.committing()); + assertTrue(intMean.committing()); + assertTrue(boolAnd.committing()); + + assertTrue(longSum.isDirty()); + assertTrue(longMean.isDirty()); + assertTrue(doubleSum.isDirty()); + assertTrue(doubleMean.isDirty()); + assertTrue(intSum.isDirty()); + assertTrue(intMean.isDirty()); + assertTrue(boolAnd.isDirty()); + + assertEquals(CommitState.COMMITTING, longSum.commitState.get()); + assertEquals(CommitState.COMMITTING, longMean.commitState.get()); + assertEquals(CommitState.COMMITTING, doubleSum.commitState.get()); + assertEquals(CommitState.COMMITTING, doubleMean.commitState.get()); + assertEquals(CommitState.COMMITTING, intSum.commitState.get()); + assertEquals(CommitState.COMMITTING, intMean.commitState.get()); + assertEquals(CommitState.COMMITTING, boolAnd.commitState.get()); + + // Test counters are dirty again after mutating. + longSum.addValue(1L); + longMean.resetMeanToValue(1L, 1L); + doubleSum.addValue(1.0); + doubleMean.resetMeanToValue(1L, 1.0); + intSum.addValue(1); + intMean.resetMeanToValue(1, 1); + boolAnd.addValue(true); + + assertFalse(longSum.committed()); + assertFalse(longMean.committed()); + assertFalse(doubleSum.committed()); + assertFalse(doubleMean.committed()); + assertFalse(intSum.committed()); + assertFalse(intMean.committed()); + assertFalse(boolAnd.committed()); + + assertTrue(longSum.isDirty()); + assertTrue(longMean.isDirty()); + assertTrue(doubleSum.isDirty()); + assertTrue(doubleMean.isDirty()); + assertTrue(intSum.isDirty()); + assertTrue(intMean.isDirty()); + assertTrue(boolAnd.isDirty()); + + assertEquals(CommitState.DIRTY, longSum.commitState.get()); + assertEquals(CommitState.DIRTY, longMean.commitState.get()); + assertEquals(CommitState.DIRTY, doubleSum.commitState.get()); + assertEquals(CommitState.DIRTY, doubleMean.commitState.get()); + assertEquals(CommitState.DIRTY, intSum.commitState.get()); + assertEquals(CommitState.DIRTY, intMean.commitState.get()); + assertEquals(CommitState.DIRTY, boolAnd.commitState.get()); + + // Test counters are not dirty and are COMMITTED. + assertTrue(longSum.committing()); + assertTrue(longMean.committing()); + assertTrue(doubleSum.committing()); + assertTrue(doubleMean.committing()); + assertTrue(intSum.committing()); + assertTrue(intMean.committing()); + assertTrue(boolAnd.committing()); + + assertTrue(longSum.committed()); + assertTrue(longMean.committed()); + assertTrue(doubleSum.committed()); + assertTrue(doubleMean.committed()); + assertTrue(intSum.committed()); + assertTrue(intMean.committed()); + assertTrue(boolAnd.committed()); + + assertFalse(longSum.isDirty()); + assertFalse(longMean.isDirty()); + assertFalse(doubleSum.isDirty()); + assertFalse(doubleMean.isDirty()); + assertFalse(intSum.isDirty()); + assertFalse(intMean.isDirty()); + assertFalse(boolAnd.isDirty()); + + assertEquals(CommitState.COMMITTED, longSum.commitState.get()); + assertEquals(CommitState.COMMITTED, longMean.commitState.get()); + assertEquals(CommitState.COMMITTED, doubleSum.commitState.get()); + assertEquals(CommitState.COMMITTED, doubleMean.commitState.get()); + assertEquals(CommitState.COMMITTED, intSum.commitState.get()); + assertEquals(CommitState.COMMITTED, intMean.commitState.get()); + assertEquals(CommitState.COMMITTED, boolAnd.commitState.get()); + } }