SAMZA-1045: Move classes from operator/api into samza-api
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1dac25e1 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1dac25e1 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1dac25e1 Branch: refs/heads/samza-sql Commit: 1dac25e1750b7f3fefa72def855136801462494d Parents: adcd266 Author: vjagadish1989 <jvenk...@linkedin.com> Authored: Mon Oct 31 14:14:24 2016 -0700 Committer: vjagadish1989 <jvenk...@linkedin.com> Committed: Tue Nov 1 14:34:28 2016 -0700 ---------------------------------------------------------------------- build.gradle | 4 +- checkstyle/checkstyle.xml | 9 +- gradle.properties | 3 +- gradle/wrapper/gradle-wrapper.jar | Bin 49875 -> 52818 bytes gradle/wrapper/gradle-wrapper.properties | 4 +- gradlew | 57 ++- .../apache/samza/operators/MessageStream.java | 186 ++++++++ .../apache/samza/operators/MessageStreams.java | 80 ++++ .../apache/samza/operators/TriggerBuilder.java | 321 +++++++++++++ .../org/apache/samza/operators/WindowState.java | 77 +++ .../org/apache/samza/operators/Windows.java | 200 ++++++++ .../operators/data/IncomingSystemMessage.java | 76 +++ .../operators/data/InputSystemMessage.java | 43 ++ .../apache/samza/operators/data/LongOffset.java | 80 ++++ .../apache/samza/operators/data/Message.java | 60 +++ .../org/apache/samza/operators/data/Offset.java | 27 ++ .../samza/operators/internal/Operators.java | 469 +++++++++++++++++++ .../samza/operators/internal/Trigger.java | 95 ++++ .../samza/operators/internal/WindowFn.java | 60 +++ .../samza/operators/internal/WindowOutput.java | 55 +++ .../operators/task/StreamOperatorTask.java | 43 ++ .../samza/storage/StorageEngineFactory.java | 16 +- .../org/apache/samza/operators/TestMessage.java | 47 ++ .../samza/operators/TestMessageStream.java | 180 +++++++ .../samza/operators/TestMessageStreams.java | 35 ++ .../samza/operators/TestOutputMessage.java | 47 ++ .../samza/operators/TestTriggerBuilder.java | 214 +++++++++ 
.../org/apache/samza/operators/TestWindows.java | 106 +++++ .../data/TestIncomingSystemMessage.java | 53 +++ .../samza/operators/data/TestLongOffset.java | 76 +++ .../samza/operators/internal/TestOperators.java | 128 +++++ .../samza/operators/internal/TestTrigger.java | 68 +++ .../operators/internal/TestWindowOutput.java | 36 ++ .../MockCoordinatorStreamWrappedConsumer.java | 3 +- .../TestCoordinatorStreamSystemConsumer.java | 4 +- .../samza/operators/api/MessageStream.java | 188 -------- .../samza/operators/api/MessageStreams.java | 80 ---- .../samza/operators/api/TriggerBuilder.java | 314 ------------- .../apache/samza/operators/api/WindowState.java | 77 --- .../org/apache/samza/operators/api/Windows.java | 195 -------- .../api/data/IncomingSystemMessage.java | 76 --- .../operators/api/data/InputSystemMessage.java | 43 -- .../samza/operators/api/data/LongOffset.java | 75 --- .../samza/operators/api/data/Message.java | 58 --- .../apache/samza/operators/api/data/Offset.java | 27 -- .../samza/operators/api/internal/Operators.java | 468 ------------------ .../samza/operators/api/internal/Trigger.java | 95 ---- .../samza/operators/api/internal/WindowFn.java | 60 --- .../operators/api/internal/WindowOutput.java | 55 --- .../samza/operators/impl/ChainedOperators.java | 6 +- .../samza/operators/impl/OperatorFactory.java | 8 +- .../samza/operators/impl/OperatorImpl.java | 6 +- .../samza/operators/impl/ProcessorContext.java | 2 +- .../operators/impl/SimpleOperatorImpl.java | 4 +- .../samza/operators/impl/SinkOperatorImpl.java | 6 +- .../samza/operators/impl/StateStoreImpl.java | 4 +- .../operators/impl/join/PartialJoinOpImpl.java | 11 +- .../impl/window/SessionWindowImpl.java | 10 +- .../samza/task/StreamOperatorAdaptorTask.java | 11 +- .../apache/samza/task/StreamOperatorTask.java | 42 -- .../apache/samza/operators/api/TestMessage.java | 47 -- .../samza/operators/api/TestMessageStream.java | 180 ------- .../samza/operators/api/TestMessageStreams.java | 35 -- 
.../samza/operators/api/TestOutputMessage.java | 47 -- .../samza/operators/api/TestTriggerBuilder.java | 211 --------- .../apache/samza/operators/api/TestWindows.java | 106 ----- .../api/data/TestIncomingSystemMessage.java | 53 --- .../operators/api/data/TestLongOffset.java | 76 --- .../operators/api/internal/TestOperators.java | 128 ----- .../operators/api/internal/TestTrigger.java | 62 --- .../api/internal/TestWindowOutput.java | 36 -- .../operators/impl/TestChainedOperators.java | 8 +- .../operators/impl/TestOperatorFactory.java | 16 +- .../samza/operators/impl/TestOperatorImpl.java | 4 +- .../operators/impl/TestProcessorContext.java | 2 +- .../operators/impl/TestSimpleOperatorImpl.java | 6 +- .../operators/impl/TestSinkOperatorImpl.java | 6 +- .../operators/impl/TestStateStoreImpl.java | 6 +- .../impl/window/TestSessionWindowImpl.java | 12 +- .../samza/task/BroadcastOperatorTask.java | 13 +- .../samza/task/InputJsonSystemMessage.java | 6 +- .../org/apache/samza/task/JoinOperatorTask.java | 9 +- .../task/TestStreamOperatorAdaptorTask.java | 1 + .../samza/task/TestStreamOperatorTasks.java | 2 +- .../apache/samza/task/WindowOperatorTask.java | 11 +- 85 files changed, 3001 insertions(+), 2965 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index aeefd1c..28c2dcf 100644 --- a/build.gradle +++ b/build.gradle @@ -123,9 +123,9 @@ project(':samza-api') { testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" } - checkstyle { configFile = new File(rootDir, "checkstyle/checkstyle.xml") + toolVersion = "$checkstyleVersion" } } @@ -348,6 +348,8 @@ if (JavaVersion.current().isJava8Compatible()) { compile "org.apache.commons:commons-lang3:$commonsLang3Version" compile "org.apache.avro:avro:$avroVersion" compile 
"org.reactivestreams:reactive-streams:$reactiveStreamVersion" + + testCompile project(":samza-api").sourceSets.test.output testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" } http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index 770b5e7..c23a617 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -53,19 +53,12 @@ </module> <module name="LocalVariableName"/> <module name="LocalFinalVariableName"/> - <module name="ClassTypeParameterName"/> <module name="MemberName"/> - <module name="MethodTypeParameterName"/> <module name="PackageName"/> <module name="ParameterName"/> <module name="StaticVariableName"/> <module name="TypeName"/> - <!-- dependencies --> - <module name="ImportControl"> - <property name="file" value="${basedir}/checkstyle/import-control.xml"/> - </module> - <!-- whitespace --> <module name="GenericWhitespace"/> <module name="NoWhitespaceBefore"/> @@ -85,4 +78,4 @@ <module name="ParenPad"/> <module name="TypecastParenPad"/> </module> -</module> \ No newline at end of file +</module> http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/gradle.properties ---------------------------------------------------------------------- diff --git a/gradle.properties b/gradle.properties index 16e1f5d..d8dfe7b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -18,8 +18,9 @@ group=org.apache.samza version=0.10.1-SNAPSHOT scalaVersion=2.10 -gradleVersion=2.0 +gradleVersion=2.8 org.gradle.jvmargs="-XX:MaxPermSize=512m" systemProp.file.encoding=utf-8 +checkstyleVersion=6.15 http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/gradle/wrapper/gradle-wrapper.jar ---------------------------------------------------------------------- diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 
a7634b0..deedc7f 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/gradle/wrapper/gradle-wrapper.properties ---------------------------------------------------------------------- diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 78596c0..55720c3 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Thu Jul 03 20:51:36 PDT 2014 +#Mon Oct 31 23:13:44 PDT 2016 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=http\://services.gradle.org/distributions/gradle-2.0-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-2.8-bin.zip http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/gradlew ---------------------------------------------------------------------- diff --git a/gradlew b/gradlew index 91a7e26..9aa616c 100755 --- a/gradlew +++ b/gradlew @@ -6,12 +6,30 @@ ## ############################################################################## -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS="" +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null APP_NAME="Gradle" APP_BASE_NAME=`basename "$0"` +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + # Use the maximum available, or set MAX_FD != -1 to use that value. 
MAX_FD="maximum" @@ -30,6 +48,7 @@ die ( ) { cygwin=false msys=false darwin=false +nonstop=false case "`uname`" in CYGWIN* ) cygwin=true @@ -40,31 +59,11 @@ case "`uname`" in MINGW* ) msys=true ;; + NONSTOP* ) + nonstop=true + ;; esac -# For Cygwin, ensure paths are in UNIX format before anything is touched. -if $cygwin ; then - [ -n "$JAVA_HOME" ] && JAVA_HOME=`cygpath --unix "$JAVA_HOME"` -fi - -# Attempt to set APP_HOME -# Resolve links: $0 may be a link -PRG="$0" -# Need this for relative symlinks. -while [ -h "$PRG" ] ; do - ls=`ls -ld "$PRG"` - link=`expr "$ls" : '.*-> \(.*\)$'` - if expr "$link" : '/.*' > /dev/null; then - PRG="$link" - else - PRG=`dirname "$PRG"`"/$link" - fi -done -SAVED="`pwd`" -cd "`dirname \"$PRG\"`/" >&- -APP_HOME="`pwd -P`" -cd "$SAVED" >&- - CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar # Determine the Java command to use to start the JVM. @@ -90,7 +89,7 @@ location of your Java installation." fi # Increase the maximum file descriptors if we can. -if [ "$cygwin" = "false" -a "$darwin" = "false" ] ; then +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then MAX_FD_LIMIT=`ulimit -H -n` if [ $? 
-eq 0 ] ; then if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then @@ -114,6 +113,7 @@ fi if $cygwin ; then APP_HOME=`cygpath --path --mixed "$APP_HOME"` CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` # We build the pattern for arguments to be converted via cygpath ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` @@ -161,4 +161,9 @@ function splitJvmOpts() { eval splitJvmOpts $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS JVM_OPTS[${#JVM_OPTS[*]}]="-Dorg.gradle.appname=$APP_BASE_NAME" +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [[ "$(uname)" == "Darwin" ]] && [[ "$HOME" == "$PWD" ]]; then + cd "$(dirname "$0")" +fi + exec "$JAVACMD" "${JVM_OPTS[@]}" -classpath "$CLASSPATH" org.gradle.wrapper.GradleWrapperMain "$@" http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java new file mode 100644 index 0000000..4c632b8 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators; + +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators; +import org.apache.samza.operators.internal.Operators.Operator; +import org.apache.samza.operators.internal.WindowOutput; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Function; + + +/** + * This class defines either the input or output streams to/from the operators. Users use the API methods defined here to + * directly program the stream processing stages that process a stream and generate another one. + * + * @param <M> Type of message in this stream + */ +public class MessageStream<M extends Message> { + + private final Set<Operator> subscribers = new HashSet<>(); + + /** + * Helper method to get the corresponding list of subscribers to a specific {@link MessageStream}. + * + * NOTE: This is purely an internal API and should not be used directly by programmers.
+ * + * @return An unmodifiable set containing all {@link Operator}s that subscribe to this {@link MessageStream} object + */ + public Collection<Operator> getSubscribers() { + return Collections.unmodifiableSet(this.subscribers); + } + + /** + * Public API methods start here + */ + + /** + * Defines a function API that takes three input parameters w/ types {@code A}, {@code B}, and {@code C} and w/o a return value + * + * @param <A> the type of input {@code a} + * @param <B> the type of input {@code b} + * @param <C> the type of input {@code c} + */ + @FunctionalInterface + public interface VoidFunction3<A, B, C> { + public void apply(A a, B b, C c); + } + + /** + * Method to apply a map function (1:1) on a {@link MessageStream} + * + * @param mapper the mapper function to map one input {@link Message} to one output {@link Message} + * @param <OM> the type of the output {@link Message} in the output {@link MessageStream} + * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream} + */ + public <OM extends Message> MessageStream<OM> map(Function<M, OM> mapper) { + Operator<OM> op = Operators.<M, OM>getStreamOperator(m -> new ArrayList<OM>() { { + OM r = mapper.apply(m); + if (r != null) { + this.add(r); + } + } }); + this.subscribers.add(op); + return op.getOutputStream(); + } + + /** + * Method to apply a flatMap function (1:n) on a {@link MessageStream} + * + * @param flatMapper the flat mapper function to map one input {@link Message} to zero or more output {@link Message}s + * @param <OM> the type of the output {@link Message} in the output {@link MessageStream} + * @return the output {@link MessageStream} by applying the flatMap function on the input {@link MessageStream} + */ + public <OM extends Message> MessageStream<OM> flatMap(Function<M, Collection<OM>> flatMapper) { + Operator<OM> op = Operators.getStreamOperator(flatMapper); + this.subscribers.add(op); + return op.getOutputStream(); + } + + /** + * Method to
apply a filter function on a {@link MessageStream} + * + * @param filter the filter function to filter input {@link Message}s from the input {@link MessageStream} + * @return the output {@link MessageStream} after applying the filter function on the input {@link MessageStream} + */ + public MessageStream<M> filter(Function<M, Boolean> filter) { + Operator<M> op = Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() { { + if (filter.apply(t)) { + this.add(t); + } + } }); + this.subscribers.add(op); + return op.getOutputStream(); + } + + /** + * Method to send an input {@link MessageStream} to an output {@link org.apache.samza.system.SystemStream} via a user-defined sink function. + * This is a terminal operation: it does not return an output {@link MessageStream}. + * + * @param sink the user-defined sink function to send the input {@link Message}s to the external output systems + */ + public void sink(VoidFunction3<M, MessageCollector, TaskCoordinator> sink) { + this.subscribers.add(Operators.getSinkOperator(sink)); + } + + /** + * Method to perform a window function (i.e.
a group-by, aggregate function) on a {@link MessageStream} + * + * @param window the window function to group and aggregate the input {@link Message}s from the input {@link MessageStream} + * @param <WK> the type of key in the output {@link Message} from the {@link Windows.Window} function + * @param <WV> the type of output value from the {@link Windows.Window} function + * @param <WS> the type of window state kept in the {@link Windows.Window} function + * @param <WM> the type of {@link org.apache.samza.operators.internal.WindowOutput} message from the {@link Windows.Window} function + * @return the output {@link MessageStream} after applying the window function on the input {@link MessageStream} + */ + public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(Windows.Window<M, WK, WV, WM> window) { + Operator<WM> wndOp = Operators.getWindowOperator(Windows.getInternalWindowFn(window)); + this.subscribers.add(wndOp); + return wndOp.getOutputStream(); + } + + /** + * Method to add an input {@link MessageStream} to a join function. Note that we currently only support 2-way joins.
+ * + * @param other the other stream to be joined w/ + * @param merger the common function to merge messages from this {@link MessageStream} and {@code other} + * @param <K> the type of join key + * @param <JM> the type of message in the {@link Message} from the other join stream + * @param <RM> the type of message in the {@link Message} from the join function + * @return the output {@link MessageStream} from the join function {@code merger} + */ + public <K, JM extends Message<K, ?>, RM extends Message> MessageStream<RM> join(MessageStream<JM> other, + BiFunction<M, JM, RM> merger) { + MessageStream<RM> outputStream = new MessageStream<>(); + + BiFunction<M, JM, RM> parJoin1 = merger::apply; + BiFunction<JM, M, RM> parJoin2 = (m, t1) -> merger.apply(t1, m); + + // TODO: need to add default store functions for the two partial join functions + + other.subscribers.add(Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream)); + this.subscribers.add(Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream)); + return outputStream; + } + + /** + * Method to merge all {@code others} streams w/ this {@link MessageStream}.
The merging streams must have the same type {@code M} + * + * @param others other streams to be merged w/ this one; the collection itself is not modified + * @return the merged output stream + */ + public MessageStream<M> merge(Collection<MessageStream<M>> others) { + MessageStream<M> outputStream = new MessageStream<>(); + + this.subscribers.add(Operators.getMergeOperator(outputStream)); + others.forEach(other -> other.subscribers.add(Operators.getMergeOperator(outputStream))); + return outputStream; + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java new file mode 100644 index 0000000..4a0ae6a --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStreams.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.samza.operators; + +import org.apache.samza.operators.data.IncomingSystemMessage; +import org.apache.samza.system.SystemStreamPartition; + + +/** + * This class defines all methods to create a {@link MessageStream} object. Users can use this to create a {@link MessageStream} + * from a specific input source. + * + */ + +public final class MessageStreams { + + /** + * private constructor to prevent instantiation + */ + private MessageStreams() {} + + /** + * Public nested class for system input {@link MessageStream}s; its constructor is private, so instances are created via {@link MessageStreams#input(SystemStreamPartition)} + */ + public static final class SystemMessageStream extends MessageStream<IncomingSystemMessage> { + /** + * The corresponding input {@link SystemStreamPartition} + */ + private final SystemStreamPartition ssp; + + /** + * Constructor for input system stream + * + * @param ssp the input {@link SystemStreamPartition} for the input {@link SystemMessageStream} + */ + private SystemMessageStream(SystemStreamPartition ssp) { + this.ssp = ssp; + } + + /** + * Getter for the {@link SystemStreamPartition} of the input + * + * @return the input {@link SystemStreamPartition} + */ + public SystemStreamPartition getSystemStreamPartition() { + return this.ssp; + } + } + + /** + * Public static API methods start here + */ + + /** + * Static API method to create a {@link MessageStream} from a system input stream + * + * @param ssp the input {@link SystemStreamPartition} + * @return the {@link MessageStream} object that takes {@code ssp} as the input + */ + public static SystemMessageStream input(SystemStreamPartition ssp) { + return new SystemMessageStream(ssp); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java b/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java new file mode
100644 index 0000000..5b7db9c --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/TriggerBuilder.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + + +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Trigger; + +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; +import java.util.function.Function; + + +/** + * This class defines a builder of {@link org.apache.samza.operators.internal.Trigger} object for a {@link Windows.Window}. The triggers are categorized into + * three types: + * + * <p> + * early trigger: defines the condition when the first output from the window function is sent. + * late trigger: defines the condition when the updated output after the first output is sent. + * timer trigger: defines a system timeout condition to trigger output if no more inputs are received to enable early/late triggers + * </p> + * + * If multiple conditions are defined for a specific type of trigger, the aggregated trigger is the disjunction of each individual trigger (i.e. OR).
+ * + * NOTE: Programmers should not use classes defined in {@link org.apache.samza.operators.internal} to create triggers + * + * + * @param <M> the type of input {@link Message} to the {@link Windows.Window} + * @param <V> the type of output value from the {@link Windows.Window} + */ +public final class TriggerBuilder<M extends Message, V> { + + /** + * Predicate helper to OR (with short-circuit evaluation) multiple trigger conditions + */ + static class PredicateHelper { + static <M, S> BiFunction<M, S, Boolean> or(BiFunction<M, S, Boolean> lhs, BiFunction<M, S, Boolean> rhs) { + return (m, s) -> lhs.apply(m, s) || rhs.apply(m, s); + } + + static <S> Function<S, Boolean> or(Function<S, Boolean> lhs, Function<S, Boolean> rhs) { + return s -> lhs.apply(s) || rhs.apply(s); + } + } + + /** + * The early trigger condition that determines the first output from the {@link Windows.Window} + */ + private BiFunction<M, WindowState<V>, Boolean> earlyTrigger = null; + + /** + * The late trigger condition that determines the late output(s) from the {@link Windows.Window} + */ + private BiFunction<M, WindowState<V>, Boolean> lateTrigger = null; + + /** + * The system timer based trigger conditions that guarantee the {@link Windows.Window} proceeds forward + */ + private Function<WindowState<V>, Boolean> timerTrigger = null; + + /** + * The state updater function to be applied after the first output is triggered + */ + private Function<WindowState<V>, WindowState<V>> earlyTriggerUpdater = Function.identity(); + + /** + * The state updater function to be applied after the late output is triggered + */ + private Function<WindowState<V>, WindowState<V>> lateTriggerUpdater = Function.identity(); + + /** + * Helper method to add a trigger condition + * + * @param currentTrigger current trigger condition + * @param newTrigger new trigger condition + * @return combined trigger condition that is {@code currentTrigger} OR {@code newTrigger} + */ + private BiFunction<M, WindowState<V>, Boolean>
addTrigger(BiFunction<M, WindowState<V>, Boolean> currentTrigger, + BiFunction<M, WindowState<V>, Boolean> newTrigger) { + if (currentTrigger == null) { + return newTrigger; + } + + return PredicateHelper.or(currentTrigger, newTrigger); + } + + /** + * Helper method to add a system timer trigger + * + * @param currentTimer current timer condition + * @param newTimer new timer condition + * @return combined timer condition that is {@code currentTimer} OR {@code newTimer} + */ + private Function<WindowState<V>, Boolean> addTimerTrigger(Function<WindowState<V>, Boolean> currentTimer, + Function<WindowState<V>, Boolean> newTimer) { + if (currentTimer == null) { + return newTimer; + } + + return PredicateHelper.or(currentTimer, newTimer); + } + + /** + * Private no-arg constructor w/o any trigger condition; instances are only created through the static factory methods of this class + */ + private TriggerBuilder() {} + + /** + * Constructor that sets the size limit as the early trigger for a window + * + * @param sizeLimit the early trigger fires once the number of messages in the window exceeds this limit + */ + private TriggerBuilder(long sizeLimit) { + this.earlyTrigger = (m, s) -> s.getNumberMessages() > sizeLimit; + } + + /** + * Constructor that sets the event time length as the early trigger + * + * @param eventTimeFunction the function that calculates the event time in nano-second from the input {@link Message} + * @param wndLenMs the window length in event time in milli-second + */ + private TriggerBuilder(Function<M, Long> eventTimeFunction, long wndLenMs) { + this.earlyTrigger = (m, s) -> + TimeUnit.NANOSECONDS.toMillis(Math.max(s.getLatestEventTimeNs() - s.getEarliestEventTimeNs(), + eventTimeFunction.apply(m) - s.getEarliestEventTimeNs())) > wndLenMs; + } + + /** + * Constructor that sets the special token message as the early trigger + * + * @param tokenFunc the function that checks whether an input {@link Message} is a token message that triggers window output + */ + private 
TriggerBuilder(Function<M, Boolean> tokenFunc) { + this.earlyTrigger = (m, s) ->
tokenFunc.apply(m); + } + + /** + * Build method that creates an {@link org.apache.samza.operators.internal.Trigger} object based on the trigger conditions set in {@link TriggerBuilder}. + * This is kept package private and only used by {@link Windows} to convert the mutable {@link TriggerBuilder} object to an immutable {@link Trigger} object + * + * @return the final {@link org.apache.samza.operators.internal.Trigger} object + */ + Trigger<M, WindowState<V>> build() { + return Trigger.createTrigger(this.timerTrigger, this.earlyTrigger, this.lateTrigger, this.earlyTriggerUpdater, this.lateTriggerUpdater); + } + + /** + * Public API methods start here + */ + + + /** + * API method to allow users to set an update method to update the output value after the first window output is triggered + * by the early trigger condition; calling this again replaces any previously set early-trigger updater + * + * @param onTriggerFunc the method to update the output value after the early trigger + * @return the {@link TriggerBuilder} object + */ + public TriggerBuilder<M, V> onEarlyTrigger(Function<V, V> onTriggerFunc) { + this.earlyTriggerUpdater = s -> { + s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); + return s; + }; + return this; + } + + /** + * API method to allow users to set an update method to update the output value after a late window output is triggered + * by the late trigger condition; calling this again replaces any previously set late-trigger updater + * + * @param onTriggerFunc the method to update the output value after the late trigger + * @return the {@link TriggerBuilder} object + */ + public TriggerBuilder<M, V> onLateTrigger(Function<V, V> onTriggerFunc) { + this.lateTriggerUpdater = s -> { + s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); + return s; + }; + return this; + } + + /** + * API method to allow users to add a system timer trigger based on timeout after the last message received in the window + * + * @param timeoutMs the timeout in ms after the last 
message received in the window + * @return the {@link TriggerBuilder} object + */ + public TriggerBuilder<M, V>
addTimeoutSinceLastMessage(long timeoutMs) { + this.timerTrigger = this.addTimerTrigger(this.timerTrigger, + s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + timeoutMs < System.currentTimeMillis()); + return this; + } + + /** + * API method to allow users to add a system timer trigger based on the timeout after the first message received in the window + * + * @param timeoutMs the timeout in ms after the first message received in the window + * @return the {@link TriggerBuilder} object + */ + public TriggerBuilder<M, V> addTimeoutSinceFirstMessage(long timeoutMs) { + this.timerTrigger = this.addTimerTrigger(this.timerTrigger, s -> + TimeUnit.NANOSECONDS.toMillis(s.getFirstMessageTimeNs()) + timeoutMs < System.currentTimeMillis()); + return this; + } + + /** + * API method to allow users to add a late trigger based on the window size limit + * + * @param sizeLimit limit on the number of messages in window + * @return the {@link TriggerBuilder} object + */ + public TriggerBuilder<M, V> addLateTriggerOnSizeLimit(long sizeLimit) { + this.lateTrigger = this.addTrigger(this.lateTrigger, (m, s) -> s.getNumberMessages() > sizeLimit); + return this; + } + + /** + * API method to allow users to define a customized late trigger function based on input message and the window state + * + * @param lateTrigger the late trigger condition based on input {@link Message} and the current {@link WindowState} + * @return the {@link TriggerBuilder} object + */ + public TriggerBuilder<M, V> addLateTrigger(BiFunction<M, WindowState<V>, Boolean> lateTrigger) { + this.lateTrigger = this.addTrigger(this.lateTrigger, lateTrigger); + return this; + } + + /** + * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on window size limit + * + * @param sizeLimit window size limit; the early trigger fires once the number of messages in the window exceeds it + * @param <M> the type of input {@link Message} + * @param <V> the type of {@link Windows.Window} output value + * @return 
the {@link TriggerBuilder} object + */ + public static
<M extends Message, V> TriggerBuilder<M, V> earlyTriggerWhenExceedWndLen(long sizeLimit) { + return new TriggerBuilder<>(sizeLimit); + } + + /** + * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on event time window + * + * + * @param eventTimeFunc the function to get the event time from the input message + * @param eventTimeWndSizeMs the event time window size in Ms + * @param <M> the type of input {@link Message} + * @param <V> the type of {@link Windows.Window} output value + * @return the {@link TriggerBuilder} object + */ + public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnEventTime(Function<M, Long> eventTimeFunc, long eventTimeWndSizeMs) { + return new TriggerBuilder<>(eventTimeFunc, eventTimeWndSizeMs); + } + + /** + * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on token messages + * + * @param tokenFunc the function to determine whether an input message is a window token or not + * @param <M> the type of input {@link Message} + * @param <V> the type of {@link Windows.Window} output value + * @return the {@link TriggerBuilder} object + */ + public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnTokenMsg(Function<M, Boolean> tokenFunc) { + return new TriggerBuilder<>(tokenFunc); + } + + /** + * Static API method to allow customized early trigger condition based on input {@link Message} and the corresponding {@link WindowState} + * + * @param earlyTrigger the user defined early trigger condition + * @param <M> the input message type + * @param <V> the output value from the window + * @return the {@link TriggerBuilder} object + */ + public static <M extends Message, V> TriggerBuilder<M, V> earlyTrigger(BiFunction<M, WindowState<V>, Boolean> earlyTrigger) { + TriggerBuilder<M, V> newTriggers = new TriggerBuilder<>(); + newTriggers.earlyTrigger = newTriggers.addTrigger(newTriggers.earlyTrigger, earlyTrigger); + return
newTriggers; + } + + /** + * Static API method to create a {@link TriggerBuilder} w/ system timeout after the last message received in the window + * + * @param timeoutMs timeout in ms after the last message received + * @param <M> the type of input {@link Message} + * @param <V> the type of {@link Windows.Window} output value + * @return the {@link TriggerBuilder} object + */ + public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceLastMessage(long timeoutMs) { + return new TriggerBuilder<M, V>().addTimeoutSinceLastMessage(timeoutMs); + } + + /** + * Static API method to create a {@link TriggerBuilder} w/ system timeout after the first message received in the window + * + * @param timeoutMs timeout in ms after the first message received + * @param <M> the type of input {@link Message} + * @param <V> the type of {@link Windows.Window} output value + * @return the {@link TriggerBuilder} object + */ + public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceFirstMessage(long timeoutMs) { + return new TriggerBuilder<M, V>().addTimeoutSinceFirstMessage(timeoutMs); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/WindowState.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/WindowState.java b/samza-api/src/main/java/org/apache/samza/operators/WindowState.java new file mode 100644 index 0000000..8f98d38 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/WindowState.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +/** + * This interface defines the methods a window state class has to implement. The programmers are allowed to implement + * customized window state to be stored in window state stores by implementing this interface class. + * + * <p>NOTE(review): this interface does not specify which clock backs the {@code *TimeNs} values. The timer triggers in + * {@link TriggerBuilder} convert them to milliseconds and compare them with {@link System#currentTimeMillis()}, so + * implementations presumably need to report epoch-based nanoseconds rather than {@link System#nanoTime()} readings — + * confirm before implementing. + * + * @param <WV> the type for window output value + */ +public interface WindowState<WV> { + /** + * Method to get the system time when the first message in the window is received + * + * @return nano-second of system time for the first message received in the window + */ + long getFirstMessageTimeNs(); + + /** + * Method to get the system time when the last message in the window is received + * + * @return nano-second of system time for the last message received in the window + */ + long getLastMessageTimeNs(); + + /** + * Method to get the earliest event time in the window + * + * @return the earliest event time in nano-second in the window + */ + long getEarliestEventTimeNs(); + + /** + * Method to get the latest event time in the window + * + * @return the latest event time in nano-second in the window + */ + long getLatestEventTimeNs(); + + /** + * Method to get the total number of messages received in the window + * + * @return number of messages in the window + */ + long getNumberMessages(); + + /** + * Method to get the corresponding window's output value + * + * @return the corresponding window's output value + */ + WV getOutputValue(); + + /** + * Method to set the corresponding window's output value + * + * @param value the corresponding window's output value + */ + 
void setOutputValue(WV value); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/Windows.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/Windows.java new file mode 100644 index 0000000..5ffa211 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/Windows.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +import org.apache.samza.operators.data.Message; +import org.apache.samza.operators.internal.Operators; +import org.apache.samza.operators.internal.Trigger; +import org.apache.samza.operators.internal.WindowFn; +import org.apache.samza.operators.internal.WindowOutput; +import org.apache.samza.storage.kv.Entry; + +import java.util.Collection; +import java.util.function.BiFunction; +import java.util.function.Function; + + +/** + * This class defines a collection of {@link Window} functions. The public classes and methods here are intended to be + * used by the user (i.e. 
programmers) to create {@link Window} function directly. + * + */ +public final class Windows { + + /** + * private constructor to prevent instantiation + */ + private Windows() {} + + /** + * This class defines a session window function class + * + * @param <M> the type of input {@link Message} + * @param <WK> the type of session key in the session window + * @param <WV> the type of output value in each session window + */ + static class SessionWindow<M extends Message, WK, WV> implements Window<M, WK, WV, WindowOutput<WK, WV>> { + + /** + * Constructor. Made private s.t. it can only be instantiated via the static API methods in {@link Windows} + * + * @param sessionKeyFunction function to get the session key from the input {@link Message} + * @param aggregator function to calculate the output value based on the input {@link Message} and current output value. + * The aggregators created by {@link Windows} accept a {@code null} current value (a session with nothing + * aggregated yet), since this class does not establish an initial output value. + */ + private SessionWindow(Function<M, WK> sessionKeyFunction, BiFunction<M, WV, WV> aggregator) { + this.wndKeyFunction = sessionKeyFunction; + this.aggregator = aggregator; + } + + /** + * function to calculate the window key from input message + */ + private final Function<M, WK> wndKeyFunction; + + /** + * function to calculate the output value from the input message and the current output value + */ + private final BiFunction<M, WV, WV> aggregator; + + /** + * trigger condition that determines when to send out the output value in a {@link WindowOutput} message + */ + private Trigger<M, WindowState<WV>> trigger = null; + + //TODO: need to create a set of {@link StoreFunctions} that is default to input {@link Message} type for {@link Window} + private Operators.StoreFunctions<M, WK, WindowState<WV>> storeFunctions = null; + + /** + * Public API methods start here + */ + + /** + * Public API method to define the watermark trigger for the window operator + * + * @param wndTrigger {@link Trigger} function defines the watermark trigger for this {@link SessionWindow} + * @return The window operator w/ the 
defined watermark trigger + */ + @Override + public Window<M, WK, WV, WindowOutput<WK, WV>> setTriggers(TriggerBuilder<M, WV> wndTrigger) { + this.trigger = wndTrigger.build(); + return this; + } + + private BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() { + // TODO: actual implementation of the main session window logic, based on the wndKeyFunction, aggregator, and triggers; + return null; + } + + private WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>> getInternalWindowFn() { + return new WindowFn<M, WK, WindowState<WV>, WindowOutput<WK, WV>>() { + + @Override public BiFunction<M, Entry<WK, WindowState<WV>>, WindowOutput<WK, WV>> getTransformFunc() { + return SessionWindow.this.getTransformFunc(); + } + + @Override public Operators.StoreFunctions<M, WK, WindowState<WV>> getStoreFuncs() { + return SessionWindow.this.storeFunctions; + } + + @Override public Trigger<M, WindowState<WV>> getTrigger() { + return SessionWindow.this.trigger; + } + }; + } + } + + /** + * Internal helper that unwraps a user-facing {@link Window} into its internal {@link WindowFn} description. + * + * @param window the {@link Window} created via the static API methods in {@link Windows} + * @return the internal {@link WindowFn} of {@code window} + * @throws IllegalArgumentException if {@code window} is not a {@link SessionWindow} + */ + // The casts below are guarded by the instanceof check; their type arguments cannot be verified at runtime (erasure). + @SuppressWarnings("unchecked") + static <M extends Message, WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> WindowFn<M, WK, WS, WM> getInternalWindowFn( + Window<M, WK, WV, WM> window) { + if (window instanceof SessionWindow) { + SessionWindow<M, WK, WV> sessionWindow = (SessionWindow<M, WK, WV>) window; + return (WindowFn<M, WK, WS, WM>) sessionWindow.getInternalWindowFn(); + } + throw new IllegalArgumentException("Input window type not supported."); + } + + /** + * Public static API methods start here + * + */ + + /** + * The public programming interface class for window function + * + * @param <M> the type of input {@link Message} + * @param <WK> the type of key to the {@link Window} + * @param <WV> the type of output value in the {@link WindowOutput} + * @param <WM> the type of message in the window output stream + */ + public interface Window<M extends Message, WK, WV, WM extends WindowOutput<WK, WV>> { + + /** + * Set the triggers for this {@link Window} + * + * @param 
wndTrigger trigger conditions set by the programmers + * @return the {@link Window} function w/ the trigger {@code wndTrigger} + */ + Window<M, WK, WV, WM> setTriggers(TriggerBuilder<M, WV> wndTrigger); + } + + /** + * Static API method to create a {@link SessionWindow} in which the output value is simply the collection of input messages + * + * <p>If the current output value is {@code null} (nothing aggregated into the session yet), a new collection is created. + * + * @param sessionKeyFunction function to calculate session window key + * @param <M> type of input {@link Message} + * @param <WK> type of the session window key + * @return the {@link Window} function for the session + */ + public static <M extends Message, WK> Window<M, WK, Collection<M>, WindowOutput<WK, Collection<M>>> intoSessions(Function<M, WK> sessionKeyFunction) { + return new SessionWindow<>(sessionKeyFunction, (m, c) -> { + // guard against a missing (null) current output value instead of failing with a NullPointerException + Collection<M> msgs = c == null ? new java.util.ArrayList<>() : c; + msgs.add(m); + return msgs; + }); + } + + /** + * Static API method to create a {@link SessionWindow} in which the output value is a collection of {@code SI} from the input messages + * + * <p>If the current output value is {@code null} (nothing aggregated into the session yet), a new collection is created. 
+ * + * @param sessionKeyFunction function to calculate session window key + * @param sessionInfoExtractor function to retrieve session info of type {@code SI} from the input message of type {@code M} + * @param <M> type of the input {@link Message} + * @param <WK> type of the session window key + * @param <SI> type of the session information retrieved from each input message of type {@code M} + * @return the {@link Window} function for the session + */ + public static <M extends Message, WK, SI> Window<M, WK, Collection<SI>, WindowOutput<WK, Collection<SI>>> intoSessions(Function<M, WK> sessionKeyFunction, + Function<M, SI> sessionInfoExtractor) { + return new SessionWindow<>(sessionKeyFunction, (m, c) -> { + // guard against a missing (null) current output value instead of failing with a NullPointerException + Collection<SI> infos = c == null ? new java.util.ArrayList<>() : c; + infos.add(sessionInfoExtractor.apply(m)); + return infos; + }); + } + + /** + * Static API method to create a {@link SessionWindow} as a counter of input messages + * + * <p>A {@code null} current count (nothing aggregated into the session yet) is treated as zero. + * + * @param sessionKeyFunction function to calculate session window key + * @param <M> type of the input {@link Message} + * @param <WK> type of the session 
window key + * @return the {@link Window} function for the session + */ + public static <M extends Message, WK> Window<M, WK, Integer, WindowOutput<WK, Integer>> intoSessionCounter(Function<M, WK> sessionKeyFunction) { + return new SessionWindow<>(sessionKeyFunction, (m, c) -> c == null ? 1 : c + 1); + } + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java new file mode 100644 index 0000000..3c9874d --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/data/IncomingSystemMessage.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.data; + +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.SystemStreamPartition; + + +/** + * This class implements a {@link Message} that encapsulates an {@link IncomingMessageEnvelope} from the system + * + * <p>Key and message body are read directly from the wrapped envelope; the timestamp is the local receive time + * captured when this object is constructed. + * + */ +public class IncomingSystemMessage implements Message<Object, Object>, InputSystemMessage<Offset> { + /** + * Incoming message envelope + */ + private final IncomingMessageEnvelope imsg; + + /** + * The receive time of this incoming message, captured with {@link System#nanoTime()} in the constructor. + * + * <p>NOTE(review): {@link System#nanoTime()} has an arbitrary origin and is only meaningful for measuring elapsed + * time within one JVM. It is not a wall-clock or message-creation timestamp as {@link Message#getTimestamp()} + * describes — confirm the intended semantics. + */ + private final long recvTimeNano; + + /** + * Ctor to create a {@code IncomingSystemMessage} from {@link IncomingMessageEnvelope} + * + * @param imsg The incoming system message + */ + public IncomingSystemMessage(IncomingMessageEnvelope imsg) { + this.imsg = imsg; + this.recvTimeNano = System.nanoTime(); + } + + @Override + public Object getMessage() { + return this.imsg.getMessage(); + } + + @Override + public Object getKey() { + return this.imsg.getKey(); + } + + /** + * Returns the local receive time of this message. + * + * @return the {@link System#nanoTime()} reading taken when this object was constructed + */ + @Override + public long getTimestamp() { + return this.recvTimeNano; + } + + /** + * Returns the envelope's offset parsed as a {@link LongOffset}. + * + * @return a new {@link LongOffset} built from the envelope's offset string + * @throws NumberFormatException if the envelope's offset is not a parsable {@code long} + */ + @Override + public Offset getOffset() { + // TODO: need to add offset factory to generate different types of offset. This is just a placeholder, + // assuming incoming message carries long value as offset (i.e. 
Kafka case) + return new LongOffset(this.imsg.getOffset()); + } + + @Override + public SystemStreamPartition getSystemStreamPartition() { + return imsg.getSystemStreamPartition(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java b/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java new file mode 100644 index 0000000..509f640 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/data/InputSystemMessage.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +public interface InputSystemMessage<O extends Offset> { + + /** + * Get the input message's {@link SystemStreamPartition} + * + * @return the {@link SystemStreamPartition} this message is coming from + */ + SystemStreamPartition getSystemStreamPartition(); + + /** + * Get the offset of the message in the input stream. This should be used to uniquely identify a message in an input stream. + * + * @return The offset of the message in the input stream. + */ + O getOffset(); +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java b/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java new file mode 100644 index 0000000..0b6c0fa --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/data/LongOffset.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.data; + +/** + * An implementation of {@link org.apache.samza.operators.data.Offset}, w/ {@code long} value as the offset. + * + * <p>Instances are immutable and are only comparable to other {@code LongOffset}s. + */ +public class LongOffset implements Offset { + + /** + * The offset value, stored as a primitive to avoid boxing on every comparison. + */ + private final long offset; + + /** + * Creates an offset from a raw {@code long} value; used by {@link #getMinOffset()} and {@link #getMaxOffset()}. + * + * @param offset the offset value + */ + private LongOffset(long offset) { + this.offset = offset; + } + + /** + * Creates an offset by parsing its decimal string representation. + * + * @param offset the offset as a decimal string + * @throws NumberFormatException if {@code offset} is {@code null} or not a parsable {@code long} + */ + public LongOffset(String offset) { + this.offset = Long.parseLong(offset); + } + + /** + * Compares this offset with another {@link LongOffset} by numeric value. + * + * @param o the offset to compare with + * @return a negative integer, zero, or a positive integer as this offset is less than, equal to, or greater than {@code o} + * @throws IllegalArgumentException if {@code o} is not a {@link LongOffset} + * @throws NullPointerException if {@code o} is {@code null} + */ + @Override + public int compareTo(Offset o) { + if (!(o instanceof LongOffset)) { + throw new IllegalArgumentException("Not comparable offset classes. LongOffset vs " + o.getClass().getName()); + } + LongOffset other = (LongOffset) o; + return Long.compare(this.offset, other.offset); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof LongOffset)) { + return false; + } + LongOffset o = (LongOffset) other; + return this.offset == o.offset; + } + + @Override + public int hashCode() { + // same value as the former Long#hashCode() on the boxed field + return Long.hashCode(this.offset); + } + + /** + * Returns the decimal string form of this offset, e.g. for logging. + * + * @return the offset value as a string + */ + @Override + public String toString() { + return Long.toString(this.offset); + } + + /** + * Helper method to get the minimum offset + * + * @return The minimum offset + */ + public static LongOffset getMinOffset() { + return new LongOffset(Long.MIN_VALUE); + } + + /** + * Helper method to get the maximum offset + * + * @return The maximum offset + */ + public static LongOffset getMaxOffset() { + return new LongOffset(Long.MAX_VALUE); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/data/Message.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/Message.java b/samza-api/src/main/java/org/apache/samza/operators/data/Message.java new file mode 100644 index 0000000..aaafd94 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/data/Message.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.data; + +/** + * This class defines the generic interface of {@link Message}, which is an entry in the input/output stream. + * + * <p>The {@link Message} models the basic operable unit in streaming SQL processes in Samza. + * + * @param <K> the type of the message key + * @param <M> the type of the message body + */ +public interface Message<K, M> { + + /** + * Access method to get the corresponding message body in {@link Message} + * + * @return Message object in this {@link Message} + */ + M getMessage(); + + /** + * Method to indicate whether this {@link Message} indicates deletion of a message w/ the message key + * + * <p>The default implementation returns {@code false}, i.e. every message is an insert unless overridden. + * + * @return A boolean value indicates whether the current message is a delete or insert message + */ + default boolean isDelete() { + return false; + } + + /** + * Access method to the key of the message + * + * @return The key of the message + */ + K getKey(); + + /** + * Get the message creation timestamp of the message. + * + * @return The message's timestamp in nano seconds. 
+ */ + long getTimestamp(); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java b/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java new file mode 100644 index 0000000..33eb9ba --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/data/Offset.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.data; + +/** + * A generic interface extending {@link java.lang.Comparable} to be used as {@code Offset} in a stream. + * + * <p>Offsets position and identify messages in an input stream (see {@link InputSystemMessage#getOffset()}). + * This interface does not define an ordering between different {@code Offset} implementations; for example, + * {@link LongOffset#compareTo(Offset)} throws {@link IllegalArgumentException} when given a non-{@link LongOffset}. + * Implementations should keep {@code compareTo} consistent with {@code equals}. + */ +public interface Offset extends Comparable<Offset> { + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java b/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java new file mode 100644 index 0000000..f06387c --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/internal/Operators.java @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.operators.internal; + +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.data.Message; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.TaskCoordinator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.UUID; +import java.util.function.BiFunction; +import java.util.function.Function; + + +/** + * This class defines all basic stream operator classes used by internal implementation only. All classes defined in + * this file are immutable. + * + * NOTE: Programmers should not use the operators defined in this class directly. All {@link Operator} objects + * should be initiated via {@link MessageStream} API methods + */ +public class Operators { + /** + * Private constructor to prevent instantiation of the {@link Operators} class + */ + private Operators() {} + + private static String getOperatorId() { + // TODO: need to change the IDs to be a consistent, durable IDs that can be recovered across container and job restarts + return UUID.randomUUID().toString(); + } + + /** + * Private interface for stream operator functions. The interface class defines the output of the stream operator function. + * + */ + public interface Operator<OM extends Message> { + MessageStream<OM> getOutputStream(); + } + + /** + * Linear stream operator function that takes 1 input {@link Message} and output a collection of output {@link Message}s. 
+ * + * @param <M> the type of input {@link Message} + * @param <OM> the type of output {@link Message} + */ + public static class StreamOperator<M extends Message, OM extends Message> implements Operator<OM> { + /** + * The output {@link MessageStream} + */ + private final MessageStream<OM> outputStream; + + /** + * The transformation function + */ + private final Function<M, Collection<OM>> txfmFunction; + + /** + * Constructor of {@link StreamOperator}. Make it private s.t. it can only be created within {@link Operators}. + * + * @param transformFn the transformation function to be applied that transforms 1 input {@link Message} into a collection + * of output {@link Message}s + */ + private StreamOperator(Function<M, Collection<OM>> transformFn) { + this(transformFn, new MessageStream<>()); + } + + /** + * Constructor of {@link StreamOperator} which allows the user to define the output {@link MessageStream} + * + * @param transformFn the transformation function + * @param outputStream the output {@link MessageStream} + */ + private StreamOperator(Function<M, Collection<OM>> transformFn, MessageStream<OM> outputStream) { + this.outputStream = outputStream; + this.txfmFunction = transformFn; + } + + @Override + public MessageStream<OM> getOutputStream() { + return this.outputStream; + } + + /** + * Method to get the transformation function. + * + * @return the {@code txfmFunction} + */ + public Function<M, Collection<OM>> getFunction() { + return this.txfmFunction; + } + + } + + /** + * A sink operator function that allows customized code to send the output to external system. 
This is the terminal + * operator that does not have any output {@link MessageStream} that allows further processing in the same + * {@link org.apache.samza.operators.task.StreamOperatorTask} + * + * @param <M> the type of input {@link Message} + */ + public static class SinkOperator<M extends Message> implements Operator { + + /** + * The user-defined sink function + */ + private final MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink; + + /** + * Default constructor for {@link SinkOperator}. Make it private s.t. it can only be created within {@link Operators}. + * + * @param sink the user-defined sink function + */ + private SinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sink) { + this.sink = sink; + } + + @Override + public MessageStream getOutputStream() { + return null; + } + + /** + * Method to get the user-defined function implements the {@link SinkOperator} + * + * @return a {@link MessageStream.VoidFunction3} function that allows the caller to pass in an input message, {@link MessageCollector} + * and {@link TaskCoordinator} to the sink function + */ + public MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> getFunction() { + return this.sink; + } + } + + /** + * The store functions that are used by {@link WindowOperator} and {@link PartialJoinOperator} to store and retrieve + * buffered messages and partial aggregation results + * + * @param <SK> the type of key used to store the operator states + * @param <SS> the type of operator state. e.g. 
could be the partial aggregation result for a window, or a buffered + * input message from the join stream for a join + */ + public static class StoreFunctions<M extends Message, SK, SS> { + /** + * Function to define the key to query in the operator state store, according to the incoming {@link Message} + * This method only supports finding the unique key for the incoming message, which supports use case of non-overlapping + * windows and unique-key-based join. + * + * TODO: for windows that overlaps (i.e. sliding windows and hopping windows) and non-unique-key-based join, the query + * to the state store is usually a range scan. We need to add a rangeKeyFinder function to map from a single input + * message to a range of keys in the store. + */ + private final Function<M, SK> storeKeyFinder; + + /** + * Function to update the store entry based on the current state and the incoming {@link Message} + * + * TODO: this is assuming a 1:1 mapping from the input message to the store entry. When implementing sliding/hopping + * windows and non-unique-key-based join, we may need to include the corresponding state key, in addition to the + * state value. + */ + private final BiFunction<M, SS, SS> stateUpdater; + + /** + * Constructor of state store functions. 
+ * + */ + private StoreFunctions(Function<M, SK> keyFinder, + BiFunction<M, SS, SS> stateUpdater) { + this.storeKeyFinder = keyFinder; + this.stateUpdater = stateUpdater; + } + + /** + * Method to get the {@code storeKeyFinder} function + * + * @return the function to calculate the key from an input {@link Message} + */ + public Function<M, SK> getStoreKeyFinder() { + return this.storeKeyFinder; + } + + /** + * Method to get the {@code stateUpdater} function + * + * @return the function to update the corresponding state according to an input {@link Message} + */ + public BiFunction<M, SS, SS> getStateUpdater() { + return this.stateUpdater; + } + } + + /** + * Defines a window operator function that takes one {@link MessageStream} as an input, accumulate the window state, and generate + * an output {@link MessageStream} w/ output type {@code WM} which extends {@link WindowOutput} + * + * @param <M> the type of input {@link Message} + * @param <WK> the type of key in the output {@link Message} from the {@link WindowOperator} function + * @param <WS> the type of window state in the {@link WindowOperator} function + * @param <WM> the type of window output {@link Message} + */ + public static class WindowOperator<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> implements Operator<WM> { + /** + * The output {@link MessageStream} + */ + private final MessageStream<WM> outputStream; + + /** + * The main window transformation function that takes {@link Message}s from one input stream, aggregates w/ the window + * state(s) from the window state store, and generate output {@link Message}s to the output stream. 
+ */ + private final BiFunction<M, Entry<WK, WS>, WM> txfmFunction; + + /** + * The state store functions for the {@link WindowOperator} + */ + private final StoreFunctions<M, WK, WS> storeFunctions; + + /** + * The window trigger function + */ + private final Trigger<M, WS> trigger; + + /** + * The unique ID of stateful operators + */ + private final String opId; + + /** + * Constructor for {@link WindowOperator}. Make it private s.t. it can only be created within {@link Operators}. + * + * @param windowFn description of the window function + * @param operatorId auto-generated unique ID of the operator + */ + private WindowOperator(WindowFn<M, WK, WS, WM> windowFn, String operatorId) { + this.outputStream = new MessageStream<>(); + this.txfmFunction = windowFn.getTransformFunc(); + this.storeFunctions = windowFn.getStoreFuncs(); + this.trigger = windowFn.getTrigger(); + this.opId = operatorId; + } + + @Override + public String toString() { + return this.opId; + } + + @Override + public MessageStream<WM> getOutputStream() { + return this.outputStream; + } + + /** + * Method to get the window's {@link StoreFunctions}. 
+ * + * @return the window operator's {@code storeFunctions} + */ + public StoreFunctions<M, WK, WS> getStoreFunctions() { + return this.storeFunctions; + } + + /** + * Method to get the window operator's main function + * + * @return the window operator's {@code txfmFunction} + */ + public BiFunction<M, Entry<WK, WS>, WM> getFunction() { + return this.txfmFunction; + } + + /** + * Method to get the trigger functions + * + * @return the {@link Trigger} for this {@link WindowOperator} + */ + public Trigger<M, WS> getTrigger() { + return this.trigger; + } + + /** + * Method to generate the window operator's state store name + * + * @param inputStream the input {@link MessageStream} to this state store + * @return the persistent store name of the window operator + */ + public String getStoreName(MessageStream<M> inputStream) { + //TODO: need to get the persistent name of ds and the operator in a serialized form + return String.format("input-%s-wndop-%s", inputStream.toString(), this.toString()); + } + } + + /** + * The partial join operator that takes {@link Message}s from one input stream and join w/ buffered {@link Message}s from + * another stream and generate join output to {@code output} + * + * @param <M> the type of input {@link Message} + * @param <K> the type of join key + * @param <JM> the type of message of {@link Message} in the other join stream + * @param <RM> the type of message of {@link Message} in the join output stream + */ + public static class PartialJoinOperator<M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> implements Operator<RM> { + + private final MessageStream<RM> joinOutput; + + /** + * The main transformation function of {@link PartialJoinOperator} that takes a type {@code M} input message, + * join w/ a stream of buffered {@link Message}s from another stream w/ type {@code JM}, and generate joined type {@code RM}. 
+ */ + private final BiFunction<M, JM, RM> txfmFunction; + + /** + * The message store functions that read the buffered messages from the other stream in the join + */ + private final StoreFunctions<JM, K, JM> joinStoreFunctions; + + /** + * The message store functions that save the buffered messages of this {@link MessageStream} in the join + */ + private final StoreFunctions<M, K, M> selfStoreFunctions; + + /** + * The unique ID for the stateful operator + */ + private final String opId; + + /** + * Default constructor to create a {@link PartialJoinOperator} object + * + * @param partialJoin partial join function that take type {@code M} of input {@link Message} and join w/ type + * {@code JM} of buffered {@link Message} from another stream + * @param joinOutput the output {@link MessageStream} of the join results + */ + private PartialJoinOperator(BiFunction<M, JM, RM> partialJoin, MessageStream<RM> joinOutput, String opId) { + this.joinOutput = joinOutput; + this.txfmFunction = partialJoin; + // Read-only join store, no creator/updater functions specified + this.joinStoreFunctions = new StoreFunctions<>(m -> m.getKey(), null); + // Buffered message store for this input stream + this.selfStoreFunctions = new StoreFunctions<>(m -> m.getKey(), (m, s1) -> m); + this.opId = opId; + } + + @Override + public String toString() { + return this.opId; + } + + @Override + public MessageStream<RM> getOutputStream() { + return this.joinOutput; + } + + /** + * Method to get {@code joinStoreFunctions} + * + * @return {@code joinStoreFunctions} + */ + public StoreFunctions<JM, K, JM> getJoinStoreFunctions() { + return this.joinStoreFunctions; + } + + /** + * Method to get {@code selfStoreFunctions} + * + * @return {@code selfStoreFunctions} + */ + public StoreFunctions<M, K, M> getSelfStoreFunctions() { + return this.selfStoreFunctions; + } + + /** + * Method to get {@code txfmFunction} + * + * @return {@code txfmFunction} + */ + public BiFunction<M, JM, RM> getFunction() { + 
return this.txfmFunction; + } + } + + /** + * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator} + * + * @param transformFn the corresponding transformation function + * @param <M> type of input {@link Message} + * @param <OM> type of output {@link Message} + * @return the {@link StreamOperator} + */ + public static <M extends Message, OM extends Message> StreamOperator<M, OM> getStreamOperator(Function<M, Collection<OM>> transformFn) { + return new StreamOperator<>(transformFn); + } + + /** + * The method only to be used internally in {@link MessageStream} to create {@link SinkOperator} + * + * @param sinkFn the sink function + * @param <M> type of input {@link Message} + * @return the {@link SinkOperator} + */ + public static <M extends Message> SinkOperator<M> getSinkOperator(MessageStream.VoidFunction3<M, MessageCollector, TaskCoordinator> sinkFn) { + return new SinkOperator<>(sinkFn); + } + + /** + * The method only to be used internally in {@link MessageStream} to create {@link WindowOperator} + * + * @param windowFn the {@link WindowFn} function + * @param <M> type of input {@link Message} + * @param <WK> type of window key + * @param <WS> type of {@link WindowState} + * @param <WM> type of output {@link WindowOutput} + * @return the {@link WindowOperator} + */ + public static <M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> WindowOperator<M, WK, WS, WM> getWindowOperator( + WindowFn<M, WK, WS, WM> windowFn) { + return new WindowOperator<>(windowFn, Operators.getOperatorId()); + } + + /** + * The method only to be used internally in {@link MessageStream} to create {@link WindowOperator} + * + * @param joiner the {@link WindowFn} function + * @param joinOutput the output {@link MessageStream} + * @param <M> type of input {@link Message} + * @param <K> type of join key + * @param <JM> the type of message in the {@link Message} from the other join stream + * @param <RM> the type of 
message in the {@link Message} from the join function + * @return the {@link PartialJoinOperator} + */ + public static <M extends Message<K, ?>, K, JM extends Message<K, ?>, RM extends Message> PartialJoinOperator<M, K, JM, RM> getPartialJoinOperator( + BiFunction<M, JM, RM> joiner, MessageStream<RM> joinOutput) { + return new PartialJoinOperator<>(joiner, joinOutput, Operators.getOperatorId()); + } + + /** + * The method only to be used internally in {@link MessageStream} to create {@link StreamOperator} as a merger function + * + * @param mergeOutput the common output {@link MessageStream} from the merger + * @param <M> the type of input {@link Message} + * @return the {@link StreamOperator} for merge + */ + public static <M extends Message> StreamOperator<M, M> getMergeOperator(MessageStream<M> mergeOutput) { + return new StreamOperator<M, M>(t -> + new ArrayList<M>() { { + this.add(t); + } }, + mergeOutput); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java b/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java new file mode 100644 index 0000000..3b50e2b --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/internal/Trigger.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.internal; + +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.data.Message; + +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * Defines the trigger functions for {@link Operators.WindowOperator}. This class is immutable. + * + * @param <M> the type of message from the input stream + * @param <S> the type of state variable in the window's state store + */ +public class Trigger<M extends Message, S extends WindowState> { + + /** + * System timer based trigger condition. 
This is the only guarantee that the {@link Operators.WindowOperator} will proceed forward + */ + private final Function<S, Boolean> timerTrigger; + + /** + * early trigger condition that determines when to send the first output from the {@link Operators.WindowOperator} + */ + private final BiFunction<M, S, Boolean> earlyTrigger; + + /** + * late trigger condition that determines when to send the updated output after the first one from a {@link Operators.WindowOperator} + */ + private final BiFunction<M, S, Boolean> lateTrigger; + + /** + * the function to updated the window state when the first output is triggered + */ + private final Function<S, S> earlyTriggerUpdater; + + /** + * the function to updated the window state when the late output is triggered + */ + private final Function<S, S> lateTriggerUpdater; + + /** + * Private constructor to prevent instantiation + * + * @param timerTrigger system timer trigger condition + * @param earlyTrigger early trigger condition + * @param lateTrigger late trigger condition + * @param earlyTriggerUpdater early trigger state updater + * @param lateTriggerUpdater late trigger state updater + */ + private Trigger(Function<S, Boolean> timerTrigger, BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, + Function<S, S> earlyTriggerUpdater, Function<S, S> lateTriggerUpdater) { + this.timerTrigger = timerTrigger; + this.earlyTrigger = earlyTrigger; + this.lateTrigger = lateTrigger; + this.earlyTriggerUpdater = earlyTriggerUpdater; + this.lateTriggerUpdater = lateTriggerUpdater; + } + + /** + * Static method to create a {@link Trigger} object + * + * @param timerTrigger system timer trigger condition + * @param earlyTrigger early trigger condition + * @param lateTrigger late trigger condition + * @param earlyTriggerUpdater early trigger state updater + * @param lateTriggerUpdater late trigger state updater + * @param <M> the type of input {@link Message} + * @param <S> the type of window state extends 
{@link WindowState} + * @return the {@link Trigger} function + */ + public static <M extends Message, S extends WindowState> Trigger<M, S> createTrigger(Function<S, Boolean> timerTrigger, + BiFunction<M, S, Boolean> earlyTrigger, BiFunction<M, S, Boolean> lateTrigger, Function<S, S> earlyTriggerUpdater, + Function<S, S> lateTriggerUpdater) { + return new Trigger(timerTrigger, earlyTrigger, lateTrigger, earlyTriggerUpdater, lateTriggerUpdater); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java new file mode 100644 index 0000000..489e5b8 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowFn.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.internal; + +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.data.Message; +import org.apache.samza.storage.kv.Entry; + +import java.util.function.BiFunction; + + +/** + * Defines an internal representation of a window function. This class SHOULD NOT be used by the programmer directly. It is used + * by the internal representation and implementation classes in operators. + * + * @param <M> type of input stream {@link Message} for window + * @param <WK> type of window key in the output {@link Message} + * @param <WS> type of {@link WindowState} variable in the state store + * @param <WM> type of the message in the output stream + */ +public interface WindowFn<M extends Message, WK, WS extends WindowState, WM extends WindowOutput<WK, ?>> { + + /** + * get the transformation function of the {@link WindowFn} + * + * @return the transformation function takes type {@code M} message and the window state entry, then transform to an {@link WindowOutput} + */ + BiFunction<M, Entry<WK, WS>, WM> getTransformFunc(); + + /** + * get the state store functions for this {@link WindowFn} + * + * @return the collection of state store methods + */ + Operators.StoreFunctions<M, WK, WS> getStoreFuncs(); + + /** + * get the trigger conditions for this {@link WindowFn} + * + * @return the trigger condition for the {@link WindowFn} function + */ + Trigger<M, WS> getTrigger(); + +} http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java new file mode 100644 index 0000000..643b703 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/internal/WindowOutput.java @@ -0,0 +1,55 @@ +/* + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.internal; + +import org.apache.samza.operators.data.Message; + + +/** + * This class defines the specific type of output messages from a {@link Operators.WindowOperator} function + * + * @param <K> the type of key in the output window result + * @param <M> the type of value in the output window result + */ +public final class WindowOutput<K, M> implements Message<K, M> { + private final K key; + private final M value; + + WindowOutput(K key, M aggregated) { + this.key = key; + this.value = aggregated; + } + + @Override public M getMessage() { + return this.value; + } + + @Override public K getKey() { + return this.key; + } + + @Override public long getTimestamp() { + return 0; + } + + static public <K, M> WindowOutput<K, M> of(K key, M result) { + return new WindowOutput<>(key, result); + } +} +