Repository: incubator-beam Updated Branches: refs/heads/master b5853a624 -> a1ac2222d
[BEAM-615] Add Support for Processing-Time Timers in FlinkRunner Window Operator Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/59f62318 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/59f62318 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/59f62318 Branch: refs/heads/master Commit: 59f623189184b225723ebd5686d912aa296ce35b Parents: 3879db0 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Sep 28 11:49:54 2016 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Wed Sep 28 18:46:30 2016 +0200 ---------------------------------------------------------------------- .../wrappers/streaming/WindowDoFnOperator.java | 179 +++++++++++++++++-- 1 file changed, 165 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/59f62318/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 14a3ca7..e06a783 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -17,6 +17,9 @@ */ package org.apache.beam.runners.flink.translation.wrappers.streaming; + +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -26,12 +29,14 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; +import java.util.concurrent.ScheduledFuture; import javax.annotation.Nullable; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.core.SystemReduceFn; @@ -53,12 +58,15 @@ import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; + + import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.joda.time.Instant; @@ -69,7 +77,8 @@ import org.joda.time.Instant; * @param <OutputT> */ public class WindowDoFnOperator<K, InputT, OutputT> - extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> { + extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> + implements Triggerable { private final Coder<K> keyCoder; private final TimerInternals.TimerDataCoder timerCoder; @@ -77,6 +86,11 @@ public class WindowDoFnOperator<K, InputT, OutputT> private transient Set<Tuple2<ByteBuffer, TimerInternals.TimerData>> watermarkTimers; private transient Queue<Tuple2<ByteBuffer, TimerInternals.TimerData>> watermarkTimersQueue; + private transient Queue<Tuple2<ByteBuffer, TimerInternals.TimerData>> processingTimeTimersQueue; + private transient Set<Tuple2<ByteBuffer, TimerInternals.TimerData>> processingTimeTimers; + private transient Multiset<Long> processingTimeTimerTimestamps; + private transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures; + private FlinkStateInternals<K> stateInternals; private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn; @@ -151,6 +165,24 @@ public class WindowDoFnOperator<K, InputT, OutputT> }); } + if (processingTimeTimers == null) { + processingTimeTimers = new HashSet<>(); + processingTimeTimerTimestamps = HashMultiset.create(); + processingTimeTimersQueue = new PriorityQueue<>( + 10, + new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() { + @Override + public int compare( + Tuple2<ByteBuffer, TimerInternals.TimerData> o1, + Tuple2<ByteBuffer, TimerInternals.TimerData> o2) { + return o1.f1.compareTo(o2.f1); + } + }); + } + + // ScheduledFutures are not checkpointed + processingTimeTimerFutures = new HashMap<>(); + stateInternals = new FlinkStateInternals<>(getStateBackend(), keyCoder); // call super at the end because this will call getDoFn() which requires stateInternals @@ -177,6 +209,69 @@ public class WindowDoFnOperator<K, InputT, OutputT> if (watermarkTimers.remove(keyedTimer)) { watermarkTimersQueue.remove(keyedTimer); } + } + + private void registerProcessingTimeTimer(TimerInternals.TimerData timer) { + Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer = + new Tuple2<>((ByteBuffer) getStateBackend().getCurrentKey(), timer); + if (processingTimeTimers.add(keyedTimer)) { + processingTimeTimersQueue.add(keyedTimer); + + // If this is the first timer added for this timestamp register a timer Task + if (processingTimeTimerTimestamps.add(timer.getTimestamp().getMillis(), 1) == 0) { + ScheduledFuture<?> scheduledFuture = registerTimer(timer.getTimestamp().getMillis(), this); + processingTimeTimerFutures.put(timer.getTimestamp().getMillis(), scheduledFuture); + } + } + } + + private void deleteProcessingTimeTimer(TimerInternals.TimerData timer) { + Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer = + new Tuple2<>((ByteBuffer) getStateBackend().getCurrentKey(), timer); + if (processingTimeTimers.remove(keyedTimer)) { + processingTimeTimersQueue.remove(keyedTimer); + + // If there are no timers left for this timestamp, remove it from queue and cancel the + // timer Task + if (processingTimeTimerTimestamps.remove(timer.getTimestamp().getMillis(), 1) == 1) { + ScheduledFuture<?> triggerTaskFuture = + processingTimeTimerFutures.remove(timer.getTimestamp().getMillis()); + if (triggerTaskFuture != null && !triggerTaskFuture.isDone()) { + triggerTaskFuture.cancel(false); + } + } + + } + } + + @Override + public void trigger(long time) throws Exception { + + //Remove information about the triggering task + processingTimeTimerFutures.remove(time); + processingTimeTimerTimestamps.setCount(time, 0); + + boolean fire; + + do { + Tuple2<ByteBuffer, TimerInternals.TimerData> timer = processingTimeTimersQueue.peek(); + if (timer != null && timer.f1.getTimestamp().getMillis() <= time) { + fire = true; + + processingTimeTimersQueue.remove(); + processingTimeTimers.remove(timer); + + setKeyContext(timer.f0); + + pushbackDoFnRunner.processElement(WindowedValue.valueInGlobalWindow( + KeyedWorkItems.<K, InputT>timersWorkItem( + stateInternals.getKey(), + Collections.singletonList(timer.f1)))); + + } else { + fire = false; + } + } while (fire); } @@ -262,20 +357,21 @@ public class WindowDoFnOperator<K, InputT, OutputT> private void restoreTimers(InputStream in) throws IOException { DataInputStream dataIn = new DataInputStream(in); + int numWatermarkTimers = dataIn.readInt(); watermarkTimers = new HashSet<>(numWatermarkTimers); watermarkTimersQueue = new PriorityQueue<>( - Math.max(numWatermarkTimers, 1), - new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() { - @Override - public int compare( - Tuple2<ByteBuffer, TimerInternals.TimerData> o1, - Tuple2<ByteBuffer, TimerInternals.TimerData> o2) { - return o1.f1.compareTo(o2.f1); - } - }); + Math.max(numWatermarkTimers, 1), + new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() { + @Override + public int compare( + Tuple2<ByteBuffer, TimerInternals.TimerData> o1, + Tuple2<ByteBuffer, TimerInternals.TimerData> o2) { + return o1.f1.compareTo(o2.f1); + } + }); for (int i = 0; i < numWatermarkTimers; i++) { int length = dataIn.readInt(); @@ -288,6 +384,44 @@ public class WindowDoFnOperator<K, InputT, OutputT> watermarkTimersQueue.add(keyedTimer); } } + + int numProcessingTimeTimers = dataIn.readInt(); + + processingTimeTimers = new HashSet<>(numProcessingTimeTimers); + processingTimeTimersQueue = new PriorityQueue<>( + Math.max(numProcessingTimeTimers, 1), + new Comparator<Tuple2<ByteBuffer, TimerInternals.TimerData>>() { + @Override + public int compare( + Tuple2<ByteBuffer, TimerInternals.TimerData> o1, + Tuple2<ByteBuffer, TimerInternals.TimerData> o2) { + return o1.f1.compareTo(o2.f1); + } + }); + + processingTimeTimerTimestamps = HashMultiset.create(); + processingTimeTimerFutures = new HashMap<>(); + + for (int i = 0; i < numProcessingTimeTimers; i++) { + int length = dataIn.readInt(); + byte[] keyBytes = new byte[length]; + dataIn.readFully(keyBytes); + TimerInternals.TimerData timerData = timerCoder.decode(dataIn, Coder.Context.NESTED); + Tuple2<ByteBuffer, TimerInternals.TimerData> keyedTimer = + new Tuple2<>(ByteBuffer.wrap(keyBytes), timerData); + if (processingTimeTimers.add(keyedTimer)) { + processingTimeTimersQueue.add(keyedTimer); + + //If this is the first timer added for this timestamp register a timer Task + if (processingTimeTimerTimestamps.add(timerData.getTimestamp().getMillis(), 1) == 0) { + // this registers a timer with the Flink processing-time service + ScheduledFuture<?> scheduledFuture = + registerTimer(timerData.getTimestamp().getMillis(), this); + processingTimeTimerFutures.put(timerData.getTimestamp().getMillis(), scheduledFuture); + } + + } + } } private void snapshotTimers(OutputStream out) throws IOException { @@ -298,6 +432,13 @@ public class WindowDoFnOperator<K, InputT, OutputT> dataOut.write(timer.f0.array(), 0, timer.f0.limit()); timerCoder.encode(timer.f1, dataOut, Coder.Context.NESTED); } + + dataOut.writeInt(processingTimeTimersQueue.size()); + for (Tuple2<ByteBuffer, TimerInternals.TimerData> timer : processingTimeTimersQueue) { + dataOut.writeInt(timer.f0.limit()); + dataOut.write(timer.f0.array(), 0, timer.f0.limit()); + timerCoder.encode(timer.f1, dataOut, Coder.Context.NESTED); + } } /** @@ -313,25 +454,35 @@ public class WindowDoFnOperator<K, InputT, OutputT> public void setTimer(TimerData timerKey) { if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { registerEventTimeTimer(timerKey); + } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) { + registerProcessingTimeTimer(timerKey); } else { - throw new UnsupportedOperationException("Processing-time timers not supported."); + throw new UnsupportedOperationException( + "Unsupported time domain: " + timerKey.getDomain()); } } @Override public void deleteTimer(TimerData timerKey) { - deleteEventTimeTimer(timerKey); + if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) { + deleteEventTimeTimer(timerKey); + } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) { + deleteProcessingTimeTimer(timerKey); + } else { + throw new UnsupportedOperationException( + "Unsupported time domain: " + timerKey.getDomain()); + } } @Override public Instant currentProcessingTime() { - return Instant.now(); + return new Instant(getCurrentProcessingTime()); } @Nullable @Override public Instant currentSynchronizedProcessingTime() { - return Instant.now(); + return new Instant(getCurrentProcessingTime()); } @Override