http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java index 1a4ed8f..f7e1f36 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java @@ -16,85 +16,360 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.samza.operators.windows; +import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.triggers.TimeTrigger; +import org.apache.samza.operators.triggers.Trigger; +import org.apache.samza.operators.triggers.Triggers; +import org.apache.samza.operators.windows.internal.WindowInternal; +import java.time.Duration; import java.util.Collection; +import java.util.function.BiFunction; import java.util.function.Function; - /** - * This class defines a collection of {@link Window} functions. The public classes and methods here are intended to be - * used by the user (i.e. programmers) to create {@link Window} function directly. + * APIs for creating different types of {@link Window}s. + * + * Groups the incoming {@link MessageEnvelope}s in the {@link org.apache.samza.operators.MessageStream} into finite windows for processing. + * + * <p> Each window is uniquely identified by its {@link WindowKey}. A window can have one or more associated {@link Trigger}s + * that determine when results from the {@link Window} are emitted. Each emitted result contains one or more + * {@link MessageEnvelope}s in the window and is called a {@link WindowPane}. 
+ * + * <p> A window can have early triggers that allow emitting {@link WindowPane}s speculatively before all data for the window + * has arrived or late triggers that allow handling of late data arrivals. + * + * window wk1 + * +--------------------------------+ + * ------------+--------+-----------+ + * | | | | + * | pane 1 |pane2 | pane3 | + * +-----------+--------+-----------+ + * + ----------------------------------- + *incoming message stream ------+ + ----------------------------------- + * window wk2 + * +---------------------+---------+ + * | pane 1| pane 2 | pane 3 | + * | | | | + * +---------+-----------+---------+ + * + * window wk3 + * +----------+-----------+---------+ + * | | | | + * | pane 1 | pane 2 | pane 3| + * | | | | + * +----------+-----------+---------+ + * + * + * <p> A {@link Window} can be one of the following types: + * <ul> + * <li> + * Tumbling Windows: A tumbling window defines a series of non-overlapping, fixed size, contiguous intervals. + * <li> + * Session Windows: A session window groups a {@link org.apache.samza.operators.MessageStream} into sessions. + * A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. + * The boundary for a session is defined by a {@code sessionGap}. All {@link MessageEnvelope}s that arrive within + * the gap are grouped into the same session. + * <li> + * Global Windows: A global window defines a single infinite window over the entire {@link org.apache.samza.operators.MessageStream}. + * An early trigger must be specified when defining a global window. + * </ul> + * + * <p> A {@link Window} is defined as "keyed" when the incoming {@link MessageEnvelope}s are first grouped based on their key + * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window + * types.
* */ +@InterfaceStability.Unstable public final class Windows { + private Windows() { } + /** - * private constructor to prevent instantiation + * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping processing + * time based windows based on the provided keyFn and applies the provided fold function to them. + * + * <p>The below example computes the maximum value per-key over fixed size 10 second windows. + * + * <pre> {@code + * MessageStream<UserClick> stream = ...; + * Function<UserClick, String> keyFn = ...; + * BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c); + * MessageStream<WindowOutput<WindowKey<String>, Integer>> windowedStream = stream.window( + * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), maxAggregator)); + * } + * </pre> + * + * @param keyFn the function to extract the window key from a {@link MessageEnvelope} + * @param interval the duration in processing time + * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane} + * @param <M> the type of the input {@link MessageEnvelope} + * @param <WV> the type of the {@link WindowPane} output value + * @param <K> the type of the key in the {@link Window} + * @return the created {@link Window} function. 
*/ - private Windows() {} - - static <M extends MessageEnvelope, WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> WindowFn<M, WK, WS, WM> getInternalWindowFn( - Window<M, WK, WV, WM> window) { - if (window instanceof SessionWindow) { - SessionWindow<M, WK, WV> sessionWindow = (SessionWindow<M, WK, WV>) window; - return (WindowFn<M, WK, WS, WM>) sessionWindow.getInternalWindowFn(); - } - throw new IllegalArgumentException("Input window type not supported."); + public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, WindowPane<K, WV>> + keyedTumblingWindow(Function<M, K> keyFn, Duration interval, BiFunction<M, WV, WV> foldFn) { + + Trigger defaultTrigger = new TimeTrigger(interval); + return new WindowInternal<M, K, WV>(defaultTrigger, foldFn, keyFn, null); } + /** - * Public static API methods start here + * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping + * processing time based windows using the provided keyFn. * + * <p>The below example groups the stream into fixed-size 10 second windows for each key. 
+ + * <pre> {@code + * MessageStream<UserClick> stream = ...; + * Function<UserClick, String> keyFn = ...; + * MessageStream<WindowOutput<WindowKey<String>, Collection<UserClick>>> windowedStream = stream.window( + * Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10))); + * } + * </pre> + * + * @param keyFn function to extract key from the {@link MessageEnvelope} + * @param interval the duration in processing time + * @param <M> the type of the input {@link MessageEnvelope} + * @param <K> the type of the key in the {@link Window} + * @return the created {@link Window} function */ + public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, WindowPane<K, Collection<M>>> + keyedTumblingWindow(Function<M, K> keyFn, Duration interval) { + BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { + c.add(m); + return c; + }; + return keyedTumblingWindow(keyFn, interval, aggregator); + } /** - * Static API method to create a {@link SessionWindow} in which the output value is simply the collection of input {@link MessageEnvelope}s + * Creates a {@link Window} that windows values into fixed-size processing time based windows and aggregates + * them by applying the provided function. + * + * <p>The below example computes the maximum value over fixed size 10 second windows.
* - * @param sessionKeyFunction function to calculate session window key - * @param <M> type of input {@link MessageEnvelope} - * @param <WK> type of the session window key - * @return the {@link Window} function for the session + * <pre> {@code + * MessageStream<String> stream = ...; + * BiFunction<String, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c); + * MessageStream<WindowOutput<WindowKey, Integer>> windowedStream = stream.window( + * Windows.tumblingWindow(Duration.ofSeconds(10), maxAggregator)); + * } + * </pre> + * + * @param duration the duration in processing time + * @param foldFn to aggregate {@link MessageEnvelope}s in the {@link WindowPane} + * @param <M> the type of the input {@link MessageEnvelope} + * @param <WV> the type of the {@link WindowPane} output value + * @return the created {@link Window} function */ - public static <M extends MessageEnvelope, WK> Window<M, WK, Collection<M>, WindowOutput<WK, Collection<M>>> intoSessions(Function<M, WK> sessionKeyFunction) { - return new SessionWindow<>(sessionKeyFunction, (m, c) -> { - c.add(m); - return c; - } - ); + public static <M extends MessageEnvelope, WV> Window<M, Void, WV, WindowPane<Void, WV>> + tumblingWindow(Duration duration, BiFunction<M, WV, WV> foldFn) { + Trigger defaultTrigger = Triggers.repeat(new TimeTrigger(duration)); + return new WindowInternal<M, Void, WV>(defaultTrigger, foldFn, null, null); } /** - * Static API method to create a {@link SessionWindow} in which the output value is a collection of {@code SI} from the input {@link MessageEnvelope}s - * - * @param sessionKeyFunction function to calculate session window key - * @param sessionInfoExtractor function to retrieve session info of type {@code SI} from the input {@link MessageEnvelope} of type {@code M} - * @param <M> type of the input {@link MessageEnvelope} - * @param <WK> type of the session window key - * @param <SI> type of the session information retrieved from each input {@link MessageEnvelope} 
of type {@code M} - * @return the {@link Window} function for the session + * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into fixed-size, non-overlapping + * processing time based windows. + * + * <p>The below example groups the stream into fixed-size 10 second windows and computes a windowed-percentile. + * + * <pre> {@code + * MessageStream<Long> stream = ...; + * Function<Collection<Long>, Long> percentile99 = ...; + * + * MessageStream<WindowOutput<WindowKey, Collection<Long>>> windowedStream = stream.window(Windows.tumblingWindow(Duration.ofSeconds(10))); + * MessageStream<Long> windowedPercentiles = windowedStream.map(windowedOutput -> percentile99.apply(windowedOutput.getMessage())); + * } + * </pre> + * + * @param duration the duration in processing time + * @param <M> the type of the input {@link MessageEnvelope} + * @return the created {@link Window} function */ - public static <M extends MessageEnvelope, WK, SI> Window<M, WK, Collection<SI>, WindowOutput<WK, Collection<SI>>> intoSessions(Function<M, WK> sessionKeyFunction, - Function<M, SI> sessionInfoExtractor) { - return new SessionWindow<>(sessionKeyFunction, (m, c) -> { - c.add(sessionInfoExtractor.apply(m)); - return c; - } - ); + public static <M extends MessageEnvelope> Window<M, Void, Collection<M>, WindowPane<Void, Collection<M>>> tumblingWindow(Duration duration) { + BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { + c.add(m); + return c; + }; + return tumblingWindow(duration, aggregator); } /** - * Static API method to create a {@link SessionWindow} as a counter of input {@link MessageEnvelope}s + * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into sessions per-key based on the provided {@code sessionGap} + * and applies the provided fold function to them. + * + * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}.
+ * A session is considered complete when no new messages arrive within the {@code sessionGap}. All {@link MessageEnvelope}s that arrive within + * the gap are grouped into the same session. + * + * <p>The below example computes the maximum value per-key over a session window of gap 10 seconds. + * + * <pre> {@code + * MessageStream<UserClick> stream = ...; + * BiFunction<UserClick, Integer, Integer> maxAggregator = (m, c)-> Math.max(parseInt(m), c); + * Function<UserClick, String> userIdExtractor = m -> m.getUserId(); + * MessageStream<WindowOutput<WindowKey<String>, Integer>> windowedStream = stream.window( + * Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10), maxAggregator)); + * } + * </pre> * - * @param sessionKeyFunction function to calculate session window key - * @param <M> type of the input {@link MessageEnvelope} - * @param <WK> type of the session window key - * @return the {@link Window} function for the session + * @param keyFn the function to extract the window key from a {@link MessageEnvelope} + * @param sessionGap the timeout gap for defining the session + * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane} + * @param <M> the type of the input {@link MessageEnvelope} + * @param <K> the type of the key in the {@link Window} + * @param <WV> the type of the output value in the {@link WindowPane} + * @return the created {@link Window} function */ - public static <M extends MessageEnvelope, WK> Window<M, WK, Integer, WindowOutput<WK, Integer>> intoSessionCounter(Function<M, WK> sessionKeyFunction) { - return new SessionWindow<>(sessionKeyFunction, (m, c) -> c + 1); + public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, WindowPane<K, WV>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap, BiFunction<M, WV, WV> foldFn) { + Trigger defaultTrigger = Triggers.timeSinceLastMessage(sessionGap); + return new WindowInternal<M, K, WV>(defaultTrigger, foldFn, keyFn, null); } + /** + *
Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into sessions per-key based on the provided {@code sessionGap}. + * + * <p>A <i>session</i> captures some period of activity over a {@link org.apache.samza.operators.MessageStream}. The + * boundary for the session is defined by a {@code sessionGap}. All {@link MessageEnvelope}s that arrive within + * the gap are grouped into the same session. + * + * <p>The below example groups the stream into per-key session windows of gap 10 seconds. + * + * <pre> {@code + * MessageStream<UserClick> stream = ...; + * // each emitted pane holds all UserClicks of one user's session + * Function<UserClick, String> userIdExtractor = m -> m.getUserId(); + * MessageStream<WindowOutput<WindowKey<String>, Collection<UserClick>>> windowedStream = stream.window( + * Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10))); + * } + * </pre> + * + * @param keyFn the function to extract the window key from a {@link MessageEnvelope} + * @param sessionGap the timeout gap for defining the session + * @param <M> the type of the input {@link MessageEnvelope} + * @param <K> the type of the key in the {@link Window} + * @return the created {@link Window} function + */ + public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, WindowPane<K, Collection<M>>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap) { + + BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { + c.add(m); + return c; + }; + return keyedSessionWindow(keyFn, sessionGap, aggregator); + } + + + /** + * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into a single global window. This window does not have a + * default trigger. The triggering behavior must be specified by setting an early trigger. + * + * <p>The below example computes the maximum value over a count based window.
The window emits {@link WindowPane}s when + * there are either 50 messages in the window pane or when 10 seconds have passed since the first message in the pane. + * + * <pre> {@code + * MessageStream<Long> stream = ...; + * BiFunction<Long, Long, Long> maxAggregator = (m, c)-> Math.max(m, c); + * MessageStream<WindowOutput<WindowKey, Long>> windowedStream = stream.window(Windows.globalWindow(maxAggregator) + * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10)))))); + * } + * </pre> + * + * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane} + * @param <M> the type of {@link MessageEnvelope} + * @param <WV> the type of the output value in the {@link WindowPane} + * @return the created {@link Window} function. + */ + public static <M extends MessageEnvelope, WV> Window<M, Void, WV, WindowPane<Void, WV>> globalWindow(BiFunction<M, WV, WV> foldFn) { + return new WindowInternal<M, Void, WV>(null, foldFn, null, null); + } + + /** + * Creates a {@link Window} that groups incoming {@link MessageEnvelope}s into a single global window. This window does not have a + * default trigger. The triggering behavior must be specified by setting an early trigger. + * + * <p>The below example groups the stream into count based windows that trigger every 50 messages or every 10 minutes. + * <pre> {@code + * MessageStream<Long> stream = ...; + * MessageStream<WindowOutput<WindowKey, Collection<Long>>> windowedStream = stream.window(Windows.globalWindow() + * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofMinutes(10)))))); + * } + * </pre> + * + * @param <M> the type of {@link MessageEnvelope} + * @return the created {@link Window} function.
+ */ + public static <M extends MessageEnvelope> Window<M, Void, Collection<M>, WindowPane<Void, Collection<M>>> globalWindow() { + BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { + c.add(m); + return c; + }; + return globalWindow(aggregator); + } + + /** + * Returns a global {@link Window} that groups incoming {@link MessageEnvelope}s using the provided keyFn. + * The window does not have a default trigger. The triggering behavior must be specified by setting an early + * trigger. + * + * <p> The below example computes the maximum value per-key over count based windows. The window triggers every 50 messages or every + * 10 minutes. + * + * <pre> {@code + * MessageStream<UserClick> stream = ...; + * BiFunction<UserClick, Long, Long> maxAggregator = (m, c)-> Math.max(parseLongField(m), c); + * Function<UserClick, String> keyFn = ...; + * MessageStream<WindowOutput<WindowKey<String>, Long>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn, maxAggregator) + * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofMinutes(10)))))); + * } + * </pre> + * + * @param keyFn the function to extract the window key from a {@link MessageEnvelope} + * @param foldFn the function to aggregate {@link MessageEnvelope}s in the {@link WindowPane} + * @param <M> the type of {@link MessageEnvelope} + * @param <K> the type of the key in the {@link Window} + * @param <WV> the type of the output value in the {@link WindowPane} + * @return the created {@link Window} function + */ + public static <M extends MessageEnvelope, K, WV> Window<M, K, WV, WindowPane<K, WV>> keyedGlobalWindow(Function<M, K> keyFn, BiFunction<M, WV, WV> foldFn) { + return new WindowInternal<M, K, WV>(null, foldFn, keyFn, null); + } + + /** + * Returns a global {@link Window} that groups incoming {@link MessageEnvelope}s using the provided keyFn. + * The window does not have a default trigger.
The triggering behavior must be specified by setting an early trigger. + * + * <p> The below example groups the stream per-key into count based windows. The window triggers every 50 messages or + * every 10 minutes. + * + * <pre> {@code + * MessageStream<UserClick> stream = ...; + * Function<UserClick, String> keyFn = ...; + * MessageStream<WindowOutput<WindowKey<String>, Collection<UserClick>>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn) + * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofMinutes(10)))))); + * } + * </pre> + * + * @param keyFn the function to extract the window key from a {@link MessageEnvelope} + * @param <M> the type of {@link MessageEnvelope} + * @param <K> the type of the key in the {@link Window} + * @return the created {@link Window} function + */ + public static <M extends MessageEnvelope, K> Window<M, K, Collection<M>, WindowPane<K, Collection<M>>> keyedGlobalWindow(Function<M, K> keyFn) { + BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> { + c.add(m); + return c; + }; + return keyedGlobalWindow(keyFn, aggregator); + } }
http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java new file mode 100644 index 0000000..8825867 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.windows.internal; +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.triggers.Trigger; +import org.apache.samza.operators.windows.AccumulationMode; +import org.apache.samza.operators.windows.Window; +import org.apache.samza.operators.windows.WindowPane; + +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Internal representation of a {@link Window}. 
This specifies default, early and late triggers for the {@link Window} + * and whether to accumulate or discard previously emitted panes. + * + * Note: This class is meant to be used internally by Samza, and is not to be instantiated by programmers. + */ +@InterfaceStability.Unstable +public final class WindowInternal<M extends MessageEnvelope, K, WV> implements Window<M, K, WV, WindowPane<K, WV>> { + + private final Trigger defaultTrigger; + + /* + * The function that is applied each time a {@link MessageEnvelope} is added to this window. + */ + private final BiFunction<M, WV, WV> foldFunction; + + /* + * The function that extracts the key from a {@link MessageEnvelope} + */ + private final Function<M, K> keyExtractor; + + /* + * The function that extracts the event time from a {@link MessageEnvelope} + */ + private final Function<M, Long> eventTimeExtractor; + + private Trigger earlyTrigger; + + private Trigger lateTrigger; + + private AccumulationMode mode; + + public WindowInternal(Trigger defaultTrigger, BiFunction<M, WV, WV> foldFunction, Function<M, K> keyExtractor, Function<M, Long> eventTimeExtractor) { + this.foldFunction = foldFunction; + this.eventTimeExtractor = eventTimeExtractor; + this.keyExtractor = keyExtractor; + this.defaultTrigger = defaultTrigger; + } + + @Override + public Window<M, K, WV, WindowPane<K, WV>> setEarlyTrigger(Trigger trigger) { + this.earlyTrigger = trigger; + return this; + } + + @Override + public Window<M, K, WV, WindowPane<K, WV>> setLateTrigger(Trigger trigger) { + this.lateTrigger = trigger; + return this; + } + + @Override + public Window<M, K, WV, WindowPane<K, WV>> setAccumulationMode(AccumulationMode mode) { + this.mode = mode; + return this; + } + + public Trigger getDefaultTrigger() { + return defaultTrigger; + } + + public Trigger getEarlyTrigger() { + return earlyTrigger; + } + + public Trigger getLateTrigger() { + return lateTrigger; + } + + public BiFunction<M, WV, WV> getFoldFunction() { + return foldFunction; 
+ } + + public Function<M, K> getKeyExtractor() { + return keyExtractor; + } + + public Function<M, Long> getEventTimeExtractor() { + return eventTimeExtractor; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java deleted file mode 100644 index a91af24..0000000 --- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.windows; - -import org.apache.samza.operators.data.MessageEnvelope; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.assertEquals; - - -public class TestTrigger { - - @Test - public void testConstructor() throws Exception { - BiFunction<MessageEnvelope<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000; - BiFunction<MessageEnvelope<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000; - Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis(); - Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> { - s.setOutputValue(0); - return s; - }; - Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> { - s.setOutputValue(1); - return s; - }; - - Trigger<MessageEnvelope<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger, - earlyTriggerUpdater, lateTriggerUpdater); - - Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); - Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); - Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); - Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater"); - Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater"); - earlyTriggerField.setAccessible(true); - lateTriggerField.setAccessible(true); - timerTriggerField.setAccessible(true); - earlyTriggerUpdaterField.setAccessible(true); - lateTriggerUpdaterField.setAccessible(true); - - assertEquals(earlyTrigger, earlyTriggerField.get(trigger)); - assertEquals(timerTrigger, 
timerTriggerField.get(trigger)); - assertEquals(lateTrigger, lateTriggerField.get(trigger)); - assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger)); - assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java deleted file mode 100644 index 6a9b55d..0000000 --- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.windows; - -import org.apache.samza.operators.TestMessageEnvelope; -import org.junit.Before; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestTriggerBuilder { - private Field earlyTriggerField; - private Field lateTriggerField; - private Field timerTriggerField; - private Field earlyTriggerUpdater; - private Field lateTriggerUpdater; - - @Before - public void testPrep() throws Exception { - this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger"); - this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger"); - this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger"); - this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater"); - this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater"); - - this.earlyTriggerField.setAccessible(true); - this.lateTriggerField.setAccessible(true); - this.timerTriggerField.setAccessible(true); - this.earlyTriggerUpdater.setAccessible(true); - this.lateTriggerUpdater.setAccessible(true); - } - - @Test - public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException { - TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> triggerField = - (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) 
this.earlyTriggerField.get(builder); - WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(triggerField.apply(null, mockState)); - when(mockState.getNumberMessages()).thenReturn(2000L); - assertTrue(triggerField.apply(null, mockState)); - - Function<TestMessageEnvelope, Boolean> tokenFunc = m -> true; - builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc); - triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); - TestMessageEnvelope m = mock(TestMessageEnvelope.class); - assertTrue(triggerField.apply(m, mockState)); - - builder = TriggerBuilder.earlyTriggerOnEventTime(tm -> tm.getMessage().getEventTime(), 30000L); - triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); - when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L); - when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L); - TestMessageEnvelope.MessageType mockInnerMessage; - mockInnerMessage = mock(TestMessageEnvelope.MessageType.class); - when(mockInnerMessage.getEventTime()).thenReturn(19999000000L); - when(m.getMessage()).thenReturn(mockInnerMessage); - assertFalse(triggerField.apply(m, mockState)); - mockInnerMessage = mock(TestMessageEnvelope.MessageType.class); - when(mockInnerMessage.getEventTime()).thenReturn(32000000000L); - when(m.getMessage()).thenReturn(mockInnerMessage); - assertTrue(triggerField.apply(m, mockState)); - mockInnerMessage = mock(TestMessageEnvelope.MessageType.class); - when(m.getMessage()).thenReturn(mockInnerMessage); - when(mockInnerMessage.getEventTime()).thenReturn(1001000000L); - when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L); - assertTrue(triggerField.apply(m, mockState)); - - BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, 
Boolean> mockFunc = mock(BiFunction.class); - builder = TriggerBuilder.earlyTrigger(mockFunc); - triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); - assertEquals(triggerField, mockFunc); - - builder = TriggerBuilder.timeoutSinceFirstMessage(10000L); - Function<WindowState<Collection<TestMessageEnvelope>>, Boolean> timerTrigger = - (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder); - when(mockState.getFirstMessageTimeNs()).thenReturn(0L); - assertTrue(timerTrigger.apply(mockState)); - // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion - when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); - assertFalse(timerTrigger.apply(mockState)); - - builder = TriggerBuilder.timeoutSinceLastMessage(10000L); - timerTrigger = (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder); - when(mockState.getLastMessageTimeNs()).thenReturn(0L); - assertTrue(timerTrigger.apply(mockState)); - // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion - when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000)); - assertFalse(timerTrigger.apply(mockState)); - } - - @Test - public void testAddTimerTriggers() throws IllegalAccessException { - TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - builder.addTimeoutSinceFirstMessage(10000L); - // exam that both earlyTrigger and timer triggers are set up - BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> triggerField = - (BiFunction<TestMessageEnvelope, 
WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); - WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(triggerField.apply(null, mockState)); - // check the timer trigger - Function<WindowState<Collection<TestMessageEnvelope>>, Boolean> timerTrigger = - (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder); - when(mockState.getFirstMessageTimeNs()).thenReturn(0L); - assertTrue(timerTrigger.apply(mockState)); - // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion - when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); - assertFalse(timerTrigger.apply(mockState)); - - // exam that both early trigger and timer triggers are set up - builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); - mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(triggerField.apply(null, mockState)); - builder.addTimeoutSinceLastMessage(20000L); - // check the timer trigger - timerTrigger = (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder); - when(mockState.getLastMessageTimeNs()).thenReturn(0L); - assertTrue(timerTrigger.apply(mockState)); - // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion - when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L)); - assertFalse(timerTrigger.apply(mockState)); - } - - @Test - public void testAddLateTriggers() throws IllegalAccessException { - 
TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - builder.addLateTriggerOnSizeLimit(10000L); - // exam that both earlyTrigger and lateTriggers are set up - BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> earlyTrigger = - (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); - WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(earlyTrigger.apply(null, mockState)); - // check the late trigger - BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> lateTrigger = - (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.lateTriggerField.get(builder); - assertFalse(lateTrigger.apply(null, mockState)); - // set the number of messages to 10001 to trigger the late trigger - when(mockState.getNumberMessages()).thenReturn(10001L); - assertTrue(lateTrigger.apply(null, mockState)); - - builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0); - // exam that both earlyTrigger and lateTriggers are set up - earlyTrigger = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder); - mockState = mock(WindowState.class); - when(mockState.getNumberMessages()).thenReturn(200L); - assertFalse(earlyTrigger.apply(null, mockState)); - // exam the lateTrigger - when(mockState.getOutputValue()).thenReturn(new ArrayList<>()); - lateTrigger = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.lateTriggerField.get(builder); - assertFalse(lateTrigger.apply(null, mockState)); - List<TestMessageEnvelope> mockList = mock(ArrayList.class); - 
when(mockList.size()).thenReturn(200); - when(mockState.getOutputValue()).thenReturn(mockList); - assertTrue(lateTrigger.apply(null, mockState)); - } - - @Test - public void testAddTriggerUpdater() throws IllegalAccessException { - TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000); - builder.onEarlyTrigger(c -> { - c.clear(); - return c; - }); - List<TestMessageEnvelope> collection = new ArrayList<TestMessageEnvelope>() { { - for (int i = 0; i < 10; i++) { - this.add(new TestMessageEnvelope(String.format("key-%d", i), "string-value", System.nanoTime())); - } - } }; - // exam that earlyTriggerUpdater is set up - Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>> earlyTriggerUpdater = - (Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>>) this.earlyTriggerUpdater.get(builder); - WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class); - when(mockState.getOutputValue()).thenReturn(collection); - earlyTriggerUpdater.apply(mockState); - assertTrue(collection.isEmpty()); - - collection.add(new TestMessageEnvelope("key-to-stay", "string-to-stay", System.nanoTime())); - collection.add(new TestMessageEnvelope("key-to-remove", "string-to-remove", System.nanoTime())); - builder.onLateTrigger(c -> { - c.removeIf(t -> t.getKey().equals("key-to-remove")); - return c; - }); - // check the late trigger updater - Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>> lateTriggerUpdater = - (Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>>) this.lateTriggerUpdater.get(builder); - when(mockState.getOutputValue()).thenReturn(collection); - lateTriggerUpdater.apply(mockState); - assertTrue(collection.size() == 1); - assertFalse(collection.get(0).isDelete()); - 
assertEquals(collection.get(0).getKey(), "key-to-stay"); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java index 7f81fc9..9679e1d 100644 --- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java +++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java @@ -25,11 +25,10 @@ import static org.junit.Assert.assertFalse; public class TestWindowOutput { - @Test public void testConstructor() { - WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10); - assertEquals(wndOutput.getKey(), "testMsg"); + WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey("testMsg", null), 10); + assertEquals(wndOutput.getKey().getKey(), "testMsg"); assertEquals(wndOutput.getMessage(), Integer.valueOf(10)); assertFalse(wndOutput.isDelete()); } http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java deleted file mode 100644 index 26af26e..0000000 --- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.windows; - -import org.apache.samza.operators.TestMessageEnvelope; -import org.junit.Test; - -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collection; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestWindows { - - @Test - public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException { - // test constructing the default session window - Window<TestMessageEnvelope, String, Collection<TestMessageEnvelope>, WindowOutput<String, Collection<TestMessageEnvelope>>> testWnd = Windows - .intoSessions( - TestMessageEnvelope::getKey); - assertTrue(testWnd instanceof SessionWindow); - Field wndKeyFuncField = SessionWindow.class.getDeclaredField("wndKeyFunction"); - Field aggregatorField = SessionWindow.class.getDeclaredField("aggregator"); - wndKeyFuncField.setAccessible(true); - aggregatorField.setAccessible(true); - Function<TestMessageEnvelope, String> wndKeyFunc = (Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testWnd); - assertEquals(wndKeyFunc.apply(new TestMessageEnvelope("test-key", "test-value", 0)), "test-key"); - BiFunction<TestMessageEnvelope, 
Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>> aggrFunc = - (BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>>) aggregatorField.get(testWnd); - TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class); - Collection<TestMessageEnvelope> collection = aggrFunc.apply(mockMsg, new ArrayList<>()); - assertTrue(collection.size() == 1); - assertTrue(collection.contains(mockMsg)); - - // test constructing the session window w/ customized session info - Window<TestMessageEnvelope, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions( - m -> String.format("key-%d", m.getMessage().getEventTime()), m -> m.getMessage().getValue().charAt(0)); - assertTrue(testWnd2 instanceof SessionWindow); - wndKeyFunc = (Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testWnd2); - aggrFunc = (BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>>) aggregatorField.get(testWnd2); - assertEquals(wndKeyFunc.apply(new TestMessageEnvelope("test-key", "test-value", 0)), "key-0"); - TestMessageEnvelope.MessageType mockInnerMessage = mock(TestMessageEnvelope.MessageType.class); - when(mockMsg.getMessage()).thenReturn(mockInnerMessage); - when(mockInnerMessage.getValue()).thenReturn("x-001"); - collection = aggrFunc.apply(mockMsg, new ArrayList<>()); - assertTrue(collection.size() == 1); - assertTrue(collection.contains('x')); - - // test constructing session window w/ a default counter - Window<TestMessageEnvelope, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( - m -> String.format("key-%d", m.getMessage().getEventTime())); - assertTrue(testCounter instanceof SessionWindow); - wndKeyFunc = (Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testCounter); - BiFunction<TestMessageEnvelope, Integer, Integer> counterFn = (BiFunction<TestMessageEnvelope, Integer, Integer>) 
aggregatorField.get(testCounter); - when(mockMsg.getMessage().getEventTime()).thenReturn(12345L); - assertEquals(wndKeyFunc.apply(mockMsg), "key-12345"); - assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2)); - } - - @Test - public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException { - Window<TestMessageEnvelope, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter( - m -> String.format("key-%d", m.getMessage().getEventTime())); - // test session window w/ a trigger - TriggerBuilder<TestMessageEnvelope, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L); - testCounter.setTriggers(triggerBuilder); - Trigger<TestMessageEnvelope, WindowState<Integer>> expectedTrigger = triggerBuilder.build(); - Trigger<TestMessageEnvelope, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger(); - // examine all trigger fields are expected - Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger"); - Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger"); - Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger"); - Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater"); - Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater"); - earlyTriggerField.setAccessible(true); - lateTriggerField.setAccessible(true); - timerTriggerField.setAccessible(true); - earlyTriggerUpdater.setAccessible(true); - lateTriggerUpdater.setAccessible(true); - assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger)); - assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger)); - assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger)); - assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger)); - assertEquals(lateTriggerUpdater.get(expectedTrigger), 
lateTriggerUpdater.get(actualTrigger)); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index 16f1563..5c5ea65 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -314,7 +314,6 @@ public class AsyncRunLoop implements Runnable, Throttleable { private final TaskCallbackManager callbackManager; private volatile AsyncTaskState state; - AsyncTaskWorker(TaskInstance<AsyncStreamTask> task) { this.task = task; this.callbackManager = new TaskCallbackManager(this, callbackTimer, callbackTimeoutMs, maxConcurrency, clock); @@ -571,6 +570,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { private volatile boolean windowOrCommitInFlight = false; private final AtomicInteger messagesInFlight = new AtomicInteger(0); private final ArrayDeque<PendingEnvelope> pendingEnvelopeQueue; + //Set of SSPs that we are currently processing for this task instance private final Set<SystemStreamPartition> processingSspSet; private final TaskName taskName; @@ -583,7 +583,6 @@ public class AsyncRunLoop implements Runnable, Throttleable { this.processingSspSet = sspSet; } - private boolean checkEndOfStream() { if (pendingEnvelopeQueue.size() == 1) { PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek(); @@ -608,7 +607,6 @@ public class AsyncRunLoop implements Runnable, Throttleable { if (coordinatorRequests.commitRequests().remove(taskName)) { needCommit = true; } - if (needWindow || needCommit || endOfStream) { // ready for window or commit only when no messages are in progress and // no window/commit in flight 
http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java index 231d3f5..286893c 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java @@ -28,9 +28,8 @@ import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OperatorSpecs; import org.apache.samza.operators.windows.Window; -import org.apache.samza.operators.windows.WindowFn; -import org.apache.samza.operators.windows.WindowOutput; -import org.apache.samza.operators.windows.WindowState; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.internal.WindowInternal; import java.util.ArrayList; import java.util.Collection; @@ -55,7 +54,7 @@ public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStre @Override public <OM extends MessageEnvelope> MessageStream<OM> map(MapFunction<M, OM> mapFn) { - OperatorSpec<OM> op = OperatorSpecs.<M, OM>createStreamOperator(m -> new ArrayList<OM>() { { + OperatorSpec<OM> op = OperatorSpecs.<M, OM>createStreamOperatorSpec(m -> new ArrayList<OM>() { { OM r = mapFn.apply(m); if (r != null) { this.add(r); @@ -67,14 +66,14 @@ public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStre @Override public <OM extends MessageEnvelope> MessageStream<OM> flatMap(FlatMapFunction<M, OM> flatMapFn) { - OperatorSpec<OM> op = OperatorSpecs.createStreamOperator(flatMapFn); + OperatorSpec<OM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn); this.registeredOperatorSpecs.add(op); 
return op.getOutputStream(); } @Override public MessageStream<M> filter(FilterFunction<M> filterFn) { - OperatorSpec<M> op = OperatorSpecs.<M, M>createStreamOperator(t -> new ArrayList<M>() { { + OperatorSpec<M> op = OperatorSpecs.<M, M>createStreamOperatorSpec(t -> new ArrayList<M>() { { if (filterFn.apply(t)) { this.add(t); } @@ -85,13 +84,13 @@ public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStre @Override public void sink(SinkFunction<M> sinkFn) { - this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperator(sinkFn)); + this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn)); } @Override - public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window( - Window<M, WK, WV, WM> window) { - OperatorSpec<WM> wndOp = OperatorSpecs.createWindowOperator((WindowFn<M, WK, WS, WM>) window.getInternalWindowFn()); + public <K, WV, WM extends WindowPane<K, WV>> MessageStream<WM> window( + Window<M, K, WV, WM> window) { + OperatorSpec<WM> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<MessageEnvelope, K, WV>) window); this.registeredOperatorSpecs.add(wndOp); return wndOp.getOutputStream(); } @@ -106,9 +105,8 @@ public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStre // TODO: need to add default store functions for the two partial join functions - ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs.add( - OperatorSpecs.createPartialJoinOperator(parJoin2, outputStream)); - this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperator(parJoin1, outputStream)); + ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(parJoin2, outputStream)); + this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperatorSpec(parJoin1, outputStream)); return outputStream; } @@ -118,7 +116,7 @@ public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStre 
otherStreams.add(this); otherStreams.forEach(other -> - ((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(OperatorSpecs.createMergeOperator(outputStream))); + ((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(OperatorSpecs.createMergeOperatorSpec(outputStream))); return outputStream; } http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java deleted file mode 100644 index 2572f14..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators; - -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.windows.StoreFunctions; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.storage.kv.KeyValueStore; -import org.apache.samza.task.TaskContext; - - -/** - * The base class for all state stores - */ -public class StateStoreImpl<M extends MessageEnvelope, SK, SS> { - private final String storeName; - private final StoreFunctions<M, SK, SS> storeFunctions; - private KeyValueStore<SK, SS> kvStore = null; - - public StateStoreImpl(StoreFunctions<M, SK, SS> store, String storeName) { - this.storeFunctions = store; - this.storeName = storeName; - } - - public void init(TaskContext context) { - this.kvStore = (KeyValueStore<SK, SS>) context.getStore(this.storeName); - } - - public Entry<SK, SS> getState(M m) { - SK key = this.storeFunctions.getStoreKeyFn().apply(m); - SS state = this.kvStore.get(key); - return new Entry<>(key, state); - } - - public Entry<SK, SS> updateState(M m, Entry<SK, SS> oldEntry) { - SS newValue = this.storeFunctions.getStateUpdaterFn().apply(m, oldEntry.getValue()); - this.kvStore.put(oldEntry.getKey(), newValue); - return new Entry<>(oldEntry.getKey(), newValue); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java index 79446be..02095cb 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpls.java @@ -27,8 +27,7 @@ import org.apache.samza.operators.spec.PartialJoinOperatorSpec; import 
org.apache.samza.operators.spec.SinkOperatorSpec; import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.operators.windows.WindowOutput; -import org.apache.samza.operators.windows.WindowState; +import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.task.TaskContext; import java.util.Collection; @@ -115,7 +114,7 @@ public class OperatorImpls { } else if (operatorSpec instanceof SinkOperatorSpec) { return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec); } else if (operatorSpec instanceof WindowOperatorSpec) { - return new SessionWindowOperatorImpl<>((WindowOperatorSpec<M, ?, ? extends WindowState, ? extends WindowOutput>) operatorSpec); + return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?, ?, ? extends WindowPane>) operatorSpec); } else if (operatorSpec instanceof PartialJoinOperatorSpec) { return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec); } http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java deleted file mode 100644 index e8a635c..0000000 --- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.impl; - -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.StateStoreImpl; -import org.apache.samza.operators.data.MessageEnvelope; -import org.apache.samza.operators.spec.WindowOperatorSpec; -import org.apache.samza.operators.windows.WindowState; -import org.apache.samza.operators.windows.WindowOutput; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; -import org.apache.samza.task.TaskCoordinator; - - -/** - * Default implementation class of a {@link WindowOperatorSpec} for a session window. 
- * - * @param <M> the type of input {@link MessageEnvelope} - * @param <RK> the type of window key - * @param <WS> the type of window state - * @param <RM> the type of aggregated value of the window - */ -class SessionWindowOperatorImpl<M extends MessageEnvelope, RK, WS extends WindowState, RM extends WindowOutput<RK, ?>> - extends OperatorImpl<M, RM> { - - private final WindowOperatorSpec<M, RK, WS, RM> windowSpec; - private StateStoreImpl<M, RK, WS> stateStore = null; - - SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WS, RM> windowSpec) { - this.windowSpec = windowSpec; - } - - @Override - public void init(MessageStream<M> source, TaskContext context) { - this.stateStore = new StateStoreImpl<>(this.windowSpec.getStoreFns(), windowSpec.getStoreName(source)); - this.stateStore.init(context); - } - - @Override - public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { - Entry<RK, WS> state = this.stateStore.getState(message); - this.propagateResult(this.windowSpec.getTransformFn().apply(message, state), collector, coordinator); - this.stateStore.updateState(message, state); - } - - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { - // This is to periodically check the timeout triggers to get the list of window states to be updated - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java new file mode 100644 index 0000000..a5b71a7 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.internal.WindowInternal; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + +public class WindowOperatorImpl<M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> extends OperatorImpl<M, WM> { + + private final WindowInternal<M, K, WV> window; + + public WindowOperatorImpl(WindowOperatorSpec spec) { + window = spec.getWindow(); + } + + @Override + public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java index f622b34..fc25929 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java +++ 
b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java @@ -23,9 +23,8 @@ import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.windows.WindowState; -import org.apache.samza.operators.windows.WindowFn; -import org.apache.samza.operators.windows.WindowOutput; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.internal.WindowInternal; import java.util.ArrayList; import java.util.UUID; @@ -52,7 +51,7 @@ public class OperatorSpecs { * @param <OM> type of output {@link MessageEnvelope} * @return the {@link StreamOperatorSpec} */ - public static <M extends MessageEnvelope, OM extends MessageEnvelope> StreamOperatorSpec<M, OM> createStreamOperator( + public static <M extends MessageEnvelope, OM extends MessageEnvelope> StreamOperatorSpec<M, OM> createStreamOperatorSpec( FlatMapFunction<M, OM> transformFn) { return new StreamOperatorSpec<>(transformFn); } @@ -64,23 +63,25 @@ public class OperatorSpecs { * @param <M> type of input {@link MessageEnvelope} * @return the {@link SinkOperatorSpec} */ - public static <M extends MessageEnvelope> SinkOperatorSpec<M> createSinkOperator(SinkFunction<M> sinkFn) { + public static <M extends MessageEnvelope> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn) { return new SinkOperatorSpec<>(sinkFn); } /** * Creates a {@link WindowOperatorSpec}. * - * @param windowFn the {@link WindowFn} function - * @param <M> type of input {@link MessageEnvelope} - * @param <WK> type of window key - * @param <WS> type of {@link WindowState} - * @param <WM> type of output {@link WindowOutput} + * @param window the description of the window. 
+ * @param <M> the type of input {@link MessageEnvelope} + * @param <K> the type of key in the {@link MessageEnvelope} in this {@link org.apache.samza.operators.MessageStream}. If a key is specified, + * results are emitted per-key + * @param <WK> the type of key in the {@link WindowPane} + * @param <WV> the type of value in the window + * @param <WM> the type of output {@link WindowPane} * @return the {@link WindowOperatorSpec} */ - public static <M extends MessageEnvelope, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> WindowOperatorSpec<M, WK, WS, WM> createWindowOperator( - WindowFn<M, WK, WS, WM> windowFn) { - return new WindowOperatorSpec<>(windowFn, OperatorSpecs.getOperatorId()); + + public static <M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> WindowOperatorSpec<M, K, WK, WV, WM> createWindowOperatorSpec(WindowInternal<M, K, WV> window) { + return new WindowOperatorSpec<>(window, OperatorSpecs.getOperatorId()); } /** @@ -94,7 +95,7 @@ public class OperatorSpecs { * @param <OM> the type of {@link MessageEnvelope} in the join output * @return the {@link PartialJoinOperatorSpec} */ - public static <M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, OM extends MessageEnvelope> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperator( + public static <M extends MessageEnvelope<K, ?>, K, JM extends MessageEnvelope<K, ?>, OM extends MessageEnvelope> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperatorSpec( BiFunction<M, JM, OM> partialJoinFn, MessageStreamImpl<OM> joinOutput) { return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, OperatorSpecs.getOperatorId()); } @@ -106,7 +107,7 @@ public class OperatorSpecs { * @param <M> the type of input {@link MessageEnvelope} * @return the {@link StreamOperatorSpec} for the merge */ - public static <M extends MessageEnvelope> StreamOperatorSpec<M, M> createMergeOperator(MessageStreamImpl<M> mergeOutput) { + public static <M extends 
MessageEnvelope> StreamOperatorSpec<M, M> createMergeOperatorSpec(MessageStreamImpl<M> mergeOutput) { return new StreamOperatorSpec<M, M>(t -> new ArrayList<M>() { { this.add(t); http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java index f74f35d..e6d77f6 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/PartialJoinOperatorSpec.java @@ -20,7 +20,6 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.windows.StoreFunctions; import java.util.function.BiFunction; @@ -46,17 +45,6 @@ public class PartialJoinOperatorSpec<M extends MessageEnvelope<K, ?>, K, JM exte */ private final BiFunction<M, JM, RM> transformFn; - /** - * The {@link MessageEnvelope} store functions that read the buffered {@link MessageEnvelope}s from the other - * stream in the join. - */ - private final StoreFunctions<JM, K, JM> joinStoreFns; - - /** - * The {@link MessageEnvelope} store functions that save the buffered {@link MessageEnvelope} of this - * {@link MessageStreamImpl} in the join. - */ - private final StoreFunctions<M, K, M> selfStoreFns; /** * The unique ID for this operator. 
@@ -73,10 +61,6 @@ public class PartialJoinOperatorSpec<M extends MessageEnvelope<K, ?>, K, JM exte PartialJoinOperatorSpec(BiFunction<M, JM, RM> partialJoinFn, MessageStreamImpl<RM> joinOutput, String operatorId) { this.joinOutput = joinOutput; this.transformFn = partialJoinFn; - // Read-only join store, no creator/updater functions required. - this.joinStoreFns = new StoreFunctions<>(m -> m.getKey(), null); - // Buffered message envelope store for this input stream. - this.selfStoreFns = new StoreFunctions<>(m -> m.getKey(), (m, s1) -> m); this.operatorId = operatorId; } @@ -90,14 +74,6 @@ public class PartialJoinOperatorSpec<M extends MessageEnvelope<K, ?>, K, JM exte return this.joinOutput; } - public StoreFunctions<JM, K, JM> getJoinStoreFns() { - return this.joinStoreFns; - } - - public StoreFunctions<M, K, M> getSelfStoreFns() { - return this.selfStoreFns; - } - public BiFunction<M, JM, RM> getTransformFn() { return this.transformFn; } http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 2f5b1e7..cdc02a8 100644 --- a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -16,104 +16,40 @@ * specific language governing permissions and limitations * under the License. 
*/ + package org.apache.samza.operators.spec; import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.MessageStreamImpl; -import org.apache.samza.operators.windows.StoreFunctions; -import org.apache.samza.operators.windows.Trigger; -import org.apache.samza.operators.windows.WindowFn; -import org.apache.samza.operators.windows.WindowOutput; -import org.apache.samza.operators.windows.WindowState; -import org.apache.samza.storage.kv.Entry; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.internal.WindowInternal; -import java.util.function.BiFunction; +public class WindowOperatorSpec<M extends MessageEnvelope, K, WK, WV, WM extends WindowPane<WK, WV>> implements OperatorSpec<WM> { + private final WindowInternal window; -/** - * Defines a window operator that takes one {@link MessageStreamImpl} as an input, accumulates the window state, - * and generates an output {@link MessageStreamImpl} with output type {@code WM} which extends {@link WindowOutput} - * - * @param <M> the type of input {@link MessageEnvelope} - * @param <WK> the type of key in the output {@link MessageEnvelope} from the {@link WindowOperatorSpec} function - * @param <WS> the type of window state in the {@link WindowOperatorSpec} function - * @param <WM> the type of window output {@link MessageEnvelope} - */ -public class WindowOperatorSpec<M extends MessageEnvelope, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> implements - OperatorSpec<WM> { - - /** - * The output {@link MessageStream}. - */ private final MessageStreamImpl<WM> outputStream; - /** - * The window transformation function that takes {@link MessageEnvelope}s from one input stream, aggregates with the window - * state(s) from the window state store, and generate output {@link MessageEnvelope}s for the output stream. 
- */ - private final BiFunction<M, Entry<WK, WS>, WM> transformFn; - - /** - * The state store functions for the {@link WindowOperatorSpec}. - */ - private final StoreFunctions<M, WK, WS> storeFns; - - /** - * The window trigger. - */ - private final Trigger<M, WS> trigger; - - /** - * The unique ID of this operator. - */ private final String operatorId; - /** - * Constructor for {@link WindowOperatorSpec}. - * - * @param windowFn the window function - * @param operatorId auto-generated unique ID of this operator - */ - WindowOperatorSpec(WindowFn<M, WK, WS, WM> windowFn, String operatorId) { + + public WindowOperatorSpec(WindowInternal window, String operatorId) { + this.window = window; this.outputStream = new MessageStreamImpl<>(); - this.transformFn = windowFn.getTransformFn(); - this.storeFns = windowFn.getStoreFns(); - this.trigger = windowFn.getTrigger(); this.operatorId = operatorId; } @Override - public String toString() { - return this.operatorId; - } - - @Override - public MessageStreamImpl<WM> getOutputStream() { + public MessageStream<WM> getOutputStream() { return this.outputStream; } - public StoreFunctions<M, WK, WS> getStoreFns() { - return this.storeFns; - } - - public BiFunction<M, Entry<WK, WS>, WM> getTransformFn() { - return this.transformFn; - } - - public Trigger<M, WS> getTrigger() { - return this.trigger; + public WindowInternal getWindow() { + return window; } - /** - * Method to generate the window operator's state store name - * TODO HIGH pmaheshw: should this be here? 
- * - * @param inputStream the input {@link MessageStreamImpl} to this state store - * @return the persistent store name of the window operator - */ - public String getStoreName(MessageStream<M> inputStream) { - //TODO: need to get the persistent name of ds and the operator in a serialized form - return String.format("input-%s-wndop-%s", inputStream.toString(), this.toString()); + public String getOperatorId() { + return operatorId; } } http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java ---------------------------------------------------------------------- diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java new file mode 100644 index 0000000..e9af043 --- /dev/null +++ b/samza-operator/src/main/java/org/apache/samza/operators/spec/WindowState.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.spec; + +import org.apache.samza.annotation.InterfaceStability; + + +/** + * This interface defines the methods a window state class has to implement. 
The programmers are allowed to implement + * customized window state to be stored in window state stores by implementing this interface class. + * + * @param <WV> the type for window output value + */ +@InterfaceStability.Unstable +public interface WindowState<WV> { + /** + * Method to get the system time when the first {@link org.apache.samza.operators.data.MessageEnvelope} + * in the window is received + * + * @return nano-second of system time for the first {@link org.apache.samza.operators.data.MessageEnvelope} + * received in the window + */ + long getFirstMessageTimeNs(); + + /** + * Method to get the system time when the last {@link org.apache.samza.operators.data.MessageEnvelope} + * in the window is received + * + * @return nano-second of system time for the last {@link org.apache.samza.operators.data.MessageEnvelope} + * received in the window + */ + long getLastMessageTimeNs(); + + /** + * Method to get the earliest event time in the window + * + * @return the earliest event time in nano-second in the window + */ + long getEarliestEventTimeNs(); + + /** + * Method to get the latest event time in the window + * + * @return the latest event time in nano-second in the window + */ + long getLatestEventTimeNs(); + + /** + * Method to get the total number of {@link org.apache.samza.operators.data.MessageEnvelope}s received in the window + * + * @return number of {@link org.apache.samza.operators.data.MessageEnvelope}s in the window + */ + long getNumberMessages(); + + /** + * Method to get the corresponding window's output value + * + * @return the corresponding window's output value + */ + WV getOutputValue(); + + /** + * Method to set the corresponding window's output value + * + * @param value the corresponding window's output value + */ + void setOutputValue(WV value); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/6dc33a85/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java 
---------------------------------------------------------------------- diff --git a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java b/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java index e45d068..663d98c 100644 --- a/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java +++ b/samza-operator/src/test/java/org/apache/samza/operators/BroadcastTask.java @@ -23,12 +23,13 @@ package org.apache.samza.operators; import org.apache.samza.operators.data.IncomingSystemMessageEnvelope; import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; import org.apache.samza.operators.data.Offset; -import org.apache.samza.operators.windows.TriggerBuilder; +import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.Windows; import org.apache.samza.system.SystemStreamPartition; -import java.util.Collection; +import java.time.Duration; import java.util.Map; +import java.util.function.BiFunction; /** @@ -57,27 +58,21 @@ public class BroadcastTask implements StreamOperatorTask { @Override public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) { + BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1; messageStreams.values().forEach(entry -> { MessageStream<JsonMessageEnvelope> inputStream = entry.map(this::getInputMessage); - inputStream.filter(this::myFilter1). - window(Windows.<JsonMessageEnvelope, String>intoSessionCounter( - m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)). - setTriggers(TriggerBuilder.<JsonMessageEnvelope, Integer>earlyTriggerWhenExceedWndLen(100). - addLateTriggerOnSizeLimit(10). - addTimeoutSinceLastMessage(30000))); - - inputStream.filter(this::myFilter2). - window(Windows.<JsonMessageEnvelope, String>intoSessions( - m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4)). 
- setTriggers(TriggerBuilder.<JsonMessageEnvelope, Collection<JsonMessageEnvelope>>earlyTriggerWhenExceedWndLen(100). - addTimeoutSinceLastMessage(30000))); - - inputStream.filter(this::myFilter3). - window(Windows.<JsonMessageEnvelope, String, MessageType>intoSessions( - m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4), m -> m.getMessage()). - setTriggers(TriggerBuilder.<JsonMessageEnvelope, Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getMessage().getTimestamp(), 30000). - addTimeoutSinceFirstMessage(60000))); + inputStream.filter(this::myFilter1) + .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); + + inputStream.filter(this::myFilter1) + .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); + + inputStream.filter(this::myFilter1) + .window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); }); }