Repository: incubator-beam Updated Branches: refs/heads/master 142229e37 -> bc9ed7dbd
Refactor StateSpec out of StateTag Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c1ba2e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c1ba2e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c1ba2e1 Branch: refs/heads/master Commit: 7c1ba2e1062556ac98b29f5bb4f5b75a7e7832e2 Parents: 135790b Author: Kenneth Knowles <k...@google.com> Authored: Thu Aug 4 20:50:28 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Oct 11 20:27:12 2016 -0700 ---------------------------------------------------------------------- .../src/main/resources/beam/findbugs-filter.xml | 2 +- .../apache/beam/sdk/util/state/StateBinder.java | 67 +++ .../apache/beam/sdk/util/state/StateSpec.java | 39 ++ .../apache/beam/sdk/util/state/StateSpecs.java | 452 +++++++++++++++++++ .../apache/beam/sdk/util/state/StateTag.java | 82 ++-- .../apache/beam/sdk/util/state/StateTags.java | 386 ++-------------- 6 files changed, 655 insertions(+), 373 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 3c01690..c9223a7 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -116,7 +116,7 @@ <!--[BEAM-420] Non-transient non-serializable instance field in serializable class--> </Match> <Match> - <Class name="org.apache.beam.sdk.util.state.StateTags$CombiningValueStateTag"/> + <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningValueStateSpec"/> <Method name="equals"/> <Bug 
pattern="EQ_DOESNT_OVERRIDE_EQUALS"/> <!--[BEAM-421] Class doesn't override equals in superclass--> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java new file mode 100644 index 0000000..0521e15 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.state; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.CombineWithContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; + +/** + * Visitor for binding a {@link StateSpec} to the associated {@link State}. + * + * @param <K> the type of key this binder embodies. 
+ */ +public interface StateBinder<K> { + <T> ValueState<T> bindValue(String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder); + + <T> BagState<T> bindBag(String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder); + + <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( + String id, + StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + Combine.CombineFn<InputT, AccumT, OutputT> combineFn); + + <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + String id, + StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn); + + <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + String id, + StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> + combineFn); + + /** + * Bind to a watermark {@link StateSpec}. + * + * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to + * the returned {@link WatermarkHoldState} are to be combined. + */ + <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + String id, + StateSpec<? super K, WatermarkHoldState<W>> spec, + OutputTimeFn<? 
super W> outputTimeFn); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java new file mode 100644 index 0000000..4fdeefb --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpec.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.state; + +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * A specification of a persistent state cell. This includes information necessary to encode the + * value and details about the intended access pattern. + * + * @param <K> The type of key that must be used with the state tag. Contravariant: methods should + * accept values of type {@code StateSpec<? super K, StateT>}. + * @param <StateT> The type of state being described. 
+ */ +@Experimental(Kind.STATE) +public interface StateSpec<K, StateT extends State> extends Serializable { + + /** + * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address. + */ + StateT bind(String id, StateBinder<? extends K> binder); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java new file mode 100644 index 0000000..db0eec6 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.util.state; + +import java.util.Objects; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; + +/** + * Static utility methods for creating {@link StateSpec} instances. + */ +@Experimental(Kind.STATE) +public class StateSpecs { + + private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry(); + + static { + STANDARD_REGISTRY.registerStandardCoders(); + } + + private StateSpecs() {} + + /** Create a simple state spec for values of type {@code T}. */ + public static <T> StateSpec<Object, ValueState<T>> value(Coder<T> valueCoder) { + return new ValueStateSpec<>(valueCoder); + } + + /** + * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple + * {@code InputT}s into a single {@code OutputT}. + */ + public static <InputT, AccumT, OutputT> + StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue( + Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { + return combiningValueInternal(accumCoder, combineFn); + } + + /** + * Create a state spec for values that use a {@link KeyedCombineFn} to automatically merge + * multiple {@code InputT}s into a single {@code OutputT}. The key provided to the {@link + * KeyedCombineFn} comes from the keyed {@link StateAccessor}. 
+ */ + public static <K, InputT, AccumT, OutputT> + StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue( + Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { + return keyedCombiningValueInternal(accumCoder, combineFn); + } + + /** + * Create a state spec for values that use a {@link KeyedCombineFnWithContext} to automatically + * merge multiple {@code InputT}s into a single {@code OutputT}. The key provided to the {@link + * KeyedCombineFn} comes from the keyed {@link StateAccessor}, the context provided comes from the + * {@link StateContext}. + */ + public static <K, InputT, AccumT, OutputT> + StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> + keyedCombiningValueWithContext( + Coder<AccumT> accumCoder, + KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { + return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>( + accumCoder, combineFn); + } + + /** + * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple + * {@code InputT}s into a single {@code OutputT}. + * + * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and should + * only be used to initialize static values. 
+ */ + public static <InputT, AccumT, OutputT> + StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> + combiningValueFromInputInternal( + Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { + try { + Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder); + return combiningValueInternal(accumCoder, combineFn); + } catch (CannotProvideCoderException e) { + throw new IllegalArgumentException( + "Unable to determine accumulator coder for " + + combineFn.getClass().getSimpleName() + + " from " + + inputCoder, + e); + } + } + + private static <InputT, AccumT, OutputT> + StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValueInternal( + Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { + return new CombiningValueStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); + } + + private static <K, InputT, AccumT, OutputT> + StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal( + Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { + return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn); + } + + /** + * Create a state spec that is optimized for adding values frequently, and occasionally retrieving + * all the values that have been added. + */ + public static <T> StateSpec<Object, BagState<T>> bag(Coder<T> elemCoder) { + return new BagStateSpec<T>(elemCoder); + } + + /** Create a state spec for holding the watermark. */ + public static <W extends BoundedWindow> + StateSpec<Object, WatermarkHoldState<W>> watermarkStateInternal( + OutputTimeFn<? super W> outputTimeFn) { + return new WatermarkStateSpecInternal<W>(outputTimeFn); + } + + public static <K, InputT, AccumT, OutputT> + StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal( + StateSpec<? 
super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningSpec) { + if (combiningSpec instanceof KeyedCombiningValueStateSpec) { + // Checked above; conversion to a bag spec depends on the provided spec being one of those + // created via the factory methods in this class. + @SuppressWarnings("unchecked") + KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT> typedSpec = + (KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>) combiningSpec; + return typedSpec.asBagSpec(); + } else if (combiningSpec instanceof KeyedCombiningValueWithContextStateSpec) { + @SuppressWarnings("unchecked") + KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec = + (KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec; + return typedSpec.asBagSpec(); + } else { + throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec); + } + } + + /** + * A specification for a state cell holding a settable value of type {@code T}. + * + * <p>Includes the coder for {@code T}. + */ + private static class ValueStateSpec<T> implements StateSpec<Object, ValueState<T>> { + + private final Coder<T> coder; + + private ValueStateSpec(Coder<T> coder) { + this.coder = coder; + } + + @Override + public ValueState<T> bind(String id, StateBinder<?> visitor) { + return visitor.bindValue(id, this, coder); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof ValueStateSpec)) { + return false; + } + + ValueStateSpec<?> that = (ValueStateSpec<?>) obj; + return Objects.equals(this.coder, that.coder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), coder); + } + } + + /** + * A specification for a state cell that is combined according to a {@link CombineFn}. + * + * <p>Includes the {@link CombineFn} and the coder for the accumulator type. 
+ */ + private static class CombiningValueStateSpec<InputT, AccumT, OutputT> + extends KeyedCombiningValueStateSpec<Object, InputT, AccumT, OutputT> + implements StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> { + + private final Coder<AccumT> accumCoder; + private final CombineFn<InputT, AccumT, OutputT> combineFn; + + private CombiningValueStateSpec( + Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { + super(accumCoder, combineFn.asKeyedFn()); + this.combineFn = combineFn; + this.accumCoder = accumCoder; + } + } + + /** + * A specification for a state cell that is combined according to a + * {@link KeyedCombineFnWithContext}. + * + * <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type. + */ + private static class KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT> + implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> { + + private final Coder<AccumT> accumCoder; + private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn; + + protected KeyedCombiningValueWithContextStateSpec( + Coder<AccumT> accumCoder, KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { + this.combineFn = combineFn; + this.accumCoder = accumCoder; + } + + @Override + public AccumulatorCombiningState<InputT, AccumT, OutputT> bind( + String id, StateBinder<? 
extends K> visitor) { + return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof KeyedCombiningValueWithContextStateSpec)) { + return false; + } + + KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?> that = + (KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?>) obj; + return Objects.equals(this.accumCoder, that.accumCoder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), accumCoder); + } + + private StateSpec<Object, BagState<AccumT>> asBagSpec() { + return new BagStateSpec<AccumT>(accumCoder); + } + } + + /** + * A specification for a state cell that is combined according to a {@link KeyedCombineFn}. + * + * <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type. + */ + private static class KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT> + implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> { + + private final Coder<AccumT> accumCoder; + private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn; + + protected KeyedCombiningValueStateSpec( + Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) { + this.keyedCombineFn = keyedCombineFn; + this.accumCoder = accumCoder; + } + + @Override + public AccumulatorCombiningState<InputT, AccumT, OutputT> bind( + String id, StateBinder<? 
extends K> visitor) { + return visitor.bindKeyedCombiningValue(id, this, accumCoder, keyedCombineFn); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + KeyedCombiningValueStateSpec<?, ?, ?, ?> that = + (KeyedCombiningValueStateSpec<?, ?, ?, ?>) obj; + return Objects.equals(this.accumCoder, that.accumCoder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), accumCoder); + } + + private StateSpec<Object, BagState<AccumT>> asBagSpec() { + return new BagStateSpec<AccumT>(accumCoder); + } + } + + /** + * A specification for a state cell supporting bag-like access patterns + * (frequent additions, occasional reads of all the values). + * + * <p>Includes the coder for the element type {@code T}</p> + */ + private static class BagStateSpec<T> implements StateSpec<Object, BagState<T>> { + + private final Coder<T> elemCoder; + + private BagStateSpec(Coder<T> elemCoder) { + this.elemCoder = elemCoder; + } + + @Override + public BagState<T> bind(String id, StateBinder<?> visitor) { + return visitor.bindBag(id, this, elemCoder); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof BagStateSpec)) { + return false; + } + + BagStateSpec<?> that = (BagStateSpec<?>) obj; + return Objects.equals(this.elemCoder, that.elemCoder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), elemCoder); + } + } + + /** + * A specification for a state cell tracking a combined watermark hold. + * + * <p>Includes the {@link OutputTimeFn} according to which the output times + * are combined. 
+ */ + private static class WatermarkStateSpecInternal<W extends BoundedWindow> + implements StateSpec<Object, WatermarkHoldState<W>> { + + /** + * When multiple output times are added to hold the watermark, this determines how they are + * combined, and also the behavior when merging windows. Does not contribute to equality/hash + * since we have at most one watermark hold spec per computation. + */ + private final OutputTimeFn<? super W> outputTimeFn; + + private WatermarkStateSpecInternal(OutputTimeFn<? super W> outputTimeFn) { + this.outputTimeFn = outputTimeFn; + } + + @Override + public WatermarkHoldState<W> bind(String id, StateBinder<?> visitor) { + return visitor.bindWatermark(id, this, outputTimeFn); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + // All instances of WatermarkStateSpecInternal are considered equal + return obj instanceof WatermarkStateSpecInternal; + } + + @Override + public int hashCode() { + return Objects.hash(getClass()); + } + } + + /** + * @deprecated for migration purposes only + */ + @Deprecated + public static <K> StateBinder<K> adaptTagBinder(final StateTag.StateBinder<K> binder) { + return new StateBinder<K>() { + @Override + public <T> ValueState<T> bindValue( + String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) { + return binder.bindValue(StateTags.tagForSpec(id, spec), coder); + } + + @Override + public <T> BagState<T> bindBag( + String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder) { + return binder.bindBag(StateTags.tagForSpec(id, spec), elemCoder); + } + + @Override + public <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( + String id, + StateSpec<? 
super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + CombineFn<InputT, AccumT, OutputT> combineFn) { + return binder.bindCombiningValue(StateTags.tagForSpec(id, spec), accumCoder, combineFn); + } + + @Override + public <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + String id, + StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { + return binder.bindKeyedCombiningValue( + StateTags.tagForSpec(id, spec), accumCoder, combineFn); + } + + @Override + public <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + String id, + StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { + return binder.bindKeyedCombiningValueWithContext( + StateTags.tagForSpec(id, spec), accumCoder, combineFn); + } + + @Override + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + String id, + StateSpec<? super K, WatermarkHoldState<W>> spec, + OutputTimeFn<? 
super W> outputTimeFn) { + return binder.bindWatermark(StateTags.tagForSpec(id, spec), outputTimeFn); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java index 94cba2f..feca927 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java @@ -30,8 +30,9 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; /** - * An address for persistent state. This includes a unique identifier for the location, the - * information necessary to encode the value, and details about the intended access pattern. + * An address and specification for a persistent state cell. This includes a unique identifier for + * the location, the information necessary to encode the value, and details about the intended + * access pattern. * * <p>State can be thought of as a sparse table, with each {@code StateTag} defining a column * that has cells of type {@code StateT}. @@ -45,53 +46,66 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; @Experimental(Kind.STATE) public interface StateTag<K, StateT extends State> extends Serializable { + /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */ + void appendTo(Appendable sb) throws IOException; + + /** + * An identifier for the state cell that this tag references. + */ + String getId(); + + /** + * The specification for the state stored in the referenced cell. + */ + StateSpec<K, StateT> getSpec(); + + /** + * Bind this state tag. See {@link StateSpec#bind}. 
+ * + * @deprecated Use the {@link StateSpec#bind} method via {@link #getSpec} for now. + */ + @Deprecated + StateT bind(StateBinder<? extends K> binder); + /** - * Visitor for binding a {@link StateTag} and to the associated {@link State}. + * Visitor for binding a {@link StateSpec} and to the associated {@link State}. * * @param <K> the type of key this binder embodies. + * @deprecated for migration only; runners should reference the top level {@link StateBinder} + * and move towards {@link StateSpec} rather than {@link StateTag}. */ + @Deprecated public interface StateBinder<K> { - <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> address, Coder<T> coder); + <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> spec, Coder<T> coder); - <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> address, Coder<T> elemCoder); + <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> spec, Coder<T> elemCoder); - <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> - bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn); + <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + CombineFn<InputT, AccumT, OutputT> combineFn); - <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, - Coder<AccumT> accumCoder, KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn); + <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + KeyedCombineFn<? 
super K, InputT, AccumT, OutputT> combineFn); - <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> - bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address, + <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn); + KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> + combineFn); /** - * Bind to a watermark {@link StateTag}. + * Bind to a watermark {@link StateSpec}. * - * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps - * added to the returned {@link WatermarkHoldState} are to be combined. + * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to + * the returned {@link WatermarkHoldState} are to be combined. */ <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> address, + StateTag<? super K, WatermarkHoldState<W>> spec, OutputTimeFn<? super W> outputTimeFn); } - - /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */ - void appendTo(Appendable sb) throws IOException; - - /** - * Returns the user-provided name of this state cell. - */ - String getId(); - - /** - * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address. - */ - StateT bind(StateBinder<? 
extends K> binder); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c1ba2e1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java index b0797b6..3c12848 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java @@ -23,7 +23,6 @@ import java.io.Serializable; import java.util.Objects; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.Combine.CombineFn; @@ -61,11 +60,17 @@ public class StateTags { StateTag<K, StateT> asKind(StateKind kind); } + /** Create a state tag for the given id and spec. */ + public static <K, StateT extends State> StateTag<K, StateT> tagForSpec( + String id, StateSpec<K, StateT> spec) { + return new SimpleStateTag<>(new StructuredId(id), spec); + } + /** * Create a simple state tag for values of type {@code T}. 
*/ public static <T> StateTag<Object, ValueState<T>> value(String id, Coder<T> valueCoder) { - return new ValueStateTag<>(new StructuredId(id), valueCoder); + return new SimpleStateTag<>(new StructuredId(id), StateSpecs.value(valueCoder)); } /** @@ -76,7 +81,8 @@ public class StateTags { StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue( String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - return combiningValueInternal(id, accumCoder, combineFn); + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.combiningValue(accumCoder, combineFn)); } /** @@ -88,7 +94,8 @@ public class StateTags { OutputT> StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(String id, Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { - return keyedCombiningValueInternal(id, accumCoder, combineFn); + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.keyedCombiningValue(accumCoder, combineFn)); } /** @@ -103,10 +110,8 @@ public class StateTags { String id, Coder<AccumT> accumCoder, KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { - return new KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>( - new StructuredId(id), - accumCoder, - combineFn); + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.keyedCombiningValueWithContext(accumCoder, combineFn)); } /** @@ -120,32 +125,8 @@ public class StateTags { StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValueFromInputInternal( String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - try { - Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder); - return combiningValueInternal(id, accumCoder, combineFn); - } catch (CannotProvideCoderException e) { - throw new IllegalArgumentException( - "Unable to determine accumulator coder for " + 
combineFn.getClass().getSimpleName() - + " from " + inputCoder, e); - } - } - - private static <InputT, AccumT, - OutputT> StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> - combiningValueInternal( - String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - return - new CombiningValueStateTag<InputT, AccumT, OutputT>( - new StructuredId(id), accumCoder, combineFn); - } - - private static <K, InputT, AccumT, OutputT> - StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal( - String id, - Coder<AccumT> accumCoder, - KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { - return new KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>( - new StructuredId(id), accumCoder, combineFn); + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.combiningValueFromInputInternal(inputCoder, combineFn)); } /** @@ -153,7 +134,7 @@ public class StateTags { * occasionally retrieving all the values that have been added. */ public static <T> StateTag<Object, BagState<T>> bag(String id, Coder<T> elemCoder) { - return new BagStateTag<T>(new StructuredId(id), elemCoder); + return new SimpleStateTag<>(new StructuredId(id), StateSpecs.bag(elemCoder)); } /** @@ -161,7 +142,8 @@ public class StateTags { */ public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>> watermarkStateInternal(String id, OutputTimeFn<? 
super W> outputTimeFn) { - return new WatermarkStateTagInternal<W>(new StructuredId(id), outputTimeFn); + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn)); } /** @@ -171,7 +153,7 @@ public class StateTags { public static <K, StateT extends State> StateTag<K, StateT> makeSystemTagInternal( StateTag<K, StateT> tag) { if (!(tag instanceof SystemStateTag)) { - throw new IllegalArgumentException("Expected subclass of StateTagBase, got " + tag); + throw new IllegalArgumentException("Expected subclass of SimpleStateTag, got " + tag); } // Checked above @SuppressWarnings("unchecked") @@ -182,21 +164,9 @@ public class StateTags { public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>> convertToBagTagInternal( StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningTag) { - if (combiningTag instanceof KeyedCombiningValueStateTag) { - // Checked above; conversion to a bag tag depends on the provided tag being one of those - // created via the factory methods in this class. - @SuppressWarnings("unchecked") - KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT> typedTag = - (KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT>) combiningTag; - return typedTag.asBagTag(); - } else if (combiningTag instanceof KeyedCombiningValueWithContextStateTag) { - @SuppressWarnings("unchecked") - KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT> typedTag = - (KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT>) combiningTag; - return typedTag.asBagTag(); - } else { - throw new IllegalArgumentException("Unexpected StateTag " + combiningTag); - } + return new SimpleStateTag<>( + new StructuredId(combiningTag.getId()), + StateSpecs.convertToBagSpecInternal(combiningTag.getSpec())); } private static class StructuredId implements Serializable { @@ -254,15 +224,24 @@ public class StateTags { } /** - * A base class that just manages the structured ids. 
+ * A basic {@link StateTag} implementation that manages the structured ids. */ - private abstract static class StateTagBase<K, StateT extends State> + private static class SimpleStateTag<K, StateT extends State> implements StateTag<K, StateT>, SystemStateTag<K, StateT> { - protected final StructuredId id; + private final StateSpec<K, StateT> spec; + private final StructuredId id; - protected StateTagBase(StructuredId id) { + public SimpleStateTag(StructuredId id, StateSpec<K, StateT> spec) { this.id = id; + this.spec = spec; + } + + @Override + @Deprecated + public StateT bind(StateTag.StateBinder<? extends K> binder) { + return spec.bind( + this.id.getRawId(), StateSpecs.adaptTagBinder(binder)); } @Override @@ -271,6 +250,11 @@ public class StateTags { } @Override + public StateSpec<K, StateT> getSpec() { + return spec; + } + + @Override public String toString() { return MoreObjects.toStringHelper(getClass()) .add("id", id) @@ -283,298 +267,24 @@ public class StateTags { } @Override - public abstract StateTag<K, StateT> asKind(StateKind kind); - } - - /** - * A value state cell for values of type {@code T}. 
- * - * @param <T> the type of value being stored - */ - private static class ValueStateTag<T> extends StateTagBase<Object, ValueState<T>> - implements StateTag<Object, ValueState<T>> { - - private final Coder<T> coder; - - private ValueStateTag(StructuredId id, Coder<T> coder) { - super(id); - this.coder = coder; - } - - @Override - public ValueState<T> bind(StateBinder<?> visitor) { - return visitor.bindValue(this, coder); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof ValueStateTag)) { - return false; - } - - ValueStateTag<?> that = (ValueStateTag<?>) obj; - return Objects.equals(this.id, that.id) - && Objects.equals(this.coder, that.coder); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), id, coder); - } - - @Override - public StateTag<Object, ValueState<T>> asKind(StateKind kind) { - return new ValueStateTag<T>(id.asKind(kind), coder); - } - } - - /** - * A state cell for values that are combined according to a {@link CombineFn}. 
- * - * @param <InputT> the type of input values - * @param <AccumT> type of mutable accumulator values - * @param <OutputT> type of output values - */ - private static class CombiningValueStateTag<InputT, AccumT, OutputT> - extends KeyedCombiningValueStateTag<Object, InputT, AccumT, OutputT> - implements StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>, - SystemStateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> { - - private final Coder<AccumT> accumCoder; - private final CombineFn<InputT, AccumT, OutputT> combineFn; - - private CombiningValueStateTag( - StructuredId id, - Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - super(id, accumCoder, combineFn.asKeyedFn()); - this.combineFn = combineFn; - this.accumCoder = accumCoder; - } - - @Override - public StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> - asKind(StateKind kind) { - return new CombiningValueStateTag<InputT, AccumT, OutputT>( - id.asKind(kind), accumCoder, combineFn); - } - } - - /** - * A state cell for values that are combined according to a {@link KeyedCombineFnWithContext}. 
- * - * @param <K> the type of keys - * @param <InputT> the type of input values - * @param <AccumT> type of mutable accumulator values - * @param <OutputT> type of output values - */ - private static class KeyedCombiningValueWithContextStateTag<K, InputT, AccumT, OutputT> - extends StateTagBase<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> - implements SystemStateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> { - - private final Coder<AccumT> accumCoder; - private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn; - - protected KeyedCombiningValueWithContextStateTag( - StructuredId id, - Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { - super(id); - this.combineFn = combineFn; - this.accumCoder = accumCoder; - } - - @Override - public AccumulatorCombiningState<InputT, AccumT, OutputT> bind( - StateBinder<? extends K> visitor) { - return visitor.bindKeyedCombiningValueWithContext(this, accumCoder, combineFn); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof KeyedCombiningValueWithContextStateTag)) { - return false; - } - - KeyedCombiningValueWithContextStateTag<?, ?, ?, ?> that = - (KeyedCombiningValueWithContextStateTag<?, ?, ?, ?>) obj; - return Objects.equals(this.id, that.id) - && Objects.equals(this.accumCoder, that.accumCoder); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), id, accumCoder); - } - - @Override - public StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> asKind( - StateKind kind) { - return new KeyedCombiningValueWithContextStateTag<>( - id.asKind(kind), accumCoder, combineFn); - } - - private StateTag<Object, BagState<AccumT>> asBagTag() { - return new BagStateTag<AccumT>(id, accumCoder); - } - } - - /** - * A state cell for values that are combined according to a {@link KeyedCombineFn}. 
- * - * @param <K> the type of keys - * @param <InputT> the type of input values - * @param <AccumT> type of mutable accumulator values - * @param <OutputT> type of output values - */ - private static class KeyedCombiningValueStateTag<K, InputT, AccumT, OutputT> - extends StateTagBase<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> - implements SystemStateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> { - - private final Coder<AccumT> accumCoder; - private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn; - - protected KeyedCombiningValueStateTag( - StructuredId id, - Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) { - super(id); - this.keyedCombineFn = keyedCombineFn; - this.accumCoder = accumCoder; - } - - @Override - public AccumulatorCombiningState<InputT, AccumT, OutputT> bind( - StateBinder<? extends K> visitor) { - return visitor.bindKeyedCombiningValue(this, accumCoder, keyedCombineFn); + public StateTag<K, StateT> asKind(StateKind kind) { + return new SimpleStateTag<>(id.asKind(kind), spec); } @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof CombiningValueStateTag)) { + public boolean equals(Object other) { + if (!(other instanceof SimpleStateTag)) { return false; } - KeyedCombiningValueStateTag<?, ?, ?, ?> that = (KeyedCombiningValueStateTag<?, ?, ?, ?>) obj; - return Objects.equals(this.id, that.id) - && Objects.equals(this.accumCoder, that.accumCoder); + SimpleStateTag<?, ?> otherTag = (SimpleStateTag<?, ?>) other; + return Objects.equals(this.getId(), otherTag.getId()) + && Objects.equals(this.getSpec(), otherTag.getSpec()); } @Override public int hashCode() { - return Objects.hash(getClass(), id, accumCoder); - } - - @Override - public StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> asKind( - StateKind kind) { - return new KeyedCombiningValueStateTag<>(id.asKind(kind), accumCoder, keyedCombineFn); - } - 
- private StateTag<Object, BagState<AccumT>> asBagTag() { - return new BagStateTag<AccumT>(id, accumCoder); - } - } - - /** - * A state cell optimized for bag-like access patterns (frequent additions, occasional reads - * of all the values). - * - * @param <T> the type of value in the bag - */ - private static class BagStateTag<T> extends StateTagBase<Object, BagState<T>> - implements StateTag<Object, BagState<T>>{ - - private final Coder<T> elemCoder; - - private BagStateTag(StructuredId id, Coder<T> elemCoder) { - super(id); - this.elemCoder = elemCoder; - } - - @Override - public BagState<T> bind(StateBinder<?> visitor) { - return visitor.bindBag(this, elemCoder); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof BagStateTag)) { - return false; - } - - BagStateTag<?> that = (BagStateTag<?>) obj; - return Objects.equals(this.id, that.id) - && Objects.equals(this.elemCoder, that.elemCoder); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), id, elemCoder); - } - - @Override - public StateTag<Object, BagState<T>> asKind(StateKind kind) { - return new BagStateTag<>(id.asKind(kind), elemCoder); - } - } - - private static class WatermarkStateTagInternal<W extends BoundedWindow> - extends StateTagBase<Object, WatermarkHoldState<W>> { - - /** - * When multiple output times are added to hold the watermark, this determines how they are - * combined, and also the behavior when merging windows. Does not contribute to equality/hash - * since we have at most one watermark hold tag per computation. - */ - private final OutputTimeFn<? super W> outputTimeFn; - - private WatermarkStateTagInternal(StructuredId id, OutputTimeFn<? 
super W> outputTimeFn) { - super(id); - this.outputTimeFn = outputTimeFn; - } - - @Override - public WatermarkHoldState<W> bind(StateBinder<?> visitor) { - return visitor.bindWatermark(this, outputTimeFn); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof WatermarkStateTagInternal)) { - return false; - } - - WatermarkStateTagInternal<?> that = (WatermarkStateTagInternal<?>) obj; - return Objects.equals(this.id, that.id); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), id); - } - - @Override - public StateTag<Object, WatermarkHoldState<W>> asKind(StateKind kind) { - return new WatermarkStateTagInternal<W>(id.asKind(kind), outputTimeFn); + return Objects.hash(getClass(), this.getId(), this.getSpec()); } } }