Move InMemoryStateInternals to runners/core-java
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/af391b88 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/af391b88 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/af391b88 Branch: refs/heads/master Commit: af391b88cfc38659f594799bb58b6090a7bcd3a4 Parents: 2b6698d Author: Kenneth Knowles <k...@google.com> Authored: Thu Jan 26 21:03:42 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Feb 6 09:26:06 2017 -0800 ---------------------------------------------------------------------- .../runners/core/InMemoryStateInternals.java | 442 +++++++++++++++++++ .../core/TestInMemoryStateInternals.java | 65 +++ .../core/GroupAlsoByWindowsProperties.java | 1 - .../core/InMemoryStateInternalsTest.java | 359 +++++++++++++++ .../core/MergingActiveWindowSetTest.java | 1 - .../beam/runners/core/ReduceFnTester.java | 1 - .../beam/runners/core/SideInputHandlerTest.java | 1 - .../beam/runners/core/SplittableParDoTest.java | 1 - .../triggers/TriggerStateMachineTester.java | 2 +- .../CopyOnAccessInMemoryStateInternals.java | 12 +- .../spark/translation/TranslationUtils.java | 3 +- .../sdk/util/state/InMemoryStateInternals.java | 430 ------------------ .../util/state/TestInMemoryStateInternals.java | 61 --- .../util/state/InMemoryStateInternalsTest.java | 348 --------------- 14 files changed, 874 insertions(+), 853 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java new file mode 100644 index 0000000..059e32d --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.util.CombineFnUtil; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateContext; +import org.apache.beam.sdk.util.state.StateContexts; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateTable; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTag.StateBinder; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.joda.time.Instant; + +/** + * In-memory implementation of {@link StateInternals}. Used in {@code BatchModeExecutionContext} + * and for running tests that need state. + */ +@Experimental(Kind.STATE) +public class InMemoryStateInternals<K> implements StateInternals<K> { + + public static <K> InMemoryStateInternals<K> forKey(K key) { + return new InMemoryStateInternals<>(key); + } + + private final K key; + + protected InMemoryStateInternals(K key) { + this.key = key; + } + + @Override + public K getKey() { + return key; + } + + /** + * Interface common to all in-memory state cells. Includes ability to see whether a cell has been + * cleared and the ability to create a clone of the contents. + */ + public interface InMemoryState<T extends InMemoryState<T>> { + boolean isCleared(); + T copy(); + } + + protected final StateTable<K> inMemoryState = new StateTable<K>() { + @Override + protected StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c) { + return new InMemoryStateBinder<K>(key, c); + } + }; + + public void clear() { + inMemoryState.clear(); + } + + /** + * Return true if the given state is empty. This is used by the test framework to make sure + * that the state has been properly cleaned up. + */ + protected boolean isEmptyForTesting(State state) { + return ((InMemoryState<?>) state).isCleared(); + } + + @Override + public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) { + return inMemoryState.get(namespace, address, StateContexts.nullContext()); + } + + @Override + public <T extends State> T state( + StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) { + return inMemoryState.get(namespace, address, c); + } + + /** + * A {@link StateBinder} that returns In Memory {@link State} objects. + */ + public static class InMemoryStateBinder<K> implements StateBinder<K> { + private final K key; + private final StateContext<?> c; + + public InMemoryStateBinder(K key, StateContext<?> c) { + this.key = key; + this.c = c; + } + + @Override + public <T> ValueState<T> bindValue( + StateTag<? super K, ValueState<T>> address, Coder<T> coder) { + return new InMemoryValue<T>(); + } + + @Override + public <T> BagState<T> bindBag( + final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) { + return new InMemoryBag<T>(); + } + + @Override + public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + bindCombiningValue( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> accumCoder, + final CombineFn<InputT, AccumT, OutputT> combineFn) { + return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn()); + } + + @Override + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> address, + OutputTimeFn<? super W> outputTimeFn) { + return new InMemoryWatermarkHold<W>(outputTimeFn); + } + + @Override + public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + bindKeyedCombiningValue( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> accumCoder, + KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { + return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn); + } + + @Override + public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> + bindKeyedCombiningValueWithContext( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + Coder<AccumT> accumCoder, + KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { + return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); + } + } + + /** + * An {@link InMemoryState} implementation of {@link ValueState}. + */ + public static final class InMemoryValue<T> + implements ValueState<T>, InMemoryState<InMemoryValue<T>> { + private boolean isCleared = true; + private T value = null; + + @Override + public void clear() { + // Even though we're clearing we can't remove this from the in-memory state map, since + // other users may already have a handle on this Value. + value = null; + isCleared = true; + } + + @Override + public InMemoryValue<T> readLater() { + return this; + } + + @Override + public T read() { + return value; + } + + @Override + public void write(T input) { + isCleared = false; + this.value = input; + } + + @Override + public InMemoryValue<T> copy() { + InMemoryValue<T> that = new InMemoryValue<>(); + if (!this.isCleared) { + that.isCleared = this.isCleared; + that.value = this.value; + } + return that; + } + + @Override + public boolean isCleared() { + return isCleared; + } + } + + /** + * An {@link InMemoryState} implementation of {@link WatermarkHoldState}. + */ + public static final class InMemoryWatermarkHold<W extends BoundedWindow> + implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> { + + private final OutputTimeFn<? super W> outputTimeFn; + + @Nullable + private Instant combinedHold = null; + + public InMemoryWatermarkHold(OutputTimeFn<? super W> outputTimeFn) { + this.outputTimeFn = outputTimeFn; + } + + @Override + public InMemoryWatermarkHold<W> readLater() { + return this; + } + + @Override + public void clear() { + // Even though we're clearing we can't remove this from the in-memory state map, since + // other users may already have a handle on this WatermarkBagInternal. + combinedHold = null; + } + + @Override + public Instant read() { + return combinedHold; + } + + @Override + public void add(Instant outputTime) { + combinedHold = combinedHold == null ? outputTime + : outputTimeFn.combine(combinedHold, outputTime); + } + + @Override + public boolean isCleared() { + return combinedHold == null; + } + + @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public ReadableState<Boolean> readLater() { + return this; + } + @Override + public Boolean read() { + return combinedHold == null; + } + }; + } + + @Override + public OutputTimeFn<? super W> getOutputTimeFn() { + return outputTimeFn; + } + + @Override + public String toString() { + return Objects.toString(combinedHold); + } + + @Override + public InMemoryWatermarkHold<W> copy() { + InMemoryWatermarkHold<W> that = + new InMemoryWatermarkHold<>(outputTimeFn); + that.combinedHold = this.combinedHold; + return that; + } + } + + /** + * An {@link InMemoryState} implementation of {@link AccumulatorCombiningState}. + */ + public static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT> + implements AccumulatorCombiningState<InputT, AccumT, OutputT>, + InMemoryState<InMemoryCombiningValue<K, InputT, AccumT, OutputT>> { + private final K key; + private boolean isCleared = true; + private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; + private AccumT accum; + + public InMemoryCombiningValue( + K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { + this.key = key; + this.combineFn = combineFn; + accum = combineFn.createAccumulator(key); + } + + @Override + public InMemoryCombiningValue<K, InputT, AccumT, OutputT> readLater() { + return this; + } + + @Override + public void clear() { + // Even though we're clearing we can't remove this from the in-memory state map, since + // other users may already have a handle on this CombiningValue. + accum = combineFn.createAccumulator(key); + isCleared = true; + } + + @Override + public OutputT read() { + return combineFn.extractOutput(key, accum); + } + + @Override + public void add(InputT input) { + isCleared = false; + accum = combineFn.addInput(key, accum, input); + } + + @Override + public AccumT getAccum() { + return accum; + } + + @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public ReadableState<Boolean> readLater() { + return this; + } + @Override + public Boolean read() { + return isCleared; + } + }; + } + + @Override + public void addAccum(AccumT accum) { + isCleared = false; + this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum)); + } + + @Override + public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { + return combineFn.mergeAccumulators(key, accumulators); + } + + @Override + public boolean isCleared() { + return isCleared; + } + + @Override + public InMemoryCombiningValue<K, InputT, AccumT, OutputT> copy() { + InMemoryCombiningValue<K, InputT, AccumT, OutputT> that = + new InMemoryCombiningValue<>(key, combineFn); + if (!this.isCleared) { + that.isCleared = this.isCleared; + that.addAccum(accum); + } + return that; + } + } + + /** + * An {@link InMemoryState} implementation of {@link BagState}. + */ + public static final class InMemoryBag<T> implements BagState<T>, InMemoryState<InMemoryBag<T>> { + private List<T> contents = new ArrayList<>(); + + @Override + public void clear() { + // Even though we're clearing we can't remove this from the in-memory state map, since + // other users may already have a handle on this Bag. + // The result of get/read below must be stable for the lifetime of the bundle within which it + // was generated. In batch and direct runners the bundle lifetime can be + // greater than the window lifetime, in which case this method can be called while + // the result is still in use. We protect against this by hot-swapping instead of + // clearing the contents. + contents = new ArrayList<>(); + } + + @Override + public InMemoryBag<T> readLater() { + return this; + } + + @Override + public Iterable<T> read() { + return contents; + } + + @Override + public void add(T input) { + contents.add(input); + } + + @Override + public boolean isCleared() { + return contents.isEmpty(); + } + + @Override + public ReadableState<Boolean> isEmpty() { + return new ReadableState<Boolean>() { + @Override + public ReadableState<Boolean> readLater() { + return this; + } + + @Override + public Boolean read() { + return contents.isEmpty(); + } + }; + } + + @Override + public InMemoryBag<T> copy() { + InMemoryBag<T> that = new InMemoryBag<>(); + that.contents.addAll(this.contents); + return that; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java new file mode 100644 index 0000000..60754e2 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.joda.time.Instant; + +/** + * Simulates state like {@link InMemoryStateInternals} and provides some extra helper methods. + */ +public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> { + public TestInMemoryStateInternals(K key) { + super(key); + } + + public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) { + Set<StateTag<? super K, ?>> inUse = new HashSet<>(); + for (Map.Entry<StateTag<? super K, ?>, State> entry : + inMemoryState.getTagsInUse(namespace).entrySet()) { + if (!isEmptyForTesting(entry.getValue())) { + inUse.add(entry.getKey()); + } + } + return inUse; + } + + public Set<StateNamespace> getNamespacesInUse() { + return inMemoryState.getNamespacesInUse(); + } + + /** Return the earliest output watermark hold in state, or null if none. */ + public Instant earliestWatermarkHold() { + Instant minimum = null; + for (State storage : inMemoryState.values()) { + if (storage instanceof WatermarkHoldState) { + Instant hold = ((WatermarkHoldState<?>) storage).read(); + if (minimum == null || (hold != null && hold.isBefore(minimum))) { + minimum = hold; + } + } + } + return minimum; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index ef01106..fedc4ca 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -51,7 +51,6 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.InMemoryStateInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java new file mode 100644 index 0000000..c75399c --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; + +import java.util.Arrays; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.StateMerging; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaceForTest; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.hamcrest.Matchers; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link InMemoryStateInternals}. + */ +@RunWith(JUnit4.class) +public class InMemoryStateInternalsTest { + private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); + private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); + private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); + private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); + + private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = + StateTags.value("stringValue", StringUtf8Coder.of()); + private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> + SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( + "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); + private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = + StateTags.bag("stringBag", StringUtf8Coder.of()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> + WATERMARK_EARLIEST_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> + WATERMARK_LATEST_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp()); + private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR = + StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow()); + + InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey"); + + @Test + public void testValue() throws Exception { + ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR); + + // State instances are cached, but depend on the namespace. + assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value)); + assertThat( + underTest.state(NAMESPACE_2, STRING_VALUE_ADDR), + Matchers.not(Matchers.sameInstance(value))); + + assertThat(value.read(), Matchers.nullValue()); + value.write("hello"); + assertThat(value.read(), Matchers.equalTo("hello")); + value.write("world"); + assertThat(value.read(), Matchers.equalTo("world")); + + value.clear(); + assertThat(value.read(), Matchers.nullValue()); + assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value)); + } + + @Test + public void testBag() throws Exception { + BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))); + + assertThat(value.read(), Matchers.emptyIterable()); + value.add("hello"); + assertThat(value.read(), Matchers.containsInAnyOrder("hello")); + + value.add("world"); + assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world")); + + value.clear(); + assertThat(value.read(), Matchers.emptyIterable()); + assertThat(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), Matchers.sameInstance(value)); + } + + @Test + public void testBagIsEmpty() throws Exception { + BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add("hello"); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeBagIntoSource() throws Exception { + BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + + bag1.add("Hello"); + bag2.add("World"); + bag1.add("!"); + + StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); + + // Reading the merged bag gets both the contents + assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); + assertThat(bag2.read(), Matchers.emptyIterable()); + } + + @Test + public void testMergeBagIntoNewNamespace() throws Exception { + BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); + BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); + BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); + + bag1.add("Hello"); + bag2.add("World"); + bag1.add("!"); + + StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); + + // Reading the merged bag gets both the contents + assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); + assertThat(bag1.read(), Matchers.emptyIterable()); + assertThat(bag2.read(), Matchers.emptyIterable()); + } + + @Test + public void testCombiningValue() throws Exception { + CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); + + assertThat(value.read(), Matchers.equalTo(0)); + value.add(2); + assertThat(value.read(), Matchers.equalTo(2)); + + value.add(3); + assertThat(value.read(), Matchers.equalTo(5)); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(0)); + assertThat(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), Matchers.sameInstance(value)); + } + + @Test + public void testCombiningIsEmpty() throws Exception { + CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add(5); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeCombiningValueIntoSource() throws Exception { + AccumulatorCombiningState<Integer, int[], Integer> value1 = + underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + AccumulatorCombiningState<Integer, int[], Integer> value2 = + underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); + + value1.add(5); + value2.add(10); + value1.add(6); + + assertThat(value1.read(), Matchers.equalTo(11)); + assertThat(value2.read(), Matchers.equalTo(10)); + + // Merging clears the old values and updates the result value. + StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); + + assertThat(value1.read(), Matchers.equalTo(21)); + assertThat(value2.read(), Matchers.equalTo(0)); + } + + @Test + public void testMergeCombiningValueIntoNewNamespace() throws Exception { + AccumulatorCombiningState<Integer, int[], Integer> value1 = + underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + AccumulatorCombiningState<Integer, int[], Integer> value2 = + underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); + AccumulatorCombiningState<Integer, int[], Integer> value3 = + underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); + + value1.add(5); + value2.add(10); + value1.add(6); + + StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); + + // Merging clears the old values and updates the result value. + assertThat(value1.read(), Matchers.equalTo(0)); + assertThat(value2.read(), Matchers.equalTo(0)); + assertThat(value3.read(), Matchers.equalTo(21)); + } + + @Test + public void testWatermarkEarliestState() throws Exception { + WatermarkHoldState<BoundedWindow> value = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.add(new Instant(3000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.add(new Instant(1000)); + assertThat(value.read(), Matchers.equalTo(new Instant(1000))); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(null)); + assertThat(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), Matchers.sameInstance(value)); + } + + @Test + public void testWatermarkLatestState() throws Exception { + WatermarkHoldState<BoundedWindow> value = + underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.add(new Instant(3000)); + assertThat(value.read(), Matchers.equalTo(new Instant(3000))); + + value.add(new Instant(1000)); + assertThat(value.read(), Matchers.equalTo(new Instant(3000))); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(null)); + assertThat(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), Matchers.sameInstance(value)); + } + + @Test + public void testWatermarkEndOfWindowState() throws Exception { + WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); + + // State instances are cached, but depend on the namespace. + assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); + assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR))); + + assertThat(value.read(), Matchers.nullValue()); + value.add(new Instant(2000)); + assertThat(value.read(), Matchers.equalTo(new Instant(2000))); + + value.clear(); + assertThat(value.read(), Matchers.equalTo(null)); + assertThat(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), Matchers.sameInstance(value)); + } + + @Test + public void testWatermarkStateIsEmpty() throws Exception { + WatermarkHoldState<BoundedWindow> value = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + + assertThat(value.isEmpty().read(), Matchers.is(true)); + ReadableState<Boolean> readFuture = value.isEmpty(); + value.add(new Instant(1000)); + assertThat(readFuture.read(), Matchers.is(false)); + + value.clear(); + assertThat(readFuture.read(), Matchers.is(true)); + } + + @Test + public void testMergeEarliestWatermarkIntoSource() throws Exception { + WatermarkHoldState<BoundedWindow> value1 = + underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); + WatermarkHoldState<BoundedWindow> value2 = + underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); + + value1.add(new Instant(3000)); + value2.add(new Instant(5000)); + value1.add(new Instant(4000)); + value2.add(new Instant(2000)); + + // Merging clears the old values and updates the merged value. + StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1); + + assertThat(value1.read(), Matchers.equalTo(new Instant(2000))); + assertThat(value2.read(), Matchers.equalTo(null)); + } + + @Test + public void testMergeLatestWatermarkIntoSource() throws Exception { + WatermarkHoldState<BoundedWindow> value1 = + underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); + WatermarkHoldState<BoundedWindow> value2 = + underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); + WatermarkHoldState<BoundedWindow> value3 = + underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); + + value1.add(new Instant(3000)); + value2.add(new Instant(5000)); + value1.add(new Instant(4000)); + value2.add(new Instant(2000)); + + // Merging clears the old values and updates the result value. + StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1); + + // Merging clears the old values and updates the result value. + assertThat(value3.read(), Matchers.equalTo(new Instant(5000))); + assertThat(value1.read(), Matchers.equalTo(null)); + assertThat(value2.read(), Matchers.equalTo(null)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java index a4928e3..6cccdae 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.util.state.InMemoryStateInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 226f5f0..d396a08 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -76,7 +76,6 @@ import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java index 0bf5e90..3a5d346 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java @@ -30,7 +30,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.InMemoryStateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index ad0b01d..4bddd51 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -54,7 +54,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.InMemoryStateInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.util.state.TimerInternalsFactory; http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java index 2a626d4..c00cc48 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -37,6 +37,7 @@ import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback; import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.MergingActiveWindowSet; import org.apache.beam.runners.core.NonMergingActiveWindowSet; +import org.apache.beam.runners.core.TestInMemoryStateInternals; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; @@ -52,7 +53,6 @@ import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace; import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; -import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java index e486a75..1e12e16 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java @@ -25,6 +25,12 @@ import java.util.Collection; import java.util.HashSet; import java.util.Map; import javax.annotation.Nullable; +import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryBag; +import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryCombiningValue; +import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryState; +import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryStateBinder; +import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryValue; +import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryWatermarkHold; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; @@ -34,12 +40,6 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.util.CombineFnUtil; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryBag; -import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryCombiningValue; -import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryState; -import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryStateBinder; -import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryValue; -import org.apache.beam.sdk.util.state.InMemoryStateInternals.InMemoryWatermarkHold; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.util.state.StateContexts; http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java index 965330c..8a9317e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java @@ -24,6 +24,7 @@ import java.io.Serializable; import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.InMemoryStateInternals; import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.transforms.DoFn; @@ -35,7 +36,6 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.InMemoryStateInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; @@ -47,7 +47,6 @@ import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; - import scala.Tuple2; /** http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java deleted file mode 100644 index 6611837..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryStateInternals.java +++ /dev/null @@ -1,430 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; -import javax.annotation.Nullable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.util.CombineFnUtil; -import org.apache.beam.sdk.util.state.StateTag.StateBinder; -import org.joda.time.Instant; - -/** - * In-memory implementation of {@link StateInternals}. Used in {@code BatchModeExecutionContext} - * and for running tests that need state. - */ -@Experimental(Kind.STATE) -public class InMemoryStateInternals<K> implements StateInternals<K> { - - public static <K> InMemoryStateInternals<K> forKey(K key) { - return new InMemoryStateInternals<>(key); - } - - private final K key; - - protected InMemoryStateInternals(K key) { - this.key = key; - } - - @Override - public K getKey() { - return key; - } - - /** - * Interface common to all in-memory state cells. Includes ability to see whether a cell has been - * cleared and the ability to create a clone of the contents. - */ - public interface InMemoryState<T extends InMemoryState<T>> { - boolean isCleared(); - T copy(); - } - - protected final StateTable<K> inMemoryState = new StateTable<K>() { - @Override - protected StateBinder<K> binderForNamespace(StateNamespace namespace, StateContext<?> c) { - return new InMemoryStateBinder<K>(key, c); - } - }; - - public void clear() { - inMemoryState.clear(); - } - - /** - * Return true if the given state is empty. This is used by the test framework to make sure - * that the state has been properly cleaned up. - */ - protected boolean isEmptyForTesting(State state) { - return ((InMemoryState<?>) state).isCleared(); - } - - @Override - public <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address) { - return inMemoryState.get(namespace, address, StateContexts.nullContext()); - } - - @Override - public <T extends State> T state( - StateNamespace namespace, StateTag<? super K, T> address, final StateContext<?> c) { - return inMemoryState.get(namespace, address, c); - } - - /** - * A {@link StateBinder} that returns In Memory {@link State} objects. - */ - public static class InMemoryStateBinder<K> implements StateBinder<K> { - private final K key; - private final StateContext<?> c; - - public InMemoryStateBinder(K key, StateContext<?> c) { - this.key = key; - this.c = c; - } - - @Override - public <T> ValueState<T> bindValue( - StateTag<? super K, ValueState<T>> address, Coder<T> coder) { - return new InMemoryValue<T>(); - } - - @Override - public <T> BagState<T> bindBag( - final StateTag<? super K, BagState<T>> address, Coder<T> elemCoder) { - return new InMemoryBag<T>(); - } - - @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> - bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - final CombineFn<InputT, AccumT, OutputT> combineFn) { - return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn.<K>asKeyedFn()); - } - - @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, - OutputTimeFn<? super W> outputTimeFn) { - return new InMemoryWatermarkHold<W>(outputTimeFn); - } - - @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return new InMemoryCombiningValue<K, InputT, AccumT, OutputT>(key, combineFn); - } - - @Override - public <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { - return bindKeyedCombiningValue(address, accumCoder, CombineFnUtil.bindContext(combineFn, c)); - } - } - - /** - * An {@link InMemoryState} implementation of {@link ValueState}. - */ - public static final class InMemoryValue<T> - implements ValueState<T>, InMemoryState<InMemoryValue<T>> { - private boolean isCleared = true; - private T value = null; - - @Override - public void clear() { - // Even though we're clearing we can't remove this from the in-memory state map, since - // other users may already have a handle on this Value. - value = null; - isCleared = true; - } - - @Override - public InMemoryValue<T> readLater() { - return this; - } - - @Override - public T read() { - return value; - } - - @Override - public void write(T input) { - isCleared = false; - this.value = input; - } - - @Override - public InMemoryValue<T> copy() { - InMemoryValue<T> that = new InMemoryValue<>(); - if (!this.isCleared) { - that.isCleared = this.isCleared; - that.value = this.value; - } - return that; - } - - @Override - public boolean isCleared() { - return isCleared; - } - } - - /** - * An {@link InMemoryState} implementation of {@link WatermarkHoldState}. - */ - public static final class InMemoryWatermarkHold<W extends BoundedWindow> - implements WatermarkHoldState<W>, InMemoryState<InMemoryWatermarkHold<W>> { - - private final OutputTimeFn<? super W> outputTimeFn; - - @Nullable - private Instant combinedHold = null; - - public InMemoryWatermarkHold(OutputTimeFn<? super W> outputTimeFn) { - this.outputTimeFn = outputTimeFn; - } - - @Override - public InMemoryWatermarkHold<W> readLater() { - return this; - } - - @Override - public void clear() { - // Even though we're clearing we can't remove this from the in-memory state map, since - // other users may already have a handle on this WatermarkBagInternal. - combinedHold = null; - } - - @Override - public Instant read() { - return combinedHold; - } - - @Override - public void add(Instant outputTime) { - combinedHold = combinedHold == null ? outputTime - : outputTimeFn.combine(combinedHold, outputTime); - } - - @Override - public boolean isCleared() { - return combinedHold == null; - } - - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public ReadableState<Boolean> readLater() { - return this; - } - @Override - public Boolean read() { - return combinedHold == null; - } - }; - } - - @Override - public OutputTimeFn<? super W> getOutputTimeFn() { - return outputTimeFn; - } - - @Override - public String toString() { - return Objects.toString(combinedHold); - } - - @Override - public InMemoryWatermarkHold<W> copy() { - InMemoryWatermarkHold<W> that = - new InMemoryWatermarkHold<>(outputTimeFn); - that.combinedHold = this.combinedHold; - return that; - } - } - - /** - * An {@link InMemoryState} implementation of {@link AccumulatorCombiningState}. - */ - public static final class InMemoryCombiningValue<K, InputT, AccumT, OutputT> - implements AccumulatorCombiningState<InputT, AccumT, OutputT>, - InMemoryState<InMemoryCombiningValue<K, InputT, AccumT, OutputT>> { - private final K key; - private boolean isCleared = true; - private final KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn; - private AccumT accum; - - public InMemoryCombiningValue( - K key, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - this.key = key; - this.combineFn = combineFn; - accum = combineFn.createAccumulator(key); - } - - @Override - public InMemoryCombiningValue<K, InputT, AccumT, OutputT> readLater() { - return this; - } - - @Override - public void clear() { - // Even though we're clearing we can't remove this from the in-memory state map, since - // other users may already have a handle on this CombiningValue. - accum = combineFn.createAccumulator(key); - isCleared = true; - } - - @Override - public OutputT read() { - return combineFn.extractOutput(key, accum); - } - - @Override - public void add(InputT input) { - isCleared = false; - accum = combineFn.addInput(key, accum, input); - } - - @Override - public AccumT getAccum() { - return accum; - } - - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public ReadableState<Boolean> readLater() { - return this; - } - @Override - public Boolean read() { - return isCleared; - } - }; - } - - @Override - public void addAccum(AccumT accum) { - isCleared = false; - this.accum = combineFn.mergeAccumulators(key, Arrays.asList(this.accum, accum)); - } - - @Override - public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { - return combineFn.mergeAccumulators(key, accumulators); - } - - @Override - public boolean isCleared() { - return isCleared; - } - - @Override - public InMemoryCombiningValue<K, InputT, AccumT, OutputT> copy() { - InMemoryCombiningValue<K, InputT, AccumT, OutputT> that = - new InMemoryCombiningValue<>(key, combineFn); - if (!this.isCleared) { - that.isCleared = this.isCleared; - that.addAccum(accum); - } - return that; - } - } - - /** - * An {@link InMemoryState} implementation of {@link BagState}. - */ - public static final class InMemoryBag<T> implements BagState<T>, InMemoryState<InMemoryBag<T>> { - private List<T> contents = new ArrayList<>(); - - @Override - public void clear() { - // Even though we're clearing we can't remove this from the in-memory state map, since - // other users may already have a handle on this Bag. - // The result of get/read below must be stable for the lifetime of the bundle within which it - // was generated. In batch and direct runners the bundle lifetime can be - // greater than the window lifetime, in which case this method can be called while - // the result is still in use. We protect against this by hot-swapping instead of - // clearing the contents. - contents = new ArrayList<>(); - } - - @Override - public InMemoryBag<T> readLater() { - return this; - } - - @Override - public Iterable<T> read() { - return contents; - } - - @Override - public void add(T input) { - contents.add(input); - } - - @Override - public boolean isCleared() { - return contents.isEmpty(); - } - - @Override - public ReadableState<Boolean> isEmpty() { - return new ReadableState<Boolean>() { - @Override - public ReadableState<Boolean> readLater() { - return this; - } - - @Override - public Boolean read() { - return contents.isEmpty(); - } - }; - } - - @Override - public InMemoryBag<T> copy() { - InMemoryBag<T> that = new InMemoryBag<>(); - that.contents.addAll(this.contents); - return that; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java deleted file mode 100644 index 7b6ee68..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import org.joda.time.Instant; - -/** - * Simulates state like {@link InMemoryStateInternals} and provides some extra helper methods. - */ -public class TestInMemoryStateInternals<K> extends InMemoryStateInternals<K> { - public TestInMemoryStateInternals(K key) { - super(key); - } - - public Set<StateTag<? super K, ?>> getTagsInUse(StateNamespace namespace) { - Set<StateTag<? super K, ?>> inUse = new HashSet<>(); - for (Map.Entry<StateTag<? super K, ?>, State> entry : - inMemoryState.getTagsInUse(namespace).entrySet()) { - if (!isEmptyForTesting(entry.getValue())) { - inUse.add(entry.getKey()); - } - } - return inUse; - } - - public Set<StateNamespace> getNamespacesInUse() { - return inMemoryState.getNamespacesInUse(); - } - - /** Return the earliest output watermark hold in state, or null if none. */ - public Instant earliestWatermarkHold() { - Instant minimum = null; - for (State storage : inMemoryState.values()) { - if (storage instanceof WatermarkHoldState) { - Instant hold = ((WatermarkHoldState<?>) storage).read(); - if (minimum == null || (hold != null && hold.isBefore(minimum))) { - minimum = hold; - } - } - } - return minimum; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/af391b88/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java deleted file mode 100644 index e43ad36..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryStateInternalsTest.java +++ /dev/null @@ -1,348 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; - -import java.util.Arrays; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link InMemoryStateInternals}. - */ -@RunWith(JUnit4.class) -public class InMemoryStateInternalsTest { - private static final BoundedWindow WINDOW_1 = new IntervalWindow(new Instant(0), new Instant(10)); - private static final StateNamespace NAMESPACE_1 = new StateNamespaceForTest("ns1"); - private static final StateNamespace NAMESPACE_2 = new StateNamespaceForTest("ns2"); - private static final StateNamespace NAMESPACE_3 = new StateNamespaceForTest("ns3"); - - private static final StateTag<Object, ValueState<String>> STRING_VALUE_ADDR = - StateTags.value("stringValue", StringUtf8Coder.of()); - private static final StateTag<Object, AccumulatorCombiningState<Integer, int[], Integer>> - SUM_INTEGER_ADDR = StateTags.combiningValueFromInputInternal( - "sumInteger", VarIntCoder.of(), Sum.ofIntegers()); - private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR = - StateTags.bag("stringBag", StringUtf8Coder.of()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> - WATERMARK_EARLIEST_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> - WATERMARK_LATEST_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp()); - private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR = - StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow()); - - InMemoryStateInternals<String> underTest = InMemoryStateInternals.forKey("dummyKey"); - - @Test - public void testValue() throws Exception { - ValueState<String> value = underTest.state(NAMESPACE_1, STRING_VALUE_ADDR); - - // State instances are cached, but depend on the namespace. - assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value)); - assertThat( - underTest.state(NAMESPACE_2, STRING_VALUE_ADDR), - Matchers.not(Matchers.sameInstance(value))); - - assertThat(value.read(), Matchers.nullValue()); - value.write("hello"); - assertThat(value.read(), Matchers.equalTo("hello")); - value.write("world"); - assertThat(value.read(), Matchers.equalTo("world")); - - value.clear(); - assertThat(value.read(), Matchers.nullValue()); - assertThat(underTest.state(NAMESPACE_1, STRING_VALUE_ADDR), Matchers.sameInstance(value)); - } - - @Test - public void testBag() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, STRING_BAG_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, STRING_BAG_ADDR))); - - assertThat(value.read(), Matchers.emptyIterable()); - value.add("hello"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello")); - - value.add("world"); - assertThat(value.read(), Matchers.containsInAnyOrder("hello", "world")); - - value.clear(); - assertThat(value.read(), Matchers.emptyIterable()); - assertThat(underTest.state(NAMESPACE_1, STRING_BAG_ADDR), Matchers.sameInstance(value)); - } - - @Test - public void testBagIsEmpty() throws Exception { - BagState<String> value = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add("hello"); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeBagIntoSource() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2), bag1); - - // Reading the merged bag gets both the contents - assertThat(bag1.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag2.read(), Matchers.emptyIterable()); - } - - @Test - public void testMergeBagIntoNewNamespace() throws Exception { - BagState<String> bag1 = underTest.state(NAMESPACE_1, STRING_BAG_ADDR); - BagState<String> bag2 = underTest.state(NAMESPACE_2, STRING_BAG_ADDR); - BagState<String> bag3 = underTest.state(NAMESPACE_3, STRING_BAG_ADDR); - - bag1.add("Hello"); - bag2.add("World"); - bag1.add("!"); - - StateMerging.mergeBags(Arrays.asList(bag1, bag2, bag3), bag3); - - // Reading the merged bag gets both the contents - assertThat(bag3.read(), Matchers.containsInAnyOrder("Hello", "World", "!")); - assertThat(bag1.read(), Matchers.emptyIterable()); - assertThat(bag2.read(), Matchers.emptyIterable()); - } - - @Test - public void testCombiningValue() throws Exception { - CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR))); - - assertThat(value.read(), Matchers.equalTo(0)); - value.add(2); - assertThat(value.read(), Matchers.equalTo(2)); - - value.add(3); - assertThat(value.read(), Matchers.equalTo(5)); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(0)); - assertThat(underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR), Matchers.sameInstance(value)); - } - - @Test - public void testCombiningIsEmpty() throws Exception { - CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(5); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeCombiningValueIntoSource() throws Exception { - AccumulatorCombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - assertThat(value1.read(), Matchers.equalTo(11)); - assertThat(value2.read(), Matchers.equalTo(10)); - - // Merging clears the old values and updates the result value. - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value1); - - assertThat(value1.read(), Matchers.equalTo(21)); - assertThat(value2.read(), Matchers.equalTo(0)); - } - - @Test - public void testMergeCombiningValueIntoNewNamespace() throws Exception { - AccumulatorCombiningState<Integer, int[], Integer> value1 = - underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value2 = - underTest.state(NAMESPACE_2, SUM_INTEGER_ADDR); - AccumulatorCombiningState<Integer, int[], Integer> value3 = - underTest.state(NAMESPACE_3, SUM_INTEGER_ADDR); - - value1.add(5); - value2.add(10); - value1.add(6); - - StateMerging.mergeCombiningValues(Arrays.asList(value1, value2), value3); - - // Merging clears the old values and updates the result value. - assertThat(value1.read(), Matchers.equalTo(0)); - assertThat(value2.read(), Matchers.equalTo(0)); - assertThat(value3.read(), Matchers.equalTo(21)); - } - - @Test - public void testWatermarkEarliestState() throws Exception { - WatermarkHoldState<BoundedWindow> value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.add(new Instant(3000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.add(new Instant(1000)); - assertThat(value.read(), Matchers.equalTo(new Instant(1000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertThat(underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR), Matchers.sameInstance(value)); - } - - @Test - public void testWatermarkLatestState() throws Exception { - WatermarkHoldState<BoundedWindow> value = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.add(new Instant(3000)); - assertThat(value.read(), Matchers.equalTo(new Instant(3000))); - - value.add(new Instant(1000)); - assertThat(value.read(), Matchers.equalTo(new Instant(3000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertThat(underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR), Matchers.sameInstance(value)); - } - - @Test - public void testWatermarkEndOfWindowState() throws Exception { - WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR); - - // State instances are cached, but depend on the namespace. - assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR)); - assertFalse(value.equals(underTest.state(NAMESPACE_2, WATERMARK_EOW_ADDR))); - - assertThat(value.read(), Matchers.nullValue()); - value.add(new Instant(2000)); - assertThat(value.read(), Matchers.equalTo(new Instant(2000))); - - value.clear(); - assertThat(value.read(), Matchers.equalTo(null)); - assertThat(underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR), Matchers.sameInstance(value)); - } - - @Test - public void testWatermarkStateIsEmpty() throws Exception { - WatermarkHoldState<BoundedWindow> value = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - - assertThat(value.isEmpty().read(), Matchers.is(true)); - ReadableState<Boolean> readFuture = value.isEmpty(); - value.add(new Instant(1000)); - assertThat(readFuture.read(), Matchers.is(false)); - - value.clear(); - assertThat(readFuture.read(), Matchers.is(true)); - } - - @Test - public void testMergeEarliestWatermarkIntoSource() throws Exception { - WatermarkHoldState<BoundedWindow> value1 = - underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR); - WatermarkHoldState<BoundedWindow> value2 = - underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR); - - value1.add(new Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // Merging clears the old values and updates the merged value. - StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value1, WINDOW_1); - - assertThat(value1.read(), Matchers.equalTo(new Instant(2000))); - assertThat(value2.read(), Matchers.equalTo(null)); - } - - @Test - public void testMergeLatestWatermarkIntoSource() throws Exception { - WatermarkHoldState<BoundedWindow> value1 = - underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR); - WatermarkHoldState<BoundedWindow> value2 = - underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR); - WatermarkHoldState<BoundedWindow> value3 = - underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR); - - value1.add(new Instant(3000)); - value2.add(new Instant(5000)); - value1.add(new Instant(4000)); - value2.add(new Instant(2000)); - - // Merging clears the old values and updates the result value. - StateMerging.mergeWatermarks(Arrays.asList(value1, value2), value3, WINDOW_1); - - // Merging clears the old values and updates the result value. - assertThat(value3.read(), Matchers.equalTo(new Instant(5000))); - assertThat(value1.read(), Matchers.equalTo(null)); - assertThat(value2.read(), Matchers.equalTo(null)); - } -}