http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java b/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java deleted file mode 100644 index f06387c..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java +++ /dev/null @@ -1,469 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.operators.internal; - -import org.apache.samza.operators.MessageStream; -import org.apache.samza.operators.WindowState; -import org.apache.samza.operators.data.Message; -import org.apache.samza.storage.kv.Entry; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.UUID; -import java.util.function.BiFunction; -import java.util.function.Function; - - -/** - * This class defines all basic stream operator classes used by internal implementation only. 
All classes defined in - * this file are immutable. - * - * NOTE: Programmers should not use the operators defined in this class directly. All {@link Operator} objects - * should be initiated via {@link MessageStream} API methods - */ -public class Operators { - /** - * Private constructor to prevent instantiation of the {@link Operators} class - */ - private Operators() {} - - private static String getOperatorId() { - // TODO: need to change the IDs to be a consistent, durable IDs that can be recovered across container and job restarts - return UUID.randomUUID().toString(); - } - - /** - * Private interface for stream operator functions. The interface class defines the output of the stream operator function. - * - */ - public interface Operator<OM extends Message> { - MessageStream<OM> getOutputStream(); - } - - /** - * Linear stream operator function that takes 1 input {@link Message} and output a collection of output {@link Message}s. - * - * @param <M> the type of input {@link Message} - * @param <OM> the type of output {@link Message} - */ - public static class StreamOperator<M extends Message, OM extends Message> implements Operator<OM> { - /** - * The output {@link MessageStream} - */ - private final MessageStream<OM> outputStream; - - /** - * The transformation function - */ - private final Function<M, Collection<OM>> txfmFunction; - - /** - * Constructor of {@link StreamOperator}. Make it private s.t. it can only be created within {@link Operators}. 
- * - * @param transformFn the transformation function to be applied that transforms 1 input {@link Message} into a collection - * of output {@link Message}s - */ - private StreamOperator(Function<M, Collection<OM>> transformFn) { - this(transformFn, new MessageStream<>()); - } - - /** - * Constructor of {@link StreamOperator} which allows the user to define the output {@link MessageStream} - * - * @param transformFn the transformation function - * @param outputStream the output {@link MessageStream} - */ - private StreamOperator(Function<M, Collection<OM>> transformFn, MessageStream<OM> outputStream) { - this.outputStream = outputStream; - this.txfmFunction = transformFn; - } - - @Override - public MessageStream<OM> getOutputStream() { - return this.outputStream; - } - - /** - * Method to get the transformation function. - * - * @return the {@code txfmFunction} - */ - public Function<M, Collection<OM>> getFunction() { - return this.txfmFunction; - } - - } - - /** - * A sink operator function that allows customized code to send the output to external system. This is the terminal - * operator that does not have any output {@link MessageStream} that allows further processing in the same - * {@link org.apache.samza.operators.task.StreamOperatorTask} - * - * @param <M> the type of input {@link Message} - */ - public static class SinkOperator<M extends Message> implements Operator { - - /** - * The user-defined sink function - */ - private final MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink; - - /** - * Default constructor for {@link SinkOperator}. Make it private s.t. it can only be created within {@link Operators}. 
- * - * @param sink the user-defined sink function - */ - private SinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink) { - this.sink = sink; - } - - @Override - public MessageStream getOutputStream() { - return null; - } - - /** - * Method to get the user-defined function that implements the {@link SinkOperator} - * - * @return a {@link MessageStream.VoidFunction3} function that allows the caller to pass in an input message, {@link MessageCollector} - * and {@link TaskCoordinator} to the sink function - */ - public MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> getFunction() { - return this.sink; - } - } - - /** - * The store functions that are used by {@link WindowOperator} and {@link PartialJoinOperator} to store and retrieve - * buffered messages and partial aggregation results - * - * @param <SK> the type of key used to store the operator states - * @param <SS> the type of operator state. e.g. could be the partial aggregation result for a window, or a buffered - * input message from the join stream for a join - */ - public static class StoreFunctions<M extends Message, SK, SS> { - /** - * Function to define the key to query in the operator state store, according to the incoming {@link Message} - * This method only supports finding the unique key for the incoming message, which supports use case of non-overlapping - * windows and unique-key-based join. - * - * TODO: for windows that overlap (i.e. sliding windows and hopping windows) and non-unique-key-based join, the query - * to the state store is usually a range scan. We need to add a rangeKeyFinder function to map from a single input - * message to a range of keys in the store. - */ - private final Function<M, SK> storeKeyFinder; - - /** - * Function to update the store entry based on the current state and the incoming {@link Message} - * - * TODO: this is assuming a 1:1 mapping from the input message to the store entry. 
When implementing sliding/hopping - * windows and non-unique-key-based join, we may need to include the corresponding state key, in addition to the - * state value. - */ - private final BiFunction<M, SS, SS> stateUpdater; - - /** - * Constructor of state store functions. - * - */ - private StoreFunctions(Function<M, SK> keyFinder, - BiFunction<M, SS, SS> stateUpdater) { - this.storeKeyFinder = keyFinder; - this.stateUpdater = stateUpdater; - } - - /** - * Method to get the {@code storeKeyFinder} function - * - * @return the function to calculate the key from an input {@link Message} - */ - public Function<M, SK> getStoreKeyFinder() { - return this.storeKeyFinder; - } - - /** - * Method to get the {@code stateUpdater} function - * - * @return the function to update the corresponding state according to an input {@link Message} - */ - public BiFunction<M, SS, SS> getStateUpdater() { - return this.stateUpdater; - } - } - - /** - * Defines a window operator function that takes one {@link MessageStream} as an input, accumulate the window state, and generate - * an output {@link MessageStream} w/ output type {@code WM} which extends {@link WindowOutput} - * - * @param <M> the type of input {@link Message} - * @param <WK> the type of key in the output {@link Message} from the {@link WindowOperator} function - * @param <WS> the type of window state in the {@link WindowOperator} function - * @param <WM> the type of window output {@link Message} - */ - public static class WindowOperator<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> implements Operator<WM> { - /** - * The output {@link MessageStream} - */ - private final MessageStream<WM> outputStream; - - /** - * The main window transformation function that takes {@link Message}s from one input stream, aggregates w/ the window - * state(s) from the window state store, and generate output {@link Message}s to the output stream. 
- */ - private final BiFunction<M, Entry<WK, WS>, WM> txfmFunction; - - /** - * The state store functions for the {@link WindowOperator} - */ - private final StoreFunctions<M, WK, WS> storeFunctions; - - /** - * The window trigger function - */ - private final Trigger<M, WS> trigger; - - /** - * The unique ID of stateful operators - */ - private final String opId; - - /** - * Constructor for {@link WindowOperator}. Make it private s.t. it can only be created within {@link Operators}. - * - * @param windowFn description of the window function - * @param operatorId auto-generated unique ID of the operator - */ - private WindowOperator(WindowFn<M, WK, WS, WM> windowFn, String operatorId) { - this.outputStream = new MessageStream<>(); - this.txfmFunction = windowFn.getTransformFunc(); - this.storeFunctions = windowFn.getStoreFuncs(); - this.trigger = windowFn.getTrigger(); - this.opId = operatorId; - } - - @Override - public String toString() { - return this.opId; - } - - @Override - public MessageStream<WM> getOutputStream() { - return this.outputStream; - } - - /** - * Method to get the window's {@link StoreFunctions}. 
- * - * @return the window operator's {@code storeFunctions} - */ - public StoreFunctions<M, WK, WS> getStoreFunctions() { - return this.storeFunctions; - } - - /** - * Method to get the window operator's main function - * - * @return the window operator's {@code txfmFunction} - */ - public BiFunction<M, Entry<WK, WS>, WM> getFunction() { - return this.txfmFunction; - } - - /** - * Method to get the trigger functions - * - * @return the {@link Trigger} for this {@link WindowOperator} - */ - public Trigger<M, WS> getTrigger() { - return this.trigger; - } - - /** - * Method to generate the window operator's state store name - * - * @param inputStream the input {@link MessageStream} to this state store - * @return the persistent store name of the window operator - */ - public String getStoreName(MessageStream<M> inputStream) { - //TODO: need to get the persistent name of ds and the operator in a serialized form - return String.format("input-%s-wndop-%s", inputStream.toString(), this.toString()); - } - } - - /** - * The partial join operator that takes {@link Message}s from one input stream and join w/ buffered {@link Message}s from - * another stream and generate join output to {@code output} - * - * @param <M> the type of input {@link Message} - * @param <K> the type of join key - * @param <JM> the type of message of {@link Message} in the other join stream - * @param <RM> the type of message of {@link Message} in the join output stream - */ - public static class PartialJoinOperator<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> implements Operator<RM> { - - private final MessageStream<RM> joinOutput; - - /** - * The main transformation function of {@link PartialJoinOperator} that takes a type {@code M} input message, - * join w/ a stream of buffered {@link Message}s from another stream w/ type {@code JM}, and generate joined type {@code RM}. 
- */ - private final BiFunction<M, JM, RM> txfmFunction; - - /** - * The message store functions that read the buffered messages from the other stream in the join - */ - private final StoreFunctions<JM, K, JM> joinStoreFunctions; - - /** - * The message store functions that save the buffered messages of this {@link MessageStream} in the join - */ - private final StoreFunctions<M, K, M> selfStoreFunctions; - - /** - * The unique ID for the stateful operator - */ - private final String opId; - - /** - * Default constructor to create a {@link PartialJoinOperator} object - * - * @param partialJoin partial join function that take type {@code M} of input {@link Message} and join w/ type - * {@code JM} of buffered {@link Message} from another stream - * @param joinOutput the output {@link MessageStream} of the join results - */ - private PartialJoinOperator(BiFunction<M, JM, RM> partialJoin, MessageStream<RM> joinOutput, String opId) { - this.joinOutput = joinOutput; - this.txfmFunction = partialJoin; - // Read-only join store, no creator/updater functions specified - this.joinStoreFunctions = new StoreFunctions<>(m -> m.getKey(), null); - // Buffered message store for this input stream - this.selfStoreFunctions = new StoreFunctions<>(m -> m.getKey(), (m, s1) -> m); - this.opId = opId; - } - - @Override - public String toString() { - return this.opId; - } - - @Override - public MessageStream<RM> getOutputStream() { - return this.joinOutput; - } - - /** - * Method to get {@code joinStoreFunctions} - * - * @return {@code joinStoreFunctions} - */ - public StoreFunctions<JM, K, JM> getJoinStoreFunctions() { - return this.joinStoreFunctions; - } - - /** - * Method to get {@code selfStoreFunctions} - * - * @return {@code selfStoreFunctions} - */ - public StoreFunctions<M, K, M> getSelfStoreFunctions() { - return this.selfStoreFunctions; - } - - /** - * Method to get {@code txfmFunction} - * - * @return {@code txfmFunction} - */ - public BiFunction<M, JM, RM> getFunction() { - 
return this.txfmFunction; - } - } - - /** - * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator} - * - * @param transformFn the corresponding transformation function - * @param <M> type of input {@link Message} - * @param <OM> type of output {@link Message} - * @return the {@link StreamOperator} - */ - public static <M extends Message, OM extends Message> StreamOperator<M, OM> getStreamOperator(Function<M, Collection<OM>> transformFn) { - return new StreamOperator<>(transformFn); - } - - /** - * The method only to be used internally in {@link MessageStream} to create {@link SinkOperator} - * - * @param sinkFn the sink function - * @param <M> type of input {@link Message} - * @return the {@link SinkOperator} - */ - public static <M extends Message> SinkOperator<M> getSinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sinkFn) { - return new SinkOperator<>(sinkFn); - } - - /** - * The method only to be used internally in {@link MessageStream} to create {@link WindowOperator} - * - * @param windowFn the {@link WindowFn} function - * @param <M> type of input {@link Message} - * @param <WK> type of window key - * @param <WS> type of {@link WindowState} - * @param <WM> type of output {@link WindowOutput} - * @return the {@link WindowOperator} - */ - public static <M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> WindowOperator<M, WK, WS, WM> getWindowOperator( - WindowFn<M, WK, WS, WM> windowFn) { - return new WindowOperator<>(windowFn, Operators.getOperatorId()); - } - - /** - * The method only to be used internally in {@link MessageStream} to create {@link PartialJoinOperator} - * - * @param joiner the partial join function - * @param joinOutput the output {@link MessageStream} - * @param <M> type of input {@link Message} - * @param <K> type of join key - * @param <JM> the type of message in the {@link Message} from the other join stream - * @param <RM> the type of 
message in the {@link Message} from the join function - * @return the {@link PartialJoinOperator} - */ - public static <M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> PartialJoinOperator<M, K, JM, RM> getPartialJoinOperator( - BiFunction<M, JM, RM> joiner, MessageStream<RM> joinOutput) { - return new PartialJoinOperator<>(joiner, joinOutput, Operators.getOperatorId()); - } - - /** - * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator} as a merger function - * - * @param mergeOutput the common output {@link MessageStream} from the merger - * @param <M> the type of input {@link Message} - * @return the {@link StreamOperator} for merge - */ - public static <M extends Message> StreamOperator<M, M> getMergeOperator(MessageStream<M> mergeOutput) { - return new StreamOperator<M, M>(t -> - new ArrayList<M>() { { - this.add(t); - } }, - mergeOutput); - } -}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java deleted file mode 100644 index 3b50e2b..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.internal; - -import org.apache.samza.operators.WindowState; -import org.apache.samza.operators.data.Message; - -import java.util.function.BiFunction; -import java.util.function.Function; - -/** - * Defines the trigger functions for {@link Operators.WindowOperator}. This class is immutable. - * - * @param <M> the type of message from the input stream - * @param <S> the type of state variable in the window's state store - */ -public class Trigger<M extends Message, S extends WindowState> { - - /** - * System timer based trigger condition. 
This is the only guarantee that the {@link Operators.WindowOperator} will proceed forward - */ - private final Function<S, Boolean> timerTrigger; - - /** - * early trigger condition that determines when to send the first output from the {@link Operators.WindowOperator} - */ - private final BiFunction<M, S, Boolean> earlyTrigger; - - /** - * late trigger condition that determines when to send the updated output after the first one from a {@link Operators.WindowOperator} - */ - private final BiFunction<M, S, Boolean> lateTrigger; - - /** - * the function to update the window state when the first output is triggered - */ - private final Function<S, S> earlyTriggerUpdater; - - /** - * the function to update the window state when the late output is triggered - */ - private final Function<S, S> lateTriggerUpdater; - - /** - * Private constructor to prevent instantiation - * - * @param timerTrigger system timer trigger condition - * @param earlyTrigger early trigger condition - * @param lateTrigger late trigger condition - * @param earlyTriggerUpdater early trigger state updater - * @param lateTriggerUpdater late trigger state updater - */ - private Trigger(Function<S, Boolean> timerTrigger, BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, - Function<S, S> earlyTriggerUpdater, Function<S, S> lateTriggerUpdater) { - this.timerTrigger = timerTrigger; - this.earlyTrigger = earlyTrigger; - this.lateTrigger = lateTrigger; - this.earlyTriggerUpdater = earlyTriggerUpdater; - this.lateTriggerUpdater = lateTriggerUpdater; - } - - /** - * Static method to create a {@link Trigger} object - * - * @param timerTrigger system timer trigger condition - * @param earlyTrigger early trigger condition - * @param lateTrigger late trigger condition - * @param earlyTriggerUpdater early trigger state updater - * @param lateTriggerUpdater late trigger state updater - * @param <M> the type of input {@link Message} - * @param <S> the type of window state extends 
{@link WindowState} - * @return the {@link Trigger} function - */ - public static <M extends Message, S extends WindowState> Trigger<M, S> createTrigger(Function<S, Boolean> timerTrigger, - BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, Function<S, S> earlyTriggerUpdater, - Function<S, S> lateTriggerUpdater) { - return new Trigger(timerTrigger, earlyTrigger, lateTrigger, earlyTriggerUpdater, lateTriggerUpdater); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java deleted file mode 100644 index 489e5b8..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators.internal; - -import org.apache.samza.operators.WindowState; -import org.apache.samza.operators.data.Message; -import org.apache.samza.storage.kv.Entry; - -import java.util.function.BiFunction; - - -/** - * Defines an internal representation of a window function. This class SHOULD NOT be used by the programmer directly. It is used - * by the internal representation and implementation classes in operators. - * - * @param <M> type of input stream {@link Message} for window - * @param <WK> type of window key in the output {@link Message} - * @param <WS> type of {@link WindowState} variable in the state store - * @param <WM> type of the message in the output stream - */ -public interface WindowFn<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> { - - /** - * get the transformation function of the {@link WindowFn} - * - * @return the transformation function takes type {@code M} message and the window state entry, then transform to an {@link WindowOutput} - */ - BiFunction<M, Entry<WK, WS>, WM> getTransformFunc(); - - /** - * get the state store functions for this {@link WindowFn} - * - * @return the collection of state store methods - */ - Operators.StoreFunctions<M, WK, WS> getStoreFuncs(); - - /** - * get the trigger conditions for this {@link WindowFn} - * - * @return the trigger condition for the {@link WindowFn} function - */ - Trigger<M, WS> getTrigger(); - -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java deleted file mode 100644 index 643b703..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java +++ /dev/null @@ -1,55 +0,0 @@ -/* 
- * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.internal; - -import org.apache.samza.operators.data.Message; - - -/** - * This class defines the specific type of output messages from a {@link Operators.WindowOperator} function - * - * @param <K> the type of key in the output window result - * @param <M> the type of value in the output window result - */ -public final class WindowOutput<K, M> implements Message<K, M> { - private final K key; - private final M value; - - WindowOutput(K key, M aggregated) { - this.key = key; - this.value = aggregated; - } - - @Override public M getMessage() { - return this.value; - } - - @Override public K getKey() { - return this.key; - } - - @Override public long getTimestamp() { - return 0; - } - - static public <K, M> WindowOutput<K, M> of(K key, M result) { - return new WindowOutput<>(key, result); - } -} - http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java 
b/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java deleted file mode 100644 index 42c8f74..0000000 --- a/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators.task; - -import org.apache.samza.annotation.InterfaceStability; -import org.apache.samza.operators.MessageStreams.SystemMessageStream; - -import java.util.Collection; - -/** - * This interface defines the methods that user needs to implement via the operator programming APIs. - */ -@InterfaceStability.Unstable -public interface StreamOperatorTask { - - /** - * Defines the method for users to initialize the operator chains consuming from all {@link SystemMessageStream}s. - * Users have to implement this function to define their transformation logic on each of the incoming - * {@link SystemMessageStream}. 
- * - * Note that each {@link SystemMessageStream} corresponds to an input {@link org.apache.samza.system.SystemStreamPartition} - * - * @param sources the collection of {@link SystemMessageStream}s that takes {@link org.apache.samza.operators.data.IncomingSystemMessage} - * from a {@link org.apache.samza.system.SystemStreamPartition} - */ - void initOperators(Collection<SystemMessageStream> sources); - - -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java b/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java new file mode 100644 index 0000000..287025c --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/SessionWindow.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.windows; + +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.storage.kv.Entry; + +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * This class defines a session window function class + * + * @param <M> the type of input {@link MessageEnvelope} + * @param <WK> the type of session key in the session window + * @param <WV> the type of output value in each session window + */ +public class SessionWindow<M extends MessageEnvelope, WK, WV> implements Window<M, WK, WV, WindowOutput<WK, WV>> { + + /** + * Constructor. Made private s.t. it can only be instantiated via the static API methods in {@link Windows} + * + * @param sessionKeyFunction function to get the session key from the input {@link MessageEnvelope} + * @param aggregator function to calculate the output value based on the input {@link MessageEnvelope} and current output value + */ + SessionWindow(Function<M, WK> sessionKeyFunction, BiFunction<M, WV, WV> aggregator) { + this.wndKeyFunction = sessionKeyFunction; + this.aggregator = aggregator; + } + + /** + * function to calculate the window key from input {@link MessageEnvelope} + */ + private final Function<M, WK> wndKeyFunction; + + /** + * function to calculate the output value from the input {@link MessageEnvelope} and the current output value + */ + private final BiFunction<M, WV, WV> aggregator; + + /** + * trigger condition that determines when to send the {@link WindowOutput} + */ + private Trigger<M, WindowState<WV>> trigger = null; + + //TODO: need to create a set of {@link StoreFunctions} that is default to input {@link MessageEnvelope} type for {@link Window} + private StoreFunctions<M, WK, WindowState<WV>> storeFunctions = null; + + /** + * Public API methods start here + */ + + /** + * Public API method to define the watermark trigger for the window operator + * + * @param wndTrigger {@link Trigger} function defines the watermark trigger for 
this {@link SessionWindow} + * @return The window operator w/ the defined watermark trigger + */ + @Override + public Window<M, WK, WV, WindowOutput<WK, WV>> setTriggers(TriggerBuilder<M, WV> wndTrigger) { + this.trigger = wndTrigger.build(); + return this; + } + + private BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() { + // TODO: actual implementation of the main session window logic, based on the wndKeyFunction, aggregator, and triggers; + return null; + } + + public WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn() { + return new WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>>() { + + @Override public BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFn() { + return SessionWindow.this.getTransformFunc(); + } + + @Override public StoreFunctions<M, WK, WindowState<WV>> getStoreFns() { + return SessionWindow.this.storeFunctions; + } + + @Override public Trigger<M, WindowState<WV>> getTrigger() { + return SessionWindow.this.trigger; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java b/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java new file mode 100644 index 0000000..0d40761 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/StoreFunctions.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.windows; + +import org.apache.samza.operators.data.MessageEnvelope; + +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * The store functions that are used by window and partial join operators to store and retrieve buffered {@link MessageEnvelope}s + * and partial aggregation results. + * + * @param <SK> the type of key used to store the operator state + * @param <SS> the type of operator state. E.g. could be the partial aggregation result for a window, or a buffered + * input {@link MessageEnvelope} from the join stream for a join + */ +public class StoreFunctions<M extends MessageEnvelope, SK, SS> { + /** + * Function that returns the key to query in the operator state store for a particular {@link MessageEnvelope}. + * This 1:1 function only returns a single key for the incoming {@link MessageEnvelope}. This is sufficient to support + * non-overlapping windows and unique-key based joins. + * + * TODO: for windows that overlaps (i.e. sliding windows and hopping windows) and non-unique-key-based join, + * the query to the state store is usually a range scan. We need to add a rangeKeyFinder function + * (or make this function return a collection) to map from a single input {@link MessageEnvelope} to a range of keys in the store. + */ + private final Function<M, SK> storeKeyFn; + + /** + * Function to update the store entry based on the current operator state and the incoming {@link MessageEnvelope}. 
+ * + * TODO: this is assuming a 1:1 mapping from the input {@link MessageEnvelope} to the store entry. When implementing sliding/hopping + * windows and non-unique-key-based join, we may need to include the corresponding state key in addition to the + * state value. Alternatively this can be called once for each store key for the {@link MessageEnvelope}. + */ + private final BiFunction<M, SS, SS> stateUpdaterFn; + + public StoreFunctions(Function<M, SK> storeKeyFn, BiFunction<M, SS, SS> stateUpdaterFn) { + this.storeKeyFn = storeKeyFn; + this.stateUpdaterFn = stateUpdaterFn; + } + + public Function<M, SK> getStoreKeyFn() { + return this.storeKeyFn; + } + + public BiFunction<M, SS, SS> getStateUpdaterFn() { + return this.stateUpdaterFn; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java new file mode 100644 index 0000000..c8b0edb --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Trigger.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.windows; + +import org.apache.samza.operators.data.MessageEnvelope; + +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Defines the trigger functions for the window operator. This class is immutable. + * + * @param <M> the type of {@link MessageEnvelope} in the input stream + * @param <S> the type of state variable in the window's state store + */ +public class Trigger<M extends MessageEnvelope, S extends WindowState> { + + /** + * System timer based trigger condition. This is the only guarantee that the window operator will proceed forward + */ + private final Function<S, Boolean> timerTrigger; + + /** + * early trigger condition that determines when to send the first output from the window operator + */ + private final BiFunction<M, S, Boolean> earlyTrigger; + + /** + * late trigger condition that determines when to send the updated output after the first one from a window operator + */ + private final BiFunction<M, S, Boolean> lateTrigger; + + /** + * the function to updated the window state when the first output is triggered + */ + private final Function<S, S> earlyTriggerUpdater; + + /** + * the function to updated the window state when the late output is triggered + */ + private final Function<S, S> lateTriggerUpdater; + + /** + * Private constructor to prevent instantiation + * + * @param timerTrigger system timer trigger condition + * @param earlyTrigger early trigger condition + * @param lateTrigger late trigger condition + * @param earlyTriggerUpdater early trigger state updater + * @param lateTriggerUpdater late trigger state updater + */ + private Trigger(Function<S, Boolean> timerTrigger, BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, + Function<S, S> earlyTriggerUpdater, Function<S, S> lateTriggerUpdater) { + this.timerTrigger = 
timerTrigger; + this.earlyTrigger = earlyTrigger; + this.lateTrigger = lateTrigger; + this.earlyTriggerUpdater = earlyTriggerUpdater; + this.lateTriggerUpdater = lateTriggerUpdater; + } + + /** + * Static method to create a {@link Trigger} object + * + * @param timerTrigger system timer trigger condition + * @param earlyTrigger early trigger condition + * @param lateTrigger late trigger condition + * @param earlyTriggerUpdater early trigger state updater + * @param lateTriggerUpdater late trigger state updater + * @param <M> the type of input {@link MessageEnvelope} + * @param <S> the type of window state extends {@link WindowState} + * @return the {@link Trigger} function + */ + public static <M extends MessageEnvelope, S extends WindowState> Trigger<M, S> createTrigger(Function<S, Boolean> timerTrigger, + BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, Function<S, S> earlyTriggerUpdater, + Function<S, S> lateTriggerUpdater) { + return new Trigger(timerTrigger, earlyTrigger, lateTrigger, earlyTriggerUpdater, lateTriggerUpdater); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java new file mode 100644 index 0000000..6336a50 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/TriggerBuilder.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.windows; + + +import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.data.MessageEnvelope; + +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; + + +/** + * This class defines a builder of {@link Trigger} object for a {@link Window}. The triggers are categorized into + * three types: + * + * <p> + * early trigger: defines the condition when the first output from the window function is sent. + * late trigger: defines the condition when the updated output after the first output is sent. + * timer trigger: defines a system timeout condition to trigger output if no more inputs are received to enable early/late triggers + * </p> + * + * If multiple conditions are defined for a specific type of trigger, the aggregated trigger is the disjunction + * of each individual trigger (i.e. OR). 
+ * + * @param <M> the type of input {@link MessageEnvelope} to the {@link Window} + * @param <V> the type of output value from the {@link Window} + */ +@InterfaceStability.Unstable +public final class TriggerBuilder<M extends MessageEnvelope, V> { + + /** + * Predicate helper to OR multiple trigger conditions + */ + static class PredicateHelper { + static <M, S> BiFunction<M, S, Boolean> or(BiFunction<M, S, Boolean> lhs, BiFunction<M, S, Boolean> rhs) { + return (m, s) -> lhs.apply(m, s) || rhs.apply(m, s); + } + + static <S> Function<S, Boolean> or(Function<S, Boolean> lhs, Function<S, Boolean> rhs) { + return s -> lhs.apply(s) || rhs.apply(s); + } + } + + /** + * The early trigger condition that determines the first output from the {@link Window} + */ + private BiFunction<M, WindowState<V>, Boolean> earlyTrigger = null; + + /** + * The late trigger condition that determines the late output(s) from the {@link Window} + */ + private BiFunction<M, WindowState<V>, Boolean> lateTrigger = null; + + /** + * The system timer based trigger conditions that guarantees the {@link Window} proceeds forward + */ + private Function<WindowState<V>, Boolean> timerTrigger = null; + + /** + * The state updater function to be applied after the first output is triggered + */ + private Function<WindowState<V>, WindowState<V>> earlyTriggerUpdater = Function.identity(); + + /** + * The state updater function to be applied after the late output is triggered + */ + private Function<WindowState<V>, WindowState<V>> lateTriggerUpdater = Function.identity(); + + /** + * Helper method to add a trigger condition + * + * @param currentTrigger current trigger condition + * @param newTrigger new trigger condition + * @return combined trigger condition that is {@code currentTrigger} OR {@code newTrigger} + */ + private BiFunction<M, WindowState<V>, Boolean> addTrigger(BiFunction<M, WindowState<V>, Boolean> currentTrigger, + BiFunction<M, WindowState<V>, Boolean> newTrigger) { + if (currentTrigger 
== null) { + return newTrigger; + } + + return PredicateHelper.or(currentTrigger, newTrigger); + } + + /** + * Helper method to add a system timer trigger + * + * @param currentTimer current timer condition + * @param newTimer new timer condition + * @return combined timer condition that is {@code currentTimer} OR {@code newTimer} + */ + private Function<WindowState<V>, Boolean> addTimerTrigger(Function<WindowState<V>, Boolean> currentTimer, + Function<WindowState<V>, Boolean> newTimer) { + if (currentTimer == null) { + return newTimer; + } + + return PredicateHelper.or(currentTimer, newTimer); + } + + /** + * default constructor to prevent instantiation + */ + private TriggerBuilder() {} + + /** + * Constructor that set the size limit as the early trigger for a window + * + * @param sizeLimit the number of {@link MessageEnvelope}s in a window that would trigger the first output + */ + private TriggerBuilder(long sizeLimit) { + this.earlyTrigger = (m, s) -> s.getNumberMessages() > sizeLimit; + } + + /** + * Constructor that set the event time length as the early trigger + * + * @param eventTimeFunction the function that calculate the event time in nano-second from the input {@link MessageEnvelope} + * @param wndLenMs the window length in event time in milli-second + */ + private TriggerBuilder(Function<M, Long> eventTimeFunction, long wndLenMs) { + this.earlyTrigger = (m, s) -> + TimeUnit.NANOSECONDS.toMillis(Math.max(s.getLatestEventTimeNs() - s.getEarliestEventTimeNs(), + eventTimeFunction.apply(m) - s.getEarliestEventTimeNs())) > wndLenMs; + } + + /** + * Constructor that set the special token {@link MessageEnvelope} as the early trigger + * + * @param tokenFunc the function that checks whether an input {@link MessageEnvelope} is a token {@link MessageEnvelope} that triggers window output + */ + private TriggerBuilder(Function<M, Boolean> tokenFunc) { + this.earlyTrigger = (m, s) -> tokenFunc.apply(m); + } + + /** + * Build method that creates an {@link Trigger} 
object based on the trigger conditions set in {@link TriggerBuilder} + * This is kept package private and only used by {@link Windows} to convert the mutable {@link TriggerBuilder} object to an immutable {@link Trigger} object + * + * @return the final {@link Trigger} object + */ + Trigger<M, WindowState<V>> build() { + return Trigger.createTrigger(this.timerTrigger, this.earlyTrigger, this.lateTrigger, this.earlyTriggerUpdater, this.lateTriggerUpdater); + } + + /** + * Public API methods start here + */ + + + /** + * API method to allow users to set an update method to update the output value after the first window output is triggered + * by the early trigger condition + * + * @param onTriggerFunc the method to update the output value after the early trigger + * @return the {@link TriggerBuilder} object + */ + public TriggerBuilder<M, V> onEarlyTrigger(Function<V, V> onTriggerFunc) { + this.earlyTriggerUpdater = s -> { + s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); + return s; + }; + return this; + } + + /** + * API method to allow users to set an update method to update the output value after a late window output is triggered + * by the late trigger condition + * + * @param onTriggerFunc the method to update the output value after the late trigger + * @return the {@link TriggerBuilder} object + */ + public TriggerBuilder<M, V> onLateTrigger(Function<V, V> onTriggerFunc) { + this.lateTriggerUpdater = s -> { + s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); + return s; + }; + return this; + } + + /** + * API method to allow users to add a system timer trigger based on timeout after the last {@link MessageEnvelope} received in the window + * + * @param timeoutMs the timeout in ms after the last {@link MessageEnvelope} received in the window + * @return the {@link TriggerBuilder} object + */ + public TriggerBuilder<M, V> addTimeoutSinceLastMessage(long timeoutMs) { + this.timerTrigger = this.addTimerTrigger(this.timerTrigger, + s -> 
TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + timeoutMs < System.currentTimeMillis()); + return this; + } + + /** + * API method to allow users to add a system timer trigger based on the timeout after the first {@link MessageEnvelope} received in the window + * + * @param timeoutMs the timeout in ms after the first {@link MessageEnvelope} received in the window + * @return the {@link TriggerBuilder} object + */ + public TriggerBuilder<M, V> addTimeoutSinceFirstMessage(long timeoutMs) { + this.timerTrigger = this.addTimerTrigger(this.timerTrigger, s -> + TimeUnit.NANOSECONDS.toMillis(s.getFirstMessageTimeNs()) + timeoutMs < System.currentTimeMillis()); + return this; + } + + /** + * API method allow users to add a late trigger based on the window size limit + * + * @param sizeLimit limit on the number of {@link MessageEnvelope}s in window + * @return the {@link TriggerBuilder} object + */ + public TriggerBuilder<M, V> addLateTriggerOnSizeLimit(long sizeLimit) { + this.lateTrigger = this.addTrigger(this.lateTrigger, (m, s) -> s.getNumberMessages() > sizeLimit); + return this; + } + + /** + * API method to allow users to define a customized late trigger function based on input {@link MessageEnvelope} and the window state + * + * @param lateTrigger the late trigger condition based on input {@link MessageEnvelope} and the current {@link WindowState} + * @return the {@link TriggerBuilder} object + */ + public TriggerBuilder<M, V> addLateTrigger(BiFunction<M, WindowState<V>, Boolean> lateTrigger) { + this.lateTrigger = this.addTrigger(this.lateTrigger, lateTrigger); + return this; + } + + /** + * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on window size limit + * + * @param sizeLimit window size limit + * @param <M> the type of input {@link MessageEnvelope} + * @param <V> the type of {@link Window} output value + * @return the {@link TriggerBuilder} object + */ + public static <M extends MessageEnvelope, V> 
TriggerBuilder<M, V> earlyTriggerWhenExceedWndLen(long sizeLimit) { + return new TriggerBuilder<M, V>(sizeLimit); + } + + /** + * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on event time window + * + * + * @param eventTimeFunc the function to get the event time from the input {@link MessageEnvelope} + * @param eventTimeWndSizeMs the event time window size in Ms + * @param <M> the type of input {@link MessageEnvelope} + * @param <V> the type of {@link Window} output value + * @return the {@link TriggerBuilder} object + */ + public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> earlyTriggerOnEventTime(Function<M, Long> eventTimeFunc, long eventTimeWndSizeMs) { + return new TriggerBuilder<M, V>(eventTimeFunc, eventTimeWndSizeMs); + } + + /** + * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on token {@link MessageEnvelope}s + * + * @param tokenFunc the function to determine whether an input {@link MessageEnvelope} is a window token or not + * @param <M> the type of input {@link MessageEnvelope} + * @param <V> the type of {@link Window} output value + * @return the {@link TriggerBuilder} object + */ + public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> earlyTriggerOnTokenMsg(Function<M, Boolean> tokenFunc) { + return new TriggerBuilder<M, V>(tokenFunc); + } + + /** + * Static API method to allow customized early trigger condition based on input {@link MessageEnvelope} and the corresponding {@link WindowState} + * + * @param earlyTrigger the user defined early trigger condition + * @param <M> the input {@link MessageEnvelope} type + * @param <V> the output value from the window + * @return the {@link TriggerBuilder} object + */ + public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> earlyTrigger(BiFunction<M, WindowState<V>, Boolean> earlyTrigger) { + TriggerBuilder<M, V> newTriggers = new TriggerBuilder<M, V>(); + newTriggers.earlyTrigger = 
newTriggers.addTrigger(newTriggers.earlyTrigger, earlyTrigger); + return newTriggers; + } + + /** + * Static API method to create a {@link TriggerBuilder} w/ system timeout after the last {@link MessageEnvelope} received in the window + * + * @param timeoutMs timeout in ms after the last {@link MessageEnvelope} received + * @param <M> the type of input {@link MessageEnvelope} + * @param <V> the type of {@link Window} output value + * @return the {@link TriggerBuilder} object + */ + public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> timeoutSinceLastMessage(long timeoutMs) { + return new TriggerBuilder<M, V>().addTimeoutSinceLastMessage(timeoutMs); + } + + /** + * Static API method to create a {@link TriggerBuilder} w/ system timeout after the first {@link MessageEnvelope} received in the window + * + * @param timeoutMs timeout in ms after the first {@link MessageEnvelope} received + * @param <M> the type of input {@link MessageEnvelope} + * @param <V> the type of {@link Window} output value + * @return the {@link TriggerBuilder} object + */ + public static <M extends MessageEnvelope, V> TriggerBuilder<M, V> timeoutSinceFirstMessage(long timeoutMs) { + return new TriggerBuilder<M, V>().addTimeoutSinceFirstMessage(timeoutMs); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java new file mode 100644 index 0000000..56a307d --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Window.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.windows; + +import org.apache.samza.operators.data.MessageEnvelope; + +/** + * The public programming interface class for window function + * + * @param <M> the type of input {@link MessageEnvelope} + * @param <WK> the type of key to the {@link Window} + * @param <WV> the type of output value in the {@link WindowOutput} + * @param <WM> the type of {@link MessageEnvelope} in the window output stream + */ +public interface Window<M extends MessageEnvelope, WK, WV, WM extends WindowOutput<WK, WV>> { + + /** + * Set the triggers for this {@link Window} + * + * @param wndTrigger trigger conditions set by the programmers + * @return the {@link Window} function w/ the trigger {@code wndTrigger} + */ + Window<M, WK, WV, WM> setTriggers(TriggerBuilder<M, WV> wndTrigger); + + /** + * Internal implementation helper to get the functions associated with this Window. + * + * <b>NOTE:</b> This is purely an internal API and should not be used directly by users. + * + * @return the functions associated with this Window. 
+ */ + WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java new file mode 100644 index 0000000..8878bf9 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowFn.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.windows; + +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.storage.kv.Entry; + +import java.util.function.BiFunction; + + +/** + * Defines an internal representation of a window function. 
+ * + * @param <M> type of the input {@link MessageEnvelope} for the window + * @param <WK> type of the window key in the output {@link MessageEnvelope} + * @param <WS> type of the {@link WindowState} in the state store + * @param <WM> type of the {@link MessageEnvelope} in the output stream + */ +public interface WindowFn<M extends MessageEnvelope, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> { + + /** + * Get the transformation function of the {@link WindowFn}. + * + * @return the transformation function which takes a {@link MessageEnvelope} of type {@code M} and its window state entry, + * and transforms it to an {@link WindowOutput} + */ + BiFunction<M, Entry<WK, WS>, WM> getTransformFn(); + + /** + * Get the state store functions for this {@link WindowFn}. + * + * @return the state store functions + */ + StoreFunctions<M, WK, WS> getStoreFns(); + + /** + * Get the trigger conditions for this {@link WindowFn}. + * + * @return the trigger condition for this {@link WindowFn} + */ + Trigger<M, WS> getTrigger(); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java new file mode 100644 index 0000000..63e34c8 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowOutput.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.windows; + +import org.apache.samza.operators.data.MessageEnvelope; + + +/** + * The type of output {@link MessageEnvelope}s in a window operator output stream. + * + * @param <K> the type of key in the window output + * @param <M> the type of value in the window output + */ +public final class WindowOutput<K, M> implements MessageEnvelope<K, M> { + private final K key; + private final M value; + + WindowOutput(K key, M value) { + this.key = key; + this.value = value; + } + + @Override public M getMessage() { + return this.value; + } + + @Override public K getKey() { + return this.key; + } + + static public <K, M> WindowOutput<K, M> of(K key, M result) { + return new WindowOutput<>(key, result); + } +} + http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java new file mode 100644 index 0000000..835d749 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowState.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.windows; + +import org.apache.samza.annotation.InterfaceStability; + + +/** + * This interface defines the methods a window state class has to implement. The programmers are allowed to implement + * customized window state to be stored in window state stores by implementing this interface class. + * + * @param <WV> the type for window output value + */ +@InterfaceStability.Unstable +public interface WindowState<WV> { + /** + * Method to get the system time when the first {@link org.apache.samza.operators.data.MessageEnvelope} + * in the window is received + * + * @return nano-second of system time for the first {@link org.apache.samza.operators.data.MessageEnvelope} + * received in the window + */ + long getFirstMessageTimeNs(); + + /** + * Method to get the system time when the last {@link org.apache.samza.operators.data.MessageEnvelope} + * in the window is received + * + * @return nano-second of system time for the last {@link org.apache.samza.operators.data.MessageEnvelope} + * received in the window + */ + long getLastMessageTimeNs(); + + /** + * Method to get the earliest event time in the window + * + * @return the earliest event time in nano-second in the window + */ + long getEarliestEventTimeNs(); + + /** + * Method to get the latest event time in the window + * + * @return the latest event time in nano-second in the window + */ + long 
getLatestEventTimeNs(); + + /** + * Method to get the total number of {@link org.apache.samza.operators.data.MessageEnvelope}s received in the window + * + * @return number of {@link org.apache.samza.operators.data.MessageEnvelope}s in the window + */ + long getNumberMessages(); + + /** + * Method to get the corresponding window's output value + * + * @return the corresponding window's output value + */ + WV getOutputValue(); + + /** + * Method to set the corresponding window's output value + * + * @param value the corresponding window's output value + */ + void setOutputValue(WV value); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java new file mode 100644 index 0000000..1a4ed8f --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.windows; + +import org.apache.samza.operators.data.MessageEnvelope; + +import java.util.Collection; +import java.util.function.Function; + + +/** + * This class defines a collection of {@link Window} functions. The public classes and methods here are intended to be + * used by the user (i.e. programmers) to create {@link Window} function directly. + * + */ +public final class Windows { + + /** + * private constructor to prevent instantiation + */ + private Windows() {} + + static <M extends MessageEnvelope, WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> WindowFn<M, WK, WS, WM> getInternalWindowFn( + Window<M, WK, WV, WM> window) { + if (window instanceof SessionWindow) { + SessionWindow<M, WK, WV> sessionWindow = (SessionWindow<M, WK, WV>) window; + return (WindowFn<M, WK, WS, WM>) sessionWindow.getInternalWindowFn(); + } + throw new IllegalArgumentException("Input window type not supported."); + } + + /** + * Public static API methods start here + * + */ + + /** + * Static API method to create a {@link SessionWindow} in which the output value is simply the collection of input {@link MessageEnvelope}s + * + * @param sessionKeyFunction function to calculate session window key + * @param <M> type of input {@link MessageEnvelope} + * @param <WK> type of the session window key + * @return the {@link Window} function for the session + */ + public static <M extends MessageEnvelope, WK> Window<M, WK, Collection<M>, WindowOutput<WK, Collection<M>>> intoSessions(Function<M, WK> sessionKeyFunction) { + return new SessionWindow<>(sessionKeyFunction, (m, c) -> { + c.add(m); + return c; + } + ); + } + + /** + * Static API method to create a {@link SessionWindow} in which the output value is a collection of {@code SI} from the input {@link MessageEnvelope}s + * + * @param sessionKeyFunction function to calculate session window key + * @param sessionInfoExtractor function to retrieve session info of type {@code SI} 
from the input {@link MessageEnvelope} of type {@code M} + * @param <M> type of the input {@link MessageEnvelope} + * @param <WK> type of the session window key + * @param <SI> type of the session information retrieved from each input {@link MessageEnvelope} of type {@code M} + * @return the {@link Window} function for the session + */ + public static <M extends MessageEnvelope, WK, SI> Window<M, WK, Collection<SI>, WindowOutput<WK, Collection<SI>>> intoSessions(Function<M, WK> sessionKeyFunction, + Function<M, SI> sessionInfoExtractor) { + return new SessionWindow<>(sessionKeyFunction, (m, c) -> { + c.add(sessionInfoExtractor.apply(m)); + return c; + } + ); + } + + /** + * Static API method to create a {@link SessionWindow} as a counter of input {@link MessageEnvelope}s + * + * @param sessionKeyFunction function to calculate session window key + * @param <M> type of the input {@link MessageEnvelope} + * @param <WK> type of the session window key + * @return the {@link Window} function for the session + */ + public static <M extends MessageEnvelope, WK> Window<M, WK, Integer, WindowOutput<WK, Integer>> intoSessionCounter(Function<M, WK> sessionKeyFunction) { + return new SessionWindow<>(sessionKeyFunction, (m, c) -> c + 1); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java deleted file mode 100644 index 8c56287..0000000 --- a/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.operators.data.Message; - - -public class TestMessage implements Message<String, String> { - - private final String key; - private final String value; - private final long timestamp; - - TestMessage(String key, String value, long timestamp) { - this.key = key; - this.value = value; - this.timestamp = timestamp; - } - - @Override public String getMessage() { - return this.value; - } - - @Override public String getKey() { - return this.key; - } - - @Override public long getTimestamp() { - return this.timestamp; - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java new file mode 100644 index 0000000..dfa69ac --- /dev/null +++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessageEnvelope.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.operators.data.MessageEnvelope; + + +public class TestMessageEnvelope implements MessageEnvelope<String, TestMessageEnvelope.MessageType> { + + private final String key; + private final MessageType value; + + public TestMessageEnvelope(String key, String value, long eventTime) { + this.key = key; + this.value = new MessageType(value, eventTime); + } + + @Override + public MessageType getMessage() { + return this.value; + } + + @Override + public String getKey() { + return this.key; + } + + public class MessageType { + private final String value; + private final long eventTime; + + public MessageType(String value, long eventTime) { + this.value = value; + this.eventTime = eventTime; + } + + public long getEventTime() { + return eventTime; + } + + public String getValue() { + return value; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java deleted file mode 100644 index 4dbe233..0000000 --- a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java +++ /dev/null @@ -1,180 +0,0 @@ 
-/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.operators.internal.Operators.*; -import org.apache.samza.operators.internal.WindowOutput; -import org.apache.samza.system.OutgoingMessageEnvelope; -import org.apache.samza.system.SystemStream; -import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskCoordinator; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashSet; -import java.util.Set; -import java.util.function.BiFunction; -import java.util.function.Function; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - - -public class TestMessageStream { - - @Test public void testMap() { - MessageStream<TestMessage> inputStream = new MessageStream<>(); - Function<TestMessage, TestOutputMessage> xMap = m -> new TestOutputMessage(m.getKey(), m.getMessage().length() + 1, m.getTimestamp() + 2); - MessageStream<TestOutputMessage> outputStream = inputStream.map(xMap); - Collection<Operator> subs = inputStream.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestOutputMessage> mapOp = 
subs.iterator().next(); - assertTrue(mapOp instanceof StreamOperator); - assertEquals(mapOp.getOutputStream(), outputStream); - // assert that the transformation function is what we defined above - TestMessage xTestMsg = mock(TestMessage.class); - when(xTestMsg.getKey()).thenReturn("test-msg-key"); - when(xTestMsg.getMessage()).thenReturn("123456789"); - when(xTestMsg.getTimestamp()).thenReturn(12345L); - Collection<TestOutputMessage> cOutputMsg = ((StreamOperator<TestMessage, TestOutputMessage>) mapOp).getFunction().apply(xTestMsg); - assertEquals(cOutputMsg.size(), 1); - TestOutputMessage outputMessage = cOutputMsg.iterator().next(); - assertEquals(outputMessage.getKey(), xTestMsg.getKey()); - assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().length() + 1)); - assertEquals(outputMessage.getTimestamp(), xTestMsg.getTimestamp() + 2); - } - - @Test public void testFlatMap() { - MessageStream<TestMessage> inputStream = new MessageStream<>(); - Set<TestOutputMessage> flatOuts = new HashSet<TestOutputMessage>() { { - this.add(mock(TestOutputMessage.class)); - this.add(mock(TestOutputMessage.class)); - this.add(mock(TestOutputMessage.class)); - } }; - Function<TestMessage, Collection<TestOutputMessage>> xFlatMap = m -> flatOuts; - MessageStream<TestOutputMessage> outputStream = inputStream.flatMap(xFlatMap); - Collection<Operator> subs = inputStream.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestOutputMessage> flatMapOp = subs.iterator().next(); - assertTrue(flatMapOp instanceof StreamOperator); - assertEquals(flatMapOp.getOutputStream(), outputStream); - // assert that the transformation function is what we defined above - assertEquals(((StreamOperator<TestMessage, TestOutputMessage>) flatMapOp).getFunction(), xFlatMap); - } - - @Test public void testFilter() { - MessageStream<TestMessage> inputStream = new MessageStream<>(); - Function<TestMessage, Boolean> xFilter = m -> m.getTimestamp() > 123456L; - 
MessageStream<TestMessage> outputStream = inputStream.filter(xFilter); - Collection<Operator> subs = inputStream.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestMessage> filterOp = subs.iterator().next(); - assertTrue(filterOp instanceof StreamOperator); - assertEquals(filterOp.getOutputStream(), outputStream); - // assert that the transformation function is what we defined above - Function<TestMessage, Collection<TestMessage>> txfmFn = ((StreamOperator<TestMessage, TestMessage>) filterOp).getFunction(); - TestMessage mockMsg = mock(TestMessage.class); - when(mockMsg.getTimestamp()).thenReturn(11111L); - Collection<TestMessage> output = txfmFn.apply(mockMsg); - assertTrue(output.isEmpty()); - when(mockMsg.getTimestamp()).thenReturn(999999L); - output = txfmFn.apply(mockMsg); - assertEquals(output.size(), 1); - assertEquals(output.iterator().next(), mockMsg); - } - - @Test public void testSink() { - MessageStream<TestMessage> inputStream = new MessageStream<>(); - MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> xSink = (m, mc, tc) -> { - mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage())); - tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK); - }; - inputStream.sink(xSink); - Collection<Operator> subs = inputStream.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestMessage> sinkOp = subs.iterator().next(); - assertTrue(sinkOp instanceof SinkOperator); - assertEquals(((SinkOperator) sinkOp).getFunction(), xSink); - assertNull(((SinkOperator) sinkOp).getOutputStream()); - } - - @Test public void testWindow() { - MessageStream<TestMessage> inputStream = new MessageStream<>(); - Windows.SessionWindow<TestMessage, String, Integer> window = mock(Windows.SessionWindow.class); - MessageStream<WindowOutput<String, Integer>> outStream = inputStream.window(window); - Collection<Operator> subs = inputStream.getSubscribers(); - assertEquals(subs.size(), 1); - 
Operator<TestMessage> wndOp = subs.iterator().next(); - assertTrue(wndOp instanceof WindowOperator); - assertEquals(((WindowOperator) wndOp).getOutputStream(), outStream); - } - - @Test public void testJoin() { - MessageStream<TestMessage> source1 = new MessageStream<>(); - MessageStream<TestMessage> source2 = new MessageStream<>(); - BiFunction<TestMessage, TestMessage, TestOutputMessage> joiner = (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp()); - MessageStream<TestOutputMessage> joinOutput = source1.join(source2, joiner); - Collection<Operator> subs = source1.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestMessage> joinOp1 = subs.iterator().next(); - assertTrue(joinOp1 instanceof PartialJoinOperator); - assertEquals(((PartialJoinOperator) joinOp1).getOutputStream(), joinOutput); - subs = source2.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestMessage> joinOp2 = subs.iterator().next(); - assertTrue(joinOp2 instanceof PartialJoinOperator); - assertEquals(((PartialJoinOperator) joinOp2).getOutputStream(), joinOutput); - TestMessage joinMsg1 = new TestMessage("test-join-1", "join-msg-001", 11111L); - TestMessage joinMsg2 = new TestMessage("test-join-2", "join-msg-002", 22222L); - TestOutputMessage xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp1).getFunction().apply(joinMsg1, joinMsg2); - assertEquals(xOut.getKey(), "test-join-1"); - assertEquals(xOut.getMessage(), Integer.valueOf(24)); - assertEquals(xOut.getTimestamp(), 11111L); - xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp2).getFunction().apply(joinMsg2, joinMsg1); - assertEquals(xOut.getKey(), "test-join-1"); - assertEquals(xOut.getMessage(), Integer.valueOf(24)); - assertEquals(xOut.getTimestamp(), 11111L); - } - - @Test public void testMerge() { - MessageStream<TestMessage> merge1 = new MessageStream<>(); - Collection<MessageStream<TestMessage>> others = new 
ArrayList<MessageStream<TestMessage>>() { { - this.add(new MessageStream<>()); - this.add(new MessageStream<>()); - } }; - MessageStream<TestMessage> mergeOutput = merge1.merge(others); - validateMergeOperator(merge1, mergeOutput); - - others.forEach(merge -> validateMergeOperator(merge, mergeOutput)); - } - - private void validateMergeOperator(MessageStream<TestMessage> mergeSource, MessageStream<TestMessage> mergeOutput) { - Collection<Operator> subs = mergeSource.getSubscribers(); - assertEquals(subs.size(), 1); - Operator<TestMessage> mergeOp = subs.iterator().next(); - assertTrue(mergeOp instanceof StreamOperator); - assertEquals(((StreamOperator) mergeOp).getOutputStream(), mergeOutput); - TestMessage mockMsg = mock(TestMessage.class); - Collection<TestMessage> outputs = ((StreamOperator<TestMessage, TestMessage>) mergeOp).getFunction().apply(mockMsg); - assertEquals(outputs.size(), 1); - assertEquals(outputs.iterator().next(), mockMsg); - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java ---------------------------------------------------------------------- diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java deleted file mode 100644 index c5fcceb..0000000 --- a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.samza.operators; - -import org.apache.samza.Partition; -import org.apache.samza.system.SystemStreamPartition; -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - - -public class TestMessageStreams { - - @Test public void testInput() { - SystemStreamPartition ssp = new SystemStreamPartition("my-system", "my-stream", new Partition(0)); - MessageStreams.SystemMessageStream mSysStream = MessageStreams.input(ssp); - assertEquals(mSysStream.getSystemStreamPartition(), ssp); - } -}