Move StateMerging to runners/core-java

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c0047f7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c0047f7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c0047f7

Branch: refs/heads/master
Commit: 9c0047f74635319e28813dd992d5aa3923dac713
Parents: 459a8f8
Author: Kenneth Knowles <k...@google.com>
Authored: Fri Feb 3 19:40:46 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Feb 7 11:42:54 2017 -0800

----------------------------------------------------------------------
 .../utils/ApexStateInternalsTest.java           |   2 +-
 .../apache/beam/runners/core/NonEmptyPanes.java |   1 -
 .../apache/beam/runners/core/StateMerging.java  | 267 +++++++++++++++++++
 .../beam/runners/core/SystemReduceFn.java       |   1 -
 .../apache/beam/runners/core/WatermarkHold.java |   1 -
 .../AfterDelayFromFirstElementStateMachine.java |   2 +-
 .../core/triggers/AfterPaneStateMachine.java    |   2 +-
 .../core/InMemoryStateInternalsTest.java        |   1 -
 .../streaming/FlinkStateInternalsTest.java      |   2 +-
 .../src/main/resources/beam/findbugs-filter.xml |   2 +-
 .../beam/sdk/util/state/StateMerging.java       | 259 ------------------
 11 files changed, 272 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index 75f648b..d6a4515 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertThat;
 
 import com.datatorrent.lib.util.KryoCloneUtils;
 import java.util.Arrays;
+import org.apache.beam.runners.core.StateMerging;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaceForTest;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -36,7 +37,6 @@ import 
org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateMerging;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.ValueState;

