Allow setting timer by ID in DirectTimerInternals
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7f14c463 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7f14c463 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7f14c463 Branch: refs/heads/python-sdk Commit: 7f14c463acd2ae5b86ac81a9528ac4aa7dff765f Parents: 4d71924 Author: Kenneth Knowles <k...@google.com> Authored: Wed Dec 7 20:18:44 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Wed Dec 21 13:45:37 2016 -0800 ---------------------------------------------------------------------- .../runners/direct/DirectTimerInternals.java | 2 +- .../beam/runners/direct/WatermarkManager.java | 25 ++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java index 5ca276d..80e0721 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java @@ -49,7 +49,7 @@ class DirectTimerInternals implements TimerInternals { @Override public void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) { - throw new UnsupportedOperationException("Setting timer by ID not yet supported."); + timerUpdateBuilder.setTimer(TimerData.of(timerId, namespace, target, timeDomain)); } @Deprecated http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f14c463/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index 7bed751..f7bafd1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -23,11 +23,13 @@ import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.collect.ComparisonChain; +import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Ordering; import com.google.common.collect.SortedMultiset; +import com.google.common.collect.Table; import com.google.common.collect.TreeMultiset; import java.io.Serializable; import java.util.ArrayList; @@ -56,6 +58,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TaggedPValue; import org.joda.time.Instant; @@ -210,6 +213,10 @@ public class WatermarkManager { private final SortedMultiset<CommittedBundle<?>> pendingElements; private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers; + // Entries in this table represent the authoritative timestamp for which + // a per-key-and-StateNamespace timer is set. + private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerData>> existingTimers; + private AtomicReference<Instant> currentWatermark; public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) { @@ -222,6 +229,7 @@ public class WatermarkManager { this.pendingElements = TreeMultiset.create(pendingBundleComparator); this.objectTimers = new HashMap<>(); + this.existingTimers = new HashMap<>(); currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE); } @@ -276,14 +284,31 @@ public class WatermarkManager { keyTimers = new TreeSet<>(); objectTimers.put(update.key, keyTimers); } + Table<StateNamespace, String, TimerData> existingTimersForKey = + existingTimers.get(update.key); + if (existingTimersForKey == null) { + existingTimersForKey = HashBasedTable.create(); + existingTimers.put(update.key, existingTimersForKey); + } + for (TimerData timer : update.setTimers) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { + @Nullable + TimerData existingTimer = + existingTimersForKey.get(timer.getNamespace(), timer.getTimerId()); + + if (existingTimer != null) { + keyTimers.remove(existingTimer); + } keyTimers.add(timer); + existingTimersForKey.put(timer.getNamespace(), timer.getTimerId(), timer); } } + for (TimerData timer : update.deletedTimers) { if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) { keyTimers.remove(timer); + existingTimersForKey.remove(timer.getNamespace(), timer.getTimerId()); } } // We don't keep references to timers that have been fired and delivered via #getFiredTimers()