http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
index 0a6fd93..4d67c66 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -26,7 +26,6 @@ import 
org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
new file mode 100644
index 0000000..c85458c
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.joda.time.Instant;
+
+/**
+ * Helpers for merging state.
+ */
+public class StateMerging {
+  /**
+   * Clear all state in {@code address} in all windows under merge (even result windows)
+   * in {@code context}.
+   */
+  public static <K, StateT extends State, W extends BoundedWindow> void clear(
+      MergingStateAccessor<K, W> context, StateTag<? super K, StateT> address) 
{
+    for (StateT state : context.accessInEachMergingWindow(address).values()) {
+      state.clear();
+    }
+  }
+
+  /**
+   * Prefetch all bag state in {@code address} across all windows under merge in
+   * {@code context}, except for the bag state in the final state address window which we can
+   * blindly append to.
+   */
+  public static <K, T, W extends BoundedWindow> void prefetchBags(
+      MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> 
address) {
+    Map<W, BagState<T>> map = context.accessInEachMergingWindow(address);
+    if (map.isEmpty()) {
+      // Nothing to prefetch.
+      return;
+    }
+    BagState<T> result = context.access(address);
+    // Prefetch everything except what's already in result.
+    for (BagState<T> source : map.values()) {
+      if (!source.equals(result)) {
+        prefetchRead(source);
+      }
+    }
+  }
+
+  /**
+   * Merge all bag state in {@code address} across all windows under merge.
+   */
+  public static <K, T, W extends BoundedWindow> void mergeBags(
+      MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> 
address) {
+    mergeBags(context.accessInEachMergingWindow(address).values(), 
context.access(address));
+  }
+
+  /**
+   * Merge all bag state in {@code sources} (which may include {@code result}) into {@code result}.
+   */
+  public static <T, W extends BoundedWindow> void mergeBags(
+      Collection<BagState<T>> sources, BagState<T> result) {
+    if (sources.isEmpty()) {
+      // Nothing to merge.
+      return;
+    }
+    // Prefetch everything except what's already in result.
+    List<ReadableState<Iterable<T>>> futures = new ArrayList<>(sources.size());
+    for (BagState<T> source : sources) {
+      if (!source.equals(result)) {
+        prefetchRead(source);
+        futures.add(source);
+      }
+    }
+    if (futures.isEmpty()) {
+      // Result already holds all the values.
+      return;
+    }
+    // Transfer from sources to result.
+    for (ReadableState<Iterable<T>> future : futures) {
+      for (T element : future.read()) {
+        result.add(element);
+      }
+    }
+    // Clear sources except for result.
+    for (BagState<T> source : sources) {
+      if (!source.equals(result)) {
+        source.clear();
+      }
+    }
+  }
+
+  /**
+   * Prefetch all combining value state for {@code address} across all merging windows in {@code
+   * context}.
+   */
+  public static <K, StateT extends CombiningState<?, ?>, W extends 
BoundedWindow> void
+      prefetchCombiningValues(MergingStateAccessor<K, W> context,
+          StateTag<? super K, StateT> address) {
+    for (StateT state : context.accessInEachMergingWindow(address).values()) {
+      prefetchRead(state);
+    }
+  }
+
+  /**
+   * Merge all value state in {@code address} across all merging windows in {@code context}.
+   */
+  public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void 
mergeCombiningValues(
+      MergingStateAccessor<K, W> context,
+      StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> 
address) {
+    mergeCombiningValues(
+        context.accessInEachMergingWindow(address).values(), 
context.access(address));
+  }
+
+  /**
+   * Merge all value state from {@code sources} (which may include {@code result}) into
+   * {@code result}.
+   */
+  public static <InputT, AccumT, OutputT, W extends BoundedWindow> void 
mergeCombiningValues(
+      Collection<AccumulatorCombiningState<InputT, AccumT, OutputT>> sources,
+      AccumulatorCombiningState<InputT, AccumT, OutputT> result) {
+    if (sources.isEmpty()) {
+      // Nothing to merge.
+      return;
+    }
+    if (sources.size() == 1 && sources.contains(result)) {
+      // Result already holds combined value.
+      return;
+    }
+    // Prefetch.
+    List<ReadableState<AccumT>> futures = new ArrayList<>(sources.size());
+    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
+      prefetchRead(source);
+    }
+    // Read.
+    List<AccumT> accumulators = new ArrayList<>(futures.size());
+    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
+      accumulators.add(source.getAccum());
+    }
+    // Merge (possibly update and return one of the existing accumulators).
+    AccumT merged = result.mergeAccumulators(accumulators);
+    // Clear sources.
+    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
+      source.clear();
+    }
+    // Update result.
+    result.addAccum(merged);
+  }
+
+  /**
+   * Prefetch all watermark state for {@code address} across all merging windows in
+   * {@code context}.
+   */
+  public static <K, W extends BoundedWindow> void prefetchWatermarks(
+      MergingStateAccessor<K, W> context,
+      StateTag<? super K, WatermarkHoldState<W>> address) {
+    Map<W, WatermarkHoldState<W>> map = 
context.accessInEachMergingWindow(address);
+    WatermarkHoldState<W> result = context.access(address);
+    if (map.isEmpty()) {
+      // Nothing to prefetch.
+      return;
+    }
+    if (map.size() == 1 && map.values().contains(result)
+        && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
+      // Nothing to change.
+      return;
+    }
+    if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
+      // No need to read existing holds.
+      return;
+    }
+    // Prefetch.
+    for (WatermarkHoldState<W> source : map.values()) {
+      prefetchRead(source);
+    }
+  }
+
+  private static void prefetchRead(ReadableState<?> source) {
+    source.readLater();
+  }
+
+  /**
+   * Merge all watermark state in {@code address} across all merging windows in {@code context},
+   * where the final merge result window is {@code mergeResult}.
+   */
+  public static <K, W extends BoundedWindow> void mergeWatermarks(
+      MergingStateAccessor<K, W> context,
+      StateTag<? super K, WatermarkHoldState<W>> address,
+      W mergeResult) {
+    mergeWatermarks(
+        context.accessInEachMergingWindow(address).values(), 
context.access(address), mergeResult);
+  }
+
+  /**
+   * Merge all watermark state in {@code sources} (which must include {@code result} if non-empty)
+   * into {@code result}, where the final merge result window is {@code resultWindow}.
+   */
+  public static <W extends BoundedWindow> void mergeWatermarks(
+      Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result,
+      W resultWindow) {
+    if (sources.isEmpty()) {
+      // Nothing to merge.
+      return;
+    }
+    if (sources.size() == 1 && sources.contains(result)
+        && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
+      // Nothing to merge.
+      return;
+    }
+    if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
+      // Clear sources.
+      for (WatermarkHoldState<W> source : sources) {
+        source.clear();
+      }
+      // Update directly from window-derived hold.
+      Instant hold = result.getOutputTimeFn().assignOutputTime(
+          BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow);
+      checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE));
+      result.add(hold);
+    } else {
+      // Prefetch.
+      List<ReadableState<Instant>> futures = new ArrayList<>(sources.size());
+      for (WatermarkHoldState<W> source : sources) {
+        futures.add(source);
+      }
+      // Read.
+      List<Instant> outputTimesToMerge = new ArrayList<>(sources.size());
+      for (ReadableState<Instant> future : futures) {
+        Instant sourceOutputTime = future.read();
+        if (sourceOutputTime != null) {
+          outputTimesToMerge.add(sourceOutputTime);
+        }
+      }
+      // Clear sources.
+      for (WatermarkHoldState<W> source : sources) {
+        source.clear();
+      }
+      if (!outputTimesToMerge.isEmpty()) {
+        // Merge and update.
+        result.add(result.getOutputTimeFn().merge(resultWindow, 
outputTimesToMerge));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index 6c12bad..4c05949 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index a7968db..727d275 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;
 import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateMerging;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;

http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index b60c690..cc4a8ae 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateMerging;
 import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.InstantCoder;
@@ -33,7 +34,6 @@ import 
org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;
 import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;
 import org.joda.time.Duration;

http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index d8ad370..b2798aa 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.core.triggers;
 
 import java.util.Objects;
+import org.apache.beam.runners.core.StateMerging;
 import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.VarLongCoder;
@@ -25,7 +26,6 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;
 import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateMerging;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index 8bcd177..ca0a8e5 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -32,7 +32,6 @@ import 
org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateMerging;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.ValueState;

http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 5784b68..6a086a7 100644
--- 
a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ 
b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertThat;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import org.apache.beam.runners.core.StateMerging;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaceForTest;
 import 
org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkStateInternals;
@@ -39,7 +40,6 @@ import 
org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateMerging;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.ValueState;

http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml 
b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
index 91ab9be..24eaec2 100644
--- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml
@@ -189,7 +189,7 @@
   </Match>
 
   <Match>
-    <Class name="org.apache.beam.sdk.util.state.StateMerging"/>
+    <Class name="org.apache.beam.runners.core.StateMerging"/>
     <Method name="prefetchRead" />
     <Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"/>
     <!-- prefetch call readLater -->

http://git-wip-us.apache.org/repos/asf/beam/blob/9c0047f7/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateMerging.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateMerging.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateMerging.java
deleted file mode 100644
index 457b213..0000000
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateMerging.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.joda.time.Instant;
-
-/**
- * Helpers for merging state.
- */
-public class StateMerging {
-  /**
-   * Clear all state in {@code address} in all windows under merge (even 
result windows)
-   * in {@code context}.
-   */
-  public static <K, StateT extends State, W extends BoundedWindow> void clear(
-      MergingStateAccessor<K, W> context, StateTag<? super K, StateT> address) 
{
-    for (StateT state : context.accessInEachMergingWindow(address).values()) {
-      state.clear();
-    }
-  }
-
-  /**
-   * Prefetch all bag state in {@code address} across all windows under merge 
in
-   * {@code context}, except for the bag state in the final state address 
window which we can
-   * blindly append to.
-   */
-  public static <K, T, W extends BoundedWindow> void prefetchBags(
-      MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> 
address) {
-    Map<W, BagState<T>> map = context.accessInEachMergingWindow(address);
-    if (map.isEmpty()) {
-      // Nothing to prefetch.
-      return;
-    }
-    BagState<T> result = context.access(address);
-    // Prefetch everything except what's already in result.
-    for (BagState<T> source : map.values()) {
-      if (!source.equals(result)) {
-        prefetchRead(source);
-      }
-    }
-  }
-
-  /**
-   * Merge all bag state in {@code address} across all windows under merge.
-   */
-  public static <K, T, W extends BoundedWindow> void mergeBags(
-      MergingStateAccessor<K, W> context, StateTag<? super K, BagState<T>> 
address) {
-    mergeBags(context.accessInEachMergingWindow(address).values(), 
context.access(address));
-  }
-
-  /**
-   * Merge all bag state in {@code sources} (which may include {@code result}) 
into {@code result}.
-   */
-  public static <T, W extends BoundedWindow> void mergeBags(
-      Collection<BagState<T>> sources, BagState<T> result) {
-    if (sources.isEmpty()) {
-      // Nothing to merge.
-      return;
-    }
-    // Prefetch everything except what's already in result.
-    List<ReadableState<Iterable<T>>> futures = new ArrayList<>(sources.size());
-    for (BagState<T> source : sources) {
-      if (!source.equals(result)) {
-        prefetchRead(source);
-        futures.add(source);
-      }
-    }
-    if (futures.isEmpty()) {
-      // Result already holds all the values.
-      return;
-    }
-    // Transfer from sources to result.
-    for (ReadableState<Iterable<T>> future : futures) {
-      for (T element : future.read()) {
-        result.add(element);
-      }
-    }
-    // Clear sources except for result.
-    for (BagState<T> source : sources) {
-      if (!source.equals(result)) {
-        source.clear();
-      }
-    }
-  }
-
-  /**
-   * Prefetch all combining value state for {@code address} across all merging 
windows in {@code
-   * context}.
-   */
-  public static <K, StateT extends CombiningState<?, ?>, W extends 
BoundedWindow> void
-      prefetchCombiningValues(MergingStateAccessor<K, W> context,
-          StateTag<? super K, StateT> address) {
-    for (StateT state : context.accessInEachMergingWindow(address).values()) {
-      prefetchRead(state);
-    }
-  }
-
-  /**
-   * Merge all value state in {@code address} across all merging windows in 
{@code context}.
-   */
-  public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> void 
mergeCombiningValues(
-      MergingStateAccessor<K, W> context,
-      StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> 
address) {
-    mergeCombiningValues(
-        context.accessInEachMergingWindow(address).values(), 
context.access(address));
-  }
-
-  /**
-   * Merge all value state from {@code sources} (which may include {@code 
result}) into
-   * {@code result}.
-   */
-  public static <InputT, AccumT, OutputT, W extends BoundedWindow> void 
mergeCombiningValues(
-      Collection<AccumulatorCombiningState<InputT, AccumT, OutputT>> sources,
-      AccumulatorCombiningState<InputT, AccumT, OutputT> result) {
-    if (sources.isEmpty()) {
-      // Nothing to merge.
-      return;
-    }
-    if (sources.size() == 1 && sources.contains(result)) {
-      // Result already holds combined value.
-      return;
-    }
-    // Prefetch.
-    List<ReadableState<AccumT>> futures = new ArrayList<>(sources.size());
-    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
-      prefetchRead(source);
-    }
-    // Read.
-    List<AccumT> accumulators = new ArrayList<>(futures.size());
-    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
-      accumulators.add(source.getAccum());
-    }
-    // Merge (possibly update and return one of the existing accumulators).
-    AccumT merged = result.mergeAccumulators(accumulators);
-    // Clear sources.
-    for (AccumulatorCombiningState<InputT, AccumT, OutputT> source : sources) {
-      source.clear();
-    }
-    // Update result.
-    result.addAccum(merged);
-  }
-
-  /**
-   * Prefetch all watermark state for {@code address} across all merging 
windows in
-   * {@code context}.
-   */
-  public static <K, W extends BoundedWindow> void prefetchWatermarks(
-      MergingStateAccessor<K, W> context,
-      StateTag<? super K, WatermarkHoldState<W>> address) {
-    Map<W, WatermarkHoldState<W>> map = 
context.accessInEachMergingWindow(address);
-    WatermarkHoldState<W> result = context.access(address);
-    if (map.isEmpty()) {
-      // Nothing to prefetch.
-      return;
-    }
-    if (map.size() == 1 && map.values().contains(result)
-        && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
-      // Nothing to change.
-      return;
-    }
-    if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
-      // No need to read existing holds.
-      return;
-    }
-    // Prefetch.
-    for (WatermarkHoldState<W> source : map.values()) {
-      prefetchRead(source);
-    }
-  }
-
-  private static void prefetchRead(ReadableState<?> source) {
-    source.readLater();
-  }
-
-  /**
-   * Merge all watermark state in {@code address} across all merging windows 
in {@code context},
-   * where the final merge result window is {@code mergeResult}.
-   */
-  public static <K, W extends BoundedWindow> void mergeWatermarks(
-      MergingStateAccessor<K, W> context,
-      StateTag<? super K, WatermarkHoldState<W>> address,
-      W mergeResult) {
-    mergeWatermarks(
-        context.accessInEachMergingWindow(address).values(), 
context.access(address), mergeResult);
-  }
-
-  /**
-   * Merge all watermark state in {@code sources} (which must include {@code 
result} if non-empty)
-   * into {@code result}, where the final merge result window is {@code 
mergeResult}.
-   */
-  public static <W extends BoundedWindow> void mergeWatermarks(
-      Collection<WatermarkHoldState<W>> sources, WatermarkHoldState<W> result,
-      W resultWindow) {
-    if (sources.isEmpty()) {
-      // Nothing to merge.
-      return;
-    }
-    if (sources.size() == 1 && sources.contains(result)
-        && result.getOutputTimeFn().dependsOnlyOnEarliestInputTimestamp()) {
-      // Nothing to merge.
-      return;
-    }
-    if (result.getOutputTimeFn().dependsOnlyOnWindow()) {
-      // Clear sources.
-      for (WatermarkHoldState<W> source : sources) {
-        source.clear();
-      }
-      // Update directly from window-derived hold.
-      Instant hold = result.getOutputTimeFn().assignOutputTime(
-          BoundedWindow.TIMESTAMP_MIN_VALUE, resultWindow);
-      checkState(hold.isAfter(BoundedWindow.TIMESTAMP_MIN_VALUE));
-      result.add(hold);
-    } else {
-      // Prefetch.
-      List<ReadableState<Instant>> futures = new ArrayList<>(sources.size());
-      for (WatermarkHoldState<W> source : sources) {
-        futures.add(source);
-      }
-      // Read.
-      List<Instant> outputTimesToMerge = new ArrayList<>(sources.size());
-      for (ReadableState<Instant> future : futures) {
-        Instant sourceOutputTime = future.read();
-        if (sourceOutputTime != null) {
-          outputTimesToMerge.add(sourceOutputTime);
-        }
-      }
-      // Clear sources.
-      for (WatermarkHoldState<W> source : sources) {
-        source.clear();
-      }
-      if (!outputTimesToMerge.isEmpty()) {
-        // Merge and update.
-        result.add(result.getOutputTimeFn().merge(resultWindow, 
outputTimesToMerge));
-      }
-    }
-  }
-}

Reply via email to